[data] Adding Kafka datasink.#60307
Conversation
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a KafkaDatasink for Ray Data, which is a valuable addition. The implementation is generally well-structured. I've identified a few areas for improvement to enhance robustness and maintainability. My feedback includes suggestions to refactor duplicated code, correct potentially buggy logic in object-to-dictionary conversion, add parameter validation, and fix an incorrect docstring example. Addressing these points will strengthen the new datasink implementation.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Signed-off-by: Justin Miller <justinrmiller@users.noreply.github.com>
owenowenisme
left a comment
There was a problem hiding this comment.
Thanks for the contribution!
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
…n't happen again. Signed-off-by: Justin Miller <justinrmiller@gmail.com>
…er/ray into 58725-Kafka-Datasync
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
…er/ray into 58725-Kafka-Datasync
|
@owenowenisme Do you think we should make the flush interval a configurable parameter? I went with 10k as I thought that would be a reasonable amount (guidance is generally to keep message sizes under 1.5 KB which is 15 MB max (10k * 1.5 KB). |
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces a Kafka Datasink for Ray Data, which is a great addition. The implementation is well-structured, with good error handling for producer buffer errors and delivery failures. The periodic polling to manage memory is also a thoughtful touch. The accompanying test suite is comprehensive, covering unit tests, integration tests, and various edge cases. I have one suggestion to improve code maintainability by refactoring duplicated serialization logic.
Note: Security Review is unavailable for this PR.
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
iamjustinhsu
left a comment
There was a problem hiding this comment.
nice work!! I'm not too too familiar with kafka, so i had some questions. But overall looks good
| return self._serialize(key, self.key_serializer) | ||
|
|
||
| def _extract_key(self, row_dict: Any) -> Optional[bytes]: | ||
| """Extract and encode message key from row dict.""" |
There was a problem hiding this comment.
can you add a comment/docstring on what None signifies, and if/how it affects message distribution?
| except BufferError: | ||
| # Internal queue is full, poll to serve callbacks | ||
| # and free space, then retry | ||
| producer.poll(_BUFFER_FULL_POLL_TIMEOUT_S) |
There was a problem hiding this comment.
if there are a lot of backlog messages that need to be delivered, will this line try to free up all space, and if so, do u know how long that will take? I'm wondering if this should be less blocking in the worst, like 5 seconds, but not sure
- Add SerializerFormat enum and extract _serialize as standalone function - Add _produce_with_retry method to encapsulate produce+retry-on-BufferError logic - Document _extract_key None return (default partitioner behavior) - Document constants rationale (_POLL_BATCH_SIZE sizing, configurability) - Add librdkafka CONFIGURATION.md link to producer_config docstring - Include topic name in KafkaException error message - Change write completion log from info to debug (fires once per task) - Make _row_to_dict a @staticmethod Signed-off-by: Justin Miller <justinrmiller@gmail.com>
|
Overall LGTM. One thing to note: we should document that the current implementation provides best-effort For future improvements, we could explore:
|
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
| break | ||
| messages.append(msg) | ||
|
|
||
| return messages |
There was a problem hiding this comment.
Test never polls Kafka when expected count is zero
Low Severity
The consume_messages helper has a while len(messages) < expected_count loop condition. When expected_count=0, 0 < 0 is False, so the loop body never executes — the function returns an empty list immediately without ever polling Kafka. In test_write_kafka_empty_dataset, the assertion assert len(messages) == 0 is therefore trivially true regardless of whether messages were actually written, making the verification meaningless.
Additional Locations (1)
| except KafkaException as e: | ||
| raise RuntimeError( | ||
| f"Failed to write to Kafka topic '{self.topic}': {e}" | ||
| ) from e |
There was a problem hiding this comment.
Dead exception handler catches nothing in try block
Low Severity
The except KafkaException handler in write() is unreachable. Within the try block, producer.produce() is asynchronous and reports errors via the on_delivery callback (not by raising). producer.poll() and producer.flush() also don't raise KafkaException. All other operations are Ray/Python-internal and can't produce a KafkaException. This creates a false sense of error handling — a real KafkaException from the constructor on line 236 occurs before the try block and would propagate uncaught.
|
@owenowenisme Are we good to go on this one? Thanks! |
| topic_partitions = [ | ||
| TopicPartition(topic, p, 0) for p in topic_meta.partitions.keys() | ||
| ] | ||
| consumer.assign(topic_partitions) |
There was a problem hiding this comment.
Stale consumer assignment when topic metadata missing
Medium Severity
The consume_messages helper only calls consumer.assign() when topic metadata is found (if topic_meta and topic_meta.partitions). Since the kafka_consumer fixture is session-scoped and shared across all integration tests, if topic metadata is not found for a given test, assign() is never called, and the consumer retains the partition assignment from a previous test. This means it would poll from the wrong topic, returning stale messages and causing misleading test results instead of a clean failure.
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
| except KafkaException as e: | ||
| raise RuntimeError( | ||
| f"Failed to write to Kafka topic '{self.topic}': {e}" | ||
| ) from e |
There was a problem hiding this comment.
Missing producer flush on non-KafkaException error paths
Medium Severity
The try/except KafkaException block in write() only catches KafkaException. If _produce_with_retry raises RuntimeError (persistent BufferError) or serialization raises TypeError (e.g., numpy types from Pandas blocks not being JSON-serializable), producer.flush() is never called. Messages already successfully enqueued in the producer's internal buffer before the error are silently dropped without any delivery attempt. A try/finally wrapping the producer.flush() call would ensure buffered messages get a chance to be delivered even on unexpected error paths.
There was a problem hiding this comment.
Not a bug. Flushing on error paths would be worse — the task has failed and Ray Data may retry it from scratch. Flushing a partial batch means the retry produces duplicates for the already-delivered prefix. Skipping flush minimizes the partial write window, which is the right behavior. The class docstring already documents the at-most-once-per-attempt delivery semantics.
### Description
This PR introduces a Kafka `Datasink` for Ray Data, allowing users to
write Ray Datasets directly to Apache Kafka topics. The implementation
leverages Confluent's Kafka python library and supports distributed
writes with configurable serialization, key extraction, and performance
tuning (e.g., periodic flushing to manage memory).
Signed-off-by: ryanaoleary <ryanaoleary@google.com>
---
### Key Changes
#### `ray.data._internal.datasource.kafka_datasink.py`
- **New `KafkaDatasink` Class**: Implements the `Datasink` interface to
handle parallel writes across Ray workers.
- **Serialization Support**: Provides built-in support for `json`,
`string`, and `bytes` for both message keys and values.
- **Smart Row Conversion**: Includes a logic handler to convert various
Ray row types (Dict, ArrowRow, PandasRow, NamedTuple) into serializable
formats.
- **Memory Management**:
- * Implements a `_FLUSH_INTERVAL` (set to 10,000) to periodically flush
the producer and wait for acknowledgments, preventing memory exhaustion
from too many un-flushed futures.
- * Handles `BufferError` by flushing and retrying the send operation.
- **Error Handling**: Captures and reports the number of failed
messages, ensuring that the first encountered exception is chained to
the final `RuntimeError` for easier debugging.
#### `ray.data.dataset.py`
* **`write_kafka()` Method**: Added as a top-level convenience method on
the `Dataset` class.
* **API Exposure**: Decorated with `@ConsumptionAPI`, allowing users to
call `ds.write_kafka(...)` with standard Ray remote arguments and
concurrency controls.
#### `ray.data.tests.datasource.test_kafka.py`
* **Unit Tests**: Validates initialization, row-to-dict conversion, and
serialization logic in isolation.
* **Integration Tests**: Comprehensive test suite (requires a Kafka
broker) covering:
* Basic writes and multi-block writes.
* Key extraction and custom serializers.
* Producer configurations and delivery callbacks.
* Edge cases like empty datasets, null values, and invalid connection
strings.
---
### Basic Usage
```python
import ray
ds = ray.data.range(100)
ds.write_kafka(
topic="my-topic",
bootstrap_servers="localhost:9092",
key_field="id",
key_serializer="string",
value_serializer="json"
)
```
## Related issues
Closes ray-project#58725
---------
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Signed-off-by: Justin Miller <justinrmiller@users.noreply.github.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: You-Cheng Lin <mses010108@gmail.com>
### Description
This PR introduces a Kafka `Datasink` for Ray Data, allowing users to
write Ray Datasets directly to Apache Kafka topics. The implementation
leverages Confluent's Kafka python library and supports distributed
writes with configurable serialization, key extraction, and performance
tuning (e.g., periodic flushing to manage memory).
---
### Key Changes
#### `ray.data._internal.datasource.kafka_datasink.py`
- **New `KafkaDatasink` Class**: Implements the `Datasink` interface to
handle parallel writes across Ray workers.
- **Serialization Support**: Provides built-in support for `json`,
`string`, and `bytes` for both message keys and values.
- **Smart Row Conversion**: Includes a logic handler to convert various
Ray row types (Dict, ArrowRow, PandasRow, NamedTuple) into serializable
formats.
- **Memory Management**:
- * Implements a `_FLUSH_INTERVAL` (set to 10,000) to periodically flush
the producer and wait for acknowledgments, preventing memory exhaustion
from too many un-flushed futures.
- * Handles `BufferError` by flushing and retrying the send operation.
- **Error Handling**: Captures and reports the number of failed
messages, ensuring that the first encountered exception is chained to
the final `RuntimeError` for easier debugging.
#### `ray.data.dataset.py`
* **`write_kafka()` Method**: Added as a top-level convenience method on
the `Dataset` class.
* **API Exposure**: Decorated with `@ConsumptionAPI`, allowing users to
call `ds.write_kafka(...)` with standard Ray remote arguments and
concurrency controls.
#### `ray.data.tests.datasource.test_kafka.py`
* **Unit Tests**: Validates initialization, row-to-dict conversion, and
serialization logic in isolation.
* **Integration Tests**: Comprehensive test suite (requires a Kafka
broker) covering:
* Basic writes and multi-block writes.
* Key extraction and custom serializers.
* Producer configurations and delivery callbacks.
* Edge cases like empty datasets, null values, and invalid connection
strings.
---
### Basic Usage
```python
import ray
ds = ray.data.range(100)
ds.write_kafka(
topic="my-topic",
bootstrap_servers="localhost:9092",
key_field="id",
key_serializer="string",
value_serializer="json"
)
```
## Related issues
Closes ray-project#58725
---------
Signed-off-by: Justin Miller <justinrmiller@gmail.com>
Signed-off-by: Justin Miller <justinrmiller@users.noreply.github.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: You-Cheng Lin <mses010108@gmail.com>


Description
This PR introduces a Kafka
Datasinkfor Ray Data, allowing users to write Ray Datasets directly to Apache Kafka topics. The implementation leverages Confluent's Kafka python library and supports distributed writes with configurable serialization, key extraction, and performance tuning (e.g., periodic flushing to manage memory).Key Changes
ray.data._internal.datasource.kafka_datasink.pyNew
KafkaDatasinkClass: Implements theDatasinkinterface to handle parallel writes across Ray workers.Serialization Support: Provides built-in support for
json,string, andbytesfor both message keys and values.Smart Row Conversion: Includes a logic handler to convert various Ray row types (Dict, ArrowRow, PandasRow, NamedTuple) into serializable formats.
Memory Management:
_FLUSH_INTERVAL(set to 10,000) to periodically flush the producer and wait for acknowledgments, preventing memory exhaustion from too many un-flushed futures.BufferErrorby flushing and retrying the send operation.Error Handling: Captures and reports the number of failed messages, ensuring that the first encountered exception is chained to the final
RuntimeErrorfor easier debugging.ray.data.dataset.pywrite_kafka()Method: Added as a top-level convenience method on theDatasetclass.@ConsumptionAPI, allowing users to callds.write_kafka(...)with standard Ray remote arguments and concurrency controls.ray.data.tests.datasource.test_kafka.pyBasic Usage
Related issues
Closes #58725