Overview
@kubiks/otel-upstash-workflow provides comprehensive OpenTelemetry instrumentation for the Upstash Workflow SDK. It captures spans for workflow executions, steps, sleep operations, API calls, and event waiting, along with detailed performance metrics.
Visualize your workflow executions with detailed span information including steps, sleep operations, API calls, and performance metrics.
Pre-release Note: This package instruments the Upstash Workflow SDK, which is currently in pre-release. The API may change as the Workflow SDK evolves.
Installation
npm install @kubiks/otel-upstash-workflow
Peer Dependencies: @opentelemetry/api >= 1.9.0, @upstash/workflow >= 0.0.0
Quick Start
Instrumenting Workflow Handlers
import { serve as originalServe } from "@upstash/workflow";
import { instrumentWorkflowServe } from "@kubiks/otel-upstash-workflow";

const serve = instrumentWorkflowServe(originalServe);

export const POST = serve(async (context) => {
  const result1 = await context.run("step-1", async () => {
    return await processData();
  });
  await context.sleep("wait-5s", 5);
  const result2 = await context.run("step-2", async () => {
    return await saveResults(result1);
  });
  return result2;
});
instrumentWorkflowServe wraps the serve function to trace the entire workflow execution and all of its steps, with no configuration changes needed. Every workflow execution creates a server span with a child span for each step.
Instrumenting Workflow Client
import { Client } from "@upstash/workflow";
import { instrumentWorkflowClient } from "@kubiks/otel-upstash-workflow";

const client = instrumentWorkflowClient(
  new Client({ baseUrl: process.env.QSTASH_URL!, token: process.env.QSTASH_TOKEN! })
);

await client.trigger({
  url: "https://your-app.com/api/workflow",
  body: { data: "example" },
});
instrumentWorkflowClient wraps the workflow client to trace workflow triggers, creating client spans for each trigger operation.
Configuration
With Step Data Capture
Optionally capture step inputs and outputs for debugging:
const serve = instrumentWorkflowServe(originalServe, {
  captureStepData: true, // Enable step data capture (default: false)
  maxStepDataLength: 2048, // Max characters to capture (default: 1024)
});

export const POST = serve(async (context) => {
  // Your workflow - all steps are traced with input/output capture
});
Step data capture is disabled by default to protect sensitive data. Only enable it in secure development environments.
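To make the effect of maxStepDataLength concrete, here is a minimal sketch of how a captured value could be serialized and capped before being attached to a span. The helper name serializeStepData is hypothetical, and the choice of JSON serialization followed by a plain character cut is an assumption; the package may serialize or truncate differently.

```typescript
// Hypothetical helper (not part of the package): serializes a step value
// and caps the result at `maxLength` characters.
function serializeStepData(value: unknown, maxLength = 1024): string {
  let text: string;
  try {
    // JSON.stringify returns undefined for values such as `undefined` or functions.
    text = JSON.stringify(value) ?? String(value);
  } catch {
    // Circular structures and BigInt values throw; fall back to a plain string.
    text = String(value);
  }
  return text.length > maxLength ? text.slice(0, maxLength) : text;
}

// Small values pass through unchanged.
console.log(serializeStepData({ result: "success" }));

// Large values are cut to the configured limit (2048 characters here).
console.log(serializeStepData("x".repeat(5000), 2048).length);
```

A larger limit keeps more context in each span at the cost of bigger trace payloads, which is worth weighing before raising it.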
What Gets Traced
This instrumentation provides two main functions:
instrumentWorkflowClient: wraps the Workflow Client to trace workflow triggers with SpanKind.CLIENT.
instrumentWorkflowServe: wraps the serve function to trace execution and all workflow steps with SpanKind.SERVER.
Workflow Handler Instrumentation
The instrumentWorkflowServe function wraps the serve function, creating a span with SpanKind.SERVER for the entire workflow execution. All workflow steps (context.run, context.sleep, etc.) automatically create child spans.
Client Instrumentation
The instrumentWorkflowClient function wraps the client’s trigger method, creating a span with SpanKind.CLIENT for each workflow trigger operation.
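The wrapping approach described above can be pictured with a small, self-contained sketch. This is not the package's implementation: withSpan, RecordedSpan, and the in-memory recorded array are hypothetical stand-ins for a real OpenTelemetry tracer, used only to show how a wrapper can keep a function's signature while recording one span per call.

```typescript
// Minimal stand-in for a span; a real setup would use an OpenTelemetry tracer.
interface RecordedSpan {
  name: string;
  kind: "SERVER" | "CLIENT" | "INTERNAL";
  status: "ok" | "error";
  durationMs: number;
}

const recorded: RecordedSpan[] = [];

// Hypothetical wrapper: runs `fn`, then records one span for the call.
function withSpan<Args extends unknown[], Result>(
  name: string,
  kind: RecordedSpan["kind"],
  fn: (...args: Args) => Promise<Result>,
): (...args: Args) => Promise<Result> {
  return async (...args: Args) => {
    const start = Date.now();
    let status: RecordedSpan["status"] = "ok";
    try {
      return await fn(...args);
    } catch (error) {
      status = "error";
      throw error; // Re-throw so callers still see the failure.
    } finally {
      // Runs on both success and failure, so every call yields a span.
      recorded.push({ name, kind, status, durationMs: Date.now() - start });
    }
  };
}

// Usage: the wrapped function keeps its original signature.
const trigger = withSpan("workflow.trigger", "CLIENT", async (url: string) => {
  return { url, accepted: true };
});

trigger("https://example.com/api/workflow").then((result) => {
  console.log(result.accepted, recorded[0]?.name);
});
```

Because the wrapper returns a function with the same parameters and return type, calling code does not need to change, which matches the drop-in usage shown in the Quick Start.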
Span Hierarchy
The instrumentation creates the following span hierarchy:
[SERVER] workflow.execute
├─ [INTERNAL] workflow.step.step-1 (context.run)
├─ [INTERNAL] workflow.step.wait-5s (context.sleep)
├─ [CLIENT] workflow.step.api-call (context.call)
└─ [INTERNAL] workflow.step.wait-event (context.waitForEvent)
Separate client-side triggers create independent traces:
[CLIENT] workflow.trigger
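The naming pattern in the hierarchy above can be summarized in a short sketch. The function describeStepSpan and its types are hypothetical and exist only for illustration; the sketch assumes, based on the diagram, that step spans are named workflow.step.<step name> and that only context.call steps use the CLIENT kind.

```typescript
type StepType = "run" | "sleep" | "call" | "waitForEvent";
type SpanKindLabel = "INTERNAL" | "CLIENT";

// Hypothetical helper mirroring the diagram: context.call steps are CLIENT
// spans because they reach out to an external service; all others are INTERNAL.
function describeStepSpan(
  stepName: string,
  stepType: StepType,
): { name: string; kind: SpanKindLabel } {
  return {
    name: `workflow.step.${stepName}`,
    kind: stepType === "call" ? "CLIENT" : "INTERNAL",
  };
}

console.log(describeStepSpan("step-1", "run"));
console.log(describeStepSpan("api-call", "call"));
```

Knowing the pattern makes it easier to write trace queries, for example filtering on span names that start with workflow.step. to find slow steps.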
Span Attributes
Workflow Handler Spans (instrumentWorkflowServe)
| Attribute | Description | Example |
| --- | --- | --- |
| workflow.system | Constant value upstash | upstash |
| workflow.operation | Operation type | execute |
| workflow.id | Workflow ID from headers | wf_123 |
| workflow.run_id | Workflow run ID from headers | run_456 |
| workflow.url | Workflow URL from headers | https://example.com/api/workflow |
| http.status_code | HTTP response status | 200 |
Client Trigger Spans (instrumentWorkflowClient)
| Attribute | Description | Example |
| --- | --- | --- |
| workflow.system | Constant value upstash | upstash |
| workflow.operation | Operation type | trigger |
| workflow.url | Target workflow URL | https://example.com/api/workflow |
| workflow.id | Workflow ID from response | wf_123 |
| workflow.run_id | Workflow run ID from response | run_456 |
Step Spans (context.run)
| Attribute | Description | Example |
| --- | --- | --- |
| workflow.system | Constant value upstash | upstash |
| workflow.operation | Operation type | step |
| workflow.step.name | Step name | step-1 |
| workflow.step.type | Step type | run |
| workflow.step.duration_ms | Step execution time in ms | 150 |
| workflow.step.output | Step output (if enabled) | {"result":"success"} |
Sleep Spans (context.sleep, context.sleepFor, context.sleepUntil)
| Attribute | Description | Example |
| --- | --- | --- |
| workflow.system | Constant value upstash | upstash |
| workflow.operation | Operation type | step |
| workflow.step.name | Step name (if named sleep) | wait-5s |
| workflow.step.type | Step type | sleep |
| workflow.sleep.duration_ms | Sleep duration in ms | 5000 |
| workflow.sleep.until_timestamp | Target timestamp (sleepUntil) | 1704067200000 |
Call Spans (context.call)
| Attribute | Description | Example |
| --- | --- | --- |
| workflow.system | Constant value upstash | upstash |
| workflow.operation | Operation type | step |
| workflow.step.name | Step name | api-call |
| workflow.step.type | Step type | call |
| workflow.call.url | Target URL | https://api.example.com/data |
| workflow.call.method | HTTP method | POST |
| workflow.call.status_code | Response status code | 200 |
| workflow.step.input | Request body (if enabled) | {"userId":"123"} |
| workflow.step.output | Response data (if enabled) | {"status":"ok"} |
Event Spans (context.waitForEvent)
| Attribute | Description | Example |
| --- | --- | --- |
| workflow.system | Constant value upstash | upstash |
| workflow.operation | Operation type | step |
| workflow.step.name | Step name | wait-event |
| workflow.step.type | Step type | waitForEvent |
| workflow.event.id | Event ID | evt_123 |
| workflow.event.timeout_ms | Timeout in ms | 60000 |
| workflow.step.output | Event data (if enabled) | {"received":true} |
Step Data Attributes (Optional)
When captureStepData is enabled:
| Attribute | Description | Captured By |
| --- | --- | --- |
| workflow.step.input | Step input data | Client trigger, context.call |
| workflow.step.output | Step output data | All context methods |
The instrumentation captures workflow metadata and step details to help with debugging and monitoring. Step data capture is disabled by default to protect sensitive data.
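As an illustration of how these attributes fit together, the following sketch builds the attribute set for a named sleep step. The function sleepStepAttributes is hypothetical and not part of the package; it assumes that context.sleep takes a duration in seconds and that workflow.sleep.duration_ms is that value converted to milliseconds, which is consistent with the wait-5s example reporting 5000.

```typescript
type SpanAttributes = Record<string, string | number>;

// Hypothetical helper: assembles the documented sleep-span attributes.
// Assumes the sleep duration is given in seconds.
function sleepStepAttributes(stepName: string, seconds: number): SpanAttributes {
  return {
    "workflow.system": "upstash",
    "workflow.operation": "step",
    "workflow.step.name": stepName,
    "workflow.step.type": "sleep",
    "workflow.sleep.duration_ms": seconds * 1000,
  };
}

// Attributes for `context.sleep("wait-5s", 5)` under the assumptions above.
console.log(sleepStepAttributes("wait-5s", 5));
```

Attribute keys like these can be used directly as filters in a tracing backend, for example workflow.step.type = sleep to list every pause in a run.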
Usage Examples
Basic Workflow Execution
import { serve as originalServe } from "@upstash/workflow";
import { instrumentWorkflowServe } from "@kubiks/otel-upstash-workflow";

const serve = instrumentWorkflowServe(originalServe);

export const POST = serve(async (context) => {
  const data = await context.run("fetch-data", async () => {
    return await fetchFromDatabase();
  });
  const processed = await context.run("process-data", async () => {
    return await processData(data);
  });
  return { success: true, result: processed };
});
Workflow with Sleep
const serve = instrumentWorkflowServe(originalServe);

export const POST = serve(async (context) => {
  await context.run("send-email", async () => {
    await sendEmail();
  });
  await context.sleep("wait-5s", 5);
  await context.run("check-status", async () => {
    return await checkEmailStatus();
  });
  return { done: true };
});
Workflow with External API Calls
const serve = instrumentWorkflowServe(originalServe);

export const POST = serve(async (context) => {
  const apiResponse = await context.call("fetch-user", {
    url: "https://api.example.com/users/123",
    method: "GET",
  });
  const result = await context.run("process-user", async () => {
    return await processUser(apiResponse);
  });
  return result;
});
Workflow with Event Waiting
const serve = instrumentWorkflowServe(originalServe);

export const POST = serve(async (context) => {
  await context.run("start-process", async () => {
    await startLongRunningProcess();
  });
  const event = await context.waitForEvent("process-complete", {
    eventId: "evt_123",
    timeout: 60000,
  });
  await context.run("finalize", async () => {
    return await finalizeProcess(event);
  });
  return { success: true };
});
Client Triggering Workflows
import { Client } from "@upstash/workflow";
import { instrumentWorkflowClient } from "@kubiks/otel-upstash-workflow";

const client = instrumentWorkflowClient(
  new Client({
    baseUrl: process.env.QSTASH_URL!,
    token: process.env.QSTASH_TOKEN!,
  })
);

const result = await client.trigger({
  url: "https://your-app.vercel.app/api/workflow",
  body: {
    userId: "user_123",
    action: "process_data",
  },
});
console.log("Workflow triggered:", result.workflowId);
With Step Data Capture
const serve = instrumentWorkflowServe(originalServe, {
  captureStepData: true, // Enable input/output capture
  maxStepDataLength: 2048, // Increase truncation limit
});

export const POST = serve(async (context) => {
  const result = await context.run("complex-calculation", async () => {
    return {
      value: 42,
      timestamp: Date.now(),
      metadata: { processed: true },
    };
  });
  return result;
});
Complete Next.js Integration Example
Workflow Handler
app/api/workflow/route.ts
import { serve as originalServe } from "@upstash/workflow";
import { instrumentWorkflowServe } from "@kubiks/otel-upstash-workflow";

const serve = instrumentWorkflowServe(originalServe);

export const POST = serve(async (context) => {
  const orderId = context.requestPayload.orderId;
  const result = await context.run("process-order", async () => {
    return await processOrder(orderId);
  });
  await context.sleep("wait-1-minute", 60);
  await context.run("send-notification", async () => {
    return await sendNotification(orderId);
  });
  return { success: true, order: result };
});
Triggering Workflows
"use server";

import { Client } from "@upstash/workflow";
import { instrumentWorkflowClient } from "@kubiks/otel-upstash-workflow";

const workflowClient = instrumentWorkflowClient(
  new Client({
    baseUrl: process.env.QSTASH_URL!,
    token: process.env.QSTASH_TOKEN!,
  })
);

export async function createOrder(orderId: string) {
  const result = await workflowClient.trigger({
    url: "https://your-app.vercel.app/api/workflow",
    body: { orderId },
  });
  return {
    workflowId: result.workflowId,
    runId: result.workflowRunId,
  };
}
Configuration Options
interface InstrumentationConfig {
  /**
   * Whether to capture step inputs/outputs in spans.
   * @default false
   */
  captureStepData?: boolean;
  /**
   * Maximum length of step input/output to capture.
   * Data longer than this will be truncated.
   * @default 1024
   */
  maxStepDataLength?: number;
  /**
   * Custom tracer name.
   * @default "@kubiks/otel-upstash-workflow"
   */
  tracerName?: string;
}
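Since every option is optional, it can help to see how the documented defaults would combine with a partial config. The sketch below is illustrative only: resolveConfig and DEFAULT_CONFIG are hypothetical names, not part of the package's public API, and the interface is repeated so the block stands alone.

```typescript
interface InstrumentationConfig {
  captureStepData?: boolean;
  maxStepDataLength?: number;
  tracerName?: string;
}

// Defaults as documented in the interface comments.
const DEFAULT_CONFIG: Required<InstrumentationConfig> = {
  captureStepData: false,
  maxStepDataLength: 1024,
  tracerName: "@kubiks/otel-upstash-workflow",
};

// Hypothetical helper: fills in any option the caller left unset.
// Using ?? per field means an explicit `undefined` also falls back to the default.
function resolveConfig(config: InstrumentationConfig = {}): Required<InstrumentationConfig> {
  return {
    captureStepData: config.captureStepData ?? DEFAULT_CONFIG.captureStepData,
    maxStepDataLength: config.maxStepDataLength ?? DEFAULT_CONFIG.maxStepDataLength,
    tracerName: config.tracerName ?? DEFAULT_CONFIG.tracerName,
  };
}

// Only captureStepData is overridden; the other options keep their defaults.
console.log(resolveConfig({ captureStepData: true }));
```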
Best Practices
Name Your Steps Meaningfully
Use descriptive step names to make traces easier to understand:
// Good: Descriptive names
await context.run("fetch-user-from-database", async () => { /* ... */ });
await context.run("send-welcome-email", async () => { /* ... */ });
// Bad: Generic names
await context.run("step1", async () => { /* ... */ });
await context.run("process", async () => { /* ... */ });
Use Named Sleeps for Clarity
Name sleep operations to understand workflow timing:
await context.sleep("wait-for-email-delivery", 5);
await context.sleep("cool-down-period", 60);
Only enable step data capture in development or when data doesn’t contain sensitive information:
const serve = instrumentWorkflowServe(originalServe, {
  captureStepData: process.env.NODE_ENV === "development",
});
Implement proper error handling within workflow steps:
await context.run("risky-operation", async () => {
  try {
    return await performRiskyOperation();
  } catch (error) {
    console.error("Operation failed:", error);
    throw error;
  }
});
Troubleshooting
Ensure OpenTelemetry is initialized before using workflows:
import { NodeSDK } from "@opentelemetry/sdk-node";

const sdk = new NodeSDK({
  // ... configuration
});
sdk.start();
Missing Environment Variables
Make sure required environment variables are set:
QSTASH_URL=https://qstash.upstash.io
QSTASH_TOKEN=your_token
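A startup check can surface a missing variable before the first trigger fails. The following is a minimal sketch under the assumption that QSTASH_URL and QSTASH_TOKEN are the only required variables; findMissingEnv is a hypothetical helper, not something the package provides.

```typescript
// Hypothetical helper: returns the names of required variables that are
// unset, empty, or whitespace-only in the given environment object.
function findMissingEnv(
  env: Record<string, string | undefined>,
  required: string[] = ["QSTASH_URL", "QSTASH_TOKEN"],
): string[] {
  return required.filter((name) => !env[name]?.trim());
}

// In an application this would typically be called with process.env.
const missing = findMissingEnv({ QSTASH_URL: "https://qstash.upstash.io" });
if (missing.length > 0) {
  console.warn(`Missing environment variables: ${missing.join(", ")}`);
}
```

Whether to warn or throw at startup is a design choice; throwing fails fast in production, while a warning is friendlier during local development.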
Check that your endpoint is publicly accessible, returns 2xx status codes, and is properly configured with Upstash Workflow.
License
MIT