「API レスポンスは即返したいけど、その裏で 5 秒かかる処理を流したい」 — Webhook 受信、画像変換、メール送信、Embedding 生成。Cloudflare Workers でこれを真面目に組むと、waitUntil() は 30 秒で打ち切られるし、CPU time の上限にも引っかかります。Cloudflare Queues は、この「リクエスト裏で非同期に重い仕事を流す」用途にぴったりはまります。
本記事では、Webhook を受けて Queue に投げ、Consumer 側で順次処理する最小パイプラインを、Producer / Consumer / Dead Letter Queue / リトライの 4 点込みで組み立てます。
到達点と前提
作るもの: 外部 Webhook を受けて、本体処理 (3 秒程度の I/O) を非同期に流すパイプライン。失敗したメッセージは DLQ に逃がし、後追いで救済できる構成です。
検証バージョン (2026-04 時点):
- Wrangler v4.84 / Workers Queues
- 1 producer Worker (
webhook-receiver) + 1 consumer Worker (webhook-processor) - DLQ: もう 1 つの Queue (
webhook-dlq)
External ─▶ webhook-receiver ─▶ Queue (webhook-events) ─▶ webhook-processor
│ retry x N
▼
DLQ (webhook-dlq)
なぜ Queues なのか
Workers 単体で「重い処理」を逃がす方法は複数ありますが、Queues は次の点で他より素直です。
- At-least-once 配送 + 自動リトライ + バッチ受信が 1 サービスでまとまっている
- Producer と Consumer を別 Worker に分けられるので、処理本体の不調が API のレイテンシに波及しない
- Workers から無料枠で叩ける (有料プランの中に含まれる)。SQS / Kafka を別途立てる必要なし
- Dead Letter Queue がネイティブ。リトライ上限を超えたメッセージは別 Queue に勝手に逃げる
「Durable Objects で頑張って queue を実装する」前に、まず Queues を試す価値があります。
wrangler.jsonc — Producer 側
Producer は Queue に書き込むバインディング (queues.producers) だけ持ちます。
// apps/webhook-receiver/wrangler.jsonc
{
"$schema": "node_modules/wrangler/config-schema.json",
"name": "webhook-receiver",
"main": "src/index.ts",
"compatibility_date": "2026-04-01",
"queues": {
"producers": [
{
"binding": "EVENTS",
"queue": "webhook-events"
}
]
},
"observability": { "enabled": true }
}
// apps/webhook-receiver/src/index.ts
import { Hono } from "hono";
type Bindings = {
EVENTS: Queue<WebhookEvent>;
};
type WebhookEvent = {
type: "stripe.payment.succeeded" | "stripe.invoice.paid";
payload: unknown;
receivedAt: string;
};
const app = new Hono<{ Bindings: Bindings }>();
app.post("/webhook/stripe", async (c) => {
const sig = c.req.header("stripe-signature");
if (!sig) return c.json({ error: "missing signature" }, 400);
const body = await c.req.json();
await c.env.EVENTS.send({
type: body.type,
payload: body.data,
receivedAt: new Date().toISOString(),
});
// Stripe には即 200 を返す。本処理は consumer に任せる
return c.json({ ok: true });
});
export default app;
Producer 側は 「受け取って Queue に投げる」だけ。3 秒かかる本体処理を待たないので、Webhook の SLA (Stripe は 5 秒以内に 2xx を要求) を確実に守れます。
wrangler.jsonc — Consumer 側
Consumer は同じ Queue を consumers で受けます。max_batch_size / max_retries / dead_letter_queue がバッチ処理パイプラインの 3 つの肝です。
// apps/webhook-processor/wrangler.jsonc
{
"$schema": "node_modules/wrangler/config-schema.json",
"name": "webhook-processor",
"main": "src/index.ts",
"compatibility_date": "2026-04-01",
"queues": {
"consumers": [
{
"queue": "webhook-events",
"max_batch_size": 25,
"max_batch_timeout": 5,
"max_retries": 3,
"dead_letter_queue": "webhook-dlq"
}
]
},
"observability": { "enabled": true }
}
それぞれの値の意味:
max_batch_size: 25— 1 回のqueueハンドラ呼び出しで最大 25 件まで束ねて受け取るmax_batch_timeout: 5— 25 件たまっていなくても、5 秒待ったら来た分だけで起動するmax_retries: 3— 1 メッセージあたり最大 3 回までリトライ。それを超えたら DLQ 行きdead_letter_queue— リトライ上限を超えたメッセージを送る別 Queue 名
Consumer の実装 — message 単位の ack/retry
Consumer は queue ハンドラを export し、バッチで受け取った messages を 1 件ずつ ack() か retry() します。バッチ全体を throw で投げ捨てると、成功した分まで再配送されるので、必ず個別に明示します。
// apps/webhook-processor/src/index.ts
type WebhookEvent = {
type: string;
payload: unknown;
receivedAt: string;
};
type Env = { DB: D1Database };
export default {
async queue(batch: MessageBatch<WebhookEvent>, env: Env): Promise<void> {
for (const msg of batch.messages) {
try {
await processEvent(msg.body, env);
msg.ack(); // 成功: Queue から消える
} catch (err) {
console.error("processEvent failed", {
id: msg.id,
attempt: msg.attempts,
err: String(err),
});
// 一時的失敗だけ retry。永続的失敗は ack で握りつぶして DLQ に流さない
if (isTransient(err)) {
msg.retry({ delaySeconds: backoff(msg.attempts) });
} else {
msg.ack(); // 4xx 系はリトライしても無駄なので捨てる
}
}
}
},
};
function backoff(attempt: number): number {
// 指数バックオフ + jitter: 5s, 30s, 120s ...
return Math.min(5 * 2 ** attempt, 300) + Math.floor(Math.random() * 5);
}
function isTransient(err: unknown): boolean {
// ネットワーク・5xx・タイムアウトだけ retry 対象とする
if (err instanceof Error && /timeout|fetch failed|5\d\d/.test(err.message)) {
return true;
}
return false;
}
async function processEvent(ev: WebhookEvent, env: Env) {
// 冪等性キーで重複処理を防ぐ。Queues は at-least-once なので必須
const dedupKey = `${ev.type}:${(ev.payload as any)?.id}`;
const inserted = await env.DB.prepare(
"INSERT OR IGNORE INTO processed_events (key) VALUES (?)"
)
.bind(dedupKey)
.run();
if ((inserted.meta as any).changes === 0) return; // 二重配送
// ここで実処理: メール送信、画像変換、Embedding 生成、など
}
実装で外せない 3 ポイント:
- 冪等性キーを必ず持つ。Queues は at-least-once なので、同じメッセージが 2 回届くのは仕様。
INSERT OR IGNOREかON CONFLICT DO NOTHINGで保護する isTransientで transient / permanent を分ける。永続的な失敗をretry()し続けると、3 回後に DLQ に飛んで、そこからまた人手で救済する手間が無駄に発生する- バックオフは指数 + jitter。同じタイミングで失敗した 25 件が同じタイミングで retry すると、上流が再度倒れる
DLQ を作る
DLQ は ただの普通の Queueです。wrangler queues create webhook-dlq で作って、後で別 Worker で消費するか、wrangler queues consumer worker add で手動コンシューム用の Worker を当てます。
DLQ に入ったメッセージを眺めるだけの最小 consumer:
// apps/webhook-dlq-inspector/src/index.ts
export default {
async queue(batch: MessageBatch<unknown>): Promise<void> {
for (const msg of batch.messages) {
console.error("DLQ message", {
id: msg.id,
attempts: msg.attempts,
body: msg.body,
});
msg.ack(); // 観測したら捨てる。実運用では D1 に書いて UI で見る
}
},
};
実運用では DLQ メッセージを D1 に永続化して、別 UI から手動 retry できるようにしておくと、深夜の障害対応がだいぶ楽になります。
落とし穴 / ハマりポイント
max_batch_sizeは QPS に対して大きめに。1 件ずつ来ると Worker の CPU time 上限に何度も引っかかります。25-100 程度を起点に調整- バッチ全体を
throwで投げると、ack()済の分まで含めて全件再配送される。「個別 try/catch で ack/retry を明示」が鉄則 max_retriesを 0 や 1 にしない。Workers の transient 失敗 (上流の rate limit、DNS 一瞬詰まり) はそこそこ起きるので、最低 3 は欲しい- DLQ を作り忘れて
max_retries超過したメッセージは消える。最初から DLQ ありきで設計する wrangler devでローカル Queue は動くが本番と挙動が完全には一致しない。バッチサイズ・タイムアウトのチューニングは、staging 環境で実トラフィックの近似で見るしかない
観測の最低ライン
observability.enabled: trueを Producer / Consumer の両方にconsole.log({ id, attempts, latencyMs })で 1 メッセージ 1 行の構造化ログを必ず吐く- Cloudflare Dashboard の Queues タブで「未処理メッセージ数」を眺める。ここが右肩上がりになったら Consumer が詰まっている
監視は最低この 3 点で「詰まっているか / 失敗しているか」が分かります。
まとめ
Cloudflare Queues は、Workers 単体ではどうしても噛み合わない「リクエスト裏で重い処理を流す」用途にきれいにはまります。Producer / Consumer の分離 + DLQ + 冪等性キー、この 3 点を初期から仕込んでおけば、SQS や RabbitMQ を別建てするコストなしで、production-grade な非同期パイプラインが組めます。
次回は Queues + Workers AI で、Webhook 受信 → Embedding 生成 → Vectorize 投入を組む話を書く予定です。