Skip to content

[Data] boost hash_partition w/ sort_indices + zero-copy slices#63498

Merged
goutamvenkat-anyscale merged 6 commits into
ray-project:masterfrom
owenowenisme:data/hash-partition-sort-indices
Jun 4, 2026
Merged

[Data] boost hash_partition w/ sort_indices + zero-copy slices#63498
goutamvenkat-anyscale merged 6 commits into
ray-project:masterfrom
owenowenisme:data/hash-partition-sort-indices

Conversation

@owenowenisme

@owenowenisme owenowenisme commented May 19, 2026

Copy link
Copy Markdown
Member

Description

hash_partition previously did three expensive things in sequence:
N = num_partition | R = num_rows

  1. Built per-partition index arrays via N × np.where(part_ids == p) — O(N · R) scans
  2. Defragmented the input via try_combine_chunked_columns(table) — a full-table copy
  3. Ran N independent table.take(indices[p]) calls

This change replaces all three with:

  1. pyarrow.compute.sort_indices(partition_ids) — radix sort on integers, one O(R) pass
  2. One take_table(table, sort_indices) on the original (possibly chunked) input
  3. N zero-copy Table.slice() calls

The N takes together form a permutation of the table, so consolidating them into one sort + N zero-copy slices is equivalent and strictly cheaper (fixed take overhead paid once instead of N times). The defrag copy can also be removed: the original Arrow problem (apache/arrow#35126) is that every take on a chunked table internally concatenates all chunks first, so try_combine_chunked_columns exists to pay that concat once externally and let the subsequent N takes use the fast path. By calling take only once, the internal concat happens just once anyway — the external defrag becomes redundant. And because the take output already arranges each partition's rows contiguously, we can carve out the N partitions with zero-copy slices instead of materializing a second copy — which would be another 1GB for a 1GB input.

Benchmark: 1GB block → 1000 partitions

Single thread, PyArrow 23.0.1. K = number of chunks in the input table; K=256 mirrors realistic multi-chunk input.

Block shape K Time before Time after Speedup Peak Arrow before Peak Arrow after
16M rows × 8 int64 1 6542 ms 939 ms 6.96× 1024 MB (no copy needed) 1152 MB
16M rows × 8 int64 256 6887 ms 1057 ms 6.51× 2048 MB 1280 MB
8M rows × 16 int64 1 4818 ms 825 ms 5.84× 1024 MB (no copy needed) 1088 MB
8M rows × 16 int64 256 5086 ms 982 ms 5.18× 2048 MB 1152 MB
2M rows × 64 int64 1 2468 ms 322 ms 7.66× 1026 MB (no copy needed) 1040 MB
2M rows × 64 int64 256 2216 ms 369 ms 6.01× 2050 MB 1056 MB
  • Throughput: 5–8× faster across all shapes.
  • Peak Arrow allocation on chunked inputs (K=256): ~2.0 GB → ~1.1 GB (~40% reduction) — the input no longer has to coexist with a defragmented copy.
Before After Speedup
aggregate_groups (84 groups, mean) 61 s 40 s 1.53×

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme requested a review from a team as a code owner May 19, 2026 09:44
@owenowenisme owenowenisme changed the title [Data] May 19, 2026
@owenowenisme owenowenisme added data Ray Data-related issues go add ONLY when ready to merge, run all tests labels May 19, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request optimizes the hash_partition function in python/ray/data/_internal/arrow_ops/transform_pyarrow.py by replacing multiple independent take operations with a single take followed by zero-copy slices. The implementation uses pyarrow.compute.sort_indices to sort the table by partition ID and numpy.bincount to determine partition boundaries. I have no feedback to provide as there were no review comments to evaluate.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme marked this pull request as draft May 19, 2026 15:22
@owenowenisme owenowenisme marked this pull request as ready for review May 28, 2026 08:01
@goutamvenkat-anyscale goutamvenkat-anyscale merged commit 5289887 into ray-project:master Jun 4, 2026
6 checks passed
rueian pushed a commit to rueian/ray that referenced this pull request Jun 4, 2026
…roject#63498)

## Description
`hash_partition` previously did three expensive things in sequence:
N = `num_partition`  |  R = `num_rows`
1. Built per-partition index arrays via `N × np.where(part_ids == p)` —
O(N · R) scans
2. Defragmented the input via `try_combine_chunked_columns(table)` — a
full-table copy
3. Ran `N` independent `table.take(indices[p])` calls

This change replaces all three with:
1. `pyarrow.compute.sort_indices(partition_ids)` — radix sort on
integers, one O(R) pass
2. One `take_table(table, sort_indices)` on the original (possibly
chunked) input
3. `N` zero-copy `Table.slice()` calls

The `N` takes together form a permutation of the table, so consolidating
them into one sort + N zero-copy slices is equivalent and strictly
cheaper (fixed take overhead paid once instead of N times). The defrag
copy can also be removed: the original Arrow problem
(apache/arrow#35126) is that every `take` on a chunked table internally
concatenates all chunks first, so `try_combine_chunked_columns` exists
to pay that concat once externally and let the subsequent N takes use
the fast path. By calling `take` only once, the internal concat happens
just once anyway — the external defrag becomes redundant. And because
the take output already arranges each partition's rows contiguously, we
can carve out the N partitions with zero-copy slices instead of
materializing a second copy — which would be another 1GB for a 1GB
input.
## Benchmark: 1GB block → 1000 partitions

Single thread, PyArrow 23.0.1. `K` = number of chunks in the input
table; `K=256` mirrors realistic multi-chunk input.

| Block shape | K | Time before | Time after | Speedup | Peak Arrow
before | Peak Arrow after |
|---|---|---|---|---|---|---|
| 16M rows × 8 int64 | 1 | 6542 ms | **939 ms** | **6.96×** | 1024 MB
(no copy needed) | 1152 MB |
| 16M rows × 8 int64 | 256 | 6887 ms | **1057 ms** | **6.51×** | 2048 MB
| **1280 MB** |
| 8M rows × 16 int64 | 1 | 4818 ms | **825 ms** | **5.84×** | 1024 MB
(no copy needed) | 1088 MB |
| 8M rows × 16 int64 | 256 | 5086 ms | **982 ms** | **5.18×** | 2048 MB
| **1152 MB** |
| 2M rows × 64 int64 | 1 | 2468 ms | **322 ms** | **7.66×** | 1026 MB
(no copy needed) | 1040 MB |
| 2M rows × 64 int64 | 256 | 2216 ms | **369 ms** | **6.01×** | 2050 MB
| **1056 MB** |

- **Throughput**: 5–8× faster across all shapes.
- **Peak Arrow allocation** on chunked inputs (K=256): ~2.0 GB → ~1.1 GB
(~40% reduction) — the input no longer has to coexist with a
defragmented copy.


| | Before | After | Speedup |
|---|---|---|---|
| `aggregate_groups` (84 groups, mean) | 61 s | **40 s** | **1.53×** |

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
edoakes pushed a commit to edoakes/ray that referenced this pull request Jun 5, 2026
…roject#63498)

## Description
`hash_partition` previously did three expensive things in sequence:
N = `num_partition`  |  R = `num_rows`
1. Built per-partition index arrays via `N × np.where(part_ids == p)` —
O(N · R) scans
2. Defragmented the input via `try_combine_chunked_columns(table)` — a
full-table copy
3. Ran `N` independent `table.take(indices[p])` calls

This change replaces all three with:
1. `pyarrow.compute.sort_indices(partition_ids)` — radix sort on
integers, one O(R) pass
2. One `take_table(table, sort_indices)` on the original (possibly
chunked) input
3. `N` zero-copy `Table.slice()` calls

The `N` takes together form a permutation of the table, so consolidating
them into one sort + N zero-copy slices is equivalent and strictly
cheaper (fixed take overhead paid once instead of N times). The defrag
copy can also be removed: the original Arrow problem
(apache/arrow#35126) is that every `take` on a chunked table internally
concatenates all chunks first, so `try_combine_chunked_columns` exists
to pay that concat once externally and let the subsequent N takes use
the fast path. By calling `take` only once, the internal concat happens
just once anyway — the external defrag becomes redundant. And because
the take output already arranges each partition's rows contiguously, we
can carve out the N partitions with zero-copy slices instead of
materializing a second copy — which would be another 1GB for a 1GB
input.
## Benchmark: 1GB block → 1000 partitions

Single thread, PyArrow 23.0.1. `K` = number of chunks in the input
table; `K=256` mirrors realistic multi-chunk input.

| Block shape | K | Time before | Time after | Speedup | Peak Arrow
before | Peak Arrow after |
|---|---|---|---|---|---|---|
| 16M rows × 8 int64 | 1 | 6542 ms | **939 ms** | **6.96×** | 1024 MB
(no copy needed) | 1152 MB |
| 16M rows × 8 int64 | 256 | 6887 ms | **1057 ms** | **6.51×** | 2048 MB
| **1280 MB** |
| 8M rows × 16 int64 | 1 | 4818 ms | **825 ms** | **5.84×** | 1024 MB
(no copy needed) | 1088 MB |
| 8M rows × 16 int64 | 256 | 5086 ms | **982 ms** | **5.18×** | 2048 MB
| **1152 MB** |
| 2M rows × 64 int64 | 1 | 2468 ms | **322 ms** | **7.66×** | 1026 MB
(no copy needed) | 1040 MB |
| 2M rows × 64 int64 | 256 | 2216 ms | **369 ms** | **6.01×** | 2050 MB
| **1056 MB** |

- **Throughput**: 5–8× faster across all shapes.
- **Peak Arrow allocation** on chunked inputs (K=256): ~2.0 GB → ~1.1 GB
(~40% reduction) — the input no longer has to coexist with a
defragmented copy.


| | Before | After | Speedup |
|---|---|---|---|
| `aggregate_groups` (84 groups, mean) | 61 s | **40 s** | **1.53×** |

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
limarkdcunha pushed a commit to limarkdcunha/ray that referenced this pull request Jun 30, 2026
…roject#63498)

## Description
`hash_partition` previously did three expensive things in sequence:
N = `num_partition`  |  R = `num_rows`
1. Built per-partition index arrays via `N × np.where(part_ids == p)` —
O(N · R) scans
2. Defragmented the input via `try_combine_chunked_columns(table)` — a
full-table copy
3. Ran `N` independent `table.take(indices[p])` calls

This change replaces all three with:
1. `pyarrow.compute.sort_indices(partition_ids)` — radix sort on
integers, one O(R) pass
2. One `take_table(table, sort_indices)` on the original (possibly
chunked) input
3. `N` zero-copy `Table.slice()` calls

The `N` takes together form a permutation of the table, so consolidating
them into one sort + N zero-copy slices is equivalent and strictly
cheaper (fixed take overhead paid once instead of N times). The defrag
copy can also be removed: the original Arrow problem
(apache/arrow#35126) is that every `take` on a chunked table internally
concatenates all chunks first, so `try_combine_chunked_columns` exists
to pay that concat once externally and let the subsequent N takes use
the fast path. By calling `take` only once, the internal concat happens
just once anyway — the external defrag becomes redundant. And because
the take output already arranges each partition's rows contiguously, we
can carve out the N partitions with zero-copy slices instead of
materializing a second copy — which would be another 1GB for a 1GB
input.
## Benchmark: 1GB block → 1000 partitions

Single thread, PyArrow 23.0.1. `K` = number of chunks in the input
table; `K=256` mirrors realistic multi-chunk input.

| Block shape | K | Time before | Time after | Speedup | Peak Arrow
before | Peak Arrow after |
|---|---|---|---|---|---|---|
| 16M rows × 8 int64 | 1 | 6542 ms | **939 ms** | **6.96×** | 1024 MB
(no copy needed) | 1152 MB |
| 16M rows × 8 int64 | 256 | 6887 ms | **1057 ms** | **6.51×** | 2048 MB
| **1280 MB** |
| 8M rows × 16 int64 | 1 | 4818 ms | **825 ms** | **5.84×** | 1024 MB
(no copy needed) | 1088 MB |
| 8M rows × 16 int64 | 256 | 5086 ms | **982 ms** | **5.18×** | 2048 MB
| **1152 MB** |
| 2M rows × 64 int64 | 1 | 2468 ms | **322 ms** | **7.66×** | 1026 MB
(no copy needed) | 1040 MB |
| 2M rows × 64 int64 | 256 | 2216 ms | **369 ms** | **6.01×** | 2050 MB
| **1056 MB** |

- **Throughput**: 5–8× faster across all shapes.
- **Peak Arrow allocation** on chunked inputs (K=256): ~2.0 GB → ~1.1 GB
(~40% reduction) — the input no longer has to coexist with a
defragmented copy.


| | Before | After | Speedup |
|---|---|---|---|
| `aggregate_groups` (84 groups, mean) | 61 s | **40 s** | **1.53×** |

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

2 participants