Skip to content

@cached does not coalesce concurrent calls when used with a Semaphore, which leads to a cache stampede in the first batch. #1041

@0x1618

Description

@0x1618

When using @cached with a concurrency limit via asyncio.Semaphore, the first batch of tasks all miss the cache and execute the underlying function in parallel. This results in multiple uncached executions instead of a single in-flight call being shared.

import asyncio
import random
from aiocache import cached, caches

# Register the "default" cache alias: the aiocache.SimpleMemoryCache backend
# with a ttl of 300 applied to stored entries.
caches.set_config({
    "default": {
        "cache": "aiocache.SimpleMemoryCache",
        "ttl": 300
    }
})

async def with_semaphore(semaphore, coro):
    """Await ``coro`` while holding ``semaphore`` and return its result.

    The awaitable only starts running once the semaphore has been entered,
    so the semaphore bounds how many of these awaitables run at once.
    """
    async with semaphore:
        outcome = await coro
        return outcome

@cached()
async def long_job():
    """Simulate slow work: sleep for two seconds, then return a random int.

    The value is drawn from 0..1000 inclusive, so distinct values in the
    output indicate that the body ran more than once.
    """
    await asyncio.sleep(2)
    value = random.randint(0, 1000)
    return value

async def main():
    """Run eight ``long_job`` calls, at most four at a time, and print the results."""
    limiter = asyncio.Semaphore(4)
    pending = []
    for _ in range(8):
        pending.append(with_semaphore(limiter, long_job()))
    results = await asyncio.gather(*pending)
    print(results)

# Entry point of the reproduction: drive main() to completion.
asyncio.run(main())

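Output (the first four calls, i.e. the ones admitted by the semaphore, each executed the function; only the remaining four were served from the cache):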

[177, 972, 52, 418, 418, 418, 418, 418]

The first caller (within the semaphore limit) should execute the function, and all other concurrent callers should await the same in-flight result, not trigger additional executions.

fix:

from functools import wraps

def _default_key_builder(func, *args, **kwargs):
    """Build a string key from the function's identity and its call arguments.

    The key joins the function's module, its qualified name, the positional
    arguments tuple and the keyword arguments (sorted by name, so the order
    in which keywords were passed does not matter) with ``:``.
    """
    ordered_kwargs = tuple(sorted(kwargs.items()))
    return "{}:{}:{}:{}".format(
        func.__module__,
        func.__qualname__,
        args,
        ordered_kwargs,
    )


def coalesce(key_builder):
    """Build a decorator that shares one in-flight call among concurrent callers.

    While a call for a given key is still running, every further call that
    maps to the same key awaits that running task instead of invoking the
    wrapped coroutine function again.  Once the task finishes (with a result,
    an exception or a cancellation) its entry is dropped, so the next call
    starts a fresh execution.

    Args:
        key_builder: Callable invoked as ``key_builder(func, *args, **kwargs)``;
            its return value is used as the dictionary key that groups
            concurrent calls.

    Returns:
        A decorator for coroutine functions.  Every function decorated with
        the same returned decorator shares one in-flight table.

    Raises:
        The wrapper re-raises whatever the shared call raises, to every
        caller that is waiting on it.
    """
    inflight = {}

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = key_builder(func, *args, **kwargs)

            # No lock is needed: there is no await between the lookup and the
            # insert, so no other coroutine can run in between.
            task = inflight.get(key)

            if task is None:
                task = asyncio.create_task(func(*args, **kwargs))
                inflight[key] = task

                def _forget(finished, key=key):
                    # Only drop the entry if it still refers to this task.
                    if inflight.get(key) is finished:
                        del inflight[key]

                # Clean up from the task itself rather than from a waiter, so
                # the entry is removed even if every waiter was cancelled.
                task.add_done_callback(_forget)

            # shield() keeps a cancelled waiter from cancelling the shared
            # task that other callers are still awaiting.
            return await asyncio.shield(task)

        return wrapper

    return decorator


# coalesce is the outermost decorator, so concurrent callers share a single
# task that wraps the cached lookup and, on a miss, the function body.
@coalesce(key_builder=_default_key_builder)
@cached(alias="default", key_builder=_default_key_builder)
async def long_job():
    """Simulate slow work: sleep for two seconds, then return a random int in 0..1000."""
    await asyncio.sleep(2)
    value = random.randint(0, 1000)
    return value

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions