Skip to main content

Overview

@kubiks/otel-upstash-queues provides comprehensive OpenTelemetry instrumentation for Upstash QStash. Capture spans for both message publishing and consumption with detailed operation metadata and delivery tracking.
Upstash QStash Trace Visualization
Visualize your message queue operations with detailed span information including message publishing, callbacks, and delivery tracking—from producer to consumer.

Installation

npm install @kubiks/otel-upstash-queues
Peer Dependencies: @opentelemetry/api >= 1.9.0, @upstash/qstash >= 2.0.0

Quick Start

Publishing Messages

import { Client } from "@upstash/qstash";
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";

const client = instrumentUpstash(
  new Client({ token: process.env.QSTASH_TOKEN! })
);

await client.publishJSON({
  url: "https://your-api-endpoint.com/process-image",
  body: { imageId: "123" },
});
instrumentUpstash wraps the QStash client instance you already use—no configuration changes needed. Every SDK call creates a client span with useful attributes.

Consuming Messages

// app/api/process/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

async function handler(request: Request) {
  const data = await request.json();
  
  // Process your message
  await processImage(data.imageId);
  
  return Response.json({ success: true });
}

// Instrument first, then verify signature
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
instrumentConsumer wraps your message handler to trace message consumption, creating a SERVER span for each message received and processed.

What Gets Traced

This instrumentation provides two main functions:

instrumentUpstash

Wraps the QStash client to trace message publishing with SpanKind.CLIENT

instrumentConsumer

Wraps your message handler to trace message consumption with SpanKind.SERVER

Configuration

With Body Capture

Optionally capture request/response bodies for debugging:
const client = instrumentUpstash(
  new Client({ token: process.env.QSTASH_TOKEN! }),
  {
    captureBody: true,      // Enable body capture (default: false)
    maxBodyLength: 2048,    // Max characters to capture (default: 1024)
  }
);
Body capture is disabled by default to protect sensitive data. Only enable in secure, development environments.

Span Attributes

Publisher Spans (instrumentUpstash)

AttributeDescriptionExample
messaging.systemConstant value qstashqstash
messaging.operationOperation typepublish
qstash.resourceResource namemessages
qstash.targetFull operation targetmessages.publish
qstash.urlTarget URL for the messagehttps://example.com/api/process
qstash.methodHTTP method (default: POST)POST, PUT, GET
qstash.message_idMessage ID returned by QStashmsg_123
qstash.delayDelay before processing (seconds or string)60 or "1h"
qstash.not_beforeUnix timestamp for earliest processing1672531200
qstash.deduplication_idDeduplication ID for idempotent operationsunique-id-123
qstash.retriesNumber of retry attempts (max)3
qstash.callback_urlSuccess callback URLhttps://example.com/callback
qstash.failure_callback_urlFailure callback URLhttps://example.com/failure

Consumer Spans (instrumentConsumer)

AttributeDescriptionExample
messaging.systemConstant value qstashqstash
messaging.operationOperation typereceive
qstash.resourceResource namemessages
qstash.targetFull operation targetmessages.receive
qstash.message_idMessage ID from QStashmsg_456
qstash.retriedNumber of times retried (actual count)2
qstash.schedule_idSchedule ID (if from scheduled message)schedule_123
qstash.caller_ipIP address of the caller192.168.1.1
http.status_codeHTTP response status code200

Body/Payload Attributes (Optional)

When captureBody is enabled:
AttributeDescriptionCaptured By
qstash.request.bodyRequest/message body contentBoth publisher and consumer
qstash.response.bodyResponse body contentConsumer only

Usage Examples

Basic Message Publishing

import { client } from "@/lib/qstash";

await client.publishJSON({
  url: "https://your-api.com/webhook",
  body: {
    userId: "user_123",
    action: "process_data",
  },
});

// Traced with:
// - qstash.url: "https://your-api.com/webhook"
// - qstash.method: "POST"
// - qstash.message_id: "msg_..."

Delayed Message Publishing

// Delay message processing by 60 seconds
await client.publishJSON({
  url: "https://your-api.com/delayed-task",
  body: { taskId: "task_456" },
  delay: 60,
});

// Traced with:
// - qstash.delay: 60

Message with Callbacks

await client.publishJSON({
  url: "https://your-api.com/process",
  body: { orderId: "order_123" },
  callback: "https://your-api.com/success",
});

// Traced with:
// - qstash.callback_url: "https://your-api.com/success"

Retries and Deduplication

await client.publishJSON({
  url: "https://your-api.com/critical-task",
  body: { taskId: "critical_123" },
  retries: 5,
});

// Traced with:
// - qstash.retries: 5

Message Consumer

// app/api/process/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

async function handler(request: Request) {
  const data = await request.json();
  
  // Process your message
  console.log("Processing:", data);
  await processTask(data);
  
  return Response.json({ success: true });
}

// Instrument first, then verify signature
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));

// Traced with:
// - qstash.message_id: from QStash header
// - qstash.retried: retry count
// - http.status_code: response status

Complete Integration Example

Here’s a complete example of QStash with OpenTelemetry in a Next.js application:

Setup

lib/qstash.ts
import { Client } from "@upstash/qstash";
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";

export const client = instrumentUpstash(
  new Client({
    token: process.env.QSTASH_TOKEN!,
  }),
  {
    captureBody: process.env.NODE_ENV === "development",
    maxBodyLength: 2048,
  }
);

Publishing Messages

app/actions/tasks.ts
"use server";

import { client } from "@/lib/qstash";

export async function enqueueImageProcessing(imageId: string) {
  try {
    const result = await client.publishJSON({
      url: `${process.env.NEXT_PUBLIC_URL}/api/process/image`,
      body: { imageId },
      retries: 3,
      deduplicationId: `image-${imageId}`,
    });

    return { success: true, messageId: result.messageId };
  } catch (error) {
    console.error("Failed to enqueue task:", error);
    return { success: false, error: error.message };
  }
}

export async function scheduleReportGeneration(reportId: string, delay: string) {
  const result = await client.publishJSON({
    url: `${process.env.NEXT_PUBLIC_URL}/api/process/report`,
    body: { reportId },
    delay,
    callback: `${process.env.NEXT_PUBLIC_URL}/api/callbacks/report-success`,
    failureCallback: `${process.env.NEXT_PUBLIC_URL}/api/callbacks/report-failure`,
  });

  return { messageId: result.messageId };
}

Consuming Messages

app/api/process/image/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

async function handler(request: Request) {
  const { imageId } = await request.json();
  
  console.log(`Processing image: ${imageId}`);
  
  // Simulate image processing
  await processImage(imageId);
  
  return Response.json({ 
    success: true, 
    imageId,
    processedAt: new Date().toISOString() 
  });
}

export const POST = verifySignatureAppRouter(
  instrumentConsumer(handler, {
    captureBody: true,
    maxBodyLength: 1024,
  })
);

async function processImage(imageId: string) {
  // Your image processing logic
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log(`Image ${imageId} processed`);
}
app/api/process/report/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

async function handler(request: Request) {
  const { reportId } = await request.json();
  
  try {
    const report = await generateReport(reportId);
    return Response.json({ success: true, report });
  } catch (error) {
    console.error("Report generation failed:", error);
    return Response.json(
      { success: false, error: error.message },
      { status: 500 }
    );
  }
}

export const POST = verifySignatureAppRouter(instrumentConsumer(handler));

async function generateReport(reportId: string) {
  // Your report generation logic
  return { id: reportId, status: "completed" };
}

Callback Handlers

app/api/callbacks/report-success/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";

async function handler(request: Request) {
  const data = await request.json();
  console.log("Report generated successfully:", data);
  
  // Update database, send notification, etc.
  
  return Response.json({ received: true });
}

export const POST = verifySignatureAppRouter(handler);
app/api/callbacks/report-failure/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";

async function handler(request: Request) {
  const data = await request.json();
  console.error("Report generation failed:", data);
  
  // Log error, notify admin, etc.
  
  return Response.json({ received: true });
}

export const POST = verifySignatureAppRouter(handler);

Best Practices

Always verify QStash signatures to ensure messages are authentic:
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";

export const POST = verifySignatureAppRouter(
  instrumentConsumer(handler)
);
Use deduplication IDs to prevent duplicate processing:
await client.publishJSON({
  url: "https://api.example.com/process",
  body: { orderId },
  deduplicationId: `order-${orderId}`,
});
Set retry counts based on operation criticality:
// Critical operations: more retries
await client.publishJSON({
  url: "https://api.example.com/payment",
  body: { paymentId },
  retries: 5,
});

// Non-critical operations: fewer retries
await client.publishJSON({
  url: "https://api.example.com/analytics",
  body: { event },
  retries: 1,
});
Implement callbacks to track message processing:
await client.publishJSON({
  url: "https://api.example.com/process",
  body: { taskId },
  callback: "https://api.example.com/callbacks/success",
  failureCallback: "https://api.example.com/callbacks/failure",
});
Always handle errors in consumer handlers:
async function handler(request: Request) {
  try {
    const data = await request.json();
    await processTask(data);
    return Response.json({ success: true });
  } catch (error) {
    console.error("Processing failed:", error);
    return Response.json(
      { success: false, error: error.message },
      { status: 500 }
    );
  }
}

Advanced Patterns

async function enqueueBatch(tasks: Task[]) {
  const results = await Promise.allSettled(
    tasks.map(task =>
      client.publishJSON({
        url: `${process.env.API_URL}/process`,
        body: task,
        deduplicationId: `task-${task.id}`,
      })
    )
  );
  
  return results;
}
async function enqueueWithPriority(task: Task, priority: "high" | "normal") {
  await client.publishJSON({
    url: `${process.env.API_URL}/process`,
    headers: {
      "X-Priority": priority,
    },
    body: task,
    delay: priority === "high" ? 0 : 60, // High priority: immediate
  });
}
async function handler(request: Request) {
  const retryCount = parseInt(
    request.headers.get("Upstash-Retried") || "0"
  );
  
  // If max retries reached, send to DLQ
  if (retryCount >= 3) {
    await sendToDeadLetterQueue(await request.json());
    return Response.json({ handled: true });
  }
  
  // Normal processing
  await processTask(await request.json());
  return Response.json({ success: true });
}

Troubleshooting

Ensure OpenTelemetry is initialized before using QStash:
import { NodeSDK } from "@opentelemetry/sdk-node";

const sdk = new NodeSDK({
  // ... configuration
});

sdk.start();
Make sure environment variables are set correctly:
QSTASH_TOKEN=your_token
QSTASH_CURRENT_SIGNING_KEY=your_current_key
QSTASH_NEXT_SIGNING_KEY=your_next_key
Check that your endpoint is:
  • Publicly accessible
  • Returns 2xx status codes
  • Responds within timeout (default: 2 minutes)
  • Has signature verification enabled
Ensure instrumentConsumer is called before verifySignatureAppRouter:
// Correct order
export const POST = verifySignatureAppRouter(
  instrumentConsumer(handler)
);

// Wrong order
export const POST = instrumentConsumer(
  verifySignatureAppRouter(handler)
);

Resources

License

MIT