-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_queue.py
More file actions
124 lines (106 loc) · 4.85 KB
/
task_queue.py
File metadata and controls
124 lines (106 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# -*- coding: utf-8 -*-
"""
task_queue.py — Очередь «тяжёлых» задач.
Логика:
• На весь процесс есть один Semaphore с ёмкостью MAX_CONCURRENT_TASKS
(по умолчанию 2).
• Любая «тяжёлая» задача (массовый залив / партия регистраций /
LDV-цикл партией / XO-партия) кладётся через TaskQueue.submit().
• Если все слоты заняты — пользователь получает сообщение
«⏳ В очереди (позиция: N) — <title>». Когда подошла очередь и
задача стартует — «▶️ Задача запущена: <title>».
• Очередь FIFO, считается позиция от количества ожидающих + бегущих.
"""
import asyncio
import logging
from typing import Awaitable, Callable, Optional, List, Dict, Any
from config import MAX_CONCURRENT_TASKS
log = logging.getLogger("task_queue")
# Тип callback: notify(owner_id: int, text: str) -> Awaitable[None]
NotifyFn = Callable[[int, str], Awaitable[None]]
class TaskQueue:
"""
submit(coro_factory, owner_id, notify, title)
coro_factory — async-функция без аргументов или partial; то, что будет
запущено внутри слота семафора.
owner_id — кому слать уведомления.
notify — async fn(owner_id, text). Если None — молча.
title — короткое имя задачи для уведомлений.
"""
def __init__(self, max_concurrent: int = MAX_CONCURRENT_TASKS) -> None:
self.max_concurrent = max_concurrent
self._sem = asyncio.Semaphore(max_concurrent)
# счётчики
self._running: int = 0
self._waiting: List[Dict[str, Any]] = [] # видимая «очередь»
self._lock = asyncio.Lock()
# -------------- public --------------
@property
def running(self) -> int:
return self._running
@property
def waiting(self) -> int:
return len(self._waiting)
def status(self) -> Dict[str, int]:
return {
"max": self.max_concurrent,
"running": self._running,
"waiting": self.waiting,
}
async def submit(self,
coro_factory: Callable[[], Awaitable[Any]],
owner_id: Optional[int] = None,
notify: Optional[NotifyFn] = None,
title: str = "Задача") -> asyncio.Task:
"""
Положить задачу в очередь и сразу вернуть asyncio.Task,
реальный запуск произойдёт после получения слота семафора.
"""
item = {
"title": title,
"owner_id": owner_id,
"notify": notify,
}
# Если все слоты заняты — сообщить позицию ОЖИДАНИЯ.
async with self._lock:
self._waiting.append(item)
position = len(self._waiting)
slots_busy = self._running >= self.max_concurrent
if slots_busy and notify and owner_id is not None:
try:
await notify(
owner_id,
f"⏳ В очереди (позиция: {position}) — {title}"
)
except Exception:
pass
async def _wrapper():
await self._sem.acquire()
try:
# выйти из «ожидающих»
async with self._lock:
if item in self._waiting:
self._waiting.remove(item)
self._running += 1
# уведомить о старте
if notify and owner_id is not None:
try:
await notify(owner_id, f"▶️ Задача запущена: {title}")
except Exception:
pass
try:
return await coro_factory()
except Exception as e:
log.warning("TaskQueue: %s failed: %s", title, e)
if notify and owner_id is not None:
try:
await notify(owner_id,
f"❌ {title}: ошибка — {e}")
except Exception:
pass
raise
finally:
async with self._lock:
self._running = max(0, self._running - 1)
self._sem.release()
return asyncio.create_task(_wrapper())