Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 54 additions & 30 deletions consumers/src/Database/PostgreSQL/Consumers/Components.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Control.Concurrent.Lifted
import Control.Concurrent.STM hiding (atomically)
import Control.Concurrent.STM qualified as STM
import Control.Concurrent.Thread.Lifted qualified as T
import Control.Exception (AsyncException (ThreadKilled))
import Control.Exception (AsyncException (ThreadKilled), evaluate)
import Control.Exception.Safe qualified as ES
import Control.Monad
import Control.Monad.Base
Expand All @@ -22,6 +22,7 @@ import Data.Foldable qualified as F
import Data.Function
import Data.Int
import Data.Map.Strict qualified as M
import Data.Maybe (catMaybes)
import Data.Monoid.Utils
import Database.PostgreSQL.Consumers.Config
import Database.PostgreSQL.Consumers.Consumer
Expand Down Expand Up @@ -297,10 +298,12 @@ spawnDispatcher
:: forall m idx job
. ( MonadBaseControl IO m
, MonadLog m
, MonadCatch m
, MonadMask m
, MonadTime m
, Show idx
, ToSQL idx
, FromSQL idx
)
=> ConsumerConfig m idx job
-> ConnectionSourceM m
Expand Down Expand Up @@ -346,7 +349,7 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs
. (`finally` subtractJobs)
. restore
$ do
mapM startJob batch >>= mapM joinJob >>= updateJobs
mapM startJob (catMaybes batch) >>= mapM joinJob >>= updateJobs

when (batchSize == limit) $ do
maxBatchSize <- atomically $ do
Expand All @@ -357,36 +360,57 @@ spawnDispatcher ConsumerConfig {..} cs cid semaphore runningJobsInfo runningJobs

pure (batchSize > 0)

reserveJobs :: Int -> m ([job], Int)
reserveJobs :: Int -> m ([Maybe job], Int)
reserveJobs limit = runDBT cs ts $ do
now <- currentTime
n <-
runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $
smconcat
[ "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by =" <?> cid
, ", attempts = CASE"
, " WHEN finished_at IS NULL THEN attempts + 1"
, " ELSE 1"
, " END"
, "WHERE id IN (" <> reservedJobs now <> ")"
, "RETURNING" <+> mintercalate ", " ccJobSelectors
]
-- Decode lazily as we want the transaction to be as short as possible.
(,n) . F.toList . fmap ccJobFetcher <$> queryResult
where
reservedJobs :: UTCTime -> SQL
reservedJobs now =
smconcat
[ "SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
runPreparedSQL_ (preparedSqlName "getReservedIds" ccJobsTable) $
smconcat
[ "SELECT id FROM" <+> raw ccJobsTable
, "WHERE"
, " reserved_by IS NULL"
, " AND run_at IS NOT NULL"
, " AND run_at <= " <?> now
, " ORDER BY run_at"
, "LIMIT" <?> limit
, "FOR UPDATE SKIP LOCKED"
]
jobIds :: [idx] <- fetchMany runIdentity
if null jobIds
then pure ([], 0)
else
handle
( \(SomeException e) -> do
logAttention "Failure to fetch the jobs, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_ids" .= show jobIds]
let toUpdate :: [(idx, Result)]
toUpdate = (,Failed . RerunAfter . ihours $ 6) <$> jobIds
lift $ updateJobs toUpdate
pure ([], 0)
)
( do
n <-
runPreparedSQL (preparedSqlName "setReservation" ccJobsTable) $
smconcat
[ "UPDATE" <+> raw ccJobsTable <+> "SET"
, " reserved_by =" <?> cid
, ", attempts = CASE"
, " WHEN finished_at IS NULL THEN attempts + 1"
, " ELSE 1"
, " END"
, "WHERE id = ANY(" <?> Array1 jobIds <+> ")"
, "RETURNING id, " <+> mintercalate ", " ccJobSelectors
]
qr <- queryResult
results <- forM (F.toList qr) $ \(jobIdRow :*: other) ->
let jobId = runIdentity jobIdRow
in handle
( \(SomeException e) -> do
logAttention "Failure to fetch job, will reenqueue for 6 hours later" $ object ["error" .= show e, "job_id" .= show jobId]
lift $ updateJobs [(jobId, Failed . RerunAfter . ihours $ 6)]
pure Nothing
)
(liftBase . evaluate $ Just $! ccJobFetcher other)
pure (results, n)
)

-- Spawn each job in a separate thread.
startJob :: job -> m (job, m (T.Result Result))
Expand Down
Loading