Skip to content

Commit 1b6022f

Browse files
committed
Make empty CQ init faster in case of clean shutdown
In case of a clean shutdown the value next_seq_id is stored in recovery terms. This value can be utilized by the queue index to provide better bounds in absence of segment files. Before this patch starting an empty classic queue with next_seq_id = 100_000_000 used to take about 26 seconds. With this patch it takes less than 1ms.
1 parent a22fa85 commit 1b6022f

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

deps/rabbit/src/rabbit_classic_queue_index_v2.erl

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@
2020
%% queue implementation itself.
2121
-export([pre_publish/7, flush_pre_publish_cache/2,
2222
sync/1, needs_sync/1, flush/1,
23-
bounds/1, next_segment_boundary/1]).
23+
bounds/1, bounds/2,
24+
next_segment_boundary/1]).
2425

2526
%% Used to upgrade/downgrade from/to the v1 index.
2627
-export([init_for_conversion/3]).
@@ -1191,11 +1192,18 @@ flush_pre_publish_cache(TargetRamCount, State) ->
11911192
-spec bounds(State) ->
11921193
{non_neg_integer(), non_neg_integer(), State}
11931194
when State::state().
1195+
bounds(State) ->
1196+
bounds(State, undefined).
11941197

1195-
bounds(State = #qi{ segments = Segments }) ->
1198+
-spec bounds(State, non_neg_integer() | undefiend) ->
1199+
{non_neg_integer(), non_neg_integer(), State}
1200+
when State::state().
1201+
bounds(State = #qi{ segments = Segments }, NextSeqIdHint) ->
11961202
?DEBUG("~0p", [State]),
11971203
%% We must special case when we are empty to make tests happy.
11981204
if
1205+
Segments =:= #{} andalso is_integer(NextSeqIdHint) ->
1206+
{NextSeqIdHint, NextSeqIdHint, State};
11991207
Segments =:= #{} ->
12001208
{0, 0, State};
12011209
true ->

deps/rabbit/src/rabbit_variable_queue.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1172,7 +1172,13 @@ expand_delta(_SeqId, #delta { count = Count,
11721172

11731173
init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms,
11741174
PersistentClient, TransientClient, VHost) ->
1175-
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState),
1175+
NextSeqIdHint =
1176+
case Terms of
1177+
non_clean_shutdown -> undefined;
1178+
_ -> proplists:get_value(next_seq_id, Terms)
1179+
end,
1180+
1181+
{LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState, NextSeqIdHint),
11761182

11771183
{NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} =
11781184
case Terms of

0 commit comments

Comments
 (0)