Emit job usage data through IBM Cloud Event Streams instances#2209
Emit job usage data through IBM Cloud Event Streams instances#2209ismaelRozasRamallal wants to merge 15 commits into
Conversation
Records the timestamp when a Fleets job transitions to RUNNING status, enabling computation of job duration for IBM Cloud Event Streams usage events. Generated with AI Co-Authored-By: AI <ai@example.com>
Adds a Kafka producer client that publishes CloudEvents 1.0 usage events (job_started, job_in_progress, job_ended) to IBM Cloud Event Streams for Fleets jobs, configured via environment variables. Generated with AI Co-Authored-By: AI <ai@example.com>
Wires IBMEventStreamsClient into UpdateFleetsJobsStatuses: emit_job_started before PENDING→RUNNING DB write, emit_job_ended before terminal DB write, and emit_job_in_progress for already-RUNNING jobs each scheduler tick. All emit calls are wrapped in try/except so broker failures never block DB updates. Generated with AI Co-Authored-By: AI <ai@example.com>
Extracts IBMEventStreamsClient from gateway/core/ibm_cloud/clients.py into its own gateway/core/ibm_cloud/event_streams/event_streams_client.py module, following the same pattern as the cos/ and code_engine/ submodules. Updates all imports and moves the corresponding tests to a matching test subpackage.
Generated with AI Co-Authored-By: AI <ai@example.com>
…tion Generated with AI Co-Authored-By: AI <ai@example.com>
Generated with AI Co-Authored-By: AI <ai@example.com>
Generated with AI Co-Authored-By: AI <ai@example.com>
There was a problem hiding this comment.
Fleets will also need the compute profile (actually the allocated resources, number of CPU, RAM and GPUs) to calculate the cost. Even though this might be out of scope for this PR, it would be good to start thinking about how this information will be added later...
Generated with AI Co-Authored-By: AI <ai@example.com>
Generated with AI Co-Authored-By: AI <ai@example.com>
Generated with AI Co-Authored-By: AI <ai@example.com>
Thanks for the comment Marcelo!! you're right, there is a potential metric there which can be good to use. The compute profile can be also took as metric for complexity, but we need to figure out if we can use since it's easy the "cheat" (since the profile is decided when adding a function, we need to avoid somebody deciding to pick up the highest compute profile to force users paying more on their function execution) |
Summary
Fleets jobs now report their classical compute usage to downstream consumers in real time. From the moment a job starts running until it completes, consumers receive a continuous stream of usage events they can use to track how long the job has been executing and react when it finishes. This enables billing, monitoring, and any other business logic that depends on knowing a job's runtime to operate without polling the API.
One key point here is, Ray-based jobs are not affected by this change. Event publishing is scoped exclusively to Fleets (Code Engine) jobs.
🔨 related with #https://github.ibm.com/IBM-Q-Software/qiskit-serverless/issues/1507
Details and comments
RUNNING, a job_started event is published; subsequent scheduler iterations emit job_in_progress events with the elapsed classical time in nanoseconds; and when the job reaches a terminal state, a job_ended event is published.quantum.{ENVIRONMENT}.function-usage.v1. Publishing is gated by anEVENT_STREAMS_ENABLEDfeature flag — when disabled, a no-op client logs at DEBUG level instead.running_started_atfield. We need to know when the job starts to run in order to keep it as base for the calculated delta we will be publishing as usage. As a consequence, each job will sabe a timestamp on when it changed to running statusMore context
Still missing other events to be published. This PR just covers the Kafka integration and the very basic publishing, let's take it as reference for next iterations.
It also misses adding an instance in docker-compose to make acceptance tests. Didn't want to add it at first to avoid make the PR even bigger.