2121
2222
2323class Executor (IExecutor ):
24-
2524 def __init__ (self , executor_id : str , callback , queue : Queue , mqttc : Client ):
2625 self .queue : Queue [Message ] = queue
2726 self .mqttc : Client = mqttc
@@ -41,7 +40,6 @@ def stop_executor(self):
4140
4241 # Main executor loop. Polls for messages in the queue and parsers them.
4342 def start_executor (self ):
44-
4543 log .info ("Thread started for %s" , self .id )
4644 self .running = True
4745
@@ -56,7 +54,9 @@ def start_executor(self):
5654 self ._put_message_in_queue (message )
5755 else :
5856 log .debug (
59- "Received unknown (n)ack with message_id: %s" , message .message_id )
57+ "Received unknown (n)ack with message_id: %s" ,
58+ message .message_id ,
59+ )
6060 case Register ():
6161 self ._handle_register_message (message )
6262 case Unregister ():
@@ -68,7 +68,8 @@ def start_executor(self):
6868 case _:
6969 # Send Nack?
7070 log .warning (
71- "Unimplemented command: %s" , message .model_dump_json ())
71+ "Unimplemented command: %s" , message .model_dump_json ()
72+ )
7273 except Empty :
7374 pass
7475
@@ -115,7 +116,9 @@ def _handle_unregister_self_message(self, message: UnregisterSelf):
115116 retries -= 1
116117 if retries == 0 :
117118 log .fatal (
118- "Did not receive an ack for message %s. Aborting..." , message .message_id )
119+ "Did not receive an ack for message %s. Aborting..." ,
120+ message .message_id ,
121+ )
119122 exit (- 1 )
120123
121124 # Handles self generated register message.
@@ -147,7 +150,9 @@ def _handle_register_message(self, message: Register):
147150 retries -= 1
148151 if retries == 0 :
149152 log .fatal (
150- "Did not receive an ack for message %s. Aborting..." , message .message_id )
153+ "Did not receive an ack for message %s. Aborting..." ,
154+ message .message_id ,
155+ )
151156 exit (- 1 )
152157
153158 # Handles a command message from SOARCA.
@@ -164,8 +169,7 @@ def _handle_command_message(self, message: Command):
164169 message_id = str (uuid1 ())
165170 timestamp = datetime .now (timezone .utc ).isoformat ()
166171 meta = Meta (timestamp = timestamp , sender_id = self .id )
167- result = Result (message_id = message_id , meta = meta ,
168- result = resultStruct )
172+ result = Result (message_id = message_id , meta = meta , result = resultStruct )
169173
170174 # Send result back
171175 self ._send_message_as_json (result )
@@ -187,7 +191,9 @@ def _handle_command_message(self, message: Command):
187191 if retries == 0 :
188192 self .acks .remove (message_id )
189193 log .error (
190- "Did not receive an acknowledgement for message %s. Skipping message..." , result .message_id )
194+ "Did not receive an acknowledgement for message %s. Skipping message..." ,
195+ result .message_id ,
196+ )
191197 break
192198
193199 # Publishes a message as JSON on a topic. Default topic is self.id.
@@ -221,12 +227,12 @@ def _wait_for_ack(self, message_id: str):
221227 log .info ("Received ack for message: %s" , message_id )
222228 return
223229 case Nack ():
224- log .warning (
225- "Received nack for message: %s" , message_id )
230+ log .warning ("Received nack for message: %s" , message_id )
226231 raise RuntimeError ("Received a nack" )
227232 case _:
228233 raise TypeError (
229- f"Unexpected message type { message .model_dump_json ()} " )
234+ f"Unexpected message type { message .model_dump_json ()} "
235+ )
230236
231237 except Empty as e :
232238 log .warning ("Did not receive an ack" )
0 commit comments