Skip to content

[Data] Add descriptive error when using local:// paths with a zero-resource head node#60709

Merged
bveeramani merged 1 commit into
ray-project:masterfrom
KaisennHu:local-path-err
Feb 11, 2026
Merged

[Data] Add descriptive error when using local:// paths with a zero-resource head node#60709
bveeramani merged 1 commit into
ray-project:masterfrom
KaisennHu:local-path-err

Conversation

@KaisennHu

Copy link
Copy Markdown
Contributor

Description

When users read from or write to local:// paths, Ray Data schedules these tasks on the head node using NodeAffinitySchedulingStrategy(head_node_id, soft=False). If the head node has no logical resources (a recommended best practice to avoid head OOM), these tasks become unschedulable and produce a confusing error:

ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified
via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False
was specified. task_id=..., task_name=Write

Add a clear, actionable error message that explains why the operation failed and how to resolve it.

Solution

  • Add a helper _validate_head_node_resources_for_local_scheduling that inspects the final merged ray_remote_args and raises a clear, actionable ValueError when an operation pinned to the head node requires resources the head node lacks
  • Call this validation after merge_resources_to_ray_remote_args for head-pinned reads read_datasource and writes Dataset.write_datasink, so explicit user settings (e.g., num_cpus=0) are respected
  • Include regression tests that reproduce the zero-resource head-node + local:// scenario and assert the descriptive error is raised

@tianyi-ge

Related issues

Closes #60698

@KaisennHu KaisennHu requested a review from a team as a code owner February 3, 2026 14:29

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a helpful validation check to provide a more descriptive error message when attempting to use local:// paths with a head node that has zero resources. The implementation adds a new utility function _validate_head_node_resources_for_local_scheduling and integrates it into the read and write paths. The changes are well-implemented and include corresponding regression tests. The code is clear and addresses the issue effectively. I have one minor suggestion for improving code clarity.

Comment thread python/ray/data/_internal/util.py Outdated
Comment thread python/ray/data/_internal/util.py Outdated
@ray-gardener ray-gardener Bot added data Ray Data-related issues community-contribution Contributed by the community labels Feb 3, 2026
@iamjustinhsu iamjustinhsu self-assigned this Feb 4, 2026
@bveeramani bveeramani self-assigned this Feb 4, 2026

@iamjustinhsu iamjustinhsu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the contribution! left some feedback below

Comment thread python/ray/data/_internal/util.py Outdated
Comment thread python/ray/data/_internal/util.py Outdated
@KaisennHu

Copy link
Copy Markdown
Contributor Author

thanks for the contribution! left some feedback below

Thanks for the feedback! All addressed.

@iamjustinhsu iamjustinhsu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added more feedback, after you address those, feel free to ping me again

Comment thread python/ray/data/_internal/util.py Outdated
Comment on lines +422 to +425
if not head_node:
# The head node metadata is unavailable (e.g., during shutdown). Fall back
# to the default behavior and let Ray surface its own error.
return

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding, do u have a script of when this can occur? (head_node is None , BUT next(...) doesn't throw a StopIteration exception?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the question. next(..., None) is intentional, so it returns None (no StopIteration) when no matching head node is visible. That can happen during shutdown/teardown or before head resources are fully registered, so we fall back and let Ray surface its own error.

Comment thread python/ray/data/_internal/util.py Outdated
Comment thread python/ray/data/_internal/util.py Outdated
Comment thread python/ray/data/_internal/util.py Outdated
Comment thread python/ray/data/_internal/util.py Outdated
@KaisennHu KaisennHu force-pushed the local-path-err branch 2 times, most recently from 4c4c02f to c590e9a Compare February 7, 2026 03:06
@KaisennHu

Copy link
Copy Markdown
Contributor Author

@iamjustinhsu Thanks for the detailed review. I've implemented the suggested changes. Let me know if there's anything else!

Comment thread python/ray/data/_internal/util.py Outdated
if num_gpus > 0:
required_resources["GPU"] = float(num_gpus)
if memory > 0:
required_resources["memory"] = float(memory)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing None handling for standard resource arguments

Medium Severity

The standard resources (num_cpus, num_gpus, memory) use .get() with a default value, but this only applies when the key is absent. If ray_remote_args contains an explicit None value (e.g., {"num_cpus": None}), the .get() returns None, and the subsequent comparison like num_cpus > 0 raises a TypeError. This is inconsistent with the custom resources handling at lines 404-405, which explicitly checks for and skips None values.

Fix in Cursor Fix in Web

Comment thread python/ray/data/_internal/util.py Outdated
and "node:__internal_head__" in node.get("Resources", {})
),
None,
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation checks head node but tasks scheduled elsewhere

Medium Severity

The validation function explicitly searches for the head node using node:__internal_head__ in resources, but the actual NodeAffinitySchedulingStrategy is set using ray.get_runtime_context().get_node_id(), which returns the current node (driver's node). If the driver is running on a non-head node (e.g., in a multi-node cluster where a script runs from a worker node), the validation checks resources on the wrong node. This could cause false positives (blocking operations that would succeed) or false negatives (allowing operations that will fail).

Additional Locations (2)

Fix in Cursor Fix in Web

Comment thread python/ray/data/_internal/util.py Outdated

@iamjustinhsu iamjustinhsu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some small feedback and then I think we should be good to go

Comment thread python/ray/data/_internal/util.py Outdated
) -> None:
"""Ensure the head node has enough resources before pinning work there.

Local paths (``local://``) and other driver-local I/O force tasks onto the

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Local paths (``local://``) and other driver-local I/O force tasks onto the
Local paths (``local://``) and other driver-local I/O schedule tasks on the
@@ -743,6 +743,42 @@ def check_dataset_is_local(ds):
).materialize()


def test_read_local_scheme_zero_head_resources(ray_start_cluster, tmp_path):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these tests test very similar things. What I recommend is just keep 1 of the tests and use _validate_head_node_resources_for_local_scheduling directly for testing (since it's not really testing read/write E2E), or you can pytest.parameterize? I prefer the 1st option since it's not high risk if missing, and can makes the testing a faster (since we only need one of the tests)

Comment thread python/ray/data/_internal/util.py Outdated

required_resources: Dict[str, float] = {}
if num_cpus > 0:
required_resources["CPU"] = float(num_cpus)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a comment here specifying that these keys come from ray.nodes() for future users

Comment thread python/ray/data/_internal/util.py Outdated
memory = ray_remote_args.get("memory", required_memory)

required_resources: Dict[str, float] = {}
if num_cpus > 0:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thinking about this more, are these if num_cpus > 0 checks necessary? I think if they are 0, then we should catch it when building the insufficient dict (since we default to 0)

Comment thread python/ray/data/_internal/util.py Outdated
amount = float(amount)
except (TypeError, ValueError) as err:
raise ValueError(f"Invalid resource amount for '{name}': {amount}") from err
if amount > 0:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here above

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/util.py Outdated
@KaisennHu KaisennHu force-pushed the local-path-err branch 2 times, most recently from 367993e to c19a569 Compare February 11, 2026 02:37
@KaisennHu

Copy link
Copy Markdown
Contributor Author

some small feedback and then I think we should be good to go

Thanks! All points addressed.

@bveeramani bveeramani left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ty for the contribution!

(And ty @iamjustinhsu for shepherding)

Comment thread python/ray/data/_internal/util.py Outdated
op_name: str,
required_num_cpus: int = 1,
required_num_gpus: int = 0,
required_memory: int = 0,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I felt confused by these names, because it wasn't clear from how these are different from the ray_remote_args. For example, if a user specifies ray_remote_args, aren't those the required logical resources for a task to execute?

I think these represent like default_num_cpus?

Comment thread python/ray/data/_internal/util.py Outdated
def _validate_head_node_resources_for_local_scheduling(
ray_remote_args: Dict[str, Any],
*,
op_name: str,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This parameter might be confused with Operator.name.

Comment on lines +758 to +759
cluster.add_node(num_cpus=1)
cluster.wait_for_nodes()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Is this logic necessary for the assertion below? If not, would it make sense to remove it to simplify the test?

Comment on lines +747 to +749
from ray.data._internal.util import (
_validate_head_node_resources_for_local_scheduling,
)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Unless there's a circular import, move to top of file for consistency with the more-common Ray pattern of importing at the top of modules?

Comment thread python/ray/data/_internal/util.py Outdated
@@ -368,6 +368,86 @@ def _is_local_scheme(paths: Union[str, List[str]]) -> bool:
return num == len(paths)


def _validate_head_node_resources_for_local_scheduling(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Consider moving this to ray.data.datasource.util if you think it makes more sense there than in the generic util file

Comment thread python/ray/data/_internal/util.py Outdated
try:
amount = float(amount)
except (TypeError, ValueError) as err:
raise ValueError(f"Invalid resource amount for '{name}': {amount}") from err

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to changes anything since I think this might conflict with an earlier review comment, but I felt surprised that we're validating custom resources types in this abstraction. I guess I would've assumed a valid ray_remote_args is more of a pre-condition and we validate ray_remote_args earlier, if at all

…source head node

Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
@bveeramani bveeramani enabled auto-merge (squash) February 11, 2026 18:25
@github-actions github-actions Bot added the go add ONLY when ready to merge, run all tests label Feb 11, 2026
@bveeramani bveeramani merged commit cab739f into ray-project:master Feb 11, 2026
8 checks passed
@KaisennHu KaisennHu deleted the local-path-err branch February 14, 2026 06:53
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
…resource head node (ray-project#60709)

## Description
When users read from or write to `local://` paths, Ray Data schedules
these tasks on the head node using
`NodeAffinitySchedulingStrategy(head_node_id, soft=False)`. If the head
node has no logical resources (a recommended best practice to avoid head
OOM), these tasks become unschedulable and produce a confusing error:

```
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified
via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False
was specified. task_id=..., task_name=Write
```
Add a clear, actionable error message that explains why the operation
failed and how to resolve it.

## Solution
- Add a helper `_validate_head_node_resources_for_local_scheduling` that
inspects the final merged `ray_remote_args` and raises a clear,
actionable `ValueError` when an operation pinned to the head node
requires resources the head node lacks
- Call this validation after `merge_resources_to_ray_remote_args` for
head-pinned reads `read_datasource` and writes `Dataset.write_datasink`,
so explicit user settings (e.g., `num_cpus=0`) are respected
- Include regression tests that reproduce the zero-resource head-node +
`local://` scenario and assert the descriptive error is raised

@tianyi-ge

## Related issues
Closes ray-project#60698

Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
Signed-off-by: Adel Nour <ans9868@nyu.edu>
Aydin-ab pushed a commit to kunling-anyscale/ray that referenced this pull request Feb 20, 2026
…resource head node (ray-project#60709)

## Description
When users read from or write to `local://` paths, Ray Data schedules
these tasks on the head node using
`NodeAffinitySchedulingStrategy(head_node_id, soft=False)`. If the head
node has no logical resources (a recommended best practice to avoid head
OOM), these tasks become unschedulable and produce a confusing error:

```
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified
via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False
was specified. task_id=..., task_name=Write
```
Add a clear, actionable error message that explains why the operation
failed and how to resolve it.

## Solution
- Add a helper `_validate_head_node_resources_for_local_scheduling` that
inspects the final merged `ray_remote_args` and raises a clear,
actionable `ValueError` when an operation pinned to the head node
requires resources the head node lacks
- Call this validation after `merge_resources_to_ray_remote_args` for
head-pinned reads `read_datasource` and writes `Dataset.write_datasink`,
so explicit user settings (e.g., `num_cpus=0`) are respected
- Include regression tests that reproduce the zero-resource head-node +
`local://` scenario and assert the descriptive error is raised

@tianyi-ge 

## Related issues
Closes ray-project#60698

Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues go add ONLY when ready to merge, run all tests

3 participants