Skip to content

Commit c988259

Browse files
ImeevMAlocker
authored andcommitted
proto: support queue
This patch adds the `push` field to `ExecuteRequest` and introduces `aeon_queue.proto` module. Needed for tarantool/aeon#488
1 parent 2d0e279 commit c988259

File tree

2 files changed

+174
-0
lines changed

2 files changed

+174
-0
lines changed

aeon_crud.proto

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,63 @@ service CRUDService {
4343
// Transactionally executes a set of read and write operations.
4444

4545
message ExecuteRequest {
46+
// Message queue push section in a transaction request.
47+
message Push {
48+
// Definition of messages pushed to a shard-local queue.
49+
message Message {
50+
// Topic of the message.
51+
string topic = 1;
52+
// Data of the message.
53+
Value data = 2;
54+
// Time to live of the message.
55+
double ttl = 3;
56+
}
57+
// The source code of the Lua function that will be used to generate
58+
// messages. It's optional: if omitted, messages from the `messages`
59+
// field will be sent instead.
60+
//
61+
// The function is passed three arguments: the resulting read set,
62+
// the resulting write set, and an optional additional argument specified
63+
// as `func_arg`. If the function raises an error, the error will be
64+
// returned in the `push_err` field. The transaction will not be affected.
65+
//
66+
// The function must return an array of messages. Message is an array
67+
// of `{topic, data, ttl}`, where `topic` is required and is a string
68+
// value, `data` is required and is of any supported value type,
69+
// and `ttl` is optional and represents a number of seconds.
70+
//
71+
// A read/write operation is passed in an array: {space, key, tuple}.
72+
// (without string key names).
73+
//
74+
// Below is an example of a Lua function that returns messages
75+
// with `read_topic` as topic and the first fields of the read tuples
76+
// as data if the optional argument is `true`, and `write_topic`
77+
// as topic and the first fields of the written tuples as data otherwise.
78+
//
79+
// function(rs, ws, arg)
80+
// local messages = {}
81+
// if arg == true then
82+
// for _, r in ipairs(rs) do
83+
// if r.tuple ~= nil then
84+
// table.insert(messages, {'read_topic', r.tuple[1]})
85+
// end
86+
// end
87+
// else
88+
// for _, w in ipairs(ws) do
89+
// if w.tuple ~= nil then
90+
// table.insert(messages, {'write_topic', w.tuple[1]})
91+
// end
92+
// end
93+
// end
94+
// return messages
95+
// end
96+
//
97+
string func = 1;
98+
// Additional argument to the push function. Optional.
99+
Value func_arg = 2;
100+
// Messages to send if the push function is not provided.
101+
repeated Message messages = 3;
102+
}
46103
// Array of read operations.
47104
repeated Operation read_set = 1;
48105
// Array of write operations.
@@ -89,6 +146,8 @@ message ExecuteRequest {
89146
// Map : space name -> tuple format.
90147
// Contains formats of all provided tuples. Optional.
91148
map<string, TupleFormat> tuple_formats = 6;
149+
// Description of messages to push when executing a transaction.
150+
Push push = 7;
92151
}
93152

94153
message ExecuteResponse {
@@ -103,6 +162,9 @@ message ExecuteResponse {
103162
// Map : space name -> tuple format.
104163
// Contains formats of all returned tuples.
105164
map<string, TupleFormat> tuple_formats = 5;
165+
// PushErr is the error returned by the push function, or nil
166+
// if no error occurred.
167+
Error push_err = 6;
106168
}
107169

108170
// Transactionally inserts tuples into a space.

aeon_queue.proto

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
syntax = "proto3";
2+
3+
import "aeon_error.proto";
4+
import "aeon_value.proto";
5+
6+
package aeon;
7+
8+
// Queue API to Aeon - a distributed database based on Tarantool.
9+
service QueueService {
10+
// Takes messages from a shard-local queue. The messages should be
11+
// released after processing. Taken messages are not available
12+
// to other consumers until they are released. If a consumer attempts
13+
// to take messages again without releasing a previously accepted batch,
14+
// the router will attempt to return the same batch of messages, although
15+
// this is not guaranteed. If a consumer takes messages exclusively,
16+
// no other consumer will be able to accept messages from the same shard
17+
// with the same topic until the taken messages are released.
18+
rpc TakeMessages(TakeMessagesRequest) returns (TakeMessagesResponse) {}
19+
20+
// Releases messages. If messages are released with the `done` flag set,
21+
// they will not be available to consumers subsequently. Otherwise,
22+
// messages may be re-taken by this consumer or taken by other consumers.
23+
rpc ReleaseMessages(ReleaseMessagesRequest)
24+
returns (ReleaseMessagesResponse) {}
25+
26+
// Returns the oldest message for all storages, or the oldest message
27+
// for each storage.
28+
rpc GetOldestMessages(GetOldestMessagesRequest)
29+
returns (GetOldestMessagesResponse) {}
30+
}
31+
32+
// Description of returned messages.
33+
message Message {
34+
// The shard where the message was taken from.
35+
string shard = 1;
36+
// The serial number of the message on the shard.
37+
uint64 lsn = 2;
38+
// Data of the message.
39+
Value data = 3;
40+
}
41+
42+
// Consumer description.
43+
message ConsumerRef {
44+
// The shard where the consumer took messages from and where
45+
// it was registered.
46+
string shard = 1;
47+
// Topic of taken messages.
48+
string topic = 2;
49+
// Consumer name.
50+
string consumer = 3;
51+
}
52+
53+
message TakeMessagesRequest {
54+
// Topic name.
55+
string topic = 1;
56+
// The unique name of the consumer that takes messages for processing.
57+
// This name will be used to return a reference to the consumer,
58+
// which should be used in `ReleaseMessages` to release taken messages.
59+
string consumer = 2;
60+
// Max number of returned messages.
61+
uint64 limit = 3;
62+
// Time for the consumer to process the messages. After this time,
63+
// messages will be released with the status `undone`.
64+
double ttl = 4;
65+
// Exclusive mode flag. If set, no other consumer can take messages from
66+
// the same topic on the same shard.
67+
bool exclusive = 5;
68+
// Time to wait for messages.
69+
double timeout = 6;
70+
}
71+
72+
message TakeMessagesResponse {
73+
// Error information. Set only on failure.
74+
Error error = 1;
75+
// Returned messages.
76+
repeated Message messages = 2;
77+
// Consumer reference used to release messages.
78+
ConsumerRef ref = 3;
79+
// True if these messages have already been taken by the same consumer,
80+
// false otherwise.
81+
bool taken_earlier = 4;
82+
}
83+
84+
message ReleaseMessagesRequest {
85+
// Consumer reference. Should be the same as returned by `TakeMessages`.
86+
ConsumerRef ref = 1;
87+
// If true, released messages can no longer be taken by consumers.
88+
// Otherwise, the released messages may be taken again.
89+
bool done = 2;
90+
}
91+
92+
message ReleaseMessagesResponse {
93+
// Error information. Set only on failure.
94+
Error error = 1;
95+
// True if messages were already released, false otherwise.
96+
bool released_earlier = 2;
97+
}
98+
99+
message GetOldestMessagesRequest {
100+
// Topic name.
101+
string topic = 1;
102+
// True if the oldest messages for each shard should be returned,
103+
// false if the oldest messages for all shards should be returned.
104+
bool for_each_shard = 2;
105+
}
106+
107+
message GetOldestMessagesResponse {
108+
// Error information. Set only on failure.
109+
Error error = 1;
110+
// Returned messages.
111+
repeated Message messages = 2;
112+
}

0 commit comments

Comments
 (0)