From d4673657c72e8403ef65297b8ce65d7d3d985fa3 Mon Sep 17 00:00:00 2001 From: Bart Date: Thu, 19 Mar 2026 10:03:36 +0100 Subject: [PATCH 1/3] replace plain add_stream with _add_or_reuse_stream --- taskiq_nats/broker.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/taskiq_nats/broker.py b/taskiq_nats/broker.py index 62b272d..353cf84 100644 --- a/taskiq_nats/broker.py +++ b/taskiq_nats/broker.py @@ -7,6 +7,7 @@ from nats.errors import TimeoutError as NatsTimeoutError from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, StreamConfig +from nats.js.errors import NotFoundError as StreamNotFoundError from taskiq import AckableMessage, AsyncBroker, AsyncResultBackend, BrokerMessage _T = typing.TypeVar("_T") # (Too short) @@ -138,6 +139,14 @@ def __init__( self.consumer: JetStreamConsumerType + async def _add_or_reuse_stream(self) -> None: + try: + await self.js.stream_info(self.stream_config.name) + logger.info(f"Stream {self.stream_config.name} already exists and was reused.") + except StreamNotFoundError: + await self.js.add_stream(config=self.stream_config) + logger.info(f"Created stream {self.stream_config.name}") + async def startup(self) -> None: """ Startup event handler. @@ -152,7 +161,7 @@ async def startup(self) -> None: self.stream_config.name = self.stream_name if not self.stream_config.subjects: self.stream_config.subjects = [self.subject] - await self.js.add_stream(config=self.stream_config) + await self._add_or_reuse_stream() await self._startup_consumer() async def shutdown(self) -> None: From 8f42d710f5a58a3440f20fe97ffeeaba26d0c512 Mon Sep 17 00:00:00 2001 From: Bart Date: Thu, 19 Mar 2026 10:23:59 +0100 Subject: [PATCH 2/3] update stream when reusing --- taskiq_nats/broker.py | 1 + 1 file changed, 1 insertion(+) diff --git a/taskiq_nats/broker.py b/taskiq_nats/broker.py index 353cf84..ad77aef 100644 --- a/taskiq_nats/broker.py +++ b/taskiq_nats/broker.py @@ -142,6 +142,7 @@ def __init__( async def _add_or_reuse_stream(self) -> None: try: await self.js.stream_info(self.stream_config.name) + await self.js.update_stream(self.stream_config.name) logger.info(f"Stream {self.stream_config.name} already exists and was reused.") except StreamNotFoundError: await self.js.add_stream(config=self.stream_config) From a45451c3881c8a438585f13e48db4dbc18c7c3d9 Mon Sep 17 00:00:00 2001 From: Bart Date: Thu, 19 Mar 2026 10:25:36 +0100 Subject: [PATCH 3/3] fix update stream input --- taskiq_nats/broker.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskiq_nats/broker.py b/taskiq_nats/broker.py index ad77aef..0363379 100644 --- a/taskiq_nats/broker.py +++ b/taskiq_nats/broker.py @@ -142,7 +142,7 @@ def __init__( async def _add_or_reuse_stream(self) -> None: try: await self.js.stream_info(self.stream_config.name) - await self.js.update_stream(self.stream_config.name) + await self.js.update_stream(self.stream_config) logger.info(f"Stream {self.stream_config.name} already exists and was reused.") except StreamNotFoundError: await self.js.add_stream(config=self.stream_config)