Skip to content

Commit 5554dd7

Browse files
authored
Merge pull request #245 from kili-technology/feature/4574-8-aa-u-i-can-use-the-webhooks-extensively
AA U, I can use the webhooks extensively
2 parents dcfa666 + acc03fb commit 5554dd7

File tree

1 file changed

+32
-10
lines changed

1 file changed

+32
-10
lines changed

kili/graphql_client.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,13 @@ def __init__(self, url):
8383
self._subscription_running = False
8484
self._st_id = None
8585

86+
def _reconnect(self):
87+
self._conn = websocket.create_connection(self.ws_url,
88+
on_message=self._on_message,
89+
subprotocols=[GQL_WS_SUBPROTOCOL])
90+
self._conn.on_message = self._on_message
91+
self._subscription_running = True
92+
8693
def _on_message(self, message):
8794
data = json.loads(message)
8895
# skip keepalive messages
@@ -124,23 +131,38 @@ def query(self, query, variables=None, headers=None):
124131
self._stop(_id)
125132
return res
126133

127-
def subscribe(self, query, variables=None, headers=None, callback=None, authorization=None):
134+
def prepare_subscribe(self, query, variables, headers, callback, authorization):
128135
self._conn_init(headers, authorization)
129136
payload = {'headers': headers, 'query': query, 'variables': variables}
130137
_cc = self._on_message if not callback else callback
131138
_id = self._start(payload)
139+
return _cc, _id
140+
141+
def subscribe(self, query, variables=None, headers=None, callback=None, authorization=None):
142+
_cc, _id = self.prepare_subscribe(query, variables, headers, callback, authorization)
132143

133144
def subs(_cc):
145+
total_reconnections = 0
146+
max_reconnections = 10
134147
self._subscription_running = True
135-
while self._subscription_running:
136-
r = json.loads(self._conn.recv())
137-
if r['type'] == 'error' or r['type'] == 'complete':
138-
print(r)
139-
self.stop_subscribe(_id)
140-
break
141-
elif r['type'] != 'ka':
142-
_cc(_id, r)
143-
time.sleep(1)
148+
while self._subscription_running and total_reconnections < max_reconnections:
149+
try:
150+
r = json.loads(self._conn.recv())
151+
if r['type'] == 'error' or r['type'] == 'complete':
152+
print(r)
153+
self.stop_subscribe(_id)
154+
break
155+
elif r['type'] != 'ka':
156+
_cc(_id, r)
157+
time.sleep(1)
158+
except websocket._exceptions.WebSocketConnectionClosedException as e:
159+
print('Connection closed error : {}'.format(str(e)))
160+
print(f'Will try to reconnect {max_reconnections - total_reconnections} times...')
161+
self._reconnect()
162+
total_reconnections += 1
163+
_cc, _id = self.prepare_subscribe(query, variables, headers, callback, authorization)
164+
continue
165+
print('Did not reconnect successfully...')
144166

145167
self._st_id = threading.Thread(target=subs, args=(_cc,))
146168
self._st_id.start()

0 commit comments

Comments
 (0)