1717from tools .cloud_adapter .exceptions import InvalidResourceTypeException
1818from tools .cloud_adapter .model import ResourceTypes , RES_MODEL_MAP
1919from optscale_client .config_client .client import Client as ConfigClient
20+ from optscale_client .insider_client .client import Client as InsiderClient
2021from optscale_client .rest_api_client .client_v2 import Client as RestClient
2122from tools .optscale_time import utcnow , utcnow_timestamp
2223
2324
25+ BYTES_IN_MB = 1024 * 1024
2426CHUNK_SIZE = 200
2527EXCHANGE_NAME = 'resource-discovery'
2628QUEUE_NAME = 'discovery'
3234
3335
3436class ResourcesSaver :
35- def __init__ (self , rest_cl , limit , timeout , pause_timeout ):
37+ def __init__ (self , rest_cl , insider_cl , limit , timeout , pause_timeout ):
3638 queue_len = int (limit / CHUNK_SIZE ) if CHUNK_SIZE else 0
3739 self .queue = queue .Queue (queue_len )
40+ self .insider_cl = insider_cl
3841 self .rest_cl = rest_cl
3942 self .timeout = timeout
4043 self .pause_timeout = pause_timeout
4144 self .recording_available = Event ()
4245 self .empty = Event ()
4346 self ._proc = None
47+ self ._cloud_account_id = None
48+ self ._cloud_type = None
49+ self ._resource_type = None
4450 self .start ()
4551
4652 def __del__ (self ):
@@ -95,9 +101,9 @@ def is_finished(self):
95101 def _save_chunks (self ):
96102 while True :
97103 try :
98- chunk , resource_type , cloud_acc_id = self .queue .get (timeout = 1 )
104+ chunk = self .queue .get (timeout = 1 )
99105 self .empty .clear ()
100- self .save_bulk_resources (chunk , resource_type , cloud_acc_id )
106+ self .save_bulk_resources (chunk )
101107 except queue .Empty :
102108 self .empty .set ()
103109 except Exception as exc :
@@ -110,26 +116,72 @@ def get_resource_type_model(resource_type):
110116 except KeyError :
111117 raise Exception (f'Invalid resource type { resource_type } ' )
112118
113- def build_payload (self , resource , resource_type ):
119+ def build_payload (self , resource ):
114120 obj = {}
115121 for field in resource .fields (meta_fields_incl = False ):
116122 val = getattr (resource , field )
117123 if val is not None and (isinstance (val , bool ) or val ):
118124 obj [field ] = val
119125 obj .pop ('resource_id' , None )
120126 obj .pop ('organization_id' , None )
121- obj ['resource_type' ] = getattr (ResourceTypes , resource_type ).value
127+ obj ['resource_type' ] = getattr (ResourceTypes , self . resource_type ).value
122128 obj ['last_seen' ] = utcnow_timestamp ()
123129 obj ['active' ] = True
124130 return obj
125131
126- def save_bulk_resources (self , resources , resource_type , cloud_acc_id ):
132+ @property
133+ def resource_type (self ):
134+ return self ._resource_type
135+
136+ @resource_type .setter
137+ def resource_type (self , value ):
138+ self ._resource_type = value
139+
140+ @property
141+ def cloud_account_id (self ):
142+ return self ._cloud_account_id
143+
144+ @cloud_account_id .setter
145+ def cloud_account_id (self , value ):
146+ self ._cloud_account_id = value
147+
148+ @property
149+ def cloud_type (self ):
150+ return self ._cloud_type
151+
152+ @cloud_type .setter
153+ def cloud_type (self , value ):
154+ self ._cloud_type = value
155+
156+ def process_resource_obj (self , resources ):
157+ if self .resource_type != 'instance' or self .cloud_type != 'azure_cnr' :
158+ return resources
159+ flavors = {}
160+ for resource in resources :
161+ flavor = flavors .get (resource .flavor )
162+ if not flavor :
163+ try :
164+ _ , flavor = self .insider_cl .find_flavor (
165+ self .cloud_type , self .resource_type , resource .region ,
166+ {'source_flavor_id' : resource .flavor }, 'current' ,
167+ cloud_account_id = self .cloud_account_id )
168+ except Exception as exc :
169+ LOG .exception (exc )
170+ continue
171+ if flavor :
172+ flavors [resource .flavor ] = flavor
173+ resource .cpu_count = flavor ['cpu' ]
174+ resource .ram = flavor ['ram' ] * BYTES_IN_MB
175+ return resources
176+
177+ def save_bulk_resources (self , resources ):
127178 payload = []
179+ resources = self .process_resource_obj (resources )
128180 for rss in resources :
129- payload .append (self .build_payload (rss , resource_type ))
181+ payload .append (self .build_payload (rss ))
130182 if payload :
131183 _ , response = self .rest_cl .cloud_resource_create_bulk (
132- cloud_acc_id , {'resources' : payload },
184+ self . cloud_account_id , {'resources' : payload },
133185 behavior = 'update_existing' , return_resources = True )
134186 for resource in resources :
135187 try :
@@ -143,6 +195,7 @@ def __init__(self, connection, config_cl):
143195 self .connection = connection
144196 self .config_cl = config_cl
145197 self .set_discover_settings ()
198+ self ._insider_cl = None
146199 self ._rest_cl = None
147200 self ._res_saving = None
148201 self .running = True
@@ -153,6 +206,14 @@ def __del__(self):
153206 if self ._res_saving :
154207 self .res_saving .shutdown ()
155208
209+ @property
210+ def insider_cl (self ):
211+ if not self ._insider_cl :
212+ self ._insider_cl = InsiderClient (
213+ url = self .config_cl .insider_url (),
214+ secret = self .config_cl .cluster_secret ())
215+ return self ._insider_cl
216+
156217 @property
157218 def rest_cl (self ):
158219 if self ._rest_cl is None :
@@ -166,6 +227,7 @@ def rest_cl(self):
166227 def res_saving (self ):
167228 if self ._res_saving is None :
168229 self ._res_saving = ResourcesSaver (
230+ insider_cl = self .insider_cl ,
169231 rest_cl = self .rest_cl ,
170232 limit = self .discover_size ,
171233 timeout = self .timeout ,
@@ -256,7 +318,10 @@ def _discover_resources(self, cloud_acc_id, resource_type):
256318 return
257319 config = self .get_config (cloud_acc_id )
258320 gen_list = self .discover (config , resource_type )
259- discovered_resources = []
321+ self .res_saving .cloud_account_id = cloud_acc_id
322+ self .res_saving .cloud_type = config ['type' ]
323+ self .res_saving .resource_type = resource_type
324+ discovered_resources = set ()
260325 resources_count = 0
261326 max_parallel_requests = self .max_parallel_requests (config )
262327 errors = set ()
@@ -279,18 +344,16 @@ def _discover_resources(self, cloud_acc_id, resource_type):
279344 gen_list_chunk .remove (gen )
280345 errors .add (str (res ))
281346 elif res :
282- discovered_resources .append (res )
347+ discovered_resources .add (res )
283348 else :
284349 gen_list_chunk .remove (gen )
285350 if len (discovered_resources ) >= CHUNK_SIZE :
286351 resources_count += len (discovered_resources )
287- self .res_saving .send ((discovered_resources .copy (),
288- resource_type , cloud_acc_id ))
352+ self .res_saving .send (discovered_resources )
289353 discovered_resources .clear ()
290354 if len (discovered_resources ):
291355 resources_count += len (discovered_resources )
292- self .res_saving .send ((
293- discovered_resources .copy (), resource_type , cloud_acc_id ))
356+ self .res_saving .send (discovered_resources )
294357 LOG .info ("%s %s resources have been discovered for cloud %s" ,
295358 resources_count , resource_type , cloud_acc_id )
296359 self .res_saving .pause ()
0 commit comments