
AI / LangChain and LangGraph Interview Questions

1. What is LangChain?
2. What is LCEL (LangChain Expression Language)?
3. What are the key components of LangChain?
4. How does LangChain differ from traditional LLM integration?
5. What are LangChain Runnables?
6. How do you install and set up LangChain?
7. How do you use ChatModels in LangChain?
8. What are PromptTemplates in LangChain?
9. What are output parsers in LangChain?
10. What is the LangSmith platform?
11. What is LangChain Hub?
12. What is LangServe?
13. How do callbacks work in LangChain?
14. How do you implement streaming in LangChain?
15. How does LangChain handle versioning?
16. What are Chains in LangChain?
17. What is the difference between sequential and parallel chains?
18. How do you use the pipe operator in LCEL?
19. What are RunnablePassthrough and RunnableLambda?
20. What are common chain composition patterns?
21. How do you implement a ConversationChain?
22. How does routing work in LCEL?
23. How do you handle errors in chains?
24. What are chain fallbacks and retries?
25. How do you do batch processing with LCEL?
26. What are LangChain Agents?
27. What are the different agent types in LangChain?
28. How do you create custom agents?
29. What is AgentExecutor?
30. How do tools work in LangChain agents?
31. How do you create custom tools?
32. What are multi-action agents?
33. How do agents plan and reason?
34. How do you integrate memory with agents?
35. How do you debug LangChain agents?
36. What is LangGraph?
37. What are the differences between LangGraph and LangChain Agents?
38. What is StateGraph in LangGraph?
39. How do nodes and edges work in LangGraph?
40. How do you implement conditional edges in LangGraph?
41. How does state management work in LangGraph?
42. What is the difference between MessageGraph and StateGraph?
43. How does checkpointing work in LangGraph?
44. How do you implement human-in-the-loop with LangGraph?
45. How do you build multi-agent systems with LangGraph?
46. What are subgraphs in LangGraph?
47. How do streaming and callbacks work in LangGraph?
48. What are persistence patterns in LangGraph?
49. How do you handle errors in LangGraph?
50. How do you deploy LangGraph applications?

1. What is LangChain?

LangChain is an open-source framework for building applications powered by large language models (LLMs). It provides composable abstractions — Models, Prompts, Chains, Agents, Memory, and Tools — that make it practical to connect LLMs with external data and systems without writing all the integration plumbing from scratch.

The framework is built around several key abstractions. Models give a unified interface to LLMs such as OpenAI, Anthropic, and Google regardless of their individual APIs. Prompts are templates that format inputs before they reach the model. Chains sequence calls to models, tools, or other Runnables. Agents let the LLM decide which tools to call and in what order. Memory stores conversation context so later turns can reference earlier ones. Tools are callable functions — web search, calculators, database queries — that models can invoke.

LangChain's declarative composition syntax, LCEL, uses the pipe operator | to connect components: prompt | model | parser. The ecosystem extends to LangSmith (tracing and evaluation), LangServe (REST deployment), and LangGraph (stateful multi-actor graph applications). Python and JavaScript/TypeScript are both supported.

What does the Memory component in LangChain primarily do?
Which companion product handles tracing and evaluation for LangChain apps?
2. What is LCEL (LangChain Expression Language)?

LCEL (LangChain Expression Language) is a declarative syntax for composing chains in LangChain using the pipe operator |. It connects Runnable objects so the output of one becomes the input of the next, making multi-step LLM workflows readable and concise.

The core building block is the Runnable interface. Any component that implements invoke(), stream(), and batch() can participate in an LCEL chain. A typical example:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

prompt = ChatPromptTemplate.from_template("Translate to French: {text}")
model = ChatOpenAI()
parser = StrOutputParser()

chain = prompt | model | parser
result = chain.invoke({"text": "Hello, world!"})

LCEL provides automatic streaming, parallel execution via RunnableParallel, passthrough of values with RunnablePassthrough, custom function wrapping with RunnableLambda, and fallback chains with .with_fallbacks(). Every chain invocation is traced in LangSmith without extra setup.

What operator does LCEL use to chain components together?
What built-in capability does LCEL provide automatically?
3. What are the key components of LangChain?

LangChain is organised around six core abstractions that cover the full lifecycle of an LLM application:

  • Models — A unified interface to LLMs (text-in/text-out) and Chat Models (message-in/message-out), as well as Embedding models for vector representations. Supported providers include OpenAI, Anthropic, Google, Cohere, and dozens of open-source models.
  • Prompts — PromptTemplate and ChatPromptTemplate format inputs before they reach a model. They support variable substitution, partial templates, and few-shot examples.
  • Chains — Sequences of operations that combine prompts, models, retrievers, and tools. LCEL is the modern way to compose them using the | operator.
  • Agents — Systems where an LLM decides which tools to call and in what order by reasoning through a ReAct (Reason + Act) loop until it reaches a final answer.
  • Memory — Mechanisms to persist state between calls in a conversation: buffer memory stores the full history, summary memory compresses it, window memory keeps the last N turns.
  • Tools & Toolkits — Functions that agents can call: web search, code execution, database queries, REST APIs, and custom business logic. Toolkits bundle related tools together (e.g., SQLDatabaseToolkit, GitHubToolkit).

Additionally, Document Loaders ingest data from PDFs, websites, CSVs, and databases; Text Splitters chunk documents for vector indexing; and Vector Stores (FAISS, Chroma, Pinecone) enable semantic search that feeds into Retrieval-Augmented Generation (RAG) pipelines.

Which LangChain component decides which tools to call during execution?
What does ConversationSummaryMemory do compared to ConversationBufferMemory?
4. How does LangChain differ from traditional LLM integration?

Traditional LLM integration means calling an LLM's HTTP API directly: you construct a prompt string by hand, send a requests.post(), parse the JSON response, and manage conversation history as a list you track yourself. Each provider has a different SDK, different error codes, and different retry behaviour. When you need RAG, you wire vector store calls separately; when you need tools, you parse the model's text output to decide what to call next.

Concern | Traditional API Integration | LangChain
Provider switching | Rewrite code per provider SDK | Swap model class, keep same chain
Prompt management | Manual string concatenation | PromptTemplate with typed variables
Conversation history | Manual list tracking | Memory classes handle automatically
Tool/function calling | Custom parsing logic per use case | Agents + Tools framework
RAG pipeline | Separate vector DB code + manual retrieval | Retriever + LCEL pipe
Retry & fallback | Custom retry logic | Built-in .with_retry() / .with_fallbacks()
Observability | Custom logging | LangSmith tracing built-in

The key difference is composability. LangChain treats every component — model, prompt, retriever, parser — as a Runnable with a consistent interface. You can swap, chain, parallelize, or add fallbacks without touching unrelated code.

Which LangChain feature lets you switch from OpenAI to Anthropic with minimal code change?
What does .with_fallbacks() provide that raw API calls don't?
5. What are LangChain Runnables?

A Runnable is the core interface in LangChain that every composable component implements. If something is a Runnable, it can be connected with |, batched, streamed, retried, and traced — regardless of whether it's a prompt template, an LLM, a retriever, or a custom Python function.

Every Runnable exposes these standard methods:

  • invoke(input) — single synchronous call, returns one output
  • batch([input1, input2, ...]) — processes multiple inputs, returns list of outputs
  • stream(input) — yields output chunks as they arrive (useful for token streaming)
  • ainvoke() / abatch() / astream() — async equivalents of the above
  • astream_events() — fine-grained async event stream (tool calls, LLM tokens, etc.)

LangChain ships several utility Runnables: RunnablePassthrough passes input unchanged (or adds extra fields), RunnableLambda wraps any Python function as a Runnable, RunnableParallel runs multiple branches concurrently and merges their outputs into a dict, and RunnableBranch routes input to different Runnables based on conditions. These combine with LCEL pipes to build arbitrarily complex workflows.

Which Runnable method is used to process a list of inputs concurrently?
What does RunnableParallel do?
6. How do you install and set up LangChain?

LangChain is distributed as several pip packages. The minimal install for OpenAI-backed applications is:

pip install langchain langchain-openai
# For community integrations (vector stores, loaders, etc.):
pip install langchain-community
# For serving with REST API:
pip install langserve fastapi uvicorn

API credentials are passed through environment variables so they never appear in source code:

export OPENAI_API_KEY="sk-..."            # OpenAI
export ANTHROPIC_API_KEY="..."          # Anthropic / Claude
export LANGCHAIN_TRACING_V2="true"      # Enable LangSmith tracing
export LANGCHAIN_API_KEY="ls__..."      # LangSmith API key
export LANGCHAIN_PROJECT="my-project"   # LangSmith project name

A minimal "hello world" with LangChain:

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(model="gpt-4o-mini")
response = llm.invoke([HumanMessage(content="What is 2 + 2?")])
print(response.content)  # "4"

The package split is intentional: langchain-core contains stable base abstractions, langchain contains orchestration logic, langchain-openai and similar provider packages contain model integrations, and langchain-community contains third-party integrations that move faster.

Which environment variable enables LangSmith tracing?
Which package contains stable base abstractions like the Runnable interface?
7. How do you use ChatModels in LangChain?

ChatModels in LangChain are LLM wrappers that communicate using a message-based format. Instead of passing a raw string, you pass a list of typed messages: SystemMessage, HumanMessage, and AIMessage. This maps directly to the roles used by OpenAI, Anthropic, and similar APIs.

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage

llm = ChatOpenAI(
    model="gpt-4o",
    temperature=0.7,
    max_tokens=512,
)

messages = [
    SystemMessage(content="You are a helpful Python tutor."),
    HumanMessage(content="Explain list comprehensions in Python."),
]
response = llm.invoke(messages)
print(response.content)   # AIMessage with text response
print(response.usage_metadata)  # token counts

ChatModels also support streaming so you can print tokens as they arrive:

for chunk in llm.stream(messages):
    print(chunk.content, end="", flush=True)

Other providers follow the same API: ChatAnthropic, ChatGoogleGenerativeAI, ChatMistralAI. Switching providers requires only changing the import and class name; the rest of the chain remains identical.

Which message type represents the LLM's persona or system instructions?
How do you stream token-by-token output from a ChatModel?

8. What are PromptTemplates in LangChain?

PromptTemplates are objects that format dynamic inputs into the correct structure before passing them to a model. Instead of building prompt strings with f-strings scattered across your codebase, templates give you reusable, testable, versionable prompt construction with named variables.

There are two main types:

  • PromptTemplate — produces a plain text string. Best for LLMs (non-chat models):
from langchain_core.prompts import PromptTemplate
pt = PromptTemplate.from_template("Summarise this in {n} sentences: {text}")
print(pt.format(n=2, text="LangChain is..."))
  • ChatPromptTemplate — produces a list of typed messages. Best for Chat Models:
from langchain_core.prompts import ChatPromptTemplate
chat_pt = ChatPromptTemplate.from_messages([
    ("system", "You are a {role}."),
    ("human", "{user_input}"),
])
messages = chat_pt.format_messages(role="poet", user_input="Write about the sea.")

MessagesPlaceholder is used inside a ChatPromptTemplate to insert a variable-length list of messages — useful for injecting conversation history. partial() lets you pre-fill some variables while leaving others to be filled at call time, which is handy for re-usable templates across different contexts.

Which prompt class should you use when your model expects a list of typed messages?
What is MessagesPlaceholder used for inside a ChatPromptTemplate?
9. What are output parsers in LangChain?

Output parsers sit at the end of a chain and transform the raw text or message returned by an LLM into a more structured or usable form. Without a parser, chain.invoke() returns an AIMessage object; with a parser, you get a plain string, a Python dict, a validated Pydantic model, or a list — whatever your downstream code expects.

The most common parsers:

  • StrOutputParser — extracts .content from an AIMessage, returns a string. Used in virtually every chain: prompt | model | StrOutputParser()
  • JsonOutputParser — parses the model's text as JSON and returns a Python dict. Works best when the prompt instructs the model to return valid JSON.
  • PydanticOutputParser — validates parsed JSON against a Pydantic schema. The parser injects format instructions into the prompt automatically via parser.get_format_instructions().
  • CommaSeparatedListOutputParser — splits a comma-delimited response into a Python list.
  • StructuredOutputParser — uses a JSON schema for more flexible structured output.
from langchain_core.output_parsers import JsonOutputParser
from pydantic import BaseModel

class Person(BaseModel):
    name: str
    age: int

parser = JsonOutputParser(pydantic_object=Person)
chain = prompt | model | parser
result = chain.invoke({"query": "John is 30 years old"})
# result: {'name': 'John', 'age': 30}
What does StrOutputParser extract from an AIMessage?
What extra capability does PydanticOutputParser add over JsonOutputParser?
10. What is the LangSmith platform?

LangSmith is LangChain's hosted observability and evaluation platform for LLM applications. It automatically captures traces — the full execution tree of every chain, agent step, LLM call, retriever hit, and tool invocation — so you can inspect exactly what happened during a run, including prompts sent, completions received, latency at each step, and token usage.

Enabling LangSmith requires just two environment variables:

export LANGCHAIN_TRACING_V2=true
export LANGCHAIN_API_KEY=ls__your_key_here
export LANGCHAIN_PROJECT=my-project   # optional, groups traces

No code changes are required — every LangChain component automatically sends traces once these are set. LangSmith's main capabilities include:

  • Tracing — visualise the full execution tree of any run
  • Datasets & Evaluations — build golden datasets and run LLM-as-judge or custom evaluators to benchmark prompt changes
  • Playground — edit prompts inline and replay traces to test changes
  • Monitoring — dashboards for latency, error rates, and cost over time in production
  • Annotation queues — route interesting traces to human reviewers for labelling and feedback
What does LangSmith capture automatically when tracing is enabled?
Which LangSmith feature lets you test prompt changes against a saved set of inputs and expected outputs?
11. What is LangChain Hub?

LangChain Hub is a public repository at smith.langchain.com/hub for sharing and versioning prompts. Teams use it to store prompts outside of application code, iterate on them without deployments, and pull specific versions into chains at runtime.

To use Hub prompts in code, install langchainhub and call hub.pull():

pip install langchainhub
from langchain import hub

# Pull a community RAG prompt (returns a ChatPromptTemplate)
rag_prompt = hub.pull("rlm/rag-prompt")

# Pin a specific commit to avoid drift
rag_prompt_v2 = hub.pull("rlm/rag-prompt:50442af1")

# Use it in a chain
chain = rag_prompt | llm | StrOutputParser()

You can also push your own prompts to the Hub from code, making them accessible to teammates or the broader community:

hub.push("your-username/my-prompt", my_prompt_template)

LangChain Hub is especially useful for teams that want to separate prompt engineering from application deployment — a prompt designer can update and version a prompt in the Hub, and the next invocation of the application picks up the latest (or pinned) version without a code deploy.

What does hub.pull('rlm/rag-prompt:50442af1') do?
What is the primary benefit of storing prompts in LangChain Hub?
12. What is LangServe?

LangServe is a library that turns any LCEL chain into a production-ready REST API in a few lines of code. It wraps FastAPI and exposes standard endpoints — /invoke, /batch, /stream, and /stream_log — so clients can call your chain over HTTP without any custom FastAPI code.

pip install langserve[all] fastapi uvicorn
from fastapi import FastAPI
from langserve import add_routes
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

app = FastAPI(title="My LLM API")

chain = (
    ChatPromptTemplate.from_template("Answer: {question}")
    | ChatOpenAI()
    | StrOutputParser()
)

add_routes(app, chain, path="/qa")

# uvicorn server:app --host 0.0.0.0 --port 8000

Once running, the /qa/invoke endpoint accepts POST with {"input": {"question": "..."}}, /qa/stream returns an SSE stream, and /qa/playground serves an interactive browser UI. LangServe also generates an OpenAPI schema at /docs automatically.

Which HTTP endpoint does LangServe provide for token-by-token streaming?
What web framework does LangServe use under the hood?
13. How do callbacks work in LangChain?

Callbacks in LangChain are hooks that fire at specific lifecycle events during chain, model, and agent execution. You implement a BaseCallbackHandler subclass and override only the methods you care about. Each method receives context about what just happened — which model was called, what the prompt was, what the response was, and how long it took.

Key callback methods (all have async equivalents prefixed with a):

  • on_llm_start(serialized, prompts) — fired before an LLM call
  • on_llm_end(response) — fired after an LLM call completes
  • on_chain_start(serialized, inputs) — fired when a chain begins
  • on_chain_end(outputs) — fired when a chain finishes
  • on_tool_start(serialized, input_str) — fired before a tool executes
  • on_tool_end(output) — fired after a tool returns
  • on_agent_action(action) — fired each time an agent decides to use a tool
from langchain_core.callbacks import BaseCallbackHandler

class TokenLogger(BaseCallbackHandler):
    def on_llm_end(self, response, **kwargs):
        usage = (response.llm_output or {}).get("token_usage", {})  # llm_output may be None
        print(f"Tokens used: {usage}")

chain.invoke({"input": "hello"}, config={"callbacks": [TokenLogger()]})

Callbacks can be attached per-invocation via config={"callbacks": [...]}, per-component via constructor arguments, or globally with set_global_handler(). LangSmith tracing itself is implemented as a callback handler.

Which callback method fires immediately before an LLM is called?
How do you attach a callback to a single chain invocation without affecting other calls?
14. How do you implement streaming in LangChain?

Streaming in LangChain means receiving model output token-by-token rather than waiting for the full response. This dramatically improves perceived responsiveness in user-facing applications. LCEL chains support streaming out of the box through three methods: stream(), astream(), and astream_events().

Synchronous streaming — iterates over chunks as they arrive:

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = ChatPromptTemplate.from_template("{topic}") | ChatOpenAI() | StrOutputParser()

for chunk in chain.stream({"topic": "Explain quantum entanglement briefly"}):
    print(chunk, end="", flush=True)

Async streaming — for FastAPI / async servers:

async for chunk in chain.astream({"topic": "..."}):
    print(chunk, end="", flush=True)

Fine-grained event streaming — astream_events() gives you granular events for every component in the chain (tool calls, retriever results, LLM tokens), letting you build rich streaming UIs that show intermediate steps:

async for event in chain.astream_events({"topic": "..."}, version="v2"):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
Which method do you use for async token streaming in LangChain?
What advantage does astream_events() have over astream()?
15. How does LangChain handle versioning?

LangChain follows a modular package structure that allows different parts of the ecosystem to evolve at different speeds without breaking stable core interfaces. As of 2024, the main packages are:

  • langchain-core — Stable base abstractions: Runnable, BaseMessage, BasePromptTemplate, BaseOutputParser. Changes here are rare and follow strict semver. Most application code depends only on this.
  • langchain — Orchestration logic: Chains, Agents, ConversationMemory, AgentExecutor. Released frequently, with deprecation warnings issued before breaking changes.
  • langchain-community — Third-party integrations (vector stores, document loaders, tool wrappers). Changes fast; pin carefully in production.
  • Provider packages (langchain-openai, langchain-anthropic, langchain-google-genai, etc.) — Maintained separately so OpenAI SDK updates don't break Anthropic users.
  • langchain-experimental — Unstable, experimental features not ready for production.

When a feature is deprecated (e.g. LLMChain in favour of LCEL), LangChain emits LangChainDeprecationWarning for at least one major version before removal. Pin versions in requirements.txt or use a lockfile (pip-tools, poetry.lock) to avoid unintentional upgrades in production.

Which package contains the stable Runnable and BaseMessage interfaces?
Why are provider packages (langchain-openai, langchain-anthropic) kept separate?
16. What are Chains in LangChain?

A Chain in LangChain is any sequence of processing steps that takes an input, passes it through one or more components (prompts, models, retrievers, tools), and produces an output. Chains are the fundamental unit of composition — everything from a single prompt+model call to a multi-step RAG pipeline is a chain.

The modern way to build chains is with LCEL (using the | operator). Legacy chain classes still exist but are deprecated:

Legacy class and its LCEL equivalent:

  • LLMChain → prompt | llm | StrOutputParser()
  • SimpleSequentialChain → chain1 | chain2 | chain3
  • RetrievalQA → (retriever | format_docs) | prompt | llm | StrOutputParser()
  • ConversationalRetrievalChain → RunnablePassthrough + retriever + prompt | llm

Every LCEL chain is itself a Runnable, so chains compose recursively — a chain can be embedded inside another chain as a step. The main practical patterns are: simple prompt chain (question → answer), RAG chain (question → retrieve → augment → answer), and agent loop (question → plan → tool → observe → answer).

What is the LCEL replacement for the deprecated LLMChain class?
Why is LCEL preferred over the legacy chain classes?
17. What is the difference between sequential and parallel chains?

In a sequential chain, components run one after another: the output of step N becomes the input of step N+1. This is the default LCEL pipe behaviour — chain = step1 | step2 | step3 means step2 cannot start until step1 finishes.

In a parallel chain, multiple branches run concurrently on the same input, and their results are merged into a single dict. LangChain implements this with RunnableParallel:

from langchain_core.runnables import RunnableParallel, RunnablePassthrough
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

llm = ChatOpenAI()

parallel_chain = RunnableParallel(
    summary=summary_prompt | llm | StrOutputParser(),
    sentiment=sentiment_prompt | llm | StrOutputParser(),
    keywords=keywords_prompt | llm | StrOutputParser(),
)

# Runs all three LLM calls concurrently, then returns:
# {"summary": "...", "sentiment": "...", "keywords": "..."}
result = parallel_chain.invoke({"text": "LangChain is amazing..."})

Use sequential chains when each step depends on the previous result. Use parallel chains when steps are independent of each other — this reduces wall-clock time to the slowest branch's latency rather than the sum of all branches.

What class in LangChain runs multiple branches concurrently on the same input?
When does using RunnableParallel reduce latency compared to sequential chaining?
18. How do you use the pipe operator in LCEL?

The pipe operator | in LCEL connects two Runnable objects so that the output of the left side becomes the input of the right side. It is syntactic sugar for RunnableSequence(left, right) and works because LangChain overloads Python's __or__ and __ror__ dunder methods on the Runnable base class.

Basic usage — each step must accept what the previous step returns:

from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser

# prompt returns ChatPromptValue
# model accepts ChatPromptValue, returns AIMessage
# parser accepts AIMessage, returns str
chain = (
    ChatPromptTemplate.from_template("Explain {concept} in one sentence.")
    | ChatOpenAI(model="gpt-4o-mini")
    | StrOutputParser()
)
print(chain.invoke({"concept": "recursion"}))

You can also chain dicts (automatically wrapped in RunnableParallel) or lambda functions (wrapped in RunnableLambda). Input/output type compatibility is checked lazily at runtime — LangChain will raise a clear error if types don't align.

# Dict shorthand for RunnableParallel at the start:
chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | rag_prompt
    | ChatOpenAI()
    | StrOutputParser()
)
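The mechanics behind the operator can be sketched with a toy class. This is purely illustrative, not LangChain's actual implementation:

```python
# Toy model of LCEL's pipe mechanics: overloading __or__ is all that is
# needed to make `a | b` build a composed pipeline.
class ToyRunnable:
    def __init__(self, fn):
        self.fn = fn

    def invoke(self, x):
        return self.fn(x)

    def __or__(self, other):
        # `self | other` returns a new runnable feeding self's output into other
        return ToyRunnable(lambda x: other.invoke(self.invoke(x)))

chain = ToyRunnable(lambda x: x + 1) | ToyRunnable(lambda x: x * 10)
result = chain.invoke(2)  # (2 + 1) * 10 = 30
```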
What Python dunder method does LangChain override to enable the | operator?
What happens when you pipe a plain dict {} into an LCEL chain?
19. What are RunnablePassthrough and RunnableLambda?

RunnablePassthrough and RunnableLambda are utility Runnables that solve two common chain-building problems: passing input data unchanged to a later step, and wrapping arbitrary Python logic as a Runnable step.

RunnablePassthrough simply passes whatever it receives as input directly to its output. It is most useful in RAG chains where you need to forward the original question to the prompt while also fetching documents in parallel:

from langchain_core.runnables import RunnablePassthrough

chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | rag_prompt
    | llm
    | StrOutputParser()
)
# 'question' is passed through unchanged; 'context' is fetched from the retriever

RunnablePassthrough.assign(key=fn) extends this by adding new keys to the dict while keeping existing ones.

RunnableLambda wraps any Python function as a Runnable so it can participate in an LCEL chain:

from langchain_core.runnables import RunnableLambda

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

chain = retriever | RunnableLambda(format_docs) | prompt | llm | StrOutputParser()

# Shorthand: lambda automatically wraps when piped
chain = retriever | (lambda docs: "\n".join(d.page_content for d in docs)) | prompt
In a RAG chain, why is RunnablePassthrough used for the 'question' key?
What does RunnableLambda do?
20. What are common chain composition patterns?

Beyond simple prompt | model | parser pipes, a handful of patterns appear repeatedly in production LangChain applications:

  • RAG pattern — retrieve relevant documents, inject them into a prompt, generate an answer. The retriever and passthrough run in parallel so both context and question reach the prompt: {context: retriever, question: RunnablePassthrough()} | rag_prompt | llm | parser
  • Router / conditional branch — use RunnableBranch or a lambda to route different inputs to different sub-chains. Useful for multi-intent chatbots where a general question goes to one chain and a SQL query goes to another.
  • Map-reduce — split a long document into chunks, process each chunk in parallel with .batch(), then reduce the results with a combine chain. Standard pattern for summarising books or analysing large codebases.
  • Refine — process chunks sequentially, passing the previous summary into the next iteration to progressively build a better answer. More accurate than map-reduce for certain summarisation tasks.
  • Fallback chain — primary chain with a backup: gpt4_chain.with_fallbacks([gpt35_chain]). If the primary raises an exception, the fallback is tried automatically.
  • Branching + merge — run parallel branches (e.g. extract entities, summarise, classify sentiment) and merge their outputs into a final dict for downstream use.
In the standard RAG chain pattern, why does RunnableParallel wrap the retriever and RunnablePassthrough together?
When would you choose a Refine pattern over a Map-Reduce pattern for document summarisation?
21. How do you implement a ConversationChain?

A ConversationChain maintains multi-turn dialogue by storing conversation history and injecting it into each new prompt invocation. The legacy approach uses ConversationChain with a memory object; the LCEL approach manages history explicitly in the chain state using MessagesPlaceholder.

LCEL approach (recommended):

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import HumanMessage, AIMessage

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    MessagesPlaceholder(variable_name="history"),
    ("human", "{input}"),
])

chain = prompt | ChatOpenAI()

# Manually manage history
history = []
def chat(user_input):
    response = chain.invoke({"input": user_input, "history": history})
    history.append(HumanMessage(content=user_input))
    history.append(AIMessage(content=response.content))
    return response.content

print(chat("My name is Alice."))
print(chat("What is my name?"))  # correctly recalls "Alice"

For server-side multi-user conversations, pair this with LangGraph's checkpointing or RunnableWithMessageHistory which wraps the chain and automatically loads/saves history per session ID from a configurable store.

What role does MessagesPlaceholder play in a conversation chain?
What does RunnableWithMessageHistory add to a basic LCEL conversation chain?
22. How does routing work in LCEL?

Routing in LCEL means directing an input to one of several sub-chains based on a condition. The two main tools are RunnableBranch (declarative) and a plain Python function returning a Runnable (imperative).

RunnableBranch — takes a list of (condition, runnable) pairs and a default. The first condition that evaluates to True determines which runnable handles the input:

from langchain_core.runnables import RunnableBranch

router = RunnableBranch(
    (lambda x: "sql" in x["topic"].lower(), sql_chain),
    (lambda x: "python" in x["topic"].lower(), python_chain),
    general_chain,  # default
)
result = router.invoke({"topic": "How do I write a SQL JOIN?"})
# Routes to sql_chain

Lambda-based routing — a custom function that returns the appropriate runnable based on the classification output from an earlier chain step:

from langchain_core.runnables import RunnableLambda

def route(info):
    if info["topic"] == "science":
        return science_chain
    return general_chain

full_chain = classify_chain | RunnableLambda(route)

A common production pattern is to first run a fast, cheap classification chain that returns a topic label, then route to specialised chains accordingly. This avoids sending every request through a heavyweight model.
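
This classify-then-route pattern can be sketched with plain functions standing in for the real chains (all names here are illustrative stand-ins, not LangChain components):

```python
# Sketch of the classify-then-route pattern: a cheap classification step
# picks a topic label, then a lookup table selects the specialist "chain".

def classify(question: str) -> str:
    # A real system would make a small, cheap LLM call here
    q = question.lower()
    if "sql" in q:
        return "sql"
    if "python" in q:
        return "python"
    return "general"

def sql_chain(q):     return f"[sql specialist] {q}"
def python_chain(q):  return f"[python specialist] {q}"
def general_chain(q): return f"[generalist] {q}"

routes = {"sql": sql_chain, "python": python_chain, "general": general_chain}

def full_chain(question: str) -> str:
    return routes[classify(question)](question)

print(full_chain("How do I write a SQL JOIN?"))  # [sql specialist] How do I write a SQL JOIN?
```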

How does RunnableBranch decide which sub-chain to invoke?
What is the last argument to RunnableBranch used for?
23. How do you handle errors in chains?

Error handling in LangChain chains operates at several levels: Python exception handling around .invoke(), chain-level fallbacks, parser-level retry, and output validation with Pydantic.

Basic try/except — handles transient API errors or rate limits:

import logging
from openai import RateLimitError

logger = logging.getLogger(__name__)
fallback_response = "Sorry, something went wrong."

try:
    result = chain.invoke({"question": user_input})
except RateLimitError:
    result = "Service busy, please retry."
except Exception as e:
    logger.error(f"Chain failed: {e}")
    result = fallback_response

Chain fallbacks — declaratively try a backup chain if the primary fails:

# If gpt4_chain raises any exception, gpt35_chain is tried automatically
robust_chain = gpt4_chain.with_fallbacks([gpt35_chain])

Output parser errors — OutputFixingParser wraps another parser and uses a second LLM call to fix malformed output if parsing fails:

from langchain.output_parsers import OutputFixingParser
from langchain_core.output_parsers import JsonOutputParser
from langchain_openai import ChatOpenAI

fixing_parser = OutputFixingParser.from_llm(
    parser=JsonOutputParser(), llm=ChatOpenAI()
)
chain = prompt | llm | fixing_parser

For structured output validation, using llm.with_structured_output(MyModel) raises a ValidationError if the model's response doesn't match the schema, making it easy to catch and handle type mismatches.
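
The fix-on-failure behaviour can be sketched in plain Python: attempt the parse, and on failure hand the malformed text plus the error message to a second model call that returns a repaired version. Here the fixer is a trivial stub rather than an LLM:

```python
import json

def fixing_parse(raw: str, fixer) -> dict:
    """Try to parse; on failure, ask a second model (fixer) to repair the text."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError as err:
        repaired = fixer(raw, str(err))  # real version: an LLM call given the error
        return json.loads(repaired)      # may still raise if the fix is bad

# Stub standing in for the fixing LLM: repairs a trailing comma
def stub_fixer(raw: str, error: str) -> str:
    return raw.replace(",}", "}")

print(fixing_parse('{"name": "Alice",}', stub_fixer))  # {'name': 'Alice'}
```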

What does .with_fallbacks([backup_chain]) do when the primary chain raises an exception?
What does OutputFixingParser do when a parser fails?
24. What are chain fallbacks and retries?

Fallbacks and retries are resilience mechanisms built into LangChain Runnables that make production chains tolerant of transient failures and model quality issues.

Fallbacks — .with_fallbacks() attaches one or more backup Runnables that are tried in order if the primary raises an exception. You can fall back to a cheaper model, a different provider, or a static response:

from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

chain = (
    ChatOpenAI(model="gpt-4o")
    .with_fallbacks([
        ChatAnthropic(model="claude-3-sonnet-20240229"),
        ChatOpenAI(model="gpt-4o-mini"),
    ])
)
# If gpt-4o fails, tries Claude; if that fails, tries gpt-4o-mini

Retries — .with_retry() retries the same Runnable on failure with configurable stop conditions and wait strategies:

from openai import APITimeoutError, RateLimitError
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

resilient_llm = ChatOpenAI().with_retry(
    retry_if_exception_type=(RateLimitError, APITimeoutError),
    stop_after_attempt=3,
    wait_exponential_jitter=True,
)
chain = prompt | resilient_llm | StrOutputParser()

You can combine both: retry first (for transient errors), then fall back (if the model is genuinely unavailable). Retries are best for rate-limit errors; fallbacks are best for model outages or quality failures (e.g. the primary model produces invalid JSON).
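
The combined behaviour can be sketched in plain Python with callables standing in for models. This models the semantics of .with_retry() plus .with_fallbacks(), not LangChain's implementation:

```python
import random
import time

def invoke_with_resilience(primary, fallbacks, max_attempts=3):
    """Retry the primary with exponential backoff + jitter, then try fallbacks."""
    for attempt in range(max_attempts):
        try:
            return primary()
        except Exception:
            if attempt < max_attempts - 1:
                # exponential backoff with jitter, like wait_exponential_jitter
                time.sleep((2 ** attempt) * 0.01 + random.uniform(0, 0.01))
    for backup in fallbacks:
        try:
            return backup()
        except Exception:
            continue
    raise RuntimeError("all models failed")

calls = {"n": 0}
def flaky_primary():
    calls["n"] += 1
    raise TimeoutError("rate limited")  # always fails in this sketch

print(invoke_with_resilience(flaky_primary, [lambda: "backup answer"]))
# "backup answer", after three primary attempts
```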

When should you use .with_retry() rather than .with_fallbacks()?
What does wait_exponential_jitter=True do in .with_retry()?
25. How do you do batch processing with LCEL?

The .batch() method on any LCEL chain processes a list of inputs and returns a list of outputs. Under the hood, LangChain runs the inputs concurrently using a thread pool (synchronous) or asyncio tasks (async), subject to an optional concurrency limit.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

chain = ChatPromptTemplate.from_template("Summarise: {text}") | ChatOpenAI() | StrOutputParser()

texts = [
    {"text": "Article 1 text..."},
    {"text": "Article 2 text..."},
    {"text": "Article 3 text..."},
]

# Runs concurrently, returns list in input order
summaries = chain.batch(texts)

# Limit concurrency to avoid rate limits
summaries = chain.batch(texts, config={"max_concurrency": 5})

The async equivalent is .abatch(), which is preferred in async applications:

summaries = await chain.abatch(texts, config={"max_concurrency": 5})

Batch processing is ideal for offline data pipelines: indexing document collections, running evaluations against a test set, or bulk extracting structured data from unstructured text. Results are always returned in the same order as the input list, even if individual tasks complete out of order.
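
The concurrency and ordering behaviour of .batch() can be approximated with a standard thread pool; summarise here is a stand-in for one chain invocation, and the varying delays make tasks finish out of order:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def summarise(item: dict) -> str:
    # Stand-in for one chain invocation
    time.sleep(item["delay"])
    return f"summary of {item['text']}"

texts = [
    {"text": "Article 1", "delay": 0.03},
    {"text": "Article 2", "delay": 0.01},
    {"text": "Article 3", "delay": 0.02},
]

# max_workers caps concurrency (like max_concurrency); map() returns results
# in input order even though Article 2 finishes first
with ThreadPoolExecutor(max_workers=2) as pool:
    summaries = list(pool.map(summarise, texts))

print(summaries)
# ['summary of Article 1', 'summary of Article 2', 'summary of Article 3']
```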

How do you limit the number of concurrent requests when using .batch()?
Are results from .batch() guaranteed to be in the same order as inputs?
26. What are LangChain Agents?

A LangChain Agent is a system where an LLM acts as the reasoning engine that decides, at each step, which action to take. Unlike a fixed chain where the sequence of operations is defined by the developer, an agent dynamically determines the order and selection of tool calls based on the user's input and intermediate results.

The core loop of an agent is:

  1. Receive user input
  2. LLM reasons about what to do (Thought)
  3. LLM selects a tool and provides its input (Action)
  4. Tool executes and returns a result (Observation)
  5. LLM receives the observation and decides whether to take another action or produce a final answer
  6. Repeat steps 2–5 until a final answer is reached

This pattern is called ReAct (Reasoning + Acting). Agents are most valuable when the number or order of steps needed to solve a task cannot be predetermined — for example, researching a question that may require 1 or 5 web searches depending on what the first search returns. The two main modern approaches are OpenAI Tools Agent (structured tool calling via OpenAI function calling API) and ReAct Agent (reasoning via text in the prompt for models without native function calling).
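
The loop above can be sketched with a scripted stand-in for the LLM. The script list replaces real model reasoning, and search_tool is an invented example; this illustrates steps 2–6, not a production agent:

```python
# Minimal sketch of the ReAct loop: the "model" is a list of canned decisions,
# each either a tool call (Action) or a final answer.

def search_tool(query: str) -> str:
    return "Anthropic was founded in 2021. Dario Amodei is the CEO."

tools = {"search": search_tool}

script = [
    {"action": "search", "input": "Anthropic CEO"},        # step 3: Action
    {"final_answer": "Dario Amodei is the CEO of Anthropic."},
]

def run_agent(user_input: str) -> str:
    observations = []
    for decision in script:                  # step 2: model "reasons"
        if "final_answer" in decision:       # step 5: produce final answer
            return decision["final_answer"]
        tool = tools[decision["action"]]     # step 3: select tool
        observations.append(tool(decision["input"]))  # step 4: Observation
    raise RuntimeError("no final answer produced")

print(run_agent("Who is the CEO of Anthropic?"))
# Dario Amodei is the CEO of Anthropic.
```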

What makes an agent different from a fixed LCEL chain?
In the ReAct loop, what is an 'Observation'?
27. What are the different agent types in LangChain?

LangChain provides several agent types, each suited to different LLM capabilities and task requirements:

  • OpenAI Tools Agent — uses OpenAI's native tool/function calling API to select and call tools. Best for OpenAI models (gpt-4o, gpt-4-turbo); the most reliable structured tool use.
  • OpenAI Functions Agent — older version using the functions API, now superseded by the Tools Agent. Best for legacy gpt-3.5/gpt-4 function calling.
  • ReAct Agent — uses the Thought/Action/Observation text format in the prompt and parses the action from the model output. Best for models without native function calling; transparent reasoning.
  • Structured Chat Agent — like ReAct but handles tools with multi-field structured inputs. Best for tools that require more than a single string input.
  • XML Agent — uses XML-formatted actions; designed for Anthropic Claude models, where XML is a reliable output format.
  • JSON Chat Agent — uses JSON-formatted actions in the prompt. Best for models that reliably produce JSON without native tool calling.

In practice, create_openai_tools_agent() or create_react_agent() are the most common entry points. For anything requiring fine-grained control over the agent loop — including human-in-the-loop, persistent state, or multi-agent coordination — consider using LangGraph instead.

Which agent type is recommended when using OpenAI gpt-4o with structured tool calling?
When would you choose a ReAct Agent over an OpenAI Tools Agent?
28. How do you create custom agents?

The easiest way to create a custom agent is with the factory functions create_react_agent() or create_openai_tools_agent(), which combine a custom prompt, an LLM, and a list of tools. Most customisation needs are met by adjusting the prompt and tool list.

from langchain import hub
from langchain.agents import create_react_agent, AgentExecutor
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults

# Pull base ReAct prompt from Hub or define your own
prompt = hub.pull("hwchase17/react")

tools = [TavilySearchResults(max_results=3)]
llm = ChatOpenAI(model="gpt-4o")

agent = create_react_agent(llm=llm, tools=tools, prompt=prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

executor.invoke({"input": "What is the population of France?"})

For full control, subclass BaseSingleActionAgent (returns one action per step) or BaseMultiActionAgent (returns multiple actions per step). You must implement plan() and aplan() which receive the current intermediate steps and return either an AgentAction (tool to call) or AgentFinish (final answer).

For production multi-step agents with complex state and human-in-the-loop needs, LangGraph's graph-based approach is more appropriate than subclassing agent base classes.

What are the three required arguments to create_react_agent()?
What must a custom BaseSingleActionAgent.plan() method return?
29. What is AgentExecutor?

AgentExecutor is the runtime loop that drives an agent to completion. It takes an agent (which decides actions) and a list of tools (which execute those actions), and repeatedly calls the agent, executes the selected tool, feeds the observation back, and repeats until the agent returns an AgentFinish or a stopping condition is reached.

from langchain.agents import AgentExecutor

executor = AgentExecutor(
    agent=agent,
    tools=tools,
    verbose=True,                  # print each step
    max_iterations=10,             # prevent infinite loops
    return_intermediate_steps=True, # include tool call history in output
    handle_parsing_errors=True,    # auto-retry if output parse fails
)

result = executor.invoke({"input": "Find the CEO of Anthropic"})
print(result["output"])            # final answer
print(result["intermediate_steps"]) # list of (AgentAction, observation)

Key configuration options: max_iterations prevents runaway loops, max_execution_time adds a wall-clock timeout, early_stopping_method controls whether the agent generates a final answer when max_iterations is hit or just stops, and handle_parsing_errors retries if the LLM produces malformed output instead of crashing the loop.

What does max_iterations in AgentExecutor prevent?
What does return_intermediate_steps=True add to the AgentExecutor output?
30. How do tools work in LangChain agents?

A Tool in LangChain is a callable that an agent can invoke when it needs to interact with the outside world. Every tool has three required attributes: a name (how the LLM refers to it), a description (what it does and when to use it — the LLM reads this to decide), and an input schema (the parameters it expects).

When the agent decides to call a tool, AgentExecutor:

  1. Finds the tool by name in its tools list
  2. Parses the agent's action into the tool's input format
  3. Calls tool.run(input) or tool.arun(input)
  4. Returns the result as an "Observation" back to the agent
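
Those dispatch steps can be sketched in plain Python (the tools and the action dict here are illustrative, not the AgentExecutor internals):

```python
# Sketch of tool dispatch: look the tool up by name, invoke it with the
# parsed input, and package the result as an observation for the agent.

def search(query: str) -> str:
    return f"results for: {query}"

def calculator(expression: str) -> str:
    return str(eval(expression))  # illustration only; never eval untrusted input

tools_by_name = {"search": search, "calculator": calculator}

def execute_action(action: dict) -> str:
    tool = tools_by_name[action["tool"]]   # 1. find the tool by name
    result = tool(action["tool_input"])    # 2-3. parsed input, tool.run(input)
    return f"Observation: {result}"        # 4. fed back to the agent

print(execute_action({"tool": "calculator", "tool_input": "6 * 7"}))
# Observation: 42
```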

LangChain ships dozens of pre-built tools in langchain-community: web search (Tavily, SerpAPI), code execution (PythonREPL), database query (SQLDatabase), Wikipedia, file I/O, and more. You access them as:

from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.tools import WikipediaQueryRun
from langchain_community.utilities import WikipediaAPIWrapper

search = TavilySearchResults(max_results=3)
wiki = WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper())

tools = [search, wiki]

A critical practical point: the tool description matters more than the implementation. The LLM decides whether to call a tool based entirely on reading its description. A vague description leads to incorrect tool selection; a precise description improves agent accuracy.

Why is the tool description the most important attribute of a LangChain tool?
What does AgentExecutor do after a tool returns its result?
31. How do you create custom tools?

There are three ways to create custom tools in LangChain, in order of increasing complexity: the @tool decorator, StructuredTool.from_function(), and subclassing BaseTool.

@tool decorator — simplest approach for single-string input tools:

from langchain_core.tools import tool

@tool
def get_word_count(text: str) -> int:
    """Counts the number of words in the provided text. Use when asked about word count."""
    return len(text.split())

# Tool name: 'get_word_count', description from docstring
print(get_word_count.invoke("Hello world"))  # 2

StructuredTool.from_function() — for tools with multiple inputs:

from langchain_core.tools import StructuredTool
from pydantic import BaseModel

class MultiplyInput(BaseModel):
    a: float
    b: float

def multiply(a: float, b: float) -> float:
    """Multiplies two numbers together."""
    return a * b

multiply_tool = StructuredTool.from_function(
    func=multiply,
    name="multiply",
    description="Multiplies two numbers together.",
    args_schema=MultiplyInput,
)

BaseTool subclass — for full control, async support, and complex logic:

from langchain_core.tools import BaseTool

class DatabaseQueryTool(BaseTool):
    name: str = "database_query"
    description: str = "Query the internal product database. Input should be a SQL WHERE clause."

    def _run(self, query: str) -> str:
        # db is assumed to be an existing database client
        return db.execute(f"SELECT * FROM products WHERE {query}")

    async def _arun(self, query: str) -> str:
        return await db.async_execute(query)
Where does the @tool decorator get the tool's description from?
When should you use BaseTool subclassing instead of the @tool decorator?
32. What are multi-action agents?

A multi-action agent returns a list of AgentAction objects per reasoning step rather than a single action. This enables the agent to call multiple tools simultaneously within a single turn, which is useful when several tool calls are independent and don't need to be serialised.

Multi-action agents implement BaseMultiActionAgent, and their plan() method returns List[AgentAction] instead of a single AgentAction. AgentExecutor detects this and executes all returned actions in parallel before feeding their observations back to the agent.

OpenAI's parallel tool calling feature maps directly to this pattern. When you call ChatOpenAI with tools bound via .bind_tools(), the model can return multiple tool calls in a single response, and AgentExecutor (or LangGraph) runs them concurrently:

from langchain_openai import ChatOpenAI

llm_with_tools = ChatOpenAI(model="gpt-4o").bind_tools([search_tool, calculator_tool])

# Model may respond with both a search call AND a calculator call in one step
response = llm_with_tools.invoke("What is the population of France times 2?")
print(response.tool_calls)  # [{name: 'search', ...}, {name: 'calculator', ...}]

For complex coordination of parallel tool execution with state management, LangGraph is better suited than AgentExecutor, as it provides explicit graph edges for parallel branches.

What does a multi-action agent's plan() method return per reasoning step?
What OpenAI model feature enables parallel tool calls in a single LLM response?
33. How do agents plan and reason?

LangChain agents use the ReAct (Reasoning + Acting) framework to plan and reason. The model is prompted to produce interleaved Thought, Action, and Observation sequences. The Thought is the model's explicit reasoning about what to do next; the Action is the tool call decision; the Observation is the tool's returned result. This cycle repeats until the model produces a "Final Answer".

A ReAct trace looks like this:

Question: Who is the CEO of Anthropic and when was the company founded?

Thought: I need to search for information about Anthropic.
Action: search
Action Input: "Anthropic CEO founder"
Observation: Anthropic was founded in 2021. Dario Amodei is the CEO.

Thought: I now have both pieces of information needed to answer.
Final Answer: Anthropic's CEO is Dario Amodei. The company was founded in 2021.

For models with native function calling (OpenAI, Anthropic), the reasoning is more structured: the model returns a JSON tool call object rather than parsing free text, which is more reliable. The OpenAI Tools Agent uses this approach. Newer techniques like chain-of-thought prompting and tree-of-thought can be integrated to improve multi-step reasoning quality by providing examples of good reasoning chains in the system prompt.

In the ReAct framework, what is the 'Thought' step?
Why is the OpenAI Tools Agent more reliable than a text-based ReAct Agent?
34. How do you integrate memory with agents?

By default, AgentExecutor has no memory — each invocation is stateless. To give an agent conversation memory, pass a memory object to AgentExecutor. This is distinct from return_intermediate_steps (which stores tool call history within a single run); memory stores the dialogue across multiple separate invocations.

from langchain.memory import ConversationBufferMemory
from langchain.agents import AgentExecutor

memory = ConversationBufferMemory(
    memory_key="chat_history",  # must match prompt variable
    return_messages=True,
)

executor = AgentExecutor(
    agent=agent,
    tools=tools,
    memory=memory,
    verbose=True,
)

executor.invoke({"input": "My name is Alice."})
executor.invoke({"input": "What did I just tell you?"})  # recalls "Alice"

The prompt used by the agent must include a {chat_history} variable (or whatever memory_key is set to) so the history is injected on each call. For multi-user scenarios, each user needs their own memory object — or use LangGraph's checkpointing with thread IDs to manage per-conversation state.

What must the agent's prompt include for ConversationBufferMemory to work correctly?
How is conversation memory different from return_intermediate_steps in AgentExecutor?
35. How do you debug LangChain agents?

Debugging LangChain agents requires visibility into the agent's reasoning steps, tool inputs, and tool outputs — not just the final answer. Several tools address this at different levels of depth.

verbose=True — prints every Thought, Action, and Observation to stdout during execution. Quick and zero-setup, ideal during development:

executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

return_intermediate_steps=True — returns the full [(AgentAction, observation), ...] list in the output dict so you can inspect programmatically in tests:

# return_intermediate_steps is a constructor flag on AgentExecutor, not an invoke kwarg
executor = AgentExecutor(agent=agent, tools=tools, return_intermediate_steps=True)
result = executor.invoke({"input": "..."})
for action, obs in result["intermediate_steps"]:
    print(action.tool, action.tool_input, "=>", obs)

LangSmith tracing — set LANGCHAIN_TRACING_V2=true and every agent run is captured as a full tree trace in LangSmith. You can see token counts, latency per step, exact prompts sent to the model, and tool call details. This is the most powerful debugging tool for production issues.

StdOutCallbackHandler — equivalent to verbose but via the callback system, useful when you need to attach it conditionally:

from langchain_core.callbacks import StdOutCallbackHandler
result = executor.invoke({"input": "..."}, config={"callbacks": [StdOutCallbackHandler()]})
What does verbose=True in AgentExecutor print to stdout?
Which debugging approach provides the most detail for production issues including token counts and exact prompts?
36. What is LangGraph?

LangGraph is a library for building stateful, multi-actor applications with LLMs using a directed graph model. Where LangChain chains are linear (or at most tree-shaped), LangGraph graphs can have cycles — a node can route back to an earlier node, making it possible to express iterative agent loops, retry-on-failure patterns, and human-in-the-loop pauses as explicit graph edges rather than implicit recursion.

The core concepts are:

  • State — a typed Python dict (TypedDict) that persists across all nodes in the graph
  • Nodes — Python functions that receive state and return a partial state update
  • Edges — connections between nodes; can be unconditional or conditional (routing)
  • StateGraph — the graph builder class; compile it to get an executable app
  • Checkpointing — built-in persistence of state after every node, enabling resume, time-travel debugging, and human-in-the-loop

LangGraph is the recommended approach for anything beyond a simple linear chain: autonomous agents with retry loops, multi-agent coordination, chatbots with persistent memory, and workflows that need a human to approve or correct an intermediate step before proceeding.

What structural capability does LangGraph have that standard LCEL chains do not?
What is a Node in LangGraph?
37. What are the differences between LangGraph and LangChain Agents?

LangChain Agents (via AgentExecutor) and LangGraph both implement agent behaviour, but they differ significantly in how much control you have over the execution flow:

  • Execution flow — AgentExecutor: black-box loop; you can't see or modify the flow between steps. LangGraph: explicit graph; every edge and node is defined by you.
  • Cycles / loops — AgentExecutor: implicit loop managed internally. LangGraph: explicit cycles via conditional edges.
  • Human-in-the-loop — AgentExecutor: hard to add; requires custom callback hacks. LangGraph: first-class feature via interrupt_before/interrupt_after.
  • State management — AgentExecutor: limited to the memory object passed to the executor. LangGraph: full typed state dict with custom reducers.
  • Persistence — AgentExecutor: not built-in; requires custom implementation. LangGraph: built-in checkpointers (MemorySaver, SqliteSaver, PostgresSaver).
  • Multi-agent — AgentExecutor: no native support. LangGraph: first-class; agents as nodes with handoffs.
  • Complexity — AgentExecutor: simple, quick to prototype. LangGraph: more setup, but much more control.

In practice: use AgentExecutor for quick prototypes and simple single-agent tasks. Switch to LangGraph when you need reliable production agents with human oversight, complex multi-step state, persistent memory, or multi-agent coordination.

Which feature is first-class in LangGraph but very difficult in AgentExecutor?
When is AgentExecutor still the appropriate choice over LangGraph?
38. What is StateGraph in LangGraph?

StateGraph is the main graph class in LangGraph. You instantiate it with a state type (a TypedDict class), add nodes and edges to it, then compile it into an executable app. The state type defines all the fields that are shared across nodes and how those fields are updated when a node returns a partial update.

from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator

# Define the shared state structure
class AgentState(TypedDict):
    messages: Annotated[list, operator.add]  # append-reducer
    steps_taken: int

# Build the graph
graph_builder = StateGraph(AgentState)

def call_llm(state: AgentState) -> dict:
    response = llm.invoke(state["messages"])
    return {"messages": [response], "steps_taken": state["steps_taken"] + 1}

graph_builder.add_node("llm", call_llm)
graph_builder.add_edge(START, "llm")
graph_builder.add_edge("llm", END)

# Compile to executable
graph = graph_builder.compile()

State updates use reducers. The default reducer is last-write-wins (the node's returned value replaces the current value). Using Annotated[list, operator.add] means returned lists are appended to the existing list — the standard pattern for message history in chat agents.

What happens when a StateGraph node returns {'messages': [new_msg]} and messages uses Annotated[list, operator.add]?
What does graph_builder.compile() produce?
39. How do nodes and edges work in LangGraph?

In LangGraph, nodes are Python functions that contain the logic of your application, and edges are the connections that define execution flow between nodes.

Nodes receive the current state dict and return a partial state update (a dict containing only the keys they want to change). LangGraph merges this update into the full state using the defined reducers:

def tool_node(state: AgentState) -> dict:
    # Execute the tool called by the last message
    last_message = state["messages"][-1]
    tool_result = tools_by_name[last_message.tool_calls[0]["name"]].invoke(
        last_message.tool_calls[0]["args"]
    )
    return {"messages": [ToolMessage(content=str(tool_result), ...)]}

Edges come in two flavours:

  • Normal edges — always go from node A to node B: graph.add_edge("node_a", "node_b")
  • Conditional edges — a router function decides the next node: graph.add_conditional_edges("node_a", router_fn, {"tool": "tool_node", "end": END})

Two special node names mark the graph boundaries: START is the entry point (no logic, just the first edge target), and END is the terminal node that signals the graph has finished. A node can have multiple outgoing normal edges, in which case all targets run in parallel (fan-out); a conditional edge, by contrast, selects exactly one branch per step via its router function.

What does a LangGraph node function return?
What is the purpose of the END sentinel in LangGraph?
40. How do you implement conditional edges in LangGraph?

Conditional edges implement branching logic in LangGraph. A router function takes the current state and returns a string key. That key is looked up in a mapping dict to determine which node to execute next.

from langgraph.graph import StateGraph, START, END

def should_continue(state: AgentState) -> str:
    """Decide whether to call a tool or end."""
    last_message = state["messages"][-1]
    if last_message.tool_calls:   # LLM wants to call a tool
        return "call_tool"
    return "end"                  # LLM produced a final answer

graph.add_conditional_edges(
    "agent",          # source node
    should_continue,  # router function
    {
        "call_tool": "tool_executor",  # route to tool executor
        "end": END,                     # or finish
    }
)

The router function can return any string; the mapping dict translates those strings to actual node names or END. If all possible return values are listed in the mapping, you can omit the mapping dict and the router function can return node names directly. Conditional edges are the mechanism that creates cycles in a LangGraph — the agent node routes back to the tool executor, which routes back to the agent, until the agent routes to END.
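
The cycle can be sketched as a hand-rolled loop that walks node names until the router returns END — a simplified model of what the compiled graph does, not LangGraph's internals:

```python
# Sketch of a graph with a cycle: agent -> tool_executor -> agent -> END.

END = "__end__"

def agent(state):
    state["steps"] += 1
    # pretend the model wants one tool call, then gives a final answer
    state["wants_tool"] = state["steps"] < 2
    return state

def tool_executor(state):
    state["observations"].append(f"tool result {state['steps']}")
    return state

nodes = {"agent": agent, "tool_executor": tool_executor}

def router(node, state):
    if node == "agent":
        return "tool_executor" if state["wants_tool"] else END
    return "agent"  # tools always route back to the agent

state = {"steps": 0, "observations": [], "wants_tool": False}
current = "agent"
while current != END:
    state = nodes[current](state)
    current = router(current, state)

print(state["observations"])  # ['tool result 1']
```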

What must a conditional edge router function return?
How do conditional edges create cycles in a LangGraph?
41. How does state management work in LangGraph?

State in LangGraph is a TypedDict that is shared across all nodes in a graph run. Every time a node executes, it can return a partial update — a dict containing only the keys it wants to change. LangGraph merges the update into the current state using reducers.

The default reducer is last-write-wins: the node's returned value replaces the current value for that key. You can override this with Annotated[type, reducer_fn] where reducer_fn takes (current, update) and returns the new value:

from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    # Append-only: new messages are added to the list
    messages: Annotated[list, operator.add]

    # Last-write-wins: iteration count is replaced each time
    iteration_count: int

    # Custom reducer: keep the highest score seen so far
    best_score: Annotated[float, lambda a, b: max(a, b)]

State is immutable between node calls — nodes receive a snapshot and return updates; they do not mutate state in place. This design enables checkpointing (save the full state after each node), time-travel debugging (replay from any past state), and parallel node execution (each branch gets a copy of the state).
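
The merge step can be sketched as a function that applies a partial update using per-key reducers, falling back to last-write-wins — a conceptual model, not LangGraph's implementation:

```python
import operator

# Each key's reducer combines (current, update); keys without a reducer
# are simply replaced (last-write-wins).
reducers = {
    "messages": operator.add,  # append lists
    "best_score": max,         # keep the highest value seen
    # no entry for iteration_count -> last-write-wins
}

def apply_update(state: dict, update: dict) -> dict:
    new_state = dict(state)  # nodes never mutate state in place
    for key, value in update.items():
        reducer = reducers.get(key)
        new_state[key] = reducer(state[key], value) if reducer else value
    return new_state

state = {"messages": ["hi"], "best_score": 0.4, "iteration_count": 1}
state = apply_update(state, {"messages": ["hello"], "best_score": 0.2,
                             "iteration_count": 2})
print(state)
# {'messages': ['hi', 'hello'], 'best_score': 0.4, 'iteration_count': 2}
```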

What happens if a LangGraph node returns {'messages': [new_msg]} and the messages field has no reducer?
What Python construct is used to attach a custom reducer to a TypedDict field?
42. What is the difference between MessageGraph and StateGraph?

MessageGraph is a specialised version of StateGraph where the entire state is a single list of messages (using the add_messages reducer). Nodes receive the message list and return new messages to append. StateGraph is the general-purpose graph where you define any TypedDict as the state, with full control over all fields and their reducers.

  • State structure — MessageGraph: always a list of BaseMessage objects. StateGraph: any TypedDict with any fields.
  • Node input — MessageGraph: the list of messages. StateGraph: the full state dict.
  • Node output — MessageGraph: one or more messages to append. StateGraph: a partial dict of any fields to update.
  • Custom fields — MessageGraph: not supported. StateGraph: any fields (scores, iteration counts, flags, etc.).
  • Status — MessageGraph: simpler but less flexible. StateGraph: recommended for all but trivial chatbots.

from langgraph.graph import MessageGraph

# MessageGraph - state is just the messages list
graph = MessageGraph()
graph.add_node("model", lambda msgs: llm.invoke(msgs))
graph.set_entry_point("model")
graph.set_finish_point("model")

MessageGraph was the original LangGraph API and is still useful for pure chatbot flows with no additional state. For anything more complex, StateGraph with Annotated[list, add_messages] for the messages field is preferred because it lets you add other state fields alongside the conversation history.

What is the state structure in a MessageGraph?
Why is StateGraph preferred over MessageGraph for most production agents?
43. How does checkpointing work in LangGraph?

LangGraph's checkpointing system saves the full graph state after every node execution to a persistent store. This enables resuming interrupted runs, time-travel debugging (replay from any past state), and human-in-the-loop workflows (pause, inspect, modify state, then continue).

To enable checkpointing, pass a checkpointer to graph.compile() and provide a thread_id in the config on each invocation. The thread_id is the key that groups checkpoints belonging to the same conversation or workflow run:

from langgraph.checkpoint.memory import MemorySaver

memory = MemorySaver()  # in-memory, for development
graph = graph_builder.compile(checkpointer=memory)

config = {"configurable": {"thread_id": "user-123-session-1"}}

# First invocation
graph.invoke({"messages": [HumanMessage("Hello")]}, config)

# Second invocation — LangGraph automatically loads the previous state
graph.invoke({"messages": [HumanMessage("What did I say?")]}, config)

Checkpointer options: MemorySaver (in-process, ephemeral), SqliteSaver (persistent SQLite file, single-process), AsyncSqliteSaver (async SQLite), PostgresSaver / AsyncPostgresSaver (production multi-process). All implement the BaseCheckpointSaver interface, so switching backends requires only changing the checkpointer passed to compile().
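
Conceptually, a checkpointer is a thread-keyed store of state snapshots: after every node, the state is saved under the run's thread_id so a later invocation can resume from it. A minimal in-memory sketch (not the BaseCheckpointSaver interface itself):

```python
from copy import deepcopy
from typing import Optional

checkpoints: dict[str, list[dict]] = {}

def save_checkpoint(thread_id: str, state: dict) -> None:
    # Snapshot the full state after each node execution
    checkpoints.setdefault(thread_id, []).append(deepcopy(state))

def load_latest(thread_id: str) -> Optional[dict]:
    snaps = checkpoints.get(thread_id)
    return deepcopy(snaps[-1]) if snaps else None

# First "invocation" saves state after each node
save_checkpoint("user-123", {"messages": ["Hello"]})
save_checkpoint("user-123", {"messages": ["Hello", "Hi there!"]})

# A second invocation on the same thread resumes from the latest snapshot
print(load_latest("user-123"))   # {'messages': ['Hello', 'Hi there!']}
print(load_latest("other-id"))   # None -- different thread, no history
```

Keeping every snapshot (rather than only the latest) is what makes time-travel debugging possible: any earlier checkpoint can be replayed.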

What does the thread_id in the config identify when using LangGraph checkpointing?
Which checkpointer is recommended for production multi-process deployments?
44. How do you implement human-in-the-loop with LangGraph?

Human-in-the-loop (HITL) in LangGraph means pausing graph execution at a specified point so a human can inspect the current state, approve an action, or modify a value before the graph continues. This is a first-class LangGraph feature built on top of checkpointing.

Step 1: Compile the graph with interrupt_before or interrupt_after

from langgraph.checkpoint.memory import MemorySaver

graph = graph_builder.compile(
    checkpointer=MemorySaver(),
    interrupt_before=["tool_executor"],  # pause before this node runs
)

Step 2: Run until the interrupt

config = {"configurable": {"thread_id": "session-1"}}
for event in graph.stream({"messages": [HumanMessage("Search for X")]}, config):
    print(event)  # stops before tool_executor

Step 3: Inspect and optionally update state

current_state = graph.get_state(config)
print(current_state.values)  # see what the agent is about to do

# Optionally modify the state before continuing:
graph.update_state(config, {"messages": [HumanMessage("Actually search for Y")]}, as_node="agent")

Step 4: Resume execution

# Pass None as input to resume from the checkpoint
for event in graph.stream(None, config):
    print(event)
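The pause/resume mechanics of the four steps above can be mimicked in plain Python. This is a hypothetical sketch of what the runtime does with interrupt_before and a checkpoint, not LangGraph internals:

```python
# Simplified sketch of interrupt_before semantics -- not LangGraph internals.
def run(nodes, state, checkpoint, interrupt_before=None):
    """Run nodes in order from the checkpointed position; pause before
    `interrupt_before` the first time it is reached."""
    i = checkpoint.get("next_index", 0)
    while i < len(nodes):
        name, fn = nodes[i]
        if name == interrupt_before and not checkpoint.get("resumed", False):
            checkpoint["state"] = dict(state)  # persist state before pausing
            checkpoint["next_index"] = i       # remember where to resume
            checkpoint["resumed"] = True       # next call runs straight through
            return "interrupted"
        state.update(fn(state))
        i += 1
    checkpoint["next_index"] = i
    return "done"

nodes = [
    ("agent", lambda s: {"plan": "search X"}),
    ("tool_executor", lambda s: {"result": "ran " + s["plan"]}),
]
state, ckpt = {}, {}

status1 = run(nodes, state, ckpt, interrupt_before="tool_executor")  # pauses
state["plan"] = "search Y"   # human edits state, like graph.update_state(...)
status2 = run(nodes, state, ckpt, interrupt_before="tool_executor")  # resumes
```

After the first call status1 is "interrupted" and the state is saved; the second call resumes from the checkpoint and completes, just as passing None to graph.stream() does.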

What argument to graph.compile() causes execution to pause before a specific node?
How do you resume a LangGraph run after a human-in-the-loop pause?
45. How do you build multi-agent systems with LangGraph?

Multi-agent systems in LangGraph are built by representing each agent as a node (or subgraph) and connecting them with edges that define how work is handed off. The most common architecture is the supervisor pattern: one supervisor agent receives the user request, decides which specialist agent should handle it, routes to that agent, and continues routing until the task is complete.

from typing import Annotated, Literal, TypedDict
import operator

from langgraph.graph import StateGraph, START, END

class MultiAgentState(TypedDict):
    messages: Annotated[list, operator.add]
    next_agent: str

def supervisor(state):
    # Supervisor LLM (with structured output) decides which agent runs next
    response = supervisor_llm.invoke(state["messages"])
    return {"next_agent": response.next}  # 'researcher', 'coder', or 'FINISH'

def route_from_supervisor(state) -> Literal["researcher", "coder", "__end__"]:
    # END is the string sentinel "__end__", so it can appear in the Literal
    return state["next_agent"] if state["next_agent"] != "FINISH" else END

graph = StateGraph(MultiAgentState)
graph.add_node("supervisor", supervisor)
graph.add_node("researcher", researcher_agent)
graph.add_node("coder", coder_agent)

graph.add_conditional_edges("supervisor", route_from_supervisor)
graph.add_edge("researcher", "supervisor")  # always report back
graph.add_edge("coder", "supervisor")
graph.add_edge(START, "supervisor")

multi_agent_app = graph.compile()

An alternative is the network pattern where agents can hand off directly to each other without a central supervisor. Both patterns use shared state in the TypedDict to pass context between agents.
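The supervisor control flow above boils down to a loop: decide, dispatch, report back, repeat until FINISH. A plain-Python sketch of that loop (the decision function below is a stand-in for supervisor_llm's structured output, and the agent lambdas are placeholders):

```python
# Plain-Python sketch of the supervisor loop -- no langgraph required.
def supervisor_decide(state):
    # Stand-in for the supervisor LLM's routing decision
    if "research" not in state:
        return "researcher"
    if "code" not in state:
        return "coder"
    return "FINISH"

agents = {
    "researcher": lambda s: {**s, "research": "notes on X"},
    "coder":      lambda s: {**s, "code": "print('X')"},
}

def run_supervisor(state):
    trace = []
    while True:
        nxt = supervisor_decide(state)   # supervisor node
        trace.append(nxt)
        if nxt == "FINISH":              # conditional edge to END
            return state, trace
        state = agents[nxt](state)       # specialist node runs...
        # ...and always routes back to the supervisor (the add_edge calls above)

final, trace = run_supervisor({})
```

The trace comes out as researcher, coder, FINISH: each specialist reports back to the supervisor, which keeps routing until the task is done.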

In the supervisor multi-agent pattern, what is the supervisor's role?
After a specialist agent (e.g. researcher) finishes its work, where does it route in the supervisor pattern?
46. What are subgraphs in LangGraph?

A subgraph in LangGraph is a compiled graph that is used as a node inside a parent graph. Subgraphs allow you to encapsulate complex, reusable agent logic and compose multiple graphs hierarchically — exactly like functions in programming, where a subgraph is the 'function' and the parent graph is the 'caller'.

from typing import Annotated, TypedDict
import operator

from langgraph.graph import StateGraph, START, END

# --- Define the subgraph ---
class SubgraphState(TypedDict):
    messages: Annotated[list, operator.add]
    search_results: list

sub_builder = StateGraph(SubgraphState)
sub_builder.add_node("search", search_node)
sub_builder.add_node("summarise", summarise_node)
sub_builder.add_edge(START, "search")
sub_builder.add_edge("search", "summarise")
sub_builder.add_edge("summarise", END)
research_subgraph = sub_builder.compile()

# --- Use it as a node in the parent graph ---
class ParentState(TypedDict):
    messages: Annotated[list, operator.add]

parent_builder = StateGraph(ParentState)
parent_builder.add_node("research", research_subgraph)  # subgraph as node
parent_builder.add_node("answer", answer_node)
parent_builder.add_edge(START, "research")
parent_builder.add_edge("research", "answer")
parent_builder.add_edge("answer", END)

graph = parent_builder.compile()

State key overlap between parent and subgraph determines how data flows between them. Keys present in both states are automatically mapped. Subgraphs can have their own checkpointers for independent persistence, or inherit the parent's checkpointer.
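The key-overlap rule can be sketched in plain Python. This is a hypothetical helper showing the projection in and merge back, not LangGraph internals (real LangGraph also applies reducers like operator.add when merging):

```python
# Sketch of how overlapping state keys flow between parent and subgraph.
# Hypothetical helper -- not LangGraph internals.
def call_subgraph(parent_state, subgraph_keys, subgraph_fn):
    # 1. Project in: only keys the subgraph's schema declares are passed
    sub_input = {k: v for k, v in parent_state.items() if k in subgraph_keys}
    # 2. Run the subgraph on its own state
    sub_output = subgraph_fn(sub_input)
    # 3. Merge back: only keys the parent's schema shares are written back
    updates = {k: v for k, v in sub_output.items() if k in parent_state}
    return {**parent_state, **updates}

def research_subgraph(state):
    # Subgraph-private key: exists only in SubgraphState
    state = {**state, "search_results": ["doc1"]}
    return {**state, "messages": state["messages"] + ["summary of doc1"]}

parent = {"messages": ["find docs"]}   # ParentState has only "messages"
parent = call_subgraph(parent, {"messages", "search_results"}, research_subgraph)
```

The shared "messages" key flows both ways, while the subgraph-only "search_results" key never leaks into the parent state.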

How is a compiled subgraph added to a parent LangGraph?
How does the parent graph pass state into a subgraph?
47. How do streaming and callbacks work in LangGraph?

LangGraph's .stream() and .astream() methods yield events as each node finishes executing, rather than waiting for the full graph to complete. The stream_mode parameter controls what is yielded.

The three main stream modes:

  • stream_mode='updates' (default) — yields the state update returned by each node as {node_name: {updated_keys}}
  • stream_mode='values' — yields the full state after each node runs
  • stream_mode='debug' — yields detailed debug events for each step

config = {"configurable": {"thread_id": "1"}}

# Stream node updates
for event in graph.stream({"messages": [HumanMessage("Hello")]}, config, stream_mode="updates"):
    node_name, state_update = list(event.items())[0]
    print(f"Node '{node_name}' updated: {list(state_update.keys())}")

# Stream token-by-token from LLM inside a node
async for event in graph.astream_events({"messages": [...]}, config, version="v2"):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")

For token-level streaming from LLMs called inside nodes, use astream_events() which propagates the standard LangChain callback events (on_chat_model_stream, on_tool_start, on_tool_end) through the entire graph execution tree.
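The relationship between 'updates' and 'values' modes is worth making concrete: the 'values' stream is what you get by folding each node's update into the state with the declared reducers. A small sketch of that folding (the reducer table mirrors the Annotated[list, operator.add] declaration; this is an illustration, not LangGraph's code):

```python
import operator

# 'values' mode is the running state obtained by folding each node's
# 'updates' into the state with the schema's reducers. Sketch only.
reducers = {"messages": operator.add}   # mirrors Annotated[list, operator.add]

def apply_update(state, update):
    new = dict(state)
    for key, value in update.items():
        if key in reducers and key in new:
            new[key] = reducers[key](new[key], value)  # append, don't replace
        else:
            new[key] = value
    return new

# What stream_mode='updates' would yield: {node_name: partial_update}
updates_stream = [
    {"agent": {"messages": ["I'll search"]}},
    {"tool":  {"messages": ["result: 42"]}},
]

state = {"messages": ["Hello"]}
values_stream = []
for event in updates_stream:
    (node, update), = event.items()
    state = apply_update(state, update)
    values_stream.append(state)         # what stream_mode='values' would yield
```

Each 'updates' event carries only what one node returned, while each 'values' event carries the full accumulated state after that node ran.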

What does stream_mode='values' yield from LangGraph.stream()?
Which method gives you token-by-token streaming from LLMs inside LangGraph nodes?
48. What are persistence patterns in LangGraph?

Persistence in LangGraph means saving graph state so it survives process restarts, can be resumed after interrupts, and can be inspected or replayed at any past checkpoint. All persistence goes through the checkpointer interface, so the storage backend is swappable without changing application code.

Checkpointer        | Storage                 | Use case
MemorySaver         | Python dict, in-process | Development, unit tests
SqliteSaver         | SQLite file             | Single-process apps, CLI tools
AsyncSqliteSaver    | SQLite file (async)     | Async single-process servers
PostgresSaver       | PostgreSQL              | Multi-process production (sync)
AsyncPostgresSaver  | PostgreSQL (async)      | Multi-process production (async FastAPI)

from langgraph.checkpoint.sqlite import SqliteSaver
import sqlite3
import sqlite3

conn = sqlite3.connect("checkpoints.db", check_same_thread=False)
saver = SqliteSaver(conn)

graph = graph_builder.compile(checkpointer=saver)

# Retrieve past state for a thread
state = graph.get_state({"configurable": {"thread_id": "user-1"}})

# List all past checkpoints
for checkpoint in graph.get_state_history({"configurable": {"thread_id": "user-1"}}):
    print(checkpoint.config, checkpoint.created_at)

Which persistence backend is appropriate for a production multi-process FastAPI deployment?
What method retrieves a list of all past checkpoints for a given thread?
49. How do you handle errors in LangGraph?

Error handling in LangGraph is explicit — errors in nodes are not automatically caught or retried. If a node raises an unhandled exception, the graph execution stops and the exception propagates to the caller. This is intentional: LangGraph wants you to be explicit about failure modes rather than silently swallowing errors.

Approach 1: try/except inside node functions — the most common pattern. Catch the error, add a diagnostic message to state, and route to an error-recovery node:

def call_tool(state: AgentState) -> dict:
    try:
        result = tool.invoke(state["tool_input"])
        return {"messages": [ToolMessage(content=result, ...)]}
    except Exception as e:
        return {"messages": [ToolMessage(content=f"Error: {e}", ...)]}

Approach 2: error recovery edges — route to a dedicated error handler node using a conditional edge that inspects whether the last message signals an error:

def should_retry(state) -> str:
    last = state["messages"][-1].content
    if last.startswith("Error:"):
        return "error_handler"
    return "continue"

graph.add_conditional_edges("tool_node", should_retry, {"error_handler": "error_handler", "continue": "agent"})

For transient external service errors (rate limits, timeouts), wrap the relevant LangChain component with .with_retry() before using it inside a node.
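For intuition, here is a standalone retry-with-exponential-backoff sketch approximating what .with_retry() provides. This is an illustration of the technique, not LangChain's implementation; flaky_tool and the parameter names are invented for the example:

```python
import time

# Standalone sketch of retry-with-backoff for transient errors --
# an illustration of what .with_retry() offers, not LangChain's code.
def with_retry(fn, attempts=3, base_delay=0.01, retry_on=(TimeoutError,)):
    def wrapped(*args, **kwargs):
        for attempt in range(attempts):
            try:
                return fn(*args, **kwargs)
            except retry_on:
                if attempt == attempts - 1:
                    raise                              # exhausted: propagate
                time.sleep(base_delay * 2 ** attempt)  # exponential backoff
    return wrapped

calls = {"n": 0}
def flaky_tool(query):
    # Fails twice with a transient error, then succeeds
    calls["n"] += 1
    if calls["n"] < 3:
        raise TimeoutError("transient")
    return f"result for {query}"

result = with_retry(flaky_tool)("X")
```

Transient failures are retried with growing delays, while a non-listed exception (or exhaustion of attempts) still propagates, so the graph's explicit error handling remains in charge.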

What happens in LangGraph if a node raises an unhandled exception?
What is the recommended pattern for handling recoverable tool errors in LangGraph?
50. How do you deploy LangGraph applications?

LangGraph applications can be deployed in three main ways: LangGraph Cloud (managed service), self-hosted with Docker + FastAPI, and embedded in a larger application. The right choice depends on your team's infrastructure requirements and SLA needs.

LangGraph Cloud — LangChain's managed deployment platform. You push your graph code to a GitHub repo, connect it to LangGraph Cloud, and it handles scaling, checkpointing (PostgreSQL), streaming, and monitoring automatically. Provides REST and WebSocket APIs out of the box.

Self-hosted FastAPI — wrap the compiled graph with a FastAPI app and use PostgresSaver for multi-process state:

from contextlib import asynccontextmanager

from fastapi import FastAPI
from langchain_core.messages import HumanMessage
from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver

@asynccontextmanager
async def lifespan(app: FastAPI):
    # from_conn_string is an async context manager; it owns the DB connection
    async with AsyncPostgresSaver.from_conn_string(DB_URL) as saver:
        app.state.graph = graph_builder.compile(checkpointer=saver)
        yield

app = FastAPI(lifespan=lifespan)

@app.post("/chat/{thread_id}")
async def chat(thread_id: str, message: str):
    config = {"configurable": {"thread_id": thread_id}}
    result = await app.state.graph.ainvoke({"messages": [HumanMessage(message)]}, config)
    return {"response": result["messages"][-1].content}

Containerise with Docker, expose via Kubernetes or a managed container service, and use LangSmith for production observability.
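A minimal Dockerfile sketch for the self-hosted option (file and module names here are assumptions; adjust to your project layout):

```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# app.py holds the FastAPI app wrapping the compiled graph
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]
```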

Which checkpointer should you use when deploying LangGraph with multiple worker processes?
What is LangGraph Cloud?
LangGraph LangChain Interview questions II