orion/notify-svc · Pull Request #318

為 webhook 投遞加上指數退避重試與死信佇列

林玟潔
三天前開啟
wj/webhook-retry-dlq main
+186 / −42 6 個檔案變更

這個 PR 做了什麼

風險地圖

安全 值得看一下 需要注意

變更檔案

notify/retry.py
需要注意 +72 −0
@@ -0,0 +1,24 @@
1+import asyncio, random
2+import httpx
3+from .dead_letter import DeadLetterQueue
4+
5+MAX_ATTEMPTS = 6
6+BASE_DELAY = 2.0
7+MAX_DELAY = 300.0
8+
9+async def deliver_with_retry(client, event, dlq: DeadLetterQueue):
10+ for attempt in range(MAX_ATTEMPTS):
11+ try:
12+ resp = await client.post(event.url, json=event.payload)
13+ if resp.status_code < 400:
14+ return resp
15+ if 400 <= resp.status_code < 500:
16+ await dlq.push(event, reason=f"http_{resp.status_code}")
17+ return resp
18+ except httpx.TransportError:
19+ pass
20+ delay = min(BASE_DELAY * (2 ** attempt), MAX_DELAY)
21+ delay *= 1 + random.uniform(-0.2, 0.2)
22+ await asyncio.sleep(delay)
23+ await dlq.push(event, reason="max_attempts")
24+}
第 18 行

阻擋合併只 catch TransportError 不夠,httpx.TimeoutException 不是它的子類,timeout 會直接冒泡出去把整個 worker task 吃掉。請改成 (httpx.TransportError, httpx.TimeoutException) 一起捕捉,並把訊息記進結構化 log,否則 5xx 重試還算正常、timeout 卻會悄悄丟事件。

第 12 行

小建議POST 沒有給 timeout,預設會吃 client 級設定,但 retry 層自己再保險一道比較好。client.post(..., timeout=10.0) 比較不會跟外部慢服務一起被卡住超過退避視窗。

notify/worker.py
值得看一下 +38 −28
@@ -56,15 +56,19 @@ async def run_worker(stream: str):
56 redis = await aioredis.from_url(settings.REDIS_URL)
57- async with httpx.AsyncClient() as client:
58- while True:
59- event = await pop_event(redis, stream)
60- resp = await client.post(event.url, json=event.payload)
61- if resp.status_code >= 400:
62- log.warning("delivery_failed", id=event.id)
63- await ack_event(redis, event)
57+ dlq = DeadLetterQueue(redis)
58+ limits = httpx.Limits(max_connections=64, max_keepalive_connections=32)
59+ async with httpx.AsyncClient(timeout=10.0, limits=limits) as client:
60+ while True:
61+ event = await pop_event(redis, stream)
62+ await deliver_with_retry(client, event, dlq)
63+ await ack_event(redis, event)
64
第 62 行

小建議deliver_with_retry 是 sequential 的,單一 worker 卡在某個慢 endpoint 時整條 stream 都會塞住。考慮把每個 event 丟進 asyncio.create_task 配一個 Semaphore(32),吞吐會明顯好看,但要先確定下游 webhook 端能接受併發。這個 PR 不一定要改,至少在 issue 開一張票追蹤。

notify/dead_letter.py
值得看一下 +24 −0
@@ -0,0 +1,18 @@
1+import json, time
2+
3+DLQ_STREAM = "webhook:dlq"
4+
5+class DeadLetterQueue:
6+ def __init__(self, redis):
7+ self.redis = redis
8+
9+ async def push(self, event, reason: str):
10+ body = {
11+ "event_id": event.id,
12+ "url": event.url,
13+ "payload": event.payload,
14+ "reason": reason,
15+ "buried_at": time.time(),
16+ }
17+ await self.redis.xadd(DLQ_STREAM, {"j": json.dumps(body)})
18+
第 17 行

阻擋合併DLQ stream 沒有設 MAXLEN,事故時可能一小時就吞掉幾 GB 把 Redis 撐爆。請用 xadd(DLQ_STREAM, {...}, maxlen=100_000, approximate=True),並在 README 標明保留視窗大概多久;另外 event.payload 可能包含使用者明文資料,落進長期 stream 前最好過一層遮罩。

notify/metrics.py 安全 +18 −2
新增 webhook_retry_total(標籤 attempt、status)與 webhook_dlq_total(標籤 reason)兩個 Prometheus Counter,沿用既有 registry,沒有改舊指標的命名或標籤集,dashboard 不會壞。
notify/config.py 安全 +9 −3
MAX_ATTEMPTSBASE_DELAYMAX_DELAY 拉出來給環境變數覆蓋,預設值跟 retry.py 一致;新增 DLQ_STREAM_MAXLEN(待 dead_letter.py 阻擋意見處理後接上)。
notify/tests/test_retry.py 安全 +25 −9
respx mock httpx 路由,新增三條測試:4xx 直接進 DLQ 不重試、5xx 退避後成功、連續 6 次連線錯誤後落 DLQ。本地全綠,CI 已跑過一輪。