Skip to content

[Data] Remove default task-level timeout and clamp end_offset to watermark in Kafka datasource#61476

Merged
bveeramani merged 2 commits into
ray-project:masterfrom
owenowenisme:data/make-read-kafka-timeout-default-to-none
Mar 5, 2026
Merged

[Data] Remove default task-level timeout and clamp end_offset to watermark in Kafka datasource#61476
bveeramani merged 2 commits into
ray-project:masterfrom
owenowenisme:data/make-read-kafka-timeout-default-to-none

Conversation

@owenowenisme

Copy link
Copy Markdown
Member

Description

  • Default timeout_ms to None (no task-level timeout) for read_kafka. Each consumer.consume() call
    is already capped at 10s, which is sufficient.
  • Clamp end_offset to the high watermark during offset resolution so reads never target offsets
    beyond available data.
  • Raise ValueError if start_offset exceeds the clamped end_offset.
  • Simplify the read loop by removing per-consume timeout shrinking logic.

Related issues

Link related issues: "Fixes #1234", "Closes #1234", or "Related to #1234".

Additional information

Optional: Add implementation details, API changes, usage examples, screenshots, etc.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme requested a review from a team as a code owner March 4, 2026 03:42
@owenowenisme owenowenisme added the data Ray Data-related issues label Mar 4, 2026

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces several improvements to the Kafka datasource. It removes the default task-level timeout, simplifying the timeout logic by relying on a fixed 10-second timeout for each consume call. A key change is clamping the end_offset to the high watermark, which robustly prevents read tasks from hanging when trying to read beyond available data. The logic for handling invalid offset ranges is also improved, now raising a ValueError if the start_offset exceeds the clamped end_offset. The code is cleaner, more robust, and the accompanying test changes correctly validate the new behavior. Overall, these are excellent changes.

Note: Security Review did not run due to the size of the PR.

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/data/_internal/datasource/kafka_datasource.py Outdated
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@bveeramani bveeramani self-assigned this Mar 4, 2026

@slfan1989 slfan1989 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally agree, except for one small point.

kafka_auth_config: Optional[KafkaAuthConfig] = None,
consumer_config: Optional[Dict[str, Any]] = None,
timeout_ms: int = 10000,
timeout_ms: Optional[int] = None,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it expected that the timeout changes from 10s to None?

Consider keeping timeout_ms=10000 as default for backward compatibility?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@owenowenisme did the primary consumer of this API request we change the default?

@slfan1989 this API is labeled as stability="alpha", so we can technically break the defaults while still honoring our backwards-compatibility obligations, though it's not ideal

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for the explanation, LGTM.

@bveeramani bveeramani added the go add ONLY when ready to merge, run all tests label Mar 4, 2026
@bveeramani bveeramani removed their assignment Mar 4, 2026
@bveeramani bveeramani merged commit 5d3a8a3 into ray-project:master Mar 5, 2026
7 checks passed
ParagEkbote pushed a commit to ParagEkbote/ray that referenced this pull request Mar 10, 2026
…termark in Kafka datasource (ray-project#61476)

## Description
- Default `timeout_ms` to None (no task-level timeout) for read_kafka.
Each consumer.consume() call
   is already capped at 10s, which is sufficient.
- Clamp `end_offset` to the high watermark during offset resolution so
reads never target offsets
  beyond available data.
- Raise ValueError if `start_offset` exceeds the clamped `end_offset`.
- Simplify the read loop by removing per-consume timeout shrinking
logic.

## Related issues
> Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to
ray-project#1234".

## Additional information
> Optional: Add implementation details, API changes, usage examples,
screenshots, etc.

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: Parag Ekbote <thecoolekbote189@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

data Ray Data-related issues go add ONLY when ready to merge, run all tests

3 participants