Cloudflare Queues で軽量バッチ処理パイプラインを組む

Cloudflare Queues + Workers で、リクエストの裏で重い処理を非同期に流す最小構成。Producer / Consumer / Dead Letter Queue / リトライ戦略を、コピペで動くコードと wrangler.jsonc 設定込みでまとめます。

「API レスポンスは即返したいけど、その裏で 5 秒かかる処理を流したい」 — Webhook 受信、画像変換、メール送信、Embedding 生成。Cloudflare Workers でこれを真面目に組むと、waitUntil() は 30 秒で打ち切られるし、CPU time の上限にも引っかかります。Cloudflare Queues は、この「リクエスト裏で非同期に重い仕事を流す」用途にぴったりはまります。

本記事では、Webhook を受けて Queue に投げ、Consumer 側で順次処理する最小パイプラインを、Producer / Consumer / Dead Letter Queue / リトライの 4 点込みで組み立てます。

到達点と前提

作るもの: 外部 Webhook を受けて、本体処理 (3 秒程度の I/O) を非同期に流すパイプライン。失敗したメッセージは DLQ に逃がし、後追いで救済できる構成です。

検証バージョン (2026-04 時点):

  • Wrangler v4.84 / Workers Queues
  • 1 producer Worker (webhook-receiver) + 1 consumer Worker (webhook-processor)
  • DLQ: もう 1 つの Queue (webhook-dlq)
External ─▶ webhook-receiver  ─▶ Queue (webhook-events)  ─▶ webhook-processor
                                                                 │ retry x N

                                                           DLQ (webhook-dlq)

なぜ Queues なのか

Workers 単体で「重い処理」を逃がす方法は複数ありますが、Queues は次の点で他より素直です。

  • At-least-once 配送 + 自動リトライ + バッチ受信が 1 サービスでまとまっている
  • Producer と Consumer を別 Worker に分けられるので、処理本体の不調が API のレイテンシに波及しない
  • Workers から無料枠で叩ける (有料プランの中に含まれる)。SQS / Kafka を別途立てる必要なし
  • Dead Letter Queue がネイティブ。リトライ上限を超えたメッセージは別 Queue に勝手に逃げる

「Durable Objects で頑張って queue を実装する」前に、まず Queues を試す価値があります。

wrangler.jsonc — Producer 側

Producer は Queue に書き込むバインディング (queues.producers) だけ持ちます。

// apps/webhook-receiver/wrangler.jsonc
{
  "$schema": "node_modules/wrangler/config-schema.json",
  "name": "webhook-receiver",
  "main": "src/index.ts",
  "compatibility_date": "2026-04-01",
  "queues": {
    "producers": [
      {
        "binding": "EVENTS",
        "queue": "webhook-events"
      }
    ]
  },
  "observability": { "enabled": true }
}
// apps/webhook-receiver/src/index.ts
import { Hono } from "hono";

type Bindings = {
  EVENTS: Queue<WebhookEvent>;
};

type WebhookEvent = {
  type: "stripe.payment.succeeded" | "stripe.invoice.paid";
  payload: unknown;
  receivedAt: string;
};

const app = new Hono<{ Bindings: Bindings }>();

app.post("/webhook/stripe", async (c) => {
  const sig = c.req.header("stripe-signature");
  if (!sig) return c.json({ error: "missing signature" }, 400);

  const body = await c.req.json();

  await c.env.EVENTS.send({
    type: body.type,
    payload: body.data,
    receivedAt: new Date().toISOString(),
  });

  // Stripe には即 200 を返す。本処理は consumer に任せる
  return c.json({ ok: true });
});

export default app;

Producer 側は 「受け取って Queue に投げる」だけ。3 秒かかる本体処理を待たないので、Webhook の SLA (Stripe は 5 秒以内に 2xx を要求) を確実に守れます。

wrangler.jsonc — Consumer 側

Consumer は同じ Queue を consumers で受けます。max_batch_size / max_retries / dead_letter_queue がバッチ処理パイプラインの 3 つの肝です。

// apps/webhook-processor/wrangler.jsonc
{
  "$schema": "node_modules/wrangler/config-schema.json",
  "name": "webhook-processor",
  "main": "src/index.ts",
  "compatibility_date": "2026-04-01",
  "queues": {
    "consumers": [
      {
        "queue": "webhook-events",
        "max_batch_size": 25,
        "max_batch_timeout": 5,
        "max_retries": 3,
        "dead_letter_queue": "webhook-dlq"
      }
    ]
  },
  "observability": { "enabled": true }
}

それぞれの値の意味:

  • max_batch_size: 25 — 1 回の queue ハンドラ呼び出しで最大 25 件まで束ねて受け取る
  • max_batch_timeout: 5 — 25 件たまっていなくても、5 秒待ったら来た分だけで起動する
  • max_retries: 3 — 1 メッセージあたり最大 3 回までリトライ。それを超えたら DLQ 行き
  • dead_letter_queue — リトライ上限を超えたメッセージを送る別 Queue 名

Consumer の実装 — message 単位の ack/retry

Consumer は queue ハンドラを export し、バッチで受け取った messages を 1 件ずつ ack()retry() します。バッチ全体を throw で投げ捨てると、成功した分まで再配送されるので、必ず個別に明示します。

// apps/webhook-processor/src/index.ts
type WebhookEvent = {
  type: string;
  payload: unknown;
  receivedAt: string;
};

type Env = { DB: D1Database };

export default {
  async queue(batch: MessageBatch<WebhookEvent>, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      try {
        await processEvent(msg.body, env);
        msg.ack(); // 成功: Queue から消える
      } catch (err) {
        console.error("processEvent failed", {
          id: msg.id,
          attempt: msg.attempts,
          err: String(err),
        });
        // 一時的失敗だけ retry。永続的失敗は ack で握りつぶして DLQ に流さない
        if (isTransient(err)) {
          msg.retry({ delaySeconds: backoff(msg.attempts) });
        } else {
          msg.ack(); // 4xx 系はリトライしても無駄なので捨てる
        }
      }
    }
  },
};

function backoff(attempt: number): number {
  // 指数バックオフ + jitter: 5s, 30s, 120s ...
  return Math.min(5 * 2 ** attempt, 300) + Math.floor(Math.random() * 5);
}

function isTransient(err: unknown): boolean {
  // ネットワーク・5xx・タイムアウトだけ retry 対象とする
  if (err instanceof Error && /timeout|fetch failed|5\d\d/.test(err.message)) {
    return true;
  }
  return false;
}

async function processEvent(ev: WebhookEvent, env: Env) {
  // 冪等性キーで重複処理を防ぐ。Queues は at-least-once なので必須
  const dedupKey = `${ev.type}:${(ev.payload as any)?.id}`;
  const inserted = await env.DB.prepare(
    "INSERT OR IGNORE INTO processed_events (key) VALUES (?)"
  )
    .bind(dedupKey)
    .run();
  if ((inserted.meta as any).changes === 0) return; // 二重配送

  // ここで実処理: メール送信、画像変換、Embedding 生成、など
}

実装で外せない 3 ポイント:

  1. 冪等性キーを必ず持つ。Queues は at-least-once なので、同じメッセージが 2 回届くのは仕様。INSERT OR IGNOREON CONFLICT DO NOTHING で保護する
  2. isTransient で transient / permanent を分ける。永続的な失敗を retry() し続けると、3 回後に DLQ に飛んで、そこからまた人手で救済する手間が無駄に発生する
  3. バックオフは指数 + jitter。同じタイミングで失敗した 25 件が同じタイミングで retry すると、上流が再度倒れる

DLQ を作る

DLQ は ただの普通の Queueです。wrangler queues create webhook-dlq で作って、後で別 Worker で消費するか、wrangler queues consumer worker add で手動コンシューム用の Worker を当てます。

DLQ に入ったメッセージを眺めるだけの最小 consumer:

// apps/webhook-dlq-inspector/src/index.ts
export default {
  async queue(batch: MessageBatch<unknown>): Promise<void> {
    for (const msg of batch.messages) {
      console.error("DLQ message", {
        id: msg.id,
        attempts: msg.attempts,
        body: msg.body,
      });
      msg.ack(); // 観測したら捨てる。実運用では D1 に書いて UI で見る
    }
  },
};

実運用では DLQ メッセージを D1 に永続化して、別 UI から手動 retry できるようにしておくと、深夜の障害対応がだいぶ楽になります。

落とし穴 / ハマりポイント

  • max_batch_size は QPS に対して大きめに。1 件ずつ来ると Worker の CPU time 上限に何度も引っかかります。25-100 程度を起点に調整
  • バッチ全体を throw で投げると、ack() 済の分まで含めて全件再配送される。「個別 try/catch で ack/retry を明示」が鉄則
  • max_retries を 0 や 1 にしない。Workers の transient 失敗 (上流の rate limit、DNS 一瞬詰まり) はそこそこ起きるので、最低 3 は欲しい
  • DLQ を作り忘れて max_retries 超過したメッセージは消える。最初から DLQ ありきで設計する
  • wrangler dev でローカル Queue は動くが本番と挙動が完全には一致しない。バッチサイズ・タイムアウトのチューニングは、staging 環境で実トラフィックの近似で見るしかない

観測の最低ライン

  • observability.enabled: true を Producer / Consumer の両方に
  • console.log({ id, attempts, latencyMs }) で 1 メッセージ 1 行の構造化ログを必ず吐く
  • Cloudflare Dashboard の Queues タブで「未処理メッセージ数」を眺める。ここが右肩上がりになったら Consumer が詰まっている

監視は最低この 3 点で「詰まっているか / 失敗しているか」が分かります。

まとめ

Cloudflare Queues は、Workers 単体ではどうしても噛み合わない「リクエスト裏で重い処理を流す」用途にきれいにはまります。Producer / Consumer の分離 + DLQ + 冪等性キー、この 3 点を初期から仕込んでおけば、SQS や RabbitMQ を別建てするコストなしで、production-grade な非同期パイプラインが組めます。

次回は Queues + Workers AI で、Webhook 受信 → Embedding 生成 → Vectorize 投入を組む話を書く予定です。

参考