-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdemo.py
More file actions
216 lines (179 loc) · 7.14 KB
/
demo.py
File metadata and controls
216 lines (179 loc) · 7.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
"""
SonicWave Demo Script
Quick showcase demo for emergency communication system
"""
import time
import threading
import sys
import os
# Add the project root to the path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from client import SonicWaveClient, ClientConfig
from sonicwave.packet import EmergencyPacket
def demo_emergency_callback(packet: EmergencyPacket) -> None:
    """Print a received emergency packet as a formatted console alert.

    ``run_demo`` passes this function to ``SonicWaveClient`` as the
    ``on_emergency_received`` callback.

    Args:
        packet: The received packet. Its ``emergency_type``, ``message``,
            ``location``, ``sender_id``, ``hops`` and ``timestamp``
            attributes are read. ``timestamp`` is handed to
            ``time.localtime`` and is therefore presumably seconds since
            the epoch -- confirm against ``EmergencyPacket``.
    """
    print("\n🚨 EMERGENCY ALERT 🚨")
    print(f"├─ Type: {packet.emergency_type.upper()}")
    print(f"├─ Message: {packet.message}")
    print(f"├─ Location: {packet.location}")
    print(f"├─ Sender: {packet.sender_id}")
    print(f"├─ Hops: {packet.hops}")
    # Shown as wall-clock time in the local timezone of this machine.
    received_at = time.strftime('%H:%M:%S', time.localtime(packet.timestamp))
    print(f"└─ Time: {received_at}")
    print("=" * 50)
def run_demo() -> None:
    """Run the interactive SonicWave demo.

    Builds a ``ClientConfig`` for a server on ``localhost:8080`` with the
    UDP, ggwave and BLE transports enabled, starts a ``SonicWaveClient``
    that reports incoming packets through ``demo_emergency_callback``,
    prints the node ID and active transports, and then serves a text menu
    until the user picks option 7.

    Ctrl+C is caught and reported. ``client.stop()`` is always called,
    whether the menu loop ends normally, is interrupted, or raises.
    """
    print("🌊 SonicWave Emergency Communication System")
    print("=" * 50)
    print("Setting up client...")

    # Configure client
    config = ClientConfig(
        server_host='localhost',
        server_port=8080,
        enable_udp=True,
        enable_ggwave=True,
        # Original author's note: falls back gracefully if BLE is not
        # available (not verifiable from this file).
        enable_ble=True,
        auto_relay=True,
        max_hops=3,
    )

    # Create client
    client = SonicWaveClient(config, on_emergency_received=demo_emergency_callback)

    # Menu choice -> demo action. Choice '7' (exit) is handled in the loop.
    actions = {
        '1': send_medical_emergency,
        '2': send_fire_emergency,
        '3': send_security_alert,
        '4': send_custom_emergency,
        '5': show_statistics,
        '6': discover_peers,
    }

    try:
        # Start client
        print("Starting SonicWave client...")
        client.start()
        time.sleep(2)  # Let it initialize

        # Show status
        stats = client.get_stats()
        print("✅ Client started successfully!")
        print(f"Node ID: {stats['node_id']}")
        print(f"Active transports: {', '.join(stats['active_transports'])}")
        print("\n" + "=" * 50)

        # Demo menu
        while True:
            print("\n📱 DEMO MENU")
            print("1. Send Medical Emergency")
            print("2. Send Fire Emergency")
            print("3. Send Security Alert")
            print("4. Send Custom Emergency")
            print("5. Show Statistics")
            print("6. Discover Peers")
            print("7. Exit Demo")

            choice = input("\nEnter choice (1-7): ").strip()
            if choice == '7':
                break

            action = actions.get(choice)
            if action is None:
                print("❌ Invalid choice. Please try again.")
                continue
            action(client)
    except KeyboardInterrupt:
        print("\n\nDemo interrupted by user.")
    finally:
        print("\n🛑 Stopping client...")
        client.stop()

    # Deliberately outside ``finally``: success must not be reported while
    # an unhandled exception is propagating.
    print("✅ Demo completed successfully!")
def send_medical_emergency(client: SonicWaveClient) -> None:
    """Send a canned medical emergency and print the resulting packet ID.

    Args:
        client: Client whose ``send_emergency`` method is called with a
            fixed message, location, ``emergency_type="medical"`` and
            severity metadata.
    """
    print("\n🏥 MEDICAL EMERGENCY DEMO")

    location = {
        'description': 'Conference Room B, 2nd Floor',
        'building': 'Tech Conference Center',
        'coordinates': {'lat': 40.7128, 'lng': -74.0060},
    }

    packet_id = client.send_emergency(
        message="Person collapsed during presentation, needs immediate medical attention",
        location=location,
        emergency_type="medical",
        metadata={'severity': 'high', 'conscious': False},
    )

    print(f"✅ Medical emergency sent! Packet ID: {packet_id}")
    print("📡 Broadcasting through all available transports...")
def send_fire_emergency(client: SonicWaveClient) -> None:
    """Send a canned fire emergency and print the resulting packet ID.

    Args:
        client: Client whose ``send_emergency`` method is called with a
            fixed message, location, ``emergency_type="fire"`` and
            smoke/evacuation metadata.
    """
    print("\n🔥 FIRE EMERGENCY DEMO")

    location = {
        'description': 'Kitchen area, Ground Floor',
        'building': 'Tech Conference Center',
        'coordinates': {'lat': 40.7130, 'lng': -74.0058},
    }

    packet_id = client.send_emergency(
        message="Smoke detected in kitchen, possible electrical fire",
        location=location,
        emergency_type="fire",
        metadata={'smoke_level': 'high', 'evacuation_needed': True},
    )

    print(f"✅ Fire emergency sent! Packet ID: {packet_id}")
    print("📡 Broadcasting through all available transports...")
def send_security_alert(client: SonicWaveClient) -> None:
    """Send a canned security alert and print the resulting packet ID.

    Args:
        client: Client whose ``send_emergency`` method is called with a
            fixed message, location, ``emergency_type="security"`` and
            threat-level metadata.
    """
    print("\n🚨 SECURITY ALERT DEMO")

    location = {
        'description': 'Parking Lot Section C',
        'building': 'Tech Conference Center',
        'coordinates': {'lat': 40.7125, 'lng': -74.0065},
    }

    packet_id = client.send_emergency(
        message="Suspicious individual attempting vehicle break-ins",
        location=location,
        emergency_type="security",
        metadata={'threat_level': 'medium', 'police_notified': False},
    )

    print(f"✅ Security alert sent! Packet ID: {packet_id}")
    print("📡 Broadcasting through all available transports...")
def send_custom_emergency(client: SonicWaveClient) -> None:
    """Prompt for a message, type and location, then send that emergency.

    The three values are read from standard input and passed on unchanged.
    The location sent is a dict holding the entered description and the
    current ``time.time()`` value under ``'timestamp'``.

    Args:
        client: Client whose ``send_emergency`` method is called.
    """
    print("\n📝 CUSTOM EMERGENCY")

    message = input("Enter emergency message: ")
    emergency_type = input("Enter emergency type: ")
    location_desc = input("Enter location description: ")

    location = {
        'description': location_desc,
        'timestamp': time.time(),
    }

    # Keyword arguments, matching the other send_* demo functions.
    packet_id = client.send_emergency(
        message=message,
        location=location,
        emergency_type=emergency_type,
    )
    print(f"✅ Custom emergency sent! Packet ID: {packet_id}")
def show_statistics(client: SonicWaveClient) -> None:
    """Print the statistics returned by ``client.get_stats()``.

    The returned mapping is read at these keys: ``'node_id'``,
    ``'running'``, ``'active_transports'`` (joined with ``', '``),
    ``'stats'`` (``'packets_sent'``, ``'packets_received'``,
    ``'packets_relayed'``, ``'server_acknowledgments'``) and
    ``'packet_manager'`` (``'processed_packets'``, ``'pending_acks'``).

    Args:
        client: Client to query.

    Raises:
        KeyError: If any of the keys listed above is missing.
    """
    stats = client.get_stats()

    print("\n📊 CLIENT STATISTICS")
    print("=" * 30)
    print(f"Node ID: {stats['node_id']}")
    print(f"Status: {'🟢 Running' if stats['running'] else '🔴 Stopped'}")
    print(f"Active Transports: {', '.join(stats['active_transports'])}")

    packet_stats = stats['stats']
    print("\n📈 Packet Statistics:")
    print(f" Sent: {packet_stats['packets_sent']}")
    print(f" Received: {packet_stats['packets_received']}")
    print(f" Relayed: {packet_stats['packets_relayed']}")
    print(f" Server ACKs: {packet_stats['server_acknowledgments']}")

    manager_stats = stats['packet_manager']
    print("\n📋 Packet Manager:")
    print(f" Processed: {manager_stats['processed_packets']}")
    print(f" Pending ACKs: {manager_stats['pending_acks']}")
def discover_peers(client: SonicWaveClient) -> None:
    """Call ``client.discover_peers()`` and print the peers it returns.

    Each peer is indexed with ``'type'`` and ``'id'``; when a peer also has
    an ``'rssi'`` key, its value is printed as a signal strength in dBm.
    An empty (falsy) result prints a hint instead.

    Args:
        client: Client whose ``discover_peers`` method is called.
    """
    print("\n🔍 DISCOVERING PEERS...")

    peers = client.discover_peers()
    if not peers:
        print("❌ No peers discovered")
        print("💡 Try running another client instance or check network connectivity")
        return

    print(f"✅ Found {len(peers)} peers:")
    for i, peer in enumerate(peers, 1):
        print(f" {i}. {peer['type'].upper()}: {peer['id']}")
        # Only some peers carry a signal-strength reading.
        if 'rssi' in peer:
            print(f" Signal: {peer['rssi']} dBm")
if __name__ == '__main__':
    # Script entry point: print usage hints, wait for Enter, run the demo.
    print("🚀 Starting SonicWave Demo...")
    print("💡 Make sure the server is running: python server.py")
    print("💡 For best demo, run multiple client instances")
    print("\nPress Ctrl+C at any time to exit")

    try:
        input("\nPress Enter to continue...")
    except KeyboardInterrupt:
        # The banner above promises Ctrl+C exits at any time; honour that
        # at this prompt too instead of dying with a traceback.
        print("\n\nDemo interrupted by user.")
        sys.exit(0)

    run_demo()