@saschwartz saschwartz commented Dec 1, 2025

Overview

This pull request attempts to address part of #18753 by making the TaskRunRecorder service insert tasks into the database more efficiently, using bulk insertion.

Current Behaviour

Currently, the service inserts tasks into the database one by one. This is very slow and can cause the messaging layer to back up. In a real-world setting with 5 replicas of TaskRunRecorder, throughput was fewer than 10 tasks inserted into the database per second.

Proposed Behaviour

Tasks are inserted according to a configurable batch size and flush interval (this mirrors the configuration parameters for the EventPersisterService).
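
For readers unfamiliar with the pattern, here is a minimal sketch of batching by size and flush interval. It is not the code in this PR: `BatchBuffer`, `insert_many`, `batch_size`, `flush_interval`, and `clock` are hypothetical names chosen for illustration, and the interval is only checked when an item is added, whereas a real service would more likely also flush from a background timer.

```python
import time
from typing import Any, Callable


class BatchBuffer:
    """Collects items and hands them to `insert_many` in batches.

    Hypothetical illustration only; these names are assumptions,
    not part of the TaskRunRecorder or EventPersisterService APIs.
    """

    def __init__(
        self,
        insert_many: Callable[[list[Any]], None],
        batch_size: int = 100,
        flush_interval: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._insert_many = insert_many
        self._batch_size = batch_size
        self._flush_interval = flush_interval
        self._clock = clock
        self._items: list[Any] = []
        self._last_flush = clock()

    def add(self, item: Any) -> None:
        """Buffer one item and flush if either threshold is reached."""
        self._items.append(item)
        size_reached = len(self._items) >= self._batch_size
        interval_elapsed = self._clock() - self._last_flush >= self._flush_interval
        if size_reached or interval_elapsed:
            self.flush()

    def flush(self) -> None:
        """Send all buffered items as one batch and reset the timer."""
        self._last_flush = self._clock()
        if not self._items:
            return
        batch, self._items = self._items, []
        self._insert_many(batch)
```

With `batch_size=3`, adding seven items triggers two bulk inserts of three items each; the seventh item stays buffered until the interval elapses or `flush()` is called explicitly, for example on shutdown.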

Dependencies

The PR depends on a few things being merged first:

Testing

Unit tests have been added covering the new code path. Additionally, this has been stress-tested in an OSS HA setup: with 5 replicas of the TaskRunRecorder and an HA Postgres instance, about 5K insertions per second are achieved.

Checklist

  • This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • If this pull request adds new functionality, it includes unit tests that cover the changes
  • If this pull request removes docs files, it includes redirect settings in mint.json.
  • If this pull request adds functions or classes, it includes helpful docstrings.

@saschwartz saschwartz force-pushed the schwarts/task-run-bulk-insert branch from 5cb526a to 43d9d94 Compare December 1, 2025 17:29
@saschwartz saschwartz marked this pull request as draft December 1, 2025 17:29

codspeed-hq bot commented Dec 1, 2025

CodSpeed Performance Report

Merging #19586 will not alter performance

Comparing saschwartz:schwarts/task-run-bulk-insert (1a8d6e6) with main (c5d255d)

Summary

✅ 2 untouched

@saschwartz saschwartz force-pushed the schwarts/task-run-bulk-insert branch from 43d9d94 to 6d534e4 Compare December 1, 2025 18:58
@saschwartz saschwartz force-pushed the schwarts/task-run-bulk-insert branch 5 times, most recently from 141f126 to d08f928 Compare December 1, 2025 20:47
@saschwartz saschwartz marked this pull request as ready for review December 1, 2025 20:48
@saschwartz saschwartz changed the title from "Draft: Provide ability to bulk insert tasks in TaskRunRecorder" to "Provide ability to bulk insert tasks in TaskRunRecorder" Dec 1, 2025
@saschwartz saschwartz force-pushed the schwarts/task-run-bulk-insert branch 7 times, most recently from 01cd336 to eecbf78 Compare December 2, 2025 18:56
@saschwartz
Contributor Author

I changed the approach slightly: tasks are now inserted grouped by task state, in the order defined by the StateType enumeration, so we don't coalesce data directly.
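
One way to picture batching by state is sketched below. This is an illustration under assumptions, not the code from this PR: the `StateType` here is a simplified stand-in with only three members, and `batches_in_state_order` and the `state_type` key are hypothetical names.

```python
from enum import Enum
from typing import Any


class StateType(Enum):
    # Simplified stand-in for illustration; not the real enumeration.
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"


def batches_in_state_order(
    task_runs: list[dict[str, Any]],
) -> list[list[dict[str, Any]]]:
    """Group task runs by state type, ordered as the enum is declared."""
    by_state: dict[StateType, list[dict[str, Any]]] = {s: [] for s in StateType}
    for tr in task_runs:
        by_state[tr["state_type"]].append(tr)
    # Iterating an Enum yields members in definition order; skip empty groups.
    return [by_state[s] for s in StateType if by_state[s]]
```

Each returned batch can then be written with its own bulk statement, one state after another, instead of merging several updates for the same task run into a single row in Python first.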

@saschwartz saschwartz force-pushed the schwarts/task-run-bulk-insert branch from 98c61c6 to e614f33 Compare December 3, 2025 17:25
Member

@desertaxle desertaxle left a comment

Overall, this is an awesome PR! I have a couple of concerns about the bulk task run insertion, and I left some suggestions for you to consider.

Comment on lines 201 to 202
# Should be the same for all in the batch
update_cols = batch[0]["task_run_dict"].keys()
Member

I think this might be a somewhat dangerous assumption since there could be variation in the event payload between versions, and those could be in the same batch if they have the same state type.

Could you make this more explicit by batching by the key signature itself? I'm thinking something like this:

batches_by_keys: dict[frozenset[str], list] = {}
for tr in all_task_runs:
    key_signature = frozenset(tr["task_run_dict"].keys())
    batches_by_keys.setdefault(key_signature, []).append(tr)

to replace your current batching by state.
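
To make the concern concrete, here is a small runnable sketch of the same grouping applied to made-up payloads. The sample data and the `group_by_key_signature` wrapper are hypothetical; only the grouping logic comes from the suggestion above.

```python
from typing import Any


def group_by_key_signature(
    all_task_runs: list[dict[str, Any]],
) -> dict[frozenset[str], list[dict[str, Any]]]:
    """Group task runs so every batch shares exactly the same set of keys."""
    batches_by_keys: dict[frozenset[str], list[dict[str, Any]]] = {}
    for tr in all_task_runs:
        key_signature = frozenset(tr["task_run_dict"].keys())
        batches_by_keys.setdefault(key_signature, []).append(tr)
    return batches_by_keys


# Hypothetical payloads: the second one carries an extra field, as could
# happen when events from different versions land in the same flush.
all_task_runs = [
    {"task_run_dict": {"id": "a", "name": "first"}},
    {"task_run_dict": {"id": "b", "name": "second", "tags": ["x"]}},
    {"task_run_dict": {"id": "c", "name": "third"}},
]

for key_signature, batch in group_by_key_signature(all_task_runs).items():
    # Every row in `batch` has exactly these keys, so they can safely be
    # used as the update columns for the whole batch.
    update_cols = sorted(key_signature)
    print(update_cols, [tr["task_run_dict"]["id"] for tr in batch])
```

With this grouping, the update columns are derived from the batch's shared signature rather than from `batch[0]`, so a row with an extra or missing field never ends up in a statement built for a different set of columns.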

Contributor Author

Agree, makes sense

Contributor Author

Okay, took this approach in latest version. Thanks for the review.

@saschwartz saschwartz force-pushed the schwarts/task-run-bulk-insert branch from 3fe7836 to 2e823a0 Compare December 5, 2025 20:39
Member

@desertaxle desertaxle left a comment

LGTM! Thanks for the awesome PR @saschwartz!

@desertaxle desertaxle merged commit 42dcb00 into PrefectHQ:main Dec 8, 2025
88 of 89 checks passed

2 participants