
[Data] Add include_row_hash to read_parquet #61408

Merged: richardliaw merged 1 commit into ray-project:master from wingkitlee0:kit/read-row-hash on Apr 28, 2026


@wingkitlee0 wingkitlee0 commented Mar 1, 2026

Contributor

Description

This PR adds an include_row_hash option to read_parquet, which adds a new row_hash column. The row hash is computed from the file path, each row's index after filtering, and a mixing step (so values are spread across the uint64 range rather than clustering in a few buckets).

Row hashes are unique across the rows you actually read for a given read configuration (same files, same filter, same ordering). They are reproducible under that same configuration, which supports checkpointing for Ray Data and Ray Train.

The column type is unsigned 64-bit integer (uint64).

Row hash semantics (filters and checkpointing)

Each row_hash is deterministic for a given read: it uses the file path and the row's position after filtering (0-based—the first row that survives the filter is 0, the next is 1, and so on). It is not the row's index in the raw Parquet file before filtering.

Changing the filter, the columns you read, or the files you read can change which rows appear and their positions after filtering, so the hashes can change too.

For checkpointing and resume, we assume you keep the same read setup, including the same filter, across runs. Rows that were filtered out are not part of the pipeline anyway, so identifying rows after filtering is enough; we do not rely on pre-filter physical row positions for that use case.
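
To make the position-after-filtering point concrete, here is a small pure-Python sketch. The rows, the two filters, and the helper `positions_after_filter` are made up for illustration and are not part of this PR; they only show how the same physical row ends up at a different index when the filter changes.

```python
def positions_after_filter(rows, predicate):
    """Map each surviving row to its 0-based position in the filtered output."""
    survivors = [row for row in rows if predicate(row)]
    return {row: position for position, row in enumerate(survivors)}


# Hypothetical file contents: one distinct value per row.
rows = [10, 25, 30, 45, 50]

loose = positions_after_filter(rows, lambda v: v >= 25)   # keeps 25, 30, 45, 50
strict = positions_after_filter(rows, lambda v: v >= 40)  # keeps 45, 50

# Row 45 sits at raw index 3, but its post-filter position depends on the filter.
print(loose[45], strict[45])  # 2 0
```

Because the hash is derived from that post-filter position, the row holding 45 would get a different row_hash under each filter, which is why the same filter has to be kept across runs.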

Related issues

Closes #61410

Additional information

How it works:

  1. Path seed: For each Parquet file, MD5-hash its file path and take the first 8 bytes as a uint64 seed. Identical data in different files still gets different hashes because paths differ.

  2. Row keys: After filtering, add each row's 0-based index in the filtered output for that file (tracked across batches) to the path seed: key = path_seed + row_index.

  3. Mix: Apply the splitmix64 finalizer (a bijective 64-bit integer mixing function) to scatter nearby keys across the full uint64 range:

  keys ^= keys >> 30
  keys *= 0xBF58476D1CE4E5B9
  keys ^= keys >> 27
  keys *= 0x94D049BB133111EB
  keys ^= keys >> 31

All operations are vectorized with NumPy—no Python loops.
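
The three steps can be sketched roughly as follows. This is an illustration under assumptions, not the code merged in this PR: the function names (`path_seed`, `splitmix64_finalize`, `row_hashes`), the little-endian byte order for the seed, the UTF-8 path encoding, and the example path are all hypothetical. The `start_index` argument stands in for the per-file row counter that is tracked across batches.

```python
import hashlib

import numpy as np


def path_seed(path: str) -> np.uint64:
    """First 8 bytes of MD5(path) as a uint64 (byte order is an assumption)."""
    digest = hashlib.md5(path.encode("utf-8")).digest()
    return np.uint64(int.from_bytes(digest[:8], "little"))


def splitmix64_finalize(keys: np.ndarray) -> np.ndarray:
    """Apply the splitmix64 finalizer element-wise; arithmetic wraps mod 2**64."""
    keys = keys.astype(np.uint64, copy=True)
    keys ^= keys >> np.uint64(30)
    keys *= np.uint64(0xBF58476D1CE4E5B9)
    keys ^= keys >> np.uint64(27)
    keys *= np.uint64(0x94D049BB133111EB)
    keys ^= keys >> np.uint64(31)
    return keys


def row_hashes(path: str, start_index: int, num_rows: int) -> np.ndarray:
    """Hash one batch of `num_rows` filtered rows starting at `start_index`."""
    indices = np.arange(start_index, start_index + num_rows, dtype=np.uint64)
    keys = indices + path_seed(path)  # key = path_seed + row_index
    return splitmix64_finalize(keys)


# Two consecutive batches from the same (hypothetical) file.
path = "s3://bucket/part-0.parquet"
first = row_hashes(path, start_index=0, num_rows=3)
second = row_hashes(path, start_index=3, num_rows=2)
whole = row_hashes(path, start_index=0, num_rows=5)
other = row_hashes("s3://bucket/part-1.parquet", start_index=0, num_rows=5)
```

Carrying the running index from one batch into the next is what keeps the hashes independent of batch size: the two batches concatenated match the hashes computed for the file in one pass.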

Properties:

  • Reproducible: Same file path + same filter + same position after filtering → same hash.
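  • Unique: Different files get different seeds (via MD5 of path); different rows in the filtered output get different indices. The splitmix64 step is bijective, so distinct keys never collide. Within a file this makes hashes strictly unique. Across files, a collision would need two 64-bit seeds to fall within a row count of each other, which is extremely unlikely but not impossible.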
  • Fast: One MD5 call per file, then pure NumPy vectorized arithmetic per batch.
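
The bijectivity claim can be checked by writing out the inverse of the finalizer. The sketch below uses plain Python integers rather than NumPy, and the names `mix` and `unmix` are illustrative, not from this PR. Each xor-shift is undone by repeating the shift until it exceeds 64 bits, and each multiplication by an odd constant is undone by its modular inverse mod 2**64 (computed with the three-argument `pow`, Python 3.8+).

```python
MASK = (1 << 64) - 1
C1 = 0xBF58476D1CE4E5B9
C2 = 0x94D049BB133111EB


def mix(x: int) -> int:
    """splitmix64 finalizer on a single 64-bit value."""
    x ^= x >> 30
    x = (x * C1) & MASK
    x ^= x >> 27
    x = (x * C2) & MASK
    x ^= x >> 31
    return x


def unmix(x: int) -> int:
    """Inverse of mix: undo each step in reverse order."""
    x ^= (x >> 31) ^ (x >> 62)
    x = (x * pow(C2, -1, 1 << 64)) & MASK
    x ^= (x >> 27) ^ (x >> 54)
    x = (x * pow(C1, -1, 1 << 64)) & MASK
    x ^= (x >> 30) ^ (x >> 60)
    return x


samples = [0, 1, 2**63, MASK, 123456789]
round_trips = [unmix(mix(x)) == x for x in samples]
distinct = len({mix(i) for i in range(1000)})
```

Since every output maps back to exactly one input, two rows can only share a hash if they already shared a key before mixing.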

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor


Code Review

This pull request introduces a useful include_row_hash option to read_parquet, which is valuable for checkpointing and data versioning. The implementation is generally solid and consistent with existing features like include_paths. However, I've identified a critical bug that can cause a crash when include_row_hash=True is used on a file that already contains a row_hash column, particularly when no specific columns are selected for reading. I've provided details and a suggested fix for this issue. Additionally, I've included a few medium-severity suggestions to improve user experience by adding a warning for column name conflicts, updating the documentation to clarify this behavior, and enhancing test coverage for this edge case.

Comment thread python/ray/data/_internal/datasource/parquet_datasource.py
Comment thread python/ray/data/_internal/datasource/parquet_datasource.py
Comment thread python/ray/data/read_api.py Outdated
Comment thread python/ray/data/tests/datasource/test_parquet.py
@wingkitlee0 wingkitlee0 force-pushed the kit/read-row-hash branch 2 times, most recently from 5baca95 to a403b17 Compare March 15, 2026 14:20
@wingkitlee0 wingkitlee0 added the go add ONLY when ready to merge, run all tests label Mar 15, 2026
@wingkitlee0 wingkitlee0 marked this pull request as ready for review March 18, 2026 12:24
@wingkitlee0 wingkitlee0 requested a review from a team as a code owner March 18, 2026 12:24
Comment thread python/ray/data/_internal/datasource/parquet_datasource.py Outdated
Comment thread python/ray/data/_internal/datasource/parquet_datasource.py Outdated
@ray-gardener ray-gardener Bot added data Ray Data-related issues community-contribution Contributed by the community labels Mar 18, 2026
@wingkitlee0 wingkitlee0 marked this pull request as draft March 19, 2026 02:32
@wingkitlee0 wingkitlee0 force-pushed the kit/read-row-hash branch 4 times, most recently from f49769f to 05965d0 Compare March 22, 2026 00:41
@wingkitlee0 wingkitlee0 marked this pull request as ready for review March 22, 2026 03:39
@wingkitlee0 wingkitlee0 marked this pull request as draft April 5, 2026 19:05
@wingkitlee0 wingkitlee0 force-pushed the kit/read-row-hash branch 2 times, most recently from b4305f3 to 09edc82 Compare April 11, 2026 12:48
@wingkitlee0 wingkitlee0 marked this pull request as ready for review April 11, 2026 12:50
@iamjustinhsu iamjustinhsu self-assigned this Apr 14, 2026
logger.warning(
"The Parquet file(s) already contain a column named 'row_hash'. "
"It will be overwritten by the generated row hash column."
)

Contributor


I don't think the warning is necessary, since

  1. We don't emit one for include_paths
  2. We explicitly say so in the documentation

With that said, if you add a warning here, you probably want to add one for include_paths too, to keep it consistent.

@wingkitlee0 wingkitlee0 force-pushed the kit/read-row-hash branch 2 times, most recently from b2474d7 to fc7a149 Compare April 18, 2026 19:46

@cursor cursor Bot left a comment



Cursor Bugbot has reviewed your changes and found 1 potential issue.

Reviewed by Cursor Bugbot for commit cb74257.

Comment thread python/ray/data/_internal/datasource/parquet_datasource.py
Signed-off-by: Kit Lee <7000003+wingkitlee0@users.noreply.github.com>
@richardliaw richardliaw merged commit e1fe22f into ray-project:master Apr 28, 2026
6 checks passed
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026

Labels

  • community-contribution: Contributed by the community
  • data: Ray Data-related issues
  • go: add ONLY when ready to merge, run all tests

3 participants