[serve] [taskiq - 2/n] Add multi-broker Taskiq adapter config and initialization#61008
Merged
Conversation
Contributor
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
Add a Taskiq-based task processor adapter as an alternative to Celery, with native async/await support and at-least-once delivery via Redis Streams. Components: - TaskiqAdapterConfig: Pydantic config (broker_url, result_backend, consumer_group, idle_timeout) - TaskiqTaskProcessorAdapter: Full adapter implementing TaskProcessorAdapter interface (init, register, enqueue, consume, status, shutdown) Includes E2E tests for: basic task processing, multiple tasks, multiple handlers, and cancel_task NotImplementedError. Retry middleware (SmartRetryMiddleware) and dead letter queue (DLQMiddleware) will be added in a follow-up PR. Signed-off-by: harshit <harshit@anyscale.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> [serve] [taskiq - 2/n] Add multi-broker config and initialization for Taskiq adapter Restructure TaskiqAdapterConfig to support multiple Taskiq brokers (Redis Streams, Redis Cluster, Redis Sentinel, RabbitMQ, NATS, Kafka) via broker_type + broker_kwargs pass-through pattern. This follows the same approach as CeleryAdapterConfig — broker-specific params are passed directly to the broker constructor without maintaining per-broker config classes. Key changes: - TaskiqAdapterConfig: broker_type (required), broker_kwargs (optional), result_backend_url (optional) - Broker registry with lazy imports, queue param mapping, and required kwargs validation - TaskiqTaskProcessorAdapter with __init__ + initialize() implemented; remaining methods stubbed for follow-up PR Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
4a4898e to
7877a29
Compare
Switch from direct pydantic import to ray._common.pydantic_compat to match all other Serve config classes and avoid serialization mismatches in pydantic v2 environments. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: harshit <harshit@anyscale.com>
…oncurrency - Mock importlib.import_module in tests so CI doesn't need taskiq-redis installed - Store consumer_concurrency in initialize() for use by start_consumer() in follow-up PR Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
abrarsheikh
approved these changes
Feb 12, 2026
Signed-off-by: harshit <harshit@anyscale.com>
abrarsheikh
approved these changes
Feb 12, 2026
ans9868
pushed a commit
to ans9868/ray
that referenced
this pull request
Feb 18, 2026
…tialization (ray-project#61008) ## Summary - Adds **TaskiqAdapterConfig** and **TaskiqTaskProcessorAdapter** (config + initialization only) for async task processing in Ray Serve - Full adapter implementation (enqueue, consume, shutdown) coming in follow-up PR ## Design: Why `broker_type` + `broker_kwargs` We evaluated several approaches for supporting multiple Taskiq brokers. In taskiq, for each broker, there is a separate class that we need to initialize, so somewhere we need to identify the broker type and accordingly call that broker's class in taskiq: ### Approaches considered 1. **Per-broker config classes with discriminated union** — A typed Pydantic config class per broker (e.g., `RedisStreamBrokerConfig`, `RabbitMQBrokerConfig`) with `create_broker()` factory methods, combined via a Pydantic discriminated union. 2. **`broker_type` + `broker_url` + `broker_kwargs`** — A single flat config with the connection URL as a top-level field and everything else passed through a dict. 3. **`broker_type` + `broker_kwargs` (chosen)** — A single flat config where *all* broker constructor arguments (including connection URLs) are passed through `broker_kwargs`. ### Why we chose approach 3 **Per-broker config classes (approach 1) had a maintenance problem:** - Each broker has a unique constructor signature with different parameter names, types, and defaults - We'd need to mirror every broker's defaults accurately in their respective adapter config classes - Every upstream broker update could silently break our config classes - Taskiq-redis alone has **9 broker classes** — maintaining typed configs for all of them is not scalable **Separating `broker_url` (approach 2) didn't work uniformly:** - Different brokers use different parameter names for the connection: `url` (Redis, RabbitMQ), `servers` (NATS), `bootstrap_servers` (Kafka) - Redis Sentinel brokers don't take a URL at all — they require `sentinels: list[tuple[str, int]]` + `master_name: str` - This forced us to maintain a URL-to-param mapping dict and still couldn't handle Sentinel **The `broker_kwargs` pass-through (approach 3) is the simplest:** - Follows the same pattern as `CeleryAdapterConfig` (which uses `broker_transport_options: Dict[str, Any]` for pass-through) - Broker handles its own defaults — no mismatch risk - Adding a new broker type = one registry entry (import path + package name + queue param) ### What we validate Even with pass-through kwargs, we validate at our layer: - `broker_type` must be a known registry key - Required kwargs per broker are checked before calling the constructor (e.g., `redis_stream` requires `url`, `redis_stream_sentinel` requires `sentinels` + `master_name`) - Lazy imports with helpful error messages if the broker package isn't installed ## Config Structure ```python class TaskiqAdapterConfig(BaseModel): broker_type: str # required — e.g. "redis_stream", "rabbitmq", "nats" broker_kwargs: Optional[Dict[str, Any]] # pass-through to broker constructor result_backend_url: Optional[str] # Redis URL for result backend ``` ## Usage Examples ```python # Redis Streams TaskiqAdapterConfig( broker_type="redis_stream", broker_kwargs={"url": "redis://localhost:6379"}, ) # Redis Sentinel TaskiqAdapterConfig( broker_type="redis_stream_sentinel", broker_kwargs={ "sentinels": [("sentinel1", 26379), ("sentinel2", 26379)], "master_name": "mymaster", }, ) # RabbitMQ TaskiqAdapterConfig( broker_type="rabbitmq", broker_kwargs={"url": "amqp://guest:guest@localhost:5672", "qos": 10}, ) # NATS (multi-server) TaskiqAdapterConfig( broker_type="nats", broker_kwargs={"servers": ["nats://host1:4222", "nats://host2:4222"]}, ) # Kafka TaskiqAdapterConfig( broker_type="kafka", broker_kwargs={"bootstrap_servers": ["localhost:9092"]}, ) ``` ## What's in this PR - `TaskiqAdapterConfig` — config class with `broker_type`, `broker_kwargs`, `result_backend_url` - `TaskiqTaskProcessorAdapter.__init__()` + `initialize()` — creates broker and optional result backend - Abstract method stubs for remaining adapter methods (follow-up PR) ## Related PRs - Depends on: ray-project#60977 (`[taskiq - 1/n]` Enable async task handlers) — **merged** - Follow-up: `[taskiq - 3/n]` Full adapter implementation + E2E tests - Follow-up: `[taskiq - 4/n]` Retry, DLQ middleware and reliability tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: harshit <harshit@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Adel Nour <ans9868@nyu.edu>
Aydin-ab
pushed a commit
to kunling-anyscale/ray
that referenced
this pull request
Feb 20, 2026
…tialization (ray-project#61008) ## Summary - Adds **TaskiqAdapterConfig** and **TaskiqTaskProcessorAdapter** (config + initialization only) for async task processing in Ray Serve - Full adapter implementation (enqueue, consume, shutdown) coming in follow-up PR ## Design: Why `broker_type` + `broker_kwargs` We evaluated several approaches for supporting multiple Taskiq brokers. In taskiq, for each broker, there is a separate class that we need to initialize, so somewhere we need to identify the broker type and accordingly call that broker's class in taskiq: ### Approaches considered 1. **Per-broker config classes with discriminated union** — A typed Pydantic config class per broker (e.g., `RedisStreamBrokerConfig`, `RabbitMQBrokerConfig`) with `create_broker()` factory methods, combined via a Pydantic discriminated union. 2. **`broker_type` + `broker_url` + `broker_kwargs`** — A single flat config with the connection URL as a top-level field and everything else passed through a dict. 3. **`broker_type` + `broker_kwargs` (chosen)** — A single flat config where *all* broker constructor arguments (including connection URLs) are passed through `broker_kwargs`. ### Why we chose approach 3 **Per-broker config classes (approach 1) had a maintenance problem:** - Each broker has a unique constructor signature with different parameter names, types, and defaults - We'd need to mirror every broker's defaults accurately in their respective adapter config classes - Every upstream broker update could silently break our config classes - Taskiq-redis alone has **9 broker classes** — maintaining typed configs for all of them is not scalable **Separating `broker_url` (approach 2) didn't work uniformly:** - Different brokers use different parameter names for the connection: `url` (Redis, RabbitMQ), `servers` (NATS), `bootstrap_servers` (Kafka) - Redis Sentinel brokers don't take a URL at all — they require `sentinels: list[tuple[str, int]]` + `master_name: str` - This forced us to maintain a URL-to-param mapping dict and still couldn't handle Sentinel **The `broker_kwargs` pass-through (approach 3) is the simplest:** - Follows the same pattern as `CeleryAdapterConfig` (which uses `broker_transport_options: Dict[str, Any]` for pass-through) - Broker handles its own defaults — no mismatch risk - Adding a new broker type = one registry entry (import path + package name + queue param) ### What we validate Even with pass-through kwargs, we validate at our layer: - `broker_type` must be a known registry key - Required kwargs per broker are checked before calling the constructor (e.g., `redis_stream` requires `url`, `redis_stream_sentinel` requires `sentinels` + `master_name`) - Lazy imports with helpful error messages if the broker package isn't installed ## Config Structure ```python class TaskiqAdapterConfig(BaseModel): broker_type: str # required — e.g. "redis_stream", "rabbitmq", "nats" broker_kwargs: Optional[Dict[str, Any]] # pass-through to broker constructor result_backend_url: Optional[str] # Redis URL for result backend ``` ## Usage Examples ```python # Redis Streams TaskiqAdapterConfig( broker_type="redis_stream", broker_kwargs={"url": "redis://localhost:6379"}, ) # Redis Sentinel TaskiqAdapterConfig( broker_type="redis_stream_sentinel", broker_kwargs={ "sentinels": [("sentinel1", 26379), ("sentinel2", 26379)], "master_name": "mymaster", }, ) # RabbitMQ TaskiqAdapterConfig( broker_type="rabbitmq", broker_kwargs={"url": "amqp://guest:guest@localhost:5672", "qos": 10}, ) # NATS (multi-server) TaskiqAdapterConfig( broker_type="nats", broker_kwargs={"servers": ["nats://host1:4222", "nats://host2:4222"]}, ) # Kafka TaskiqAdapterConfig( broker_type="kafka", broker_kwargs={"bootstrap_servers": ["localhost:9092"]}, ) ``` ## What's in this PR - `TaskiqAdapterConfig` — config class with `broker_type`, `broker_kwargs`, `result_backend_url` - `TaskiqTaskProcessorAdapter.__init__()` + `initialize()` — creates broker and optional result backend - Abstract method stubs for remaining adapter methods (follow-up PR) ## Related PRs - Depends on: ray-project#60977 (`[taskiq - 1/n]` Enable async task handlers) — **merged** - Follow-up: `[taskiq - 3/n]` Full adapter implementation + E2E tests - Follow-up: `[taskiq - 4/n]` Retry, DLQ middleware and reliability tests 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: harshit <harshit@anyscale.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Design: Why
broker_type+broker_kwargsWe evaluated several approaches for supporting multiple Taskiq brokers. In taskiq, for each broker, there is a separate class that we need to initialize, so somewhere we need to identify the broker type and accordingly call that broker's class in taskiq:
Approaches considered
Per-broker config classes with discriminated union — A typed Pydantic config class per broker (e.g.,
RedisStreamBrokerConfig,RabbitMQBrokerConfig) withcreate_broker()factory methods, combined via a Pydantic discriminated union.broker_type+broker_url+broker_kwargs— A single flat config with the connection URL as a top-level field and everything else passed through a dict.broker_type+broker_kwargs(chosen) — A single flat config where all broker constructor arguments (including connection URLs) are passed throughbroker_kwargs.Why we chose approach 3
Per-broker config classes (approach 1) had a maintenance problem:
Separating
broker_url(approach 2) didn't work uniformly:url(Redis, RabbitMQ),servers(NATS),bootstrap_servers(Kafka)sentinels: list[tuple[str, int]]+master_name: strThe
broker_kwargspass-through (approach 3) is the simplest:CeleryAdapterConfig(which usesbroker_transport_options: Dict[str, Any]for pass-through)What we validate
Even with pass-through kwargs, we validate at our layer:
broker_typemust be a known registry keyredis_streamrequiresurl,redis_stream_sentinelrequiressentinels+master_name)Config Structure
Usage Examples
What's in this PR
TaskiqAdapterConfig— config class withbroker_type,broker_kwargs,result_backend_urlTaskiqTaskProcessorAdapter.__init__()+initialize()— creates broker and optional result backendRelated PRs
[taskiq - 1/n]Enable async task handlers) — merged[taskiq - 3/n]Full adapter implementation + E2E tests[taskiq - 4/n]Retry, DLQ middleware and reliability tests🤖 Generated with Claude Code