 #!/usr/bin/env python
 import os
 import time
+import logging

 import urllib3
 from concurrent.futures import ThreadPoolExecutor
 from threading import Lock, Thread
 from etcd import Lock as EtcdLock
 from kombu import Exchange, Queue, Connection as QConnection
 from kombu.pools import producers
-from kombu.log import get_logger
 from kombu.mixins import ConsumerMixin
-from kombu.utils.debug import setup_logging
 from pymongo import MongoClient
 from urllib3.exceptions import InsecureRequestWarning
 import clickhouse_connect
     queue_arguments=ARGUMENTS
 )

-LOG = get_logger(__name__)
+LOG = logging.getLogger(__name__)
 ENVIRONMENT_CLOUD_TYPE = 'environment'
 HEARTBEAT_INTERVAL = 300
 DEFAULT_MAX_WORKERS = 4


 class DIWorker(ConsumerMixin):
-    def __init__(self, connection, config_cl):
+    def __init__(self, connection, rabbitmq_conn_str, diworker_settings,
+                 config_params):
         self.connection = connection
-        self.config_cl = config_cl
-        self._rest_cl = None
-        self._mongo_cl = None
-        self._clickhouse_cl = None
+        self.rabbitmq_conn_str = rabbitmq_conn_str
+        self.diworker_settings = diworker_settings
+        self.config_cl_params = config_params
         self.active_report_import_ids = set()
         self.active_reports_lock = Lock()
         self.running = True
-        self.thread = Thread(target=self.heartbeat)
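+        # The heartbeat thread receives the raw config parameters and builds
+        # its own clients, so it does not share connections with task threads.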
+        self.thread = Thread(
+            target=self.heartbeat, args=(self.config_cl_params,))
         self.thread.start()
         self.executor = ThreadPoolExecutor(
             max_workers=int(
-                self.config_cl.diworker_settings().get(
+                self.diworker_settings.get(
                     'max_report_imports_workers',
                     DEFAULT_MAX_WORKERS
                 )
             )
         )

-    def heartbeat(self):
+    def heartbeat(self, config_params):
+        config_cl = self.get_config_cl(config_params)
+        rest_cl = self.get_rest_cl(config_cl)
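+        # Periodically touch every active report import via the REST API;
+        # the empty update acts as the heartbeat.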
         while self.running:
             with self.active_reports_lock:
                 report_import_ids = list(self.active_report_import_ids)
             for report_import_id in report_import_ids:
                 try:
-                    self.rest_cl.report_import_update(report_import_id, {})
+                    rest_cl.report_import_update(report_import_id, {})
                 except Exception as e:
                     LOG.warning("Heartbeat update failed for %s: %s", report_import_id, e)
             time.sleep(HEARTBEAT_INTERVAL)
+        rest_cl.close()

-    @property
-    def rest_cl(self):
-        if self._rest_cl is None:
-            self._rest_cl = RestClient(
-                url=self.config_cl.restapi_url(), verify=False)
-            self._rest_cl.secret = self.config_cl.cluster_secret()
-        return self._rest_cl
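+    # The lazily cached client properties are replaced with static factories
+    # so each thread can build and dispose of its own clients.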
+    @staticmethod
+    def get_config_cl(config_params):
+        return ConfigClient(**config_params)

-    @property
-    def mongo_cl(self):
-        if self._mongo_cl is None:
-            mongo_params = self.config_cl.mongo_params()
-            self._mongo_cl = MongoClient(mongo_params[0])
-        return self._mongo_cl
+    @staticmethod
+    def get_rest_cl(config_cl):
+        url = config_cl.restapi_url()
+        secret = config_cl.cluster_secret()
+        return RestClient(url=url, verify=False, secret=secret)

-    @property
-    def clickhouse_cl(self):
-        if not self._clickhouse_cl:
-            user, password, host, db_name, port, secure = (
-                self.config_cl.clickhouse_params())
-            self._clickhouse_cl = clickhouse_connect.get_client(
-                host=host, password=password, database=db_name, user=user,
-                port=port, secure=secure)
-        return self._clickhouse_cl
+    @staticmethod
+    def get_mongo_cl(config_cl):
+        mongo_params = config_cl.mongo_params()
+        return MongoClient(mongo_params[0])
+
+    @staticmethod
+    def get_clickhouse_cl(config_cl):
+        user, password, host, db_name, port, secure = (
+            config_cl.clickhouse_params())
+        return clickhouse_connect.get_client(
+            host=host, password=password, database=db_name, user=user,
+            port=port, secure=secure)

     def publish_activities_task(self, organization_id, object_id, object_type,
                                 action, routing_key, meta=None):
@@ -108,8 +109,7 @@ def publish_activities_task(self, organization_id, object_id, object_type,
             'action': action,
             'meta': meta
         }
-        queue_conn = QConnection('amqp://{user}:{pass}@{host}:{port}'.format(
-            **self.config_cl.read_branch('/rabbit')))
+        queue_conn = QConnection(self.rabbitmq_conn_str)
         task_exchange = Exchange(ACTIVITIES_EXCHANGE_NAME, type='topic')
         with producers[queue_conn].acquire(block=True) as producer:
             producer.publish(
@@ -127,60 +127,59 @@ def get_consumers(self, Consumer, channel):
                         accept=['json'],
                         callbacks=[self.process_task],
                         prefetch_count=int(
-                            self.config_cl.diworker_settings().get(
+                            self.diworker_settings.get(
                                 'max_report_imports_workers',
                                 DEFAULT_MAX_WORKERS
                             )
                         ),
                         )]

-    def report_import(self, task):
+    def report_import(self, task, config_cl, rest_cl, mongo_cl, clickhouse_cl):
         report_import_id = task.get('report_import_id')
         if not report_import_id:
             raise Exception('invalid task received: {}'.format(task))

         with self.active_reports_lock:
             self.active_report_import_ids.add(report_import_id)

-        _, import_dict = self.rest_cl.report_import_get(report_import_id)
+        _, import_dict = rest_cl.report_import_get(report_import_id)
         cloud_acc_id = import_dict.get('cloud_account_id')
-        _, resp = self.rest_cl.report_import_list(cloud_acc_id, show_active=True)
+        _, resp = rest_cl.report_import_list(cloud_acc_id, show_active=True)
         imports = list(filter(
             lambda x: x['id'] != report_import_id, resp['report_imports']))
         if imports:
             reason = 'Import cancelled due another import: %s' % imports[0]['id']
-            self.rest_cl.report_import_update(
+            rest_cl.report_import_update(
                 report_import_id,
                 {'state': 'failed', 'state_reason': reason}
             )
             return
         is_recalculation = import_dict.get('is_recalculation', False)
         LOG.info('Starting processing for task: %s, purpose %s',
                  task, 'recalculation' if is_recalculation else 'import')
-        self.rest_cl.report_import_update(report_import_id,
-                                          {'state': 'in_progress'})
+        rest_cl.report_import_update(report_import_id, {'state': 'in_progress'})

         importer_params = {
             'cloud_account_id': cloud_acc_id,
-            'rest_cl': self.rest_cl,
-            'config_cl': self.config_cl,
-            'mongo_raw': self.mongo_cl.restapi['raw_expenses'],
-            'mongo_resources': self.mongo_cl.restapi['resources'],
-            'clickhouse_cl': self.clickhouse_cl,
+            'rest_cl': rest_cl,
+            'config_cl': config_cl,
+            'mongo_raw': mongo_cl.restapi['raw_expenses'],
+            'mongo_resources': mongo_cl.restapi['resources'],
+            'clickhouse_cl': clickhouse_cl,
             'import_file': import_dict.get('import_file'),
             'recalculate': is_recalculation}
         importer = None
         ca = None
         previous_attempt_ts = 0
         try:
-            _, ca = self.rest_cl.cloud_account_get(
+            _, ca = rest_cl.cloud_account_get(
                 importer_params.get('cloud_account_id'))
             organization_id = ca.get('organization_id')
-            _, org = self.rest_cl.organization_get(organization_id)
+            _, org = rest_cl.organization_get(organization_id)
             if org.get('disabled'):
                 reason = ('Import cancelled due to disabled '
                           'organization: %s') % report_import_id
-                self.rest_cl.report_import_update(
+                rest_cl.report_import_update(
                     report_import_id,
                     {'state': 'failed', 'state_reason': reason}
                 )
@@ -192,11 +191,11 @@ def report_import(self, task):
             importer = get_importer_class(cc_type, export_scheme)(
                 **importer_params)
             importer.import_report()
-            self.rest_cl.report_import_update(
+            rest_cl.report_import_update(
                 report_import_id, {'state': 'completed'})
             if start_last_import_ts == 0 and cc_type != ENVIRONMENT_CLOUD_TYPE:
                 all_reports_finished = True
-                _, resp = self.rest_cl.cloud_account_list(organization_id)
+                _, resp = rest_cl.cloud_account_list(organization_id)
                 for acc in resp['cloud_accounts']:
                     if (acc['type'] != ENVIRONMENT_CLOUD_TYPE and
                             acc['last_import_at'] == 0):
@@ -212,7 +211,7 @@ def report_import(self, task):
             # pylint: disable=E1101
             LOG.error('Mongo exception details: %s', exc.details)
             reason = str(exc)
-            self.rest_cl.report_import_update(
+            rest_cl.report_import_update(
                 report_import_id,
                 {'state': 'failed', 'state_reason': reason}
             )
@@ -245,24 +244,36 @@ def process_task(self, body, message):
         self.executor.submit(self._process_task, body, message)

     def _process_task(self, body, message):
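+        # Build fresh clients for this task; they are closed in the finally
+        # block once the import finishes.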
+        config_cl = self.get_config_cl(self.config_cl_params)
+        rest_cl = self.get_rest_cl(config_cl)
+        mongo_cl = self.get_mongo_cl(config_cl)
+        clickhouse_cl = self.get_clickhouse_cl(config_cl)
         try:
-            self.report_import(body)
+            self.report_import(body, config_cl=config_cl, rest_cl=rest_cl,
+                               mongo_cl=mongo_cl, clickhouse_cl=clickhouse_cl)
         except Exception as exc:
             LOG.exception('Data import failed: %s', str(exc))
         finally:
+            mongo_cl.close()
+            clickhouse_cl.close()
+            rest_cl.close()
             with self.active_reports_lock:
                 self.active_report_import_ids.discard(body.get('report_import_id'))
             message.ack()


 if __name__ == '__main__':
     urllib3.disable_warnings(InsecureRequestWarning)
-    setup_logging(loglevel='INFO', loggers=[''])
-
-    config_cl = ConfigClient(
-        host=os.environ.get('HX_ETCD_HOST'),
-        port=int(os.environ.get('HX_ETCD_PORT')),
+    logging.basicConfig(
+        level=logging.INFO,
+        format='[%(threadName)s] %(levelname)s: %(message)s'
     )
+
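+    # Keep the raw etcd connection parameters so worker threads can build
+    # their own ConfigClient instances.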
+    config_cl_params = {
+        'host': os.environ.get('HX_ETCD_HOST'),
+        'port': int(os.environ.get('HX_ETCD_PORT'))
+    }
+    config_cl = ConfigClient(**config_cl_params)
     config_cl.wait_configured()
     migrator = Migrator(config_cl, 'restapi', 'diworker/diworker/migrations')
     # Use lock to avoid migration problems with several diworkers
@@ -272,9 +283,10 @@ def _process_task(self, body, message):
     LOG.info("starting worker")
     conn_str = 'amqp://{user}:{pass}@{host}:{port}'.format(
         **config_cl.read_branch('/rabbit'))
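+    # Read the diworker settings once in the main thread and hand them to the
+    # worker, which no longer holds a shared ConfigClient.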
+    dw_settings = config_cl.diworker_settings()
     with QConnection(conn_str) as conn:
         try:
-            worker = DIWorker(conn, config_cl)
+            worker = DIWorker(conn, conn_str, dw_settings, config_cl_params)
             worker.run()
         except KeyboardInterrupt:
             worker.running = False