AI / LangChain & LangGraph Interview Questions
LangChain is an open-source framework for building applications powered by large language models (LLMs). It provides composable abstractions — Models, Prompts, Chains, Agents, Memory, and Tools — that make it practical to connect LLMs with external data and systems without writing all the integration plumbing from scratch.
The framework is built around several key abstractions. Models give a unified interface to LLMs such as OpenAI, Anthropic, and Google regardless of their individual APIs. Prompts are templates that format inputs before they reach the model. Chains sequence calls to models, tools, or other Runnables. Agents let the LLM decide which tools to call and in what order. Memory stores conversation context so later turns can reference earlier ones. Tools are callable functions — web search, calculators, database queries — that models can invoke.
LangChain's declarative composition syntax, LCEL, uses the pipe operator | to connect components: prompt | model | parser. The ecosystem extends to LangSmith (tracing and evaluation), LangServe (REST deployment), and LangGraph (stateful multi-actor graph applications). Python and JavaScript/TypeScript are both supported.
LCEL (LangChain Expression Language) is a declarative syntax for composing chains in LangChain using the pipe operator |. It connects Runnable objects so the output of one becomes the input of the next, making multi-step LLM workflows readable and concise.
The core building block is the Runnable interface. Any component that implements invoke(), stream(), and batch() can participate in an LCEL chain. A typical example:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("Translate to French: {text}")
model = ChatOpenAI()
parser = StrOutputParser()
chain = prompt | model | parser
result = chain.invoke({"text": "Hello, world!"})
LCEL provides automatic streaming, parallel execution via RunnableParallel, passthrough of values with RunnablePassthrough, custom function wrapping with RunnableLambda, and fallback chains with .with_fallbacks(). Every chain invocation is traced in LangSmith without extra setup.
LangChain is organised around six core abstractions that cover the full lifecycle of an LLM application:
- Models — A unified interface to LLMs (text-in/text-out) and Chat Models (message-in/message-out), as well as Embedding models for vector representations. Supported providers include OpenAI, Anthropic, Google, Cohere, and dozens of open-source models.
- Prompts — PromptTemplate and ChatPromptTemplate format inputs before they reach a model. They support variable substitution, partial templates, and few-shot examples.
- Chains — Sequences of operations that combine prompts, models, retrievers, and tools. LCEL is the modern way to compose them using the | operator.
- Agents — Systems where an LLM decides which tools to call and in what order by reasoning through a ReAct (Reason + Act) loop until it reaches a final answer.
- Memory — Mechanisms to persist state between calls in a conversation: buffer memory stores the full history, summary memory compresses it, window memory keeps the last N turns.
- Tools & Toolkits — Functions that agents can call: web search, code execution, database queries, REST APIs, and custom business logic. Toolkits bundle related tools together (e.g., SQLDatabaseToolkit, GitHubToolkit).
Additionally, Document Loaders ingest data from PDFs, websites, CSVs, and databases; Text Splitters chunk documents for vector indexing; and Vector Stores (FAISS, Chroma, Pinecone) enable semantic search that feeds into Retrieval-Augmented Generation (RAG) pipelines.
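To make the chunking step concrete, here is a hand-rolled sketch of what a text splitter does: fixed-size chunks with overlap. This is plain Python for illustration, not the LangChain API; the real splitter classes additionally respect separators such as paragraphs and sentences.

```python
# Hand-rolled sketch of fixed-size chunking with overlap -- the core idea
# behind text splitters (real splitters also break on separators).
def split_text(text: str, chunk_size: int = 100, overlap: int = 20) -> list[str]:
    chunks = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
        # advance by chunk_size minus overlap so neighbours share context
        start += chunk_size - overlap
    return chunks

doc = "x" * 250
chunks = split_text(doc, chunk_size=100, overlap=20)
# chunks start at offsets 0, 80, 160 -> 3 chunks
```

The overlap matters for RAG: a sentence cut in half at a chunk boundary would otherwise be unretrievable as a whole.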
Traditional LLM integration means calling an LLM's HTTP API directly: you construct a prompt string by hand, send a requests.post(), parse the JSON response, and manage conversation history as a list you track yourself. Each provider has a different SDK, different error codes, and different retry behaviour. When you need RAG, you wire vector store calls separately; when you need tools, you parse the model's text output to decide what to call next.
| Concern | Traditional API Integration | LangChain |
|---|---|---|
| Provider switching | Rewrite code per provider SDK | Swap model class, keep same chain |
| Prompt management | Manual string concatenation | PromptTemplate with typed variables |
| Conversation history | Manual list tracking | Memory classes handle automatically |
| Tool/function calling | Custom parsing logic per use case | Agents + Tools framework |
| RAG pipeline | Separate vector DB code + manual retrieval | Retriever + LCEL pipe |
| Retry & fallback | Custom retry logic | Built-in .with_retry() / .with_fallbacks() |
| Observability | Custom logging | LangSmith tracing built-in |
The key difference is composability. LangChain treats every component — model, prompt, retriever, parser — as a Runnable with a consistent interface. You can swap, chain, parallelize, or add fallbacks without touching unrelated code.
A Runnable is the core interface in LangChain that every composable component implements. If something is a Runnable, it can be connected with |, batched, streamed, retried, and traced — regardless of whether it's a prompt template, an LLM, a retriever, or a custom Python function.
Every Runnable exposes these standard methods:
- invoke(input) — single synchronous call, returns one output
- batch([input1, input2, ...]) — processes multiple inputs, returns a list of outputs
- stream(input) — yields output chunks as they arrive (useful for token streaming)
- ainvoke() / abatch() / astream() — async equivalents of the above
- astream_events() — fine-grained async event stream (tool calls, LLM tokens, etc.)
LangChain ships several utility Runnables: RunnablePassthrough passes input unchanged (or adds extra fields), RunnableLambda wraps any Python function as a Runnable, RunnableParallel runs multiple branches concurrently and merges their outputs into a dict, and RunnableBranch routes input to different Runnables based on conditions. These combine with LCEL pipes to build arbitrarily complex workflows.
LangChain is distributed as several pip packages. The minimal install for OpenAI-backed applications is:
pip install langchain langchain-openai
# For community integrations (vector stores, loaders, etc.):
pip install langchain-community
# For serving with REST API:
pip install langserve fastapi uvicorn
API credentials are passed through environment variables so they never appear in source code:
export OPENAI_API_KEY="sk-..." # OpenAI
export ANTHROPIC_API_KEY="..." # Anthropic / Claude
export LANGCHAIN_TRACING_V2="true" # Enable LangSmith tracing
export LANGCHAIN_API_KEY="ls__..." # LangSmith API key
export LANGCHAIN_PROJECT="my-project" # LangSmith project name
A minimal "hello world" with LangChain:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
llm = ChatOpenAI(model="gpt-4o-mini")
response = llm.invoke([HumanMessage(content="What is 2 + 2?")])
print(response.content) # "4"
The package split is intentional: langchain-core contains stable base abstractions, langchain contains orchestration logic, langchain-openai and similar provider packages contain model integrations, and langchain-community contains third-party integrations that move faster.
ChatModels in LangChain are LLM wrappers that communicate using a message-based format. Instead of passing a raw string, you pass a list of typed messages: SystemMessage, HumanMessage, and AIMessage. This maps directly to the roles used by OpenAI, Anthropic, and similar APIs.
from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage
llm = ChatOpenAI(
model="gpt-4o",
temperature=0.7,
max_tokens=512,
)
messages = [
SystemMessage(content="You are a helpful Python tutor."),
HumanMessage(content="Explain list comprehensions in Python."),
]
response = llm.invoke(messages)
print(response.content) # the text of the returned AIMessage
print(response.usage_metadata) # token counts
ChatModels also support streaming so you can print tokens as they arrive:
for chunk in llm.stream(messages):
print(chunk.content, end="", flush=True)
Other providers follow the same API: ChatAnthropic, ChatGoogleGenerativeAI, ChatMistralAI. Switching providers requires only changing the import and class name; the rest of the chain remains identical.
PromptTemplates are objects that format dynamic inputs into the correct structure before passing them to a model. Instead of building prompt strings with f-strings scattered across your codebase, templates give you reusable, testable, versionable prompt construction with named variables.
There are two main types:
- PromptTemplate — produces a plain text string. Best for LLMs (non-chat models):
from langchain_core.prompts import PromptTemplate
pt = PromptTemplate.from_template("Summarise this in {n} sentences: {text}")
print(pt.format(n=2, text="LangChain is..."))
- ChatPromptTemplate — produces a list of typed messages. Best for Chat Models:
from langchain_core.prompts import ChatPromptTemplate
chat_pt = ChatPromptTemplate.from_messages([
("system", "You are a {role}."),
("human", "{user_input}"),
])
messages = chat_pt.format_messages(role="poet", user_input="Write about the sea.")
MessagesPlaceholder is used inside a ChatPromptTemplate to insert a variable-length list of messages — useful for injecting conversation history. partial() lets you pre-fill some variables while leaving others to be filled at call time, which is handy for re-usable templates across different contexts.
Output parsers sit at the end of a chain and transform the raw text or message returned by an LLM into a more structured or usable form. Without a parser, chain.invoke() returns an AIMessage object; with a parser, you get a plain string, a Python dict, a validated Pydantic model, or a list — whatever your downstream code expects.
The most common parsers:
- StrOutputParser — extracts .content from an AIMessage and returns a string. Used in virtually every chain: prompt | model | StrOutputParser()
- JsonOutputParser — parses the model's text as JSON and returns a Python dict. Works best when the prompt instructs the model to return valid JSON.
- PydanticOutputParser — validates parsed JSON against a Pydantic schema. The parser injects format instructions into the prompt automatically via parser.get_format_instructions().
- CommaSeparatedListOutputParser — splits a comma-delimited response into a Python list.
- StructuredOutputParser — uses a JSON schema for more flexible structured output.
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel
class Person(BaseModel):
name: str
age: int
parser = JsonOutputParser(pydantic_object=Person)
chain = prompt | model | parser
result = chain.invoke({"query": "John is 30 years old"})
# result: {'name': 'John', 'age': 30}
LangSmith is LangChain's hosted observability and evaluation platform for LLM applications. It automatically captures traces — the full execution tree of every chain, agent step, LLM call, retriever hit, and tool invocation — so you can inspect exactly what happened during a run, including prompts sent, completions received, latency at each step, and token usage.
Enabling LangSmith requires just two environment variables (a third optionally names the project):
export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=ls__your_key_here
export LANGCHAIN_PROJECT=my-project # optional, groups traces
No code changes are required — every LangChain component automatically sends traces once these are set. LangSmith's main capabilities include:
- Tracing — visualise the full execution tree of any run
- Datasets & Evaluations — build golden datasets and run LLM-as-judge or custom evaluators to benchmark prompt changes
- Playground — edit prompts inline and replay traces to test changes
- Monitoring — dashboards for latency, error rates, and cost over time in production
- Annotation queues — route interesting traces to human reviewers for labelling and feedback
LangChain Hub is a public repository at smith.langchain.com/hub for sharing and versioning prompts. Teams use it to store prompts outside of application code, iterate on them without deployments, and pull specific versions into chains at runtime.
To use Hub prompts in code, install langchainhub and call hub.pull():
pip install langchainhub
from langchain import hub
# Pull a community RAG prompt (returns a ChatPromptTemplate)
rag_prompt = hub.pull("rlm/rag-prompt")
# Pin a specific commit to avoid drift
rag_prompt_v2 = hub.pull("rlm/rag-prompt:50442af1")
# Use it in a chain
chain = rag_prompt | llm | StrOutputParser()
You can also push your own prompts to the Hub from code, making them accessible to teammates or the broader community:
hub.push("your-username/my-prompt", my_prompt_template)
LangChain Hub is especially useful for teams that want to separate prompt engineering from application deployment — a prompt designer can update and version a prompt in the Hub, and the next invocation of the application picks up the latest (or pinned) version without a code deploy.
LangServe is a library that turns any LCEL chain into a production-ready REST API in a few lines of code. It wraps FastAPI and exposes standard endpoints — /invoke, /batch, /stream, and /stream_log — so clients can call your chain over HTTP without any custom FastAPI code.
pip install "langserve[all]" fastapi uvicorn
from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
app = FastAPI(title="My LLM API")
chain = (
ChatPromptTemplate.from_template("Answer: {question}")
| ChatOpenAI()
| StrOutputParser()
)
add_routes(app, chain, path="/qa")
# uvicorn server:app --host 0.0.0.0 --port 8000
Once running, the /qa/invoke endpoint accepts POST with {"input": {"question": "..."}}, /qa/stream returns an SSE stream, and /qa/playground serves an interactive browser UI. LangServe also generates an OpenAPI schema at /docs automatically.
Callbacks in LangChain are hooks that fire at specific lifecycle events during chain, model, and agent execution. You implement a BaseCallbackHandler subclass and override only the methods you care about. Each method receives context about what just happened — which model was called, what the prompt was, what the response was, and how long it took.
Key callback methods (all have async equivalents prefixed with a):
- on_llm_start(serialized, prompts) — fired before an LLM call
- on_llm_end(response) — fired after an LLM call completes
- on_chain_start(serialized, inputs) — fired when a chain begins
- on_chain_end(outputs) — fired when a chain finishes
- on_tool_start(serialized, input_str) — fired before a tool executes
- on_tool_end(output) — fired after a tool returns
- on_agent_action(action) — fired each time an agent decides to use a tool
from langchain_core.callbacks import BaseCallbackHandler

class TokenLogger(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        # llm_output can be None for some providers, so guard before .get()
        usage = (response.llm_output or {}).get("token_usage", {})
        print(f"Tokens used: {usage}")

chain.invoke({"input": "hello"}, config={"callbacks": [TokenLogger()]})
Callbacks can be attached per-invocation via config={"callbacks": [...]}, per-component via constructor arguments, or globally with set_global_handler(). LangSmith tracing itself is implemented as a callback handler.
Streaming in LangChain means receiving model output token-by-token rather than waiting for the full response. This dramatically improves perceived responsiveness in user-facing applications. LCEL chains support streaming out of the box through three methods: stream(), astream(), and astream_events().
Synchronous streaming — iterates over chunks as they arrive:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
chain = ChatPromptTemplate.from_template("{topic}") | ChatOpenAI() | StrOutputParser()
for chunk in chain.stream({"topic": "Explain quantum entanglement briefly"}):
print(chunk, end="", flush=True)
Async streaming — for FastAPI / async servers:
async for chunk in chain.astream({"topic": "..."}):
print(chunk, end="", flush=True)
Fine-grained event streaming — astream_events() gives you granular events for every component in the chain (tool calls, retriever results, LLM tokens), letting you build rich streaming UIs that show intermediate steps:
async for event in chain.astream_events({"topic": "..."}, version="v2"):
if event["event"] == "on_chat_model_stream":
print(event["data"]["chunk"].content, end="")
LangChain follows a modular package structure that allows different parts of the ecosystem to evolve at different speeds without breaking stable core interfaces. As of 2024, the main packages are:
- langchain-core — Stable base abstractions: Runnable, BaseMessage, BasePromptTemplate, BaseOutputParser. Changes here are rare and follow strict semver. Most application code depends only on this.
- langchain — Orchestration logic: Chains, Agents, ConversationMemory, AgentExecutor. Versions are released frequently but follow deprecation warnings.
- langchain-community — Third-party integrations (vector stores, document loaders, tool wrappers). Changes fast; pin carefully in production.
- Provider packages (langchain-openai, langchain-anthropic, langchain-google-genai, etc.) — Maintained separately so OpenAI SDK updates don't break Anthropic users.
- langchain-experimental — Unstable, experimental features not ready for production.
When a feature is deprecated (e.g. LLMChain in favour of LCEL), LangChain emits LangChainDeprecationWarning for at least one major version before removal. Pin versions in requirements.txt or use a lockfile (pip-tools, poetry.lock) to avoid unintentional upgrades in production.
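A pinned requirements file might look like this; the version numbers are illustrative, so pin to whatever combination you have actually tested together:

```
# requirements.txt -- illustrative versions; pin what you have tested
langchain-core==0.2.38
langchain==0.2.16
langchain-openai==0.1.23
langchain-community==0.2.16
```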
A Chain in LangChain is any sequence of processing steps that takes an input, passes it through one or more components (prompts, models, retrievers, tools), and produces an output. Chains are the fundamental unit of composition — everything from a single prompt+model call to a multi-step RAG pipeline is a chain.
The modern way to build chains is with LCEL (using the | operator). Legacy chain classes still exist but are deprecated:
| Legacy Class | LCEL Equivalent |
|---|---|
| LLMChain | prompt \| llm \| StrOutputParser() |
| SimpleSequentialChain | chain1 \| chain2 \| chain3 |
| RetrievalQA | (retriever \| format_docs) \| prompt \| llm \| StrOutputParser() |
| ConversationalRetrievalChain | RunnablePassthrough + retriever + prompt \| llm |
Every LCEL chain is itself a Runnable, so chains compose recursively — a chain can be embedded inside another chain as a step. The main practical patterns are: simple prompt chain (question → answer), RAG chain (question → retrieve → augment → answer), and agent loop (question → plan → tool → observe → answer).
In a sequential chain, components run one after another: the output of step N becomes the input of step N+1. This is the default LCEL pipe behaviour — chain = step1 | step2 | step3 means step2 cannot start until step1 finishes.
In a parallel chain, multiple branches run concurrently on the same input, and their results are merged into a single dict. LangChain implements this with RunnableParallel:
from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
llm = ChatOpenAI()
parallel_chain = RunnableParallel(
summary=summary_prompt | llm | StrOutputParser(),
sentiment=sentiment_prompt | llm | StrOutputParser(),
keywords=keywords_prompt | llm | StrOutputParser(),
)
# Runs all three LLM calls concurrently, then returns:
# {"summary": "...", "sentiment": "...", "keywords": "..."}
result = parallel_chain.invoke({"text": "LangChain is amazing..."})
Use sequential chains when each step depends on the previous result. Use parallel chains when steps are independent of each other — this reduces wall-clock time to the slowest branch's latency rather than the sum of all branches.
The pipe operator | in LCEL connects two Runnable objects so that the output of the left side becomes the input of the right side. It is syntactic sugar for RunnableSequence(left, right) and works because LangChain overloads Python's __or__ and __ror__ dunder methods on the Runnable base class.
Basic usage — each step must accept what the previous step returns:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
# prompt returns ChatPromptValue
# model accepts ChatPromptValue, returns AIMessage
# parser accepts AIMessage, returns str
chain = (
ChatPromptTemplate.from_template("Explain {concept} in one sentence.")
| ChatOpenAI(model="gpt-4o-mini")
| StrOutputParser()
)
print(chain.invoke({"concept": "recursion"}))
You can also chain dicts (automatically wrapped in RunnableParallel) or lambda functions (wrapped in RunnableLambda). Input/output type compatibility is checked lazily at runtime — LangChain will raise a clear error if types don't align.
# Dict shorthand for RunnableParallel at the start:
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| rag_prompt
| ChatOpenAI()
| StrOutputParser()
)
RunnablePassthrough and RunnableLambda are utility Runnables that solve two common chain-building problems: passing input data unchanged to a later step, and wrapping arbitrary Python logic as a Runnable step.
RunnablePassthrough simply passes whatever it receives as input directly to its output. It is most useful in RAG chains where you need to forward the original question to the prompt while also fetching documents in parallel:
from langchain_core.runnables import RunnablePassthrough
chain = (
{"context": retriever, "question": RunnablePassthrough()}
| rag_prompt
| llm
| StrOutputParser()
)
# 'question' is passed through unchanged; 'context' is fetched from the retriever
RunnablePassthrough.assign(key=fn) extends this by adding new keys to the dict while keeping existing ones.
RunnableLambda wraps any Python function as a Runnable so it can participate in an LCEL chain:
from langchain_core.runnables import RunnableLambda
def format_docs(docs):
return "\n\n".join(doc.page_content for doc in docs)
chain = retriever | RunnableLambda(format_docs) | prompt | llm | StrOutputParser()
# Shorthand: lambda automatically wraps when piped
chain = retriever | (lambda docs: "\n".join(d.page_content for d in docs)) | prompt
Beyond simple prompt | model | parser pipes, a handful of patterns appear repeatedly in production LangChain applications:
- RAG pattern — retrieve relevant documents, inject them into a prompt, generate an answer. The retriever and passthrough run in parallel so both context and question reach the prompt: {"context": retriever, "question": RunnablePassthrough()} | rag_prompt | llm | parser
- Router / conditional branch — use RunnableBranch or a lambda to route different inputs to different sub-chains. Useful for multi-intent chatbots where a general question goes to one chain and a SQL query goes to another.
- Map-reduce — split a long document into chunks, process each chunk in parallel with .batch(), then reduce the results with a combine chain. Standard pattern for summarising books or analysing large codebases.
- Refine — process chunks sequentially, passing the previous summary into the next iteration to progressively build a better answer. More accurate than map-reduce for certain summarisation tasks.
- Fallback chain — primary chain with a backup: gpt4_chain.with_fallbacks([gpt35_chain]). If the primary raises an exception, the fallback is tried automatically.
- Branching + merge — run parallel branches (e.g. extract entities, summarise, classify sentiment) and merge their outputs into a final dict for downstream use.
A ConversationChain maintains multi-turn dialogue by storing conversation history and injecting it into each new prompt invocation. The legacy approach uses ConversationChain with a memory object; the LCEL approach manages history explicitly in the chain state using MessagesPlaceholder.
LCEL approach (recommended):
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
MessagesPlaceholder(variable_name="history"),
("human", "{input}"),
])
chain = prompt | ChatOpenAI()
# Manually manage history
history = []
def chat(user_input):
response = chain.invoke({"input": user_input, "history": history})
history.append(HumanMessage(content=user_input))
history.append(AIMessage(content=response.content))
return response.content
print(chat("My name is Alice."))
print(chat("What is my name?")) # correctly recalls "Alice"
For server-side multi-user conversations, pair this with LangGraph's checkpointing or RunnableWithMessageHistory which wraps the chain and automatically loads/saves history per session ID from a configurable store.
Routing in LCEL means directing an input to one of several sub-chains based on a condition. The two main tools are RunnableBranch (declarative) and a plain Python function returning a Runnable (imperative).
RunnableBranch — takes a list of (condition, runnable) pairs and a default. The first condition that evaluates to True determines which runnable handles the input:
from langchain_core.runnables import RunnableBranch
router = RunnableBranch(
(lambda x: "sql" in x["topic"].lower(), sql_chain),
(lambda x: "python" in x["topic"].lower(), python_chain),
general_chain, # default
)
result = router.invoke({"topic": "How do I write a SQL JOIN?"})
# Routes to sql_chain
Lambda-based routing — a custom function that returns the appropriate runnable based on the classification output from an earlier chain step:
def route(info):
if info["topic"] == "science":
return science_chain
return general_chain
full_chain = classify_chain | RunnableLambda(route)
A common production pattern is to first run a fast, cheap classification chain that returns a topic label, then route to specialised chains accordingly. This avoids sending every request through a heavyweight model.
Error handling in LangChain chains operates at several levels: Python exception handling around .invoke(), chain-level fallbacks, parser-level retry, and output validation with Pydantic.
Basic try/except — handles transient API errors or rate limits:
from openai import RateLimitError
try:
result = chain.invoke({"question": user_input})
except RateLimitError as e:
result = "Service busy, please retry."
except Exception as e:
logger.error(f"Chain failed: {e}")
result = fallback_response
Chain fallbacks — declaratively try a backup chain if the primary fails:
# If gpt4_chain raises any exception, gpt35_chain is tried automatically
robust_chain = gpt4_chain.with_fallbacks([gpt35_chain])
Output parser errors — OutputFixingParser wraps another parser and uses a second LLM call to fix malformed output if parsing fails:
from langchain.output_parsers import OutputFixingParser
fixing_parser = OutputFixingParser.from_llm(
parser=json_parser, llm=ChatOpenAI()
)
chain = prompt | llm | fixing_parser
For structured output validation, using llm.with_structured_output(MyModel) raises a ValidationError if the model's response doesn't match the schema, making it easy to catch and handle type mismatches.
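The validation failure itself can be sketched with plain Pydantic, since a schema mismatch raises the same ValidationError that structured-output chains surface when the model's JSON is off; Person here is an illustrative schema:

```python
from pydantic import BaseModel, ValidationError

class Person(BaseModel):
    name: str
    age: int

# Well-formed output parses cleanly...
ok = Person.model_validate({"name": "John", "age": 30})

# ...while a schema mismatch raises ValidationError, which your chain
# code can catch and handle (retry, fix, or fall back)
try:
    Person.model_validate({"name": "John", "age": "thirty"})
    failed = False
except ValidationError:
    failed = True
```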
Fallbacks and retries are resilience mechanisms built into LangChain Runnables that make production chains tolerant of transient failures and model quality issues.
Fallbacks — .with_fallbacks() attaches one or more backup Runnables that are tried in order if the primary raises an exception. You can fall back to a cheaper model, a different provider, or a static response:
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
chain = (
ChatOpenAI(model="gpt-4o")
.with_fallbacks([
ChatAnthropic(model="claude-3-sonnet-20240229"),
ChatOpenAI(model="gpt-4o-mini"),
])
)
# If gpt-4o fails, tries Claude; if that fails, tries gpt-4o-mini
Retries — .with_retry() retries the same Runnable on failure with configurable stop conditions and wait strategies:
from openai import RateLimitError, APITimeoutError
from langchain_openai import ChatOpenAI
resilient_llm = ChatOpenAI().with_retry(
    retry_if_exception_type=(RateLimitError, APITimeoutError),
stop_after_attempt=3,
wait_exponential_jitter=True,
)
chain = prompt | resilient_llm | StrOutputParser()
You can combine both: retry first (for transient errors), then fall back (if the model is genuinely unavailable). Retries are best for rate-limit errors; fallbacks are best for model outages or quality failures (e.g. the primary model produces invalid JSON).
The .batch() method on any LCEL chain processes a list of inputs and returns a list of outputs. Under the hood, LangChain runs the inputs concurrently using a thread pool (synchronous) or asyncio tasks (async), subject to an optional concurrency limit.
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
chain = ChatPromptTemplate.from_template("Summarise: {text}") | ChatOpenAI() | StrOutputParser()
texts = [
{"text": "Article 1 text..."},
{"text": "Article 2 text..."},
{"text": "Article 3 text..."},
]
# Runs concurrently, returns list in input order
summaries = chain.batch(texts)
# Limit concurrency to avoid rate limits
summaries = chain.batch(texts, config={"max_concurrency": 5})
The async equivalent is .abatch(), which is preferred in async applications:
summaries = await chain.abatch(texts, config={"max_concurrency": 5})
Batch processing is ideal for offline data pipelines: indexing document collections, running evaluations against a test set, or bulk extracting structured data from unstructured text. Results are always returned in the same order as the input list, even if individual tasks complete out of order.
A LangChain Agent is a system where an LLM acts as the reasoning engine that decides, at each step, which action to take. Unlike a fixed chain where the sequence of operations is defined by the developer, an agent dynamically determines the order and selection of tool calls based on the user's input and intermediate results.
The core loop of an agent is:
- Receive user input
- LLM reasons about what to do (Thought)
- LLM selects a tool and provides its input (Action)
- Tool executes and returns a result (Observation)
- LLM receives the observation and decides whether to take another action or produce a final answer
- Repeat steps 2–5 until a final answer is reached
This pattern is called ReAct (Reasoning + Acting). Agents are most valuable when the number or order of steps needed to solve a task cannot be predetermined — for example, researching a question that may require 1 or 5 web searches depending on what the first search returns. The two main modern approaches are OpenAI Tools Agent (structured tool calling via OpenAI function calling API) and ReAct Agent (reasoning via text in the prompt for models without native function calling).
LangChain provides several agent types, each suited to different LLM capabilities and task requirements:
| Agent Type | How it works | Best for |
|---|---|---|
| OpenAI Tools Agent | Uses OpenAI's native tool/function calling API to select and call tools | OpenAI models (gpt-4o, gpt-4-turbo); most reliable structured tool use |
| OpenAI Functions Agent | Older version using the functions API (now superseded by Tools Agent) | Legacy gpt-3.5/gpt-4 function calling |
| ReAct Agent | Uses Thought/Action/Observation text format in the prompt; parses action from model output | Models without native function calling; transparent reasoning |
| Structured Chat Agent | Like ReAct but handles tools with multi-field structured inputs | Tools that require more than a single string input |
| XML Agent | Uses XML-formatted actions; designed for Anthropic Claude models | Claude models where XML is reliable output format |
| JSON Chat Agent | Uses JSON-formatted actions in the prompt | Models that reliably produce JSON without native tool calling |
In practice, the factory functions create_openai_tools_agent() and create_react_agent() are the most common entry points. For anything requiring fine-grained control over the agent loop — including human-in-the-loop, persistent state, or multi-agent coordination — consider using LangGraph instead.
The easiest way to create a custom agent is with the factory functions create_react_agent() or create_openai_tools_agent(), which combine a custom prompt, an LLM, and a list of tools. Most customisation needs are met by adjusting the prompt and tool list.
from langchain import hub
from langchain.agents import create_react_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
# Pull base ReAct prompt from Hub or define your own
prompt = hub.pull("hwchase17/react")
tools = [TavilySearchResults(max_results=3)]
llm = ChatOpenAI(model="gpt-4o")
agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
executor.invoke({"input": "What is the population of France?"})
For full control, subclass BaseSingleActionAgent (returns one action per step) or BaseMultiActionAgent (returns multiple actions per step). You must implement plan() and aplan() which receive the current intermediate steps and return either an AgentAction (tool to call) or AgentFinish (final answer).
For production multi-step agents with complex state and human-in-the-loop needs, LangGraph's graph-based approach is more appropriate than subclassing agent base classes.
AgentExecutor is the runtime loop that drives an agent to completion. It takes an agent (which decides actions) and a list of tools (which execute those actions), and repeatedly calls the agent, executes the selected tool, feeds the observation back, and repeats until the agent returns an AgentFinish or a stopping condition is reached.
from langchain.agents import AgentExecutor
executor = AgentExecutor(
agent=agent,
tools=tools,
verbose=True, # print each step
max_iterations=10, # prevent infinite loops
return_intermediate_steps=True, # include tool call history in output
handle_parsing_errors=True, # auto-retry if output parse fails
)
result = executor.invoke({"input": "Find the CEO of Anthropic"})
print(result["output"]) # final answer
print(result["intermediate_steps"]) # list of (AgentAction, observation)
Key configuration options: max_iterations prevents runaway loops, max_execution_time adds a wall-clock timeout, early_stopping_method controls whether the agent generates a final answer when max_iterations is hit or just stops, and handle_parsing_errors retries if the LLM produces malformed output instead of crashing the loop.
A Tool in LangChain is a callable that an agent can invoke when it needs to interact with the outside world. Every tool has three required attributes: a name (how the LLM refers to it), a description (what it does and when to use it — the LLM reads this to decide), and an input schema (the parameters it expects).
When the agent decides to call a tool, AgentExecutor:
- Finds the tool by name in its tools list
- Parses the agent's action into the tool's input format
- Calls tool.run(input) or tool.arun(input)
- Returns the result as an "Observation" back to the agent
LangChain ships dozens of pre-built tools in langchain-community: web search (Tavily, SerpAPI), code execution (PythonREPL), database query (SQLDatabase), Wikipedia, file I/O, and more. You access them as:
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper
search = TavilySearchResults(max_results=3)
wiki = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())
tools = [search, wiki]
A critical practical point: the tool description matters more than the implementation. The LLM decides whether to call a tool based entirely on reading its description. A vague description leads to incorrect tool selection; a precise description improves agent accuracy.
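To make the point concrete, here is a toy, framework-independent illustration: a crude keyword-overlap "router" standing in for the LLM's tool choice. The tool names and descriptions are invented for the example:

```python
def pick_tool(query, tools):
    """Crude stand-in for the LLM: score tools by word overlap with the description."""
    words = set(query.lower().split())
    return max(tools, key=lambda t: len(words & set(t["description"].lower().split())))["name"]

vague = {"name": "tool_a", "description": "Does stuff with data"}
precise = {
    "name": "currency_converter",
    "description": "Converts an amount between two currencies such as usd to eur",
}

# The precise description matches the query; the vague one gives the router nothing
print(pick_tool("convert 100 usd to eur", [vague, precise]))
```

A real LLM is far more capable than a word-overlap score, but the failure mode is the same: if the description does not mention what the tool is for, the model has no signal to select it.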
There are three ways to create custom tools in LangChain, in order of increasing complexity: the @tool decorator, StructuredTool.from_function(), and subclassing BaseTool.
@tool decorator — simplest approach for single-string input tools:
from langchain_core.tools import tool
@tool
def get_word_count(text: str) -> int:
"""Counts the number of words in the provided text. Use when asked about word count."""
return len(text.split())
# Tool name: 'get_word_count', description from docstring
print(get_word_count.invoke("Hello world")) # 2
StructuredTool.from_function() — for tools with multiple inputs:
from langchain_core.tools import StructuredTool
from pydantic import BaseModel
class MultiplyInput(BaseModel):
a: float
b: float
def multiply(a: float, b: float) -> float:
"""Multiplies two numbers together."""
return a * b
multiply_tool = StructuredTool.from_function(
func=multiply,
name="multiply",
description="Multiplies two numbers together.",
args_schema=MultiplyInput,
)
BaseTool subclass — for full control, async support, and complex logic:
from langchain_core.tools import BaseTool
class DatabaseQueryTool(BaseTool):
    name: str = "database_query"
    description: str = "Query the internal product database. Input should be a SQL WHERE clause."
def _run(self, query: str) -> str:
return db.execute(f"SELECT * FROM products WHERE {query}")
async def _arun(self, query: str) -> str:
return await db.async_execute(query)
A multi-action agent returns a list of AgentAction objects per reasoning step rather than a single action. This enables the agent to call multiple tools simultaneously within a single turn, which is useful when several tool calls are independent and don't need to be serialised.
Multi-action agents implement BaseMultiActionAgent, and their plan() method returns List[AgentAction] instead of a single AgentAction. AgentExecutor detects this and executes each returned action in turn, feeding all of the resulting observations back to the agent together before the next reasoning step.
OpenAI's parallel tool calling feature maps directly to this pattern. When you call ChatOpenAI with tools bound via .bind_tools(), the model can return multiple tool calls in a single response, and AgentExecutor (or LangGraph) runs them concurrently:
from langchain_openai import ChatOpenAI
llm_with_tools = ChatOpenAI(model="gpt-4o").bind_tools([search_tool, calculator_tool])
# Model may respond with both a search call AND a calculator call in one step
response = llm_with_tools.invoke("What is the population of France times 2?")
print(response.tool_calls) # [{name: 'search', ...}, {name: 'calculator', ...}]
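The independent calls in response.tool_calls can then be executed concurrently. A minimal sketch using a thread pool — the tool implementations and the tool_calls shape below are illustrative stand-ins, not real LangChain objects:

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative stand-ins for real tools
tools = {
    "search": lambda args: f"results for {args['query']}",
    "calculator": lambda args: str(eval(args["expression"])),  # eval: demo only
}

# Mirrors the shape of response.tool_calls: a name and an args dict per call
tool_calls = [
    {"name": "search", "args": {"query": "population of France"}},
    {"name": "calculator", "args": {"expression": "67000000 * 2"}},
]

def execute(call):
    return call["name"], tools[call["name"]](call["args"])

# Run both independent calls concurrently, then collect the observations
with ThreadPoolExecutor() as pool:
    observations = dict(pool.map(execute, tool_calls))

print(observations["calculator"])  # "134000000"
```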
For complex coordination of parallel tool execution with state management, LangGraph is better suited than AgentExecutor, as it provides explicit graph edges for parallel branches.
LangChain agents use the ReAct (Reasoning + Acting) framework to plan and reason. The model is prompted to produce interleaved Thought, Action, and Observation sequences. The Thought is the model's explicit reasoning about what to do next; the Action is the tool call decision; the Observation is the tool's returned result. This cycle repeats until the model produces a "Final Answer".
A ReAct trace looks like this:
Question: Who is the CEO of Anthropic and when was the company founded?
Thought: I need to search for information about Anthropic.
Action: search
Action Input: "Anthropic CEO founder"
Observation: Anthropic was founded in 2021. Dario Amodei is the CEO.
Thought: I now have both pieces of information needed to answer.
Final Answer: Anthropic's CEO is Dario Amodei. The company was founded in 2021.
For models with native function calling (OpenAI, Anthropic), the reasoning is more structured: the model returns a JSON tool call object rather than parsing free text, which is more reliable. The OpenAI Tools Agent uses this approach. Newer techniques like chain-of-thought prompting and tree-of-thought can be integrated to improve multi-step reasoning quality by providing examples of good reasoning chains in the system prompt.
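The fragility of text-based parsing is easy to see in a sketch. Something like the following (a simplified stand-in for LangChain's actual ReAct output parser) must recover the action from free text, and any formatting drift by the model breaks it:

```python
import re

def parse_react(text):
    """Extract the next action, or the final answer, from a ReAct-style completion."""
    action = re.search(r'Action:\s*(\S+)\s*Action Input:\s*"?([^"\n]+)"?', text)
    if action:
        return {"tool": action.group(1), "input": action.group(2)}
    final = re.search(r"Final Answer:\s*(.+)", text, re.DOTALL)
    return {"final": final.group(1).strip()} if final else None

step = 'Thought: I need to search for this.\nAction: search\nAction Input: "Anthropic CEO"'
print(parse_react(step))  # {'tool': 'search', 'input': 'Anthropic CEO'}

# Any drift in the model's formatting yields None and stalls the loop
print(parse_react("Let me think about which tool to use..."))  # None
```

Native tool calling sidesteps this entirely: the tool name and arguments arrive as structured JSON, which is why handle_parsing_errors exists mainly for the text-parsing agents.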
By default, AgentExecutor has no memory — each invocation is stateless. To give an agent conversation memory, pass a memory object to AgentExecutor. This is distinct from return_intermediate_steps (which stores tool call history within a single run); memory stores the dialogue across multiple separate invocations.
from langchain.memory import ConversationBufferMemory
from langchain.agents import AgentExecutor
memory = ConversationBufferMemory(
memory_key="chat_history", # must match prompt variable
return_messages=True,
)
executor = AgentExecutor(
agent=agent,
tools=tools,
memory=memory,
verbose=True,
)
executor.invoke({"input": "My name is Alice."})
executor.invoke({"input": "What did I just tell you?"}) # recalls "Alice"
The prompt used by the agent must include a {chat_history} variable (or whatever memory_key is set to) so the history is injected on each call. For multi-user scenarios, each user needs their own memory object — or use LangGraph's checkpointing with thread IDs to manage per-conversation state.
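Per-user isolation can be sketched without the framework: one history per session id, mirroring one memory object (or one LangGraph thread_id) per conversation. The `respond` stub stands in for the model:

```python
histories = {}  # session_id -> list of (role, content) turns

def chat(session_id, user_message, respond):
    """Route a message through the memory belonging to this session only."""
    history = histories.setdefault(session_id, [])
    history.append(("user", user_message))
    reply = respond(history)  # the model sees only this session's turns
    history.append(("assistant", reply))
    return reply

# Toy stand-in for the model: reports how many turns it can see
respond = lambda history: f"I can see {len(history)} message(s)."

chat("alice", "My name is Alice.", respond)
print(chat("alice", "What did I say?", respond))  # sees 3 messages
print(chat("bob", "Hi", respond))                 # fresh history: sees 1
```

Sharing one memory object across users collapses all sessions into a single history, which is the most common memory bug in multi-user agent deployments.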
Debugging LangChain agents requires visibility into the agent's reasoning steps, tool inputs, and tool outputs — not just the final answer. Several tools address this at different levels of depth.
verbose=True — prints every Thought, Action, and Observation to stdout during execution. Quick and zero-setup, ideal during development:
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
return_intermediate_steps=True — returns the full [(AgentAction, observation), ...] list in the output dict so you can inspect programmatically in tests:
result = executor.invoke({"input": "..."})  # executor built with return_intermediate_steps=True
for action, obs in result["intermediate_steps"]:
print(action.tool, action.tool_input, "=>", obs)
LangSmith tracing — set LANGCHAIN_TRACING_V2=true and every agent run is captured as a full tree trace in LangSmith. You can see token counts, latency per step, exact prompts sent to the model, and tool call details. This is the most powerful debugging tool for production issues.
StdOutCallbackHandler — equivalent to verbose but via the callback system, useful when you need to attach it conditionally:
from langchain_core.callbacks import StdOutCallbackHandler
result = executor.invoke({"input": "..."}, config={"callbacks": [StdOutCallbackHandler()]})
LangGraph is a library for building stateful, multi-actor applications with LLMs using a directed graph model. Where LangChain chains are linear (or at most tree-shaped), LangGraph graphs can have cycles — a node can route back to an earlier node, making it possible to express iterative agent loops, retry-on-failure patterns, and human-in-the-loop pauses as explicit graph edges rather than implicit recursion.
The core concepts are:
- State — a typed Python dict (TypedDict) that persists across all nodes in the graph
- Nodes — Python functions that receive state and return a partial state update
- Edges — connections between nodes; can be unconditional or conditional (routing)
- StateGraph — the graph builder class; compile it to get an executable app
- Checkpointing — built-in persistence of state after every node, enabling resume, time-travel debugging, and human-in-the-loop
LangGraph is the recommended approach for anything beyond a simple linear chain: autonomous agents with retry loops, multi-agent coordination, chatbots with persistent memory, and workflows that need a human to approve or correct an intermediate step before proceeding.
LangChain Agents (via AgentExecutor) and LangGraph both implement agent behaviour, but they differ significantly in how much control you have over the execution flow:
| Dimension | LangChain AgentExecutor | LangGraph |
|---|---|---|
| Execution flow | Black-box loop; you can't see or modify the flow between steps | Explicit graph; every edge and node is defined by you |
| Cycles / loops | Implicit loop managed by AgentExecutor | Explicit cycles via conditional edges |
| Human-in-the-loop | Hard to add; requires custom callback hacks | First-class feature via interrupt_before/after |
| State management | Limited to memory object passed to executor | Full typed state dict with custom reducers |
| Persistence | Not built-in; requires custom implementation | Built-in checkpointers (MemorySaver, SqliteSaver, PostgresSaver) |
| Multi-agent | No native support | First-class: agents as nodes with handoffs |
| Complexity | Simple, quick to prototype | More setup, but much more control |
In practice: use AgentExecutor for quick prototypes and simple single-agent tasks. Switch to LangGraph when you need reliable production agents with human oversight, complex multi-step state, persistent memory, or multi-agent coordination.
StateGraph is the main graph class in LangGraph. You instantiate it with a state type (a TypedDict class), add nodes and edges to it, then compile it into an executable app. The state type defines all the fields that are shared across nodes and how those fields are updated when a node returns a partial update.
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator
# Define the shared state structure
class AgentState(TypedDict):
messages: Annotated[list, operator.add] # append-reducer
steps_taken: int
# Build the graph
graph_builder = StateGraph(AgentState)
def call_llm(state: AgentState) -> dict:
response = llm.invoke(state["messages"])
return {"messages": [response], "steps_taken": state["steps_taken"] + 1}
graph_builder.add_node("llm", call_llm)
graph_builder.add_edge(START, "llm")
graph_builder.add_edge("llm", END)
# Compile to executable
graph = graph_builder.compile()
State updates use reducers. The default reducer is last-write-wins (the node's returned value replaces the current value). Using Annotated[list, operator.add] means returned lists are appended to the existing list — the standard pattern for message history in chat agents.
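The merge behaviour can be illustrated in plain Python — one reducer per key, applied when a node's partial update comes back. This is a sketch of the semantics, not LangGraph's internals:

```python
import operator

# One reducer per state key, mirroring the Annotated[...] declarations above
reducers = {
    "messages": operator.add,             # append-reducer
    "steps_taken": lambda cur, new: new,  # default: last-write-wins
}

def apply_update(state, update):
    """Merge a node's partial update into the state using each key's reducer."""
    merged = dict(state)
    for key, value in update.items():
        merged[key] = reducers[key](state[key], value)
    return merged

state = {"messages": ["hi"], "steps_taken": 0}
state = apply_update(state, {"messages": ["hello!"], "steps_taken": 1})
print(state["messages"])     # ['hi', 'hello!'] — appended
print(state["steps_taken"])  # 1 — replaced
```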
In LangGraph, nodes are Python functions that contain the logic of your application, and edges are the connections that define execution flow between nodes.
Nodes receive the current state dict and return a partial state update (a dict containing only the keys they want to change). LangGraph merges this update into the full state using the defined reducers:
def tool_node(state: AgentState) -> dict:
# Execute the tool called by the last message
last_message = state["messages"][-1]
tool_result = tools_by_name[last_message.tool_calls[0]["name"]].invoke(
last_message.tool_calls[0]["args"]
)
return {"messages": [ToolMessage(content=str(tool_result), ...)]}
Edges come in two flavours:
- Normal edges — always go from node A to node B: graph.add_edge("node_a", "node_b")
- Conditional edges — a router function decides the next node: graph.add_conditional_edges("node_a", router_fn, {"tool": "tool_node", "end": END})
Two special node names mark the graph boundaries: START is the entry point (no logic, just the first edge target), and END is the terminal node that signals the graph has finished. A node with multiple outgoing normal edges fans out to all of them (the branches run in parallel), whereas a conditional edge selects exactly one route per step.
Conditional edges implement branching logic in LangGraph. A router function takes the current state and returns a string key. That key is looked up in a mapping dict to determine which node to execute next.
from langgraph.graph import StateGraph, START, END
def should_continue(state: AgentState) -> str:
"""Decide whether to call a tool or end."""
last_message = state["messages"][-1]
if last_message.tool_calls: # LLM wants to call a tool
return "call_tool"
return "end" # LLM produced a final answer
graph.add_conditional_edges(
"agent", # source node
should_continue, # router function
{
"call_tool": "tool_executor", # route to tool executor
"end": END, # or finish
}
)
The router function can return any string; the mapping dict translates those strings to actual node names or END. If the router function returns node names (or END) directly, the mapping dict can be omitted. Conditional edges are the mechanism that creates cycles in a LangGraph — the agent node routes to the tool executor, which routes back to the agent, until the agent routes to END.
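The resulting cycle can be sketched as a tiny interpreter — an agent node, a tool node, a router, and a normal edge back. Plain Python, with a pretend LLM, purely to show the control flow:

```python
END = "__end__"

def agent(state):
    # Pretend-LLM: keep requesting the tool until two observations exist
    state["wants_tool"] = len(state["observations"]) < 2
    return state

def tool(state):
    state["observations"].append(f"result {len(state['observations']) + 1}")
    return state

def router(state):
    return "tool" if state["wants_tool"] else END

nodes = {"agent": agent, "tool": tool}
normal_edges = {"tool": "agent"}  # the tool node always returns to the agent

def run(state, entry="agent"):
    node = entry
    while node != END:
        state = nodes[node](state)
        # conditional edge after the agent, normal edge after the tool
        node = router(state) if node == "agent" else normal_edges[node]
    return state

final = run({"observations": [], "wants_tool": False})
print(final["observations"])  # ['result 1', 'result 2']
```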
State in LangGraph is a TypedDict that is shared across all nodes in a graph run. Every time a node executes, it can return a partial update — a dict containing only the keys it wants to change. LangGraph merges the update into the current state using reducers.
The default reducer is last-write-wins: the node's returned value replaces the current value for that key. You can override this with Annotated[type, reducer_fn] where reducer_fn takes (current, update) and returns the new value:
from typing import TypedDict, Annotated
import operator
class GraphState(TypedDict):
# Append-only: new messages are added to the list
messages: Annotated[list, operator.add]
# Last-write-wins: iteration count is replaced each time
iteration_count: int
# Custom reducer: keep the highest score seen so far
best_score: Annotated[float, lambda a, b: max(a, b)]
State is immutable between node calls — nodes receive a snapshot and return updates; they do not mutate state in place. This design enables checkpointing (save the full state after each node), time-travel debugging (replay from any past state), and parallel node execution (each branch gets a copy of the state).
MessageGraph is a specialised version of StateGraph where the entire state is a single list of messages (using the add_messages reducer). Nodes receive the message list and return new messages to append. StateGraph is the general-purpose graph where you define any TypedDict as the state, with full control over all fields and their reducers.
| Feature | MessageGraph | StateGraph |
|---|---|---|
| State structure | Always a list of BaseMessage objects | Any TypedDict with any fields |
| Node input | List of messages | Full state dict |
| Node output | One or more messages to append | Partial dict of any fields to update |
| Custom fields | Not supported | Any fields: scores, iteration counts, flags, etc. |
| Status | Simpler but less flexible | Recommended for all but trivial chatbots |
from langgraph.graph import MessageGraph
# MessageGraph - state is just the messages list
graph = MessageGraph()
graph.add_node("model", lambda msgs: llm.invoke(msgs))
graph.set_entry_point("model")
graph.set_finish_point("model")
MessageGraph was the original LangGraph API and is still useful for pure chatbot flows with no additional state. For anything more complex, StateGraph with Annotated[list, add_messages] for the messages field is preferred because it lets you add other state fields alongside the conversation history.
LangGraph's checkpointing system saves the full graph state after every node execution to a persistent store. This enables resuming interrupted runs, time-travel debugging (replay from any past state), and human-in-the-loop workflows (pause, inspect, modify state, then continue).
To enable checkpointing, pass a checkpointer to graph.compile() and provide a thread_id in the config on each invocation. The thread_id is the key that groups checkpoints belonging to the same conversation or workflow run:
from langgraph.checkpoint.memory import MemorySaver
from langchain_core.messages import HumanMessage
memory = MemorySaver() # in-memory, for development
graph = graph_builder.compile(checkpointer=memory)
config = {"configurable": {"thread_id": "user-123-session-1"}}
# First invocation
graph.invoke({"messages": [HumanMessage("Hello")]}, config)
# Second invocation — LangGraph automatically loads the previous state
graph.invoke({"messages": [HumanMessage("What did I say?")]}, config)
Checkpointer options: MemorySaver (in-process, ephemeral), SqliteSaver (persistent SQLite file, single-process), AsyncSqliteSaver (async SQLite), PostgresSaver / AsyncPostgresSaver (production multi-process). All implement the BaseCheckpointSaver interface, so switching backends requires only changing the checkpointer passed to compile().
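The mechanics can be illustrated in plain Python: snapshot the state after every node, keyed by thread_id — which is what enables resume and time-travel. This is a sketch of the idea, not the BaseCheckpointSaver interface:

```python
import copy

checkpoints = {}  # thread_id -> list of state snapshots, one per node execution

def run_node(thread_id, state, node):
    """Run one node, then checkpoint the resulting state for this thread."""
    new_state = node(copy.deepcopy(state))  # nodes get a snapshot, never the original
    checkpoints.setdefault(thread_id, []).append(copy.deepcopy(new_state))
    return new_state

state = {"messages": []}
state = run_node("t1", state, lambda s: {**s, "messages": s["messages"] + ["hi"]})
state = run_node("t1", state, lambda s: {**s, "messages": s["messages"] + ["there"]})

# Resume/time-travel: any past snapshot for the thread can be loaded back
print(checkpoints["t1"][0]["messages"])   # ['hi']
print(checkpoints["t1"][-1]["messages"])  # ['hi', 'there']
```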
Human-in-the-loop (HITL) in LangGraph means pausing graph execution at a specified point so a human can inspect the current state, approve an action, or modify a value before the graph continues. This is a first-class LangGraph feature built on top of checkpointing.
Step 1: Compile the graph with interrupt_before or interrupt_after
from langgraph.checkpoint.memory import MemorySaver
graph = graph_builder.compile(
checkpointer=MemorySaver(),
interrupt_before=["tool_executor"], # pause before this node runs
)
Step 2: Run until the interrupt
config = {"configurable": {"thread_id": "session-1"}}
for event in graph.stream({"messages": [HumanMessage("Search for X")]}, config):
print(event) # stops before tool_executor
Step 3: Inspect and optionally update state
current_state = graph.get_state(config)
print(current_state.values) # see what the agent is about to do
# Optionally modify the state before continuing:
graph.update_state(config, {"messages": [HumanMessage("Actually search for Y")]}, as_node="agent")
Step 4: Resume execution
# Pass None as input to resume from the checkpoint
for event in graph.stream(None, config):
print(event)
Multi-agent systems in LangGraph are built by representing each agent as a node (or subgraph) and connecting them with edges that define how work is handed off. The most common architecture is the supervisor pattern: one supervisor agent receives the user request, decides which specialist agent should handle it, routes to that agent, and continues routing until the task is complete.
from typing import TypedDict, Annotated, Literal
import operator
from langgraph.graph import StateGraph, START, END
class MultiAgentState(TypedDict):
messages: Annotated[list, operator.add]
next_agent: str
def supervisor(state):
# Supervisor LLM decides which agent goes next
response = supervisor_llm.invoke(state["messages"])
return {"next_agent": response.next} # 'researcher', 'coder', or 'FINISH'
def route_from_supervisor(state) -> Literal["researcher", "coder", "__end__"]:
return state["next_agent"] if state["next_agent"] != "FINISH" else END
graph = StateGraph(MultiAgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_agent)
graph.add_node("coder", coder_agent)
graph.add_conditional_edges("supervisor", route_from_supervisor)
graph.add_edge("researcher", "supervisor") # always report back
graph.add_edge("coder", "supervisor")
graph.add_edge(START, "supervisor")
An alternative is the network pattern where agents can hand off directly to each other without a central supervisor. Both patterns use shared state in the TypedDict to pass context between agents.
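The supervisor and network patterns differ only in who decides the next hop. A framework-independent sketch of the network pattern, where each agent names its successor directly (agent names and state fields are invented for the example):

```python
DONE = "DONE"

def researcher(state):
    state["notes"].append("facts gathered")
    return "writer"  # hand off directly to the writer, no supervisor involved

def writer(state):
    state["notes"].append("draft written")
    return DONE

agents = {"researcher": researcher, "writer": writer}

def run(state, start="researcher"):
    current = start
    while current != DONE:
        current = agents[current](state)  # shared state carries context between agents
    return state

print(run({"notes": []})["notes"])  # ['facts gathered', 'draft written']
```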
A subgraph in LangGraph is a compiled graph that is used as a node inside a parent graph. Subgraphs allow you to encapsulate complex, reusable agent logic and compose multiple graphs hierarchically — exactly like functions in programming, where a subgraph is the 'function' and the parent graph is the 'caller'.
from typing import TypedDict, Annotated
import operator
from langgraph.graph import StateGraph, START, END
# --- Define the subgraph ---
class SubgraphState(TypedDict):
messages: Annotated[list, operator.add]
search_results: list
sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("search", search_node)
sub_builder.add_node("summarise", summarise_node)
sub_builder.add_edge(START, "search")
sub_builder.add_edge("search", "summarise")
sub_builder.add_edge("summarise", END)
research_subgraph = sub_builder.compile()
# --- Use it as a node in the parent graph ---
class ParentState(TypedDict):
messages: Annotated[list, operator.add]
parent_builder = StateGraph(ParentState)
parent_builder.add_node("research", research_subgraph) # subgraph as node
parent_builder.add_node("answer", answer_node)
parent_builder.add_edge(START, "research")
parent_builder.add_edge("research", "answer")
parent_builder.add_edge("answer", END)
graph = parent_builder.compile()
State key overlap between parent and subgraph determines how data flows between them. Keys present in both states are automatically mapped. Subgraphs can have their own checkpointers for independent persistence, or inherit the parent's checkpointer.
LangGraph's .stream() and .astream() methods yield events as each node finishes executing, rather than waiting for the full graph to complete. The stream_mode parameter controls what is yielded.
The three main stream modes:
- stream_mode='updates' (default) — yields the state update returned by each node as {node_name: {updated_keys}}
- stream_mode='values' — yields the full state after each node runs
- stream_mode='debug' — yields detailed debug events for each step
config = {"configurable": {"thread_id": "1"}}
# Stream node updates
for event in graph.stream({"messages": [HumanMessage("Hello")]}, config, stream_mode="updates"):
node_name, state_update = list(event.items())[0]
print(f"Node '{node_name}' updated: {list(state_update.keys())}")
# Stream token-by-token from LLM inside a node
async for event in graph.astream_events({"messages": [...]}, config, version="v2"):
if event["event"] == "on_chat_model_stream":
print(event["data"]["chunk"].content, end="")
For token-level streaming from LLMs called inside nodes, use astream_events() which propagates the standard LangChain callback events (on_chat_model_stream, on_tool_start, on_tool_end) through the entire graph execution tree.
Persistence in LangGraph means saving graph state so it survives process restarts, can be resumed after interrupts, and can be inspected or replayed at any past checkpoint. All persistence goes through the checkpointer interface, so the storage backend is swappable without changing application code.
| Checkpointer | Storage | Use Case |
|---|---|---|
| MemorySaver | Python dict, in-process | Development, unit tests |
| SqliteSaver | SQLite file | Single-process apps, CLI tools |
| AsyncSqliteSaver | SQLite file (async) | Async single-process servers |
| PostgresSaver | PostgreSQL | Multi-process production (sync) |
| AsyncPostgresSaver | PostgreSQL (async) | Multi-process production (async FastAPI) |
from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
saver = SqliteSaver(conn)
graph = graph_builder.compile(checkpointer=saver)
# Retrieve past state for a thread
state = graph.get_state({"configurable": {"thread_id": "user-1"}})
# List all past checkpoints
for checkpoint in graph.get_state_history({"configurable": {"thread_id": "user-1"}}):
print(checkpoint.config, checkpoint.created_at)
Error handling in LangGraph is explicit — errors in nodes are not automatically caught or retried. If a node raises an unhandled exception, the graph execution stops and the exception propagates to the caller. This is intentional: LangGraph wants you to be explicit about failure modes rather than silently swallowing errors.
Approach 1: try/except inside node functions — the most common pattern. Catch the error, add a diagnostic message to state, and route to an error-recovery node:
def call_tool(state: AgentState) -> dict:
try:
result = tool.invoke(state["tool_input"])
return {"messages": [ToolMessage(content=result, ...)]}
except Exception as e:
return {"messages": [ToolMessage(content=f"Error: {e}", ...)]}
Approach 2: error recovery edges — route to a dedicated error handler node using a conditional edge that inspects whether the last message signals an error:
def should_retry(state) -> str:
last = state["messages"][-1].content
if last.startswith("Error:"):
return "error_handler"
return "continue"
graph.add_conditional_edges("tool_node", should_retry, {"error_handler": "error_handler", "continue": "agent"})
For transient external service errors (rate limits, timeouts), wrap the relevant LangChain component with .with_retry() before using it inside a node.
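Conceptually, .with_retry() wraps the call in a bounded retry loop. A plain-Python sketch of the idea (backoff omitted for brevity; the real method also supports exponential waits):

```python
def with_retry(fn, attempts=3):
    """Return a wrapper that retries fn up to `attempts` times on any exception."""
    def wrapped(*args, **kwargs):
        for attempt in range(1, attempts + 1):
            try:
                return fn(*args, **kwargs)
            except Exception:
                if attempt == attempts:
                    raise  # out of attempts: surface the error to the caller
    return wrapped

calls = {"n": 0}
def flaky_tool(x):
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient")  # fails twice, then succeeds
    return x * 2

safe_tool = with_retry(flaky_tool, attempts=3)
print(safe_tool(21))  # 42
```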
LangGraph applications can be deployed in three main ways: LangGraph Cloud (managed service), self-hosted with Docker + FastAPI, and embedded in a larger application. The right choice depends on your team's infrastructure requirements and SLA needs.
LangGraph Cloud — LangChain's managed deployment platform. You push your graph code to a GitHub repo, connect it to LangGraph Cloud, and it handles scaling, checkpointing (PostgreSQL), streaming, and monitoring automatically. Provides REST and WebSocket APIs out of the box.
Self-hosted FastAPI — wrap the compiled graph with a FastAPI app and use PostgresSaver for multi-process state:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

@asynccontextmanager
async def lifespan(app: FastAPI):
    # from_conn_string returns an async context manager; keep it open for the app's lifetime
    async with AsyncPostgresSaver.from_conn_string(DB_URL) as saver:
        await saver.setup()  # create checkpoint tables on first run
        app.state.graph = graph_builder.compile(checkpointer=saver)
        yield

app = FastAPI(lifespan=lifespan)

@app.post("/chat/{thread_id}")
async def chat(thread_id: str, message: str):
    config = {"configurable": {"thread_id": thread_id}}
    result = await app.state.graph.ainvoke({"messages": [HumanMessage(message)]}, config)
    return {"response": result["messages"][-1].content}
Containerise with Docker, expose via Kubernetes or a managed container service, and use LangSmith for production observability.
