Python / FastAPI Interview Questions
FastAPI is a modern, high-performance Python web framework for building APIs, built on top of Starlette (for async web handling) and Pydantic (for data validation). It was created by Sebastián Ramírez and released in 2018.
| Feature | FastAPI | Flask | Django REST Framework |
|---|---|---|---|
| Performance | Very high (async, Starlette) | Moderate (sync-first) | Moderate (sync-first) |
| Type hints | First-class — drives validation & docs | Optional, no built-in use | Limited |
| Auto docs | Swagger UI + ReDoc built-in | Manual setup required | Browsable API only |
| Validation | Pydantic — automatic, deep | Manual or extensions | Serializers — verbose |
| Async support | Native (async def) | Limited (Flask 2+) | Limited |
| Learning curve | Low-moderate | Low | High (Django ecosystem) |
Key selling points:
- Automatic interactive API documentation (Swagger UI at
/docs, ReDoc at/redoc) - Runtime data validation and serialisation via Pydantic with zero extra code
- Full async/await support enabling high concurrency
- Editor auto-completion everywhere because the entire framework is built around type hints
- One of the fastest Python frameworks available — comparable to NodeJS and Go in benchmarks
A FastAPI app needs just a few lines. You create a FastAPI() instance and decorate Python functions with HTTP method decorators. The app is served by an ASGI server — Uvicorn is the standard choice.
# main.py
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"message": "Hello, FastAPI!"}
@app.get("/items/{item_id}")
def read_item(item_id: int, q: str | None = None):
return {"item_id": item_id, "q": q}# Run the server
uvicorn main:app --reload
# main = the module (main.py)
# app = the FastAPI() instance variable
# --reload = auto-restart on code changes (dev only)After starting, visit http://127.0.0.1:8000/docs for the interactive Swagger UI, which FastAPI generates automatically from your code. The --reload flag is for development only — never use it in production.
| Flag | Purpose |
|---|---|
| --reload | Auto-restart on file change (dev) |
| --host 0.0.0.0 | Listen on all interfaces |
| --port 8080 | Custom port (default: 8000) |
| --workers 4 | Multiple worker processes (prod) |
Path parameters are part of the URL path itself, declared with curly braces in the decorator and as function arguments. Query parameters are declared as function arguments that do not appear in the path string — FastAPI reads them from the URL query string automatically.
from fastapi import FastAPI
from enum import Enum
app = FastAPI()
# PATH parameter: part of the URL — /users/42
@app.get("/users/{user_id}")
def get_user(user_id: int): # int type enforced automatically
return {"user_id": user_id}
# QUERY parameters: after ? in URL — /items?skip=0&limit=10
@app.get("/items")
def list_items(
skip: int = 0, # optional, default 0
limit: int = 10, # optional, default 10
search: str | None = None, # fully optional
):
return {"skip": skip, "limit": limit, "search": search}
# Enum path param — validates allowed values
class ModelName(str, Enum):
alexnet = "alexnet"
resnet = "resnet"
@app.get("/models/{model_name}")
def get_model(model_name: ModelName):
return {"model": model_name}| Aspect | Path param | Query param |
|---|---|---|
| URL example | /users/{id} | /users?id=42 |
| Declaration | In path string + function arg | Function arg only (not in path) |
| Required by default? | Yes — URL won't match without it | No if it has a default value |
| Type validation | Yes — automatic via type hint | Yes — automatic via type hint |
Declare a Pydantic BaseModel as a function parameter. FastAPI automatically reads the JSON body, validates it against the model, and provides it as a typed Python object. If validation fails it returns a 422 with details.
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Annotated
from pydantic import Field
app = FastAPI()
class Item(BaseModel):
name: str
description: str | None = None # optional field
price: float
tax: float | None = None
@app.post("/items")
def create_item(item: Item): # body parsed + validated automatically
total = item.price + (item.tax or 0)
return {**item.model_dump(), "total_price": total}
# Mixing path param + query param + body in the same endpoint
@app.put("/items/{item_id}")
def update_item(
item_id: int, # path
q: str | None = None, # query
item: Item | None = None, # body (optional)
):
result = {"item_id": item_id}
if q:
result["q"] = q
if item:
result.update(item.model_dump())
return resultFastAPI resolves the source of each parameter automatically:
| Parameter type | Source |
|---|---|
| Matches a path segment {name} | Path parameter |
| Simple type (int, str, float…) + not in path | Query parameter |
| Pydantic BaseModel subclass | JSON request body |
| Annotated with Body() | Explicitly JSON body |
Pydantic is FastAPI's validation engine. Models are Python classes inheriting from BaseModel where each field is a type-annotated attribute. Pydantic validates on instantiation, raising ValidationError for invalid data. FastAPI catches this and returns a 422 automatically.
from pydantic import BaseModel, Field, field_validator, model_validator
from typing import Annotated
from datetime import datetime
class UserCreate(BaseModel):
username: Annotated[str, Field(min_length=3, max_length=50)]
email: Annotated[str, Field(pattern=r"^[\w.-]+@[\w.-]+\.\w+$")]
age: Annotated[int, Field(ge=18, le=120)] # >= 18 and <= 120
password: str
created_at: datetime = Field(default_factory=datetime.utcnow)
# Field-level validator
@field_validator("username")
@classmethod
def username_alphanumeric(cls, v: str) -> str:
if not v.isalnum():
raise ValueError("Username must be alphanumeric")
return v.lower()
# Cross-field validator (model level)
@model_validator(mode="after")
def passwords_match(self) -> "UserCreate":
# example: confirm_password field check would go here
return self
class UserRead(BaseModel): # separate model for responses (no password)
username: str
email: str
age: int
created_at: datetime
model_config = {"from_attributes": True} # allows ORM object inputCommon Field constraints: min_length, max_length, pattern, ge (≥), gt (>), le (≤), lt (<), multiple_of, min_items, max_items.
Best practice: use separate Pydantic models for input (UserCreate) and output (UserRead) to avoid accidentally exposing sensitive fields like passwords in responses.
The response_model parameter on a route decorator tells FastAPI which Pydantic model to use for filtering and serialising the response. Even if the endpoint returns more data internally, only the fields defined in the response model are included in the JSON output.
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class UserIn(BaseModel):
username: str
password: str
email: str
class UserOut(BaseModel):
username: str
email: str
# password intentionally excluded
# response_model filters the output — password never appears in response
@app.post("/users", response_model=UserOut)
def create_user(user: UserIn) -> UserOut:
# Even if we return the full user object, password is stripped
return user # FastAPI applies UserOut filtering
# response_model_exclude_unset: only include fields explicitly set by caller
@app.get("/items/{id}", response_model=UserOut, response_model_exclude_unset=True)
def get_item(id: int):
return {"username": "alice", "email": "a@b.com"}
# List response
@app.get("/users", response_model=list[UserOut])
def list_users():
return [{"username": "alice", "email": "a@b.com", "password": "secret"}]| Option | Effect |
|---|---|
| response_model=UserOut | Filters output to only UserOut fields |
| response_model_exclude_unset=True | Omits fields not explicitly set (no default-value noise) |
| response_model_exclude_none=True | Omits fields that are None |
| response_model_include={'field'} | Only include specific fields |
| response_model_exclude={'field'} | Exclude specific fields |
FastAPI provides Path() and Query() functions (imported from fastapi) to add metadata and validation constraints to individual parameters — the same constraints available in Pydantic's Field().
from fastapi import FastAPI, Path, Query
from typing import Annotated
app = FastAPI()
@app.get("/items/{item_id}")
def read_item(
item_id: Annotated[int, Path(
title="The ID of the item",
description="Must be a positive integer",
ge=1, # >= 1
le=1_000_000, # <= 1,000,000
)],
q: Annotated[str | None, Query(
min_length=3,
max_length=50,
pattern=r"^[a-z]+$",
alias="search", # URL uses ?search=... instead of ?q=...
deprecated=True, # marks param as deprecated in docs
)] = None,
):
return {"item_id": item_id, "q": q}
# Required query param with no default
@app.get("/search")
def search(
q: Annotated[str, Query(min_length=1)], # required — no default
):
return {"query": q}The Annotated pattern (Python 3.9+) is FastAPI's recommended way to attach metadata. It keeps the type hint clean while allowing rich validation: Annotated[int, Path(ge=1)] means "an int, validated by Path with ge=1".
FastAPI lets you set the default response status code on the decorator, raise HTTPException for errors, and return Response subclasses for full control over headers and body.
from fastapi import FastAPI, HTTPException, status
from fastapi.responses import JSONResponse, Response, FileResponse
app = FastAPI()
fake_db = {1: {"name": "Foo"}, 2: {"name": "Bar"}}
# Set default success status code
@app.post("/items", status_code=status.HTTP_201_CREATED)
def create_item(name: str):
return {"name": name}
# Raise HTTP errors
@app.get("/items/{item_id}")
def get_item(item_id: int):
if item_id not in fake_db:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Item {item_id} not found",
headers={"X-Error": "Item missing"}, # custom header on error
)
return fake_db[item_id]
# Custom JSONResponse for full control
@app.get("/custom")
def custom_response():
return JSONResponse(
status_code=200,
content={"message": "ok"},
headers={"X-Custom": "value"},
)
# 204 No Content (no response body)
@app.delete("/items/{item_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_item(item_id: int):
fake_db.pop(item_id, None)
return Response(status_code=204)| Constant | Value | Meaning |
|---|---|---|
| HTTP_200_OK | 200 | Success |
| HTTP_201_CREATED | 201 | Resource created |
| HTTP_204_NO_CONTENT | 204 | Success, no body |
| HTTP_400_BAD_REQUEST | 400 | Client error |
| HTTP_401_UNAUTHORIZED | 401 | Not authenticated |
| HTTP_403_FORBIDDEN | 403 | Authenticated but not authorised |
| HTTP_404_NOT_FOUND | 404 | Resource not found |
| HTTP_422_UNPROCESSABLE_ENTITY | 422 | Validation error |
FastAPI has a powerful built-in dependency injection (DI) system. You declare dependencies as functions and inject them into route handlers using Depends(). FastAPI resolves the dependency tree automatically, handles async dependencies, and caches results per request.
from fastapi import FastAPI, Depends, HTTPException
from typing import Annotated
app = FastAPI()
# Simple dependency — shared pagination logic
def pagination(skip: int = 0, limit: int = 10):
return {"skip": skip, "limit": limit}
@app.get("/items")
def list_items(pagination: Annotated[dict, Depends(pagination)]):
return {"pagination": pagination}
@app.get("/users")
def list_users(pagination: Annotated[dict, Depends(pagination)]):
return {"pagination": pagination}
# Class-based dependency
class DBSession:
def __init__(self):
self.db = "fake_db_connection" # in real code: create session
def close(self):
pass # close connection
# Generator dependency — enables cleanup with finally
def get_db():
db = DBSession()
try:
yield db.db # yield makes it a context-managed dependency
finally:
db.close() # runs after request completes, even on error
@app.get("/data")
def get_data(db: Annotated[str, Depends(get_db)]):
return {"db": db}
# Nested dependencies
def verify_token(token: str) -> str:
if token != "secret":
raise HTTPException(status_code=401, detail="Invalid token")
return token
def get_current_user(token: Annotated[str, Depends(verify_token)]):
return {"user": "alice", "token": token}Benefits of DI in FastAPI: reusable logic (auth, DB sessions, pagination), easy testing (swap real dependencies for mocks), automatic parameter parsing from query/headers/cookies, and per-request caching (same dependency called once per request by default).
APIRouter is FastAPI's equivalent of Flask Blueprints — it lets you group related routes in separate files, then include them in the main app. This keeps large codebases manageable.
# routers/items.py
from fastapi import APIRouter, Depends
router = APIRouter(
prefix="/items", # all routes here are prefixed
tags=["items"], # Swagger UI grouping label
responses={404: {"description": "Not found"}}, # shared response docs
)
@router.get("/")
def list_items():
return [{"name": "item1"}]
@router.get("/{item_id}")
def get_item(item_id: int):
return {"item_id": item_id}
@router.post("/", status_code=201)
def create_item(name: str):
return {"name": name}# routers/users.py
from fastapi import APIRouter
router = APIRouter(prefix="/users", tags=["users"])
@router.get("/")
def list_users():
return [{"username": "alice"}]# main.py
from fastapi import FastAPI
from routers import items, users
app = FastAPI()
app.include_router(items.router)
app.include_router(users.router)
app.include_router(
users.router,
prefix="/v2", # override prefix for a second version
dependencies=[Depends(verify_admin)], # apply dep to all routes
)
@app.get("/") # root route stays in main.py
def root():
return {"message": "API root"}
Middleware is a function that runs on every request before it reaches a route handler and on every response before it's sent to the client. FastAPI uses Starlette's middleware system — you can add it with @app.middleware('http') or app.add_middleware().
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
import time
app = FastAPI()
# Custom middleware using decorator
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
start = time.time()
response = await call_next(request) # passes request to next handler
duration = time.time() - start
response.headers["X-Process-Time"] = str(duration)
return response
# CORS — allow cross-origin requests (e.g. from a React frontend)
app.add_middleware(
CORSMiddleware,
allow_origins=["https://myfrontend.com", "http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# GZip compression for large responses
app.add_middleware(GZipMiddleware, minimum_size=1000)Common built-in middleware:
| Middleware | Purpose |
|---|---|
| CORSMiddleware | Cross-Origin Resource Sharing headers |
| GZipMiddleware | Compress responses above minimum size |
| HTTPSRedirectMiddleware | Redirect HTTP to HTTPS |
| TrustedHostMiddleware | Block requests with invalid Host headers |
| SessionMiddleware | Cookie-based sessions |
FastAPI supports both async def and plain def route handlers. The choice has real performance implications because of how FastAPI's underlying ASGI server (Uvicorn + Starlette) handles concurrency.
| Handler type | How FastAPI runs it | When to use |
|---|---|---|
| async def | Runs directly on the event loop | I/O-bound async work: awaiting HTTP calls, async DB drivers (asyncpg, motor), async file I/O |
| def (sync) | Runs in a thread pool (executor) to avoid blocking the event loop | CPU-bound work, or libraries that are synchronous only (psycopg2, requests, pandas) |
import asyncio
import httpx
from fastapi import FastAPI
app = FastAPI()
# GOOD: truly async I/O — does not block the event loop
@app.get("/async-data")
async def get_data():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
# ALSO GOOD: synchronous — FastAPI runs this in a thread pool
@app.get("/sync-data")
def get_data_sync():
import requests # sync library
response = requests.get("https://api.example.com/data")
return response.json()
# DANGEROUS: blocking call inside async def without await
# This blocks the entire event loop, killing concurrency
@app.get("/bad")
async def bad_handler():
import time
time.sleep(5) # NEVER do this in async def
return {"message": "slow"}
# CORRECT: use asyncio.sleep in async context
@app.get("/good-async-wait")
async def good_handler():
await asyncio.sleep(5) # non-blocking
return {"message": "waited"}
Rule of thumb: if your route calls await somewhere, use async def. If your route only calls synchronous libraries, use plain def — FastAPI will correctly run it in a threadpool, keeping the event loop free.
BackgroundTasks let you run work after returning a response to the client — for lightweight fire-and-forget tasks like sending emails or writing audit logs. The response is sent immediately and the task runs afterward in the same process.
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
app = FastAPI()
def send_welcome_email(email: str, username: str):
# Simulate sending an email (could call an email service)
print(f"Sending welcome email to {email} for {username}")
def write_audit_log(action: str, user_id: int):
print(f"Audit: user {user_id} performed {action}")
class UserIn(BaseModel):
username: str
email: str
@app.post("/register", status_code=201)
def register_user(user: UserIn, background_tasks: BackgroundTasks):
# Response is returned immediately
# Email is sent after the response
background_tasks.add_task(
send_welcome_email,
email=user.email,
username=user.username,
)
background_tasks.add_task(write_audit_log, "register", user_id=42)
return {"message": f"User {user.username} created"}
# BackgroundTasks can also be injected via dependencies
def get_background(background_tasks: BackgroundTasks) -> BackgroundTasks:
return background_tasksLimitations: BackgroundTasks run in the same process and event loop — if the server restarts, queued tasks are lost. For heavy, reliable background work use Celery, ARQ, or FastAPI + Redis Queue instead.
FastAPI provides OAuth2PasswordBearer and OAuth2PasswordRequestForm helpers for the standard username/password token flow. The pattern: client POSTs credentials → server returns a JWT → client sends JWT in Authorization: Bearer <token> header on subsequent requests.
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from typing import Annotated
from datetime import datetime, timedelta
import jwt # pip install PyJWT
app = FastAPI()
SECRET_KEY = "your-secret-key" # use a strong random key in production!
ALGORITHM = "HS256"
# Tells FastAPI where clients obtain tokens — used in OpenAPI docs
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token")
# 1. Login endpoint — returns a JWT
@app.post("/token")
def login(form: Annotated[OAuth2PasswordRequestForm, Depends()]):
if form.username != "alice" or form.password != "secret":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create JWT token
payload = {
"sub": form.username,
"exp": datetime.utcnow() + timedelta(minutes=30),
}
token = jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)
return {"access_token": token, "token_type": "bearer"}
# 2. Dependency that decodes and validates the token
def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
if not username:
raise ValueError
except Exception:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return username
# 3. Protected endpoint
@app.get("/me")
def read_me(current_user: Annotated[str, Depends(get_current_user)]):
return {"username": current_user}
FastAPI's dependency injection makes RBAC clean: create a higher-order dependency that checks the current user's role. Inject it into routes that require elevated permissions.
from fastapi import FastAPI, Depends, HTTPException, status
from typing import Annotated
from enum import Enum
app = FastAPI()
class Role(str, Enum):
user = "user"
admin = "admin"
class User:
def __init__(self, username: str, role: Role):
self.username = username
self.role = role
# Simulated auth — in reality decode a JWT
def get_current_user() -> User:
return User(username="alice", role=Role.user)
# Higher-order dependency factory — creates a role checker
def require_role(role: Role):
def checker(user: Annotated[User, Depends(get_current_user)]) -> User:
if user.role != role:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Requires {role} role",
)
return user
return checker
# Public route — any authenticated user
@app.get("/profile")
def profile(user: Annotated[User, Depends(get_current_user)]):
return {"username": user.username, "role": user.role}
# Admin-only route
@app.delete("/users/{user_id}")
def delete_user(
user_id: int,
_: Annotated[User, Depends(require_role(Role.admin))],
):
return {"deleted": user_id}
# Router-level dependency — apply to all routes in a router
from fastapi import APIRouter
admin_router = APIRouter(
prefix="/admin",
dependencies=[Depends(require_role(Role.admin))],
)
@admin_router.get("/stats")
def admin_stats():
return {"total_users": 42}
For async database access, use SQLAlchemy 2.x with AsyncSession and create_async_engine paired with an async driver such as asyncpg (PostgreSQL) or aiosqlite (SQLite). The DB session is injected via a generator dependency.
# pip install sqlalchemy asyncpg
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import select
from fastapi import FastAPI, Depends
from typing import Annotated
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
username: Mapped[str] = mapped_column(unique=True)
email: Mapped[str]
# DB dependency — yields session, always closes it
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
yield session
DBDep = Annotated[AsyncSession, Depends(get_db)]
app = FastAPI()
@app.on_event("startup")
async def startup():
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all) # create tables
@app.get("/users")
async def list_users(db: DBDep):
result = await db.execute(select(User))
users = result.scalars().all()
return users
@app.post("/users", status_code=201)
async def create_user(username: str, email: str, db: DBDep):
user = User(username=username, email=email)
db.add(user)
await db.commit()
await db.refresh(user)
return user
Alembic is the standard database migration tool for SQLAlchemy. It tracks schema changes as versioned migration scripts that can be applied or rolled back. This is essential for production databases — never rely solely on create_all().
# 1. Install and initialise Alembic
# pip install alembic
# alembic init alembic
# This creates alembic/ directory and alembic.ini# alembic/env.py — point to your models and database
from app.database import Base, DATABASE_URL # your app imports
from app import models # noqa: import models so Alembic can see them
# In run_migrations_online():
connectable = create_engine(DATABASE_URL)
# Set target_metadata so Alembic compares your models vs the DB
target_metadata = Base.metadata# 2. Generate a migration after changing a model
alembic revision --autogenerate -m "add users table"
# Creates alembic/versions/xxxx_add_users_table.py
# 3. Apply migrations
alembic upgrade head # apply all pending migrations
alembic upgrade +1 # apply one migration
# 4. Rollback
alembic downgrade -1 # undo one migration
alembic downgrade base # undo ALL migrations
# 5. View history
alembic history # list all migrations
alembic current # show current DB revision# Generated migration file — always review before applying!
def upgrade() -> None:
op.create_table(
"users",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("username", sa.String(), nullable=False),
sa.Column("email", sa.String(), nullable=False),
)
def downgrade() -> None:
op.drop_table("users")
FastAPI provides TestClient (wrapping httpx) for synchronous tests and AsyncClient for async tests. Dependencies can be overridden for testing to inject mocks instead of real databases or services.
# app/main.py
from fastapi import FastAPI, Depends
from pydantic import BaseModel
app = FastAPI()
def get_db():
return {"connection": "real_db"}
class Item(BaseModel):
name: str
price: float
@app.get("/")
def root():
return {"message": "Hello"}
@app.post("/items", status_code=201)
def create_item(item: Item, db=Depends(get_db)):
return {"created": item.name, "db": db["connection"]}# tests/test_main.py
import pytest
from fastapi.testclient import TestClient
from app.main import app, get_db
# Override the real DB dependency with a mock
def override_get_db():
return {"connection": "test_db"}
app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)
def test_root():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello"}
def test_create_item():
response = client.post(
"/items",
json={"name": "Widget", "price": 9.99},
)
assert response.status_code == 201
data = response.json()
assert data["created"] == "Widget"
assert data["db"] == "test_db" # mock dependency used
def test_create_item_invalid():
response = client.post("/items", json={"name": "Widget"}) # missing price
assert response.status_code == 422 # validation error
# Async test with httpx.AsyncClient
import pytest
from httpx import AsyncClient, ASGITransport
@pytest.mark.asyncio
async def test_root_async():
async with AsyncClient(
transport=ASGITransport(app=app), base_url="http://test"
) as ac:
response = await ac.get("/")
assert response.status_code == 200
Use @app.exception_handler(ExceptionClass) to catch specific exception types globally and return custom JSON responses. This is cleaner than wrapping every route in try/except.
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
app = FastAPI()
# Custom exception class
class ItemNotFoundError(Exception):
def __init__(self, item_id: int):
self.item_id = item_id
# Handler for our custom exception
@app.exception_handler(ItemNotFoundError)
async def item_not_found_handler(request: Request, exc: ItemNotFoundError):
return JSONResponse(
status_code=404,
content={"error": "not_found", "item_id": exc.item_id},
)
# Override the default validation error handler
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={
"error": "validation_failed",
"detail": exc.errors(),
"body": exc.body,
},
)
# Override generic HTTP exception handler
@app.exception_handler(StarletteHTTPException)
async def http_exception_handler(request: Request, exc: StarletteHTTPException):
return JSONResponse(
status_code=exc.status_code,
content={"error": "http_error", "detail": exc.detail},
)
@app.get("/items/{item_id}")
def get_item(item_id: int):
if item_id == 999:
raise ItemNotFoundError(item_id=item_id) # triggers custom handler
return {"item_id": item_id}
FastAPI handles HTML form data with Form() and file uploads with File() and UploadFile. Note: you need to pip install python-multipart for form/file support.
from fastapi import FastAPI, Form, File, UploadFile
from typing import Annotated
app = FastAPI()
# Form data (application/x-www-form-urlencoded)
@app.post("/login")
def login(
username: Annotated[str, Form()],
password: Annotated[str, Form()],
):
return {"username": username}
# Single file upload
@app.post("/upload")
async def upload_file(file: UploadFile):
contents = await file.read() # bytes
return {
"filename": file.filename,
"content_type": file.content_type,
"size": len(contents),
}
# Multiple files
@app.post("/upload-multiple")
async def upload_files(files: list[UploadFile]):
return [{"filename": f.filename} for f in files]
# Mix form fields + file
@app.post("/profile")
async def update_profile(
username: Annotated[str, Form()],
avatar: UploadFile | None = None,
):
result = {"username": username}
if avatar:
content = await avatar.read()
# Save to disk / cloud storage
result["avatar_size"] = len(content)
return result
# Validate file type and size
@app.post("/images")
async def upload_image(file: UploadFile):
if file.content_type not in ["image/jpeg", "image/png"]:
from fastapi import HTTPException
raise HTTPException(400, "Only JPEG/PNG allowed")
content = await file.read()
if len(content) > 5 * 1024 * 1024: # 5 MB limit
raise HTTPException(400, "File too large")
return {"filename": file.filename}
Use pydantic-settings to define a typed settings class that reads from environment variables and .env files. Inject settings via a dependency so they can be overridden in tests.
# pip install pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache
from fastapi import FastAPI, Depends
from typing import Annotated
class Settings(BaseSettings):
app_name: str = "My API"
debug: bool = False
db_url: str = "sqlite:///./test.db"
secret_key: str
jwt_expire_minutes: int = 30
model_config = SettingsConfigDict(
env_file=".env", # read from .env file
env_file_encoding="utf-8",
case_sensitive=False, # DB_URL and db_url both work
)
@lru_cache # singleton — reads .env once, cached for app lifetime
def get_settings() -> Settings:
return Settings()
SettingsDep = Annotated[Settings, Depends(get_settings)]
app = FastAPI()
@app.get("/info")
def app_info(settings: SettingsDep):
return {"app_name": settings.app_name, "debug": settings.debug}# .env file (never commit to git)
SECRET_KEY=my-super-secret-key-here
DB_URL=postgresql+asyncpg://user:pass@localhost/mydb
DEBUG=false# tests — override settings easily
from app.main import app, get_settings
from app.config import Settings
def override_settings():
return Settings(secret_key="test-key", db_url="sqlite:///./test.db")
app.dependency_overrides[get_settings] = override_settings
FastAPI supports a lifespan context manager (introduced in FastAPI 0.93, replaces the deprecated on_event decorators) for running code at application startup and shutdown — e.g. creating DB connection pools, loading ML models, or connecting to message brokers.
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncpg
# Module-level storage for shared resources
db_pool: asyncpg.Pool | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
# --- STARTUP: runs before the app starts serving requests ---
global db_pool
db_pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=5,
max_size=20,
)
print("Database pool created")
yield # app runs here
# --- SHUTDOWN: runs after last request is processed ---
await db_pool.close()
print("Database pool closed")
app = FastAPI(lifespan=lifespan)
@app.get("/users")
async def list_users():
async with db_pool.acquire() as conn:
rows = await conn.fetch("SELECT id, username FROM users")
return [dict(r) for r in rows]Common startup tasks: create DB connection pools, initialise Redis clients, load ML models into memory, connect to message brokers (Kafka, RabbitMQ).
Common shutdown tasks: close DB pools, flush metrics, close broker connections, gracefully drain request queues.
FastAPI supports WebSockets natively via the WebSocket parameter type. Use await websocket.accept() to establish the connection, then loop to send and receive messages.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
# Simple echo WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
print("Client disconnected")
# Connection manager for broadcast (e.g. chat room)
class ConnectionManager:
def __init__(self):
self.active: List[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.active.append(ws)
def disconnect(self, ws: WebSocket):
self.active.remove(ws)
async def broadcast(self, message: str):
for ws in self.active:
await ws.send_text(message)
manager = ConnectionManager()
@app.websocket("/chat/{room}")
async def chat(websocket: WebSocket, room: str):
await manager.connect(websocket)
try:
while True:
msg = await websocket.receive_text()
await manager.broadcast(f"[{room}] {msg}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"A user left {room}")WebSocket message types: receive_text() / send_text() for strings, receive_bytes() / send_bytes() for binary, receive_json() / send_json() for JSON.
Containerising FastAPI with Docker ensures consistent environments across development, staging, and production. Use a multi-stage build to keep the production image small, and run with Gunicorn + Uvicorn workers for production-grade concurrency.
# Dockerfile
FROM python:3.12-slim AS base
WORKDIR /app
# Install dependencies first (layer caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Create non-root user for security
RUN adduser --disabled-password --gecos "" appuser
USER appuser
EXPOSE 8000
# Production: Gunicorn manages multiple Uvicorn worker processes
CMD ["gunicorn", "main:app",\
"--workers", "4",\
"--worker-class", "uvicorn.workers.UvicornWorker",\
"--bind", "0.0.0.0:8000"]# docker-compose.yml
version: "3.9"
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=postgresql+asyncpg://user:pass@db/mydb
- SECRET_KEY=${SECRET_KEY} # from .env
depends_on:
db:
condition: service_healthy
restart: unless-stopped
db:
image: postgres:16
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: mydb
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d mydb"]
interval: 5s
retries: 5
volumes:
- pgdata:/var/lib/postgresql/data
volumes:
pgdata:| CPUs | Recommended workers (2*CPU+1) |
|---|---|
| 1 | 3 |
| 2 | 5 |
| 4 | 9 |
| 8 | 17 |
Production FastAPI deployments involve several layers beyond just running the app — a reverse proxy, TLS termination, process management, health checks, and observability.
| Concern | Solution |
|---|---|
| Multiple CPU cores | Gunicorn + UvicornWorker, or multiple containers |
| HTTPS / TLS | Nginx or Traefik as reverse proxy with Let's Encrypt |
| Environment secrets | Environment variables / secrets manager (Vault, AWS Secrets Manager) |
| Health checks | /health endpoint + Docker/Kubernetes probes |
| Logging | Structured JSON logs (structlog), forward to Datadog/Loki |
| Metrics | Prometheus + Grafana, or OpenTelemetry |
| Zero-downtime deploys | Rolling updates in Kubernetes, or blue/green |
| Database migrations | Run alembic upgrade head as an init container |
| Rate limiting | Nginx rate limiting or slowapi middleware |
# Health check endpoint
from fastapi import FastAPI
from sqlalchemy import text
app = FastAPI()
@app.get("/health")
async def health(db: DBDep):
try:
await db.execute(text("SELECT 1"))
db_ok = True
except Exception:
db_ok = False
return {
"status": "healthy" if db_ok else "degraded",
"database": "ok" if db_ok else "error",
}
# Nginx config snippet for reverse proxy
# server {
# listen 443 ssl;
# server_name api.example.com;
# location / {
# proxy_pass http://localhost:8000;
# proxy_set_header Host $host;
# proxy_set_header X-Real-IP $remote_addr;
# }
# }# slowapi — rate limiting middleware
# pip install slowapi
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.get("/limited")
@limiter.limit("10/minute")
async def limited_route(request: Request):
return {"message": "ok"}
FastAPI 0.100+ fully supports Pydantic v2, which is a complete rewrite in Rust offering 5–50× speed improvements. Several APIs changed between v1 and v2.
| v1 (old) | v2 (current) | Notes |
|---|---|---|
| .dict() | .model_dump() | Serialise model to dict |
| .json() | .model_dump_json() | Serialise to JSON string |
| .parse_obj() | .model_validate() | Create model from dict |
| .schema() | .model_json_schema() | Get JSON schema |
| @validator | @field_validator | Field-level validation |
| @root_validator | @model_validator | Model-level validation |
| class Config: | model_config = ConfigDict() | Model configuration |
| orm_mode=True | from_attributes=True | Enable ORM object input |
from pydantic import BaseModel, field_validator, model_validator, ConfigDict
from typing import Self
class OrderItem(BaseModel):
product_id: int
quantity: int
unit_price: float
@field_validator("quantity")
@classmethod
def quantity_positive(cls, v: int) -> int:
if v <= 0:
raise ValueError("quantity must be positive")
return v
class Order(BaseModel):
items: list[OrderItem]
discount: float = 0.0
model_config = ConfigDict(from_attributes=True) # replaces orm_mode
@model_validator(mode="after")
def discount_valid(self) -> Self:
if self.discount < 0 or self.discount > 1:
raise ValueError("discount must be between 0 and 1")
return self
@property
def total(self) -> float:
subtotal = sum(i.quantity * i.unit_price for i in self.items)
return round(subtotal * (1 - self.discount), 2)
# Serialisation
order = Order(items=[OrderItem(product_id=1, quantity=2, unit_price=9.99)])
print(order.model_dump()) # dict
print(order.model_dump_json()) # JSON bytes
print(order.model_json_schema()) # JSON Schema
FastAPI has no built-in cache, but several patterns work well: in-process caching with functools.lru_cache / cachetools, Redis-backed caching with fastapi-cache2, and HTTP caching headers for client-side caching.
# Option 1: In-process LRU cache for slow dependencies
from functools import lru_cache
from fastapi import FastAPI, Depends
from typing import Annotated
app = FastAPI()
@lru_cache(maxsize=128)
def get_config_from_db(key: str) -> str:
# Called once per unique key, then cached in memory
return f"value_for_{key}"
@app.get("/config/{key}")
def get_config(key: str):
return {"value": get_config_from_db(key)}# Option 2: Redis cache with fastapi-cache2
# pip install fastapi-cache2[redis]
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
redis = aioredis.from_url("redis://localhost")
FastAPICache.init(RedisBackend(redis), prefix="myapp-cache")
yield
app = FastAPI(lifespan=lifespan)
@app.get("/expensive")
@cache(expire=60) # cache for 60 seconds in Redis
async def expensive_operation():
import asyncio
await asyncio.sleep(2) # simulate slow DB query
return {"result": "computed"}
# Option 3: HTTP Cache-Control headers
from fastapi.responses import Response
@app.get("/public-data")
def public_data(response: Response):
response.headers["Cache-Control"] = "public, max-age=3600"
return {"data": "cacheable by CDN and browsers"}
FastAPI auto-generates OpenAPI 3.x specs from your code. You can enrich the docs with metadata, examples, tags, and custom descriptions without any extra tooling.
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from pydantic import BaseModel, Field
# App-level metadata
app = FastAPI(
title="My Inventory API",
description="## Manage your warehouse inventory\n\nSupports CRUD for items and categories.",
version="2.1.0",
contact={"name": "Dev Team", "email": "dev@example.com"},
license_info={"name": "MIT"},
terms_of_service="https://example.com/terms",
docs_url="/docs", # Swagger UI path
redoc_url="/redoc", # ReDoc path
openapi_url="/openapi.json",
)
class Item(BaseModel):
name: str = Field(
...,
examples=["Widget"],
description="Human-readable item name",
)
price: float = Field(
..., gt=0, examples=[9.99],
description="Price in USD, must be positive",
)
@app.post(
"/items",
summary="Create a new inventory item",
description="Creates an item and returns it with its assigned ID.",
response_description="The created item including its database ID",
tags=["inventory"],
status_code=201,
)
def create_item(item: Item) -> Item:
return item
# Disable docs in production (optional)
# app = FastAPI(docs_url=None, redoc_url=None)
FastAPI's built-in BackgroundTasks is suitable for lightweight, lossy tasks. For tasks that must be reliable, retried, scheduled, or distributed across workers, use Celery with a broker (Redis or RabbitMQ).
# pip install celery redis
# celery_app.py
from celery import Celery
celery_app = Celery(
"worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
celery_app.conf.task_serializer = "json"
@celery_app.task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, email: str, subject: str, body: str):
try:
# call email service
print(f"Sending {subject} to {email}")
except Exception as exc:
raise self.retry(exc=exc) # retry up to 3 times# main.py — FastAPI triggers Celery tasks
from fastapi import FastAPI
from pydantic import BaseModel
from celery_app import send_email
app = FastAPI()
class EmailRequest(BaseModel):
to: str
subject: str
body: str
@app.post("/send-email", status_code=202)
def trigger_email(req: EmailRequest):
# .delay() sends the task to the broker queue
task = send_email.delay(req.to, req.subject, req.body)
return {"task_id": task.id, "status": "queued"}
@app.get("/tasks/{task_id}")
def task_status(task_id: str):
from celery.result import AsyncResult
result = AsyncResult(task_id)
return {"task_id": task_id, "status": result.status}# Run the Celery worker (separate process)
# celery -A celery_app worker --loglevel=info
FastAPI is already one of the fastest Python frameworks, but real-world performance depends on database queries, serialisation, and concurrency patterns. These are the key tools and techniques.
# 1. Measure with locust load testing
# pip install locust
# locustfile.py
from locust import HttpUser, task, between
class APIUser(HttpUser):
wait_time = between(0.1, 1)
@task(3)
def list_items(self):
self.client.get("/items")
@task(1)
def create_item(self):
self.client.post("/items", json={"name": "test", "price": 1.0})
# locust -f locustfile.py --host=http://localhost:8000# 2. Add request timing middleware
import time
from fastapi import FastAPI, Request
app = FastAPI()
@app.middleware("http")
async def timing_middleware(request: Request, call_next):
start = time.perf_counter()
response = await call_next(request)
elapsed = time.perf_counter() - start
response.headers["X-Response-Time"] = f"{elapsed:.4f}s"
if elapsed > 0.5: # log slow requests
print(f"SLOW: {request.url.path} took {elapsed:.3f}s")
return response| Area | Tip |
|---|---|
| DB queries | Use async SQLAlchemy, add indexes, use select_in_loading for relationships |
| Serialisation | Pydantic v2 is ~5x faster than v1; avoid model_dump() in hot paths |
| Concurrency | Use async def for I/O-bound routes; avoid blocking calls in async context |
| Connection pools | Set pool size based on worker count × connections-per-worker |
| Response size | Use response_model_exclude_unset=True to reduce payload size |
| Caching | Cache expensive queries with Redis; use HTTP Cache-Control headers |
Dependencies can be classes with a __call__ method, enabling stateful or configurable dependencies. Sub-dependencies are automatically resolved — FastAPI builds the full dependency tree per request.
from fastapi import FastAPI, Depends, HTTPException
from typing import Annotated
app = FastAPI()
# Class-based dependency — holds configuration
class PaginationParams:
def __init__(
self,
skip: int = 0,
limit: int = 10,
max_limit: int = 100,
):
if limit > max_limit:
raise HTTPException(400, f"limit cannot exceed {max_limit}")
self.skip = skip
self.limit = limit
# Sub-dependency tree:
# get_current_user → verify_token → oauth2_scheme
def verify_token(token: str = Depends(oauth2_scheme)) -> dict:
if token != "valid":
raise HTTPException(401, "Bad token")
return {"username": "alice", "role": "admin"}
def get_current_user(
payload: dict = Depends(verify_token),
) -> dict:
return {"username": payload["username"]}
Pagination = Annotated[PaginationParams, Depends(PaginationParams)]
CurrentUser = Annotated[dict, Depends(get_current_user)]
@app.get("/items")
def list_items(page: Pagination, user: CurrentUser):
return {
"user": user["username"],
"skip": page.skip,
"limit": page.limit,
}
# Dependency with use_cache=False — new instance per call
@app.get("/no-cache")
def no_cache(
a: Annotated[PaginationParams, Depends(PaginationParams, use_cache=False)],
b: Annotated[PaginationParams, Depends(PaginationParams, use_cache=False)],
):
# a and b are separate instances (default: same instance per request)
return {"a_skip": a.skip, "b_skip": b.skip}
Use httpx.AsyncClient with ASGITransport for async tests, and pytest-asyncio to run async test functions. Combine with app.dependency_overrides to mock async dependencies.
# pip install pytest pytest-asyncio httpx
# conftest.py
import pytest
from httpx import AsyncClient, ASGITransport
from app.main import app, get_db
from app.database import AsyncSessionLocal, Base, engine
@pytest.fixture(scope="function")
async def async_client():
# Use a test database
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def override_db():
async with AsyncSessionLocal() as session:
yield session
app.dependency_overrides[get_db] = override_db
async with AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
) as client:
yield client
app.dependency_overrides.clear()
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
# tests/test_users.py
import pytest
@pytest.mark.asyncio
async def test_create_user(async_client):
response = await async_client.post(
"/users",
json={"username": "alice", "email": "alice@example.com"},
)
assert response.status_code == 201
data = response.json()
assert data["username"] == "alice"
assert "password" not in data # response_model excludes it
@pytest.mark.asyncio
async def test_list_users_empty(async_client):
response = await async_client.get("/users")
assert response.status_code == 200
assert response.json() == []
@pytest.mark.asyncio
async def test_validation_error(async_client):
response = await async_client.post("/users", json={"username": "ab"}) # too short
assert response.status_code == 422
errors = response.json()["detail"]
assert any("username" in str(e) for e in errors)
# pytest.ini
[pytest]
asyncio_mode = auto # or use @pytest.mark.asyncio on each test
StreamingResponse lets you send data incrementally — essential for large file downloads, CSV exports, real-time data feeds, or AI token streaming — without loading everything into memory first.
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import csv
import io
app = FastAPI()
# Stream a large CSV without loading it all into memory
@app.get("/export/users")
async def export_users():
async def generate_csv():
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["id", "username", "email"]) # header
yield output.getvalue()
output.seek(0)
output.truncate(0)
for i in range(1_000_000): # stream 1M rows
writer.writerow([i, f"user{i}", f"user{i}@example.com"])
if i % 1000 == 0: # flush every 1000 rows
yield output.getvalue()
output.seek(0)
output.truncate(0)
await asyncio.sleep(0) # yield control to event loop
return StreamingResponse(
generate_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=users.csv"},
)
# Server-Sent Events (SSE) for real-time updates
@app.get("/events")
async def event_stream():
async def generate():
for i in range(10):
yield f"data: Event {i}\n\n" # SSE format
await asyncio.sleep(1)
return StreamingResponse(
generate(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache"},
)
# Stream a file from disk
@app.get("/download/{filename}")
async def download_file(filename: str):
def iterfile():
with open(f"/data/{filename}", "rb") as f:
yield from f # yields chunks as they are read
return StreamingResponse(iterfile(), media_type="application/octet-stream")
Strawberry is a Python-first GraphQL library that uses type hints and dataclasses — a natural fit alongside FastAPI and Pydantic. It mounts a GraphQL endpoint on the FastAPI app.
# pip install strawberry-graphql[fastapi]
import strawberry
from strawberry.fastapi import GraphQLRouter
from fastapi import FastAPI
from typing import Optional
# Define types with Strawberry (type hints = schema)
@strawberry.type
class User:
id: int
username: str
email: str
fake_users = [
User(id=1, username="alice", email="alice@example.com"),
User(id=2, username="bob", email="bob@example.com"),
]
# Query type
@strawberry.type
class Query:
@strawberry.field
def users(self) -> list[User]:
return fake_users
@strawberry.field
def user(self, id: int) -> Optional[User]:
return next((u for u in fake_users if u.id == id), None)
# Mutation type
@strawberry.type
class Mutation:
@strawberry.mutation
def create_user(self, username: str, email: str) -> User:
new_user = User(id=len(fake_users)+1, username=username, email=email)
fake_users.append(new_user)
return new_user
schema = strawberry.Schema(query=Query, mutation=Mutation)
graphql_app = GraphQLRouter(schema)
app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")
# REST routes coexist happily
@app.get("/health")
def health():
return {"status": "ok"}
When a request fails Pydantic validation, FastAPI automatically returns a 422 Unprocessable Entity with a structured JSON body listing every field error. You can override this default behaviour with a custom exception handler.
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
app = FastAPI()
# Default 422 response body shape from FastAPI:
# {
# "detail": [
# {
# "type": "missing",
# "loc": ["body", "price"],
# "msg": "Field required",
# "input": {"name": "Widget"},
# "url": "https://errors.pydantic.dev/..."
# }
# ]
# }
# Custom error handler — reshape the response
@app.exception_handler(RequestValidationError)
async def custom_validation_handler(
request: Request, exc: RequestValidationError
) -> JSONResponse:
errors = []
for error in exc.errors():
field = " -> ".join(str(loc) for loc in error["loc"])
errors.append({"field": field, "message": error["msg"]})
return JSONResponse(
status_code=422,
content={
"success": False,
"errors": errors,
},
)
class Product(BaseModel):
name: str = Field(min_length=1)
price: float = Field(gt=0)
stock: int = Field(ge=0)
@app.post("/products")
def create_product(product: Product):
return {"success": True, "product": product.model_dump()}
# Request: POST /products {}
# Custom response:
# {
# "success": false,
# "errors": [
# {"field": "body -> name", "message": "Field required"},
# {"field": "body -> price", "message": "Field required"},
# {"field": "body -> stock", "message": "Field required"}
# ]
# }
By default FastAPI dependencies are request-scoped — a new instance is created per request. For shared state (connection pools, caches, ML models) use module-level variables initialised in the lifespan context manager.
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from typing import Annotated
import asyncpg
# Module-level: shared across all requests
db_pool: asyncpg.Pool | None = None
ml_model = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global db_pool, ml_model
# Create once at startup — shared by all workers in same process
db_pool = await asyncpg.create_pool("postgresql://localhost/db", min_size=5)
# ml_model = load_model("model.pkl") # load once
yield # requests are served here
await db_pool.close()
app = FastAPI(lifespan=lifespan)
# Request-scoped dependency — new connection per request from the pool
async def get_conn():
async with db_pool.acquire() as conn: # borrows from the shared pool
yield conn # returned to pool after request
Conn = Annotated[asyncpg.Connection, Depends(get_conn)]
@app.get("/users")
async def list_users(conn: Conn):
rows = await conn.fetch("SELECT id, username FROM users")
return [dict(r) for r in rows]
# Singleton dependency — same instance for all requests (use_cache=True, default)
class AppState:
def __init__(self):
self.request_count = 0 # not thread-safe — example only
app_state = AppState()
def get_app_state() -> AppState:
return app_state # same object every request
@app.get("/stats")
def stats(state: Annotated[AppState, Depends(get_app_state)]):
state.request_count += 1
return {"total_requests": state.request_count}
FastAPI provides Header() and Cookie() parameter types. Headers are automatically converted from HTTP's hyphen-case to Python's snake_case. You can also set cookies on responses.
from fastapi import FastAPI, Header, Cookie, Response
from typing import Annotated
app = FastAPI()
# Read request headers
@app.get("/headers")
def read_headers(
user_agent: Annotated[str | None, Header()] = None,
accept_language: Annotated[str | None, Header()] = None,
x_api_key: Annotated[str | None, Header()] = None, # X-Api-Key header
):
# FastAPI converts Accept-Language → accept_language automatically
return {
"user_agent": user_agent,
"accept_language": accept_language,
"x_api_key": x_api_key,
}
# Duplicate headers → list
@app.get("/multi-header")
def multi_header(
x_token: Annotated[list[str] | None, Header()] = None,
):
return {"x_token": x_token} # all values of X-Token header
# Read cookies
@app.get("/profile")
def read_profile(
session_id: Annotated[str | None, Cookie()] = None,
):
return {"session_id": session_id}
# Set a cookie in the response
@app.post("/login")
def login(response: Response, username: str):
response.set_cookie(
key="session_id",
value=f"session_{username}",
httponly=True, # not accessible via JS
secure=True, # HTTPS only
samesite="lax", # CSRF protection
max_age=3600, # 1 hour
)
return {"message": "Logged in"}
# Delete a cookie
@app.post("/logout")
def logout(response: Response):
response.delete_cookie("session_id")
return {"message": "Logged out"}
A well-architected FastAPI project follows consistent patterns across structure, validation, security, and operations. Here is a consolidated reference.
| Area | Best Practice |
|---|---|
| Project structure | Separate concerns: routers/, models/, schemas/, dependencies/, services/ |
| Pydantic models | Separate Input/Output models; never return password fields; use Field() for constraints |
| Dependencies | Use Depends() for DB sessions, auth, pagination — keep routes thin |
| Authentication | JWT with short expiry + refresh tokens; hash passwords with bcrypt/argon2 |
| Database | Async SQLAlchemy + asyncpg; connection pool sized to workers; run Alembic in CI/CD |
| Error handling | Custom exception handlers for consistent error format; never expose stack traces |
| Testing | 100% route coverage with TestClient; override dependencies for isolation |
| Configuration | pydantic-settings + .env; never hardcode secrets; use secret managers in prod |
| Deployment | Gunicorn + UvicornWorker; non-root Docker user; health check endpoint |
| Observability | Structured JSON logs; /metrics for Prometheus; distributed tracing with OpenTelemetry |
| Docs | Meaningful summaries/descriptions; hide /docs in production; version the API |
# Recommended project layout
# .
# ├── app/
# │ ├── main.py # FastAPI() instance, lifespan, include_router
# │ ├── config.py # pydantic-settings Settings class
# │ ├── database.py # engine, AsyncSessionLocal, Base
# │ ├── dependencies.py # get_db, get_current_user, pagination
# │ ├── routers/
# │ │ ├── users.py # APIRouter for /users
# │ │ └── items.py # APIRouter for /items
# │ ├── models/
# │ │ └── user.py # SQLAlchemy ORM models
# │ ├── schemas/
# │ │ └── user.py # Pydantic in/out schemas
# │ └── services/
# │ └── user_service.py # business logic, DB queries
# ├── tests/
# │ ├── conftest.py # fixtures, TestClient
# │ └── test_users.py
# ├── alembic/ # migrations
# ├── Dockerfile
# ├── docker-compose.yml
# └── requirements.txt
