-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathAgent.py
More file actions
193 lines (143 loc) · 9.02 KB
/
Agent.py
File metadata and controls
193 lines (143 loc) · 9.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
import logging
import Configuration
import BatteryFactory
log = logging.getLogger(__name__)
class Agent:
DEFAULT_BATTERY_LIFE = 3600
def __init__(self, identifier, speed, delivery_order, battery_life):
self.identifier = identifier
self.speed = int(speed * Configuration.slot_resolution) # Agent speed is given in distance units per second - but internally we have to use slots per second
self.delivery_order = delivery_order
self.previous_position = None
self.position = 0
self.movement_time = 0
self.current_risk = 0
self.sum_of_risk = 0
self.max_risk = 0
self.current_link = None
self.battery = None
if not self.delivery_order.is_completed():
self.current_link = delivery_order.get_start_link()
self.current_link.set_slot_occupation_at(self.position)
if battery_life is None:
battery_life = Agent.DEFAULT_BATTERY_LIFE
self.battery = BatteryFactory.get_battery(battery_life, self)
# We assume that move() is called every simulation step (which is 1 second in real life) starting from the start
# of the delivery_order
def move(self):
if self.current_link is None:
log.warning('Link is None')
return
if self.current_link.number_of_agents_that_moved_on_this_link_this_timestep >= self.current_link.capacity:
return
if self.delivery_order.is_completed():
log.warning('Agent ' + str(self.identifier) + ' completed delivery order -> does not move.')
return
if not self.delivery_order.route:
log.warning('Agent ' + str(self.identifier) + ' delivery order has an empty route -> Delivery order is set to finished')
self.delivery_order.set_completed()
return
self.movement_time += 1
if Configuration.risk_calculation_enabled:
self.__calculate_risk()
self.__move_on_link()
self.current_link.number_of_agents_that_moved_on_this_link_this_timestep += 1
self.battery.discharge()
def __move_on_link(self):
if self.is_space_before_me_occupied():
# if someone is too close to me, I can't move
log.debug(str(self.identifier) + ': Space before me occupied, cannot move')
return
if (self.position + self.speed) < self.current_link.get_number_of_slots():
# I am free to move within a link
self.__move_position_according_to_speed()
log.debug(str(self.identifier) + ': Much space, moved freely')
else:
# I am nearing the end of the link and have to switch to the next link
if self.delivery_order.i_am_on_finish_link():
# I am finished!
self.current_link.unset_slot_occupation_at(self.position)
self.position = self.current_link.get_number_of_slots() - 1
# do not set a new slot occupation
self.delivery_order.set_completed()
elif self.delivery_order.next_link_is_full():
# But the next link is full (capacity limit), therefore I can't move
log.debug(str(self.identifier) + ': Next link full, cannot move')
else:
# I can switch to the next link and start there from position zero
self.current_link.unset_slot_occupation_at(self.position)
self.current_link = self.delivery_order.next_link()
self.position = 0
self.current_link.set_slot_occupation_at(self.position)
log.debug(str(self.identifier) + ': Switched link')
def __move_position_according_to_speed(self):
self.current_link.unset_slot_occupation_at(self.position)
self.position += self.speed
self.current_link.set_slot_occupation_at(self.position)
def is_space_before_me_occupied(self):
# Returns if from the current position, another agent is visible within the range of speed or up to the end of the link
if (self.position + self.speed) < self.current_link.get_number_of_slots():
return True in self.current_link.slot_occupations[self.position + 1:(self.position + self.speed)]
else:
return True in self.current_link.slot_occupations[
self.position + 1:] # the construct in brackets [] returns all elements starting from position, until end of list https://stackoverflow.com/questions/621354/how-to-slice-a-list-from-an-element-n-to-the-end-in-python
def __calculate_risk(self):
radius = int(Configuration.risk_checking_radius)
number_of_agents_within_radius = 0
# Scan the previous and next links for agents if the radius overlaps to the previous or next links
if self.position - radius < 0:
length_to_be_scanned_on_previous_links = abs(self.position - radius)
# Get the previous links
if self.current_link.start_node.incoming_links:
for link in self.current_link.start_node.incoming_links:
number_of_agents_within_radius += sum(link.slot_occupations[-length_to_be_scanned_on_previous_links:])
# Get the previous links
if self.current_link.start_node.outgoing_links:
for link in self.current_link.start_node.outgoing_links:
if link is not self.current_link:
number_of_agents_within_radius += sum(
link.slot_occupations[-length_to_be_scanned_on_previous_links:])
if self.position + radius >= self.current_link.length:
length_to_be_scanned_on_next_links = abs(self.position + radius - self.current_link.length)
# Get the next links
if self.current_link.end_node.outgoing_links:
for link in self.current_link.end_node.outgoing_links:
number_of_agents_within_radius += sum(link.slot_occupations[:length_to_be_scanned_on_next_links])
# Also consider the links that are incoming in the end node
if self.current_link.end_node.incoming_links:
for link in self.current_link.end_node.incoming_links:
if link is not self.current_link:
number_of_agents_within_radius += sum(link.slot_occupations[-length_to_be_scanned_on_next_links:])
# Finally, also check the agents on the current link
if self.position - radius >= 0 and self.position + radius < self.current_link.length: # This means the radius is fully contained on the current link
number_of_agents_within_radius += sum(self.current_link.slot_occupations[self.position-radius:self.position+radius]) - 1
elif self.position - radius >= 0 and self.position + radius >= self.current_link.length: # This means that the radius overlaps the end node of the current link
number_of_agents_within_radius += sum(self.current_link.slot_occupations[self.position - radius:]) -1
elif self.position - radius < 0 and self.position + radius <= self.current_link.length: # This means that the radius overlaps the start node of the current link
number_of_agents_within_radius += sum(self.current_link.slot_occupations[:self.position+radius]) - 1
else:
# This is the case when the radius overlaps both start and end node of the current link (radius covers the full link)
number_of_agents_within_radius += sum(self.current_link.slot_occupations) - 1
# In case an agent is new, and it has not moved yet, and it is on an empty link, the link will have no slot occupations. Risk can be set to 0 in this case
if self.current_link.slot_occupations[self.position] == False:
number_of_agents_within_radius = 0
self.current_risk = number_of_agents_within_radius
self.sum_of_risk += self.current_risk
if self.current_risk > self.max_risk:
self.max_risk = self.current_risk
log.debug('Agent ' + str(self.identifier) + ' risk ' + str(self.current_risk))
def get_progress(self):
return self.position / self.current_link.get_number_of_slots()
def get_3d_coordinate(self):
if self.delivery_order.is_completed():
# If the delivery order is complete, the agent has no position anymore
# In case an agent has a trivial route (start node = end node), the delivery order is automatically completed.
# Battery model initialization makes use of the agent's position, therefore we return the arbitrary 0,0,0 here.
return 0, 0, 0
x = self.current_link.start_node.x + self.get_progress() * (
self.current_link.end_node.x - self.current_link.start_node.x)
y = self.current_link.start_node.y + self.get_progress() * (
self.current_link.end_node.y - self.current_link.start_node.y)
z = self.current_link.start_node.z + self.get_progress() * (
self.current_link.end_node.z - self.current_link.start_node.z)
return x, y, z