# Upstash QStash

> OpenTelemetry instrumentation for Upstash QStash message queues

## Overview

`@kubiks/otel-upstash-queues` provides OpenTelemetry instrumentation for [Upstash QStash](https://upstash.com/docs/qstash). It creates spans for both message publishing and message consumption, with attributes that describe each operation and track its delivery.

<Frame>
  <img src="https://mintcdn.com/kubiks-fb8cce26/aWqkpsvTx2hc17v8/images/otel/otel-upstash-queue-trace.png?fit=max&auto=format&n=aWqkpsvTx2hc17v8&q=85&s=a739fe877180da6e7492af99a957c95a" alt="Upstash QStash Trace Visualization" width="3379" height="2386" data-path="images/otel/otel-upstash-queue-trace.png" />
</Frame>

<Note>
  Visualize your message queue operations from producer to consumer, with span details for message publishing, callbacks, and delivery tracking.
</Note>

## Installation

<CodeGroup>
  ```bash npm theme={null}
  npm install @kubiks/otel-upstash-queues
  ```

  ```bash pnpm theme={null}
  pnpm add @kubiks/otel-upstash-queues
  ```

  ```bash yarn theme={null}
  yarn add @kubiks/otel-upstash-queues
  ```
</CodeGroup>

<Warning>
  **Peer Dependencies:** `@opentelemetry/api` >= 1.9.0, `@upstash/qstash` >= 2.0.0
</Warning>

## Quick Start

### Publishing Messages

```typescript theme={null}
import { Client } from "@upstash/qstash";
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";

const client = instrumentUpstash(
  new Client({ token: process.env.QSTASH_TOKEN! })
);

await client.publishJSON({
  url: "https://your-api-endpoint.com/process-image",
  body: { imageId: "123" },
});
```

<Tip>
  `instrumentUpstash` wraps the QStash client instance you already use, so no configuration changes are needed. Every SDK call creates a `SpanKind.CLIENT` span carrying the attributes listed in the Span Attributes section.
</Tip>

### Consuming Messages

```typescript theme={null}
// app/api/process/route.ts
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

async function handler(request: Request) {
  const data = await request.json();
  
  // Process your message
  await processImage(data.imageId);
  
  return Response.json({ success: true });
}

// instrumentConsumer wraps the handler; verifySignatureAppRouter wraps the result
export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
```

<Tip>
  `instrumentConsumer` wraps your message handler to trace message consumption, creating a SERVER span for each message received and processed.
</Tip>

## What Gets Traced

This instrumentation provides two main functions:

<CardGroup cols={2}>
  <Card title="instrumentUpstash" icon="paper-plane">
    Wraps the QStash client to trace **message publishing** with `SpanKind.CLIENT`
  </Card>

  <Card title="instrumentConsumer" icon="inbox">
    Wraps your message handler to trace **message consumption** with `SpanKind.SERVER`
  </Card>
</CardGroup>

## Configuration

### With Body Capture

Optionally capture request/response bodies for debugging:

```typescript theme={null}
const client = instrumentUpstash(
  new Client({ token: process.env.QSTASH_TOKEN! }),
  {
    captureBody: true,      // Enable body capture (default: false)
    maxBodyLength: 2048,    // Max characters to capture (default: 1024)
  }
);
```

<Warning>
  Body capture is **disabled by default** to protect sensitive data. Enable it only in secure development environments.
</Warning>
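
To make the two options concrete, the sketch below shows one way a captured body could be capped at `maxBodyLength` characters before it is recorded as a span attribute. The helper name `truncateBody` and the JSON serialization of non-string bodies are assumptions made for illustration; the package's actual implementation may differ.

```typescript
// Hypothetical helper: not part of @kubiks/otel-upstash-queues.
// Serializes a body and caps it at maxBodyLength characters.
function truncateBody(body: unknown, maxBodyLength = 1024): string {
  // Strings are kept as-is; anything else is serialized as JSON.
  // JSON.stringify can return undefined at runtime (e.g. for undefined input).
  const text =
    typeof body === "string" ? body : (JSON.stringify(body) ?? "");
  return text.length > maxBodyLength ? text.slice(0, maxBodyLength) : text;
}

// A 17-character JSON payload capped at 10 characters.
const captured = truncateBody({ imageId: "123" }, 10);
```

Capping by character count keeps attribute sizes predictable, but a truncated JSON string is no longer valid JSON, so treat captured bodies as debugging text rather than structured data.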

## Span Attributes

### Publisher Spans (`instrumentUpstash`)

| Attribute                     | Description                                 | Example                           |
| ----------------------------- | ------------------------------------------- | --------------------------------- |
| `messaging.system`            | Constant value `qstash`                     | `qstash`                          |
| `messaging.operation`         | Operation type                              | `publish`                         |
| `qstash.resource`             | Resource name                               | `messages`                        |
| `qstash.target`               | Full operation target                       | `messages.publish`                |
| `qstash.url`                  | Target URL for the message                  | `https://example.com/api/process` |
| `qstash.method`               | HTTP method (default: POST)                 | `POST`, `PUT`, `GET`              |
| `qstash.message_id`           | Message ID returned by QStash               | `msg_123`                         |
| `qstash.delay`                | Delay before processing (seconds or string) | `60` or `"1h"`                    |
| `qstash.not_before`           | Unix timestamp for earliest processing      | `1672531200`                      |
| `qstash.deduplication_id`     | Deduplication ID for idempotent operations  | `unique-id-123`                   |
| `qstash.retries`              | Number of retry attempts (max)              | `3`                               |
| `qstash.callback_url`         | Success callback URL                        | `https://example.com/callback`    |
| `qstash.failure_callback_url` | Failure callback URL                        | `https://example.com/failure`     |

### Consumer Spans (`instrumentConsumer`)

| Attribute             | Description                             | Example            |
| --------------------- | --------------------------------------- | ------------------ |
| `messaging.system`    | Constant value `qstash`                 | `qstash`           |
| `messaging.operation` | Operation type                          | `receive`          |
| `qstash.resource`     | Resource name                           | `messages`         |
| `qstash.target`       | Full operation target                   | `messages.receive` |
| `qstash.message_id`   | Message ID from QStash                  | `msg_456`          |
| `qstash.retried`      | Number of times retried (actual count)  | `2`                |
| `qstash.schedule_id`  | Schedule ID (if from scheduled message) | `schedule_123`     |
| `qstash.caller_ip`    | IP address of the caller                | `192.168.1.1`      |
| `http.status_code`    | HTTP response status code               | `200`              |

### Body/Payload Attributes (Optional)

When `captureBody` is enabled:

| Attribute              | Description                  | Captured By                 |
| ---------------------- | ---------------------------- | --------------------------- |
| `qstash.request.body`  | Request/message body content | Both publisher and consumer |
| `qstash.response.body` | Response body content        | Consumer only               |

## Usage Examples

### Basic Message Publishing

<CodeGroup>
  ```typescript Simple Message theme={null}
  import { client } from "@/lib/qstash";

  await client.publishJSON({
    url: "https://your-api.com/webhook",
    body: {
      userId: "user_123",
      action: "process_data",
    },
  });

  // Traced with:
  // - qstash.url: "https://your-api.com/webhook"
  // - qstash.method: "POST"
  // - qstash.message_id: "msg_..."
  ```

  ```typescript Custom Method theme={null}
  await client.publishJSON({
    url: "https://your-api.com/update",
    method: "PUT",
    body: { status: "completed" },
  });

  // Traced with:
  // - qstash.method: "PUT"
  ```

  ```typescript With Headers theme={null}
  await client.publishJSON({
    url: "https://your-api.com/process",
    headers: {
      "X-User-ID": "user_123",
      "X-Priority": "high",
    },
    body: { taskId: "task_456" },
  });
  ```
</CodeGroup>
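
The `Traced with` comments above follow the publisher attribute table. As a rough sketch of that mapping, the hypothetical function below turns a publish request into the documented attribute names. The `PublishLike` type and the `publishAttributes` function are illustrative assumptions, not the package's internals.

```typescript
// Illustrative only: a subset of the publishJSON options used on this page.
type PublishLike = {
  url: string;
  method?: string;
  delay?: number | string;
  notBefore?: number;
  deduplicationId?: string;
  retries?: number;
  callback?: string;
  failureCallback?: string;
};

type Attributes = Record<string, string | number>;

// Hypothetical mapping from request options to the documented attribute names.
function publishAttributes(req: PublishLike, messageId?: string): Attributes {
  const attrs: Attributes = {
    "messaging.system": "qstash",
    "messaging.operation": "publish",
    "qstash.resource": "messages",
    "qstash.target": "messages.publish",
    "qstash.url": req.url,
    "qstash.method": req.method ?? "POST", // POST is the documented default
  };
  // Optional attributes are only set when the corresponding option is present.
  const optional: Array<[string, string | number | undefined]> = [
    ["qstash.message_id", messageId],
    ["qstash.delay", req.delay],
    ["qstash.not_before", req.notBefore],
    ["qstash.deduplication_id", req.deduplicationId],
    ["qstash.retries", req.retries],
    ["qstash.callback_url", req.callback],
    ["qstash.failure_callback_url", req.failureCallback],
  ];
  optional.forEach(([key, value]) => {
    if (value !== undefined) attrs[key] = value;
  });
  return attrs;
}

const publishAttrs = publishAttributes(
  { url: "https://your-api.com/update", method: "PUT", retries: 5 },
  "msg_123"
);
```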

### Delayed Message Publishing

<CodeGroup>
  ```typescript Delay in Seconds theme={null}
  // Delay message processing by 60 seconds
  await client.publishJSON({
    url: "https://your-api.com/delayed-task",
    body: { taskId: "task_456" },
    delay: 60,
  });

  // Traced with:
  // - qstash.delay: 60
  ```

  ```typescript Human-Readable Delay theme={null}
  // Use human-readable delay format
  await client.publishJSON({
    url: "https://your-api.com/delayed-task",
    body: { taskId: "task_789" },
    delay: "1h", // 1 hour
  });

  // Traced with:
  // - qstash.delay: "1h"

  // Other examples:
  // - "30s" (30 seconds)
  // - "5m" (5 minutes)
  // - "2h" (2 hours)
  // - "1d" (1 day)
  ```

  ```typescript Scheduled Time theme={null}
  // Schedule for a specific time
  const scheduledTime = Math.floor(Date.now() / 1000) + 3600; // 1 hour from now

  await client.publishJSON({
    url: "https://your-api.com/scheduled-task",
    body: { reportId: "report_456" },
    notBefore: scheduledTime,
  });

  // Traced with:
  // - qstash.not_before: the value of scheduledTime (Unix seconds, for example 1672531200)
  ```
</CodeGroup>
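
`publishJSON` accepts these delay values directly, so no client-side parsing is required. If you want to log or validate the effective delay before publishing, a small helper like the one below can convert the formats listed above into seconds and derive a `notBefore` timestamp. `delayToSeconds` and `notBeforeFrom` are hypothetical names, and only the single-unit forms shown on this page (`s`, `m`, `h`, `d`) are handled.

```typescript
// Seconds per unit for the delay formats shown on this page.
const UNIT_SECONDS: Record<string, number> = { s: 1, m: 60, h: 3600, d: 86400 };

// Hypothetical helper: converts a numeric or single-unit string delay to seconds.
function delayToSeconds(delay: number | string): number {
  if (typeof delay === "number") return delay;
  const match = /^(\d+)([smhd])$/.exec(delay);
  if (!match) throw new Error(`Unsupported delay format: ${delay}`);
  return Number(match[1]) * UNIT_SECONDS[match[2]];
}

// Hypothetical helper: Unix timestamp (seconds) at which the delay elapses.
function notBeforeFrom(now: Date, delay: number | string): number {
  return Math.floor(now.getTime() / 1000) + delayToSeconds(delay);
}

// One hour after 1672527600 (Unix seconds) is 1672531200.
const exampleNotBefore = notBeforeFrom(new Date(1672527600 * 1000), "1h");
```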

### Message with Callbacks

<CodeGroup>
  ```typescript Success Callback theme={null}
  await client.publishJSON({
    url: "https://your-api.com/process",
    body: { orderId: "order_123" },
    callback: "https://your-api.com/success",
  });

  // Traced with:
  // - qstash.callback_url: "https://your-api.com/success"
  ```

  ```typescript Success and Failure Callbacks theme={null}
  await client.publishJSON({
    url: "https://your-api.com/process",
    body: { orderId: "order_456" },
    callback: "https://your-api.com/callbacks/success",
    failureCallback: "https://your-api.com/callbacks/failure",
  });

  // Traced with:
  // - qstash.callback_url: "https://your-api.com/callbacks/success"
  // - qstash.failure_callback_url: "https://your-api.com/callbacks/failure"
  ```
</CodeGroup>

### Retries and Deduplication

<CodeGroup>
  ```typescript With Retries theme={null}
  await client.publishJSON({
    url: "https://your-api.com/critical-task",
    body: { taskId: "critical_123" },
    retries: 5,
  });

  // Traced with:
  // - qstash.retries: 5
  ```

  ```typescript With Deduplication theme={null}
  // Prevent duplicate processing
  const orderId = "order_789";

  await client.publishJSON({
    url: "https://your-api.com/process",
    body: { orderId },
    deduplicationId: `order-${orderId}`,
  });

  // Traced with:
  // - qstash.deduplication_id: "order-order_789"
  ```

  ```typescript Combined theme={null}
  const transactionId = "tx_123";

  await client.publishJSON({
    url: "https://your-api.com/critical",
    body: { transactionId },
    retries: 3,
    deduplicationId: `tx-${transactionId}`,
  });
  ```
</CodeGroup>
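
A deduplication ID built from a missing value collapses every message onto the same ID (for example `order-undefined`), which could cause later, legitimate messages to be treated as duplicates. The guard below is a minimal sketch of one way to avoid that; `dedupId` is a hypothetical helper, not part of either SDK.

```typescript
// Hypothetical helper: builds "<prefix>-<id>" and refuses empty identifiers.
function dedupId(
  prefix: string,
  id: string | number | null | undefined
): string {
  const value = id === null || id === undefined ? "" : String(id).trim();
  if (value === "") {
    throw new Error(
      `Cannot build a deduplication ID for "${prefix}" without an identifier`
    );
  }
  return `${prefix}-${value}`;
}

const orderDedupId = dedupId("order", "order_789");
```

The result can be passed as `deduplicationId` in the examples above.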

### Message Consumer

<CodeGroup>
  ```typescript Basic Consumer theme={null}
  // app/api/process/route.ts
  import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
  import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

  async function handler(request: Request) {
    const data = await request.json();
    
    // Process your message
    console.log("Processing:", data);
    await processTask(data);
    
    return Response.json({ success: true });
  }

  // instrumentConsumer wraps the handler; verifySignatureAppRouter wraps the result
  export const POST = verifySignatureAppRouter(instrumentConsumer(handler));

  // Traced with:
  // - qstash.message_id: from QStash header
  // - qstash.retried: retry count
  // - http.status_code: response status
  ```

  ```typescript With Body Capture theme={null}
  export const POST = verifySignatureAppRouter(
    instrumentConsumer(handler, {
      captureBody: true,
      maxBodyLength: 2048,
    })
  );

  // Additionally captures:
  // - qstash.request.body: message payload
  // - qstash.response.body: handler response
  ```

  ```typescript With Error Handling theme={null}
  async function handler(request: Request) {
    try {
      const data = await request.json();
      await processTask(data);
      return Response.json({ success: true });
    } catch (error) {
      console.error("Processing failed:", error);
      // Error is captured in span
      return Response.json(
        { success: false, error: error instanceof Error ? error.message : String(error) },
        { status: 500 }
      );
    }
  }

  export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
  ```
</CodeGroup>
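
The consumer attributes come from metadata that QStash attaches to each delivery. The sketch below shows how such headers could be mapped onto the documented attribute names. `Upstash-Retried` appears elsewhere on this page; the other header names (`Upstash-Message-Id`, `Upstash-Schedule-Id`, `Upstash-Caller-IP`) are assumptions about QStash's delivery headers, and `consumerAttributes` is a hypothetical function rather than the package's implementation.

```typescript
// Minimal header reader so the sketch does not depend on a runtime Headers class.
type HeaderReader = { get(name: string): string | null };

// Hypothetical mapping from delivery headers to the documented consumer attributes.
function consumerAttributes(
  headers: HeaderReader,
  status: number
): Record<string, string | number> {
  const attrs: Record<string, string | number> = {
    "messaging.system": "qstash",
    "messaging.operation": "receive",
    "qstash.resource": "messages",
    "qstash.target": "messages.receive",
    "http.status_code": status,
  };
  const messageId = headers.get("Upstash-Message-Id"); // assumed header name
  if (messageId !== null) attrs["qstash.message_id"] = messageId;
  const retried = headers.get("Upstash-Retried");
  if (retried !== null) attrs["qstash.retried"] = Number.parseInt(retried, 10);
  const scheduleId = headers.get("Upstash-Schedule-Id"); // assumed header name
  if (scheduleId !== null) attrs["qstash.schedule_id"] = scheduleId;
  const callerIp = headers.get("Upstash-Caller-IP"); // assumed header name
  if (callerIp !== null) attrs["qstash.caller_ip"] = callerIp;
  return attrs;
}

// A plain lookup standing in for request.headers (case-insensitive, like HTTP headers).
const sampleHeaderValues: Record<string, string> = {
  "upstash-message-id": "msg_456",
  "upstash-retried": "2",
};
const sampleHeaders: HeaderReader = {
  get: (name) => sampleHeaderValues[name.toLowerCase()] ?? null,
};
const consumerAttrs = consumerAttributes(sampleHeaders, 200);
```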

## Complete Integration Example

Here's a complete example of QStash with OpenTelemetry in a Next.js application:

### Setup

```typescript lib/qstash.ts theme={null}
import { Client } from "@upstash/qstash";
import { instrumentUpstash } from "@kubiks/otel-upstash-queues";

export const client = instrumentUpstash(
  new Client({
    token: process.env.QSTASH_TOKEN!,
  }),
  {
    captureBody: process.env.NODE_ENV === "development",
    maxBodyLength: 2048,
  }
);
```

### Publishing Messages

```typescript app/actions/tasks.ts theme={null}
"use server";

import { client } from "@/lib/qstash";

export async function enqueueImageProcessing(imageId: string) {
  try {
    const result = await client.publishJSON({
      url: `${process.env.NEXT_PUBLIC_URL}/api/process/image`,
      body: { imageId },
      retries: 3,
      deduplicationId: `image-${imageId}`,
    });

    return { success: true, messageId: result.messageId };
  } catch (error) {
    console.error("Failed to enqueue task:", error);
    return { success: false, error: error instanceof Error ? error.message : String(error) };
  }
}

export async function scheduleReportGeneration(reportId: string, delay: string) {
  const result = await client.publishJSON({
    url: `${process.env.NEXT_PUBLIC_URL}/api/process/report`,
    body: { reportId },
    delay,
    callback: `${process.env.NEXT_PUBLIC_URL}/api/callbacks/report-success`,
    failureCallback: `${process.env.NEXT_PUBLIC_URL}/api/callbacks/report-failure`,
  });

  return { messageId: result.messageId };
}
```
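
Both actions interpolate `process.env.NEXT_PUBLIC_URL` directly into the target URL, so an unset variable would produce a URL such as `undefined/api/process/image`. A small guard, sketched here with the hypothetical helper `endpointUrl`, fails early instead.

```typescript
// Hypothetical helper: joins a configured base URL and a path, or throws.
function endpointUrl(base: string | undefined, path: string): string {
  if (!base || !/^https?:\/\//.test(base)) {
    throw new Error("Base URL is missing or is not an http(s) URL");
  }
  // Normalize slashes so the two parts are joined by exactly one "/".
  const trimmedBase = base.replace(/\/+$/, "");
  const trimmedPath = path.replace(/^\/+/, "");
  return `${trimmedBase}/${trimmedPath}`;
}

const imageEndpoint = endpointUrl("https://example.com/", "/api/process/image");
```

In the actions above, this would be called as `endpointUrl(process.env.NEXT_PUBLIC_URL, "/api/process/image")`.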

### Consuming Messages

```typescript app/api/process/image/route.ts theme={null}
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

async function handler(request: Request) {
  const { imageId } = await request.json();
  
  console.log(`Processing image: ${imageId}`);
  
  // Simulate image processing
  await processImage(imageId);
  
  return Response.json({ 
    success: true, 
    imageId,
    processedAt: new Date().toISOString() 
  });
}

export const POST = verifySignatureAppRouter(
  instrumentConsumer(handler, {
    captureBody: process.env.NODE_ENV === "development", // keep bodies out of production traces
    maxBodyLength: 1024,
  })
);

async function processImage(imageId: string) {
  // Your image processing logic
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log(`Image ${imageId} processed`);
}
```

```typescript app/api/process/report/route.ts theme={null}
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

async function handler(request: Request) {
  const { reportId } = await request.json();
  
  try {
    const report = await generateReport(reportId);
    return Response.json({ success: true, report });
  } catch (error) {
    console.error("Report generation failed:", error);
    return Response.json(
      { success: false, error: error instanceof Error ? error.message : String(error) },
      { status: 500 }
    );
  }
}

export const POST = verifySignatureAppRouter(instrumentConsumer(handler));

async function generateReport(reportId: string) {
  // Your report generation logic
  return { id: reportId, status: "completed" };
}
```

### Callback Handlers

```typescript app/api/callbacks/report-success/route.ts theme={null}
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";

async function handler(request: Request) {
  const data = await request.json();
  console.log("Report generated successfully:", data);
  
  // Update database, send notification, etc.
  
  return Response.json({ received: true });
}

export const POST = verifySignatureAppRouter(handler);
```

```typescript app/api/callbacks/report-failure/route.ts theme={null}
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";

async function handler(request: Request) {
  const data = await request.json();
  console.error("Report generation failed:", data);
  
  // Log error, notify admin, etc.
  
  return Response.json({ received: true });
}

export const POST = verifySignatureAppRouter(handler);
```

## Best Practices

<AccordionGroup>
  <Accordion title="Always Use Signature Verification">
    Always verify QStash signatures to ensure messages are authentic:

    ```typescript theme={null}
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
    import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

    export const POST = verifySignatureAppRouter(
      instrumentConsumer(handler)
    );
    ```
  </Accordion>

  <Accordion title="Use Deduplication for Idempotency">
    Use deduplication IDs to prevent duplicate processing:

    ```typescript theme={null}
    await client.publishJSON({
      url: "https://api.example.com/process",
      body: { orderId },
      deduplicationId: `order-${orderId}`,
    });
    ```
  </Accordion>

  <Accordion title="Configure Appropriate Retries">
    Set retry counts based on operation criticality:

    ```typescript theme={null}
    // Critical operations: more retries
    await client.publishJSON({
      url: "https://api.example.com/payment",
      body: { paymentId },
      retries: 5,
    });

    // Non-critical operations: fewer retries
    await client.publishJSON({
      url: "https://api.example.com/analytics",
      body: { event },
      retries: 1,
    });
    ```
  </Accordion>

  <Accordion title="Use Callbacks for Monitoring">
    Implement callbacks to track message processing:

    ```typescript theme={null}
    await client.publishJSON({
      url: "https://api.example.com/process",
      body: { taskId },
      callback: "https://api.example.com/callbacks/success",
      failureCallback: "https://api.example.com/callbacks/failure",
    });
    ```
  </Accordion>

  <Accordion title="Handle Errors Gracefully">
    Always handle errors in consumer handlers:

    ```typescript theme={null}
    async function handler(request: Request) {
      try {
        const data = await request.json();
        await processTask(data);
        return Response.json({ success: true });
      } catch (error) {
        console.error("Processing failed:", error);
        return Response.json(
          { success: false, error: error instanceof Error ? error.message : String(error) },
          { status: 500 }
        );
      }
    }
    ```
  </Accordion>
</AccordionGroup>
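
In strict TypeScript, the value bound in a `catch` clause is typed `unknown`, so reading `.message` requires a narrowing step first. A helper such as the hypothetical `errorMessage` below keeps consumer handlers short while covering non-`Error` throws.

```typescript
// Hypothetical helper: extracts a printable message from any thrown value.
function errorMessage(error: unknown): string {
  if (error instanceof Error) return error.message;
  if (typeof error === "string") return error;
  try {
    // JSON.stringify can return undefined at runtime (e.g. for undefined input).
    return JSON.stringify(error) ?? String(error);
  } catch {
    // Circular structures and similar cases fall back to String().
    return String(error);
  }
}

const fromError = errorMessage(new Error("boom"));
const fromObject = errorMessage({ code: 1 });
```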

## Advanced Patterns

<AccordionGroup>
  <Accordion title="Batch Processing">
    ```typescript theme={null}
    async function enqueueBatch(tasks: Task[]) {
      const results = await Promise.allSettled(
        tasks.map(task =>
          client.publishJSON({
            url: `${process.env.API_URL}/process`,
            body: task,
            deduplicationId: `task-${task.id}`,
          })
        )
      );
      
      return results;
    }
    ```
  </Accordion>

  <Accordion title="Priority Queues">
    ```typescript theme={null}
    async function enqueueWithPriority(task: Task, priority: "high" | "normal") {
      await client.publishJSON({
        url: `${process.env.API_URL}/process`,
        headers: {
          "X-Priority": priority,
        },
        body: task,
        delay: priority === "high" ? 0 : 60, // High priority: immediate
      });
    }
    ```
  </Accordion>

  <Accordion title="Dead Letter Queue">
    ```typescript theme={null}
    async function handler(request: Request) {
      const retryCount = parseInt(
        request.headers.get("Upstash-Retried") ?? "0",
        10
      );
      
      // If max retries reached, send to DLQ
      if (retryCount >= 3) {
        await sendToDeadLetterQueue(await request.json());
        return Response.json({ handled: true });
      }
      
      // Normal processing
      await processTask(await request.json());
      return Response.json({ success: true });
    }
    ```
  </Accordion>
</AccordionGroup>
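
`Promise.allSettled` never rejects, so the `enqueueBatch` function in the Batch Processing pattern hands back failures silently unless the caller inspects each result. One possible way to summarize the outcome is sketched below; `summarizeBatch` and the local `Settled` type are illustrative names, not part of either SDK.

```typescript
// Structurally compatible with the built-in PromiseSettledResult<T>.
type Settled<T> =
  | { status: "fulfilled"; value: T }
  | { status: "rejected"; reason: unknown };

type BatchSummary<T> = {
  succeeded: T[];
  failed: Array<{ index: number; reason: unknown }>;
};

// Hypothetical helper: splits settled results into successes and indexed failures.
function summarizeBatch<T>(results: ReadonlyArray<Settled<T>>): BatchSummary<T> {
  const summary: BatchSummary<T> = { succeeded: [], failed: [] };
  results.forEach((result, index) => {
    if (result.status === "fulfilled") {
      summary.succeeded.push(result.value);
    } else {
      // Keep the index so the failed task can be looked up and retried.
      summary.failed.push({ index, reason: result.reason });
    }
  });
  return summary;
}

const batchSummary = summarizeBatch<{ messageId: string }>([
  { status: "fulfilled", value: { messageId: "msg_1" } },
  { status: "rejected", reason: new Error("rate limited") },
]);
```

Keeping the original index makes it possible to map each failure back to the task that produced it.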

## Troubleshooting

<AccordionGroup>
  <Accordion title="Spans Not Appearing">
    Ensure OpenTelemetry is initialized before using QStash:

    ```typescript theme={null}
    import { NodeSDK } from "@opentelemetry/sdk-node";

    const sdk = new NodeSDK({
      // ... configuration
    });

    sdk.start();
    ```
  </Accordion>

  <Accordion title="Signature Verification Failing">
    Make sure environment variables are set correctly:

    ```bash theme={null}
    QSTASH_TOKEN=your_token
    QSTASH_CURRENT_SIGNING_KEY=your_current_key
    QSTASH_NEXT_SIGNING_KEY=your_next_key
    ```
  </Accordion>

  <Accordion title="Messages Not Being Processed">
    Check that your endpoint:

    * Is publicly accessible
    * Returns 2xx status codes
    * Responds within the timeout (default: 2 minutes)
    * Has signature verification enabled
  </Accordion>

  <Accordion title="Missing Consumer Spans">
    Ensure `instrumentConsumer` wraps your handler directly and `verifySignatureAppRouter` wraps the result, so that verification is the outermost layer:

    ```typescript theme={null}
    // Correct order
    export const POST = verifySignatureAppRouter(
      instrumentConsumer(handler)
    );

    // Wrong order
    export const POST = instrumentConsumer(
      verifySignatureAppRouter(handler)
    );
    ```
  </Accordion>
</AccordionGroup>
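
To make the wrapping order concrete, the self-contained sketch below uses two stand-in wrappers, `withVerify` and `withTrace`. They are hypothetical and only record the order in which they run; they are not the real `verifySignatureAppRouter` or `instrumentConsumer`. With the recommended order, the outer verification step runs first and the tracing wrapper sits directly around the handler.

```typescript
type Handler = (input: string) => string;

// Records the order in which each layer runs.
const calls: string[] = [];

// Stand-in for a signature verification wrapper (hypothetical).
const withVerify = (next: Handler): Handler => (input) => {
  calls.push("verify");
  return next(input);
};

// Stand-in for a tracing wrapper (hypothetical).
const withTrace = (next: Handler): Handler => (input) => {
  calls.push("trace:start");
  const result = next(input);
  calls.push("trace:end");
  return result;
};

const demoHandler: Handler = (input) => {
  calls.push("handler");
  return input.toUpperCase();
};

// Same shape as verifySignatureAppRouter(instrumentConsumer(handler)).
const post = withVerify(withTrace(demoHandler));
const output = post("ok");
```

After the call, `calls` lists the verification step first, followed by the tracing wrapper surrounding the handler.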

## Resources

<CardGroup cols={2}>
  <Card title="QStash Documentation" icon="book" href="https://upstash.com/docs/qstash">
    Learn more about Upstash QStash
  </Card>

  <Card title="GitHub Repository" icon="github" href="https://github.com/kubiks-inc/otel/tree/main/packages/otel-upstash-queues">
    View source code and examples
  </Card>

  <Card title="npm Package" icon="box" href="https://www.npmjs.com/package/@kubiks/otel-upstash-queues">
    View package on npm
  </Card>

  <Card title="Report Issues" icon="circle-exclamation" href="https://github.com/kubiks-inc/otel/issues">
    Found a bug? Let us know!
  </Card>
</CardGroup>

## License

MIT
