Skip to content

[serve] [taskiq - 2/n] Add multi-broker Taskiq adapter config and initialization#61008

Merged
abrarsheikh merged 7 commits into
masterfrom
taskiq-task-processor-core
Feb 12, 2026
Merged

[serve] [taskiq - 2/n] Add multi-broker Taskiq adapter config and initialization#61008
abrarsheikh merged 7 commits into
masterfrom
taskiq-task-processor-core

Conversation

@harshit-anyscale

@harshit-anyscale harshit-anyscale commented Feb 12, 2026

Copy link
Copy Markdown
Contributor

Summary

  • Adds TaskiqAdapterConfig and TaskiqTaskProcessorAdapter (config + initialization only) for async task processing in Ray Serve
  • Full adapter implementation (enqueue, consume, shutdown) coming in follow-up PR

Design: Why broker_type + broker_kwargs

We evaluated several approaches for supporting multiple Taskiq brokers. In taskiq, for each broker, there is a separate class that we need to initialize, so somewhere we need to identify the broker type and accordingly call that broker's class in taskiq:

Approaches considered

  1. Per-broker config classes with discriminated union — A typed Pydantic config class per broker (e.g., RedisStreamBrokerConfig, RabbitMQBrokerConfig) with create_broker() factory methods, combined via a Pydantic discriminated union.

  2. broker_type + broker_url + broker_kwargs — A single flat config with the connection URL as a top-level field and everything else passed through a dict.

  3. broker_type + broker_kwargs (chosen) — A single flat config where all broker constructor arguments (including connection URLs) are passed through broker_kwargs.

Why we chose approach 3

Per-broker config classes (approach 1) had a maintenance problem:

  • Each broker has a unique constructor signature with different parameter names, types, and defaults
  • We'd need to mirror every broker's defaults accurately in their respective adapter config classes
  • Every upstream broker update could silently break our config classes
  • Taskiq-redis alone has 9 broker classes — maintaining typed configs for all of them is not scalable

Separating broker_url (approach 2) didn't work uniformly:

  • Different brokers use different parameter names for the connection: url (Redis, RabbitMQ), servers (NATS), bootstrap_servers (Kafka)
  • Redis Sentinel brokers don't take a URL at all — they require sentinels: list[tuple[str, int]] + master_name: str
  • This forced us to maintain a URL-to-param mapping dict and still couldn't handle Sentinel

The broker_kwargs pass-through (approach 3) is the simplest:

  • Follows the same pattern as CeleryAdapterConfig (which uses broker_transport_options: Dict[str, Any] for pass-through)
  • Broker handles its own defaults — no mismatch risk
  • Adding a new broker type = one registry entry (import path + package name + queue param)

What we validate

Even with pass-through kwargs, we validate at our layer:

  • broker_type must be a known registry key
  • Required kwargs per broker are checked before calling the constructor (e.g., redis_stream requires url, redis_stream_sentinel requires sentinels + master_name)
  • Lazy imports with helpful error messages if the broker package isn't installed

Config Structure

class TaskiqAdapterConfig(BaseModel):
    broker_type: str                          # required — e.g. "redis_stream", "rabbitmq", "nats"
    broker_kwargs: Optional[Dict[str, Any]]   # pass-through to broker constructor
    result_backend_url: Optional[str]         # Redis URL for result backend

Usage Examples

# Redis Streams
TaskiqAdapterConfig(
    broker_type="redis_stream",
    broker_kwargs={"url": "redis://localhost:6379"},
)

# Redis Sentinel
TaskiqAdapterConfig(
    broker_type="redis_stream_sentinel",
    broker_kwargs={
        "sentinels": [("sentinel1", 26379), ("sentinel2", 26379)],
        "master_name": "mymaster",
    },
)

# RabbitMQ
TaskiqAdapterConfig(
    broker_type="rabbitmq",
    broker_kwargs={"url": "amqp://guest:guest@localhost:5672", "qos": 10},
)

# NATS (multi-server)
TaskiqAdapterConfig(
    broker_type="nats",
    broker_kwargs={"servers": ["nats://host1:4222", "nats://host2:4222"]},
)

# Kafka
TaskiqAdapterConfig(
    broker_type="kafka",
    broker_kwargs={"bootstrap_servers": ["localhost:9092"]},
)

What's in this PR

  • TaskiqAdapterConfig — config class with broker_type, broker_kwargs, result_backend_url
  • TaskiqTaskProcessorAdapter.__init__() + initialize() — creates broker and optional result backend
  • Abstract method stubs for remaining adapter methods (follow-up PR)

Related PRs

🤖 Generated with Claude Code

@harshit-anyscale harshit-anyscale requested a review from a team as a code owner February 12, 2026 05:02
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

Comment thread python/ray/serve/taskiq_task_processor.py Outdated
Comment thread python/ray/serve/taskiq_task_processor.py Outdated
Comment thread python/ray/serve/taskiq_task_processor.py Outdated
@harshit-anyscale harshit-anyscale marked this pull request as draft February 12, 2026 07:21
@harshit-anyscale harshit-anyscale changed the title [serve] [taskiq - 2/n] Add Taskiq task processor adapter for async task processing Feb 12, 2026
@harshit-anyscale harshit-anyscale self-assigned this Feb 12, 2026
Add a Taskiq-based task processor adapter as an alternative to Celery,
with native async/await support and at-least-once delivery via Redis
Streams.

Components:
- TaskiqAdapterConfig: Pydantic config (broker_url, result_backend,
  consumer_group, idle_timeout)
- TaskiqTaskProcessorAdapter: Full adapter implementing
  TaskProcessorAdapter interface (init, register, enqueue, consume,
  status, shutdown)

Includes E2E tests for: basic task processing, multiple tasks,
multiple handlers, and cancel_task NotImplementedError.

Retry middleware (SmartRetryMiddleware) and dead letter queue
(DLQMiddleware) will be added in a follow-up PR.

Signed-off-by: harshit <harshit@anyscale.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

[serve] [taskiq - 2/n] Add multi-broker config and initialization for Taskiq adapter

Restructure TaskiqAdapterConfig to support multiple Taskiq brokers
(Redis Streams, Redis Cluster, Redis Sentinel, RabbitMQ, NATS, Kafka)
via broker_type + broker_kwargs pass-through pattern. This follows the
same approach as CeleryAdapterConfig — broker-specific params are passed
directly to the broker constructor without maintaining per-broker config
classes.

Key changes:
- TaskiqAdapterConfig: broker_type (required), broker_kwargs (optional),
  result_backend_url (optional)
- Broker registry with lazy imports, queue param mapping, and required
  kwargs validation
- TaskiqTaskProcessorAdapter with __init__ + initialize() implemented;
  remaining methods stubbed for follow-up PR

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@harshit-anyscale harshit-anyscale force-pushed the taskiq-task-processor-core branch from 4a4898e to 7877a29 Compare February 12, 2026 10:13
@harshit-anyscale harshit-anyscale added the go add ONLY when ready to merge, run all tests label Feb 12, 2026
@harshit-anyscale harshit-anyscale marked this pull request as ready for review February 12, 2026 10:13
Comment thread python/ray/serve/taskiq_task_processor.py Outdated
Switch from direct pydantic import to ray._common.pydantic_compat
to match all other Serve config classes and avoid serialization
mismatches in pydantic v2 environments.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Comment thread python/ray/serve/taskiq_task_processor.py
harshit-anyscale and others added 2 commits February 12, 2026 12:07
Signed-off-by: harshit <harshit@anyscale.com>
…oncurrency

- Mock importlib.import_module in tests so CI doesn't need taskiq-redis installed
- Store consumer_concurrency in initialize() for use by start_consumer() in follow-up PR

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Comment thread python/ray/serve/taskiq_task_processor.py
Signed-off-by: harshit <harshit@anyscale.com>
Comment thread python/ray/serve/taskiq_task_processor.py
Comment thread python/ray/serve/taskiq_task_processor.py
Comment thread python/ray/serve/taskiq_task_processor.py Outdated
Comment thread python/ray/serve/taskiq_task_processor.py
Signed-off-by: harshit <harshit@anyscale.com>
@abrarsheikh abrarsheikh merged commit 90301f3 into master Feb 12, 2026
6 checks passed
@abrarsheikh abrarsheikh deleted the taskiq-task-processor-core branch February 12, 2026 20:58
ans9868 pushed a commit to ans9868/ray that referenced this pull request Feb 18, 2026
…tialization (ray-project#61008)

## Summary
- Adds **TaskiqAdapterConfig** and **TaskiqTaskProcessorAdapter**
(config + initialization only) for async task processing in Ray Serve
- Full adapter implementation (enqueue, consume, shutdown) coming in
follow-up PR

## Design: Why `broker_type` + `broker_kwargs`

We evaluated several approaches for supporting multiple Taskiq brokers.
In taskiq, for each broker, there is a separate class that we need to
initialize, so somewhere we need to identify the broker type and
accordingly call that broker's class in taskiq:

### Approaches considered

1. **Per-broker config classes with discriminated union** — A typed
Pydantic config class per broker (e.g., `RedisStreamBrokerConfig`,
`RabbitMQBrokerConfig`) with `create_broker()` factory methods, combined
via a Pydantic discriminated union.

2. **`broker_type` + `broker_url` + `broker_kwargs`** — A single flat
config with the connection URL as a top-level field and everything else
passed through a dict.

3. **`broker_type` + `broker_kwargs` (chosen)** — A single flat config
where *all* broker constructor arguments (including connection URLs) are
passed through `broker_kwargs`.

### Why we chose approach 3

**Per-broker config classes (approach 1) had a maintenance problem:**
- Each broker has a unique constructor signature with different
parameter names, types, and defaults
- We'd need to mirror every broker's defaults accurately in their
respective adapter config classes
- Every upstream broker update could silently break our config classes
- Taskiq-redis alone has **9 broker classes** — maintaining typed
configs for all of them is not scalable

**Separating `broker_url` (approach 2) didn't work uniformly:**
- Different brokers use different parameter names for the connection:
`url` (Redis, RabbitMQ), `servers` (NATS), `bootstrap_servers` (Kafka)
- Redis Sentinel brokers don't take a URL at all — they require
`sentinels: list[tuple[str, int]]` + `master_name: str`
- This forced us to maintain a URL-to-param mapping dict and still
couldn't handle Sentinel

**The `broker_kwargs` pass-through (approach 3) is the simplest:**
- Follows the same pattern as `CeleryAdapterConfig` (which uses
`broker_transport_options: Dict[str, Any]` for pass-through)
- Broker handles its own defaults — no mismatch risk
- Adding a new broker type = one registry entry (import path + package
name + queue param)

### What we validate

Even with pass-through kwargs, we validate at our layer:
- `broker_type` must be a known registry key
- Required kwargs per broker are checked before calling the constructor
(e.g., `redis_stream` requires `url`, `redis_stream_sentinel` requires
`sentinels` + `master_name`)
- Lazy imports with helpful error messages if the broker package isn't
installed

## Config Structure

```python
class TaskiqAdapterConfig(BaseModel):
    broker_type: str                          # required — e.g. "redis_stream", "rabbitmq", "nats"
    broker_kwargs: Optional[Dict[str, Any]]   # pass-through to broker constructor
    result_backend_url: Optional[str]         # Redis URL for result backend
```

## Usage Examples

```python
# Redis Streams
TaskiqAdapterConfig(
    broker_type="redis_stream",
    broker_kwargs={"url": "redis://localhost:6379"},
)

# Redis Sentinel
TaskiqAdapterConfig(
    broker_type="redis_stream_sentinel",
    broker_kwargs={
        "sentinels": [("sentinel1", 26379), ("sentinel2", 26379)],
        "master_name": "mymaster",
    },
)

# RabbitMQ
TaskiqAdapterConfig(
    broker_type="rabbitmq",
    broker_kwargs={"url": "amqp://guest:guest@localhost:5672", "qos": 10},
)

# NATS (multi-server)
TaskiqAdapterConfig(
    broker_type="nats",
    broker_kwargs={"servers": ["nats://host1:4222", "nats://host2:4222"]},
)

# Kafka
TaskiqAdapterConfig(
    broker_type="kafka",
    broker_kwargs={"bootstrap_servers": ["localhost:9092"]},
)
```

## What's in this PR
- `TaskiqAdapterConfig` — config class with `broker_type`,
`broker_kwargs`, `result_backend_url`
- `TaskiqTaskProcessorAdapter.__init__()` + `initialize()` — creates
broker and optional result backend
- Abstract method stubs for remaining adapter methods (follow-up PR)

## Related PRs
- Depends on: ray-project#60977 (`[taskiq - 1/n]` Enable async task handlers) —
**merged**
- Follow-up: `[taskiq - 3/n]` Full adapter implementation + E2E tests
- Follow-up: `[taskiq - 4/n]` Retry, DLQ middleware and reliability
tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: harshit <harshit@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Signed-off-by: Adel Nour <ans9868@nyu.edu>
Aydin-ab pushed a commit to kunling-anyscale/ray that referenced this pull request Feb 20, 2026
…tialization (ray-project#61008)

## Summary
- Adds **TaskiqAdapterConfig** and **TaskiqTaskProcessorAdapter**
(config + initialization only) for async task processing in Ray Serve
- Full adapter implementation (enqueue, consume, shutdown) coming in
follow-up PR

## Design: Why `broker_type` + `broker_kwargs`

We evaluated several approaches for supporting multiple Taskiq brokers.
In taskiq, for each broker, there is a separate class that we need to
initialize, so somewhere we need to identify the broker type and
accordingly call that broker's class in taskiq:

### Approaches considered

1. **Per-broker config classes with discriminated union** — A typed
Pydantic config class per broker (e.g., `RedisStreamBrokerConfig`,
`RabbitMQBrokerConfig`) with `create_broker()` factory methods, combined
via a Pydantic discriminated union.

2. **`broker_type` + `broker_url` + `broker_kwargs`** — A single flat
config with the connection URL as a top-level field and everything else
passed through a dict.

3. **`broker_type` + `broker_kwargs` (chosen)** — A single flat config
where *all* broker constructor arguments (including connection URLs) are
passed through `broker_kwargs`.

### Why we chose approach 3

**Per-broker config classes (approach 1) had a maintenance problem:**
- Each broker has a unique constructor signature with different
parameter names, types, and defaults
- We'd need to mirror every broker's defaults accurately in their
respective adapter config classes
- Every upstream broker update could silently break our config classes
- Taskiq-redis alone has **9 broker classes** — maintaining typed
configs for all of them is not scalable

**Separating `broker_url` (approach 2) didn't work uniformly:**
- Different brokers use different parameter names for the connection:
`url` (Redis, RabbitMQ), `servers` (NATS), `bootstrap_servers` (Kafka)
- Redis Sentinel brokers don't take a URL at all — they require
`sentinels: list[tuple[str, int]]` + `master_name: str`
- This forced us to maintain a URL-to-param mapping dict and still
couldn't handle Sentinel

**The `broker_kwargs` pass-through (approach 3) is the simplest:**
- Follows the same pattern as `CeleryAdapterConfig` (which uses
`broker_transport_options: Dict[str, Any]` for pass-through)
- Broker handles its own defaults — no mismatch risk
- Adding a new broker type = one registry entry (import path + package
name + queue param)

### What we validate

Even with pass-through kwargs, we validate at our layer:
- `broker_type` must be a known registry key
- Required kwargs per broker are checked before calling the constructor
(e.g., `redis_stream` requires `url`, `redis_stream_sentinel` requires
`sentinels` + `master_name`)
- Lazy imports with helpful error messages if the broker package isn't
installed

## Config Structure

```python
class TaskiqAdapterConfig(BaseModel):
    broker_type: str                          # required — e.g. "redis_stream", "rabbitmq", "nats"
    broker_kwargs: Optional[Dict[str, Any]]   # pass-through to broker constructor
    result_backend_url: Optional[str]         # Redis URL for result backend
```

## Usage Examples

```python
# Redis Streams
TaskiqAdapterConfig(
    broker_type="redis_stream",
    broker_kwargs={"url": "redis://localhost:6379"},
)

# Redis Sentinel
TaskiqAdapterConfig(
    broker_type="redis_stream_sentinel",
    broker_kwargs={
        "sentinels": [("sentinel1", 26379), ("sentinel2", 26379)],
        "master_name": "mymaster",
    },
)

# RabbitMQ
TaskiqAdapterConfig(
    broker_type="rabbitmq",
    broker_kwargs={"url": "amqp://guest:guest@localhost:5672", "qos": 10},
)

# NATS (multi-server)
TaskiqAdapterConfig(
    broker_type="nats",
    broker_kwargs={"servers": ["nats://host1:4222", "nats://host2:4222"]},
)

# Kafka
TaskiqAdapterConfig(
    broker_type="kafka",
    broker_kwargs={"bootstrap_servers": ["localhost:9092"]},
)
```

## What's in this PR
- `TaskiqAdapterConfig` — config class with `broker_type`,
`broker_kwargs`, `result_backend_url`
- `TaskiqTaskProcessorAdapter.__init__()` + `initialize()` — creates
broker and optional result backend
- Abstract method stubs for remaining adapter methods (follow-up PR)


## Related PRs
- Depends on: ray-project#60977 (`[taskiq - 1/n]` Enable async task handlers) —
**merged**
- Follow-up: `[taskiq - 3/n]` Full adapter implementation + E2E tests
- Follow-up: `[taskiq - 4/n]` Retry, DLQ middleware and reliability
tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Signed-off-by: harshit <harshit@anyscale.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

go add ONLY when ready to merge, run all tests

2 participants