-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
95 lines (76 loc) · 2.74 KB
/
Copy pathmain.py
File metadata and controls
95 lines (76 loc) · 2.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# main.py
import asyncio
import signal
from aiogram import Bot, Dispatcher
from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode
from app.bot.handlers import start as start_pkg, callbacks as cb_pkg, lang_cmd as lang_pkg
from app.middlewares.db_middleware import DBSessionMiddleware
from app.middlewares.middleware import ChatLoggerMiddleware
from app.middlewares.request_id_middleware import RequestIDMiddleware
from app.core.config import conf
from app.core.logger import get_logger
from app.db.session import init_db, dispose_db
from app.utils.redis_manager import RedisManager
logger = get_logger()
async def create_bot_and_dp():
bot = Bot(token=conf.bot.token, default=DefaultBotProperties(parse_mode=ParseMode.HTML))
dp = Dispatcher()
dp.update.outer_middleware(RequestIDMiddleware())
dp.update.outer_middleware(DBSessionMiddleware())
dp.update.outer_middleware(ChatLoggerMiddleware(logger=logger))
dp.include_router(start_pkg.router)
dp.include_router(cb_pkg.router)
dp.include_router(lang_pkg.router)
return bot, dp
async def startup(bot, dp):
await RedisManager.init()
await init_db()
logger.info("startup finished")
async def shutdown(bot, dp):
try:
await dp.storage.close()
except Exception:
logger.exception("closing storage failed")
try:
await bot.session.close()
except Exception:
logger.exception("closing bot session failed")
await RedisManager.close()
await dispose_db()
logger.info("shutdown finished")
async def run_polling():
bot, dp = await create_bot_and_dp()
await startup(bot, dp)
loop = asyncio.get_running_loop()
stop_event = asyncio.Event()
def _on_sig():
logger.info("signal received")
stop_event.set()
for s in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(s, _on_sig)
except NotImplementedError:
pass
await bot.delete_webhook(drop_pending_updates=True)
polling_task = asyncio.create_task(dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()))
stop_task = asyncio.create_task(stop_event.wait())
done, pending = await asyncio.wait([polling_task, stop_task], return_when=asyncio.FIRST_COMPLETED)
if not polling_task.done():
try:
await dp.stop_polling()
except Exception:
logger.exception("stop polling failed")
for t in (polling_task, stop_task):
if not t.done():
t.cancel()
try:
await t
except asyncio.CancelledError:
pass
await shutdown(bot, dp)
if __name__ == "__main__":
try:
asyncio.run(run_polling())
except KeyboardInterrupt:
pass