Skip to content

Emit job usage data through IBM Cloud Event Streams instances#2209

Open
ismaelRozasRamallal wants to merge 15 commits into
mainfrom
integrate-kafka
Open

Emit job usage data through IBM Cloud Event Streams instances#2209
ismaelRozasRamallal wants to merge 15 commits into
mainfrom
integrate-kafka

Conversation

@ismaelRozasRamallal

@ismaelRozasRamallal ismaelRozasRamallal commented Jun 9, 2026

Copy link
Copy Markdown
Contributor

Summary

Fleets jobs now report their classical compute usage to downstream consumers in real time. From the moment a job starts running until it completes, consumers receive a continuous stream of usage events they can use to track how long the job has been executing and react when it finishes. This enables billing, monitoring, and any other business logic that depends on knowing a job's runtime to operate without polling the API.

One key point here is, Ray-based jobs are not affected by this change. Event publishing is scoped exclusively to Fleets (Code Engine) jobs.

🔨 related with #https://github.ibm.com/IBM-Q-Software/qiskit-serverless/issues/1507

Details and comments

  • Add job usage event publishing to IBM Cloud Event Streams for Fleets (Code Engine) jobs.
  • When a job transitions to RUNNING, a job_started event is published; subsequent scheduler iterations emit job_in_progress events with the elapsed classical time in nanoseconds; and when the job reaches a terminal state, a job_ended event is published.
  • Events follow the CloudEvents 1.0 spec and are published to a topic named quantum.{ENVIRONMENT}.function-usage.v1. Publishing is gated by an EVENT_STREAMS_ENABLED feature flag — when disabled, a no-op client logs at DEBUG level instead.
  • Publish failures block the DB status update and are retried on the next scheduler iteration.
  • ⚠️ Consumers should handle duplicate events keyed on job_id + event_type, particularly for job_ended which can be re-emitted if the scheduler restarts between publish and DB commit. It should be that way anywhere since that's one principle for Event Driven architectures but just in case to take it into account
  • There is a DB migration to add running_started_at field. We need to know when the job starts to run in order to keep it as base for the calculated delta we will be publishing as usage. As a consequence, each job will sabe a timestamp on when it changed to running status

More context

Still missing other events to be published. This PR just covers the Kafka integration and the very basic publishing, let's take it as reference for next iterations.

It also misses adding an instance in docker-compose to make acceptance tests. Didn't want to add it at first to avoid make the PR even bigger.

ismaelRozasRamallal and others added 8 commits June 8, 2026 12:59
Records the timestamp when a Fleets job transitions to RUNNING status,
enabling computation of job duration for IBM Cloud Event Streams usage events.

Generated with AI

Co-Authored-By: AI <ai@example.com>
Adds a Kafka producer client that publishes CloudEvents 1.0 usage events
(job_started, job_in_progress, job_ended) to IBM Cloud Event Streams for
Fleets jobs, configured via environment variables.

Generated with AI

Co-Authored-By: AI <ai@example.com>
Wires IBMEventStreamsClient into UpdateFleetsJobsStatuses: emit_job_started
before PENDING→RUNNING DB write, emit_job_ended before terminal DB write,
and emit_job_in_progress for already-RUNNING jobs each scheduler tick.
All emit calls are wrapped in try/except so broker failures never block DB updates.

Generated with AI

Co-Authored-By: AI <ai@example.com>
Extracts IBMEventStreamsClient from gateway/core/ibm_cloud/clients.py into
its own gateway/core/ibm_cloud/event_streams/event_streams_client.py module,
following the same pattern as the cos/ and code_engine/ submodules. Updates
all imports and moves the corresponding tests to a matching test subpackage.
Generated with AI

Co-Authored-By: AI <ai@example.com>
…tion

Generated with AI

Co-Authored-By: AI <ai@example.com>
Generated with AI

Co-Authored-By: AI <ai@example.com>
Generated with AI

Co-Authored-By: AI <ai@example.com>
@ismaelRozasRamallal ismaelRozasRamallal requested a review from a team as a code owner June 9, 2026 15:33
@ismaelRozasRamallal ismaelRozasRamallal changed the title Integrate kafka Emit job usage data through IBM Cloud Event Streams instances Jun 9, 2026

@marceloamaral marceloamaral left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fleets will also need the compute profile (actually the allocated resources, number of CPU, RAM and GPUs) to calculate the cost. Even though this might be out of scope for this PR, it would be good to start thinking about how this information will be added later...

ismaelRozasRamallal and others added 5 commits June 10, 2026 09:37
Generated with AI

Co-Authored-By: AI <ai@example.com>
Generated with AI

Co-Authored-By: AI <ai@example.com>
Generated with AI

Co-Authored-By: AI <ai@example.com>
@ismaelRozasRamallal

Copy link
Copy Markdown
Contributor Author

Fleets will also need the compute profile (actually the allocated resources, number of CPU, RAM and GPUs) to calculate the cost. Even though this might be out of scope for this PR, it would be good to start thinking about how this information will be added later...

Thanks for the comment Marcelo!! you're right, there is a potential metric there which can be good to use. The compute profile can be also took as metric for complexity, but we need to figure out if we can use since it's easy the "cheat" (since the profile is decided when adding a function, we need to avoid somebody deciding to pick up the highest compute profile to force users paying more on their function execution)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants