diff --git a/consumers/src/Database/PostgreSQL/Consumers/Components.hs b/consumers/src/Database/PostgreSQL/Consumers/Components.hs
index 127e30a..36887f1 100644
--- a/consumers/src/Database/PostgreSQL/Consumers/Components.hs
+++ b/consumers/src/Database/PostgreSQL/Consumers/Components.hs
@@ -10,7 +10,7 @@ import Control.Concurrent.Lifted
 import Control.Concurrent.STM hiding (atomically)
 import Control.Concurrent.STM qualified as STM
 import Control.Concurrent.Thread.Lifted qualified as T
-import Control.Exception (AsyncException (ThreadKilled))
+import Control.Exception (AsyncException (ThreadKilled), evaluate)
 import Control.Exception.Safe qualified as ES
 import Control.Monad
 import Control.Monad.Base
@@ -22,6 +22,7 @@ import Data.Foldable qualified as F
 import Data.Function
 import Data.Int
 import Data.Map.Strict qualified as M
+import Data.Maybe (catMaybes)
 import Data.Monoid.Utils
 import Database.PostgreSQL.Consumers.Config
 import Database.PostgreSQL.Consumers.Consumer
@@ -297,10 +298,12 @@ spawnDispatcher
   :: forall m idx job
    . ( MonadBaseControl IO m
      , MonadLog m
+     , MonadCatch m
      , MonadMask m
      , MonadTime m
      , Show idx
      , ToSQL idx
+     , FromSQL idx
      )
   => ConsumerConfig m idx job
   -> ConnectionSourceM m
@@ -346,7 +349,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
             . (`finally` subtractJobs)
             . restore
             $ do
-              mapM startJob batch >>= mapM joinJob >>= updateJobs
+              mapM startJob (catMaybes batch) >>= mapM joinJob >>= updateJobs
 
           when (batchSize == limit) $ do
             maxBatchSize <- atomically $ do
@@ -357,36 +360,69 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
 
         pure (batchSize > 0)
 
-    reserveJobs :: Int -> m ([job], Int)
-    reserveJobs limit = runDBT cs ts $ do
-      now <- currentTime
-      n <-
-        runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $
-          smconcat
-            [ "UPDATE" <+> raw ccJobsTable <+> "SET"
-            , " reserved_by =" <?> cid
-            , ", attempts = CASE"
-            , " WHEN finished_at IS NULL THEN attempts + 1"
-            , " ELSE 1"
-            , " END"
-            , "WHERE id IN (" <> reservedJobs now <> ")"
-            , "RETURNING" <+> mintercalate ", " ccJobSelectors
-            ]
-      -- Decode lazily as we want the transaction to be as short as possible.
-      (,n) . F.toList . fmap ccJobFetcher <$> queryResult
-      where
-        reservedJobs :: UTCTime -> SQL
-        reservedJobs now =
-          smconcat
-            [ "SELECT id FROM" <+> raw ccJobsTable
-            , "WHERE"
-            , " reserved_by IS NULL"
-            , " AND run_at IS NOT NULL"
-            , " AND run_at <= " <?> now
-            , " ORDER BY run_at"
-            , "LIMIT" <?> limit
-            , "FOR UPDATE SKIP LOCKED"
-            ]
+    -- Reserve up to @limit@ due jobs for this consumer and decode them.
+    --
+    -- Returns one entry per row of the reservation UPDATE ('Nothing' where
+    -- 'ccJobFetcher' threw) and the count returned by 'runPreparedSQL' for it.
+    -- Jobs that failed to be reserved or decoded are rescheduled 6 hours later.
+    reserveJobs :: Int -> m ([Maybe job], Int)
+    reserveJobs limit = do
+      (decoded, n, failedIds) <- runDBT cs ts $ do
+        now <- currentTime
+        runPreparedSQL_ (preparedSqlName "getReservedIds" ccJobsTable) $
+          smconcat
+            [ "SELECT id FROM" <+> raw ccJobsTable
+            , "WHERE"
+            , " reserved_by IS NULL"
+            , " AND run_at IS NOT NULL"
+            , " AND run_at <= " <?> now
+            , " ORDER BY run_at"
+            , "LIMIT" <?> limit
+            , "FOR UPDATE SKIP LOCKED"
+            ]
+        jobIds :: [idx] <- fetchMany runIdentity
+        if null jobIds
+          then pure ([], 0, [])
+          else
+            -- 'ES.handleAny' only handles synchronous exceptions, so killing
+            -- the dispatcher thread is never mistaken for a failed fetch.
+            ES.handleAny
+              ( \e -> do
+                  logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_ids" .= show jobIds]
+                  pure ([], 0, jobIds)
+              )
+              ( do
+                  n <-
+                    runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $
+                      smconcat
+                        [ "UPDATE" <+> raw ccJobsTable <+> "SET"
+                        , " reserved_by =" <?> cid
+                        , ", attempts = CASE"
+                        , " WHEN finished_at IS NULL THEN attempts + 1"
+                        , " ELSE 1"
+                        , " END"
+                        , "WHERE id = ANY(" <?> Array1 jobIds <+> ")"
+                        , "RETURNING id, " <+> mintercalate ", " ccJobSelectors
+                        ]
+                  qr <- queryResult
+                  results <- forM (F.toList qr) $ \(jobIdRow :*: other) -> do
+                    let jobId = runIdentity jobIdRow
+                    ES.handleAny
+                      ( \e -> do
+                          logAttention "Failure to fetch job, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_id" .= show jobId]
+                          pure (Left jobId)
+                      )
+                      -- Force the decoded job here so that a decoding error
+                      -- is raised inside the handler, not later by a consumer.
+                      (liftBase . evaluate $ Right $! ccJobFetcher other)
+                  pure (either (const Nothing) Just <$> results, n, [jobId | Left jobId <- results])
+              )
+      -- Reschedule only once the reservation transaction has finished:
+      -- 'updateJobs' runs in @m@ rather than in that transaction, so calling it
+      -- while the @FOR UPDATE@ row locks are still held risks blocking on them.
+      unless (null failedIds) $
+        updateJobs [(jobId, Failed . RerunAfter . ihours $ 6) | jobId <- failedIds]
+      pure (decoded, n)
 
     -- Spawn each job in a separate thread.
     startJob :: job -> m (job, m (T.Result Result))