[Data] Remove default task-level timeout and clamp end_offset to watermark in Kafka datasource #61476
Code Review
This pull request introduces several improvements to the Kafka datasource. It removes the default task-level timeout, simplifying the timeout logic by relying on a fixed 10-second timeout for each consume call. A key change is clamping the end_offset to the high watermark, which robustly prevents read tasks from hanging when trying to read beyond available data. The logic for handling invalid offset ranges is also improved, now raising a ValueError if the start_offset exceeds the clamped end_offset. The code is cleaner, more robust, and the accompanying test changes correctly validate the new behavior. Overall, these are excellent changes.
Note: Security Review did not run due to the size of the PR.
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
slfan1989
left a comment
Generally agree, except for one small point.
  kafka_auth_config: Optional[KafkaAuthConfig] = None,
  consumer_config: Optional[Dict[str, Any]] = None,
- timeout_ms: int = 10000,
+ timeout_ms: Optional[int] = None,
Is it expected that the timeout changes from 10s to None?
Consider keeping timeout_ms=10000 as default for backward compatibility?
@owenowenisme did the primary consumer of this API request that we change the default?
@slfan1989 this API is labeled as stability="alpha", so we can technically break the defaults while still honoring our backwards-compatibility obligations, though it's not ideal
Thank you very much for the explanation, LGTM.
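For readers following the default change discussed in this thread, here is a minimal sketch of how an optional task-level timeout can sit on top of a fixed per-call cap. It is not the code from this PR: `read_range`, the `consume` callable, and `CONSUME_TIMEOUT_S` are hypothetical names, records are modeled as bare integer offsets, and stopping quietly when the deadline passes is an assumption about the behavior.

```python
import time
from typing import Callable, List, Optional

# Per-call cap; the 10s value comes from the PR description, the name is hypothetical.
CONSUME_TIMEOUT_S = 10.0


def read_range(
    consume: Callable[[float], List[int]],
    start_offset: int,
    end_offset: int,
    timeout_ms: Optional[int] = None,
) -> List[int]:
    """Collect offsets in [start_offset, end_offset).

    `consume(timeout_s)` is a hypothetical stand-in for a consumer call that
    returns a (possibly empty) batch of offsets in increasing order.
    """
    # With timeout_ms=None there is no task-level deadline at all.
    deadline = None if timeout_ms is None else time.monotonic() + timeout_ms / 1000.0
    records: List[int] = []
    next_offset = start_offset
    while next_offset < end_offset:
        if deadline is not None and time.monotonic() >= deadline:
            break  # assumption: stop reading once the task-level timeout expires
        # Every call uses the same fixed cap; nothing shrinks it toward the deadline.
        for offset in consume(CONSUME_TIMEOUT_S):
            if offset >= end_offset:
                return records
            records.append(offset)
            next_offset = offset + 1
    return records
```

With no deadline, a loop like this only terminates if `end_offset` is actually reachable, which is why the PR pairs the new default with clamping `end_offset` to the high watermark.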
…termark in Kafka datasource (ray-project#61476)

## Description
- Default `timeout_ms` to None (no task-level timeout) for read_kafka. Each consumer.consume() call is already capped at 10s, which is sufficient.
- Clamp `end_offset` to the high watermark during offset resolution so reads never target offsets beyond available data.
- Raise ValueError if `start_offset` exceeds the clamped `end_offset`.
- Simplify the read loop by removing per-consume timeout shrinking logic.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples, screenshots, etc.

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: Parag Ekbote <thecoolekbote189@gmail.com>
Description
- Default `timeout_ms` to None (no task-level timeout) for read_kafka. Each consumer.consume() call is already capped at 10s, which is sufficient.
- Clamp `end_offset` to the high watermark during offset resolution so reads never target offsets beyond available data.
- Raise ValueError if `start_offset` exceeds the clamped `end_offset`.
Related issues
Additional information
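The offset handling described above can be sketched roughly as follows. This is an illustration under assumptions, not the datasource's actual code: `resolve_offsets` is a hypothetical helper, and the high watermark is passed in as a plain integer rather than fetched from a broker.

```python
from typing import Tuple


def resolve_offsets(
    start_offset: int, end_offset: int, high_watermark: int
) -> Tuple[int, int]:
    """Clamp end_offset to the high watermark and validate the range.

    Hypothetical helper for illustration only.
    """
    # Never target offsets beyond the data that is currently available.
    clamped_end = min(end_offset, high_watermark)
    if start_offset > clamped_end:
        raise ValueError(
            f"start_offset ({start_offset}) exceeds end_offset ({clamped_end}) "
            f"after clamping to the high watermark ({high_watermark})"
        )
    return start_offset, clamped_end
```

For example, a requested range of 0 to 100 against a high watermark of 42 resolves to 0 to 42, while a start offset of 50 against the same watermark raises ValueError instead of leaving the read task waiting for data that does not exist. A start offset equal to the clamped end is accepted here and yields an empty range, since the description only rejects a start that exceeds the end.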