Retries, Timeouts, and Cleanup

What You'll Learn

How to handle transient failures with retries, set timeouts on network calls, clean up resources reliably, and shut down gracefully.

Why Programs Need Retries

Real-world operations fail intermittently:

  • Network blips
  • API rate limits (429 Too Many Requests)
  • Temporary database unavailability
  • Throttled cloud services

A single failure shouldn't crash the whole job. Retry the operation with a delay.

Simple Retry Pattern

import time
import logging

log = logging.getLogger(__name__)


def retry(fn, max_attempts: int = 3, delay: float = 1.0):
    """Call fn() up to max_attempts times, waiting delay seconds between tries."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            last_error = e
            if attempt < max_attempts:
                log.warning("Attempt %d/%d failed: %s — retrying in %.1fs",
                            attempt, max_attempts, e, delay)
                time.sleep(delay)
            else:
                log.error("All %d attempts failed: %s", max_attempts, e)
    raise last_error

Usage:

result = retry(lambda: fetch_data_from_api(), max_attempts=3, delay=2.0)

Exponential Backoff

Instead of a fixed delay, wait longer after each failure:

import time
import random
import logging

log = logging.getLogger(__name__)


def retry_with_backoff(
    fn,
    max_attempts: int = 4,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    jitter: bool = True,
):
    """Retry with exponential backoff and optional jitter."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == max_attempts:
                raise

            delay = min(base_delay * (2 ** (attempt - 1)), max_delay)
            if jitter:
                delay *= (0.5 + random.random() * 0.5)  # add randomness

            log.warning("Attempt %d/%d failed (%s) — retrying in %.1fs",
                        attempt, max_attempts, e, delay)
            time.sleep(delay)

Delay schedule (base=1s):

  • Attempt 1 fails → wait 1s
  • Attempt 2 fails → wait 2s
  • Attempt 3 fails → wait 4s
  • Attempt 4 fails → raise
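
The schedule above falls straight out of the min(base_delay * 2 ** (attempt - 1), max_delay) formula. A quick check, with jitter disabled so the numbers are deterministic:

```python
# Reproduce the delay schedule above (base_delay=1s, jitter off).
base_delay, max_delay = 1.0, 60.0

# Delays waited after attempts 1, 2, and 3 fail (attempt 4 raises instead).
delays = [min(base_delay * (2 ** (attempt - 1)), max_delay) for attempt in range(1, 4)]
print(delays)  # [1.0, 2.0, 4.0]

# max_delay caps the growth: by attempt 7 the uncapped delay (64s) exceeds 60s.
print(min(base_delay * (2 ** 6), max_delay))  # 60.0
```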

Using tenacity (Third-Party)

For production code, use the tenacity library — battle-tested and flexible:

pip install tenacity

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import requests


@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=1, max=60),
    retry=retry_if_exception_type(requests.exceptions.ConnectionError),
)
def fetch_data(url: str) -> dict:
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()

Timeouts on Network Calls

Always set timeouts. Without them, your program can hang forever:

import requests

# ❌ No timeout — can hang for minutes
response = requests.get("https://api.example.com/data")

# ✅ With timeout
response = requests.get(
    "https://api.example.com/data",
    timeout=(5, 30),  # (connect_timeout, read_timeout) in seconds
)

With the standard library:

import urllib.request

with urllib.request.urlopen(url, timeout=10) as response:
    data = response.read()

Catch timeouts:

from requests.exceptions import Timeout, ConnectionError

try:
    response = requests.get(url, timeout=10)
except Timeout:
    log.error("Request timed out after 10s: %s", url)
except ConnectionError:
    log.error("Connection failed: %s", url)
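
Timeouts and retries work best together: the timeout bounds each individual attempt, and the retry loop handles the failures the timeout surfaces. A minimal sketch, using a simulated flaky service as a stand-in for a real network call:

```python
import time


class FlakyService:
    """Simulates a network call that times out twice, then succeeds."""

    def __init__(self):
        self.calls = 0

    def fetch(self):
        self.calls += 1
        if self.calls < 3:
            raise TimeoutError("simulated timeout")
        return {"status": "ok"}


def fetch_with_retries(service, max_attempts=4, delay=0.01):
    """Retry timed-out attempts; each attempt is individually bounded."""
    for attempt in range(1, max_attempts + 1):
        try:
            # In real code this would be e.g. requests.get(url, timeout=10)
            return service.fetch()
        except TimeoutError:
            if attempt == max_attempts:
                raise
            time.sleep(delay)  # fixed short delay for the demo; prefer backoff


svc = FlakyService()
result = fetch_with_retries(svc)
print(result)  # {'status': 'ok'} — succeeds on the third call
```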

Resource Cleanup with Context Managers

Use with statements to guarantee cleanup even when exceptions occur:

# Files
with open("data.txt", encoding="utf-8") as f:
    data = f.read()
# f is always closed here, even if an exception occurred

# Multiple resources
with open("input.txt") as fin, open("output.txt", "w") as fout:
    fout.write(fin.read())
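
When the number of resources isn't known in advance, the standard library's contextlib.ExitStack generalizes the multiple-resource pattern. A sketch (the concat_files helper is illustrative, not from the lesson):

```python
import contextlib
import tempfile
from pathlib import Path


def concat_files(input_paths, output_path):
    """Concatenate an arbitrary list of files; all inputs are closed on exit."""
    with contextlib.ExitStack() as stack:
        files = [stack.enter_context(open(p)) for p in input_paths]
        with open(output_path, "w") as out:
            for f in files:
                out.write(f.read())
    # Every file opened via stack.enter_context is closed here,
    # even if a later open() or read() raised.


# Demo with temporary files
tmpdir = Path(tempfile.mkdtemp())
for i in range(3):
    (tmpdir / f"part{i}.txt").write_text(f"part {i}\n")

concat_files(sorted(tmpdir.glob("part*.txt")), tmpdir / "all.txt")
print((tmpdir / "all.txt").read_text())
```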

Writing Your Own Context Manager

import contextlib
import logging
import time

log = logging.getLogger(__name__)


@contextlib.contextmanager
def timer(label: str):
    """Context manager that logs how long a block took."""
    start = time.monotonic()
    try:
        yield
    finally:
        elapsed = time.monotonic() - start
        log.info("%s took %.3fs", label, elapsed)


with timer("data processing"):
    process_large_dataset()

@contextlib.contextmanager
def temporary_file(suffix: str = ".tmp"):
    """Create a temp file, yield its path, delete on exit."""
    import os
    import tempfile
    from pathlib import Path

    # mkstemp creates the file atomically; the deprecated mktemp is
    # race-prone because another process can grab the name first.
    fd, name = tempfile.mkstemp(suffix=suffix)
    os.close(fd)
    path = Path(name)
    try:
        yield path
    finally:
        path.unlink(missing_ok=True)


with temporary_file(".json") as tmp:
    tmp.write_text('{"ok": true}')
    process(tmp)
# File deleted here

Graceful Shutdown with signal Handlers

Handle Ctrl+C and SIGTERM cleanly:

import signal
import sys
import logging

log = logging.getLogger(__name__)
shutdown_requested = False


def handle_shutdown(signum, frame):
    global shutdown_requested
    log.info("Shutdown signal received (%d) — finishing current item", signum)
    shutdown_requested = True


signal.signal(signal.SIGINT, handle_shutdown)   # Ctrl+C
signal.signal(signal.SIGTERM, handle_shutdown)  # kill / docker stop


def process_queue(items):
    processed = 0
    for item in items:
        if shutdown_requested:
            log.info("Shutdown requested — stopping after %d items", processed)
            break
        process(item)
        processed += 1

Common Mistakes

| Mistake | Fix |
| --- | --- |
| No timeout on network calls | Always pass timeout= |
| Retrying non-transient errors | Only retry errors that may recover (network, rate limits) |
| Not sleeping between retries | Give the service time to recover |
| Doing risky work in finally: | Keep finally simple — just cleanup |
| Not closing files/connections | Use with statements |
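
The second mistake deserves a sketch: restrict retries to exception types you believe are transient, and let everything else propagate immediately. The TransientError/PermanentError names here are illustrative:

```python
import time


class TransientError(Exception):
    """Illustrative: an error worth retrying (network blip, 429, etc.)."""


class PermanentError(Exception):
    """Illustrative: an error retrying cannot fix (bad input, 404, auth)."""


# Only these exception types trigger a retry.
RETRYABLE = (TransientError, TimeoutError, ConnectionError)


def retry_transient(fn, max_attempts=3, delay=0.01):
    """Retry fn() only for RETRYABLE errors; others propagate at once."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except RETRYABLE:
            if attempt == max_attempts:
                raise
            time.sleep(delay)


calls = {"n": 0}


def always_permanent():
    calls["n"] += 1
    raise PermanentError("bad request")


try:
    retry_transient(always_permanent)
except PermanentError:
    pass

print(calls["n"])  # 1: a non-transient error is not retried
```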

Quick Reference

# Simple retry
for attempt in range(1, 4):
    try:
        result = risky()
        break
    except Exception as e:
        if attempt == 3:
            raise
        time.sleep(2 ** attempt)

# tenacity
from tenacity import retry, stop_after_attempt, wait_exponential

@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def fn(): ...

# Timeout
requests.get(url, timeout=(5, 30))

# Context manager
@contextlib.contextmanager
def managed():
    resource = setup()
    try:
        yield resource
    finally:
        cleanup()

# Signal handler
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)

What's Next

Lesson 4: Custom Exceptions