[data][3/n] DataSourceV2: ParquetDatasourceV2 + read_parquet V2 dispatch#63113

Merged
goutamvenkat-anyscale merged 1 commit into ray-project:master from goutamvenkat-anyscale:pr63113
May 6, 2026

@goutamvenkat-anyscale

Contributor

Wires the V2 Parquet read path end-to-end. Behind the
DataContext.use_datasource_v2 opt-in flag (default False);
read_parquet continues to take the V1 path until pr-Z flips the
default.

  • datasource_v2/datasource_v2.py: resolve_partitioning base
    method (default None) so subclasses can derive partitioning
    field names from a sample without mutating instance state.
  • datasource_v2/parquet_datasource_v2.py (new): ParquetDatasourceV2
    with file indexer, scanner factory, schema inference (parallelized
    footer reads + thread pool), resolve_partitioning override
    populating field_names from path discovery, and user-supplied
    schema overrides.
  • read_api.py: _read_datasource_v2 driver entry that wires the
    ListFiles → ReadFiles op pair and threads through the file pruner
    list. read_parquet dispatches to V2 when
    use_datasource_v2=True. Raises NotImplementedError for
    _block_udf, tensor_column_schema, dataset_kwargs,
    and columns= (each gets its own follow-up — pr-C, pr-G).
  • context.py: use_datasource_v2: bool flag, default False.
  • BUILD.bazel: register the new V2 unit-test packages.
  • doc/source/data/performance-tips.rst: brief mention.
  • tests: parquet datasource unit tests, ListFiles-op fixture tests,
    read_files logical-op tests, V2 read_parquet end-to-end tests.

Signed-off-by: Goutam <goutam@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner May 4, 2026 20:46
@goutamvenkat-anyscale goutamvenkat-anyscale changed the title [data] DataSourceV2: ParquetDatasourceV2 + read_parquet V2 dispatch May 4, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces the DataSourceV2 pipeline for Parquet files, establishing a logical chain from ListFiles to ReadFiles. It adds the ParquetDatasourceV2 class, integrates a configuration flag in DataContext to enable this new path, and provides extensive testing for schema inference and operator logic. Feedback is provided to optimize performance by eliminating unnecessary list conversions when indexing or iterating over file paths.

Comment thread python/ray/data/_internal/datasource_v2/parquet_datasource_v2.py
Comment thread python/ray/data/_internal/datasource_v2/parquet_datasource_v2.py Outdated
Comment thread python/ray/data/read_api.py
Comment thread python/ray/data/read_api.py
@goutamvenkat-anyscale goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels May 4, 2026

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 2 total unresolved issues (including 1 from previous review).

Reviewed by Cursor Bugbot for commit 1909034.

ray_remote_args=ray_remote_args,
concurrency=concurrency,
partition_filter=partition_filter,
)

V2 path silently drops include_row_hash parameter

Medium Severity

The V2 dispatch block raises NotImplementedError for unsupported parameters (_block_udf, tensor_column_schema, dataset_kwargs, columns) but silently ignores include_row_hash. When a user passes include_row_hash=True with use_datasource_v2=True, no row_hash column is produced and no error is raised, leading to silent data loss. The V1 path correctly passes include_row_hash to ParquetDatasource.
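One way to keep an argument such as include_row_hash from being dropped silently is a table-driven guard that lists every parameter the V2 path cannot honor yet. The following is a minimal sketch, not Ray's implementation: the helper name `reject_unsupported_v2_args` and the "unset" default values are assumptions made for illustration, and only the parameter names come from this thread.

```python
from typing import Any, Mapping

# Assumed "not set" defaults for parameters the V2 path does not support yet.
# The names come from the review thread; the defaults are illustrative guesses.
_UNSUPPORTED_V2_DEFAULTS: Mapping[str, Any] = {
    "_block_udf": None,
    "tensor_column_schema": None,
    "dataset_kwargs": None,
    "columns": None,
    "include_row_hash": False,
}


def reject_unsupported_v2_args(**kwargs: Any) -> None:
    """Hypothetical helper: raise if any unsupported argument is set."""
    offending = sorted(
        name
        for name, default in _UNSUPPORTED_V2_DEFAULTS.items()
        if name in kwargs and kwargs[name] != default
    )
    if offending:
        raise NotImplementedError(
            "read_parquet with use_datasource_v2=True does not yet support: "
            + ", ".join(offending)
        )
```

With a single table, adding a new read_parquet parameter forces an explicit decision: either the V2 path handles it, or it is listed here and fails loudly.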

Contributor Author

Resolved this in a follow-up PR.

@goutamvenkat-anyscale goutamvenkat-anyscale force-pushed the pr63113 branch 2 times, most recently from 2598656 to 92aef3b Compare May 5, 2026 03:06

@iamjustinhsu iamjustinhsu left a comment

Contributor

some questions + comments, but overall nice. didn't look at tests

File-based partitioned datasources override this to populate
path-discovered field names (e.g. hive ``Partitioning`` ships
with ``field_names=None`` and needs to read a sample path to
learn the keys). Keeping the discovery on a dedicated method

Contributor

I'm not really sure I understand the "Keeping the discovery ..." last bit of the paragraph. Can you elaborate?

Contributor Author

Basically after grabbing the FileManifests from the sample, this computes the Hive Partitioning scheme from that sample. Then this Partitioning is propagated to the scanner class.

I can make this clearer
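The idea in this reply (derive the hive keys from a sample path and hand back a resolved partitioning instead of mutating the datasource) can be sketched roughly as follows. This is an illustration under assumptions: `HivePartitioning` and `hive_keys` are hypothetical stand-ins, not Ray's `Partitioning` or `PathPartitionParser`, and the function takes a plain list of paths rather than an `InputSplit`.

```python
from dataclasses import dataclass, replace
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class HivePartitioning:
    """Hypothetical stand-in for a partitioning config; not Ray's class."""

    base_dir: str = ""
    field_names: Optional[Tuple[str, ...]] = None


def hive_keys(path: str, base_dir: str = "") -> Tuple[str, ...]:
    """Extract hive-style ``key=value`` directory keys from one path."""
    relative = path[len(base_dir):] if base_dir and path.startswith(base_dir) else path
    dirs = relative.strip("/").split("/")[:-1]  # drop the file name
    return tuple(d.split("=", 1)[0] for d in dirs if "=" in d)


def resolve_partitioning(
    partitioning: HivePartitioning, sample_paths: Sequence[str]
) -> HivePartitioning:
    """Return a copy with field_names filled in from the first sample path."""
    if partitioning.field_names is not None or not sample_paths:
        return partitioning
    keys = hive_keys(sample_paths[0], partitioning.base_dir)
    # dataclasses.replace builds a new frozen instance; the input is untouched.
    return replace(partitioning, field_names=keys)
```

Because the result is a new value, the caller can pass it on to the scanner while the original configuration stays reusable.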

Comment thread python/ray/data/_internal/datasource_v2/parquet_datasource_v2.py
# Parquet footer reads against high-latency object stores
# (S3, GCS) are ~50-100 ms each. Reading the sample's footers in
# parallel keeps driver-side schema inference bounded by the
# slowest single read rather than the sum. Order is preserved

Contributor

Why does order matter in schema unification?

Contributor Author

Two reasons (neither is type promotion; my original comment was wrong and is fixed in the latest push):

1. Field order anchors on the first schema. Verified on pyarrow 23.0.1:

import pyarrow as pa

s1 = pa.schema([('a', pa.int64()), ('b', pa.string())])
s2 = pa.schema([('c', pa.float64()), ('a', pa.int64())])
pa.unify_schemas([s1, s2]).names  # ['a', 'b', 'c']
pa.unify_schemas([s2, s1]).names  # ['c', 'a', 'b']

Without preserved input order, the unified schema's column order is non-deterministic.

2. sample_paths[0] drives partition discovery below: PathPartitionParser runs against the first path to extract hive keys, so the schema and path lists need to stay aligned.

Type promotion itself is order-independent under permissive mode (null + int → int regardless of position).
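The order-preservation point can be illustrated with the standard library alone. In this sketch, `read_footers_in_order`, `fake_read_footer`, and `merge_field_names` are hypothetical names, the sleep stands in for object-store latency, and lists of field names stand in for real Parquet schemas. It relies only on the documented behavior that `Executor.map` returns results in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Sequence, TypeVar

T = TypeVar("T")


def read_footers_in_order(
    paths: Sequence[str],
    read_footer: Callable[[str], T],
    max_workers: int = 16,
) -> List[T]:
    """Read footers concurrently; result i always corresponds to paths[i]."""
    if not paths:
        return []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(paths))) as pool:
        # Executor.map yields results in input order, not completion order.
        return list(pool.map(read_footer, paths))


def fake_read_footer(path: str) -> List[str]:
    """Simulated footer read; the second path deliberately finishes first."""
    time.sleep({"a.parquet": 0.05, "b.parquet": 0.01}[path])
    return {"a.parquet": ["a", "b"], "b.parquet": ["c", "a"]}[path]


def merge_field_names(schemas: Sequence[Sequence[str]]) -> List[str]:
    """First-seen-wins field ordering across the input schemas."""
    seen: Dict[str, None] = {}
    for names in schemas:
        for name in names:
            seen.setdefault(name, None)
    return list(seen)


footers = read_footers_in_order(["a.parquet", "b.parquet"], fake_read_footer)
print(merge_field_names(footers))  # ['a', 'b', 'c']
```

Collecting results in completion order instead (for example with `as_completed`) would let the faster `b.parquet` read come first and flip the merged column order.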

Contributor

Oh, are you saying that the FileManifests' underlying block schema is ordered?

Comment thread python/ray/data/_internal/datasource_v2/parquet_datasource_v2.py Outdated
Comment thread python/ray/data/_internal/datasource_v2/parquet_datasource_v2.py
partition_pa_schema = _partition_field_types_to_pa_schema(
list(partition_kv.keys()), resolved_partitioning.field_types or {}
)
for field_name in partition_kv.keys():

Contributor

no unify_schemas_with_validation?

Contributor Author

It breaks test_read_file_with_partition_values for instance.

The loop only adds partition fields that are missing from the file schema, so on a name collision the file's type silently wins. unify_schemas_with_validation instead tries to merge both types, and PyArrow has no promotion path between unrelated primitives (e.g. int64 <-> string), so any collision where the file column's type doesn't match the partition's declared/default type raises ArrowTypeError and the read crashes.
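The difference between the two merge strategies can be shown with schemas modeled as plain name-to-type dictionaries. This is a sketch under assumptions: both function names are hypothetical, type names are plain strings, and the strict variant raises `TypeError` as a stand-in for the `ArrowTypeError` mentioned above.

```python
from typing import Dict


def add_missing_partition_fields(
    file_schema: Dict[str, str], partition_schema: Dict[str, str]
) -> Dict[str, str]:
    """Append partition fields absent from the file schema; file types win."""
    merged = dict(file_schema)
    for name, dtype in partition_schema.items():
        # setdefault leaves an existing (file-provided) type untouched.
        merged.setdefault(name, dtype)
    return merged


def strict_merge(
    file_schema: Dict[str, str], partition_schema: Dict[str, str]
) -> Dict[str, str]:
    """Contrast: treat any type disagreement on a shared name as an error."""
    merged = dict(file_schema)
    for name, dtype in partition_schema.items():
        if name in merged and merged[name] != dtype:
            raise TypeError(
                f"conflicting types for {name!r}: {merged[name]} vs {dtype}"
            )
        merged[name] = dtype
    return merged


file_schema = {"id": "int64", "year": "int64"}
partition_schema = {"year": "string", "month": "string"}
print(add_missing_partition_fields(file_schema, partition_schema))
# {'id': 'int64', 'year': 'int64', 'month': 'string'}
```

Calling `strict_merge` on the same inputs raises, which mirrors the failure mode described for a collision between a file column and an inferred partition type.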

Contributor

Wait, should that be an error, though, if the column types don't match? Or are you saying that the inference of column types might be wrong? I'm not sure what test_read_file_with_partition_values is supposed to test.

Contributor Author

Yeah, the inference could be incorrect.

Comment on lines +498 to +499
min_bucket_size=min_bucket_size,
max_bucket_size=max_bucket_size,

Contributor

The size here is referring to size of the file, not length of file name, right?

Contributor Author

You mean the get_size_estimator()?

That's just the encoding ratio multiplied by the uncompressed file sizes for Parquet. It has nothing to do with the length of the file name.
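Read literally, the estimator described in this reply is a single multiplication. The sketch below is only an illustration of that arithmetic: the function name, the signature, and the ratio value in the usage line are assumptions, and it does not reproduce the actual `get_size_estimator()` or the bucket-size logic.

```python
from typing import Iterable


def estimate_in_memory_bytes(file_sizes_bytes: Iterable[int], encoding_ratio: float) -> int:
    """Hypothetical estimator: scale the summed file sizes by an encoding ratio."""
    if encoding_ratio <= 0:
        raise ValueError("encoding_ratio must be positive")
    return int(encoding_ratio * sum(file_sizes_bytes))


# Two files of 100 and 200 bytes with a made-up ratio of 5.0.
print(estimate_in_memory_bytes([100, 200], 5.0))  # 1500
```

The inputs are byte counts per file, so path or file-name length never enters the calculation.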

Comment thread python/ray/data/read_api.py Outdated

read_op = ReadFiles(
input_op=list_files_op,
datasource=datasource,

Contributor

Can you remind me why datasource needs to be passed in for ReadFiles? I thought we just needed the scanner.

Contributor Author

Oh lol, it's actually only being used to retrieve the name of the datasource for the logical operator naming.

Contributor Author

I can change ReadFiles to just take in name instead.

f"no files found under {datasource.paths!r}. Check the path and any "
"configured `partition_filter` or `file_extensions` filters."
)
schema = datasource.infer_schema(sample)

Contributor

Hmm, with thousands of files, will this be slow? Should we delay schema inference until streaming execution starts?

Contributor Author

Discussed offline. The sample size is ~16 files. Delaying schema inference until execution won't be possible without rearchitecting the logical plan to have its own IR.
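A bounded sample keeps driver-side inference cost independent of the total file count. The sketch below assumes a lazily produced listing; `take_schema_sample`, the `SAMPLE_SIZE` constant (taken from "~16 files" above), and the choice of `ValueError` are all assumptions for illustration, not the code under review.

```python
from itertools import islice
from typing import Iterable, List

# Taken from "~16 files" in the reply; the exact constant is an assumption.
SAMPLE_SIZE = 16


def take_schema_sample(
    listed_paths: Iterable[str], root: str, sample_size: int = SAMPLE_SIZE
) -> List[str]:
    """Pull at most sample_size paths from a (possibly lazy) file listing."""
    sample = list(islice(listed_paths, sample_size))
    if not sample:
        # The exception type is a guess; the message mirrors the snippet above.
        raise ValueError(
            f"no files found under {root!r}. Check the path and any "
            "configured `partition_filter` or `file_extensions` filters."
        )
    return sample


# A lazy listing of 1000 files: only the first 16 are ever materialized.
listing = (f"part-{i}.parquet" for i in range(1000))
sample = take_schema_sample(listing, "s3://bucket/data")
print(len(sample))  # 16
```

Since `islice` stops after `sample_size` items, the remaining listing work is left for execution time.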

"""
...

def resolve_partitioning(self, sample: InputSplit) -> Optional[Any]:

Contributor

Can the type annotation be Optional[Partitioning]?

Contributor Author

Partitioning seems to cater only to file-based datasources, so I can't use it across the board.

@goutamvenkat-anyscale goutamvenkat-anyscale merged commit abf0d0a into ray-project:master May 6, 2026
7 checks passed
@goutamvenkat-anyscale goutamvenkat-anyscale deleted the pr63113 branch May 6, 2026 20:22
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
…tch (ray-project#63113)