Pagination and Rate Limits
What You'll Learn
How to retrieve every result from a paginated API, respect rate limits without getting blocked, and combine the two reliably.
Why Pagination Matters
Most APIs don't return thousands of records in one response:
GET /users → 100 records (page 1)
GET /users?page=2 → 100 records (page 2)
GET /users?page=3 → 83 records (last page)
Total: 283 users
If you call GET /users only once, you get the first 100 records and may wrongly assume that's the complete set.
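Many APIs also include metadata that reveals when a response is truncated. A quick sanity check, assuming a hypothetical wrapped response shape with a total field (your API's field names may differ):

import requests

response = requests.get("https://api.example.com/users", timeout=10)
data = response.json()
# Hypothetical shape: {"users": [...], "total": 283, "page": 1, "per_page": 100}
if data.get("total", 0) > len(data.get("users", [])):
    print("Response is truncated; more pages to fetch")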
Pagination Styles
Offset/Page Pagination
The client asks for numbered pages of a fixed size and stops when a short page comes back:
import requests
def fetch_all_users_paged(base_url: str, token: str) -> list[dict]:
    """Fetch all users using page-number pagination."""
    per_page = 100
    all_users = []
    page = 1
    while True:
        response = requests.get(
            f"{base_url}/users",
            params={"page": page, "per_page": per_page},
            headers={"Authorization": f"Bearer {token}"},
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        users = data["users"]
        all_users.extend(users)
        # A short page means we've reached the last page
        if len(users) < per_page:
            break
        page += 1
    return all_users
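One caveat with offset pagination: if records are inserted or deleted while you page, the data shifts underneath you and you can receive duplicates or miss rows. A minimal guard, assuming each record carries a unique 'id' field (an assumption about the payload, not something the code above requires):

def dedupe_by_id(records: list[dict]) -> list[dict]:
    """Drop duplicate records while preserving order (assumes an 'id' key)."""
    seen = set()
    unique = []
    for record in records:
        if record["id"] not in seen:
            seen.add(record["id"])
            unique.append(record)
    return unique

Usage: all_users = dedupe_by_id(fetch_all_users_paged(base_url, token)).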
Cursor Pagination
Instead of page numbers, the server returns an opaque cursor that points at the next page; you pass it back until none is returned:
def fetch_all_with_cursor(base_url: str, token: str) -> list[dict]:
"""Fetch all records using cursor-based pagination."""
all_records = []
cursor = None
while True:
params = {"limit": 100}
if cursor:
params["cursor"] = cursor
response = requests.get(
f"{base_url}/records",
params=params,
headers={"Authorization": f"Bearer {token}"},
timeout=10,
)
response.raise_for_status()
data = response.json()
all_records.extend(data["items"])
# next_cursor is None when we're on the last page
cursor = data.get("next_cursor")
if not cursor:
break
return all_records
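A practical benefit of cursors is resumability: persist the last cursor, and an interrupted job can pick up where it left off. A sketch using a local JSON file as the checkpoint store (the file name and shape are illustrative assumptions):

import json
from pathlib import Path

CHECKPOINT = Path("cursor_checkpoint.json")

def save_cursor(cursor: str | None) -> None:
    """Persist the most recent cursor so a rerun can resume."""
    CHECKPOINT.write_text(json.dumps({"cursor": cursor}))

def load_cursor() -> str | None:
    """Return the saved cursor, or None when starting fresh."""
    if CHECKPOINT.exists():
        return json.loads(CHECKPOINT.read_text()).get("cursor")
    return None

In fetch_all_with_cursor you would initialize cursor = load_cursor() and call save_cursor(cursor) after each page.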
Link Header Pagination (GitHub style)
Here the next-page URL arrives in the HTTP Link header rather than in the response body:
def fetch_all_github_repos(org: str, token: str) -> list[dict]:
"""Follow GitHub Link headers for pagination."""
repos = []
url = f"https://api.github.com/orgs/{org}/repos"
params = {"per_page": 100}
while url:
response = requests.get(
url,
params=params,
headers={
"Authorization": f"Bearer {token}",
"Accept": "application/vnd.github.v3+json",
},
timeout=10,
)
response.raise_for_status()
repos.extend(response.json())
# Parse the Link header: <url>; rel="next"
link_header = response.headers.get("Link", "")
next_url = None
for part in link_header.split(","):
if 'rel="next"' in part:
next_url = part.split(";")[0].strip().strip("<>")
break
url = next_url
params = {} # URL already has params encoded
return repos
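Worth knowing: requests already parses the Link header. response.links is a dict keyed by the rel value, so the manual parsing above collapses to one lookup:

# Equivalent to the manual parsing above
next_url = response.links.get("next", {}).get("url")  # None when there is no next page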
Rate Limits
APIs limit how many requests you can make per minute or hour, both to prevent abuse and to keep the service responsive.
Reading Rate Limit Headers
Most APIs return rate limit info in response headers:
response = requests.get(url, headers={"Authorization": f"Bearer {token}"}, timeout=10)

# GitHub rate limit headers (the defaults are harmless fallbacks when headers are absent)
remaining = int(response.headers.get("X-RateLimit-Remaining", 999))  # requests left in the window
reset_time = int(response.headers.get("X-RateLimit-Reset", 0))       # Unix epoch seconds
limit = int(response.headers.get("X-RateLimit-Limit", 999))          # total requests per window

print(f"Remaining: {remaining}/{limit}")
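Note that GitHub's X-RateLimit-Reset is a Unix epoch timestamp (seconds since 1970), not a duration. Converting it into a wait time:

import time
from datetime import datetime, timezone

wait_seconds = max(0, reset_time - time.time())
resets_at = datetime.fromtimestamp(reset_time, tz=timezone.utc)
print(f"Limit resets at {resets_at:%H:%M:%S} UTC ({wait_seconds:.0f}s from now)")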
Respect Rate Limits Before Hitting Them
Pacing requests proactively is cheaper than recovering after a 429:
import time
import requests
def fetch_with_rate_limit(urls: list[str], requests_per_minute: int = 60) -> list[dict]:
"""Fetch multiple URLs while respecting a rate limit."""
delay = 60 / requests_per_minute # seconds between requests
results = []
for i, url in enumerate(urls):
if i > 0:
time.sleep(delay) # wait between requests
response = requests.get(url, timeout=10)
# Check if we're about to hit the limit
remaining = int(response.headers.get("X-RateLimit-Remaining", 999))
if remaining < 5:
reset_at = int(response.headers.get("X-RateLimit-Reset", 0))
wait_sec = max(0, reset_at - time.time()) + 1
print(f"Rate limit low ({remaining} remaining) — waiting {wait_sec:.0f}s")
time.sleep(wait_sec)
response.raise_for_status()
results.append(response.json())
return results
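If several call sites need the same pacing, the sleep logic is worth factoring out. A minimal sketch built on a monotonic clock, so system clock adjustments cannot skew the delay:

import time

class Throttle:
    """Enforce a minimum interval between calls to wait()."""

    def __init__(self, requests_per_minute: int):
        self.interval = 60 / requests_per_minute
        self._last = 0.0

    def wait(self) -> None:
        elapsed = time.monotonic() - self._last
        if elapsed < self.interval:
            time.sleep(self.interval - elapsed)
        self._last = time.monotonic()

Call throttle.wait() before each request instead of scattering time.sleep() through the loop.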
Handling 429 Too Many Requests
When you do exceed the limit, the server responds with 429, usually with a Retry-After header saying how long to back off:
import time
import requests

def fetch_with_retry_on_rate_limit(url: str, max_retries: int = 5) -> dict:
    """Retry automatically when rate limited (HTTP 429)."""
    for attempt in range(max_retries):
        response = requests.get(url, timeout=10)
        if response.status_code == 429:
            # Respect Retry-After if provided. We assume delta-seconds here;
            # servers may also send an HTTP-date, which would need extra parsing.
            retry_after = float(response.headers.get("Retry-After", 60))
            print(f"Rate limited — waiting {retry_after:.0f}s (attempt {attempt + 1})")
            time.sleep(retry_after)
            continue
        response.raise_for_status()
        return response.json()
    raise RuntimeError(f"Rate limit not resolved after {max_retries} retries")
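Not every API sends Retry-After. A common fallback (a standard pattern, not something the function above depends on) is exponential backoff with jitter:

import random
import time
import requests

def fetch_with_backoff(url: str, max_retries: int = 5) -> dict:
    """Retry 429s, backing off 1s, 2s, 4s, ... plus jitter when no Retry-After is sent."""
    for attempt in range(max_retries):
        response = requests.get(url, timeout=10)
        if response.status_code == 429:
            header = response.headers.get("Retry-After")
            wait = float(header) if header else (2 ** attempt) + random.uniform(0, 1)
            time.sleep(wait)
            continue
        response.raise_for_status()
        return response.json()
    raise RuntimeError(f"Still rate limited after {max_retries} attempts")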
Complete Paginated Fetcher with Rate Limiting
Putting it all together: a generator that pages through results, throttles between requests, and recovers from 429s:
import time
import logging
import requests
from typing import Iterator
log = logging.getLogger(__name__)
def iter_all_records(
base_url: str,
token: str,
requests_per_minute: int = 30,
) -> Iterator[dict]:
"""Generator that yields all records from a paginated API."""
page = 1
delay = 60 / requests_per_minute
session = requests.Session()
session.headers.update({"Authorization": f"Bearer {token}"})
while True:
log.debug("Fetching page %d", page)
response = session.get(
f"{base_url}/records",
params={"page": page, "limit": 100},
timeout=10,
)
if response.status_code == 429:
wait = float(response.headers.get("Retry-After", 60))
log.warning("Rate limited — sleeping %.0fs", wait)
time.sleep(wait)
continue
response.raise_for_status()
data = response.json()
        # Handle both a bare list and a wrapped {"items": [...]} response
        records = data["items"] if isinstance(data, dict) else data
if not records:
break
yield from records
if len(records) < 100: # last page
break
page += 1
time.sleep(delay) # throttle between pages
# Usage — memory efficient, processes one page at a time
# (process() stands in for whatever you do with each record)
total = 0
for record in iter_all_records("https://api.example.com", token="..."):
process(record)
total += 1
print(f"Processed {total} records")
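Because iter_all_records is a generator, a consumer can stop early and no further pages are fetched. Sampling the first ten records, for example, costs only a single request:

from itertools import islice

# Pages are fetched lazily; this stops after 10 records (one request at 100 per page)
sample = list(islice(iter_all_records("https://api.example.com", token="..."), 10))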
Quick Reference
# Offset pagination
while True:
data = get(url, params={"page": page, "per_page": 100})
records.extend(data["items"])
if len(data["items"]) < 100: break
page += 1
# Cursor pagination
while True:
data = get(url, params={"cursor": cursor, "limit": 100})
records.extend(data["items"])
cursor = data.get("next_cursor")
if not cursor: break
# Link header (GitHub)
while url:
r = get(url)
records.extend(r.json())
url = parse_link_next(r.headers.get("Link", ""))
# Rate limiting
time.sleep(60 / requests_per_minute) # between requests
# Handle 429
if response.status_code == 429:
time.sleep(float(response.headers.get("Retry-After", 60)))
continue
# Check remaining before exhausting
remaining = int(r.headers.get("X-RateLimit-Remaining", 999))
if remaining < 5:
time.sleep(reset_in_seconds)