Skip to content

Commit 7993e26

Browse files
Add an initial implementation of insert subscriptions.
This implementation also includes an example bytewax adapter which can be used to process inserts with bytewax. An example bytewax flow is shown below:

```
import uuid
import btrdb
from btrdb.experimental.bytewax_connectors import InsertSubscription
from bytewax.dataflow import Dataflow
from bytewax.connectors.stdio import StdOutput

def selector(db):
    # Selector can be anything that returns a list of uuids.
    rows = db.query('select uuid from streams')
    uuids = [uuid.UUID(row['uuid']) for row in rows]
    return uuids

flow = Dataflow()
flow.input("realtime_sub", InsertSubscription(selector, selector_refresh_interval=30))
flow.output("print_output", StdOutput())
```
1 parent 3235404 commit 7993e26

6 files changed

Lines changed: 391 additions & 145 deletions

File tree

btrdb/endpoint.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,3 +386,25 @@ def sql_query(self, stmt, params: typing.List):
386386
for page in self.stub.SQLQuery(request):
387387
check_proto_stat(page.stat)
388388
yield page.SQLQueryRow
389+
390+
@error_handler
def subscribe(self, update_queue):
    """Open a bidirectional insert subscription and yield incoming data.

    Parameters
    ----------
    update_queue
        Queue-like object whose blocking ``get()`` returns either a
        ``(to_add, to_remove)`` pair of UUID collections, or ``None`` to
        end the request stream.

    Yields
    ------
    tuple
        ``(uuid.UUID, table)`` for every response received, where ``table``
        is the result of reading the response's Arrow IPC stream in full
        (presumably a ``pyarrow.Table`` -- confirm ``pa`` is pyarrow here).
    """

    def updates():
        # Request-side generator: turns queued membership changes into
        # SubscriptionUpdate messages until the ``None`` sentinel arrives.
        while True:
            pending = update_queue.get()
            if pending is None:
                return
            to_add, to_remove = pending
            # op 0 adds the listed UUIDs, op 1 removes them; additions are
            # always sent before removals, and empty sets are not sent.
            for op_code, members in ((0, to_add), (1, to_remove)):
                if len(members) != 0:
                    yield btrdb_pb2.SubscriptionUpdate(
                        op=op_code, uuid=[member.bytes for member in members]
                    )

    for response in self.stub.Subscribe(updates()):
        check_proto_stat(response.stat)
        with pa.ipc.open_stream(response.arrowBytes) as reader:
            stream_id = uuid.UUID(bytes=response.uuid)
            yield stream_id, reader.read_all()

btrdb/experimental/__init__.py

Whitespace-only changes.
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
import time
2+
import queue
3+
import threading
4+
import weakref
5+
import pyarrow as pa
6+
7+
from bytewax.inputs import DynamicInput, StatelessSource
8+
9+
import btrdb
10+
11+
# Zero-row (time, value) table, built once at import time.  It is the payload
# of the heartbeat messages emitted by InsertSubscription.Source.
_empty_values = pa.Table.from_arrays(
    [pa.array([]) for _ in range(2)],
    schema=pa.schema(
        [
            pa.field("time", pa.timestamp("ns", tz="UTC"), nullable=False),
            pa.field("value", pa.float64(), nullable=False),
        ]
    ),
)
20+
21+
22+
class InsertSubscription(DynamicInput):
    """Bytewax dynamic input that emits BTrDB inserts as they arrive.

    Every bytewax worker builds its own ``Source`` (see ``build``), which
    subscribes to the share of the selected streams assigned to that worker
    (``uuid.int % worker_count == worker_index``).

    Args:
        selector_fn: Callable invoked as ``selector_fn(db)`` that returns an
            iterable of stream UUIDs to subscribe to.
        selector_refresh_interval: Seconds between re-evaluations of
            ``selector_fn``.  Defaults to six hours.
        heartbeat_interval: Seconds between heartbeats, or ``None`` to
            disable them.  A heartbeat emits ``(uuid, _empty_values)`` for
            every currently subscribed UUID.
        profile: Forwarded to ``btrdb.connect``.
        conn_str: Forwarded to ``btrdb.connect``.
        apikey: Forwarded to ``btrdb.connect``.

    Both intervals are checked roughly once per second, so their effective
    resolution is about one second.
    """

    def __init__(
        self,
        selector_fn,
        selector_refresh_interval=60 * 60 * 6,
        heartbeat_interval=None,
        profile=None,
        conn_str=None,
        apikey=None,
    ):
        self._selector_fn = selector_fn
        self._conn_str = conn_str
        self._apikey = apikey
        self._profile = profile
        self._selector_refresh_interval = selector_refresh_interval
        self._heartbeat_interval = heartbeat_interval

    class Source(StatelessSource):
        """Per-worker source backed by two daemon threads.

        The background worker periodically re-runs the selector, pushes
        subscription changes onto ``_update_queue`` and optionally emits
        heartbeats.  The read worker drains the subscription stream into
        ``_data_queue``, which ``next`` polls without blocking.
        """

        def __init__(
            self,
            db,
            selector_fn,
            selector_refresh_interval,
            heartbeat_interval,
            worker_index,
            worker_count,
        ):
            self._db = db
            self._selector_fn = selector_fn
            self._worker_index = worker_index
            self._worker_count = worker_count
            self._selector_refresh_interval = selector_refresh_interval
            self._heartbeat_interval = heartbeat_interval
            self._read_worker_exception = None
            self._background_worker_exception = None
            self._del_event = threading.Event()
            # One pending subscription change at most; 512 buffered results.
            self._update_queue = queue.Queue(1)
            self._data_queue = queue.Queue(512)

            def put_until_deleted(target_queue, del_event, item):
                # Retry a short timed put until it succeeds or the source
                # has been deleted, so a full queue can never pin a worker
                # thread forever.  ``timeout`` must be passed by keyword:
                # the second positional parameter of Queue.put is ``block``.
                while not del_event.is_set():
                    try:
                        target_queue.put(item, timeout=0.1)
                        return
                    except queue.Full:
                        pass

            # The worker threads receive self wrapped in a weakref proxy so
            # that they do NOT keep self alive.
            def read_worker(self, data):
                try:
                    # Avoid excessive weakref lookups
                    # by doing the lookup upfront initially.
                    del_event = self._del_event
                    data_queue = self._data_queue
                    for dat in data:
                        if del_event.is_set():
                            return
                        put_until_deleted(data_queue, del_event, dat)
                except Exception as e:
                    # NOTE(review): if the source has already been collected
                    # this assignment goes through a dead proxy -- confirm
                    # whether that case needs handling.
                    self._read_worker_exception = e

            # Self is a weakref proxy, same as above.
            def background_worker(self):
                try:
                    # Avoid excessive weakref lookups
                    # by doing the lookup upfront initially.
                    db = self._db
                    del_event = self._del_event
                    selector_fn = self._selector_fn
                    worker_index = self._worker_index
                    worker_count = self._worker_count
                    data_queue = self._data_queue
                    update_queue = self._update_queue
                    heartbeat_interval = self._heartbeat_interval
                    last_heartbeat = time.monotonic()
                    selector_refresh_interval = self._selector_refresh_interval
                    # Backdated so the first loop iteration runs the selector.
                    last_selector_refresh = (
                        time.monotonic() - selector_refresh_interval
                    )
                    current_uuids = set()
                    while True:
                        now = time.monotonic()
                        if (now - last_selector_refresh) >= selector_refresh_interval:
                            last_selector_refresh = now
                            # Keep only the streams assigned to this worker.
                            next_uuids = {
                                stream_uuid
                                for stream_uuid in selector_fn(db)
                                if stream_uuid.int % worker_count == worker_index
                            }
                            added_uuids = next_uuids - current_uuids
                            removed_uuids = current_uuids - next_uuids
                            if added_uuids or removed_uuids:
                                update_queue.put([added_uuids, removed_uuids])
                            current_uuids = next_uuids
                        if (
                            heartbeat_interval is not None
                            and (now - last_heartbeat) >= heartbeat_interval
                        ):
                            last_heartbeat = now
                            for stream_uuid in current_uuids:
                                put_until_deleted(
                                    data_queue,
                                    del_event,
                                    (stream_uuid, _empty_values),
                                )
                        # Sleep for up to a second; wake early on deletion.
                        if del_event.wait(1.0):
                            return
                except Exception as e:
                    # NOTE(review): same dead-proxy caveat as in read_worker.
                    self._background_worker_exception = e

            weakself = weakref.proxy(self)
            data = db.ep.subscribe(self._update_queue)
            self._background_worker = threading.Thread(
                target=background_worker, args=[weakself], daemon=True
            )
            self._read_worker = threading.Thread(
                target=read_worker, args=[weakself, data], daemon=True
            )
            self._background_worker.start()
            self._read_worker.start()

        def next(self):
            """Return the next buffered item, or ``None`` if none is ready.

            Raises:
                Exception: Whatever exception terminated the background
                    worker, or (when no data is buffered) the read worker.
            """
            # Check if the selector thread has died.
            background_worker_exception = self._background_worker_exception
            if background_worker_exception is not None:
                raise background_worker_exception
            try:
                return self._data_queue.get_nowait()
            except queue.Empty:
                # Check if the reason no data arrived is because
                # the reader thread has died.
                read_worker_exception = self._read_worker_exception
                if read_worker_exception is not None:
                    raise read_worker_exception
                return None

        def __del__(self):
            # Signal workers to exit.
            self._del_event.set()
            # Signal the end of the subscription.
            # NOTE(review): this is a blocking put on a one-slot queue; it
            # waits if an update is still pending -- confirm acceptable.
            self._update_queue.put(None)

    def build(self, worker_index, worker_count):
        """Connect to BTrDB and create the ``Source`` for one worker.

        Args:
            worker_index: Index of the worker this source is built for.
            worker_count: Total number of workers sharing the streams.

        Returns:
            A started ``InsertSubscription.Source``.
        """
        db = btrdb.connect(
            profile=self._profile,
            conn_str=self._conn_str,
            apikey=self._apikey,
        )
        return InsertSubscription.Source(
            db,
            self._selector_fn,
            self._selector_refresh_interval,
            self._heartbeat_interval,
            worker_index,
            worker_count,
        )

btrdb/grpcinterface/btrdb.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ service BTrDB {
2828
rpc GetMetadataUsage(MetadataUsageParams) returns (MetadataUsageResponse);
2929
rpc GenerateCSV(GenerateCSVParams) returns (stream GenerateCSVResponse);
3030
rpc SQLQuery(SQLQueryParams) returns (stream SQLQueryResponse);
31+
rpc Subscribe(stream SubscriptionUpdate) returns (stream SubscriptionResp);
3132
//rpc SetCompactionConfig(SetCompactionConfigParams) returns (SetCompactionConfigResponse);
3233
//rpc GetCompactionConfig(GetCompactionConfigParams) returns (GetCompactionConfigResponse);
3334
}
@@ -426,3 +427,19 @@ message ReducedResolutionRange {
426427
int64 End = 2;
427428
uint32 Resolution = 3;
428429
}
430+
431+
// Operation carried by a SubscriptionUpdate message.
enum SubscriptionUpdateOp {
  // Add the listed stream UUIDs to the subscription.
  ADD_UUIDS = 0;
  // Remove the listed stream UUIDs from the subscription.
  REMOVE_UUIDS = 1;
}
435+
436+
// Client-to-server message on the Subscribe stream: changes the set of
// stream UUIDs the subscription covers.
message SubscriptionUpdate {
  // Whether the UUIDs below are being added or removed.
  SubscriptionUpdateOp op = 1;
  // Raw UUID bytes, one entry per stream.
  repeated bytes uuid = 2;
}
440+
441+
// Server-to-client message on the Subscribe stream.
message SubscriptionResp {
  // Status of this response.
  Status stat = 1;
  // Raw bytes of the UUID of the stream this response refers to.
  bytes uuid = 2;
  // Payload; the Python client reads it as an Arrow IPC stream.
  bytes arrowBytes = 3;
}

0 commit comments

Comments
 (0)