Skip to content

Commit a7b4dc8

Browse files
committed
Don't split ranges to refresh after the threshold
Cap the refresh window at the hypertable invalidation threshold to don't produce unnecessary batches for refresh.
1 parent e9d34b5 commit a7b4dc8

File tree

8 files changed

+343
-575
lines changed

8 files changed

+343
-575
lines changed

test/src/bgw/log.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
#include "compat/compat.h"
2222

23+
TS_FUNCTION_INFO_V1(ts_bgw_log_register_emit_log_hook);
24+
TS_FUNCTION_INFO_V1(ts_bgw_log_unregister_emit_log_hook);
25+
2326
static char *bgw_application_name = "unset";
2427

2528
void
@@ -137,3 +140,28 @@ ts_register_emit_log_hook()
137140
prev_emit_log_hook = emit_log_hook;
138141
emit_log_hook = emit_log_hook_callback;
139142
}
143+
144+
void
145+
ts_unregister_emit_log_hook()
146+
{
147+
emit_log_hook = prev_emit_log_hook;
148+
}
149+
150+
Datum
151+
ts_bgw_log_register_emit_log_hook(PG_FUNCTION_ARGS)
152+
{
153+
if (!PG_ARGISNULL(0))
154+
{
155+
ts_bgw_log_set_application_name(strdup(text_to_cstring(PG_GETARG_TEXT_P(0))));
156+
}
157+
158+
ts_register_emit_log_hook();
159+
PG_RETURN_VOID();
160+
}
161+
162+
Datum
163+
ts_bgw_log_unregister_emit_log_hook(PG_FUNCTION_ARGS)
164+
{
165+
ts_unregister_emit_log_hook();
166+
PG_RETURN_VOID();
167+
}

test/src/bgw/log.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@
77

88
extern void ts_bgw_log_set_application_name(char *name);
99
extern void ts_register_emit_log_hook(void);
10+
extern void ts_unregister_emit_log_hook(void);

tsl/src/continuous_aggs/refresh.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,6 +1040,12 @@ continuous_agg_split_refresh_window(ContinuousAgg *cagg, InternalTimeRange *orig
10401040
compute_inscribed_bucketed_refresh_window(cagg, &refresh_window, bucket_width);
10411041
}
10421042

1043+
/* We must cap the refresh window at the invalidation threshold to don't produce unnecessary
1044+
* batches */
1045+
int64 invalidation_threshold =
1046+
invalidation_threshold_get(cagg->data.raw_hypertable_id, refresh_window.type);
1047+
refresh_window.end = MIN(invalidation_threshold, refresh_window.end);
1048+
10431049
/* Check if the refresh size is large enough to produce bathes, if not then return no batches */
10441050
const int64 refresh_window_size = i64abs(refresh_window.end - refresh_window.start);
10451051
const int64 batch_size = (bucket_width * buckets_per_batch);

tsl/test/expected/cagg_bgw-15.out

Lines changed: 33 additions & 86 deletions
Large diffs are not rendered by default.

tsl/test/expected/cagg_bgw-16.out

Lines changed: 33 additions & 86 deletions
Large diffs are not rendered by default.

tsl/test/expected/cagg_bgw-17.out

Lines changed: 33 additions & 86 deletions
Large diffs are not rendered by default.

tsl/test/expected/cagg_policy_incremental.out

Lines changed: 170 additions & 270 deletions
Large diffs are not rendered by default.

tsl/test/sql/cagg_policy_incremental.sql

Lines changed: 39 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,31 @@
44

55
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
66

7-
CREATE OR REPLACE FUNCTION ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(timeout INT = -1, mock_start_time INT = 0) RETURNS VOID
7+
CREATE OR REPLACE FUNCTION ts_bgw_log_register_emit_log_hook(application_name TEXT) RETURNS VOID
88
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
9-
CREATE OR REPLACE FUNCTION ts_bgw_params_create() RETURNS VOID
10-
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
11-
CREATE OR REPLACE FUNCTION ts_bgw_params_destroy() RETURNS VOID
9+
CREATE OR REPLACE FUNCTION ts_bgw_log_unregister_emit_log_hook() RETURNS VOID
1210
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
13-
CREATE OR REPLACE FUNCTION ts_bgw_params_reset_time(set_time BIGINT = 0, wait BOOLEAN = false) RETURNS VOID
11+
CREATE OR REPLACE FUNCTION ts_bgw_params_create() RETURNS VOID
1412
AS :MODULE_PATHNAME LANGUAGE C VOLATILE;
1513

1614
-- Create a user with specific timezone and mock time
1715
CREATE ROLE test_cagg_refresh_policy_user WITH LOGIN;
18-
ALTER ROLE test_cagg_refresh_policy_user SET timezone TO 'UTC';
19-
ALTER ROLE test_cagg_refresh_policy_user SET timescaledb.current_timestamp_mock TO '2025-03-11 00:00:00+00';
2016
GRANT ALL ON SCHEMA public TO test_cagg_refresh_policy_user;
2117

2218
\c :TEST_DBNAME test_cagg_refresh_policy_user
2319

20+
CREATE PROCEDURE run_job_with_log(job_id INTEGER)
21+
AS
22+
$$
23+
BEGIN
24+
SET LOCAL application_name = 'cagg_policy_incremental';
25+
CALL run_job(job_id);
26+
END;
27+
$$
28+
LANGUAGE plpgsql;
29+
30+
SET timezone TO 'UTC';
31+
2432
CREATE TABLE public.bgw_log(
2533
msg_no INT,
2634
mock_time BIGINT,
@@ -30,12 +38,13 @@ CREATE TABLE public.bgw_log(
3038

3139
CREATE VIEW sorted_bgw_log AS
3240
SELECT
33-
msg_no,
34-
mock_time,
35-
application_name,
41+
row_number() OVER () AS msg_no,
3642
regexp_replace(regexp_replace(msg, '(Wait until|started at|execution time) [0-9]+(\.[0-9]+)?', '\1 (RANDOM)', 'g'), 'background worker "[^"]+"','connection') AS msg
3743
FROM
3844
bgw_log
45+
WHERE
46+
msg !~ '^(statement:|duration:|LOG: background worker "[^"]+" )'
47+
AND application_name = 'cagg_policy_incremental'
3948
ORDER BY
4049
mock_time,
4150
application_name COLLATE "C",
@@ -46,6 +55,7 @@ CREATE TABLE public.bgw_dsm_handle_store(
4655
);
4756
INSERT INTO public.bgw_dsm_handle_store VALUES (0);
4857
SELECT ts_bgw_params_create();
58+
SELECT ts_bgw_log_register_emit_log_hook('cagg_policy_incremental');
4959

5060
CREATE TABLE conditions (
5161
time TIMESTAMP WITH TIME ZONE NOT NULL,
@@ -87,6 +97,7 @@ SELECT
8797
start_offset => NULL,
8898
end_offset => NULL,
8999
schedule_interval => INTERVAL '1 h',
100+
initial_start => NOW() + INTERVAL '1 h',
90101
buckets_per_batch => 10
91102
) AS job_id \gset
92103

@@ -97,8 +108,7 @@ FROM
97108
WHERE
98109
job_id = :'job_id' \gset
99110

100-
SELECT ts_bgw_params_reset_time(0, true);
101-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
111+
CALL run_job_with_log(:'job_id');
102112
SELECT * FROM sorted_bgw_log;
103113

104114
CREATE MATERIALIZED VIEW conditions_by_day_manual_refresh
@@ -140,10 +150,7 @@ FROM
140150
config => jsonb_set(:'config', '{max_batches_per_execution}', '2')
141151
);
142152

143-
-- advance time by 1h so that job runs one more time
144-
SELECT ts_bgw_params_reset_time(extract(epoch from interval '1 hour')::bigint * 1000000, true);
145-
146-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
153+
CALL run_job_with_log(:'job_id');
147154
SELECT * FROM sorted_bgw_log;
148155

149156
SELECT count(*) FROM conditions_by_day;
@@ -157,10 +164,7 @@ FROM
157164
EXCEPT
158165
(SELECT * FROM conditions_by_day ORDER BY 1, 2)) AS diff;
159166

160-
-- advance time by 2h so that job runs one more time
161-
SELECT ts_bgw_params_reset_time(extract(epoch from interval '2 hour')::bigint * 1000000, true);
162-
163-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
167+
CALL run_job_with_log(:'job_id');
164168
SELECT * FROM sorted_bgw_log;
165169

166170
-- Should have no differences
@@ -193,11 +197,7 @@ FROM
193197
'1 hour'::interval) AS t,
194198
generate_series(1,5) AS d;
195199

196-
-- advance time by 3h so that job runs one more time
197-
SELECT ts_bgw_params_reset_time(extract(epoch from interval '3 hour')::bigint * 1000000, true);
198-
199-
-- Should process all four batches in the past
200-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
200+
CALL run_job_with_log(:'job_id');
201201
SELECT * FROM sorted_bgw_log;
202202

203203
SELECT count(*) FROM conditions_by_day;
@@ -239,11 +239,7 @@ FROM
239239
-- Truncate all data from the original hypertable
240240
TRUNCATE bgw_log, conditions;
241241

242-
-- advance time by 4h so that job runs one more time
243-
SELECT ts_bgw_params_reset_time(extract(epoch from interval '4 hour')::bigint * 1000000, true);
244-
245-
-- Should fallback to single batch processing because there's no data to be refreshed on the original hypertable
246-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
242+
CALL run_job_with_log(:'job_id');
247243
SELECT * FROM sorted_bgw_log;
248244

249245
-- Should return zero rows
@@ -262,11 +258,7 @@ FROM
262258

263259
TRUNCATE bgw_log;
264260

265-
-- advance time by 5h so that job runs one more time
266-
SELECT ts_bgw_params_reset_time(extract(epoch from interval '5 hour')::bigint * 1000000, true);
267-
268-
-- Should fallback to single batch processing because the refresh size is too small
269-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
261+
CALL run_job_with_log(:'job_id');
270262
SELECT * FROM sorted_bgw_log;
271263

272264
-- Should return 10 rows because the bucket width is `1 day` and buckets per batch is `10`
@@ -278,11 +270,7 @@ TRUNCATE conditions_by_day, conditions, bgw_log;
278270
INSERT INTO conditions
279271
VALUES ('2020-02-05 00:00:00+00', 1, 10);
280272

281-
-- advance time by 6h so that job runs one more time
282-
SELECT ts_bgw_params_reset_time(extract(epoch from interval '6 hour')::bigint * 1000000, true);
283-
284-
-- Should fallback to single batch processing because the refresh size is too small
285-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
273+
CALL run_job_with_log(:'job_id');
286274
SELECT * FROM sorted_bgw_log;
287275

288276
-- Should return 1 row
@@ -293,19 +281,21 @@ SELECT delete_job(:job_id);
293281
SELECT
294282
add_continuous_aggregate_policy(
295283
'conditions_by_day',
296-
start_offset => INTERVAL '15 days',
284+
start_offset => NULL,
297285
end_offset => NULL,
298286
schedule_interval => INTERVAL '1 h',
287+
initial_start => NOW() + INTERVAL '1 h',
299288
buckets_per_batch => 5,
300289
refresh_newest_first => true -- explicitly set to true to test the default behavior
301290
) AS job_id \gset
302291

303292
SELECT
304293
add_continuous_aggregate_policy(
305294
'conditions_by_day_manual_refresh',
306-
start_offset => INTERVAL '15 days',
295+
start_offset => NULL,
307296
end_offset => NULL,
308297
schedule_interval => INTERVAL '1 h',
298+
initial_start => NOW() + INTERVAL '1 h',
309299
buckets_per_batch => 0 -- 0 means no batching, so it will refresh all buckets in one go
310300
) AS job_id_manual \gset
311301

@@ -321,8 +311,8 @@ FROM
321311
'1 hour'::interval) AS t,
322312
generate_series(1,5) AS d;
323313

324-
SELECT ts_bgw_params_reset_time(0, true);
325-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
314+
CALL run_job_with_log(:'job_id');
315+
CALL run_job_with_log(:'job_id_manual');
326316
SELECT * FROM sorted_bgw_log;
327317

328318
-- Both continuous aggregates should have the same data
@@ -344,9 +334,10 @@ SELECT delete_job(:job_id_manual);
344334
SELECT
345335
add_continuous_aggregate_policy(
346336
'conditions_by_day',
347-
start_offset => INTERVAL '15 days',
337+
start_offset => NULL,
348338
end_offset => NULL,
349339
schedule_interval => INTERVAL '1 h',
340+
initial_start => NOW() + INTERVAL '1 h',
350341
buckets_per_batch => 5,
351342
refresh_newest_first => false
352343
) AS job_id \gset
@@ -360,8 +351,7 @@ WHERE
360351

361352
TRUNCATE bgw_log, conditions_by_day;
362353

363-
SELECT ts_bgw_params_reset_time(0, true);
364-
SELECT ts_bgw_db_scheduler_test_run_and_wait_for_scheduler_finish(25);
354+
CALL run_job_with_log(:'job_id');
365355
SELECT * FROM sorted_bgw_log;
366356

367357
-- Both continuous aggregates should have the same data
@@ -376,6 +366,8 @@ FROM
376366
EXCEPT
377367
(SELECT * FROM conditions_by_day ORDER BY 1, 2)) AS diff;
378368

369+
SELECT ts_bgw_log_unregister_emit_log_hook();
370+
379371
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
380372
REASSIGN OWNED BY test_cagg_refresh_policy_user TO :ROLE_CLUSTER_SUPERUSER;
381373
REVOKE ALL ON SCHEMA public FROM test_cagg_refresh_policy_user;

0 commit comments

Comments
 (0)