-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
58 lines (45 loc) · 1.53 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import os
import time
import socketio
import urllib3
import sys
# Suppress urllib3's warnings; the original author tagged this as testing-only.
urllib3.disable_warnings()

# Host name / address of the server. Read from the SERVER_IP environment
# variable, with "localhost" used when the variable is not set.
SERVER_IP = os.getenv("SERVER_IP", "localhost")
class Client:
    """Socket.IO client that subscribes to status updates from the server.

    Meant to be used as a context manager: entering connects to
    ``https://<SERVER_IP>:5000`` and leaving disconnects if the connection is
    still up. The ``status`` field of the most recent ``status`` event is
    kept in ``received_status``.
    """

    def __init__(self, verbose=True):
        """Create the underlying ``socketio.Client`` and register handlers.

        Args:
            verbose: Passed as both ``logger`` and ``engineio_logger`` of the
                Socket.IO client. Defaults to ``True``, the value that used
                to be hard-coded here.
        """
        # NOTE(review): ssl_verify=False turns certificate verification off;
        # together with the module-level urllib3.disable_warnings() tagged
        # "Testing" this looks like a test-only setup -- confirm before
        # using against a real deployment.
        self.client = socketio.Client(
            ssl_verify=False,
            logger=verbose,
            engineio_logger=verbose,
            reconnection=False,
        )
        # "status" field of the last "status" event; None until one arrives
        # (or when the last payload carried no usable status).
        self.received_status = None
        self.client.on("connect", self.on_connected)
        self.client.on("disconnect", self.on_disconnected)
        self.client.on("status", self.on_status_received)

    def __enter__(self):
        """Connect to the server and return this instance.

        A ``socketio.exceptions.ConnectionError`` is reported with ``print``
        and not re-raised, so the ``with`` body still runs; inspect
        ``self.client.connected`` to learn whether the connection succeeded.
        """
        try:
            self.client.connect("https://{}:5000".format(SERVER_IP))
        except socketio.exceptions.ConnectionError:
            print("Connection error")
        return self

    def __exit__(self, exception_type, exception_value, traceback):
        """Disconnect if still connected.

        Returns ``None``, so an exception raised inside the ``with`` body
        propagates to the caller.
        """
        if self.client.connected:
            self.client.disconnect()

    def on_connected(self):
        """Handle the ``connect`` event by subscribing to status updates."""
        print("connection established")
        self.client.emit(event="subscribe_status", data={"code": "ok"})

    def on_disconnected(self):
        """Handle the ``disconnect`` event."""
        print("disconnected from server")

    def on_status_received(self, data):
        """Handle a ``status`` event and remember its ``status`` field.

        Args:
            data: Event payload. When it is a dict, its ``"status"`` entry
                (or ``None`` if absent) is stored; any other payload type is
                treated as carrying no status instead of raising
                ``AttributeError`` inside the handler.
        """
        print("data ", data)
        if isinstance(data, dict):
            self.received_status = data.get("status")
        else:
            self.received_status = None

    def wait_for_status_ok(self, timeout=None):
        """Poll until the status is ``"ok"``, the link drops, or time runs out.

        Args:
            timeout: Maximum number of seconds to wait. ``None`` (the
                default) waits without a time limit, as before.

        Returns:
            ``True`` if ``received_status`` is ``"ok"`` when the wait ends,
            ``False`` otherwise (disconnected or timed out).
        """
        poll_interval = 0.1
        deadline = None if timeout is None else time.monotonic() + timeout
        while self.client.connected and self.received_status != "ok":
            if deadline is None:
                time.sleep(poll_interval)
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))
        return self.received_status == "ok"
def _run_client():
    """Connect, wait for an "ok" status, then block until the connection ends."""
    # NOTE(review): the original indentation was lost; the final print is
    # assumed to run after the ``with`` block has exited -- confirm.
    with Client() as session:
        session.wait_for_status_ok()
        session.client.wait()
    print("Done")


if __name__ == "__main__":
    _run_client()