Description
Celery with the GCP Pub/Sub transport cannot be tested locally against the Pub/Sub emulator. The transport's _size() method calls the Google Cloud Monitoring API, which the emulator does not support, so the call fails.
Current Behavior
When running Celery with the gcpubsub transport against the Pub/Sub emulator (local development/testing), the _size() method queries the Google Cloud Monitoring API to get queue message counts. The emulator does not support the Monitoring API, so the method fails or returns incorrect values.
# kombu/transport/gcpubsub.py
def _size(self, queue: str) -> int:
    """Return the number of messages in a queue."""
    queue = self.entity_name(queue)
    if queue not in self._queue_cache:
        return 0
    qdesc = self._queue_cache[queue]
    # This fails when using the emulator
    result = query.Query(
        self.monitor,
        self.project_id,
        'pubsub.googleapis.com/subscription/num_undelivered_messages',
        end_time=datetime.datetime.now(),
        minutes=1,
    ).select_resources(subscription_id=qdesc.subscription_id)
    # ...
Expected Behavior
When the PUBSUB_EMULATOR_HOST environment variable is set, the _size() method should skip the monitoring API call and either:
- Return -1 to indicate the size is unknown (consistent with permission denied behavior), or
- Use an alternative method to get the approximate message count if available
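The first option could look roughly like the sketch below. It is a standalone illustration, not Kombu code: the names `using_emulator`, `guarded_size`, and `UNKNOWN_SIZE` are hypothetical, and the `query_monitoring` callable stands in for the existing Monitoring API query inside _size(). The only things taken from this report are the PUBSUB_EMULATOR_HOST variable and the -1 "unknown" return value.

```python
import os
from typing import Callable, Mapping, Optional

EMULATOR_ENV_VAR = "PUBSUB_EMULATOR_HOST"  # variable named in this report
UNKNOWN_SIZE = -1  # "size unknown" value proposed in this report


def using_emulator(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Return True when the emulator variable is set to a non-empty value."""
    env = os.environ if environ is None else environ
    return bool(env.get(EMULATOR_ENV_VAR))


def guarded_size(
    query_monitoring: Callable[[], int],
    environ: Optional[Mapping[str, str]] = None,
) -> int:
    """Skip the monitoring query under the emulator (hypothetical helper).

    `query_monitoring` is a placeholder for whatever performs the real
    Cloud Monitoring lookup; it is never called when the emulator is active.
    """
    if using_emulator(environ):
        return UNKNOWN_SIZE
    return query_monitoring()
```

In the transport itself, the equivalent check would presumably sit at the top of _size(), before the query is built, so that no Monitoring client call is made when the emulator is in use.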
Steps to Reproduce
1. Start the Pub/Sub emulator:
   gcloud beta emulators pubsub start --project=test-project
2. Set the emulator environment variable:
   export PUBSUB_EMULATOR_HOST=localhost:8085
3. Run a Celery worker with the gcpubsub transport:
   from celery import Celery

   app = Celery(__name__)
   app.conf.update(
       broker_url='gcpubsub://projects/test-project',
       task_default_queue='test-queue',
   )
4. The worker fails with Monitoring API errors and expects Google credentials.
Environment
- Kombu Version: 5.6.0
- Python Version: 3.11+
- GCP Pub/Sub Emulator: Latest
- Celery Version: 5.5.3+