Upstash QStash
Integrate Upstash QStash with your application for serverless-first background task processing.
Upstash QStash is a serverless message queue and task scheduler designed for serverless and edge environments. It delivers messages to your HTTP endpoints instead of relying on persistent connections, so it fits runtimes that cannot keep a long-lived worker or socket open.
Why QStash?
QStash is built for serverless deployments: there is no infrastructure to manage, it scales automatically, and pricing is pay-per-use. It delivers messages to your HTTP endpoints with built-in retries, delays, and scheduling.
Setup
Visit the Upstash Console and create a free account. Create a new QStash project and note down your credentials.
Add your QStash credentials to your environment variables:
QSTASH_URL=https://qstash.upstash.io
QSTASH_TOKEN=your_qstash_token_here
QSTASH_CURRENT_SIGNING_KEY=your_current_signing_key_here
QSTASH_NEXT_SIGNING_KEY=your_next_signing_key_here
You can find these values in your Upstash Console under the QStash project settings.
For production, make sure to add these environment variables to your deployment platform.
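A missing credential is easiest to catch at startup rather than on the first failed publish. The following is a minimal sketch of such a check, assuming a hypothetical `readQstashEnv` helper (the function and type names are illustrative and not part of QStash; only the four variable names come from the list above):

```typescript
// Hypothetical env helper: the variable names match the list above,
// the function and type names are assumptions made for this sketch.
const REQUIRED_KEYS = [
  "QSTASH_URL",
  "QSTASH_TOKEN",
  "QSTASH_CURRENT_SIGNING_KEY",
  "QSTASH_NEXT_SIGNING_KEY",
] as const;

type QstashEnv = Record<(typeof REQUIRED_KEYS)[number], string>;

function readQstashEnv(source: Record<string, string | undefined>): QstashEnv {
  // Collect every missing or blank variable so a single error reports them all.
  const missing = REQUIRED_KEYS.filter((key) => !source[key]?.trim());
  if (missing.length > 0) {
    throw new Error(
      `Missing QStash environment variables: ${missing.join(", ")}`,
    );
  }

  // Every key is present at this point, so the casts below are safe.
  const env = {} as QstashEnv;
  for (const key of REQUIRED_KEYS) {
    env[key] = source[key] as string;
  }
  return env;
}
```

In a Node.js runtime this could be called once as `readQstashEnv(process.env)`, with the result exported from a module such as `@/lib/env`.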
Create the QStash client
Create a utility file to initialize the QStash client:
import { Client } from "@upstash/qstash";
import { env } from "@/lib/env";
export const qstashClient = new Client({
baseUrl: env.QSTASH_URL,
token: env.QSTASH_TOKEN,
});
Create task handlers
QStash delivers messages to HTTP endpoints, so you'll create API routes to handle your background tasks.
Let's create task handlers for common operations:
import { NextRequest, NextResponse } from "next/server";
import * as z from "zod";
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { env } from "@/lib/env";
const processUserDataSchema = z.object({
userId: z.string(),
operation: z.enum(["export", "analyze", "cleanup"]),
});
async function handler(request: NextRequest) {
try {
const payload = processUserDataSchema.parse(await request.json());
const { userId, operation } = payload;
console.log("Starting user data processing", { userId, operation });
switch (operation) {
case "export":
// Simulate data export
await new Promise((resolve) => setTimeout(resolve, 2000));
console.log("User data exported successfully");
return NextResponse.json({
success: true,
result: "Data exported to CSV",
});
case "analyze":
// Simulate data analysis
await new Promise((resolve) => setTimeout(resolve, 5000));
console.log("User data analysis completed");
return NextResponse.json({
success: true,
result: { totalActions: 156, avgSessionTime: "4m 32s" },
});
case "cleanup":
// Simulate data cleanup
await new Promise((resolve) => setTimeout(resolve, 3000));
console.log("User data cleanup completed");
return NextResponse.json({
success: true,
result: "Removed 23 obsolete records",
});
default:
throw new Error(`Unknown operation: ${operation}`);
}
} catch (error) {
console.error("Task failed:", error);
return NextResponse.json({ error: "Task failed" }, { status: 500 });
}
}
export const POST = verifySignatureAppRouter(handler, {
currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY,
});
// Handler for the daily cleanup task (a separate route file)
import { NextRequest, NextResponse } from "next/server";
import { verifySignatureAppRouter } from "@upstash/qstash/nextjs";
import { env } from "@/lib/env";
async function handler(request: NextRequest) {
try {
console.log("Starting daily cleanup");
// Cleanup old logs
await new Promise((resolve) => setTimeout(resolve, 5000));
console.log("Logs cleaned up");
// Cleanup temporary files
await new Promise((resolve) => setTimeout(resolve, 3000));
console.log("Temp files cleaned up");
// Generate daily reports
await new Promise((resolve) => setTimeout(resolve, 8000));
console.log("Reports generated");
return NextResponse.json({
success: true,
cleanupTime: new Date().toISOString(),
itemsProcessed: 1247,
});
} catch (error) {
console.error("Daily cleanup failed:", error);
return NextResponse.json({ error: "Daily cleanup failed" }, { status: 500 });
}
}
export const POST = verifySignatureAppRouter(handler, {
currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY,
});
Create a task service
Create a service to handle task triggering:
import { qstashClient } from "@/lib/qstash";
function getTaskUrl(taskName: string): string {
const baseUrl = process.env.NEXT_PUBLIC_APP_URL || "http://localhost:3000";
return `${baseUrl}/api/tasks/${taskName}`;
}
export class TaskService {
static async processUserData(
userId: string,
operation: "export" | "analyze" | "cleanup",
) {
return await qstashClient.publishJSON({
url: getTaskUrl("process-user-data"),
body: { userId, operation },
});
}
static async scheduleUserDataProcessing(
userId: string,
operation: "export" | "analyze" | "cleanup",
delaySeconds: number,
) {
return await qstashClient.publishJSON({
url: getTaskUrl("process-user-data"),
body: { userId, operation },
delay: `${delaySeconds}s`,
});
}
static async scheduleDailyCleanup() {
return await qstashClient.schedules.create({
destination: getTaskUrl("daily-cleanup"),
cron: "0 2 * * *", // Daily at 2 AM
});
}
}
Create API endpoints for triggering
Create endpoints to trigger tasks from your application:
import { NextRequest, NextResponse } from "next/server";
import * as z from "zod";
import { getSession } from "@/lib/auth/server";
import { TaskService } from "@/lib/tasks/service";
const triggerUserDataSchema = z.object({
userId: z.string(),
operation: z.enum(["export", "analyze", "cleanup"]),
delaySeconds: z.number().optional(),
});
export async function POST(request: NextRequest) {
const session = await getSession();
if (!session) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
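// Reject malformed input with a 400 instead of letting parse() throw an unhandled error
const parsed = triggerUserDataSchema.safeParse(await request.json());
if (!parsed.success) {
return NextResponse.json({ error: "Invalid request body" }, { status: 400 });
}
const { userId, operation, delaySeconds } = parsed.data;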
const result = delaySeconds
? await TaskService.scheduleUserDataProcessing(userId, operation, delaySeconds)
: await TaskService.processUserData(userId, operation);
return NextResponse.json({
success: true,
messageId: result.messageId,
message: delaySeconds
? `Task scheduled to run in ${delaySeconds} seconds`
: "Task queued for immediate processing",
});
}
// Endpoint that registers the daily cleanup schedule (a separate route file)
import { NextRequest, NextResponse } from "next/server";
import { getSession } from "@/lib/auth/server";
import { TaskService } from "@/lib/tasks/service";
export async function POST(request: NextRequest) {
const session = await getSession();
if (!session) {
return NextResponse.json({ error: "Unauthorized" }, { status: 401 });
}
const result = await TaskService.scheduleDailyCleanup();
return NextResponse.json({
success: true,
scheduleId: result.scheduleId,
message: "Daily cleanup scheduled",
});
}
Using tasks in your application
From the client
"use client";
import { useMutation } from "@tanstack/react-query";
export function ProcessDataButton({ userId }: { userId: string }) {
const { mutate: processData, isPending } = useMutation({
mutationFn: async (operation: "export" | "analyze" | "cleanup") => {
const response = await fetch("/api/tasks/trigger/process-user-data", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
userId,
operation,
delaySeconds: 30, // Optional delay
}),
});
if (!response.ok) {
throw new Error("Failed to queue task");
}
return response.json();
},
onSuccess: (data) => {
console.log("Task queued:", data.messageId);
},
});
return (
<button
onClick={() => processData("analyze")}
disabled={isPending}
>
{isPending ? "Queueing..." : "Analyze User Data"}
</button>
);
}
From a server action
"use server";
import { getSession } from "@/lib/auth/server";
import { TaskService } from "@/lib/tasks/service";
export async function processUserData(
userId: string,
operation: "export" | "analyze" | "cleanup",
) {
const session = await getSession();
if (!session) {
throw new Error("Unauthorized");
}
try {
const result = await TaskService.processUserData(userId, operation);
return {
success: true,
messageId: result.messageId,
};
} catch (error) {
console.error("Failed to queue background task:", error);
throw new Error("Failed to queue background task");
}
}
Advanced features
Cron jobs & scheduling
QStash makes it easy to schedule recurring tasks:
import { qstashClient } from '@/lib/qstash';
function getTaskUrl(taskName: string): string {
const baseUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000';
return `${baseUrl}/api/tasks/${taskName}`;
}
// Schedule a task to run every day at 2 AM
await qstashClient.schedules.create({
destination: getTaskUrl('daily-cleanup'),
cron: '0 2 * * *'
});
// Schedule a task to run every Monday at 9 AM
await qstashClient.schedules.create({
destination: getTaskUrl('weekly-report'),
cron: '0 9 * * 1'
});
// One-time delayed task
await qstashClient.publishJSON({
url: getTaskUrl('reminder'),
body: { userId: '123', type: 'follow-up' },
delay: '3d' // 3 days from now
});
Topics (Fanout pattern)
Create topics to send messages to multiple endpoints:
import { qstashClient } from '@/lib/qstash';
function getTaskUrl(taskName: string): string {
const baseUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000';
return `${baseUrl}/api/tasks/${taskName}`;
}
// Create a URL group (called a "topic" in older SDK versions) and register its endpoints
await qstashClient.urlGroups.addEndpoints({
name: 'user-events',
endpoints: [
{ url: getTaskUrl('update-analytics') },
{ url: getTaskUrl('send-notification') },
{ url: getTaskUrl('update-crm') }
]
});
// Publish to the URL group - all endpoints will receive the message
await qstashClient.publishJSON({
urlGroup: 'user-events',
body: {
userId: '123',
event: 'user-registered',
timestamp: new Date().toISOString()
}
});
Queues (Sequential processing)
Create queues for ordered task processing:
import { qstashClient } from '@/lib/qstash';
function getTaskUrl(taskName: string): string {
const baseUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000';
return `${baseUrl}/api/tasks/${taskName}`;
}
// Create a queue
const queue = qstashClient.queue({ queueName: 'user-onboarding' });
// Add tasks to queue (they'll run in order)
await queue.enqueueJSON({
url: getTaskUrl('send-welcome-email'),
body: { userId: '123' }
});
await queue.enqueueJSON({
url: getTaskUrl('setup-user-profile'),
body: { userId: '123' }
});
await queue.enqueueJSON({
url: getTaskUrl('trigger-onboarding-sequence'),
body: { userId: '123' }
});
Monitoring and debugging
QStash Dashboard
Visit the Upstash Console to monitor your tasks:
- Message tracking: See all messages, their status, and delivery attempts
- Logs: View detailed logs for each message delivery
- Analytics: Monitor throughput, success rates, and error patterns
- Schedules: Manage and monitor your cron jobs
- Dead letter queue: Handle messages that failed after all retries
Local development
During development, you can:
- Use ngrok for local testing:
  # Install ngrok
  npm install -g ngrok
  # Expose your local server
  ngrok http 3000
  # Use the ngrok URL in your QStash configuration
- Check message delivery in the Upstash Console
- Use console.log in your task handlers for debugging
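The tunnel matters because QStash calls your endpoint over the public internet, so a URL pointing at localhost cannot receive deliveries. A minimal sketch of a URL helper that fails fast in that case, assuming a hypothetical `buildTaskUrl` function and the `/api/tasks/...` route layout used in this guide (the ngrok hostname in the usage note is a placeholder):

```typescript
// Hypothetical helper: builds a task URL and rejects hosts that a hosted
// service such as QStash cannot reach from outside your machine.
const LOCAL_HOSTS = new Set(["localhost", "127.0.0.1", "0.0.0.0", "[::1]"]);

function buildTaskUrl(baseUrl: string, taskName: string): string {
  // Resolve the task route against the configured public base URL.
  const url = new URL(`/api/tasks/${encodeURIComponent(taskName)}`, baseUrl);

  if (LOCAL_HOSTS.has(url.hostname)) {
    throw new Error(
      `${url.hostname} is not publicly reachable; use a tunnel URL instead`,
    );
  }
  return url.toString();
}
```

For example, `buildTaskUrl("https://example.ngrok.app", "daily-cleanup")` returns `https://example.ngrok.app/api/tasks/daily-cleanup`, while a `http://localhost:3000` base throws before any message is published.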
Best practices
Always verify signatures
Use the QStash signature verification to ensure messages are authentic:
// ✅ Good - Always verify QStash signatures
export const POST = verifySignatureAppRouter(handler, {
currentSigningKey: env.QSTASH_CURRENT_SIGNING_KEY,
nextSigningKey: env.QSTASH_NEXT_SIGNING_KEY
});
// ❌ Not secure - Accepting unverified requests
export async function POST(request: NextRequest) {
// No signature verification
}
Handle errors gracefully
Return appropriate HTTP status codes so QStash knows whether to retry:
// ✅ Good - Clear error handling
try {
await processTask(payload);
return NextResponse.json({ success: true });
} catch (error) {
console.error('Task failed:', error);
// A non-2xx response marks the delivery as failed, so QStash retries it (up to the message's retry limit)
return NextResponse.json({ error: 'Task failed' }, { status: 500 });
}
Use idempotent operations
Make your tasks safe to run multiple times in case of retries:
// ✅ Good - Check if work already done
const existingResult = await db.findProcessedResult(payload.id);
if (existingResult) {
return NextResponse.json({ success: true, result: existingResult });
}
// Proceed with processing...
Set appropriate timeouts
Configure timeouts based on your expected processing time:
// For quick tasks
await qstashClient.publishJSON({
url: taskUrl,
body: payload,
timeout: '30s'
});
// For longer tasks
await qstashClient.publishJSON({
url: taskUrl,
body: payload,
timeout: '300s' // 5 minutes
});
Use structured logging
Include relevant context in your logs:
console.log('Task started', {
taskType: 'process-user-data',
userId: payload.userId,
operation: payload.operation,
timestamp: new Date().toISOString()
});
Next steps
With QStash integrated into your application, you can now:
- Process background tasks without worrying about serverless timeouts
- Schedule recurring operations with reliable cron job functionality
- Handle high-volume messaging with automatic retries and scaling
- Build complex workflows using topics, queues, and delays
Ready to explore more advanced features? Check out the official documentation for webhooks, batch operations, and advanced routing patterns.