Distributed Abort Controller
A distributed AbortController that uses durable workflows for cross-process cancellation signaling.
Use this pattern when you need an AbortController-like interface that works across distributed systems. The controller uses a durable workflow to coordinate cancellation: calling .abort() on one machine fires the .signal of every controller created with the same ID, on any machine.
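To make the coordination model concrete, here is a single-process sketch that swaps the durable workflow for a plain Map. InMemoryAbortHandle, registry, and tokenFor are hypothetical names chosen for illustration; the token format copies the abort:<id> scheme from the core implementation. Because the state lives in process memory, this only illustrates the create-or-reconnect-by-ID idea, not durability or cross-machine delivery.

```typescript
// Hypothetical, single-process stand-in for the durable workflow:
// a Map keyed by the same "abort:<id>" token format the real code uses.
type Entry = { controller: AbortController };

const registry = new Map<string, Entry>();

function tokenFor(id: string): string {
  return `abort:${id}`;
}

class InMemoryAbortHandle {
  private readonly entry: Entry;

  private constructor(entry: Entry) {
    this.entry = entry;
  }

  // Create-or-reconnect: the ID, not an object reference, is the shared key.
  static create(id: string): InMemoryAbortHandle {
    const token = tokenFor(id);
    let entry = registry.get(token);
    if (!entry) {
      entry = { controller: new AbortController() };
      registry.set(token, entry);
    }
    return new InMemoryAbortHandle(entry);
  }

  // AbortController.abort() does nothing after the first call, so this is idempotent.
  abort(reason?: string): void {
    this.entry.controller.abort(reason);
  }

  get signal(): AbortSignal {
    return this.entry.controller.signal;
  }
}

// Two handles created independently from the same ID share one cancellation state.
const first = InMemoryAbortHandle.create("task:build-123");
const second = InMemoryAbortHandle.create("task:build-123");
second.abort("Cancelled by admin");
console.log(first.signal.aborted, first.signal.reason); // true Cancelled by admin
```

The real controller keeps this shared state in a workflow run instead of a Map, which is what lets the abort survive a crash and reach other machines.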
When to use this
- Cross-process cancellation — Cancel a long-running operation from a different server, worker, or edge function
- Durable cancellation — The abort signal persists even if the process that created it crashes
- UI stop buttons — Let users cancel operations running on the server from the browser
- Timeout coordination — The built-in TTL auto-expires stale controllers
Pattern
The DistributedAbortController class encapsulates a workflow that:
- Accepts a user-provided unique ID (like a chat ID or task ID)
- Creates or reconnects to an existing workflow using that ID
- Waits for a hook signal OR TTL expiration
- Writes a cancellation message to the run's stream when triggered
- After a TTL expiration, stays alive for a grace period so late callers can still reconnect to it
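The race in the steps above can be modeled with ordinary promises. This is a sketch under stated assumptions, not the workflow itself: createHook and waitForAbortOrTtl are hypothetical helpers, a resolvable promise plays the role of the workflow hook, and setTimeout plays the role of the durable sleep().

```typescript
// Hypothetical in-process model of the workflow's race:
// a resolvable promise stands in for the hook, setTimeout for durable sleep().
type AbortPayload = { reason?: string };
type AbortResult = { reason?: string; expired: boolean };

function createHook<T>(): { promise: Promise<T>; resume: (value: T) => void } {
  let resume!: (value: T) => void;
  const promise = new Promise<T>((resolve) => {
    resume = resolve;
  });
  return { promise, resume };
}

async function waitForAbortOrTtl(
  hook: Promise<AbortPayload>,
  ttlMs: number
): Promise<AbortResult> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const ttl = new Promise<AbortResult>((resolve) => {
    timer = setTimeout(
      () => resolve({ reason: "Controller expired", expired: true }),
      ttlMs
    );
  });
  try {
    // Whichever settles first wins, as with Promise.race in the workflow.
    return await Promise.race([
      hook.then((payload): AbortResult => ({ reason: payload.reason, expired: false })),
      ttl,
    ]);
  } finally {
    // A Node timer keeps the process alive, so clear it once the race is decided.
    clearTimeout(timer);
  }
}

// Manual abort arrives before the TTL:
const hook = createHook<AbortPayload>();
const manual = waitForAbortOrTtl(hook.promise, 1_000);
hook.resume({ reason: "User cancelled" });
manual.then((r) => console.log(r)); // { reason: 'User cancelled', expired: false }

// Nobody resumes this hook, so the 10 ms TTL wins:
const expired = waitForAbortOrTtl(createHook<AbortPayload>().promise, 10);
expired.then((r) => console.log(r)); // { reason: 'Controller expired', expired: true }
```

In the real workflow the timer is a durable sleep() managed by the runtime, which is what lets the TTL keep counting without a live process holding a timer.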
Core Implementation
import { defineHook, getWritable, sleep } from "workflow";
import { start, getRun, getHookByToken } from "workflow/api";
// Default TTL: 24 hours
const DEFAULT_TTL_MS = 24 * 60 * 60 * 1000;
// Default grace period: 1 hour (after TTL expiration, keeps the hook alive for late subscribers)
const DEFAULT_GRACE_MS = 60 * 60 * 1000;
// Hook to trigger the abort signal
export const abortHook = defineHook<{ reason?: string }>();
// The abort message written to the stream
export type AbortMessage = {
type: "abort";
reason?: string;
expired?: boolean;
};
// Helper to create a consistent hook token from the user ID
function getAbortToken(id: string): string {
return `abort:${id}`;
}
// Step function that writes the abort message to the stream
async function writeAbortSignal(reason?: string, expired?: boolean) {
"use step";
const writable = getWritable<AbortMessage>();
const writer = writable.getWriter();
try {
await writer.write({ type: "abort", reason, expired });
} finally {
writer.releaseLock();
}
await writable.close();
}
// Workflow that waits for abort or TTL expiration
export async function abortControllerWorkflow(
id: string,
ttlMs: number,
graceMs: number
) {
"use workflow";
const startTime = Date.now();
const hook = abortHook.create({ token: getAbortToken(id) });
// Race: manual abort OR TTL expiration
const result = await Promise.race([
hook.then((payload) => ({
reason: payload.reason,
expired: false,
})),
sleep(`${ttlMs}ms`).then(() => ({
reason: "Controller expired",
expired: true,
})),
]);
await writeAbortSignal(result.reason, result.expired);
// Only sleep through grace period on TTL expiration (keeps hook alive for late subscribers).
// Manual aborts complete immediately.
if (result.expired) {
const elapsed = Date.now() - startTime;
const remainingTime = graceMs - (elapsed - ttlMs);
if (remainingTime > 0) {
await sleep(`${remainingTime}ms`);
}
}
return { aborted: true, reason: result.reason, expired: result.expired };
}
/**
* A distributed abort controller that works across process boundaries.
* Uses a semantically meaningful ID (like a chat ID or task ID) to coordinate.
*/
export class DistributedAbortController {
private id: string;
readonly runId: string;
private constructor(id: string, runId: string) {
this.id = id;
this.runId = runId;
}
/**
* Creates or reconnects to a distributed abort controller.
* If a controller with this ID is still running, reconnects to it.
* Otherwise, starts a new workflow.
*
* @param id - A unique, semantically meaningful ID (e.g., "chat:123")
* @param options.ttlMs - Time-to-live in ms (default: 24 hours)
* @param options.graceMs - Grace period after TTL expiration (default: 1 hour)
*/
static async create(
id: string,
options: { ttlMs?: number; graceMs?: number } = {}
): Promise<DistributedAbortController> {
const { ttlMs = DEFAULT_TTL_MS, graceMs = DEFAULT_GRACE_MS } = options;
const token = getAbortToken(id);
// Try to find an existing run with this hook token
const existingHook = await getHookByToken(token).catch(() => null);
if (existingHook) {
// Reconnect to existing controller
return new DistributedAbortController(id, existingHook.runId);
}
// Create a new workflow
const run = await start(abortControllerWorkflow, [id, ttlMs, graceMs]);
return new DistributedAbortController(id, run.runId);
}
/**
* Triggers the abort signal.
* Idempotent: safe to call multiple times or after the workflow has completed.
*/
async abort(reason?: string): Promise<void> {
try {
await abortHook.resume(getAbortToken(this.id), { reason });
} catch (error) {
const msg = error instanceof Error ? error.message.toLowerCase() : '';
if (msg.includes('not found') || msg.includes('expired')) {
return;
}
throw error;
}
}
/**
* Returns an AbortSignal that fires when abort() is called or TTL expires.
* The signal fires with a reason indicating what triggered it.
*/
get signal(): AbortSignal {
const run = getRun<{ aborted: boolean; reason?: string; expired?: boolean }>(this.runId);
const controller = new AbortController();
const readable = run.getReadable<AbortMessage>();
(async () => {
const reader = readable.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (value.type === "abort") {
const reason = value.expired
? `${value.reason} (expired)`
: value.reason;
controller.abort(reason);
break;
}
}
} catch (error) {
if (!controller.signal.aborted) {
controller.abort(
error instanceof Error ? error.message : "Stream read failed"
);
}
} finally {
reader.releaseLock();
}
})();
return controller.signal;
}
}
Usage: Single Process
import { DistributedAbortController } from "./distributed-abort-controller";
// Create a controller with a meaningful ID
const controller = await DistributedAbortController.create("chat:user-123");
// Read the signal once and pass it to fetch; don't await the request yet
const signal = controller.signal;
const pending = fetch("https://api.example.com/long-operation", {
signal,
}).catch((error) => {
// fetch rejects once the abort message arrives and the signal fires
if (!signal.aborted) throw error;
});
// Later, while the request is still in flight: abort the operation
await controller.abort("User cancelled");
await pending;
Usage: Cross-Process Coordination
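Every process that reconnects by ID, like Process C below, turns the run's stream into its own local AbortSignal; that is the job of the signal getter in the core implementation. The sketch below reproduces its reader loop against a standard web ReadableStream so it runs without the workflow runtime. signalFromStream is a hypothetical name, and the hand-built stream is an assumed stand-in for run.getReadable().

```typescript
type AbortMessage = { type: "abort"; reason?: string; expired?: boolean };

// Same reader loop as the signal getter: abort a local controller on the
// first "abort" message, or with an error message if reading fails.
function signalFromStream(readable: ReadableStream<AbortMessage>): AbortSignal {
  const controller = new AbortController();
  const reader = readable.getReader();
  void (async () => {
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        if (value.type === "abort") {
          controller.abort(value.expired ? `${value.reason} (expired)` : value.reason);
          break;
        }
      }
    } catch (error) {
      if (!controller.signal.aborted) {
        controller.abort(error instanceof Error ? error.message : "Stream read failed");
      }
    } finally {
      reader.releaseLock();
    }
  })();
  return controller.signal;
}

// Stand-in for run.getReadable(): one abort message, then the stream closes.
const stream = new ReadableStream<AbortMessage>({
  start(ctrl) {
    ctrl.enqueue({ type: "abort", reason: "Cancelled by admin" });
    ctrl.close();
  },
});

const signal = signalFromStream(stream);
signal.addEventListener("abort", () => {
  console.log("Task was cancelled:", signal.reason); // Task was cancelled: Cancelled by admin
});
```

Because each call builds its own AbortController, every reconnecting process ends up with an independent local signal driven by the same shared stream.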
import { DistributedAbortController } from "./distributed-abort-controller";
// Process A: Create the controller
const controller = await DistributedAbortController.create("task:build-123");
// start long operation using controller.signal...
// Process B: Reconnect and abort (no run ID sharing needed!)
const sameController = await DistributedAbortController.create("task:build-123");
await sameController.abort("Cancelled by admin");
// Process C: Reconnect and listen
const anotherRef = await DistributedAbortController.create("task:build-123");
anotherRef.signal.addEventListener("abort", (e) => {
console.log("Task was cancelled:", (e.target as AbortSignal).reason);
});
Custom TTL
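With a custom TTL, the grace period still applies after expiry: abortControllerWorkflow sleeps for whatever is left of graceMs, measured from the moment the TTL was due rather than from when the expiry branch actually ran. The pure function below, a hypothetical remainingGraceMs helper, reproduces that arithmetic so it can be checked with concrete numbers; the 2-second delay is illustrative.

```typescript
// Mirrors the expiry branch of abortControllerWorkflow:
// remaining = graceMs - (elapsed - ttlMs), clamped at 0 like its `> 0` check.
function remainingGraceMs(
  startTime: number,
  now: number,
  ttlMs: number,
  graceMs: number
): number {
  const elapsed = now - startTime;
  const overshoot = elapsed - ttlMs; // how long after the TTL deadline we are
  return Math.max(0, graceMs - overshoot);
}

const FIVE_MINUTES_MS = 5 * 60 * 1000; // 300000
const ONE_HOUR_MS = 60 * 60 * 1000; // 3600000

// The TTL fired at t = 300000 and writing the abort message took 2 s,
// so the workflow sleeps for the rest of the hour-long grace period.
console.log(remainingGraceMs(0, FIVE_MINUTES_MS + 2_000, FIVE_MINUTES_MS, ONE_HOUR_MS)); // 3598000
```

A manual abort() skips this branch entirely, so the workflow finishes as soon as the abort message is written.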
import { DistributedAbortController } from "./distributed-abort-controller";
// Short-lived controller for a quick operation (5 minutes)
const shortLived = await DistributedAbortController.create("quick-task", {
ttlMs: 5 * 60 * 1000,
});
// Long-lived controller for batch jobs (7 days)
const longLived = await DistributedAbortController.create("batch-job", {
ttlMs: 7 * 24 * 60 * 60 * 1000,
});
// When TTL expires, the signal fires with a reason ending in "(expired)"
shortLived.signal.addEventListener("abort", (e) => {
const reason = (e.target as AbortSignal).reason;
// reason is a DOMException rather than a string if abort() was called without one
if (typeof reason === "string" && reason.endsWith("(expired)")) {
console.log("Controller expired, cleaning up...");
}
});
API Route for Remote Abort
import { DistributedAbortController } from "@/lib/distributed-abort-controller";
export async function POST(
request: Request,
{ params }: { params: Promise<{ id: string }> }
) {
const { id } = await params;
// Tolerate an empty or malformed body instead of failing with a 500
const { reason } = await request.json().catch(() => ({}));
const controller = await DistributedAbortController.create(id);
await controller.abort(reason || "Cancelled via API");
return Response.json({ success: true });
}
Client Cancel Button
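The button below fires the request and discards the response, so a failed cancel (for example, a 500 from the route) goes unnoticed. A small helper that checks response.ok is one way to surface that. requestCancel and its injectable fetchImpl parameter are hypothetical names; the /api/abort/ path is the same one the button posts to.

```typescript
// Hypothetical client helper: POST to the abort route and surface failures
// instead of silently ignoring them. fetchImpl is injectable for testing.
async function requestCancel(
  taskId: string,
  reason: string,
  fetchImpl: typeof fetch = fetch
): Promise<void> {
  const response = await fetchImpl(`/api/abort/${taskId}`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ reason }),
  });
  if (!response.ok) {
    throw new Error(`Cancel request failed with HTTP ${response.status}`);
  }
}

// In the component: onClick={() => requestCancel(taskId, "User clicked cancel").catch(console.error)}
```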
"use client";
export function CancelButton({ taskId }: { taskId: string }) {
const handleCancel = async () => {
await fetch(`/api/abort/${taskId}`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ reason: "User clicked cancel" }),
});
};
return (
<button type="button" onClick={handleCancel}>
Cancel Operation
</button>
);
}
Tips
- Use semantic IDs — Use meaningful IDs like chat:123 or task:abc instead of random UUIDs
- Create is idempotent — While a controller's workflow is still running, calling create() with the same ID reconnects to it instead of starting a new one
- TTL auto-cleanup — Workflows self-terminate after the TTL (plus the grace period) expires; no manual cleanup needed
- Signal is a getter — Each access to .signal opens a new stream reader and returns a new AbortSignal; read it once and reuse it
- One-shot — Once aborted or expired, the workflow completes; create a new controller for new operations
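The Signal is a getter tip is easy to trip over, because nothing at the call site suggests that reading a property does work. This standalone sketch uses a hypothetical FreshSignalSource class that, like the real getter, builds a new AbortController on every access, to show why two reads give two different signals and why reading it once is enough.

```typescript
// Hypothetical stand-in for DistributedAbortController.signal: every access
// builds a new AbortController (the real getter also opens a new stream reader).
class FreshSignalSource {
  accesses = 0;

  get signal(): AbortSignal {
    this.accesses += 1;
    return new AbortController().signal;
  }
}

const source = new FreshSignalSource();
console.log(source.signal === source.signal); // false: two accesses, two signals
console.log(source.accesses); // 2

// Read the getter once and hand the same signal to every consumer.
const signal = source.signal;
signal.addEventListener("abort", () => console.log("first consumer"));
signal.addEventListener("abort", () => console.log("second consumer"));
console.log(source.accesses); // 3
```

With the real controller, each extra access also opens another reader on the run's stream, so passing one cached signal to fetch, listeners, and child tasks is the cheaper habit.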
Key APIs
- defineHook() — type-safe hook for the abort trigger
- getWritable() — write abort messages to the stream
- sleep() — TTL timer for auto-expiration
- start() — start the abort controller workflow
- getHookByToken() — find existing run by hook token
- getRun() — reconnect to the workflow's readable stream