Overview
@kubiks/otel-upstash-queues provides OpenTelemetry instrumentation for Upstash QStash. It captures spans for both message publishing and consumption, with detailed operation metadata and delivery tracking. The resulting spans cover message publishing, callbacks, and delivery, from producer to consumer.
Installation
    npm install @kubiks/otel-upstash-queues
Peer Dependencies: @opentelemetry/api >= 1.9.0, @upstash/qstash >= 2.0.0
Quick Start
Publishing Messages
    import { Client } from "@upstash/qstash";
    import { instrumentUpstash } from "@kubiks/otel-upstash-queues";

    const client = instrumentUpstash(
      new Client({ token: process.env.QSTASH_TOKEN! })
    );

    await client.publishJSON({
      url: "https://your-api-endpoint.com/process-image",
      body: { imageId: "123" },
    });
instrumentUpstash wraps the QStash client instance you already use, so no configuration changes are needed. Every SDK call creates a client span with useful attributes.
Consuming Messages
    // app/api/process/route.ts
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
    import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

    async function handler(request: Request) {
      const data = await request.json();

      // Process your message (processImage is your own function)
      await processImage(data.imageId);

      return Response.json({ success: true });
    }

    // Instrument first, then verify signature
    export const POST = verifySignatureAppRouter(instrumentConsumer(handler));
instrumentConsumer wraps your message handler to trace message consumption, creating a SERVER span for each message received and processed.
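The comment "Instrument first, then verify signature" describes wrapping order, which can be easy to misread. The following dependency-free sketch uses a hypothetical wrapWith helper (not part of either library) to show which layer runs first when wrappers are nested in the same shape as verifySignatureAppRouter(instrumentConsumer(handler)).

```typescript
type Handler = (input: string) => string;

// Records the order in which each layer starts handling a call.
const callOrder: string[] = [];

// Hypothetical wrapper factory: notes its label, then delegates to the wrapped handler.
function wrapWith(label: string, inner: Handler): Handler {
  return (input) => {
    callOrder.push(label);
    return inner(input);
  };
}

const handler: Handler = (input) => {
  callOrder.push("handler");
  return `processed ${input}`;
};

// Same nesting shape as verifySignatureAppRouter(instrumentConsumer(handler)):
// the outermost wrapper is entered first on every call.
const post = wrapWith("verify", wrapWith("instrument", handler));
post("msg_1");

console.log(callOrder.join(" -> "));
```

In this sketch the outer "verify" layer is entered before the "instrument" layer, and the handler runs last. Assuming the real wrappers nest the same way, a request would pass signature verification before the consumer span is started.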
What Gets Traced
This instrumentation provides two main functions:
instrumentUpstash: wraps the QStash client to trace message publishing with SpanKind.CLIENT
instrumentConsumer: wraps your message handler to trace message consumption with SpanKind.SERVER
Configuration
With Body Capture
Optionally capture request/response bodies for debugging:
    const client = instrumentUpstash(
      new Client({ token: process.env.QSTASH_TOKEN! }),
      {
        captureBody: true,    // Enable body capture (default: false)
        maxBodyLength: 2048,  // Max characters to capture (default: 1024)
      }
    );
Body capture is disabled by default to protect sensitive data. Enable it only in secure development environments.
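To make the maxBodyLength setting concrete, here is a minimal sketch of what a character cap on a captured body could look like. The helper name truncateBody and the "..." marker are hypothetical; the library's actual truncation behavior is not described in this document and may differ.

```typescript
// Hypothetical helper illustrating a character cap like maxBodyLength.
// It is not part of @kubiks/otel-upstash-queues.
function truncateBody(body: unknown, maxBodyLength = 1024): string {
  // Serialize non-string bodies so the cap applies to the text that would be stored.
  const text = typeof body === "string" ? body : JSON.stringify(body) ?? "";
  if (text.length <= maxBodyLength) return text;
  // Keep the first maxBodyLength characters and mark the cut (assumed marker).
  return text.slice(0, maxBodyLength) + "...";
}

console.log(truncateBody({ imageId: "123" }));
console.log(truncateBody("x".repeat(3000), 2048).length);
```

A cap of this kind bounds span attribute size, which is one reason to keep the limit small even when body capture is enabled.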
Span Attributes
Publisher Spans (instrumentUpstash)

| Attribute | Description | Example |
| --- | --- | --- |
| messaging.system | Constant value qstash | qstash |
| messaging.operation | Operation type | publish |
| qstash.resource | Resource name | messages |
| qstash.target | Full operation target | messages.publish |
| qstash.url | Target URL for the message | https://example.com/api/process |
| qstash.method | HTTP method (default: POST) | POST, PUT, GET |
| qstash.message_id | Message ID returned by QStash | msg_123 |
| qstash.delay | Delay before processing (seconds or string) | 60 or "1h" |
| qstash.not_before | Unix timestamp for earliest processing | 1672531200 |
| qstash.deduplication_id | Deduplication ID for idempotent operations | unique-id-123 |
| qstash.retries | Number of retry attempts (max) | 3 |
| qstash.callback_url | Success callback URL | https://example.com/callback |
| qstash.failure_callback_url | Failure callback URL | https://example.com/failure |
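As a rough illustration of how publish options relate to the publisher attributes listed above, the sketch below maps an options object to attribute names. The function publisherAttributes and both types are hypothetical, and the option names method and notBefore are assumptions; the library's internal mapping is not shown in this document.

```typescript
// Subset of publish options used in this document. `method` and `notBefore`
// are assumed option names.
interface PublishOptions {
  url: string;
  method?: string;
  delay?: number | string;
  notBefore?: number;
  deduplicationId?: string;
  retries?: number;
  callback?: string;
  failureCallback?: string;
}

type PublisherAttributes = Record<string, string | number>;

// Hypothetical mapping from publish options to the publisher span attributes.
function publisherAttributes(options: PublishOptions, messageId?: string): PublisherAttributes {
  const attrs: PublisherAttributes = {
    "messaging.system": "qstash",
    "messaging.operation": "publish",
    "qstash.resource": "messages",
    "qstash.target": "messages.publish",
    "qstash.url": options.url,
    "qstash.method": options.method ?? "POST",
  };

  // Optional attributes are only set when the corresponding value exists.
  const optional: [string, string | number | undefined][] = [
    ["qstash.message_id", messageId],
    ["qstash.delay", options.delay],
    ["qstash.not_before", options.notBefore],
    ["qstash.deduplication_id", options.deduplicationId],
    ["qstash.retries", options.retries],
    ["qstash.callback_url", options.callback],
    ["qstash.failure_callback_url", options.failureCallback],
  ];
  for (const [key, value] of optional) {
    if (value !== undefined) attrs[key] = value;
  }
  return attrs;
}

console.log(publisherAttributes({ url: "https://example.com/api/process", delay: "1h", retries: 3 }, "msg_123"));
```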
Consumer Spans (instrumentConsumer)

| Attribute | Description | Example |
| --- | --- | --- |
| messaging.system | Constant value qstash | qstash |
| messaging.operation | Operation type | receive |
| qstash.resource | Resource name | messages |
| qstash.target | Full operation target | messages.receive |
| qstash.message_id | Message ID from QStash | msg_456 |
| qstash.retried | Number of times retried (actual count) | 2 |
| qstash.schedule_id | Schedule ID (if from scheduled message) | schedule_123 |
| qstash.caller_ip | IP address of the caller | 192.168.1.1 |
| http.status_code | HTTP response status code | 200 |
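The consumer attributes above are described as coming from the incoming QStash request. A minimal sketch of such a mapping follows. Only the Upstash-Retried header appears elsewhere in this document; the other header names, the HeaderReader type, and the function itself are assumptions made for illustration.

```typescript
// Minimal structural type so the sketch works with Request.headers or a test double.
interface HeaderReader {
  get(name: string): string | null;
}

type ConsumerAttributes = Record<string, string | number>;

// Hypothetical mapping from delivery headers to consumer span attributes.
// Header names other than "Upstash-Retried" are assumed.
function consumerAttributesFromHeaders(headers: HeaderReader): ConsumerAttributes {
  const attrs: ConsumerAttributes = {
    "messaging.system": "qstash",
    "messaging.operation": "receive",
    "qstash.resource": "messages",
    "qstash.target": "messages.receive",
  };

  const messageId = headers.get("Upstash-Message-Id");
  if (messageId) attrs["qstash.message_id"] = messageId;

  // Only record the retry count when the header holds a valid integer.
  const retried = Number.parseInt(headers.get("Upstash-Retried") ?? "", 10);
  if (!Number.isNaN(retried)) attrs["qstash.retried"] = retried;

  const scheduleId = headers.get("Upstash-Schedule-Id");
  if (scheduleId) attrs["qstash.schedule_id"] = scheduleId;

  const callerIp = headers.get("Upstash-Caller-Ip");
  if (callerIp) attrs["qstash.caller_ip"] = callerIp;

  return attrs;
}
```

Inside a route handler this could be called as consumerAttributesFromHeaders(request.headers), since the standard Headers object satisfies HeaderReader.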
Body/Payload Attributes (Optional)
When captureBody is enabled:

| Attribute | Description | Captured By |
| --- | --- | --- |
| qstash.request.body | Request/message body content | Both publisher and consumer |
| qstash.response.body | Response body content | Consumer only |
Usage Examples
Basic Message Publishing
Simple Message
    import { client } from "@/lib/qstash";

    await client.publishJSON({
      url: "https://your-api.com/webhook",
      body: {
        userId: "user_123",
        action: "process_data",
      },
    });

    // Traced with:
    // - qstash.url: "https://your-api.com/webhook"
    // - qstash.method: "POST"
    // - qstash.message_id: "msg_..."
Delayed Message Publishing
Delay in Seconds
    // Delay message processing by 60 seconds
    await client.publishJSON({
      url: "https://your-api.com/delayed-task",
      body: { taskId: "task_456" },
      delay: 60,
    });

    // Traced with:
    // - qstash.delay: 60
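The attribute table describes qstash.delay as "seconds or string" (60 or "1h") and qstash.not_before as a Unix timestamp. The sketch below shows how those two representations relate. Both helpers are hypothetical, and the set of unit suffixes (s, m, h, d) is an assumption rather than a statement of what QStash accepts.

```typescript
// Assumed unit suffixes for human-readable delays such as "1h".
const UNIT_SECONDS: Record<string, number> = { s: 1, m: 60, h: 3600, d: 86400 };

// Convert a delay given as seconds (60) or a string ("1h") to whole seconds.
function delayToSeconds(delay: number | string): number {
  if (typeof delay === "number") return delay;
  const match = /^(\d+)([smhd])$/.exec(delay.trim());
  if (!match) throw new Error(`Unsupported delay format: ${delay}`);
  const [, amount, unit] = match;
  return Number(amount) * (UNIT_SECONDS[unit as string] ?? 1);
}

// Convert a Date to a Unix timestamp in seconds, the form shown for qstash.not_before.
function toUnixSeconds(date: Date): number {
  return Math.floor(date.getTime() / 1000);
}

console.log(delayToSeconds("1h"));
console.log(toUnixSeconds(new Date("2023-01-01T00:00:00Z")));
```

The second call yields 1672531200, the same value used as the qstash.not_before example in the attribute table.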
Message with Callbacks
Success Callback
    await client.publishJSON({
      url: "https://your-api.com/process",
      body: { orderId: "order_123" },
      callback: "https://your-api.com/success",
    });

    // Traced with:
    // - qstash.callback_url: "https://your-api.com/success"
Retries and Deduplication
With Retries
    await client.publishJSON({
      url: "https://your-api.com/critical-task",
      body: { taskId: "critical_123" },
      retries: 5,
    });

    // Traced with:
    // - qstash.retries: 5
Message Consumer
Basic Consumer
    // app/api/process/route.ts
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
    import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

    async function handler(request: Request) {
      const data = await request.json();

      // Process your message (processTask is your own function)
      console.log("Processing:", data);
      await processTask(data);

      return Response.json({ success: true });
    }

    // Instrument first, then verify signature
    export const POST = verifySignatureAppRouter(instrumentConsumer(handler));

    // Traced with:
    // - qstash.message_id: from QStash header
    // - qstash.retried: retry count
    // - http.status_code: response status
Complete Integration Example
Here is a complete example of QStash with OpenTelemetry in a Next.js application:
Setup
    import { Client } from "@upstash/qstash";
    import { instrumentUpstash } from "@kubiks/otel-upstash-queues";

    export const client = instrumentUpstash(
      new Client({
        token: process.env.QSTASH_TOKEN!,
      }),
      {
        // Capture bodies only in development
        captureBody: process.env.NODE_ENV === "development",
        maxBodyLength: 2048,
      }
    );
Publishing Messages
    "use server";

    import { client } from "@/lib/qstash";

    export async function enqueueImageProcessing(imageId: string) {
      try {
        const result = await client.publishJSON({
          url: `${process.env.NEXT_PUBLIC_URL}/api/process/image`,
          body: { imageId },
          retries: 3,
          deduplicationId: `image-${imageId}`,
        });
        return { success: true, messageId: result.messageId };
      } catch (error) {
        console.error("Failed to enqueue task:", error);
        // Caught values are typed unknown in strict TypeScript, so narrow before reading .message
        const message = error instanceof Error ? error.message : String(error);
        return { success: false, error: message };
      }
    }

    export async function scheduleReportGeneration(reportId: string, delay: string) {
      const result = await client.publishJSON({
        url: `${process.env.NEXT_PUBLIC_URL}/api/process/report`,
        body: { reportId },
        delay,
        callback: `${process.env.NEXT_PUBLIC_URL}/api/callbacks/report-success`,
        failureCallback: `${process.env.NEXT_PUBLIC_URL}/api/callbacks/report-failure`,
      });
      return { messageId: result.messageId };
    }
Consuming Messages
app/api/process/image/route.ts
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
    import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

    async function handler(request: Request) {
      const { imageId } = await request.json();

      console.log(`Processing image: ${imageId}`);

      // Simulate image processing
      await processImage(imageId);

      return Response.json({
        success: true,
        imageId,
        processedAt: new Date().toISOString(),
      });
    }

    export const POST = verifySignatureAppRouter(
      instrumentConsumer(handler, {
        captureBody: true,
        maxBodyLength: 1024,
      })
    );

    async function processImage(imageId: string) {
      // Your image processing logic
      await new Promise((resolve) => setTimeout(resolve, 1000));
      console.log(`Image ${imageId} processed`);
    }
app/api/process/report/route.ts
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
    import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

    async function handler(request: Request) {
      const { reportId } = await request.json();

      try {
        const report = await generateReport(reportId);
        return Response.json({ success: true, report });
      } catch (error) {
        console.error("Report generation failed:", error);
        // Narrow the unknown error before reading .message
        const message = error instanceof Error ? error.message : String(error);
        return Response.json(
          { success: false, error: message },
          { status: 500 }
        );
      }
    }

    export const POST = verifySignatureAppRouter(instrumentConsumer(handler));

    async function generateReport(reportId: string) {
      // Your report generation logic
      return { id: reportId, status: "completed" };
    }
Callback Handlers
app/api/callbacks/report-success/route.ts
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";

    async function handler(request: Request) {
      const data = await request.json();
      console.log("Report generated successfully:", data);

      // Update database, send notification, etc.

      return Response.json({ received: true });
    }

    export const POST = verifySignatureAppRouter(handler);
app/api/callbacks/report-failure/route.ts
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";

    async function handler(request: Request) {
      const data = await request.json();
      console.error("Report generation failed:", data);

      // Log error, notify admin, etc.

      return Response.json({ received: true });
    }

    export const POST = verifySignatureAppRouter(handler);
Best Practices
Always Use Signature Verification
Always verify QStash signatures to ensure messages are authentic:
    import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
    import { instrumentConsumer } from "@kubiks/otel-upstash-queues";

    // handler is your own route handler
    export const POST = verifySignatureAppRouter(
      instrumentConsumer(handler)
    );
Use Deduplication for Idempotency
Use deduplication IDs to prevent duplicate processing:
    await client.publishJSON({
      url: "https://api.example.com/process",
      body: { orderId },
      deduplicationId: `order-${orderId}`,
    });
Configure Appropriate Retries
Use Callbacks for Monitoring
Implement callbacks to track message processing:
    await client.publishJSON({
      url: "https://api.example.com/process",
      body: { taskId },
      callback: "https://api.example.com/callbacks/success",
      failureCallback: "https://api.example.com/callbacks/failure",
    });
Always handle errors in consumer handlers:
    async function handler(request: Request) {
      try {
        const data = await request.json();
        await processTask(data);
        return Response.json({ success: true });
      } catch (error) {
        console.error("Processing failed:", error);
        // Narrow the unknown error before reading .message
        const message = error instanceof Error ? error.message : String(error);
        return Response.json(
          { success: false, error: message },
          { status: 500 }
        );
      }
    }
Advanced Patterns
    // Task is your own type; it needs at least an id field
    async function enqueueBatch(tasks: Task[]) {
      const results = await Promise.allSettled(
        tasks.map((task) =>
          client.publishJSON({
            url: `${process.env.API_URL}/process`,
            body: task,
            deduplicationId: `task-${task.id}`,
          })
        )
      );

      return results;
    }
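enqueueBatch returns the raw Promise.allSettled results, which callers still have to inspect one by one. A small sketch of summarizing them follows. The BatchSummary type and summarizeBatch function are hypothetical additions, not part of the instrumentation or the QStash SDK.

```typescript
interface BatchSummary<T> {
  succeeded: T[];
  failed: { index: number; reason: string }[];
}

// Split settled results into fulfilled values and failures, keeping the
// original index so a failed task can be traced back to its input.
function summarizeBatch<T>(results: PromiseSettledResult<T>[]): BatchSummary<T> {
  const summary: BatchSummary<T> = { succeeded: [], failed: [] };
  results.forEach((result, index) => {
    if (result.status === "fulfilled") {
      summary.succeeded.push(result.value);
    } else {
      const reason =
        result.reason instanceof Error ? result.reason.message : String(result.reason);
      summary.failed.push({ index, reason });
    }
  });
  return summary;
}
```

A caller could then write something like const { failed } = summarizeBatch(await enqueueBatch(tasks)) and re-enqueue or log only the failed indexes.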
    async function enqueueWithPriority(task: Task, priority: "high" | "normal") {
      await client.publishJSON({
        url: `${process.env.API_URL}/process`,
        headers: {
          "X-Priority": priority,
        },
        body: task,
        delay: priority === "high" ? 0 : 60, // High priority: immediate
      });
    }
    async function handler(request: Request) {
      // Parse with an explicit radix; a missing header counts as zero retries
      const retryCount = Number.parseInt(
        request.headers.get("Upstash-Retried") ?? "0",
        10
      );

      // If max retries reached, send to DLQ
      if (retryCount >= 3) {
        await sendToDeadLetterQueue(await request.json());
        return Response.json({ handled: true });
      }

      // Normal processing
      await processTask(await request.json());
      return Response.json({ success: true });
    }
Troubleshooting
Ensure OpenTelemetry is initialized before using QStash:
    import { NodeSDK } from "@opentelemetry/sdk-node";

    const sdk = new NodeSDK({
      // ... configuration
    });

    sdk.start();
Signature Verification Failing
Make sure environment variables are set correctly:
    QSTASH_TOKEN=your_token
    QSTASH_CURRENT_SIGNING_KEY=your_current_key
    QSTASH_NEXT_SIGNING_KEY=your_next_key
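One way to catch a missing variable early is a startup check. The sketch below is a hypothetical helper (findMissingEnv is not provided by either library) that reports which of the three variables named above are unset or blank.

```typescript
// The three variables listed above.
const REQUIRED_QSTASH_ENV: readonly string[] = [
  "QSTASH_TOKEN",
  "QSTASH_CURRENT_SIGNING_KEY",
  "QSTASH_NEXT_SIGNING_KEY",
];

// Return the names of required variables that are unset or only whitespace.
function findMissingEnv(
  env: Record<string, string | undefined>,
  required: readonly string[] = REQUIRED_QSTASH_ENV
): string[] {
  return required.filter((name) => (env[name] ?? "").trim() === "");
}

console.log(findMissingEnv({ QSTASH_TOKEN: "your_token" }));
```

In a Node.js process this could be called as findMissingEnv(process.env) before the route handlers are registered, failing fast if the returned list is not empty.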
Messages Not Being Processed
Check that your endpoint is: 
Publicly accessible 
Returns 2xx status codes 
Responds within timeout (default: 2 minutes) 
Has signature verification enabled 
 
Ensure instrumentConsumer wraps the handler first and verifySignatureAppRouter wraps the result:
    // Correct order
    export const POST = verifySignatureAppRouter(
      instrumentConsumer(handler)
    );

    // Wrong order (commented out so the snippet has a single POST export)
    // export const POST = instrumentConsumer(
    //   verifySignatureAppRouter(handler)
    // );
Resources
License
MIT