[data] Adding Kafka datasink. #60307

Merged
richardliaw merged 27 commits into ray-project:master from justinrmiller:58725-Kafka-Datasync
Mar 18, 2026
@justinrmiller justinrmiller commented Jan 20, 2026

Contributor

Description

This PR introduces a Kafka Datasink for Ray Data, allowing users to write Ray Datasets directly to Apache Kafka topics. The implementation uses Confluent's Kafka Python library and supports distributed writes with configurable serialization, key extraction, and performance tuning (e.g., periodic flushing to manage memory).


Key Changes

python/ray/data/_internal/datasource/kafka_datasink.py

  • New KafkaDatasink Class: Implements the Datasink interface to handle parallel writes across Ray workers.

  • Serialization Support: Provides built-in support for json, string, and bytes for both message keys and values.

  • Smart Row Conversion: Includes a logic handler to convert various Ray row types (Dict, ArrowRow, PandasRow, NamedTuple) into serializable formats.

  • Memory Management:

    • Implements a _FLUSH_INTERVAL (set to 10,000) to periodically flush the producer and wait for acknowledgments, preventing memory exhaustion from too many un-flushed futures.
    • Handles BufferError by flushing and retrying the send operation.
  • Error Handling: Captures and reports the number of failed messages, ensuring that the first encountered exception is chained to the final RuntimeError for easier debugging.
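
The error-handling bullet above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `DeliveryTracker` and `raise_if_failed` are hypothetical names. The only outside assumption is the `(err, msg)` callback signature that confluent-kafka uses for `on_delivery`, where `err` is `None` on success.

```python
from typing import Any, Optional


class DeliveryTracker:
    """Counts failed deliveries and remembers the first error (hypothetical helper)."""

    def __init__(self) -> None:
        self.failed = 0
        self.first_error: Optional[Exception] = None

    def on_delivery(self, err: Any, msg: Any) -> None:
        # Called once per message; err is None when delivery succeeded.
        if err is None:
            return
        self.failed += 1
        if self.first_error is None:
            self.first_error = RuntimeError(str(err))

    def raise_if_failed(self, topic: str) -> None:
        if self.failed:
            # Chain the first error so the traceback shows the root cause.
            raise RuntimeError(
                f"Failed to deliver {self.failed} message(s) to topic '{topic}'"
            ) from self.first_error


tracker = DeliveryTracker()
tracker.on_delivery(None, "ok")           # successful delivery
tracker.on_delivery("broker down", None)  # simulated failure
tracker.on_delivery("timed out", None)    # simulated failure
try:
    tracker.raise_if_failed("my-topic")
except RuntimeError as exc:
    summary = (str(exc), str(exc.__cause__))
```

Keeping only the first error keeps memory bounded while still giving the final `RuntimeError` a concrete cause.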

python/ray/data/dataset.py

  • write_kafka() Method: Added as a top-level convenience method on the Dataset class.
  • API Exposure: Decorated with @ConsumptionAPI, allowing users to call ds.write_kafka(...) with standard Ray remote arguments and concurrency controls.

python/ray/data/tests/datasource/test_kafka_datasink.py

  • Unit Tests: Validate initialization, row-to-dict conversion, and serialization logic in isolation.
  • Integration Tests: Comprehensive test suite (requires a Kafka broker) covering:

    • Basic writes and multi-block writes.
    • Key extraction and custom serializers.
    • Producer configurations and delivery callbacks.
    • Edge cases like empty datasets, null values, and invalid connection strings.

Basic Usage

import ray

ds = ray.data.range(100)
ds.write_kafka(
    topic="my-topic", 
    bootstrap_servers="localhost:9092",
    key_field="id",
    key_serializer="string",
    value_serializer="json"
)

Related issues

Closes #58725

Signed-off-by: Justin Miller <justinrmiller@gmail.com>
@justinrmiller justinrmiller requested a review from a team as a code owner January 20, 2026 00:59

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a KafkaDatasink for Ray Data, which is a valuable addition. The implementation is generally well-structured. I've identified a few areas for improvement to enhance robustness and maintainability. My feedback includes suggestions to refactor duplicated code, correct potentially buggy logic in object-to-dictionary conversion, add parameter validation, and fix an incorrect docstring example. Addressing these points will strengthen the new datasink implementation.

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Justin Miller <justinrmiller@users.noreply.github.com>
@ray-gardener ray-gardener Bot added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels Jan 20, 2026

@owenowenisme owenowenisme left a comment

Member

Thanks for the contribution!

Justin Miller added 2 commits February 4, 2026 20:40
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Justin Miller added 2 commits February 4, 2026 20:57
…n't happen again.

Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Justin Miller added 2 commits February 21, 2026 13:30
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
@justinrmiller

Contributor Author

@owenowenisme Do you think we should make the flush interval a configurable parameter? I went with 10k as I thought that would be a reasonable amount (guidance is generally to keep message sizes under 1.5 KB, which works out to about 15 MB max: 10k * 1.5 KB).
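
As a quick check of that estimate, here is the arithmetic spelled out. It is only a sketch: it assumes decimal units (1 KB = 1,000 bytes) and that every buffered message sits at the 1.5 KB guideline, and the constant names are made up for illustration.

```python
FLUSH_INTERVAL = 10_000        # messages between flushes, from the PR description
ASSUMED_MESSAGE_BYTES = 1_500  # 1.5 KB per message (decimal KB), the guideline quoted above

# Upper bound on payload bytes buffered between two flushes.
max_buffered_bytes = FLUSH_INTERVAL * ASSUMED_MESSAGE_BYTES
max_buffered_mb = max_buffered_bytes / 1_000_000
print(f"{max_buffered_mb:.0f} MB")  # prints "15 MB"
```

Larger messages raise the bound proportionally, which is one argument for making the interval configurable.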

Signed-off-by: Justin Miller <justinrmiller@gmail.com>

@gemini-code-assist gemini-code-assist Bot left a comment

Contributor

Code Review

This pull request introduces a Kafka Datasink for Ray Data, which is a great addition. The implementation is well-structured, with good error handling for producer buffer errors and delivery failures. The periodic polling to manage memory is also a thoughtful touch. The accompanying test suite is comprehensive, covering unit tests, integration tests, and various edge cases. I have one suggestion to improve code maintainability by refactoring duplicated serialization logic.

Note: Security Review is unavailable for this PR.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme added the go (add ONLY when ready to merge, run all tests) label Mar 9, 2026

@iamjustinhsu iamjustinhsu left a comment

Contributor

nice work!! I'm not too too familiar with kafka, so i had some questions. But overall looks good

Comment thread python/ray/data/_internal/datasource/kafka_datasink.py Outdated
    return self._serialize(key, self.key_serializer)

def _extract_key(self, row_dict: Any) -> Optional[bytes]:
    """Extract and encode message key from row dict."""

Contributor

can you add a comment/docstring on what None signifies, and if/how it affects message distribution?
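
For context, a rough sketch of the behavior being asked about. The function below is illustrative only and is not the PR's `_extract_key`; `extract_key` and its parameters are assumed names, and JSON encoding of the key is an arbitrary choice. In Kafka, a message produced with a `None` key is not routed by key hash; the client's partitioner picks the partition, so such messages carry no per-key ordering guarantee.

```python
import json
from typing import Any, Dict, Optional


def extract_key(row: Dict[str, Any], key_field: Optional[str]) -> Optional[bytes]:
    """Return the encoded key, or None when no key applies (hypothetical helper)."""
    if key_field is None:
        return None  # no key configured: partition chosen by the partitioner
    value = row.get(key_field)
    if value is None:
        return None  # missing or null key value: treated as unkeyed
    return json.dumps(value).encode("utf-8")


keyed = extract_key({"id": 7, "name": "a"}, "id")
unkeyed = extract_key({"id": 7, "name": "a"}, None)
```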

Comment thread python/ray/data/_internal/datasource/kafka_datasink.py Outdated
except BufferError:
    # Internal queue is full, poll to serve callbacks
    # and free space, then retry
    producer.poll(_BUFFER_FULL_POLL_TIMEOUT_S)

Contributor

if there are a lot of backlog messages that need to be delivered, will this line try to free up all space, and if so, do you know how long that will take? I'm wondering if this should be less blocking in the worst case, like 5 seconds, but not sure
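
To make the trade-off concrete, here is a sketch of a bounded retry loop. It is not the PR's implementation: `produce_with_retry`, `max_retries`, and `poll_timeout_s` are assumed names and values, and `FakeProducer` is a test double. It relies on two confluent-kafka behaviors: `Producer.produce()` raises `BufferError` when the local queue is full, and `Producer.poll(timeout)` waits up to `timeout` seconds while serving delivery callbacks.

```python
from typing import Any, Optional


def produce_with_retry(
    producer: Any,
    topic: str,
    value: bytes,
    key: Optional[bytes] = None,
    max_retries: int = 3,
    poll_timeout_s: float = 1.0,
) -> None:
    """Enqueue one message, polling to free queue space when the buffer is full."""
    for attempt in range(max_retries + 1):
        try:
            producer.produce(topic, value=value, key=key)
            return
        except BufferError:
            if attempt == max_retries:
                raise RuntimeError(
                    f"Producer queue still full after {max_retries} retries"
                )
            # Serve delivery callbacks so the queue can drain. Each wait is
            # capped at poll_timeout_s, so total blocking is bounded.
            producer.poll(poll_timeout_s)


class FakeProducer:
    """Test double: rejects the first `full_for` produce calls."""

    def __init__(self, full_for: int) -> None:
        self.full_for = full_for
        self.sent = []
        self.polls = 0

    def produce(self, topic, value=None, key=None):
        if self.full_for > 0:
            self.full_for -= 1
            raise BufferError("queue full")
        self.sent.append((topic, key, value))

    def poll(self, timeout):
        self.polls += 1
        return 0


fake = FakeProducer(full_for=2)
produce_with_retry(fake, "my-topic", b"payload")
```

With these assumed defaults the worst case waits about `max_retries * poll_timeout_s` seconds per message before giving up, rather than draining the whole backlog.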

- Add SerializerFormat enum and extract _serialize as standalone function
- Add _produce_with_retry method to encapsulate produce+retry-on-BufferError logic
- Document _extract_key None return (default partitioner behavior)
- Document constants rationale (_POLL_BATCH_SIZE sizing, configurability)
- Add librdkafka CONFIGURATION.md link to producer_config docstring
- Include topic name in KafkaException error message
- Change write completion log from info to debug (fires once per task)
- Make _row_to_dict a @staticmethod

Signed-off-by: Justin Miller <justinrmiller@gmail.com>

@iamjustinhsu iamjustinhsu left a comment

Contributor

lgtm!

@owenowenisme

owenowenisme commented Mar 12, 2026

Member

Overall LGTM. One thing to note: we should document that the current implementation provides best-effort delivery only: partial writes can occur if a task fails midway, and duplicates are possible on system-error task retries (e.g., node failures).

For future improvements, we could explore:

  1. Per-task Kafka transactions, to guarantee atomicity within each write task (no partial messages from failed tasks, no duplicates on retry)
  2. Checkpoint-based delivery tracking, to enable resumable writes and stronger end-to-end guarantees
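
The first idea might be sketched along these lines. This is speculative and not part of the PR: `write_task_transactionally` is a hypothetical helper and `FakeTxnProducer` is a test double. The method names mirror confluent-kafka's transactional producer API (`init_transactions`, `begin_transaction`, `commit_transaction`, `abort_transaction`), which on a real producer also requires a `transactional.id` in the producer config.

```python
from typing import Any, Iterable


def write_task_transactionally(producer: Any, topic: str, values: Iterable[bytes]) -> int:
    """Send all values of one write task inside a single Kafka transaction."""
    producer.init_transactions()
    producer.begin_transaction()
    count = 0
    try:
        for value in values:
            producer.produce(topic, value=value)
            count += 1
        producer.commit_transaction()
    except Exception:
        # Roll back so read_committed consumers never see a partial task.
        producer.abort_transaction()
        raise
    return count


class FakeTxnProducer:
    """Test double that records the sequence of calls."""

    def __init__(self) -> None:
        self.log = []

    def init_transactions(self):
        self.log.append("init")

    def begin_transaction(self):
        self.log.append("begin")

    def produce(self, topic, value=None):
        if value == b"bad":
            raise ValueError("simulated serialization failure")
        self.log.append("produce")

    def commit_transaction(self):
        self.log.append("commit")

    def abort_transaction(self):
        self.log.append("abort")


good = FakeTxnProducer()
sent = write_task_transactionally(good, "my-topic", [b"a", b"b"])
```

Atomicity here only helps consumers that read with `isolation.level=read_committed`; how retries would reuse or fence a `transactional.id` is left open.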
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
        break
    messages.append(msg)

return messages

Test never polls Kafka when expected count is zero

Low Severity

The consume_messages helper has a while len(messages) < expected_count loop condition. When expected_count=0, 0 < 0 is False, so the loop body never executes — the function returns an empty list immediately without ever polling Kafka. In test_write_kafka_empty_dataset, the assertion assert len(messages) == 0 is therefore trivially true regardless of whether messages were actually written, making the verification meaningless.
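
One possible shape for a helper that avoids this, offered as a sketch rather than the fix that was merged: it always polls until a deadline and only exits early when a positive `expected_count` has been reached. The `timeout_s` parameter and the fake classes are assumptions; `poll(timeout)` returning a message or `None`, and `msg.error()`, follow the confluent-kafka consumer interface.

```python
import time
from typing import Any, List


def consume_messages(consumer: Any, expected_count: int, timeout_s: float = 5.0) -> List[Any]:
    """Poll until expected_count messages arrive or the deadline passes.

    When expected_count is 0, keep polling for the whole timeout so the
    caller can assert that nothing was written.
    """
    messages: List[Any] = []
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if expected_count > 0 and len(messages) >= expected_count:
            break
        msg = consumer.poll(0.1)
        if msg is None:
            continue
        if msg.error():
            break
        messages.append(msg)
    return messages


class FakeMessage:
    def __init__(self, payload: bytes) -> None:
        self.payload = payload

    def error(self):
        return None


class FakeConsumer:
    """Test double that hands out queued messages, then None."""

    def __init__(self, payloads) -> None:
        self.queue = [FakeMessage(p) for p in payloads]
        self.polls = 0

    def poll(self, timeout):
        self.polls += 1
        return self.queue.pop(0) if self.queue else None


# A stray message is now detected even though none were expected.
stray = FakeConsumer([b"unexpected"])
found = consume_messages(stray, expected_count=0, timeout_s=0.2)
```

The cost is that the empty-dataset test always waits the full timeout, so a short `timeout_s` is a reasonable choice for that case.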

except KafkaException as e:
    raise RuntimeError(
        f"Failed to write to Kafka topic '{self.topic}': {e}"
    ) from e

Dead exception handler catches nothing in try block

Low Severity

The except KafkaException handler in write() is unreachable. Within the try block, producer.produce() is asynchronous and reports errors via the on_delivery callback (not by raising). producer.poll() and producer.flush() also don't raise KafkaException. All other operations are Ray/Python-internal and can't produce a KafkaException. This creates a false sense of error handling — a real KafkaException from the constructor on line 236 occurs before the try block and would propagate uncaught.

@justinrmiller

Contributor Author

@owenowenisme Are we good to go on this one? Thanks!

topic_partitions = [
    TopicPartition(topic, p, 0) for p in topic_meta.partitions.keys()
]
consumer.assign(topic_partitions)

Stale consumer assignment when topic metadata missing

Medium Severity

The consume_messages helper only calls consumer.assign() when topic metadata is found (if topic_meta and topic_meta.partitions). Since the kafka_consumer fixture is session-scoped and shared across all integration tests, if topic metadata is not found for a given test, assign() is never called, and the consumer retains the partition assignment from a previous test. This means it would poll from the wrong topic, returning stale messages and causing misleading test results instead of a clean failure.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>

@owenowenisme owenowenisme left a comment

Member

LGTM

@cursor cursor Bot left a comment

Cursor Bugbot has reviewed your changes and found 1 potential issue.

There are 4 total unresolved issues (including 3 from previous reviews).

except KafkaException as e:
    raise RuntimeError(
        f"Failed to write to Kafka topic '{self.topic}': {e}"
    ) from e

Missing producer flush on non-KafkaException error paths

Medium Severity

The try/except KafkaException block in write() only catches KafkaException. If _produce_with_retry raises RuntimeError (persistent BufferError) or serialization raises TypeError (e.g., numpy types from Pandas blocks not being JSON-serializable), producer.flush() is never called. Messages already successfully enqueued in the producer's internal buffer before the error are silently dropped without any delivery attempt. A try/finally wrapping the producer.flush() call would ensure buffered messages get a chance to be delivered even on unexpected error paths.

Member

Not a bug. Flushing on error paths would be worse — the task has failed and Ray Data may retry it from scratch. Flushing a partial batch means the retry produces duplicates for the already-delivered prefix. Skipping flush minimizes the partial write window, which is the right behavior. The class docstring already documents the at-most-once-per-attempt delivery semantics.

@richardliaw richardliaw changed the title Adding Kafka datasink. Mar 18, 2026
@richardliaw richardliaw merged commit 0951139 into ray-project:master Mar 18, 2026
6 checks passed
ryanaoleary pushed a commit to ryanaoleary/ray that referenced this pull request Mar 25, 2026
Signed-off-by: ryanaoleary <ryanaoleary@google.com>
Lucas61000 pushed a commit to Lucas61000/ray that referenced this pull request May 15, 2026
Labels

community-contribution: Contributed by the community
data: Ray Data-related issues
go: add ONLY when ready to merge, run all tests

4 participants