Skip to content

Commit 7d7eaf4

Browse files
committed
fix: clamp max_age metrics at 0
After the change in #89 to have `delayed` surface DB-accurate max_age metrics, it appears that PostgreSQL will occasionally report a `run_at` that is milliseconds _after_ the current NOW() in UTC time. This is more than just clock drift between the DB and the server that inserted the row -- in READ COMMITTED mode, statements can see rows committed after the start of the transaction, while NOW() is frozen to the transaction start time. (Even without an explicit `BEGIN`, bare statements are wrapped in an implicit transaction.) This PR: - Switches PostgreSQL to `STATEMENT_TIMESTAMP()` to get even closer to the time at which the `SELECT` actually began. - Clamps at 0 anyways because clock drift is still a possibility. /no-platform stack-info: PR: #94, branch: smudge/stack/5
1 parent d3bfb4c commit 7d7eaf4

3 files changed

Lines changed: 17 additions & 16 deletions

File tree

lib/delayed/monitor.rb

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def query_for(metric)
3838
def self.sql_now_in_utc
3939
case ActiveRecord::Base.connection.adapter_name
4040
when 'PostgreSQL'
41-
"TIMEZONE('UTC', NOW())"
41+
"TIMEZONE('UTC', STATEMENT_TIMESTAMP())"
4242
when 'MySQL', 'Mysql2'
4343
"UTC_TIMESTAMP()"
4444
else
@@ -142,16 +142,16 @@ def failed_count_grouped
142142
end
143143

144144
def max_lock_age_grouped
145-
oldest_locked_at_query.transform_values { |j| db_now(j) - j.locked_at }
145+
oldest_locked_at_query.transform_values { |j| time_ago(db_now(j), j.locked_at) }
146146
end
147147

148148
def max_age_grouped
149-
oldest_run_at_query.transform_values { |j| db_now(j) - j.run_at }
149+
oldest_run_at_query.transform_values { |j| time_ago(db_now(j), j.run_at) }
150150
end
151151

152152
def alert_age_percent_grouped
153153
oldest_run_at_query.each_with_object({}) do |((priority, queue), j), metrics|
154-
max_age = db_now(j) - j.run_at
154+
max_age = time_ago(db_now(j), j.run_at)
155155
alert_age = Priority.new(priority).alert_age
156156
metrics[[priority, queue]] = [max_age / alert_age * 100, 100].min if alert_age
157157
end
@@ -183,6 +183,10 @@ def db_now(record)
183183
self.class.parse_utc_time(record.db_now_utc)
184184
end
185185

186+
def time_ago(now, value)
187+
[now - (value || now), 0].max
188+
end
189+
186190
def priority_case_statement
187191
[
188192
'CASE',

spec/delayed/__snapshots__/monitor_spec.rb.snap

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ GroupAggregate (cost=...)
149149
---------------------------------
150150
SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority,
151151
queue, MIN(locked_at) AS locked_at,
152-
TIMEZONE('UTC', NOW()) AS db_now_utc
152+
TIMEZONE('UTC', STATEMENT_TIMESTAMP()) AS db_now_utc
153153
FROM (SELECT priority, queue, MIN(locked_at) AS locked_at
154154
FROM \"delayed_jobs\"
155155
WHERE \"delayed_jobs\".\"locked_at\" >= '2025-11-10 16:59:43'
@@ -159,7 +159,7 @@ SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority
159159
GROUP BY CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"queue\"
160160
161161
GroupAggregate (cost=...)
162-
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.locked_at), timezone('UTC'::text, now())
162+
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.locked_at), timezone('UTC'::text, statement_timestamp())
163163
Group Key: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue
164164
-> Sort (cost=...)
165165
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.locked_at
@@ -181,7 +181,7 @@ GroupAggregate (cost=...)
181181
---------------------------------
182182
SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority,
183183
queue, MIN(run_at) AS run_at,
184-
TIMEZONE('UTC', NOW()) AS db_now_utc
184+
TIMEZONE('UTC', STATEMENT_TIMESTAMP()) AS db_now_utc
185185
FROM (SELECT priority, queue, MIN(run_at) AS run_at
186186
FROM \"delayed_jobs\"
187187
WHERE (\"delayed_jobs\".\"locked_at\" IS NULL
@@ -192,7 +192,7 @@ SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority
192192
GROUP BY CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"queue\"
193193
194194
GroupAggregate (cost=...)
195-
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, now())
195+
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, statement_timestamp())
196196
Group Key: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue
197197
-> Sort (cost=...)
198198
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.run_at
@@ -408,7 +408,7 @@ GroupAggregate (cost=...)
408408
---------------------------------
409409
SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority,
410410
queue, MIN(locked_at) AS locked_at,
411-
TIMEZONE('UTC', NOW()) AS db_now_utc
411+
TIMEZONE('UTC', STATEMENT_TIMESTAMP()) AS db_now_utc
412412
FROM (SELECT priority, queue, MIN(locked_at) AS locked_at
413413
FROM \"delayed_jobs\"
414414
WHERE \"delayed_jobs\".\"locked_at\" >= '2025-11-10 16:59:43'
@@ -418,7 +418,7 @@ SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority
418418
GROUP BY CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"queue\"
419419
420420
GroupAggregate (cost=...)
421-
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.locked_at), timezone('UTC'::text, now())
421+
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.locked_at), timezone('UTC'::text, statement_timestamp())
422422
Group Key: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue
423423
-> Sort (cost=...)
424424
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.locked_at
@@ -440,7 +440,7 @@ GroupAggregate (cost=...)
440440
---------------------------------
441441
SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END) AS priority,
442442
queue, MIN(run_at) AS run_at,
443-
TIMEZONE('UTC', NOW()) AS db_now_utc
443+
TIMEZONE('UTC', STATEMENT_TIMESTAMP()) AS db_now_utc
444444
FROM (SELECT priority, queue, MIN(run_at) AS run_at
445445
FROM \"delayed_jobs\"
446446
WHERE (\"delayed_jobs\".\"locked_at\" IS NULL
@@ -451,7 +451,7 @@ SELECT (CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority
451451
GROUP BY CASE WHEN priority < 10 THEN 0 WHEN priority < 20 THEN 10 WHEN priority < 30 THEN 20 WHEN priority >= 30 THEN 30 END, \"queue\"
452452
453453
GroupAggregate (cost=...)
454-
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, now())
454+
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, min(subquery.run_at), timezone('UTC'::text, statement_timestamp())
455455
Group Key: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue
456456
-> Sort (cost=...)
457457
Output: (CASE WHEN (subquery.priority < 10) THEN 0 WHEN (subquery.priority < 20) THEN 10 WHEN (subquery.priority < 30) THEN 20 WHEN (subquery.priority >= 30) THEN 30 ELSE NULL::integer END), subquery.queue, subquery.run_at

spec/helper.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,10 +228,7 @@ def supports_block_expectations?
228228
diffable
229229

230230
match do |block|
231-
if @approximately && current_adapter != 'postgresql'
232-
@expected_value = a_value_within([2, @expected_value.abs * 0.05].max).of(@expected_value)
233-
end
234-
231+
@expected_value = a_value_within([2, @expected_value.abs * 0.05].max).of(@expected_value) if @approximately
235232
@expected = { event_name: expected_event_name, payload: expected_payload, value: @expected_value }
236233
@actuals = []
237234
callback = ->(name, _started, _finished, _unique_id, payload) do

0 commit comments

Comments
 (0)