|
| 1 | +--- |
| 2 | +draft: true |
| 3 | +title: 'pgflow 0.12.0: Simpler Handler Signatures for Flow Composition' |
| 4 | +description: 'Breaking change: asymmetric handler signatures remove the run wrapper, enabling functional composition' |
| 5 | +date: 2025-12-24 |
| 6 | +authors: |
| 7 | + - jumski |
| 8 | +tags: |
| 9 | + - release |
| 10 | + - breaking-change |
| 11 | + - migration |
| 12 | +featured: true |
| 13 | +--- |
| 14 | + |
| 15 | +import { Aside, Steps } from "@astrojs/starlight/components"; |
| 16 | + |
| 17 | +pgflow 0.12.0 introduces asymmetric handler signatures - a breaking change that removes the `run` wrapper from step inputs, enabling cleaner functional composition and preparing the foundation for subflows. |
| 18 | + |
| 19 | +## What Changed |
| 20 | + |
| 21 | +Handler signatures are now asymmetric based on step type: |
| 22 | + |
| 23 | +| Step Type | Before | After | |
| 24 | +|-----------|--------|-------| |
| 25 | +| Root step | `(input) => input.run.xxx` | `(flowInput, ctx) => flowInput.xxx` | |
| 26 | +| Dependent step | `(input) => input.dep.xxx` | `(deps, ctx) => deps.dep.xxx` | |
| 27 | +| Dependent (needs flowInput) | `(input) => input.run.xxx` | `(deps, ctx) => ctx.flowInput.xxx` | |
| 28 | +| Map step | `(item) => ...` | `(item, ctx) => ctx.flowInput.xxx` | |
| 29 | + |
| 30 | +<Aside type="caution" title="Breaking Change"> |
| 31 | +All handler signatures must be updated. The `input.run` pattern no longer exists. |
| 32 | +</Aside> |
| 33 | + |
| 34 | +## Why This Change? |
| 35 | + |
| 36 | +The previous `run` wrapper blocked functional composition: |
| 37 | + |
| 38 | +```typescript |
| 39 | +// OLD: The run wrapper created type mismatches |
| 40 | +// Root steps received: { run: flowInput } |
| 41 | +// Dependent steps received: { run: flowInput, dep1: output1 } |
| 42 | + |
| 43 | +// This meant subflows couldn't compose cleanly: |
| 44 | +const ChildFlow = new Flow<{ data: string }>() |
| 45 | + .step({ slug: "process" }, (input) => { |
| 46 | + // Expected: input.data |
| 47 | + // Received: { run: parentInput, prep: { data: "..." } } |
| 48 | + // TYPE MISMATCH! |
| 49 | + }); |
| 50 | +``` |
| 51 | + |
| 52 | +By removing the wrapper, outputs from one flow can now become inputs to another without transformation. |
| 53 | + |
| 54 | +## New Handler Signatures |
| 55 | + |
| 56 | +### Root Steps |
| 57 | + |
| 58 | +Root steps receive flow input directly as the first parameter: |
| 59 | + |
| 60 | +```typescript |
| 61 | +// .step() - root |
| 62 | +.step({ slug: 'init' }, (flowInput, ctx) => { |
| 63 | + console.log(ctx.env.API_KEY); // Context still available |
| 64 | + return { userId: flowInput.userId }; |
| 65 | +}) |
| 66 | + |
| 67 | +// .array() - root (returns array for parallel processing) |
| 68 | +.array({ slug: 'fetchAll' }, (flowInput, ctx) => { |
| 69 | + return flowInput.urls; // Returns array directly |
| 70 | +}) |
| 71 | +``` |
| 72 | + |
| 73 | +### Dependent Steps |
| 74 | + |
| 75 | +Dependent steps receive only their dependencies as the first parameter. Access `flowInput` via context if needed: |
| 76 | + |
| 77 | +```typescript |
| 78 | +// .step() - dependent (needs flowInput) |
| 79 | +.step({ slug: 'process', dependsOn: ['init'] }, (deps, ctx) => { |
| 80 | + const config = ctx.flowInput.config; // Via context |
| 81 | + const data = deps.init.userId; // Dependencies directly |
| 82 | + return { result: process(data, config) }; |
| 83 | +}) |
| 84 | + |
| 85 | +// .step() - dependent (doesn't need flowInput - most common) |
| 86 | +.step({ slug: 'save', dependsOn: ['process'] }, (deps) => { |
| 87 | + return deps.process.result; // Just use deps |
| 88 | +}) |
| 89 | + |
| 90 | +// .array() - dependent |
| 91 | +.array({ slug: 'splitResults', dependsOn: ['process'] }, (deps, ctx) => { |
| 92 | + return deps.process.items; // Returns array for parallel processing |
| 93 | +}) |
| 94 | +``` |
| 95 | + |
| 96 | +### Map Steps |
| 97 | + |
| 98 | +Map steps receive individual array elements. Access `flowInput` via context: |
| 99 | + |
| 100 | +```typescript |
| 101 | +// .map() - processes each array element |
| 102 | +.map({ slug: 'processItem', array: 'fetchAll' }, (item, ctx) => { |
| 103 | + const config = ctx.flowInput.config; // Original flow input via context |
| 104 | + return transform(item, config); |
| 105 | +}) |
| 106 | +``` |
| 107 | + |
| 108 | +## Upgrading Your Flows |
| 109 | + |
| 110 | +### Migration Patterns |
| 111 | + |
| 112 | +Apply these transformations to your handlers: |
| 113 | + |
| 114 | +#### Root Steps (.step, .array) |
| 115 | + |
| 116 | +```typescript del="input" del="input.run" ins="flowInput" |
| 117 | +// BEFORE |
| 118 | +.step({ slug: 'init' }, (input) => { |
| 119 | + return { userId: input.run.userId }; |
| 120 | +}) |
| 121 | + |
| 122 | +// AFTER |
| 123 | +.step({ slug: 'init' }, (flowInput) => { |
| 124 | + return { userId: flowInput.userId }; |
| 125 | +}) |
| 126 | +``` |
| 127 | + |
| 128 | +```typescript del="input" del="input.run" ins="flowInput" |
| 129 | +// BEFORE |
| 130 | +.array({ slug: 'getUrls' }, (input) => { |
| 131 | + return input.run.urls; |
| 132 | +}) |
| 133 | + |
| 134 | +// AFTER |
| 135 | +.array({ slug: 'getUrls' }, (flowInput) => { |
| 136 | + return flowInput.urls; |
| 137 | +}) |
| 138 | +``` |
| 139 | + |
| 140 | +#### Dependent Steps - Needing flowInput |
| 141 | + |
| 142 | +```typescript del="input" del="input.run" del="input.init" ins="deps, ctx" ins="ctx.flowInput" ins="deps.init" |
| 143 | +// BEFORE |
| 144 | +.step({ slug: 'process', dependsOn: ['init'] }, (input) => { |
| 145 | + const config = input.run.config; |
| 146 | + const data = input.init.data; |
| 147 | + return combine(data, config); |
| 148 | +}) |
| 149 | + |
| 150 | +// AFTER |
| 151 | +.step({ slug: 'process', dependsOn: ['init'] }, (deps, ctx) => { |
| 152 | + const config = ctx.flowInput.config; |
| 153 | + const data = deps.init.data; |
| 154 | + return combine(data, config); |
| 155 | +}) |
| 156 | +``` |
| 157 | + |
| 158 | +#### Dependent Steps - Not Needing flowInput (Common Case) |
| 159 | + |
| 160 | +```typescript del="input" del="input.process" ins="deps" ins="deps.process" |
| 161 | +// BEFORE |
| 162 | +.step({ slug: 'save', dependsOn: ['process'] }, (input) => { |
| 163 | + return saveToDb(input.process.result); |
| 164 | +}) |
| 165 | + |
| 166 | +// AFTER |
| 167 | +.step({ slug: 'save', dependsOn: ['process'] }, (deps) => { |
| 168 | + return saveToDb(deps.process.result); |
| 169 | +}) |
| 170 | +``` |
| 171 | + |
| 172 | +#### Map Steps - Accessing flowInput |
| 173 | + |
| 174 | +```typescript ins="item, ctx" ins="ctx.flowInput.options" |
| 175 | +// BEFORE |
| 176 | +.map({ slug: 'transform', array: 'items' }, (item) => { |
| 177 | + return process(item); |
| 178 | +}) |
| 179 | + |
| 180 | +// AFTER (if you need flowInput) |
| 181 | +.map({ slug: 'transform', array: 'items' }, (item, ctx) => { |
| 182 | + return process(item, ctx.flowInput.options); |
| 183 | +}) |
| 184 | +``` |
| 185 | + |
| 186 | +### Production Upgrade Guide |
| 187 | + |
| 188 | +<Steps> |
| 189 | + |
| 190 | +1. **Update handlers locally and test** |
| 191 | + |
| 192 | + Update all your flow handlers to the new signatures. Test locally: |
| 193 | + |
| 194 | + ```bash frame="none" |
| 195 | + npx supabase functions serve my-worker |
| 196 | + ``` |
| 197 | + |
| 198 | +2. **Disable worker functions** |
| 199 | + |
| 200 | + Prevent cron from starting new workers with old code: |
| 201 | + |
| 202 | + ```sql |
| 203 | + UPDATE pgflow.worker_functions |
| 204 | + SET enabled = false |
| 205 | + WHERE function_name = 'my-worker'; |
| 206 | + ``` |
| 207 | + |
| 208 | +3. **Deprecate existing workers** |
| 209 | + |
| 210 | + ```sql |
| 211 | + UPDATE pgflow.workers |
| 212 | + SET deprecated_at = NOW() |
| 213 | + WHERE function_name = 'my-worker' |
| 214 | + AND deprecated_at IS NULL; |
| 215 | + ``` |
| 216 | + |
| 217 | + Deprecated workers finish their current task but won't call `start_tasks` again - so they won't be affected by the SQL changes. |
| 218 | + |
| 219 | +4. **Apply database migration** |
| 220 | + |
| 221 | + ```bash frame="none" |
| 222 | + npx supabase db push |
| 223 | + ``` |
| 224 | + |
| 225 | +5. **Deploy new workers** |
| 226 | + |
| 227 | + ```bash frame="none" |
| 228 | + npx supabase functions deploy my-worker |
| 229 | + ``` |
| 230 | + |
| 231 | +6. **Enable worker functions** |
| 232 | + |
| 233 | + ```sql |
| 234 | + UPDATE pgflow.worker_functions |
| 235 | + SET enabled = true |
| 236 | + WHERE function_name = 'my-worker'; |
| 237 | + ``` |
| 238 | + |
| 239 | + The pgflow cron automatically starts new workers within seconds. |
| 240 | + |
| 241 | +</Steps> |
| 242 | + |
| 243 | +--- |
| 244 | + |
| 245 | +Questions or issues? Join the [Discord community](https://discord.gg/pgflow) or [open a GitHub issue](https://github.com/pgflow-dev/pgflow/issues). |
0 commit comments