File tree Expand file tree Collapse file tree 2 files changed +24
-3
lines changed
Expand file tree Collapse file tree 2 files changed +24
-3
lines changed Original file line number Diff line number Diff line change @@ -65,6 +65,10 @@ class Task
6565 * @var int
6666 */
6767 private $ startedCount ;
68+ /**
69+ * @var mixed
70+ */
71+ private $ queueSource ;
6872
6973 /**
7074 * Task constructor.
@@ -104,6 +108,7 @@ public function getRawData()
104108 'ts_started ' => $ this ->getTsStarted (),
105109 'exec_time ' => $ this ->getExecTime (),
106110 'started_count ' => $ this ->getStartedCount (),
111+ 'queue_source ' => $ this ->getQueueSource (),
107112 );
108113 }
109114
@@ -367,6 +372,16 @@ public function setStatusDone($execTime=null)
367372 return $ this ;
368373 }
369374
375+ public function setQueueSource ($ queueSource )
376+ {
377+ $ this ->queueSource = $ queueSource ;
378+ }
379+
380+ public function getQueueSource ()
381+ {
382+ return $ this ->queueSource ?: null ;
383+ }
384+
370385 public function setStatusWaitingForRetry ($ execTime =null )
371386 {
372387 $ this ->status = \G4 \Tasker \Consts::STATUS_WAITING_FOR_RETRY ;
@@ -414,7 +429,8 @@ public static function fromData($data)
414429 ->setStatus ((int ) $ data ['status ' ])
415430 ->setTsStarted ((int ) $ data ['ts_started ' ])
416431 ->setExecTime ((float ) $ data ['exec_time ' ])
417- ->setStartedCount ((int ) $ data ['started_count ' ]);
432+ ->setStartedCount ((int ) $ data ['started_count ' ])
433+ ->setQueueSource (isset ($ data ['queue_source ' ]) ? (string ) $ data ['queue_source ' ] : null );
418434
419435 return $ task ;
420436 }
Original file line number Diff line number Diff line change @@ -65,12 +65,17 @@ class Runner extends \G4\Tasker\TimerAbstract
6565
6666 const LOG_TYPE = 'rb_worker ' ;
6767
68- public function __construct (AMQPMessage $ AMQPMessage , TaskRepositoryInterface $ taskRepository , array $ delayForRetries = [])
69- {
68+ public function __construct (
69+ AMQPMessage $ AMQPMessage ,
70+ TaskRepositoryInterface $ taskRepository ,
71+ array $ delayForRetries = [],
72+ string $ queue = null
73+ ){
7074 $ this ->taskRepository = $ taskRepository ;
7175 $ this ->taskData = new \G4 \ValueObject \Dictionary (
7276 json_decode ($ AMQPMessage ->getBody (), true )
7377 );
78+ $ this ->taskData ->add ('queue_source ' , $ queue );
7479 $ this ->taskDomain = \G4 \Tasker \Model \Domain \Task::fromData ($ this ->taskData ->getAll ());
7580 $ this ->taskerExecution = (new \G4 \Log \Data \TaskerExecution ())->setLogType (self ::LOG_TYPE );
7681 $ this ->resolver = new RetryAfterResolver ($ delayForRetries );
You can’t perform that action at this time.
0 commit comments