Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a2b3e03
Provided support for Symfony 5.x
webhdx May 27, 2020
6fce110
[Composer] Changed package name
webhdx Jun 22, 2020
a43c936
[Composer] Generated lock file
webhdx Jun 22, 2020
d3f835d
EC-245: JobQueueBundle cron throws exception and pollutes log (#1)
mauroaltamura Oct 13, 2020
f3a802a
Bumped doctrine/common to ^3.0 and fixed compatibility
webhdx Dec 22, 2020
1e80d2e
Bumped minimal PHP version to 7.3
webhdx Dec 22, 2020
fc96862
Fixed return type in CronTest::setUp
webhdx Dec 22, 2020
ec72a00
[Composer] Set branch alias to 4.0.x-dev
webhdx Dec 22, 2020
1c69ee7
Fixed return type in setUp methods
webhdx Dec 22, 2020
f42c05c
Set travis to use installed phpunit version
webhdx Dec 22, 2020
75a7399
Disabled throwing exception on errors to avoid false results on depre…
webhdx Dec 22, 2020
aa62401
Bumped PHP requirement
Dec 13, 2021
045d029
Merge pull request #5 from ezsystems/bump-php-req
webhdx Dec 14, 2021
c93b45e
IBX-3029: Fixed entities referenced by their alias instead of FQCN
Steveb-p Jun 13, 2022
9805af0
IBX-3029: Fixed entities referenced by their alias instead of FQCN
Steveb-p Jun 13, 2022
f364c73
IBX-3029: Fixed entities referenced by their alias instead of FQCN
Steveb-p Jun 13, 2022
1646d5e
Update composer.json
CunningFatalist Apr 2, 2024
a612c22
IND-512 fix deprecations
CunningFatalist Apr 8, 2024
29f7c47
IND-512 fix route deprecations
CunningFatalist Apr 10, 2024
9ba0298
NOKEY fix interface
CunningFatalist Apr 10, 2024
fc86ae8
NOKEY more fixes
CunningFatalist Apr 10, 2024
92b2235
IND-515 tree builder return type
CunningFatalist Apr 12, 2024
ecf7e53
IND-515 mapped super class
CunningFatalist Apr 15, 2024
39e25a9
IND-515 attributes
CunningFatalist Apr 15, 2024
b310465
IND-515 attr
CunningFatalist Apr 15, 2024
dbb8d3e
Fix type
CunningFatalist Apr 16, 2024
46346a9
IND-515 fix compatibility issues
paroe May 13, 2024
bc0dbd9
IND-515 fix type annotation
paroe May 17, 2024
5ad649e
IND-515 fix some more issues
paroe May 24, 2024
fb695f5
IND-515 document expected `args` structure
paroe May 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ cache:
matrix:
fast_finish: true
include:
- php: 7.1
- php: 7.2
- php: 7.3
- php: 7.4

before_script:
- composer self-update
- echo "memory_limit=3G" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini

install: composer install --dev

script: phpunit --coverage-clover clover
script: vendor/bin/phpunit --coverage-clover clover

after_success:
- curl -sL https://bit.ly/artifact-uploader | php
24 changes: 12 additions & 12 deletions Command/CleanUpCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace JMS\JobQueueBundle\Command;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\Persistence\ManagerRegistry;
use Doctrine\DBAL\Connection;
use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Job;
Expand All @@ -14,20 +14,18 @@

class CleanUpCommand extends Command
{
protected static $defaultName = 'jms-job-queue:clean-up';

private $jobManager;
private $registry;

public function __construct(ManagerRegistry $registry, JobManager $jobManager)
{
parent::__construct();
parent::__construct('jms-job-queue:clean-up');

$this->jobManager = $jobManager;
$this->registry = $registry;
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Cleans up jobs which exceed the maximum retention time.')
Expand All @@ -37,14 +35,16 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var EntityManager $em */
$em = $this->registry->getManagerForClass(Job::class);
$con = $em->getConnection();

$this->cleanUpExpiredJobs($em, $con, $input);
$this->collectStaleJobs($em);

return Command::SUCCESS;
}

private function collectStaleJobs(EntityManager $em)
Expand All @@ -69,7 +69,7 @@ private function findStaleJobs(EntityManager $em)
$em->clear();

/** @var Job $job */
$job = $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j
$job = $em->createQuery("SELECT j FROM " . Job::class . " j
WHERE j.state = :running AND j.workerName IS NOT NULL AND j.checkedAt < :maxAge
AND j.id NOT IN (:excludedIds)")
->setParameter('running', Job::STATE_RUNNING)
Expand Down Expand Up @@ -97,7 +97,7 @@ private function cleanUpExpiredJobs(EntityManager $em, Connection $con, InputInt
$count++;

$result = $con->executeQuery($incomingDepsSql, array('id' => $job->getId()));
if ($result->fetchColumn() !== false) {
if ($result->fetchOne() !== false) {
$em->transactional(function() use ($em, $job) {
$this->resolveDependencies($em, $job);
$em->remove($job);
Expand Down Expand Up @@ -141,7 +141,7 @@ private function resolveDependencies(EntityManager $em, Job $job)
private function findExpiredJobs(EntityManager $em, InputInterface $input)
{
$succeededJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.state = :succeeded AND j.id NOT IN (:excludedIds)")
return $em->createQuery("SELECT j FROM " . Job::class . " j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.state = :succeeded AND j.id NOT IN (:excludedIds)")
->setParameter('maxRetentionTime', new \DateTime('-'.$input->getOption('max-retention-succeeded')))
->setParameter('excludedIds', $excludedIds)
->setParameter('succeeded', Job::STATE_FINISHED)
Expand All @@ -151,7 +151,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input)
yield from $this->whileResults( $succeededJobs );

$finishedJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
return $em->createQuery("SELECT j FROM " . Job::class . " j WHERE j.closedAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
->setParameter('maxRetentionTime', new \DateTime('-'.$input->getOption('max-retention')))
->setParameter('excludedIds', $excludedIds)
->setMaxResults(100)
Expand All @@ -160,7 +160,7 @@ private function findExpiredJobs(EntityManager $em, InputInterface $input)
yield from $this->whileResults( $finishedJobs );

$canceledJobs = function(array $excludedIds) use ($em, $input) {
return $em->createQuery("SELECT j FROM JMSJobQueueBundle:Job j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
return $em->createQuery("SELECT j FROM " . Job::class . " j WHERE j.state = :canceled AND j.createdAt < :maxRetentionTime AND j.originalJob IS NULL AND j.id NOT IN (:excludedIds)")
->setParameter('maxRetentionTime', new \DateTime('-'.$input->getOption('max-retention')))
->setParameter('canceled', Job::STATE_CANCELED)
->setParameter('excludedIds', $excludedIds)
Expand All @@ -183,4 +183,4 @@ private function whileResults(callable $resultProducer)
}
} while ( ! empty($jobs));
}
}
}
14 changes: 6 additions & 8 deletions Command/MarkJobIncompleteCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace JMS\JobQueueBundle\Command;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManager;
use JMS\JobQueueBundle\Entity\Job;
use Symfony\Component\Console\Command\Command;
Expand All @@ -13,28 +13,26 @@

class MarkJobIncompleteCommand extends Command
{
protected static $defaultName = 'jms-job-queue:mark-incomplete';

private $registry;
private $jobManager;

public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager)
{
parent::__construct();
parent::__construct('jms-job-queue:mark-incomplete');

$this->registry = $managerRegistry;
$this->jobManager = $jobManager;
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Internal command (do not use). It marks jobs as incomplete.')
->addArgument('job-id', InputArgument::REQUIRED, 'The ID of the Job.')
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
/** @var EntityManager $em */
$em = $this->registry->getManagerForClass(Job::class);
Expand All @@ -52,6 +50,6 @@ protected function execute(InputInterface $input, OutputInterface $output)

$this->jobManager->closeJob($job, Job::STATE_INCOMPLETE);

return 0;
return Command::SUCCESS;
}
}
}
26 changes: 15 additions & 11 deletions Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

namespace JMS\JobQueueBundle\Command;

use Doctrine\ORM\EntityManager;
use Doctrine\Persistence\ObjectManager;
use JMS\JobQueueBundle\Entity\Job;
use JMS\JobQueueBundle\Entity\Repository\JobManager;
use JMS\JobQueueBundle\Event\NewOutputEvent;
Expand All @@ -35,8 +35,6 @@

class RunCommand extends Command
{
protected static $defaultName = 'jms-job-queue:run';

/** @var string */
private $env;

Expand Down Expand Up @@ -69,7 +67,7 @@ class RunCommand extends Command

public function __construct(ManagerRegistry $managerRegistry, JobManager $jobManager, EventDispatcherInterface $dispatcher, array $queueOptionsDefault, array $queueOptions)
{
parent::__construct();
parent::__construct('jms-job-queue:run');

$this->registry = $managerRegistry;
$this->jobManager = $jobManager;
Expand All @@ -78,7 +76,7 @@ public function __construct(ManagerRegistry $managerRegistry, JobManager $jobMan
$this->queueOptions = $queueOptions;
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Runs jobs from the queue.')
Expand All @@ -90,7 +88,7 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
$startTime = time();

Expand Down Expand Up @@ -149,6 +147,8 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->queueOptionsDefault,
$this->queueOptions
);

return Command::SUCCESS;
}

private function runJobs($workerName, $startTime, $maxRuntime, $idleTime, $maxJobs, array $restrictedQueues, array $queueOptionsDefaults, array $queueOptions)
Expand Down Expand Up @@ -272,6 +272,10 @@ private function getRunningJobsPerQueue()
return $runningJobsPerQueue;
}

/**
* @throws \Doctrine\ORM\ORMException
* @throws \Doctrine\ORM\OptimisticLockException
*/
private function checkRunningJobs()
{
foreach ($this->runningJobs as $i => &$data) {
Expand All @@ -283,13 +287,13 @@ private function checkRunningJobs()

if ( ! empty($newOutput)) {
$event = new NewOutputEvent($data['job'], $newOutput, NewOutputEvent::TYPE_STDOUT);
$this->dispatcher->dispatch('jms_job_queue.new_job_output', $event);
$this->dispatcher->dispatch($event, 'jms_job_queue.new_job_output');
$newOutput = $event->getNewOutput();
}

if ( ! empty($newErrorOutput)) {
$event = new NewOutputEvent($data['job'], $newErrorOutput, NewOutputEvent::TYPE_STDERR);
$this->dispatcher->dispatch('jms_job_queue.new_job_output', $event);
$this->dispatcher->dispatch($event, 'jms_job_queue.new_job_output');
$newErrorOutput = $event->getNewOutput();
}

Expand Down Expand Up @@ -350,7 +354,7 @@ private function checkRunningJobs()
private function startJob(Job $job)
{
$event = new StateChangeEvent($job, Job::STATE_RUNNING);
$this->dispatcher->dispatch('jms_job_queue.job_state_change', $event);
$this->dispatcher->dispatch($event, 'jms_job_queue.job_state_change');
$newState = $event->getNewState();

if (Job::STATE_CANCELED === $newState) {
Expand Down Expand Up @@ -443,8 +447,8 @@ private function getBasicCommandLineArgs(): array
return $args;
}

private function getEntityManager(): EntityManager
private function getEntityManager(): ObjectManager
{
return /** @var EntityManager */ $this->registry->getManagerForClass('JMSJobQueueBundle:Job');
return /** @var ObjectManager */ $this->registry->getManagerForClass(Job::class);
}
}
35 changes: 16 additions & 19 deletions Command/ScheduleCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

namespace JMS\JobQueueBundle\Command;

use Doctrine\Common\Persistence\ManagerRegistry;
use Doctrine\Persistence\ManagerRegistry;
use Doctrine\ORM\EntityManager;
use Doctrine\ORM\Query;
use JMS\JobQueueBundle\Console\CronCommand;
Expand All @@ -17,22 +17,21 @@

class ScheduleCommand extends Command
{
protected static $defaultName = 'jms-job-queue:schedule';

private $registry;
private $schedulers;
private $cronCommands;

public function __construct(ManagerRegistry $managerRegistry, iterable $schedulers, iterable $cronCommands)
/**
* @param ManagerRegistry $registry
* @param iterable<JobScheduler> $schedulers
* @param iterable<CronCommand> $cronCommands
*/
public function __construct(
private readonly ManagerRegistry $registry,
private readonly iterable $schedulers,
private readonly iterable $cronCommands
)
{
parent::__construct();

$this->registry = $managerRegistry;
$this->schedulers = $schedulers;
$this->cronCommands = $cronCommands;
parent::__construct('jms-job-queue:schedule');
}

protected function configure()
protected function configure(): void
{
$this
->setDescription('Schedules jobs at defined intervals')
Expand All @@ -41,7 +40,7 @@ protected function configure()
;
}

protected function execute(InputInterface $input, OutputInterface $output)
protected function execute(InputInterface $input, OutputInterface $output): int
{
$maxRuntime = $input->getOption('max-runtime');
if ($maxRuntime > 300) {
Expand Down Expand Up @@ -82,7 +81,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
}
}

return 0;
return Command::SUCCESS;
}

/**
Expand All @@ -98,7 +97,7 @@ private function scheduleJobs(OutputInterface $output, array $jobSchedulers, arr
continue;
}

list($success, $newLastRunAt) = $this->acquireLock($name, $lastRunAt);
[$success, $newLastRunAt] = $this->acquireLock($name, $lastRunAt);
$jobsLastRunAt[$name] = $newLastRunAt;

if ($success) {
Expand Down Expand Up @@ -148,14 +147,12 @@ private function populateJobSchedulers()
{
$schedulers = [];
foreach ($this->schedulers as $scheduler) {
/** @var JobScheduler $scheduler */
foreach ($scheduler->getCommands() as $name) {
$schedulers[$name] = $scheduler;
}
}

foreach ($this->cronCommands as $command) {
/** @var CronCommand $command */
if ( ! $command instanceof Command) {
throw new \RuntimeException('CronCommand should only be used on Symfony commands.');
}
Expand Down
Loading