Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion taskiq_nats/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from nats.errors import TimeoutError as NatsTimeoutError
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, StreamConfig
from nats.js.errors import NotFoundError as StreamNotFoundError
from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage

_T = typing.TypeVar("_T") # (Too short)
Expand Down Expand Up @@ -138,6 +139,15 @@ def __init__(

self.consumer: JetStreamConsumerType

async def _add_or_reuse_stream(self) -> None:
try:
await self.js.stream_info(self.stream_config.name)
await self.js.update_stream(self.stream_config)
logger.info(f"Stream {self.stream_config.name} already exists and was reused.")
except StreamNotFoundError:
await self.js.add_stream(config=self.stream_config)
logger.info(f"Created stream {self.stream_config.name}")

async def startup(self) -> None:
"""
Startup event handler.
Expand All @@ -152,7 +162,7 @@ async def startup(self) -> None:
self.stream_config.name = self.stream_name
if not self.stream_config.subjects:
self.stream_config.subjects = [self.subject]
await self.js.add_stream(config=self.stream_config)
await self._add_or_reuse_stream()
await self._startup_consumer()

async def shutdown(self) -> None:
Expand Down