[data][3/n] DataSourceV2: ParquetDatasourceV2 + read_parquet V2 dispatch #63113
Code Review
This pull request introduces the DataSourceV2 pipeline for Parquet files, establishing a logical chain from ListFiles to ReadFiles. It adds the ParquetDatasourceV2 class, integrates a configuration flag in DataContext to enable this new path, and provides extensive testing for schema inference and operator logic. Feedback is provided to optimize performance by eliminating unnecessary list conversions when indexing or iterating over file paths.
b6c3da1 to 97c4779
97c4779 to 1909034
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
Reviewed by Cursor Bugbot for commit 1909034.
    ray_remote_args=ray_remote_args,
    concurrency=concurrency,
    partition_filter=partition_filter,
)
V2 path silently drops include_row_hash parameter
Medium Severity
The V2 dispatch block raises NotImplementedError for unsupported parameters (_block_udf, tensor_column_schema, dataset_kwargs, columns) but silently ignores include_row_hash. When a user passes include_row_hash=True with use_datasource_v2=True, no row_hash column is produced and no error is raised, leading to silent data loss. The V1 path correctly passes include_row_hash to ParquetDatasource.
Resolved this in a follow-up PR.
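The gap Bugbot describes is a parameter that passes through the V2 guard without being either honored or rejected. A minimal sketch of a guard that fails loudly instead, assuming a hypothetical helper `check_v2_unsupported` and a hypothetical table of "unset" defaults (only the parameter names come from this PR; this is not the actual `read_parquet` code):

```python
from typing import Any, Dict

# Hypothetical table: parameter name -> value meaning "not set by the user".
# Only the parameter names come from the PR; the defaults are assumptions.
_V2_UNSUPPORTED_DEFAULTS: Dict[str, Any] = {
    "_block_udf": None,
    "tensor_column_schema": None,
    "dataset_kwargs": None,
    "columns": None,
    "include_row_hash": False,
}


def check_v2_unsupported(**params: Any) -> None:
    """Raise if any parameter the V2 path cannot honor was set by the caller."""
    offending = sorted(
        name
        for name, default in _V2_UNSUPPORTED_DEFAULTS.items()
        if name in params and params[name] != default
    )
    if offending:
        raise NotImplementedError(
            f"DataSourceV2 read_parquet does not support: {', '.join(offending)}"
        )
```

Listing every unsupported parameter in one table means a newly added V1 parameter is either registered here or wired through, rather than ignored by omission.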
2598656 to 92aef3b
iamjustinhsu left a comment
some questions + comments, but overall nice. didn't look at tests
    File-based partitioned datasources override this to populate
    path-discovered field names (e.g. hive ``Partitioning`` ships
    with ``field_names=None`` and needs to read a sample path to
    learn the keys). Keeping the discovery on a dedicated method
I'm not really sure I understand the "Keeping the discovery ..." last bit of the paragraph. Can you elaborate?
Basically, after grabbing the FileManifests from the sample, this computes the Hive Partitioning scheme from that sample. That Partitioning is then propagated to the scanner class.
I can make this clearer.
    # Parquet footer reads against high-latency object stores
    # (S3, GCS) are ~50-100 ms each. Reading the sample's footers in
    # parallel keeps driver-side schema inference bounded by the
    # slowest single read rather than the sum. Order is preserved
Why does order matter in schema unification?
Two reasons (neither is type promotion; my original comment was wrong and is fixed in the latest push):
1. Field order anchors on the first schema. Verified on pyarrow 23.0.1:
    import pyarrow as pa
    s1 = pa.schema([('a', pa.int64()), ('b', pa.string())])
    s2 = pa.schema([('c', pa.float64()), ('a', pa.int64())])
    pa.unify_schemas([s1, s2]).names  # ['a', 'b', 'c']
    pa.unify_schemas([s2, s1]).names  # ['c', 'a', 'b']
   Without preserved input order, the unified schema's column order is non-deterministic.
2. sample_paths[0] drives partition discovery below: PathPartitionParser runs against the first path to extract hive keys, so the schema and path lists need to stay aligned.
Type promotion itself is order-independent under permissive mode (null + int → int regardless of position).
oh are u saying that the FileManifests underlying block schema is ordered?
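On the ordering point in this thread: the code comment relies on parallel reads returning results in input order. A minimal sketch, assuming a hypothetical `read_footer` stand-in that sleeps instead of touching object storage (it is not the PR's reader). `ThreadPoolExecutor.map` yields results in the order of its inputs regardless of which read finishes first, so `results[i]` still corresponds to `paths[i]`:

```python
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def read_footer(path: str) -> Tuple[str, int]:
    """Hypothetical stand-in for a footer read; sleeps to simulate latency."""
    # Longer names sleep longer, so later inputs can finish before earlier ones.
    time.sleep(0.01 * len(path))
    return (path, len(path))


def read_footers_in_order(
    paths: List[str], max_workers: int = 8
) -> List[Tuple[str, int]]:
    """Read all footers concurrently while keeping results aligned with `paths`."""
    if not paths:
        return []
    with ThreadPoolExecutor(max_workers=min(max_workers, len(paths))) as pool:
        # Executor.map returns results in input order, not completion order.
        return list(pool.map(read_footer, paths))
```

Collecting results with `as_completed` instead would lose this alignment unless each future were mapped back to its index.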
partition_pa_schema = _partition_field_types_to_pa_schema(
    list(partition_kv.keys()), resolved_partitioning.field_types or {}
)
for field_name in partition_kv.keys():
no unify_schemas_with_validation?
It breaks test_read_file_with_partition_values for instance.
The loop only adds partition fields that are missing from the file schema, so on a name collision the file's type silently wins. unify_schemas_with_validation instead tries to merge both types, and PyArrow has no promotion path between unrelated primitives (e.g. int64 <-> string), so any collision where the file column's type doesn't match the partition's declared/default type raises ArrowTypeError and the read crashes.
wait should that be an error tho? like if the column's types don't match? Or are u saying that the inference of column types might be wrong? not sure what test_read_file_with_partition_values is supposed to test.
Yea the inference could be incorrect.
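The merge behavior described in this thread (partition fields are appended only when the file schema lacks that name, so the file's type wins on a collision) can be sketched with plain dicts standing in for Arrow schemas. `merge_partition_fields` and the string type names are hypothetical illustrations, not the PR's code:

```python
from typing import Dict


def merge_partition_fields(
    file_schema: Dict[str, str], partition_schema: Dict[str, str]
) -> Dict[str, str]:
    """Append partition fields missing from the file schema; never overwrite."""
    merged = dict(file_schema)  # dicts keep insertion order: file fields first
    for name, dtype in partition_schema.items():
        if name not in merged:
            merged[name] = dtype
        # On a name collision the file's type is kept and no error is raised.
    return merged
```

With a file column `year: int64` and a partition key `year` inferred as `string`, this keeps `int64`. A strict unification step would raise on that collision instead, which is the failure the thread attributes to `unify_schemas_with_validation`.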
min_bucket_size=min_bucket_size,
max_bucket_size=max_bucket_size,
The size here is referring to size of the file, not length of file name, right?
You mean the get_size_estimator()?
That's just the encoding ratio * uncompressed file sizes for parquet. It's not pertaining to the length of the file name.
read_op = ReadFiles(
    input_op=list_files_op,
    datasource=datasource,
can you remind me why datasource needs to be passed in for ReadFiles? I thought we just needed scanner?
Oh lol it's actually just being used for retrieving the name of the datasource for the logical operator naming
I can change ReadFiles to just take in name instead.
        f"no files found under {datasource.paths!r}. Check the path and any "
        "configured `partition_filter` or `file_extensions` filters."
    )
schema = datasource.infer_schema(sample)
Hmm with 1000s of files, will this be slow? Should delay schema inference until we start streaming execution?
Discussed offline. The sample size is ~16 files. Delaying schema inference until execution won't be possible without rearchitecting the logical plan to have its own IR
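The cost bound comes from inferring the schema from a small sample rather than the full listing. A minimal sketch, assuming a hypothetical `take_sample` helper and a default of 16 taken from the figure quoted above (the PR's actual sampling logic may differ):

```python
from itertools import islice
from typing import Iterable, List


def take_sample(paths: Iterable[str], sample_size: int = 16) -> List[str]:
    """Return at most `sample_size` paths, leaving the rest of a lazy listing unread."""
    sample = list(islice(paths, sample_size))
    if not sample:
        raise ValueError("no files found; check the path and any configured filters")
    return sample
```

Because `islice` stops after `sample_size` items, a listing of thousands of files contributes only the sampled footers to driver-side inference time.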
Wires the V2 Parquet read path end-to-end. Behind the ``DataContext.use_datasource_v2`` opt-in flag (default ``False``); ``read_parquet`` continues to take the V1 path until pr-Z flips the default.

- datasource_v2/datasource_v2.py: ``resolve_partitioning`` base method (default ``None``) so subclasses can derive partitioning field names from a sample without mutating instance state.
- datasource_v2/parquet_datasource_v2.py (new): ParquetDatasourceV2 with file indexer, scanner factory, schema inference (parallelized footer reads + thread pool), ``resolve_partitioning`` override populating field_names from path discovery, and user-supplied ``schema`` overrides.
- read_api.py: ``_read_datasource_v2`` driver entry that wires the ListFiles → ReadFiles op pair and threads through the file pruner list. ``read_parquet`` dispatches to V2 when ``use_datasource_v2=True``. Raises ``NotImplementedError`` for ``_block_udf``, ``tensor_column_schema``, ``dataset_kwargs``, and ``columns=`` (each gets its own follow-up: pr-C, pr-G).
- context.py: ``use_datasource_v2: bool`` flag, default ``False``.
- BUILD.bazel: register the new V2 unit-test packages.
- doc/source/data/performance-tips.rst: brief mention.
- tests: parquet datasource unit tests, ListFiles-op fixture tests, read_files logical-op tests, V2 read_parquet end-to-end tests.

Signed-off-by: Goutam <goutam@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
92aef3b to 93f7970
        """
        ...

    def resolve_partitioning(self, sample: InputSplit) -> Optional[Any]:
Can the type annotation be Optional[Partitioning]?
Partitioning seems to cater only to file-based datasources, so I can't use it across the board.
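A minimal sketch of the shape being discussed: a base method that defaults to `None` for datasources with no notion of partitioning, and a file-based override that returns a fresh value instead of storing it on the instance. All class names here are hypothetical stand-ins, and `sample` is simplified to a list of paths rather than the PR's `InputSplit`:

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass(frozen=True)
class HivePartitioning:
    """Hypothetical stand-in for a file-based partitioning description."""

    field_names: List[str]


class DataSourceBase:
    """Hypothetical base class; non-file datasources keep the default."""

    def resolve_partitioning(self, sample: List[str]) -> Optional[Any]:
        return None


class FileDataSource(DataSourceBase):
    def resolve_partitioning(self, sample: List[str]) -> Optional[Any]:
        if not sample:
            return None
        # Parse hive-style `key=value` directory segments from the first path.
        segments = sample[0].split("/")[:-1]
        names = [seg.split("=", 1)[0] for seg in segments if "=" in seg]
        # Return a new object rather than storing the result on `self`.
        return HivePartitioning(field_names=names) if names else None
```

Keeping the return type as `Optional[Any]` on the base leaves room for datasources whose partitioning is not path-based, which is the constraint raised in the reply above.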
Wires the V2 Parquet read path end-to-end. Behind the ``DataContext.use_datasource_v2`` opt-in flag (default ``False``); ``read_parquet`` continues to take the V1 path until pr-Z flips the default.

- datasource_v2/datasource_v2.py: ``resolve_partitioning`` base method (default ``None``) so subclasses can derive partitioning field names from a sample without mutating instance state.
- datasource_v2/parquet_datasource_v2.py (new): ParquetDatasourceV2 with file indexer, scanner factory, schema inference (parallelized footer reads + thread pool), ``resolve_partitioning`` override populating field_names from path discovery, and user-supplied ``schema`` overrides.
- read_api.py: ``_read_datasource_v2`` driver entry that wires the ListFiles → ReadFiles op pair and threads through the file pruner list. ``read_parquet`` dispatches to V2 when ``use_datasource_v2=True``. Raises ``NotImplementedError`` for ``_block_udf``, ``tensor_column_schema``, ``dataset_kwargs``, and ``columns=`` (each gets its own follow-up: pr-C, pr-G).
- context.py: ``use_datasource_v2: bool`` flag, default ``False``.
- BUILD.bazel: register the new V2 unit-test packages.
- doc/source/data/performance-tips.rst: brief mention.
- tests: parquet datasource unit tests, ListFiles-op fixture tests, read_files logical-op tests, V2 read_parquet end-to-end tests.

Signed-off-by: Goutam <goutam@anyscale.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>