Skip to content

Conversation

@grutt
Copy link
Contributor

@grutt grutt commented Dec 8, 2025

Description

Introduces configurable batch assignment/flush support across scheduler, repository, API contracts, and SDKs. Adds batch metadata columns, persistence, and runtime tracking along with the orchestration logic to evaluate batch keys, buffer assignments, and flush batches via new scheduler workflows. Includes regenerated contracts/clients and new TypeScript examples with end-to-end coverage.

Fixes #N/A

Type of change

  • New feature (non-breaking change which adds functionality)
  • Test changes (add, refactor, improve or change a test)
  • This change requires a documentation update

What's Changed

  • Add migration wiring batch columns on Step, queue/task tables, runtime metadata, and new v1_task_batch_run tracking table.
  • Extend repository APIs to compute batch keys, manage batch runs, update runtimes, and surface runtime data through ListTasks.
  • Implement scheduler batch buffering/flush machinery with timers, dispatcher/worker coordination, and completion handling.
  • Regenerate gRPC/OpenAPI contracts and update Go + TypeScript clients to expose batch configuration and runtime fields.
  • Provide examples and tests for batch assignment, including Go scheduler/unit coverage and TypeScript e2e exercises.
@vercel
Copy link

vercel bot commented Dec 8, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Review Updated (UTC)
hatchet-docs Ready Ready Preview, Comment Dec 23, 2025 9:17pm
hatchet-v0-docs Ready Ready Preview, Comment Dec 23, 2025 9:17pm
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a comprehensive batch processing feature for task execution, enabling tasks to be grouped and flushed together based on configurable batch sizes, flush intervals, and partition keys. The implementation spans database schema, repository layer, scheduler orchestration, and SDK interfaces.

Key Changes:

  • Database schema additions for batch tracking (v1_task_batch_run table, batch columns on tasks/queue items/runtime)
  • Scheduler batch coordinator to buffer tasks and decide when to flush batches
  • Repository methods for batch key computation, batch run reservation, and metadata persistence
  • TypeScript SDK batch task API with configuration options
  • End-to-end test coverage and examples

Reviewed changes

Copilot reviewed 73 out of 75 changed files in this pull request and generated no comments.

Show a summary per file
File Description
sql/schema/v1-olap.sql Adds batch-related event types (BATCH_BUFFERED, WAITING_FOR_BATCH, BATCH_FLUSHED)
sql/schema/v1-core.sql Adds batch_key column to task/queue tables, batch metadata to runtime, new v1_task_batch_run tracking table
sql/schema/v0.sql Adds batch configuration columns to Step table
sdks/typescript/src/v1/task.ts Defines BatchTaskFn and BatchTaskConfig types for SDK
sdks/typescript/src/v1/examples/batch_assign/*.ts Provides batch task examples (worker, task definition, run, e2e tests)
sdks/typescript/src/v1/declaration.ts Implements CreateBatchTaskWorkflow factory with validation
sdks/typescript/src/v1/client/worker/worker-internal.ts Adds batch action handler, controller, and queue management
sdks/typescript/src/v1/client/client.ts Exposes batchTask method on HatchetClient
pkg/scheduling/v1/scheduler.go Adds BatchCoordinator interface and batch decision logic
pkg/scheduling/v1/queuer.go Updates queue processing to handle buffered items
pkg/scheduling/v1/pool.go Manages batch coordinator per tenant
pkg/repository/v1/workflow.go Adds ListStepBatchConfigs and batch config persistence
pkg/repository/v1/task.go Implements batch key evaluation, batch run tracking, runtime metadata updates
pkg/repository/v1/scheduler_queue.go Adds buffered task runtime insertion logic
internal/services/scheduler/v1/scheduler.go Implements batch buffering, flush orchestration, and event emission
internal/services/dispatcher/subscribed_worker_v1.go Populates batch metadata in assigned actions

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

// validate the token
// validate the token twice to ensure cache hit does not error
_, _, err = jwtManager.ValidateTenantToken(context.Background(), token.Token)
assert.NoError(t, err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flyby changes here.. im not clear on why these tests stated failing

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has been flaky for a while, haven't had the chance to look into it


// Delete removes a value from the cache with the given key
Delete(key string)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flyby related to the token_test changes

if self.batch.batch_max_interval is not None:
interval_ms = int(self.batch.batch_max_interval.total_seconds() * 1000)
batch_max_interval = (
str_to_timedelta(self.batch.batch_max_interval)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I'm following this 😅 why is self.batch.batch_max_interval ever a string? shouldn't it be a timedelta?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because its using our Duration type for consistency...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooh, okay, I see 🤔

the reason for Duration is to allow either a Go duration string (1s, 1m, etc.) or a timedelta, but this was mostly for backwards compat with v0. I think we should only allow timedelta here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done :)

Copy link
Contributor

@abelanger5 abelanger5 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still not seeing that the comments on the migration/schema files have been resolved btw

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

4 participants