-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathtest_agent.py
More file actions
153 lines (116 loc) · 4.34 KB
/
test_agent.py
File metadata and controls
153 lines (116 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import asyncio
import os
from contextlib import suppress
import pytest
import pytest_asyncio
from fishjam import FishjamClient, FishjamNotifier, Room
from fishjam.agent.agent import Agent
from fishjam.agent.errors import AgentAuthError
from fishjam.api._fishjam_client import AgentOptions, AgentOutputOptions
from fishjam.events._protos.fishjam import (
ServerMessagePeerDisconnected,
ServerMessagePeerMetadataUpdated,
)
from fishjam.events.allowed_notifications import AllowedNotification
HOST = "proxy" if os.getenv("DOCKER_TEST") == "TRUE" else "localhost"
FISHJAM_ID = f"http://{HOST}:5555"
SERVER_API_TOKEN = os.getenv("MANAGEMENT_TOKEN", "development")
@pytest.fixture
def room_api():
return FishjamClient(FISHJAM_ID, SERVER_API_TOKEN)
@pytest.fixture
def room(room_api: FishjamClient):
room = room_api.create_room()
yield room
room_api.delete_room(room.id)
@pytest.fixture
def agent(room: Room, room_api: FishjamClient):
agent = room_api.create_agent(room.id)
yield agent
room_api.delete_peer(room.id, agent.id)
@pytest_asyncio.fixture
async def notifier():
notifier = FishjamNotifier(
fishjam_id=FISHJAM_ID,
management_token=SERVER_API_TOKEN,
)
@notifier.on_server_notification
def _(notification: AllowedNotification):
print(f"Too fast!: {notification}")
pass
task = asyncio.create_task(notifier.connect())
await asyncio.sleep(2)
yield notifier
task.cancel()
with suppress(asyncio.TimeoutError):
async with asyncio.timeout(0):
await task
class TestAgentApi:
def test_create_delete_agent(self, room_api: FishjamClient, room: Room):
agent = room_api.create_agent(room.id)
room = room_api.get_room(room.id)
assert len(room.peers) == 1
assert room.peers[0].id == agent.id
assert room.peers[0].type_ == "agent"
assert room.peers[0].status == "disconnected"
room_api.delete_peer(room.id, agent.id)
room = room_api.get_room(room.id)
assert room.peers == []
def test_create_agent_with_options(self, room_api: FishjamClient, room: Room):
agent = room_api.create_agent(
room.id,
AgentOptions(
output=AgentOutputOptions(
audio_format="pcm16",
audio_sample_rate=24000,
)
),
)
room = room_api.get_room(room.id)
assert len(room.peers) == 1
assert room.peers[0].id == agent.id
assert room.peers[0].type_ == "agent"
assert room.peers[0].status == "disconnected"
assert room.peers[0].subscribe_mode == "auto"
room_api.delete_peer(room.id, agent.id)
room = room_api.get_room(room.id)
assert room.peers == []
async def wait_event(event: asyncio.Event, timeout: float = 5):
await asyncio.wait_for(event.wait(), timeout)
class TestAgentConnection:
@pytest.mark.asyncio
async def test_invalid_auth(self, room_api: FishjamClient):
agent = Agent("fake-id", "room-id", "fake-token", room_api._fishjam_url)
with pytest.raises(AgentAuthError):
async with agent.connect():
raise RuntimeError("Connect should have raised AgentAuthError.")
@pytest.mark.asyncio
async def test_context_manager(
self,
room_api: FishjamClient,
room: Room,
agent: Agent,
notifier: FishjamNotifier,
):
connect_event = asyncio.Event()
disconnect_event = asyncio.Event()
@notifier.on_server_notification
def _(notification: AllowedNotification):
print(f"Received notification {notification}")
if (
isinstance(notification, ServerMessagePeerMetadataUpdated)
and notification.peer_id == agent.id
):
connect_event.set()
if (
isinstance(notification, ServerMessagePeerDisconnected)
and notification.peer_id == agent.id
):
disconnect_event.set()
async with agent.connect():
await wait_event(connect_event)
room = room_api.get_room(room.id)
assert len(room.peers) == 1
assert room.peers[0].id == agent.id
assert room.peers[0].status == "connected"
await wait_event(disconnect_event)