[Data] Fix read_parquet() ArrowNotImplementedError for nested column types exceeding ~2GB row group#61824
Conversation
…types When reading Parquet files with nested column types (list<struct>, map, struct with nested fields) whose binary/string data in a single row group exceeds PyArrow's internal ~2GB chunking threshold, fragment.to_batches() raises ArrowNotImplementedError in the Arrow Dataset Scanner's WrapIntoListArray code path. Add a fallback in _read_batches_from() that catches ArrowNotImplementedError with the specific "Nested data conversions not implemented for chunked array outputs" message and retries using pyarrow.parquet.ParquetFile.read_row_group() per row group followed by combine_chunks(), which uses a different (non-scanner) code path that avoids the chunked-array reconstruction issue. Closes ray-project#61675 See: apache/arrow#21526
There was a problem hiding this comment.
Code Review
This pull request effectively addresses the ArrowNotImplementedError encountered when reading Parquet files with nested column types exceeding PyArrow's internal chunking threshold. The implemented fallback mechanism, which switches to pyarrow.parquet.ParquetFile.read_row_group() and combine_chunks(), is a robust solution for this known upstream limitation. The inclusion of a UserWarning is a thoughtful addition, guiding users on how to avoid this fallback in the future. The changes are well-contained, correctly re-applying necessary transformations to maintain data consistency, and the lazy import of pyarrow.parquet is a good practice for performance.
…types The original approach using read_row_group() + combine_chunks() hits the same Arrow limitation. This fix uses pq.ParquetFile.iter_batches() which reads smaller row slices within a row group, staying under the ~2GB threshold. A safe batch_size is computed from Parquet metadata. Changes: - Add _get_safe_batch_size_for_nested_types() to compute batch size from row group metadata (uncompressed column sizes) with 50% safety margin - Add _iter_batches_with_nested_fallback() shared helper used by both _generate_tables() and _fetch_parquet_file_info() - Extract _postprocess_table() to eliminate duplicated post-processing - Add regression test with real 2.3GB nested parquet file Fixes ray-project#61675 See also: apache/arrow#21526 (ARROW-5030) Signed-off-by: Goutam <goutam@anyscale.com>
…types The original approach using read_row_group() + combine_chunks() hits the same Arrow limitation. This fix uses pq.ParquetFile.iter_batches() which reads smaller row slices within a row group, staying under the ~2GB threshold. A safe batch_size is computed from Parquet metadata. Predicate pushdown is preserved at the row-group level by using fragment.subset(filter=) to prune non-matching row groups via Parquet statistics before falling back to iter_batches. Changes: - Add _get_safe_batch_size_for_nested_types() to compute batch size from row group metadata (uncompressed column sizes) with 50% safety margin - Add _iter_batches_with_nested_fallback() shared helper used by both _generate_tables() and _fetch_parquet_file_info() - Use fragment.subset(filter=) for row-group-level predicate pushdown in the fallback path (metadata only, no data read) - Extract _postprocess_table() to eliminate duplicated post-processing - Add regression test with real 2.3GB nested parquet file Fixes ray-project#61675 See also: apache/arrow#21526 (ARROW-5030) Signed-off-by: Goutam <goutam@anyscale.com>
_iter_batches_with_nested_fallback was silently dropping the batch_size parameter in the happy path, causing PyArrow to read entire row groups in one batch during sampling instead of respecting the 1024-row cap from PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWS. Signed-off-by: Goutam <goutam@anyscale.com>
Replace random.choices() with os.urandom().hex() for ~13x faster string generation. Add 300s timeout for slower CI machines. Signed-off-by: Goutam <goutam@anyscale.com>
bveeramani
left a comment
There was a problem hiding this comment.
This PR adds two divergent paths for reading Parquet data, and I think it might introduce the risk of duplicate rows (?)
What're the alternative approaches we considered? Is there an easy and effective workaround for users? If so, might be easier to just raise an actionable error for users until we have more signal that the complexity is justified
| return safe_batch_size | ||
|
|
||
|
|
||
| def _iter_batches_with_nested_fallback( |
There was a problem hiding this comment.
@goutamvenkat-anyscale is the duplicate row problem still an issue with this implementation? What happens if you yield some batches in the try clause before falling back to the except clause?
There was a problem hiding this comment.
Good callout. I changed it to reading the schema first and if there are nested dtypes and the uncompressed row group size exceeds 2GB then it deviates the reading strategy. That should avoid any duplicate batches
…ng strategy Signed-off-by: Goutam <goutam@anyscale.com>
…ng strategy Signed-off-by: Goutam <goutam@anyscale.com>
…rquet reader Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
…sted fallback path Signed-off-by: Goutam <goutam@anyscale.com>
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a fallback mechanism to handle Arrow's 2GB chunking limitation for nested types in Parquet files (ARROW-5030). It adds detection logic for susceptible schemas and implements a row-level reader using pq.ParquetFile.iter_batches when row groups are too large. Review feedback identifies a potential schema inconsistency for empty batches, a performance regression when using filters with specific column projections, and the need to move an import statement out of a hot loop.
… schema alignment Signed-off-by: Goutam <goutam@anyscale.com>
…in nested fallback path Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Extract _resolve_read_columns() and _iter_batches_fallback() from _iter_batches_with_nested_fallback() to reduce nesting and improve readability. No behavioral changes. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com>
…size row groups Skip row groups where uncompressed size is zero for the selected columns (e.g. all-null nested data) to avoid dividing by zero when computing bytes_per_row. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
…ow OSError Signed-off-by: Goutam <goutam@anyscale.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 625fd16. Configure here.
| # unified dataset schema to match the normal (scanner) path which | ||
| # handles type promotion and missing-column filling automatically. | ||
| if align_schema is not None: | ||
| table = _align_struct_fields([table], align_schema)[0].cast(align_schema) |
There was a problem hiding this comment.
Fallback path missing column projection before cast
Medium Severity
When columns is None, the fallback path reads all file columns via pf.iter_batches(columns=None) and sets align_schema = schema (the unified dataset schema). However, _align_struct_fields only adds missing columns and aligns struct types — it does not remove extra columns or reorder them. If the file has columns not in the unified schema (schema evolution) or columns in a different order, table.cast(align_schema) will raise a ValueError due to column count or name mismatch. The normal scanner path handles this via its schema argument, but the fallback lacks equivalent column projection before the cast.
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 625fd16. Configure here.
There was a problem hiding this comment.
Doesn't reproduce
Signed-off-by: Goutam <goutam@anyscale.com>
…types exceeding ~2GB row group (ray-project#61824) ## Why are these changes needed? Fixes ray-project#61675 `ray.data.read_parquet()` raises `ArrowNotImplementedError: Nested data conversions not implemented for chunked array outputs` when reading Parquet files with nested column types (`list<struct>`, `map`, `struct` with nested fields) whose binary/string data in a single row group exceeds PyArrow's internal ~2GB chunking threshold. **Root cause**: `fragment.to_batches()` uses the Arrow Dataset Scanner (`WrapIntoListArray` in C++) which cannot reconstruct nested types from internally-chunked binary arrays. This is a known upstream PyArrow limitation. See: apache/arrow#21526 ## What changes were proposed in this pull request? The original approach (`read_row_group()` + `combine_chunks()`) **does not work** — `read_row_group()` reads the entire row group at once and hits the same ~2GB Arrow limitation during the read itself, before `combine_chunks()` is ever called. The fix uses `pq.ParquetFile.iter_batches(batch_size=N)` which reads smaller row slices *within* a row group, keeping each batch's nested column data under the threshold. The reading strategy is chosen **upfront** based on schema and metadata inspection — no try/catch mid-stream — to avoid any risk of duplicating already-yielded batches. ### Key changes: 1. **`_has_susceptible_nested_types(schema)`** — Checks if the schema contains nested column types (list, struct, map, fixed-size list) wrapping variable-length leaves (string, binary, and their large/view variants) that are susceptible to ARROW-5030. Fixed-width leaves (int, float, bool) never trigger chunking. 2. **`_needs_nested_type_fallback(fragment, columns=None)`** — Metadata-only check that returns True if the *requested* columns (or all columns when `columns` is None) have susceptible nested types AND any row group's uncompressed data for those columns exceeds the ~2GB threshold. This avoids unnecessarily triggering the fallback when only flat columns are selected from a file that also contains large nested columns. 3. **`_resolve_leaf_column_indices(metadata, columns)`** — Maps top-level column names to Parquet metadata leaf column indices. Parquet flattens nested types into leaf columns (e.g., `list<struct<key, payload, value>>` becomes 3 leaves), so this mapping is needed to scope uncompressed size calculations to only the requested columns. 4. **`_get_safe_batch_size_for_nested_types(pf, column_indices=None)`** — Computes a safe batch size from Parquet row group metadata (uncompressed column sizes per row) with a 50% safety margin under the 2GB limit. When `column_indices` is provided, only those leaf columns are measured — preventing artificially small batch sizes when large unrequested columns exist in the file. 5. **`_row_group_uncompressed_size(rg_meta, column_indices=None)`** — Sums uncompressed sizes for all or a subset of leaf columns in a row group. Avoids `rg_meta.total_byte_size` which can return compressed sizes for some files (apache/arrow#48138). 6. **`_iter_batches_with_nested_fallback(fragment, ...)`** — Shared helper that checks `_needs_nested_type_fallback()` upfront (scoped to the union of projected + filter columns) and either uses the normal scanner path (with full predicate pushdown) or the fallback `pf.iter_batches()` path. No try/catch mid-stream. 7. **Row-group-level predicate pushdown in fallback** — Uses `fragment.subset(filter=expr)` to prune non-matching row groups via Parquet statistics (metadata only, no data read) before calling `iter_batches(row_groups=...)`. Row-level filtering is still applied post-read for rows within matching row groups. 8. **Filter column handling in fallback** — When a filter references columns outside the projection, the fallback path reads the union of projected + filter columns, applies the filter, then projects down to the requested columns. This mirrors what PyArrow's scanner does internally in the normal path. 9. **Schema alignment in fallback** — `pq.ParquetFile.iter_batches()` returns batches with the file's physical schema. The fallback path aligns batches to the unified dataset schema (type promotion, missing-column filling) to match the normal scanner path's behavior. 10. **`_postprocess_table()`** — Extracted from `_generate_tables()` to eliminate duplicated post-processing logic (partition columns, path column, stub column). 11. Both **`_generate_tables()`** and **`_fetch_parquet_file_info()`** use the shared helper — the error also occurs during the metadata sampling phase, not just the read path. 12. **Regression tests** — End-to-end test with a real ~2.3GB Parquet file with `list<struct>` columns that triggers the Arrow limitation. Plus a test verifying that selecting only flat columns from a mixed-schema file does NOT trigger the fallback. All existing behavior is preserved for files that do not trigger the fallback. ## Related issues - Fixes: ray-project#61675 - PyArrow upstream: apache/arrow#21526 - JIRA: https://issues.apache.org/jira/browse/ARROW-5030 --------- Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: xi377266 <xi377266@github.com> Co-authored-by: Goutam <goutam@anyscale.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…types exceeding ~2GB row group (ray-project#61824) ## Why are these changes needed? Fixes ray-project#61675 `ray.data.read_parquet()` raises `ArrowNotImplementedError: Nested data conversions not implemented for chunked array outputs` when reading Parquet files with nested column types (`list<struct>`, `map`, `struct` with nested fields) whose binary/string data in a single row group exceeds PyArrow's internal ~2GB chunking threshold. **Root cause**: `fragment.to_batches()` uses the Arrow Dataset Scanner (`WrapIntoListArray` in C++) which cannot reconstruct nested types from internally-chunked binary arrays. This is a known upstream PyArrow limitation. See: apache/arrow#21526 ## What changes were proposed in this pull request? The original approach (`read_row_group()` + `combine_chunks()`) **does not work** — `read_row_group()` reads the entire row group at once and hits the same ~2GB Arrow limitation during the read itself, before `combine_chunks()` is ever called. The fix uses `pq.ParquetFile.iter_batches(batch_size=N)` which reads smaller row slices *within* a row group, keeping each batch's nested column data under the threshold. The reading strategy is chosen **upfront** based on schema and metadata inspection — no try/catch mid-stream — to avoid any risk of duplicating already-yielded batches. ### Key changes: 1. **`_has_susceptible_nested_types(schema)`** — Checks if the schema contains nested column types (list, struct, map, fixed-size list) wrapping variable-length leaves (string, binary, and their large/view variants) that are susceptible to ARROW-5030. Fixed-width leaves (int, float, bool) never trigger chunking. 2. **`_needs_nested_type_fallback(fragment, columns=None)`** — Metadata-only check that returns True if the *requested* columns (or all columns when `columns` is None) have susceptible nested types AND any row group's uncompressed data for those columns exceeds the ~2GB threshold. This avoids unnecessarily triggering the fallback when only flat columns are selected from a file that also contains large nested columns. 3. **`_resolve_leaf_column_indices(metadata, columns)`** — Maps top-level column names to Parquet metadata leaf column indices. Parquet flattens nested types into leaf columns (e.g., `list<struct<key, payload, value>>` becomes 3 leaves), so this mapping is needed to scope uncompressed size calculations to only the requested columns. 4. **`_get_safe_batch_size_for_nested_types(pf, column_indices=None)`** — Computes a safe batch size from Parquet row group metadata (uncompressed column sizes per row) with a 50% safety margin under the 2GB limit. When `column_indices` is provided, only those leaf columns are measured — preventing artificially small batch sizes when large unrequested columns exist in the file. 5. **`_row_group_uncompressed_size(rg_meta, column_indices=None)`** — Sums uncompressed sizes for all or a subset of leaf columns in a row group. Avoids `rg_meta.total_byte_size` which can return compressed sizes for some files (apache/arrow#48138). 6. **`_iter_batches_with_nested_fallback(fragment, ...)`** — Shared helper that checks `_needs_nested_type_fallback()` upfront (scoped to the union of projected + filter columns) and either uses the normal scanner path (with full predicate pushdown) or the fallback `pf.iter_batches()` path. No try/catch mid-stream. 7. **Row-group-level predicate pushdown in fallback** — Uses `fragment.subset(filter=expr)` to prune non-matching row groups via Parquet statistics (metadata only, no data read) before calling `iter_batches(row_groups=...)`. Row-level filtering is still applied post-read for rows within matching row groups. 8. **Filter column handling in fallback** — When a filter references columns outside the projection, the fallback path reads the union of projected + filter columns, applies the filter, then projects down to the requested columns. This mirrors what PyArrow's scanner does internally in the normal path. 9. **Schema alignment in fallback** — `pq.ParquetFile.iter_batches()` returns batches with the file's physical schema. The fallback path aligns batches to the unified dataset schema (type promotion, missing-column filling) to match the normal scanner path's behavior. 10. **`_postprocess_table()`** — Extracted from `_generate_tables()` to eliminate duplicated post-processing logic (partition columns, path column, stub column). 11. Both **`_generate_tables()`** and **`_fetch_parquet_file_info()`** use the shared helper — the error also occurs during the metadata sampling phase, not just the read path. 12. **Regression tests** — End-to-end test with a real ~2.3GB Parquet file with `list<struct>` columns that triggers the Arrow limitation. Plus a test verifying that selecting only flat columns from a mixed-schema file does NOT trigger the fallback. All existing behavior is preserved for files that do not trigger the fallback. ## Related issues - Fixes: ray-project#61675 - PyArrow upstream: apache/arrow#21526 - JIRA: https://issues.apache.org/jira/browse/ARROW-5030 --------- Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: xi377266 <xi377266@github.com> Co-authored-by: Goutam <goutam@anyscale.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>


Why are these changes needed?
Fixes #61675
ray.data.read_parquet()raisesArrowNotImplementedError: Nested data conversions not implemented for chunked array outputswhen reading Parquet files with nested column types (list<struct>,map,structwith nested fields) whose binary/string data in a single row group exceeds PyArrow's internal ~2GB chunking threshold.Root cause:
fragment.to_batches()uses the Arrow Dataset Scanner (WrapIntoListArrayin C++) which cannot reconstruct nested types from internally-chunked binary arrays. This is a known upstream PyArrow limitation. See: apache/arrow#21526What changes were proposed in this pull request?
The original approach (
read_row_group()+combine_chunks()) does not work —read_row_group()reads the entire row group at once and hits the same ~2GB Arrow limitation during the read itself, beforecombine_chunks()is ever called.The fix uses
pq.ParquetFile.iter_batches(batch_size=N)which reads smaller row slices within a row group, keeping each batch's nested column data under the threshold. The reading strategy is chosen upfront based on schema and metadata inspection — no try/catch mid-stream — to avoid any risk of duplicating already-yielded batches.Key changes:
_has_susceptible_nested_types(schema)— Checks if the schema contains nested column types (list, struct, map, fixed-size list) wrapping variable-length leaves (string, binary, and their large/view variants) that are susceptible to ARROW-5030. Fixed-width leaves (int, float, bool) never trigger chunking._needs_nested_type_fallback(fragment, columns=None)— Metadata-only check that returns True if the requested columns (or all columns whencolumnsis None) have susceptible nested types AND any row group's uncompressed data for those columns exceeds the ~2GB threshold. This avoids unnecessarily triggering the fallback when only flat columns are selected from a file that also contains large nested columns._resolve_leaf_column_indices(metadata, columns)— Maps top-level column names to Parquet metadata leaf column indices. Parquet flattens nested types into leaf columns (e.g.,list<struct<key, payload, value>>becomes 3 leaves), so this mapping is needed to scope uncompressed size calculations to only the requested columns._get_safe_batch_size_for_nested_types(pf, column_indices=None)— Computes a safe batch size from Parquet row group metadata (uncompressed column sizes per row) with a 50% safety margin under the 2GB limit. Whencolumn_indicesis provided, only those leaf columns are measured — preventing artificially small batch sizes when large unrequested columns exist in the file._row_group_uncompressed_size(rg_meta, column_indices=None)— Sums uncompressed sizes for all or a subset of leaf columns in a row group. Avoidsrg_meta.total_byte_sizewhich can return compressed sizes for some files ([Python][Parquet] RowGroupMetadata.total_byte_size computes wrong uncompressed size apache/arrow#48138)._iter_batches_with_nested_fallback(fragment, ...)— Shared helper that checks_needs_nested_type_fallback()upfront (scoped to the union of projected + filter columns) and either uses the normal scanner path (with full predicate pushdown) or the fallbackpf.iter_batches()path. No try/catch mid-stream.Row-group-level predicate pushdown in fallback — Uses
fragment.subset(filter=expr)to prune non-matching row groups via Parquet statistics (metadata only, no data read) before callingiter_batches(row_groups=...). Row-level filtering is still applied post-read for rows within matching row groups.Filter column handling in fallback — When a filter references columns outside the projection, the fallback path reads the union of projected + filter columns, applies the filter, then projects down to the requested columns. This mirrors what PyArrow's scanner does internally in the normal path.
Schema alignment in fallback —
pq.ParquetFile.iter_batches()returns batches with the file's physical schema. The fallback path aligns batches to the unified dataset schema (type promotion, missing-column filling) to match the normal scanner path's behavior._postprocess_table()— Extracted from_generate_tables()to eliminate duplicated post-processing logic (partition columns, path column, stub column).Both
_generate_tables()and_fetch_parquet_file_info()use the shared helper — the error also occurs during the metadata sampling phase, not just the read path.Regression tests — End-to-end test with a real ~2.3GB Parquet file with
list<struct>columns that triggers the Arrow limitation. Plus a test verifying that selecting only flat columns from a mixed-schema file does NOT trigger the fallback.All existing behavior is preserved for files that do not trigger the fallback.
Related issues