Skip to main content

Idempotent Scripts

What You'll Learn

What idempotency means for scripts, why it matters for automation, and practical patterns to make any script safe to run repeatedly.

What Is Idempotency?

An idempotent operation produces the same result whether executed once or many times:

Run 1: Creates file users.csv ✅ success
Run 2: File already exists → skips ✅ success (not an error)
Run 3: File already exists → skips ✅ success

A non-idempotent script:

Run 1: Inserts 100 records into DB ✅
Run 2: Inserts 100 more records ❌ now 200 duplicate records!
Run 3: Inserts 100 more records ❌ now 300!

Idempotent scripts are safe for:

  • Cron jobs (run at midnight, re-run if failed)
  • CI/CD pipelines (retry on transient failure)
  • Provisioning (re-run to fix drift)
  • Onboarding (re-run for new team members)

Pattern 1: Check Before Acting

from pathlib import Path
import logging

log = logging.getLogger(__name__)


def ensure_directory(path: Path) -> bool:
"""Create directory if it doesn't exist. Returns True if created."""
if path.exists():
log.debug("Directory already exists: %s", path)
return False
path.mkdir(parents=True)
log.info("Created directory: %s", path)
return True


def write_default_config(config_path: Path) -> bool:
"""Write default config only if it doesn't exist."""
if config_path.exists():
log.debug("Config already exists: %s", config_path)
return False

config_path.write_text('{"version": 1, "debug": false}\n', encoding="utf-8")
log.info("Created default config: %s", config_path)
return True

Pattern 2: UPSERT (Update or Insert)

For database operations, use UPSERT instead of INSERT:

import sqlite3


def upsert_user(db_path: str, user: dict) -> None:
"""Insert or update a user. Safe to call multiple times."""
with sqlite3.connect(db_path) as conn:
conn.execute("""
INSERT INTO users (id, name, email)
VALUES (?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
name = excluded.name,
email = excluded.email
""", (user["id"], user["name"], user["email"]))


def sync_users(db_path: str, users: list[dict]) -> dict:
"""Sync all users idempotently."""
stats = {"synced": 0}
for user in users:
upsert_user(db_path, user)
stats["synced"] += 1
return stats

Pattern 3: State Files (Job Checkpointing)

Track which items have been processed so you don't re-process them:

import json
from pathlib import Path


def load_processed(state_file: Path) -> set[str]:
"""Load set of already-processed IDs."""
if not state_file.exists():
return set()
return set(json.loads(state_file.read_text(encoding="utf-8")))


def save_processed(state_file: Path, processed: set[str]) -> None:
"""Save the set of processed IDs."""
state_file.write_text(json.dumps(sorted(processed)), encoding="utf-8")


def process_records(records: list[dict], state_file: Path) -> dict:
"""Process records, skipping already-processed ones."""
processed = load_processed(state_file)
stats = {"new": 0, "skipped": 0, "errors": 0}

for record in records:
record_id = str(record["id"])

if record_id in processed:
stats["skipped"] += 1
continue

try:
process_one(record)
processed.add(record_id)
save_processed(state_file, processed) # save after each success
stats["new"] += 1
except Exception as e:
log.error("Failed record_id=%s: %s", record_id, e)
stats["errors"] += 1

return stats

Pattern 4: --dry-run Mode

Show what would happen without making changes:

import argparse
import logging
from pathlib import Path

log = logging.getLogger(__name__)


def cleanup_old_files(directory: Path, days: int = 30, dry_run: bool = False) -> dict:
"""Delete files older than `days` days."""
import time
cutoff = time.time() - (days * 86400)
stats = {"deleted": 0, "skipped": 0}

for file in directory.rglob("*"):
if not file.is_file():
continue
if file.stat().st_mtime < cutoff:
if dry_run:
log.info("[DRY RUN] Would delete: %s", file)
else:
file.unlink()
log.info("Deleted: %s", file)
stats["deleted"] += 1
else:
stats["skipped"] += 1

return stats


def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("directory", type=Path)
parser.add_argument("--days", type=int, default=30)
parser.add_argument("--dry-run", action="store_true",
help="Show what would be deleted without deleting")
args = parser.parse_args()

logging.basicConfig(level=logging.INFO)
result = cleanup_old_files(args.directory, args.days, dry_run=args.dry_run)
log.info("Done: %s", result)
return 0

Pattern 5: Atomic File Writes

Prevent partial files if a script crashes mid-write:

from pathlib import Path
import json


def atomic_write_json(path: Path, data: dict) -> None:
"""Write JSON atomically — no partial files on crash."""
tmp = path.with_suffix(".tmp")
try:
tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
tmp.rename(path) # atomic on Linux/macOS
except Exception:
tmp.unlink(missing_ok=True)
raise

Idempotency Checklist

☐ Check if the action is already done before doing it
☐ Use UPSERT instead of INSERT for database records
☐ Use --dry-run to preview changes
☐ Track progress in a state file for long jobs
☐ Use atomic writes for file output
☐ Make cleanup operations also idempotent
☐ Test by running the script twice and checking it's safe

Quick Reference

# Check before acting
if not path.exists():
path.mkdir(parents=True)

# Atomic write
tmp = path.with_suffix(".tmp")
tmp.write_text(content)
tmp.rename(path)

# State file checkpoint
processed = load_state(state_file)
if item_id not in processed:
do_work(item)
processed.add(item_id)
save_state(state_file, processed)

# Dry run pattern
if dry_run:
log.info("[DRY RUN] Would do: %s", action)
else:
do_action()

# UPSERT (SQLite)
INSERT INTO table (...) VALUES (...)
ON CONFLICT(id) DO UPDATE SET col = excluded.col

What's Next

Lesson 2: Scheduled and Event-Driven Jobs