Feat: batch flush #2617
Pull request overview
This PR introduces a comprehensive batch processing feature for task execution, enabling tasks to be grouped and flushed together based on configurable batch sizes, flush intervals, and partition keys. The implementation spans database schema, repository layer, scheduler orchestration, and SDK interfaces.
Key Changes:
- Database schema additions for batch tracking (v1_task_batch_run table, batch columns on tasks/queue items/runtime)
- Scheduler batch coordinator to buffer tasks and decide when to flush batches
- Repository methods for batch key computation, batch run reservation, and metadata persistence
- TypeScript SDK batch task API with configuration options
- End-to-end test coverage and examples
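To make the buffering idea above concrete, here is a minimal sketch of flushing by batch size, flush interval, and partition key. It is not the scheduler code from this PR: `BatchBuffer`, its method names, and the in-memory storage are all hypothetical, and time is passed in explicitly so the behaviour is deterministic.

```python
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Optional


@dataclass
class BatchBuffer:
    """Hypothetical in-memory buffer; not Hatchet's actual batch coordinator."""

    batch_size: int
    flush_interval: timedelta
    # batch key -> ids of the tasks currently buffered under that key
    _items: dict = field(init=False, default_factory=dict)
    # batch key -> arrival time (in seconds) of the oldest buffered task
    _oldest: dict = field(init=False, default_factory=dict)

    def add(self, key: str, task_id: str, now: float) -> Optional[list]:
        """Buffer a task; return the flushed batch once the size limit is reached."""
        self._items.setdefault(key, []).append(task_id)
        self._oldest.setdefault(key, now)
        if len(self._items[key]) >= self.batch_size:
            return self._flush(key)
        return None

    def flush_due(self, now: float) -> dict:
        """Flush every key whose oldest task has waited at least flush_interval."""
        limit = self.flush_interval.total_seconds()
        due = [key for key, start in self._oldest.items() if now - start >= limit]
        return {key: self._flush(key) for key in due}

    def _flush(self, key: str) -> list:
        # Drop both the timer and the buffered tasks for this key.
        self._oldest.pop(key, None)
        return self._items.pop(key)
```

In this sketch, `add` covers the size-based flush and `flush_due` covers the interval-based flush; a real coordinator would also need persistence and per-tenant isolation, which are omitted here.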
Reviewed changes
Copilot reviewed 73 out of 75 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| sql/schema/v1-olap.sql | Adds batch-related event types (BATCH_BUFFERED, WAITING_FOR_BATCH, BATCH_FLUSHED) |
| sql/schema/v1-core.sql | Adds batch_key column to task/queue tables, batch metadata to runtime, new v1_task_batch_run tracking table |
| sql/schema/v0.sql | Adds batch configuration columns to Step table |
| sdks/typescript/src/v1/task.ts | Defines BatchTaskFn and BatchTaskConfig types for SDK |
| sdks/typescript/src/v1/examples/batch_assign/*.ts | Provides batch task examples (worker, task definition, run, e2e tests) |
| sdks/typescript/src/v1/declaration.ts | Implements CreateBatchTaskWorkflow factory with validation |
| sdks/typescript/src/v1/client/worker/worker-internal.ts | Adds batch action handler, controller, and queue management |
| sdks/typescript/src/v1/client/client.ts | Exposes batchTask method on HatchetClient |
| pkg/scheduling/v1/scheduler.go | Adds BatchCoordinator interface and batch decision logic |
| pkg/scheduling/v1/queuer.go | Updates queue processing to handle buffered items |
| pkg/scheduling/v1/pool.go | Manages batch coordinator per tenant |
| pkg/repository/v1/workflow.go | Adds ListStepBatchConfigs and batch config persistence |
| pkg/repository/v1/task.go | Implements batch key evaluation, batch run tracking, runtime metadata updates |
| pkg/repository/v1/scheduler_queue.go | Adds buffered task runtime insertion logic |
| internal/services/scheduler/v1/scheduler.go | Implements batch buffering, flush orchestration, and event emission |
| internal/services/dispatcher/subscribed_worker_v1.go | Populates batch metadata in assigned actions |
| // validate the token | ||
| // validate the token twice to ensure cache hit does not error | ||
| _, _, err = jwtManager.ValidateTenantToken(context.Background(), token.Token) | ||
| assert.NoError(t, err) |
flyby changes here... I'm not clear on why these tests started failing
this has been flaky for a while, haven't had the chance to look into it
| // Delete removes a value from the cache with the given key | ||
| Delete(key string) | ||
flyby related to the token_test changes
| if self.batch.batch_max_interval is not None: | ||
| interval_ms = int(self.batch.batch_max_interval.total_seconds() * 1000) | ||
| batch_max_interval = ( | ||
| str_to_timedelta(self.batch.batch_max_interval) |
not sure I'm following this 😅 why is self.batch.batch_max_interval ever a string? shouldn't it be a timedelta?
because it's using our Duration type for consistency...
oooh, okay, I see 🤔
the reason for Duration is to allow either a Go duration string (1s, 1m, etc.) or a timedelta, but this was mostly for backwards compat with v0. I think we should only allow timedelta here
done :)
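For context on the thread above, here is a rough sketch of what accepting "either a Go duration string or a timedelta" involves when the value has to end up as milliseconds. The `Duration` alias is an assumption about the type's shape, and `parse_go_duration` and `interval_ms` are hypothetical helpers that handle only a simplified subset of Go duration syntax (ms, s, m, h); they are not the SDK's `str_to_timedelta`.

```python
import re
from datetime import timedelta
from typing import Union

# Assumed shape of the Duration type discussed in the thread.
Duration = Union[str, timedelta]

_UNIT_SECONDS = {"ms": 0.001, "s": 1.0, "m": 60.0, "h": 3600.0}
_PART = re.compile(r"(\d+(?:\.\d+)?)(ms|s|m|h)")


def parse_go_duration(value: str) -> timedelta:
    """Parse a simplified Go-style duration such as '1s', '5m' or '1h30m'."""
    pos, seconds = 0, 0.0
    for match in _PART.finditer(value):
        if match.start() != pos:
            break  # unparseable text between two components
        seconds += float(match.group(1)) * _UNIT_SECONDS[match.group(2)]
        pos = match.end()
    if pos == 0 or pos != len(value):
        raise ValueError(f"invalid duration: {value!r}")
    return timedelta(seconds=seconds)


def interval_ms(value: Duration) -> int:
    """Normalize either accepted form to whole milliseconds."""
    if isinstance(value, str):
        value = parse_go_duration(value)
    # Dividing two timedeltas with // yields an int, avoiding float rounding.
    return value // timedelta(milliseconds=1)
```

If only `timedelta` is accepted, as the thread settles on, the string branch and the parser go away and `interval_ms` reduces to the final line.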
abelanger5
left a comment
Still not seeing that the comments on the migration/schema files have been resolved btw
Description
Introduces configurable batch assignment/flush support across scheduler, repository, API contracts, and SDKs. Adds batch metadata columns, persistence, and runtime tracking along with the orchestration logic to evaluate batch keys, buffer assignments, and flush batches via new scheduler workflows. Includes regenerated contracts/clients and new TypeScript examples with end-to-end coverage.
Fixes #N/A
Type of change
What's Changed
Step, queue/task tables, runtime metadata, and new v1_task_batch_run tracking table. ListTasks.