[Data] Add descriptive error when using local:// paths with a zero-resource head node #60709
Code Review
This pull request introduces a helpful validation check to provide a more descriptive error message when attempting to use local:// paths with a head node that has zero resources. The implementation adds a new utility function _validate_head_node_resources_for_local_scheduling and integrates it into the read and write paths. The changes are well-implemented and include corresponding regression tests. The code is clear and addresses the issue effectively. I have one minor suggestion for improving code clarity.
iamjustinhsu left a comment
thanks for the contribution! left some feedback below
0638b78 to f1e2735
Thanks for the feedback! All addressed.
iamjustinhsu left a comment
I added more feedback, after you address those, feel free to ping me again
    if not head_node:
        # The head node metadata is unavailable (e.g., during shutdown). Fall back
        # to the default behavior and let Ray surface its own error.
        return
For my understanding, do you have a script showing when this can occur (head_node is None, but next(...) doesn't raise a StopIteration exception)?
Thanks for the question. next(..., None) is intentional, so it returns None (no StopIteration) when no matching head node is visible. That can happen during shutdown/teardown or before head resources are fully registered, so we fall back and let Ray surface its own error.
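To make the `next(..., None)` behavior above concrete, here is a minimal sketch. It does not call Ray; the node dictionaries are hand-written stand-ins for `ray.nodes()` entries, and the `NodeID` and `Alive` keys, the sample IDs, and the `find_head_node` name are assumptions made for illustration.

```python
from typing import Any, Dict, List, Optional

HEAD_MARKER = "node:__internal_head__"


def find_head_node(nodes: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return the first live node that advertises the head marker, else None."""
    return next(
        (
            node
            for node in nodes
            if node.get("Alive") and HEAD_MARKER in node.get("Resources", {})
        ),
        None,  # default: returned instead of raising StopIteration
    )


# Hypothetical snapshots: only a worker is visible in the first one,
# e.g. before the head node has registered its resources.
workers_only = [{"NodeID": "w1", "Alive": True, "Resources": {"CPU": 4.0}}]
with_head = workers_only + [
    {"NodeID": "h1", "Alive": True, "Resources": {HEAD_MARKER: 1.0}}
]

no_head = find_head_node(workers_only)  # None, no exception raised
head = find_head_node(with_head)        # the dict for node "h1"
```

Without the second argument, `next` on an exhausted generator would raise `StopIteration`, which is why the `if not head_node` guard is reachable.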
4c4c02f to c590e9a
@iamjustinhsu Thanks for the detailed review. I've implemented the suggested changes. Let me know if there's anything else!
    if num_gpus > 0:
        required_resources["GPU"] = float(num_gpus)
    if memory > 0:
        required_resources["memory"] = float(memory)
Missing None handling for standard resource arguments
Medium Severity
The standard resources (num_cpus, num_gpus, memory) use .get() with a default value, but this only applies when the key is absent. If ray_remote_args contains an explicit None value (e.g., {"num_cpus": None}), the .get() returns None, and the subsequent comparison like num_cpus > 0 raises a TypeError. This is inconsistent with the custom resources handling at lines 404-405, which explicitly checks for and skips None values.
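A small sketch of the pitfall described above and one possible guard. The `resolve_resource` helper is hypothetical and not part of the PR; treating an explicit `None` as "use the default" is an assumption about the intended semantics.

```python
from typing import Any, Dict


def resolve_resource(ray_remote_args: Dict[str, Any], key: str, default: float) -> float:
    """Look up a resource amount, treating a missing key or explicit None as the default."""
    value = ray_remote_args.get(key, default)
    if value is None:
        # .get() only applies the default when the key is absent, so an
        # explicit None has to be handled separately (assumed semantics).
        value = default
    return float(value)


args_with_none = {"num_cpus": None}

# The raw lookup returns None because the key is present ...
raw = args_with_none.get("num_cpus", 1)

# ... and comparing None with an int is what raises the TypeError.
try:
    raw > 0
    comparison_failed = False
except TypeError:
    comparison_failed = True

safe = resolve_resource(args_with_none, "num_cpus", 1)
explicit_zero = resolve_resource({"num_cpus": 0}, "num_cpus", 1)
```

Note that an explicit `0` is preserved rather than replaced by the default, so a user setting such as `num_cpus=0` would still be respected.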
            and "node:__internal_head__" in node.get("Resources", {})
        ),
        None,
    )
Validation checks head node but tasks scheduled elsewhere
Medium Severity
The validation function explicitly searches for the head node using node:__internal_head__ in resources, but the actual NodeAffinitySchedulingStrategy is set using ray.get_runtime_context().get_node_id(), which returns the current node (driver's node). If the driver is running on a non-head node (e.g., in a multi-node cluster where a script runs from a worker node), the validation checks resources on the wrong node. This could cause false positives (blocking operations that would succeed) or false negatives (allowing operations that will fail).
Additional Locations (2)
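The mismatch flagged above can be illustrated without a running cluster. In this sketch the node list is a hand-written stand-in for `ray.nodes()` output, the `NodeID` and `Alive` keys and the sample IDs are assumptions, and `find_node_by_id` is a hypothetical helper; in a real driver the ID would come from `ray.get_runtime_context().get_node_id()`.

```python
from typing import Any, Dict, List, Optional

HEAD_MARKER = "node:__internal_head__"

# Hypothetical two-node cluster: a zero-CPU head and a worker with CPUs.
nodes = [
    {"NodeID": "head-id", "Alive": True, "Resources": {HEAD_MARKER: 1.0}},
    {"NodeID": "worker-id", "Alive": True, "Resources": {"CPU": 4.0}},
]


def find_node_by_id(
    nodes: List[Dict[str, Any]], node_id: str
) -> Optional[Dict[str, Any]]:
    """Return the live node with the given ID, or None if it is not visible."""
    return next(
        (n for n in nodes if n.get("Alive") and n.get("NodeID") == node_id),
        None,
    )


# Node found by searching for the head marker.
head_by_marker = next(
    (n for n in nodes if HEAD_MARKER in n.get("Resources", {})), None
)

# Node the tasks would actually be pinned to if the driver runs on the worker.
driver_node_id = "worker-id"  # stand-in for the runtime context's node ID
pinned_node = find_node_by_id(nodes, driver_node_id)

# The two lookups disagree, so checking the head's resources says nothing
# about whether the pinned node can run the task.
same_node = head_by_marker is pinned_node
pinned_cpu = pinned_node["Resources"].get("CPU", 0.0)
```

Looking up the node by the same ID that is passed to the scheduling strategy would keep the validation and the placement in agreement.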
c590e9a to 8fd4696
iamjustinhsu left a comment
some small feedback and then I think we should be good to go
    ) -> None:
        """Ensure the head node has enough resources before pinning work there.

        Local paths (``local://``) and other driver-local I/O force tasks onto the
Suggested change:
    - Local paths (``local://``) and other driver-local I/O force tasks onto the
    + Local paths (``local://``) and other driver-local I/O schedule tasks on the
    @@ -743,6 +743,42 @@ def check_dataset_is_local(ds):
        ).materialize()


    def test_read_local_scheme_zero_head_resources(ray_start_cluster, tmp_path):
I think these tests cover very similar things. I recommend keeping just one of the tests and calling _validate_head_node_resources_for_local_scheduling directly (since it's not really testing read/write E2E), or you could use pytest.mark.parametrize. I prefer the first option since it's not high risk if missing, and it makes testing faster (since we only need one of the tests).
    required_resources: Dict[str, float] = {}
    if num_cpus > 0:
        required_resources["CPU"] = float(num_cpus)
I think we should add a comment here specifying that these keys come from ray.nodes() for future users
    memory = ray_remote_args.get("memory", required_memory)

    required_resources: Dict[str, float] = {}
    if num_cpus > 0:
thinking about this more, are these if num_cpus > 0 checks necessary? I think if they are 0, then we should catch it when building the insufficient dict (since we default to 0)
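The point above can be sketched as follows: if missing head-node resources default to 0, a required amount of 0 never exceeds what is available, so no separate `> 0` guard is needed. The function names and the error wording are hypothetical and are not taken from the PR.

```python
from typing import Dict, Tuple


def find_insufficient(
    required: Dict[str, float], available: Dict[str, float]
) -> Dict[str, Tuple[float, float]]:
    """Map each under-provisioned resource to (required, available)."""
    insufficient = {}
    for name, amount in required.items():
        have = available.get(name, 0.0)  # a missing resource counts as 0
        if amount > have:                # 0 > 0 is False, so zeros drop out
            insufficient[name] = (amount, have)
    return insufficient


def validate(required: Dict[str, float], available: Dict[str, float], op_name: str) -> None:
    """Raise a descriptive ValueError if any required resource is missing."""
    insufficient = find_insufficient(required, available)
    if insufficient:
        details = ", ".join(
            f"{name}: requires {need}, head node has {have}"
            for name, (need, have) in sorted(insufficient.items())
        )
        # Hypothetical message text, for illustration only.
        raise ValueError(
            f"{op_name} is pinned to the head node, which lacks resources ({details})."
        )


zero_resource_head: Dict[str, float] = {}
default_request = {"CPU": 1.0, "GPU": 0.0, "memory": 0.0}
explicit_zero_request = {"CPU": 0.0, "GPU": 0.0, "memory": 0.0}

missing = find_insufficient(default_request, zero_resource_head)
nothing_missing = find_insufficient(explicit_zero_request, zero_resource_head)
```

With this shape, a task that explicitly requests `num_cpus=0` passes validation on a zero-resource head node, while the default one-CPU request is reported.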
        amount = float(amount)
    except (TypeError, ValueError) as err:
        raise ValueError(f"Invalid resource amount for '{name}': {amount}") from err
    if amount > 0:
8fd4696 to 8d93588
367993e to c19a569
Thanks! All points addressed.
bveeramani left a comment
ty for the contribution!
(And ty @iamjustinhsu for shepherding)
    op_name: str,
    required_num_cpus: int = 1,
    required_num_gpus: int = 0,
    required_memory: int = 0,
Nit: I felt confused by these names because it wasn't clear how they differ from ray_remote_args. For example, if a user specifies ray_remote_args, aren't those the required logical resources for a task to execute?
I think these represent something like default_num_cpus?
    def _validate_head_node_resources_for_local_scheduling(
        ray_remote_args: Dict[str, Any],
        *,
        op_name: str,
Nit: This parameter might be confused with Operator.name.
    cluster.add_node(num_cpus=1)
    cluster.wait_for_nodes()
Nit: Is this logic necessary for the assertion below? If not, would it make sense to remove it to simplify the test?
    from ray.data._internal.util import (
        _validate_head_node_resources_for_local_scheduling,
    )
Nit: Unless there's a circular import, move to top of file for consistency with the more-common Ray pattern of importing at the top of modules?
    @@ -368,6 +368,86 @@ def _is_local_scheme(paths: Union[str, List[str]]) -> bool:
        return num == len(paths)


    def _validate_head_node_resources_for_local_scheduling(
Nit: Consider moving this to ray.data.datasource.util if you think it makes more sense there than in the generic util file
    try:
        amount = float(amount)
    except (TypeError, ValueError) as err:
        raise ValueError(f"Invalid resource amount for '{name}': {amount}") from err
No need to change anything, since I think this might conflict with an earlier review comment, but I felt surprised that we're validating custom resource types in this abstraction. I guess I would've assumed a valid ray_remote_args is more of a precondition and that we validate ray_remote_args earlier, if at all.
…source head node Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
c19a569 to af4914c
…resource head node (ray-project#60709)

## Description

When users read from or write to `local://` paths, Ray Data schedules these tasks on the head node using `NodeAffinitySchedulingStrategy(head_node_id, soft=False)`. If the head node has no logical resources (a recommended best practice to avoid head OOM), these tasks become unschedulable and produce a confusing error:

```
ray.exceptions.TaskUnschedulableError: The task is not schedulable: The node specified via NodeAffinitySchedulingStrategy doesn't exist any more or is infeasible, and soft=False was specified. task_id=..., task_name=Write
```

Add a clear, actionable error message that explains why the operation failed and how to resolve it.

## Solution

- Add a helper `_validate_head_node_resources_for_local_scheduling` that inspects the final merged `ray_remote_args` and raises a clear, actionable `ValueError` when an operation pinned to the head node requires resources the head node lacks
- Call this validation after `merge_resources_to_ray_remote_args` for head-pinned reads `read_datasource` and writes `Dataset.write_datasink`, so explicit user settings (e.g., `num_cpus=0`) are respected
- Include regression tests that reproduce the zero-resource head-node + `local://` scenario and assert the descriptive error is raised

@tianyi-ge

## Related issues

Closes ray-project#60698

Signed-off-by: Haichuan Hu <kaisennhu@gmail.com>
Signed-off-by: Adel Nour <ans9868@nyu.edu>

