Taskiq Integration¶
InjectQ provides seamless dependency injection for Taskiq background tasks using a lightweight, context-based approach.
Installation¶
Quick Start¶
from typing import Annotated
from taskiq import InMemoryBroker, Context, TaskiqDepends
from injectq import InjectQ, singleton
from injectq.integrations.taskiq import setup_taskiq, InjectTaskiq
# Define your services
@singleton
class EmailService:
def send_email(self, to: str, subject: str) -> None:
print(f"Sending email to {to}: {subject}")
# Setup
container = InjectQ.get_instance()
broker = InMemoryBroker()
setup_taskiq(container, broker)
# Define tasks with dependency injection
@broker.task
async def send_welcome_email(
user_email: str,
service: Annotated[EmailService, InjectTaskiq(EmailService)]
):
service.send_email(user_email, "Welcome!")
# Schedule task
await broker(send_welcome_email)(user_email="user@example.com")
How It Works¶
The Taskiq integration uses:
1. State-based container attachment - The container is stored in broker.state
2. TaskiqDepends for dependency resolution
3. InjectTaskiq as a Taskiq dependency marker for type-safe injection
Core API¶
setup_taskiq(container, broker)¶
Registers the InjectQ integration with your Taskiq broker. This attaches the container to the broker's state for task-level access.
from injectq import InjectQ
from injectq.integrations.taskiq import setup_taskiq
from taskiq import InMemoryBroker
container = InjectQ.get_instance()
broker = InMemoryBroker()
# Register integration - attaches container to broker.state
setup_taskiq(container, broker)
InjectTaskiq[ServiceType]¶
Type-safe dependency marker for Taskiq tasks. Use with Annotated for clean type hints.
from typing import Annotated
from injectq.integrations.taskiq import InjectTaskiq
# In task definition
@broker.task
async def process_data(
data: dict,
service: Annotated[DataService, InjectTaskiq(DataService)]
):
return service.process(data)
InjectTask[ServiceType] (Alias)¶
Backwards compatibility alias for InjectTaskiq.
from typing import Annotated
from injectq.integrations.taskiq import InjectTask
# Works the same as InjectTaskiq
@broker.task
async def process(
data: dict,
service: Annotated[DataService, InjectTask(DataService)]
):
return service.process(data)
Basic Example¶
Complete working example with Taskiq:
from typing import Annotated
from taskiq import InMemoryBroker, Context, TaskiqDepends
from injectq import InjectQ, singleton, inject
from injectq.integrations.taskiq import InjectTaskiq, setup_taskiq
# Define services
@singleton
class Database:
def __init__(self):
print("Database initialized")
self.data = {"orders": []}
def get_orders(self):
return self.data["orders"]
def save_order(self, order: dict):
self.data["orders"].append(order)
@singleton
class EmailService:
def send_email(self, to: str, subject: str, body: str):
print(f"Email to {to}: {subject}")
@singleton
class OrderService:
@inject
def __init__(self, db: Database, email: EmailService):
self.db = db
self.email = email
def process_order(self, order_id: int):
order = {"id": order_id, "status": "processing"}
self.db.save_order(order)
self.email.send_email("admin@example.com", f"Order {order_id}", "New order")
return order
# Setup
container = InjectQ.get_instance()
broker = InMemoryBroker()
setup_taskiq(container, broker)
# Define tasks
@broker.task
async def process_new_order(
order_id: int,
service: Annotated[OrderService, InjectTaskiq(OrderService)]
):
"""Process a new order"""
return service.process_order(order_id)
@broker.task
async def send_notification(
user_email: str,
service: Annotated[EmailService, InjectTaskiq(EmailService)]
):
"""Send notification email"""
service.send_email(user_email, "Notification", "Your order is ready!")
# Schedule tasks
async def main():
await broker(process_new_order)(order_id=123)
await broker(send_notification)(user_email="user@example.com")
if __name__ == "__main__":
import asyncio
asyncio.run(main())
Using with Modules¶
Organize task services with modules:
from typing import Annotated
from injectq import Module, InjectQ
from injectq.integrations.taskiq import InjectTaskiq, setup_taskiq
from taskiq import InMemoryBroker
# Define modules
class DatabaseModule(Module):
def configure(self, binder):
binder.bind(Database, Database())
class ServiceModule(Module):
def configure(self, binder):
binder.bind(EmailService, EmailService())
binder.bind(OrderService, OrderService())
# Setup
container = InjectQ(modules=[
DatabaseModule(),
ServiceModule()
])
broker = InMemoryBroker()
setup_taskiq(container, broker)
# Tasks use services from modules
@broker.task
async def process_order(
order_id: int,
service: Annotated[OrderService, InjectTaskiq(OrderService)]
):
return service.process_order(order_id)
@broker.task
async def send_email(
to: str,
service: Annotated[EmailService, InjectTaskiq(EmailService)]
):
service.send_email(to, "Update", "Your order status changed")
Common Patterns¶
Dependency Chains¶
from typing import Annotated
from injectq import singleton, inject
@singleton
class Logger:
def log(self, msg: str):
print(f"[LOG] {msg}")
@singleton
class ProcessingService:
@inject
def __init__(self, logger: Logger):
self.logger = logger
def process(self, data: dict):
self.logger.log(f"Processing {len(data)} items")
return {"status": "done", "items": len(data)}
@broker.task
async def batch_process(
data: dict,
service: Annotated[ProcessingService, InjectTaskiq(ProcessingService)]
):
return service.process(data)
Scoped Services¶
from typing import Annotated
from uuid import uuid4
from injectq import scoped
@scoped
class TaskContext:
def __init__(self):
self.task_id = str(uuid4())
self.start_time = __import__("time").time()
def get_duration(self):
return __import__("time").time() - self.start_time
@broker.task
async def long_running_task(
data: dict,
ctx: Annotated[TaskContext, InjectTaskiq(TaskContext)],
processor: Annotated[DataProcessor, InjectTaskiq(DataProcessor)]
):
result = processor.process(data)
print(f"Task {ctx.task_id} completed in {ctx.get_duration():.2f}s")
return result
Error Handling¶
from typing import Annotated
@broker.task(max_retries=3)
async def retry_task(
item_id: int,
service: Annotated[ProcessingService, InjectTaskiq(ProcessingService)]
):
"""Task with retry logic"""
try:
return service.process_item(item_id)
except ProcessingError as e:
# Taskiq handles retries
raise
Testing¶
Test tasks with mocked dependencies:
import pytest
from typing import Annotated
from injectq import InjectQ
from injectq.integrations.taskiq import InjectTaskiq, setup_taskiq
from taskiq import InMemoryBroker
class MockEmailService:
def __init__(self):
self.sent_emails = []
def send_email(self, to: str, subject: str, body: str):
self.sent_emails.append({"to": to, "subject": subject})
return True
@pytest.fixture
def test_broker():
"""Create test broker with mocked services"""
broker = InMemoryBroker()
container = InjectQ()
# Use mocks instead of real services
container[EmailService] = MockEmailService
setup_taskiq(container, broker)
return broker
@pytest.mark.asyncio
async def test_send_email(test_broker):
@test_broker.task
async def send_welcome(
email: str,
service: Annotated[EmailService, InjectTaskiq(EmailService)]
):
service.send_email(email, "Welcome", "Welcome to our app!")
await test_broker(send_welcome)(email="test@example.com")
# Verify email was "sent"
service = test_broker.state.injectq_container[EmailService]
assert len(service.sent_emails) == 1
assert service.sent_emails[0]["to"] == "test@example.com"
Best Practices¶
✅ Good Patterns¶
1. Use singleton for shared resources
2. Use dependency injection in all tasks
@broker.task
async def my_task(
service: Annotated[MyService, InjectTaskiq(MyService)]
):
return service.do_something()
3. Use scoped for task-specific data
❌ Bad Patterns¶
1. Don't access container directly
# ❌ Bad
@broker.task
async def my_task():
service = container.get(MyService)
return service.do_something()
# ✅ Good
@broker.task
async def my_task(
service: Annotated[MyService, InjectTaskiq(MyService)]
):
return service.do_something()
2. Don't use singleton for task-specific data
# ❌ Bad - shared across tasks!
@singleton
class TaskContext:
def __init__(self):
self.data = {}
# ✅ Good - isolated per task
@scoped
class TaskContext:
def __init__(self):
self.data = {}
Summary¶
Taskiq integration provides:
- Simple setup - Just
setup_taskiq(container, broker)once at startup - Type-safe injection - Use
Annotated[ServiceType, InjectTaskiq(ServiceType)]in tasks - Task isolation - Each task gets its own container context via Taskiq's dependency system
- Zero global state - No singleton container pollution
- Easy testing - Mock dependencies by rebinding in test container
Key components:
setup_taskiq(container, broker)- Register integrationInjectTaskiq[ServiceType]orInjectTask[ServiceType]- Inject dependencies in tasks- Singleton, scoped, and transient scopes all supported
Best practices:
- Use dependency injection in all tasks
- Use singleton for shared resources
- Use scoped for task-specific data
- Test with mocked dependencies
- Organize services with modules