[Data] Convert drop_columns to a Project logical operator when input schema is known #63813
… known drop_columns reshapes into self.select_columns(keep_cols) when the input op's infer_schema() returns a pa.Schema, keeping the typed schema chain intact so Dataset.schema() resolves without a limit(1) execution. Missing columns raise KeyError eagerly at the call site on the typed path. When the input schema is opaque (UDF chain, PandasBlockSchema source) or all columns are dropped (avoids the internal __bsp_stub placeholder that select_columns([]) inserts), falls back to the existing MapBatches path. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Code Review
This pull request updates Dataset.drop_columns to reshape the operation into a Project over the surviving columns when the input schema is statically known. This keeps the typed schema chain intact and allows missing columns to be reported eagerly. If the schema is unknown or all columns are dropped, it falls back to the MapBatches implementation. Unit tests were added to verify these behaviors. The reviewer suggested performance improvements, including an early return when cols is empty and using sets for O(1) membership lookups during column filtering.
Return early when cols is empty to skip schema inference and a redundant Project. Use sets for membership checks, reducing the missing/keep computation from O(M*N) to O(M+N). Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: Goutam <goutam@anyscale.com>
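The set-based membership check from this commit can be illustrated with a minimal, standalone sketch. The helper name `split_columns` and its signature are hypothetical and are not taken from the Ray source; only the idea (build sets once, then do O(1) lookups) comes from the commit message.

```python
from typing import Iterable, List, Sequence, Tuple


def split_columns(
    schema_names: Sequence[str], cols: Iterable[str]
) -> Tuple[List[str], List[str]]:
    """Return (missing, keep) for a drop request against a known schema.

    Hypothetical helper for illustration; not part of Ray's API.
    """
    cols = list(cols)
    cols_set = set(cols)           # O(M) to build, O(1) per lookup
    names_set = set(schema_names)  # O(N) to build, O(1) per lookup
    # Requested columns that are absent from the schema, in request order.
    missing = [c for c in cols if c not in names_set]
    # Surviving columns, in schema order.
    keep = [c for c in schema_names if c not in cols_set]
    return missing, keep


missing, keep = split_columns(["id", "name", "score"], ["score", "bogus"])
print(missing)  # ['bogus']
print(keep)     # ['id', 'name']
```

With lists instead of sets, each `in` test scans the other sequence, which is where the O(M*N) cost comes from.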
Cursor Bugbot has reviewed your changes using default effort and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 088f044.
        compute=compute,
        concurrency=concurrency,
        **ray_remote_args,
    )
Typed path ignores compute
Medium Severity
When the input has a static PyArrow schema, drop_columns routes through select_columns, which always builds a TaskPoolStrategy from concurrency and never applies the compute argument. The map_batches fallback still resolves compute via get_compute_strategy, so execution settings can change depending on schema visibility.
I believe compute is deprecated for drop_columns...
        f"{missing}. Available columns: {input_schema.names}"
    )
    keep = [c for c in input_schema.names if c not in cols_set]
    if keep:
what if keep is empty?
…put schema is known (ray-project#63813) ## Change `drop_columns` reshapes into `self.select_columns(keep_cols)` when the input op's `infer_schema()` returns a `pa.Schema`, keeping the typed schema chain intact so `Dataset.schema()` resolves without a `limit(1)` execution. Missing columns raise `KeyError` eagerly at the call site on the typed path. When the input schema is opaque (UDF chain, `PandasBlockSchema` source) or all columns are dropped (avoids the internal __bsp_stub placeholder that `select_columns([])` inserts), falls back to the existing `MapBatches` path. --------- Signed-off-by: Goutam <goutam@anyscale.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>


Change
`drop_columns` reshapes into `self.select_columns(keep_cols)` when the input op's `infer_schema()` returns a `pa.Schema`, keeping the typed schema chain intact so `Dataset.schema()` resolves without a `limit(1)` execution. Missing columns raise `KeyError` eagerly at the call site on the typed path. When the input schema is opaque (UDF chain, `PandasBlockSchema` source) or all columns are dropped (avoids the internal __bsp_stub placeholder that `select_columns([])` inserts), falls back to the existing `MapBatches` path.
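The routing described above can be sketched in plain Python, without Ray, as a rough model of the decision only. Everything here is an assumption made for illustration: `plan_drop_columns`, the string labels `"noop"`, `"project"`, and `"map_batches"`, the use of `None` to stand in for an opaque schema, and the `"Columns not found: "` prefix of the error message are hypothetical and do not mirror the actual implementation.

```python
from typing import List, Optional, Sequence, Tuple


def plan_drop_columns(
    schema_names: Optional[Sequence[str]], cols: Sequence[str]
) -> Tuple[str, List[str]]:
    """Decide which path a drop request would take (illustrative only).

    schema_names is None when the input schema is opaque; this is a
    hypothetical stand-in for infer_schema() not returning a pa.Schema.
    """
    if not cols:
        # Early return suggested in review: nothing to drop.
        return ("noop", [])
    if schema_names is None:
        # Opaque schema: fall back to the batch-mapping path.
        return ("map_batches", list(cols))
    names = set(schema_names)
    missing = [c for c in cols if c not in names]
    if missing:
        # Typed path: report missing columns eagerly at the call site.
        raise KeyError(
            f"Columns not found: {missing}. "
            f"Available columns: {list(schema_names)}"
        )
    dropped = set(cols)
    keep = [c for c in schema_names if c not in dropped]
    if not keep:
        # All columns dropped: avoid an empty projection, fall back.
        return ("map_batches", list(cols))
    return ("project", keep)


print(plan_drop_columns(["id", "name"], ["name"]))  # ('project', ['id'])
print(plan_drop_columns(None, ["name"]))            # ('map_batches', ['name'])
print(plan_drop_columns(["id"], ["id"]))            # ('map_batches', ['id'])
```

The third call shows the empty-`keep` case: every column is dropped, so the sketch routes to the fallback rather than projecting onto an empty column list.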