LLM APIs fail. Rate limits hit at inconvenient times, 503s appear during high-traffic periods, and timeouts spike when the model is under load. If your pipeline doesn't have a retry strategy, a transient failure kills an hours-long job. This article builds a complete retry system: a generic backoff decorator, specific handling for 429 rate-limit responses, and a circuit breaker to stop retrying when a service is genuinely down.
Why LLM APIs Fail
Understanding the failure mode shapes your retry strategy:
- 429 Too Many Requests: you've exceeded your rate limit. The Retry-After header tells you exactly how long to wait. Exponential backoff without reading this header wastes time or retries too early.
- 503 Service Unavailable: the provider is overloaded. Retryable — back off and try again.
- 500 Internal Server Error: server-side bug. Sometimes retryable (transient), sometimes not. Retry with a cap.
- 408 Request Timeout: the request took too long. Retryable.
- 400 Bad Request: your payload is malformed. Not retryable — retrying won't fix a broken prompt.
- 401 Unauthorized / 403 Forbidden: auth issue. Not retryable — fix your API key first.
- 422 Unprocessable Entity: your request is semantically invalid. Not retryable.
The rule: 4xx errors are generally your fault (except 429 and 408), 5xx errors are the server's fault.
Generic Retry Decorator with Exponential Backoff + Jitter
import time
import random
import logging
import functools
from typing import Callable, TypeVar, Any
log = logging.getLogger("pipeline.retry")

# Statuses worth retrying: request timeout, rate limit, and transient 5xx.
RETRYABLE_STATUS_CODES = {408, 429, 500, 502, 503, 504}

F = TypeVar("F", bound=Callable[..., Any])


def with_retry(
    max_attempts: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    jitter: bool = True,
) -> Callable[[F], F]:
    """
    Decorator that retries a function on RetryableError.

    Uses exponential backoff with optional full jitter. When the error
    carries a server-provided ``retry_after`` hint, the wait is never
    shorter than that hint.

    Args:
        max_attempts: total number of calls before giving up (must be >= 1).
        base_delay: seconds to wait before the first retry.
        max_delay: upper bound on the computed backoff delay.
        backoff_factor: multiplier applied to the delay per attempt.
        jitter: randomize each delay uniformly in [0, delay].

    Raises:
        ValueError: if max_attempts is less than 1 (the original code would
            silently never call the wrapped function and return None).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")

    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except NonRetryableError:
                    raise  # no point retrying — e.g. a malformed request
                except RetryableError as e:
                    if attempt >= max_attempts:
                        log.error("Giving up after %d attempts: %s", max_attempts, e)
                        raise
                    # Exponential backoff, capped at max_delay.
                    delay = min(base_delay * (backoff_factor ** (attempt - 1)), max_delay)
                    if jitter:
                        # Full jitter: uniform random between 0 and delay.
                        # Prevents thundering herd when many workers retry
                        # simultaneously.
                        delay = random.uniform(0, delay)
                    if e.retry_after is not None:
                        # Honor the server's Retry-After hint as a floor.
                        delay = max(delay, e.retry_after)
                    log.warning(
                        "Attempt %d/%d failed (%s). Retrying in %.1fs",
                        attempt, max_attempts, e, delay,
                    )
                    time.sleep(delay)
        return wrapper  # type: ignore[return-value]
    return decorator
class RetryableError(Exception):
    """Transient HTTP failure worth retrying (408, 429, or 5xx).

    Carries the HTTP status and, when the server supplied one, a
    Retry-After hint in seconds that the retry loop uses as a delay floor.
    """

    def __init__(self, message: str, status_code: int, retry_after: float | None = None):
        super().__init__(message)
        # HTTP status that triggered this error (e.g. 429, 503).
        self.status_code = status_code
        # Server-suggested wait in seconds, or None if no hint was given.
        self.retry_after = retry_after
class NonRetryableError(Exception):
    """Permanent HTTP failure — retrying cannot help (e.g. 400, 401, 422).

    The retry decorator re-raises this immediately without backing off.
    """

    def __init__(self, message: str, status_code: int):
        super().__init__(message)
        # HTTP status that triggered this error (e.g. 400, 403).
        self.status_code = status_code
Specific Handling for 429: Read the Retry-After Header
The Retry-After header is either a number of seconds or an HTTP date. Parse both:
from email.utils import parsedate_to_datetime
from datetime import datetime, timezone
def parse_retry_after(header_value: str | None) -> float | None:
"""
Parse the Retry-After header.
Returns seconds to wait, or None if the header is absent/unparseable.
"""
if not header_value:
return None
# Try it as a plain number of seconds first
try:
return float(header_value)
except ValueError:
pass
# Try it as an HTTP date
try:
retry_at = parsedate_to_datetime(header_value)
now = datetime.now(tz=timezone.utc)
seconds = (retry_at - now).total_seconds()
return max(0.0, seconds)
except Exception:
return None
def classify_http_error(status_code: int, headers: dict, body: str) -> Exception:
    """Convert an HTTP error response into the appropriate exception type.

    Retryable statuses (408/429/transient 5xx) become RetryableError,
    carrying any server-supplied Retry-After hint; everything else becomes
    NonRetryableError so the caller fails fast.
    """
    if status_code not in RETRYABLE_STATUS_CODES:
        return NonRetryableError(
            f"HTTP {status_code}: {body[:200]}",
            status_code=status_code,
        )
    # RFC 9110 allows Retry-After on 503 as well as 429, and header names
    # are case-insensitive (dict(response.headers) loses requests' own
    # case-insensitive lookup) — honor the hint wherever it appears.
    header_value = next(
        (v for k, v in headers.items() if k.lower() == "retry-after"),
        None,
    )
    return RetryableError(
        f"HTTP {status_code}: {body[:200]}",
        status_code=status_code,
        retry_after=parse_retry_after(header_value),
    )
Circuit Breaker: Stop Retrying After N Consecutive Failures
A circuit breaker prevents your pipeline from hammering an API that's genuinely down. After N consecutive failures it "opens" and fast-fails all calls. After a cooldown period it enters "half-open" state and allows one test call through.
import threading
from enum import Enum
class CircuitState(Enum):
    """Lifecycle states of a circuit breaker."""

    CLOSED = "closed"     # normal operation — calls pass through
    OPEN = "open"         # failing fast — calls are rejected immediately
    HALF_OPEN = "half_open"  # cooldown elapsed — testing recovery
class CircuitBreaker:
    """
    Fail-fast guard around an unreliable dependency.

    After `failure_threshold` consecutive failures the breaker OPENs and
    rejects calls immediately. Once `recovery_timeout` seconds have passed
    it moves to HALF_OPEN and lets exactly one probe call through: success
    re-CLOSEs the breaker, failure re-OPENs it. (The original version let
    every concurrent caller through in HALF_OPEN, contradicting the
    "one test call" contract.)

    Thread-safe: all state transitions happen under a single lock.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time: float | None = None
        # True while the single HALF_OPEN probe call is in flight.
        self._probe_in_flight = False
        self._lock = threading.Lock()

    def _transition_if_cooled_down(self) -> None:
        """Move OPEN -> HALF_OPEN once the cooldown has elapsed.

        Caller must hold self._lock.
        """
        if (
            self._state == CircuitState.OPEN
            and self._last_failure_time is not None
            and time.monotonic() - self._last_failure_time >= self.recovery_timeout
        ):
            self._state = CircuitState.HALF_OPEN
            self._probe_in_flight = False
            log.info("Circuit breaker entering HALF_OPEN — testing recovery")

    @property
    def state(self) -> CircuitState:
        """Current state, applying the OPEN -> HALF_OPEN timeout transition."""
        with self._lock:
            self._transition_if_cooled_down()
            return self._state

    def record_success(self) -> None:
        """Reset the failure count and close the circuit."""
        with self._lock:
            self._failure_count = 0
            self._probe_in_flight = False
            self._state = CircuitState.CLOSED

    def record_failure(self) -> None:
        """Count a failure; open the circuit once the threshold is reached."""
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.monotonic()
            self._probe_in_flight = False
            if self._failure_count >= self.failure_threshold:
                if self._state != CircuitState.OPEN:
                    log.error(
                        f"Circuit breaker OPEN after {self._failure_count} consecutive failures"
                    )
                self._state = CircuitState.OPEN

    def call(self, func: Callable, *args, **kwargs):
        """Invoke func through the breaker.

        Raises:
            RuntimeError: when the circuit is OPEN, or when a HALF_OPEN
                probe is already in flight.
        """
        with self._lock:
            self._transition_if_cooled_down()
            if self._state == CircuitState.OPEN:
                raise RuntimeError("Circuit breaker is OPEN — not attempting call")
            if self._state == CircuitState.HALF_OPEN:
                # Only one probe may test recovery; reject concurrent
                # callers instead of hammering a recovering service.
                if self._probe_in_flight:
                    raise RuntimeError(
                        "Circuit breaker is HALF_OPEN — recovery probe already in flight"
                    )
                self._probe_in_flight = True
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.record_failure()
            raise
        self.record_success()
        return result
Putting It All Together
import requests
# Shared, process-wide breaker: every call_llm_api invocation feeds the same
# failure counter, so a sustained outage opens the circuit for all callers.
circuit_breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=60.0)
@with_retry(max_attempts=5, base_delay=1.0, max_delay=60.0)
def call_llm_api(prompt: str, model: str, api_key: str) -> str:
    """Make a retryable LLM API call wrapped in a circuit breaker."""
    request_headers = {
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
        "content-type": "application/json",
    }
    payload = {
        "model": model,
        "max_tokens": 4096,
        "messages": [{"role": "user", "content": prompt}],
    }

    def _do_request():
        # Any non-2xx becomes a classified exception so the retry decorator
        # can decide whether another attempt is worthwhile.
        response = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers=request_headers,
            json=payload,
            timeout=120,
        )
        if not response.ok:
            raise classify_http_error(
                response.status_code,
                dict(response.headers),
                response.text,
            )
        return response.json()["content"][0]["text"]

    return circuit_breaker.call(_do_request)
With this setup: transient 503s retry automatically with backoff, 429s wait the exact number of seconds the server requests, 400s fail immediately without wasting retries, and five consecutive failures open the circuit and surface a clear error to your pipeline rather than thrashing for minutes.
Full pipeline + source code: germy5.gumroad.com/l/xhxkzz — $19.99, 30-day refund.
📋 Free resource: AI Publishing Checklist — 7 steps to ship a technical ebook with Python (free PDF)
Full pipeline + 10 scripts: germy5.gumroad.com/l/xhxkzz — $12.99 launch price
Top comments (0)