Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 36 additions & 40 deletions src/phlower/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

from .config import Config
from .events import CeleryEventConsumer
from .sqlite_store import SQLiteStore
from .sse import SSEBroadcaster
from .store import Store

Expand Down Expand Up @@ -165,61 +164,55 @@ async def _background_recovery(store: Store, sqlite_store, config: Config) -> No
logger.exception("Background recovery failed")


async def _purge_in_batches(loop, batch_fn, cutoff_ts: float) -> int:
"""Repeat a batch purge until empty, yielding between batches.

Each batch acquires the SQLite lock independently, so concurrent flushes
can interleave. Without this, a multi-million-row purge holds the lock
for tens of minutes and starves the flush loop until liveness probes fail.
"""
total = 0
while True:
deleted = await loop.run_in_executor(None, batch_fn, cutoff_ts)
total += deleted
if deleted < SQLiteStore.PURGE_BATCH_SIZE:
break
await asyncio.sleep(0.05)
return total


async def _sqlite_purge_loop(
store: Store, sqlite_store, config: Config, consumer=None
) -> None:
"""Purge detail rows after SQLITE_DETAIL_HOURS, core rows after SQLITE_INVOCATION_RETENTION_HOURS."""
"""Drop expired daily partitions; ensure tomorrow's partition exists.

With per-day partitioned tables, retention enforcement is a series of
DROP TABLE statements — metadata operations that take milliseconds.
The previous DELETE-based purge held the SQLite write lock long enough
to starve the flush loop and OOM the pod under load spikes.
"""
while True:
await asyncio.sleep(3600)
loop = asyncio.get_running_loop()
now = time.time()

# Disk pressure: if usage exceeds cap, halve the retention window
# repeatedly until it fits or hits a 1-hour floor.
# Make sure today's and tomorrow's partitions exist proactively, so
# midnight-UTC rollover doesn't race with the first flush of the day.
from .sqlite_store import _suffix_for_ts
for ts in (now, now + 86400):
await loop.run_in_executor(
None, sqlite_store.ensure_partition, _suffix_for_ts(ts)
)

retention_hours = config.sqlite_invocation_retention_hours
detail_hours = config.sqlite_detail_hours

# Disk pressure: shrink retention until it fits, halving each pass.
disk_pct = await loop.run_in_executor(None, sqlite_store.disk_usage_pct)
if disk_pct > config.sqlite_disk_usage_pct_cap:
while retention_hours > 1 and disk_pct > config.sqlite_disk_usage_pct_cap:
retention_hours = max(1, retention_hours // 2)
detail_hours = max(1, detail_hours // 2)
logger.warning(
"Disk %.0f%% > %d%% cap — emergency purge with %dh retention, %dh details",
disk_pct, config.sqlite_disk_usage_pct_cap, retention_hours, detail_hours,
"Disk %.0f%% > %d%% cap — emergency purge with %dh retention",
disk_pct, config.sqlite_disk_usage_pct_cap, retention_hours,
)
dropped = await loop.run_in_executor(
None, sqlite_store.purge_old_partitions, retention_hours
)
purge_cutoff = now - retention_hours * 3600
await _purge_in_batches(loop, sqlite_store.purge_expired_batch, purge_cutoff)
detail_cutoff = now - detail_hours * 3600
await _purge_in_batches(loop, sqlite_store.purge_details_batch, detail_cutoff)
if dropped:
logger.info("Emergency purge: dropped %d partitions", dropped)
disk_pct = await loop.run_in_executor(None, sqlite_store.disk_usage_pct)
else:
# Normal purge: details first (short retention), then core rows
detail_cutoff = now - detail_hours * 3600
purged_details = await _purge_in_batches(loop, sqlite_store.purge_details_batch, detail_cutoff)
if purged_details:
logger.info("SQLite purge: deleted %d detail rows (>%dh)", purged_details, detail_hours)

purge_cutoff = now - retention_hours * 3600
deleted = await _purge_in_batches(loop, sqlite_store.purge_expired_batch, purge_cutoff)
if deleted:
logger.info("SQLite purge: deleted %d expired rows (>%dh)", deleted, retention_hours)
dropped = await loop.run_in_executor(
None, sqlite_store.purge_old_partitions, retention_hours
)
if dropped:
logger.info(
"SQLite purge: dropped %d partitions (>%dh)",
dropped, retention_hours,
)

# Remove snapshots for tasks no longer tracked
active = set(store.tasks.keys())
Expand All @@ -238,10 +231,13 @@ async def _sqlite_purge_loop(
await loop.run_in_executor(None, sqlite_store.refresh_cached_stats)
size_mb = await loop.run_in_executor(None, sqlite_store.db_size_mb)
wal_mb = await loop.run_in_executor(None, sqlite_store.wal_size_mb)
dropped_invocations = store.snapshot_dropped_invocations()
logger.info(
"SQLite: %.1f MB, %d rows (%d detail), WAL: %.1f MB, disk: %.0f%%",
"SQLite: %.1f MB, %d rows (%d detail), WAL: %.1f MB, disk: %.0f%%, "
"dropped-invocations: %d",
size_mb, sqlite_store._cached_row_count,
sqlite_store._cached_detail_row_count, wal_mb, disk_pct,
dropped_invocations,
)


Expand Down
5 changes: 5 additions & 0 deletions src/phlower/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,8 @@ class Config:
detail_rate_threshold: int = field(
default_factory=lambda: int(os.environ.get("DETAIL_RATE_THRESHOLD", "500"))
)
sqlite_pending_buffer_cap: int = field(
default_factory=lambda: int(
os.environ.get("SQLITE_PENDING_BUFFER_CAP", "200000")
)
)
Loading
Loading