[Data] Add include_row_hash to read_parquet#61408
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces a useful include_row_hash option to read_parquet, which is valuable for checkpointing and data versioning. The implementation is generally solid and consistent with existing features like include_paths. However, I've identified a critical bug that can cause a crash when include_row_hash=True is used on a file that already contains a row_hash column, particularly when no specific columns are selected for reading. I've provided details and a suggested fix for this issue. Additionally, I've included a few medium-severity suggestions to improve user experience by adding a warning for column name conflicts, updating the documentation to clarify this behavior, and enhancing test coverage for this edge case.
e1514fa to
2a97a98
Compare
5baca95 to
a403b17
Compare
f49769f to
05965d0
Compare
05965d0 to
a24e8a4
Compare
a24e8a4 to
e9c89a4
Compare
b4305f3 to
09edc82
Compare
| logger.warning( | ||
| "The Parquet file(s) already contain a column named 'row_hash'. " | ||
| "It will be overwritten by the generated row hash column." | ||
| ) |
There was a problem hiding this comment.
I don't think the warning is necessary, since
- We don't use it for
include_paths - We explicitly say in the documentation
With that said, if u make a warning hear, u probably want to add a warning for include_paths too to keep it consistent
b2474d7 to
fc7a149
Compare
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Reviewed by Cursor Bugbot for commit cb74257. Configure here.
cb74257 to
77e18b9
Compare
Signed-off-by: Kit Lee <7000003+wingkitlee0@users.noreply.github.com>
77e18b9 to
e3eb186
Compare
## Description This PR adds an `include_row_hash` option to `read_parquet`, which adds a new column. The row hash is computed from the file path, each row's **index after filtering**, and a mixing step (so values are spread across the uint64 range rather than clustering in a few buckets). Row hashes are unique across the rows you actually read for a given read configuration (same files, same filter, same ordering). They are reproducible under that same configuration, which supports checkpointing for Ray Data and Ray Train. The column type is unsigned 64-bit integer (`uint64`). ## Row hash semantics (filters and checkpointing) Each `row_hash` is deterministic for a given read: it uses the file path and the row's **position after filtering** (0-based—the first row that survives the filter is 0, the next is 1, and so on). It is **not** the row's index in the raw Parquet file before filtering. If you change the filter, which columns you read, or which files you read, which rows appear—and their positions after filtering—can change, so hashes can change too. For **checkpointing and resume**, we assume you keep the **same read setup**, including the **same filter**, across runs. Rows that were filtered out are not part of the pipeline anyway, so identifying rows **after filtering** is enough; we do not rely on pre-filter physical row positions for that use case. ## Related issues Closes ray-project#61410 ## Additional information How it works: 1. Path seed: For each Parquet file, MD5-hash its file path and take the first 8 bytes as a uint64 seed. Identical data in different files still gets different hashes because paths differ. 2. Row keys: After filtering, add each row's **0-based index in the filtered output** for that file (tracked across batches) to the path seed: `key = path_seed + row_index`. 3. Mix: Apply the splitmix64 finalizer (a bijective 64-bit integer mixing function) to scatter nearby keys across the full uint64 range: ``` keys ^= keys >> 30 keys *= 0xBF58476D1CE4E5B9 keys ^= keys >> 27 keys *= 0x94D049BB133111EB keys ^= keys >> 31 ``` All operations are vectorized with NumPy—no Python loops. Properties: - **Reproducible:** Same file path + same filter + same position after filtering → same hash. - **Unique:** Different files get different seeds (via MD5 of path); different rows in the filtered output get different indices. The splitmix64 step is bijective, so distinct inputs do not collide. - **Fast:** One MD5 call per file, then pure NumPy vectorized arithmetic per batch. Signed-off-by: Kit Lee <7000003+wingkitlee0@users.noreply.github.com>

Description
This PR adds an
include_row_hashoption toread_parquet, which adds a new column. The row hash is computed from the file path, each row's index after filtering, and a mixing step (so values are spread across the uint64 range rather than clustering in a few buckets).Row hashes are unique across the rows you actually read for a given read configuration (same files, same filter, same ordering). They are reproducible under that same configuration, which supports checkpointing for Ray Data and Ray Train.
The column type is unsigned 64-bit integer (
uint64).Row hash semantics (filters and checkpointing)
Each
row_hashis deterministic for a given read: it uses the file path and the row's position after filtering (0-based—the first row that survives the filter is 0, the next is 1, and so on). It is not the row's index in the raw Parquet file before filtering.If you change the filter, which columns you read, or which files you read, which rows appear—and their positions after filtering—can change, so hashes can change too.
For checkpointing and resume, we assume you keep the same read setup, including the same filter, across runs. Rows that were filtered out are not part of the pipeline anyway, so identifying rows after filtering is enough; we do not rely on pre-filter physical row positions for that use case.
Related issues
Closes #61410
Additional information
How it works:
Path seed: For each Parquet file, MD5-hash its file path and take the first 8 bytes as a uint64 seed. Identical data in different files still gets different hashes because paths differ.
Row keys: After filtering, add each row's 0-based index in the filtered output for that file (tracked across batches) to the path seed:
key = path_seed + row_index.Mix: Apply the splitmix64 finalizer (a bijective 64-bit integer mixing function) to scatter nearby keys across the full uint64 range:
All operations are vectorized with NumPy—no Python loops.
Properties: