LCEL (LangChain Expression Language) uses the | (pipe) operator for chaining. Basic pattern: chain = prompt | model | parser. Example: from langchain_core.prompts import ChatPromptTemplate; from langchain_openai import ChatOpenAI; from langchain_core.output_parsers import StrOutputParser; prompt = ChatPromptTemplate.from_template('Tell me a joke about {topic}'); model = ChatOpenAI(model='gpt-4'); parser = StrOutputParser(); chain = prompt | model | parser; result = chain.invoke({'topic': 'programming'}). Each component must implement the Runnable interface. Pipes automatically handle input/output types - prompt outputs messages, model outputs AIMessage, parser extracts string. Async version: await chain.ainvoke(input). Advantages over LLMChain (deprecated in v0.2): type safety, streaming support, better parallelization. Use .with_config() for runtime options like callbacks, tags, metadata.
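A minimal runnable sketch of the inline example above, assuming the langchain-openai package is installed and an OpenAI API key is set in the environment:

```python
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI(model="gpt-4")   # any chat model that implements Runnable works here
parser = StrOutputParser()          # extracts .content from the AIMessage as a plain string

# The pipe operator composes the three Runnables into a RunnableSequence.
chain = prompt | model | parser

print(chain.invoke({"topic": "programming"}))
```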
LangChain LCEL Fundamentals FAQ & Answers
50 expert LangChain LCEL Fundamentals answers researched from official documentation. Every answer cites authoritative sources you can verify.
Runnable is the base interface for all LCEL components. Core methods: invoke(input) is the one you must implement; stream(input), batch(inputs), and the async variants (ainvoke, astream, abatch) have default implementations you can override. Create custom: from langchain_core.runnables import RunnableLambda; def my_func(x): return x.upper(); runnable = RunnableLambda(my_func); result = runnable.invoke('hello'). For class-based: from langchain_core.runnables import Runnable; class MyRunnable(Runnable): def invoke(self, input, config=None): return self.process(input); def process(self, x): return x * 2. Runnables composable via | operator. Use RunnablePassthrough for identity, RunnableBranch for conditionals. Config parameter passes runtime options (callbacks, recursion_limit). Type hints: Runnable[InputType, OutputType]. Streaming: yield chunks in stream() method. Essential for custom LangChain components in v0.2+.
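A small sketch of both custom-Runnable styles described above; the Doubler class and the lambda are illustrative placeholders, not library components:

```python
from typing import Optional
from langchain_core.runnables import Runnable, RunnableConfig, RunnableLambda

# Function-based custom component: RunnableLambda wraps any callable.
upper = RunnableLambda(lambda x: x.upper())
print(upper.invoke("hello"))           # HELLO

# Class-based: subclass Runnable and implement invoke(); stream/batch get defaults.
class Doubler(Runnable[int, int]):
    def invoke(self, input: int, config: Optional[RunnableConfig] = None, **kwargs) -> int:
        return input * 2

# Custom Runnables compose with | like any other LCEL component.
chain = Doubler() | RunnableLambda(str)
print(chain.invoke(21))                # "42"
```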
LCEL streaming uses .stream() method instead of .invoke(). Pattern: for chunk in chain.stream(input): print(chunk, end='', flush=True). Async: async for chunk in chain.astream(input). Streaming works through entire chain - if model streams tokens, parser receives them incrementally. Example with ChatOpenAI: from langchain_openai import ChatOpenAI; from langchain_core.prompts import ChatPromptTemplate; from langchain_core.output_parsers import StrOutputParser; prompt = ChatPromptTemplate.from_template('{question}'); model = ChatOpenAI(streaming=True); parser = StrOutputParser(); chain = prompt | model | parser; for token in chain.stream({'question': 'Explain LCEL'}): print(token, end=''). Streaming intermediate steps: use .astream_events() (v0.2+) to see all chain events. Custom streaming: use RunnableGenerator (a generator that yields chunks). Streaming with tools: filter tool events with include_types=['tool'] in astream_events(). Essential for real-time UIs and reducing perceived latency.
RunnablePassthrough passes input through unchanged or adds fields. Two modes: identity (passthrough) and assignment (add keys). Identity: from langchain_core.runnables import RunnablePassthrough; chain = RunnablePassthrough() | some_function. Assignment pattern: RunnablePassthrough.assign(new_key=lambda x: process(x)). Example combining both: from langchain_core.prompts import ChatPromptTemplate; from langchain_core.runnables import RunnablePassthrough; prompt = ChatPromptTemplate.from_template('Context: {context}\n\nQuestion: {question}'); chain = {'context': retriever, 'question': RunnablePassthrough()} | prompt | model. Common pattern - preserve original input while adding fields: chain = RunnablePassthrough.assign(embedding=embedder, metadata=lambda x: get_meta(x)) | downstream. Access original: use itemgetter from operator for dict keys. Use case: RAG chains where you need both retrieved context and original question. Replaces RunnableMap pattern from v0.1.
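A self-contained sketch of both RunnablePassthrough modes; the fake_retriever lambda is a hypothetical stand-in for a real retriever:

```python
from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough

# Hypothetical retriever stand-in: maps a question string to a context string.
fake_retriever = RunnableLambda(lambda q: f"Docs about: {q}")

# Identity mode: RunnablePassthrough forwards the raw question while the
# parallel dict fans the same input out to every branch.
setup = RunnableParallel(context=fake_retriever, question=RunnablePassthrough())
print(setup.invoke("What is LCEL?"))
# {'context': 'Docs about: What is LCEL?', 'question': 'What is LCEL?'}

# Assignment mode: keep the existing keys and add new ones.
enrich = RunnablePassthrough.assign(length=lambda x: len(x["question"]))
print(enrich.invoke({"question": "What is LCEL?"}))
# {'question': 'What is LCEL?', 'length': 13}
```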
bind_tools() attaches tools to models supporting function calling. Pattern: model_with_tools = model.bind_tools([tool1, tool2]); response = model_with_tools.invoke('use the calculator'). Tool definition: from langchain_core.tools import tool; @tool def multiply(a: int, b: int) -> int: '''Multiply two numbers'''; return a * b. Models return AIMessage with tool_calls attribute. Extract: if response.tool_calls: for call in response.tool_calls: tool_name = call['name']; args = call['args']. Execute tools: from langchain_core.messages import ToolMessage; tool_output = multiply.invoke(args); tool_msg = ToolMessage(content=str(tool_output), tool_call_id=call['id']). Complete agentic loop: while True: response = model.invoke(messages); if not response.tool_calls: break; execute tools, append results to messages. Replaces deprecated functions parameter (pre-v0.2). Use with_structured_output() for Pydantic schema binding.
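A sketch of one bind_tools round trip, assuming a function-calling model (gpt-4 here as a placeholder) and an OpenAI API key:

```python
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

model_with_tools = ChatOpenAI(model="gpt-4").bind_tools([multiply])

messages = [HumanMessage(content="What is 6 times 7?")]
ai_msg = model_with_tools.invoke(messages)
messages.append(ai_msg)

# Execute each requested tool call and feed the result back as a ToolMessage.
for call in ai_msg.tool_calls:
    result = multiply.invoke(call["args"])
    messages.append(ToolMessage(content=str(result), tool_call_id=call["id"]))

final = model_with_tools.invoke(messages)   # the model now sees the tool output
print(final.content)
```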
Use RunnableParallel or dict syntax for parallel execution. Dict syntax: from langchain_core.runnables import RunnableParallel; parallel = {'summary': chain1, 'keywords': chain2, 'sentiment': chain3}; results = parallel.invoke(input). Returns dict with all results. Explicit RunnableParallel: parallel = RunnableParallel(summary=chain1, keywords=chain2); same behavior. Parallel with passthrough: {'original': RunnablePassthrough(), 'processed': processing_chain}. Execution: truly parallel with asyncio - use .batch() or .ainvoke() for concurrency. Example RAG with multiple retrievers: retrievers = RunnableParallel(wiki=wiki_retriever, docs=doc_retriever); combined = retrievers | merge_contexts | prompt | model. Error handling: if one branch fails, entire parallel call fails (use RunnableBranch for conditional). Max concurrency: config={'max_concurrency': 5}. Memory efficient - results streamed as available.
Use .with_retry() for automatic retries. Pattern: chain_with_retry = chain.with_retry(stop_after_attempt=3, wait_exponential_jitter=True). Built on the tenacity library (a dependency of langchain-core). Custom retry logic: from tenacity import retry, stop_after_attempt, wait_fixed; @retry(stop=stop_after_attempt(3), wait=wait_fixed(2)); def invoke_with_retry(chain, input): return chain.invoke(input). Error handling with try-except: from langchain_core.runnables import RunnableLambda; def safe_invoke(x): try: return risky_chain.invoke(x); except Exception as e: return {'error': str(e)}; safe_chain = RunnableLambda(safe_invoke). Fallback chains: chain_with_fallback = primary_chain.with_fallbacks([fallback_chain1, fallback_chain2]). Retry only specific errors: chain.with_retry(retry_if_exception_type=(TimeoutError,)). Timeouts: set on the model (e.g., ChatOpenAI(timeout=30)) or wrap the call in asyncio.wait_for. Use callbacks for error logging: chain.with_config(callbacks=[error_logger]).
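A self-contained sketch of retry plus fallback behavior, using a deliberately flaky stand-in function (hypothetical) instead of a real model call:

```python
from langchain_core.runnables import RunnableLambda

# Hypothetical flaky step for illustration; replace with a real chain in practice.
attempts = {"n": 0}

def flaky(x: str) -> str:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return f"ok: {x}"

# Retry the step up to 3 times, then fall back to a cheaper/safer alternative.
primary = RunnableLambda(flaky).with_retry(stop_after_attempt=3, wait_exponential_jitter=False)
fallback = RunnableLambda(lambda x: f"fallback: {x}")
robust = primary.with_fallbacks([fallback])

print(robust.invoke("hello"))   # succeeds on the third retry attempt: "ok: hello"
```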
Three core Runnable methods with distinct use cases. invoke(input): synchronous single execution, returns complete result. stream(input): returns iterator yielding incremental results (tokens, chunks). batch(inputs): processes multiple inputs efficiently, returns list of results. Example: model.invoke('hi') returns full response. model.stream('hi') yields tokens one by one. model.batch(['hi', 'hello', 'hey']) returns [response1, response2, response3] with internal batching optimization. Async variants: ainvoke, astream, abatch for concurrent execution. Performance: batch() ~3-10x faster than loop of invoke() calls (concurrent requests, connection pooling). Streaming essential for UIs - reduces time-to-first-token. Memory: stream uses O(1), invoke/batch use O(n). Config applies to all methods: model.with_config(tags=['demo']).invoke() (model parameters like temperature are set on the model itself, not via with_config). Chain methods: (prompt | model).batch([input1, input2]) batches through entire chain. Use abatch with asyncio.gather for max throughput.
RunnableBranch routes inputs based on conditions. Pattern: from langchain_core.runnables import RunnableBranch; branch = RunnableBranch((condition1, chain1), (condition2, chain2), default_chain). Conditions are functions taking input, returning bool. Example: def is_question(x): return '?' in x; branch = RunnableBranch((is_question, qa_chain), (lambda x: 'help' in x.lower(), help_chain), general_chain). Advanced pattern with structured input: branch = RunnableBranch((lambda x: x['type'] == 'code', code_chain), (lambda x: x['type'] == 'text', text_chain), error_chain); result = branch.invoke({'type': 'code', 'content': '...'}). Lazy evaluation - only matched chain executes. Combine with RunnablePassthrough to preserve input: {'original': RunnablePassthrough(), 'routed': branch}. Use case: multi-intent routing, error handling, language detection. Alternative: nested if-else with RunnableLambda for simple cases.
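A sketch of condition-based routing; the three RunnableLambda "chains" are placeholders for real prompt | model pipelines:

```python
from langchain_core.runnables import RunnableBranch, RunnableLambda

# Stand-in chains for illustration.
qa_chain = RunnableLambda(lambda x: f"[QA] {x}")
help_chain = RunnableLambda(lambda x: f"[HELP] {x}")
general_chain = RunnableLambda(lambda x: f"[GENERAL] {x}")

branch = RunnableBranch(
    (lambda x: "?" in x, qa_chain),                  # first matching condition wins
    (lambda x: "help" in x.lower(), help_chain),
    general_chain,                                    # default when nothing matches
)

print(branch.invoke("What is LCEL?"))   # routed to qa_chain
print(branch.invoke("I need help"))     # routed to help_chain
print(branch.invoke("hello"))           # routed to general_chain
```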
Use set_debug(True) and callbacks for visibility. Basic debug: import langchain; langchain.debug = True; then run chain - prints all intermediate steps. Programmatic: from langchain.globals import set_debug, set_verbose; set_debug(True); set_verbose(True). Callbacks for custom logging: from langchain.callbacks import StdOutCallbackHandler; chain.invoke(input, config={'callbacks': [StdOutCallbackHandler()]}). Stream events for detailed tracing: async for event in chain.astream_events(input, version='v2'): print(event). Event types: on_chain_start, on_llm_start, on_tool_start, on_chain_end. LangSmith integration (production): export LANGCHAIN_TRACING_V2=true; export LANGCHAIN_API_KEY=...; automatic tracing to LangSmith dashboard. Print intermediate: chain = step1 | RunnableLambda(lambda x: print(f'After step1: {x}') or x) | step2. Type errors: check Runnable[InputType, OutputType] signatures. Use .get_graph() to visualize: chain.get_graph().print_ascii().
RAG pattern with LCEL: retriever | prompt | model | parser. Setup: from langchain_community.vectorstores import FAISS; from langchain_openai import OpenAIEmbeddings, ChatOpenAI; from langchain_core.prompts import ChatPromptTemplate; from langchain_core.runnables import RunnablePassthrough; from langchain_core.output_parsers import StrOutputParser. Vectorstore: vectorstore = FAISS.from_texts(['doc1', 'doc2'], OpenAIEmbeddings()). Retriever: retriever = vectorstore.as_retriever(search_kwargs={'k': 4}). Chain: prompt = ChatPromptTemplate.from_template('Context: {context}\n\nQuestion: {question}\n\nAnswer:'); chain = {'context': retriever | format_docs, 'question': RunnablePassthrough()} | prompt | ChatOpenAI() | StrOutputParser(). Helper: def format_docs(docs): return '\n\n'.join([d.page_content for d in docs]). Invoke: answer = chain.invoke('What is...?'). Advanced - return sources alongside the answer: from operator import itemgetter; chain = RunnablePassthrough.assign(context=itemgetter('question') | retriever | format_docs).assign(answer=prompt | ChatOpenAI() | StrOutputParser()); invoking with {'question': 'What is...?'} returns a dict with question, context, and answer.
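A runnable sketch of the basic RAG chain above, assuming faiss-cpu and langchain-openai are installed and an OpenAI API key is set; the two seed documents are placeholders:

```python
from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

def format_docs(docs):
    # Join retrieved Documents into one context string for the prompt.
    return "\n\n".join(d.page_content for d in docs)

vectorstore = FAISS.from_texts(
    ["LCEL composes Runnables with the | operator.", "LangServe deploys chains as REST APIs."],
    OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

prompt = ChatPromptTemplate.from_template(
    "Context: {context}\n\nQuestion: {question}\n\nAnswer:"
)

# The dict fans the question out: retriever builds context, passthrough keeps the question.
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI()
    | StrOutputParser()
)

print(rag_chain.invoke("What does LCEL do?"))
```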
with_structured_output() forces models to return Pydantic objects. Pattern: from pydantic import BaseModel, Field; from langchain_openai import ChatOpenAI; class Person(BaseModel): name: str = Field(description='Person name'); age: int = Field(description='Person age'); model = ChatOpenAI(model='gpt-4'); structured_model = model.with_structured_output(Person); result = structured_model.invoke('John is 25 years old'); print(result.name, result.age). Works with OpenAI, Anthropic, Google models supporting function calling. Chain example: chain = prompt | model.with_structured_output(ResponseSchema) | processing_step. Validation: Pydantic validates automatically, raises ValidationError on schema mismatch. Optional fields: use Optional[str] or default values. Lists: class People(BaseModel): people: List[Person]. Replaces deprecated OutputFixingParser and PydanticOutputParser (v0.1). Use method='json_mode' parameter for models without function calling. Essential for type-safe agent outputs.
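A sketch of structured output with a nested schema, assuming a recent langchain-openai version that accepts pydantic v2 models and an OpenAI API key; the Person/People classes are illustrative:

```python
from typing import List, Optional
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

class Person(BaseModel):
    name: str = Field(description="The person's name")
    age: Optional[int] = Field(default=None, description="Age in years, if stated")

class People(BaseModel):
    people: List[Person]

# Needs a model that supports function/tool calling.
structured_model = ChatOpenAI(model="gpt-4").with_structured_output(People)

result = structured_model.invoke("John is 25 and lives next to Mary, who is 31.")
for person in result.people:          # result is a validated People instance
    print(person.name, person.age)
```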
Messages are core to LCEL chat chains. Types: HumanMessage (user), AIMessage (model), SystemMessage (instructions), ToolMessage (tool results). Import: from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage. Usage in chains: messages = [SystemMessage(content='You are helpful'), HumanMessage(content='Hi')]; response = model.invoke(messages). AIMessage has tool_calls attribute when model uses tools. ToolMessage links back: tool_msg = ToolMessage(content=str(result), tool_call_id=call['id'], name='tool_name'). Message history: messages.append(response); messages.append(tool_msg). Convert role/content pairs: from langchain_core.messages import convert_to_messages; msgs = convert_to_messages([('user', 'hi'), ('assistant', 'hello')]). Access content: msg.content, msg.additional_kwargs, msg.response_metadata. ChatPromptTemplate converts to messages automatically. State management: pass message list through chain with RunnablePassthrough. Critical for agentic loops and multi-turn conversations.
ChatMessageHistory stores conversation state for LCEL chains. Pattern: from langchain_core.chat_history import InMemoryChatMessageHistory; from langchain_core.runnables.history import RunnableWithMessageHistory; store = {}; def get_session_history(session_id: str): if session_id not in store: store[session_id] = InMemoryChatMessageHistory(); return store[session_id]; chain_with_history = RunnableWithMessageHistory(chain, get_session_history, input_messages_key='input', history_messages_key='history'); response = chain_with_history.invoke({'input': 'Hi, I am Bob'}, config={'configurable': {'session_id': 'user123'}}). Persistent storage: from langchain_community.chat_message_histories import RedisChatMessageHistory or FileChatMessageHistory. Prompt must include history: ChatPromptTemplate.from_messages([('system', 'You are helpful'), MessagesPlaceholder('history'), ('human', '{input}')]). Trimming: use trim_messages for token limits. State access: history.messages returns list. Clear: history.clear(). Replaces deprecated ConversationBufferMemory (v0.1). Essential for stateful chatbots.
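A sketch of a stateful chat chain built from the pieces above, assuming an OpenAI API key; the in-memory store dict is illustrative and would be Redis- or file-backed in production:

```python
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

store = {}

def get_session_history(session_id: str) -> InMemoryChatMessageHistory:
    # One history object per session id.
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are helpful."),
    MessagesPlaceholder("history"),
    ("human", "{input}"),
])
chain = prompt | ChatOpenAI() | StrOutputParser()

chain_with_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)

config = {"configurable": {"session_id": "user123"}}
print(chain_with_history.invoke({"input": "Hi, I am Bob"}, config=config))
print(chain_with_history.invoke({"input": "What is my name?"}, config=config))
```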
Tool errors must be caught and returned as ToolMessage. Pattern: from langchain_core.messages import ToolMessage; try: result = tool.invoke(args); tool_msg = ToolMessage(content=str(result), tool_call_id=call_id); except Exception as e: tool_msg = ToolMessage(content=f'Error: {str(e)}', tool_call_id=call_id, additional_kwargs={'error': True}). Model receives error and can retry or adjust. Validation before invocation: if 'required_param' not in args: return ToolMessage(content='Missing required_param', tool_call_id=call_id). Timeout handling: use asyncio.wait_for for async tools: try: result = await asyncio.wait_for(tool.ainvoke(args), timeout=5.0); except asyncio.TimeoutError: return ToolMessage(content='Tool timeout', tool_call_id=call_id). Loop max iterations: for i in range(max_iterations): if i == max_iterations - 1: break loop, return partial result. Use RunnableBranch to route error messages to fallback chain. Critical: always return ToolMessage even on error - breaking message sequence corrupts conversation state.
Use the config parameter for runtime control. Concurrency for batch: chain.batch(inputs, config={'max_concurrency': 5}). Limits parallel executions to 5. Recursion limit: chain.invoke(input, config={'recursion_limit': 20}). Prevents infinite loops in recursive chains. Multiple options: config={'max_concurrency': 3, 'recursion_limit': 15, 'tags': ['prod']}. Per-runnable config: configured_chain = chain.with_config(max_concurrency=3). Timeouts: not a RunnableConfig field - set them on the model (e.g., ChatOpenAI(timeout=30)) or wrap the call: import asyncio; try: result = await asyncio.wait_for(chain.ainvoke(input), timeout=5.0); except asyncio.TimeoutError: handle_timeout(). RunnableConfig type: from langchain_core.runnables import RunnableConfig; config = RunnableConfig(max_concurrency=5, recursion_limit=20). Apply to parallel chains: parallel_chain.invoke(input, config={'max_concurrency': 10}). Essential for production: prevents hanging chains, controls resource usage, enforces SLAs.
Use callbacks for cost tracking. Pattern: from langchain_community.callbacks import get_openai_callback; with get_openai_callback() as cb: result = chain.invoke(input); print(f'Tokens: {cb.total_tokens}, Cost: ${cb.total_cost}'). Works with OpenAI models. Custom callback: from langchain_core.callbacks import BaseCallbackHandler; class CostTracker(BaseCallbackHandler): def __init__(self): self.total_tokens = 0; def on_llm_end(self, response, **kwargs): self.total_tokens += (response.llm_output or {}).get('token_usage', {}).get('total_tokens', 0). Use: tracker = CostTracker(); chain.invoke(input, config={'callbacks': [tracker]}). For streaming: callback captures tokens as they arrive. Multi-model chains: separate callbacks per model. Token breakdown: get_openai_callback also exposes prompt_tokens and completion_tokens. Budget enforcement: raise exception if cb.total_cost > budget. LangSmith integration: automatic cost tracking in dashboard. Essential for production cost monitoring and optimization.
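A short sketch of the context-manager approach, assuming langchain-community and langchain-openai are installed and an OpenAI API key is set:

```python
from langchain_community.callbacks import get_openai_callback
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

chain = ChatPromptTemplate.from_template("Summarize: {text}") | ChatOpenAI()

# Every OpenAI call made inside the context manager is counted.
with get_openai_callback() as cb:
    chain.invoke({"text": "LCEL composes Runnables with the pipe operator."})
    print(f"prompt={cb.prompt_tokens} completion={cb.completion_tokens} "
          f"total={cb.total_tokens} cost=${cb.total_cost:.6f}")
```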
LangChain supports local models via Ollama and LlamaCpp. Ollama: from langchain_community.llms import Ollama; model = Ollama(model='llama2', base_url='http://localhost:11434'); chain = prompt | model | parser; result = chain.invoke(input). ChatOllama for chat: from langchain_community.chat_models import ChatOllama; model = ChatOllama(model='llama2'). llama.cpp (local GGUF files): from langchain_community.llms import LlamaCpp; model = LlamaCpp(model_path='/path/to/model.gguf', n_ctx=2048, n_threads=4); chain = prompt | model. LM Studio and other OpenAI-compatible local servers: point ChatOpenAI at the local endpoint via base_url. Streaming: both support .stream() method. Tool calling: use ChatOllama with models supporting function calling (llama3.1+). Config: temperature, top_p via model parameters. Performance: n_threads for CPU, n_gpu_layers for GPU acceleration. GGUF format recommended. Callback: track tokens/second with callbacks. Prompt format: some models need specific templates - check model docs. Use .bind() for model-specific parameters. No API keys needed. Essential for privacy-sensitive applications and offline deployment.
Custom parsers transform model output. Pattern: from langchain_core.output_parsers import BaseOutputParser; class MyParser(BaseOutputParser[dict]): def parse(self, text: str) -> dict: import json; return json.loads(text.strip()). Use: chain = prompt | model | MyParser(); result = chain.invoke(input). JSON extraction with error handling: class JsonParser(BaseOutputParser[dict]): def parse(self, text: str) -> dict: try: return json.loads(text); except: import re; match = re.search(r'{.*}', text, re.DOTALL); return json.loads(match.group()) if match else {}. Pydantic-based: from pydantic import BaseModel; class Output(BaseModel): answer: str; class PydanticParser(BaseOutputParser[Output]): def parse(self, text: str) -> Output: return Output(answer=text.strip()). Streaming parser: subclass BaseTransformOutputParser for token-by-token processing. Use RunnableLambda for simple parsing: parser = RunnableLambda(lambda x: x.content.upper()). Composable: chain = prompt | model | parser1 | parser2. Essential for structured extraction and validation.
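A sketch of the JSON-extraction parser described above, written as a standalone class with a hypothetical name:

```python
import json
import re
from langchain_core.output_parsers import BaseOutputParser

class JsonExtractingParser(BaseOutputParser[dict]):
    """Pull the first JSON object out of free-form model text."""

    def parse(self, text: str) -> dict:
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            # Fall back to grabbing the first {...} block in the response.
            match = re.search(r"\{.*\}", text, re.DOTALL)
            return json.loads(match.group()) if match else {}

parser = JsonExtractingParser()
print(parser.parse('Here you go: {"answer": 42} hope that helps'))
# {'answer': 42}
```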
State management uses RunnablePassthrough.assign() pattern. Basic state: from langchain_core.runnables import RunnablePassthrough; chain = RunnablePassthrough.assign(count=lambda x: x.get('count', 0) + 1) | next_step. Accumulator pattern: chain = RunnablePassthrough.assign(results=lambda x: x.get('results', []) + [x['current']]). Multi-step state: chain = RunnablePassthrough.assign(step1_result=step1) | RunnablePassthrough.assign(step2_result=step2). Access previous state: each step receives full state dict. Conditional state updates: use RunnableBranch to update state based on conditions. Message state for chat: messages = []; def add_message(x): messages.append(x['msg']); return {'messages': messages}; chain = RunnableLambda(add_message) | next_step. Persistent state: external store (Redis, DynamoDB) with getter/setter wrapped in RunnableLambda. Type-safe state: from typing import TypedDict; class State(TypedDict): count: int; results: List[str]. Essential for multi-turn agents and stateful workflows.
Agentic loop pattern: bind tools, invoke, execute tools, repeat. Setup: from langchain_core.messages import HumanMessage, AIMessage, ToolMessage; from langchain_core.tools import tool; @tool def search(query: str): return f'Results for {query}'; model_with_tools = model.bind_tools([search]). Loop: messages = [HumanMessage(content='Find info on X')]; for i in range(max_iterations): response = model_with_tools.invoke(messages); messages.append(response); if not response.tool_calls: break; for call in response.tool_calls: result = search.invoke(call['args']); messages.append(ToolMessage(content=str(result), tool_call_id=call['id'])). LCEL chain version: use while loop with RunnablePassthrough for state. Error handling: wrap tool execution in try/except, return ToolMessage with error. Max iterations: prevent infinite loops. Early exit: check for specific conditions in response. State tracking: track iteration count, tool usage, costs. Alternative: use LangGraph for complex agent graphs. Essential for autonomous agents that use tools iteratively.
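A compact sketch of the full loop, assuming an OpenAI API key and a function-calling model; the search tool is a stub that just echoes the query:

```python
from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

@tool
def search(query: str) -> str:
    """Search for information about a topic."""
    return f"Results for {query}"   # stub tool for illustration

model_with_tools = ChatOpenAI(model="gpt-4").bind_tools([search])
tools_by_name = {"search": search}

messages = [HumanMessage(content="Find info on LCEL and summarize it.")]
max_iterations = 5   # guard against infinite tool loops

for _ in range(max_iterations):
    response = model_with_tools.invoke(messages)
    messages.append(response)
    if not response.tool_calls:      # model produced a final answer
        break
    for call in response.tool_calls:
        try:
            result = tools_by_name[call["name"]].invoke(call["args"])
        except Exception as exc:     # surface tool errors back to the model
            result = f"Error: {exc}"
        messages.append(ToolMessage(content=str(result), tool_call_id=call["id"]))

print(messages[-1].content)
```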
RunnableConfig controls runtime behavior. Import: from langchain_core.runnables import RunnableConfig. Create: config = RunnableConfig(tags=['production'], metadata={'user_id': '123'}, callbacks=[my_callback], max_concurrency=5, recursion_limit=20). Apply: chain.invoke(input, config=config). Tags for filtering: config={'tags': ['api', 'v2']} - useful in LangSmith. Metadata: arbitrary dict for tracking: config={'metadata': {'session_id': 'abc', 'version': '1.0'}}. Callbacks: list of callback handlers: config={'callbacks': [ConsoleCallbackHandler(), CustomLogger()]}. Configurable fields: chain.with_config(configurable={'llm_model': 'gpt-4'}) then select at runtime: chain.invoke(input, config={'configurable': {'llm_model': 'gpt-3.5-turbo'}}). Merge configs: from langchain_core.runnables.config import merge_configs; merged = merge_configs(config1, config2). Access in custom runnables: def invoke(self, input, config: RunnableConfig = None): tags = (config or {}).get('tags', []). Essential for multi-tenant apps, A/B testing, feature flags.
operator.itemgetter extracts dict keys for LCEL chains. Import: from operator import itemgetter, attrgetter. Basic: from langchain_core.runnables import RunnablePassthrough; chain = {'question': itemgetter('q'), 'context': retriever} | prompt | model. Extracts 'q' key from input dict. Multiple keys: itemgetter('key1', 'key2') returns tuple. Nested: itemgetter('user', 'preferences', 'language') - but limited, better use lambda. Attrgetter for objects: attrgetter('user.name') extracts attribute. Combine with RunnablePassthrough: chain = RunnablePassthrough.assign(formatted=itemgetter('raw_data') | format_function). Common pattern - extract specific fields: {'context': itemgetter('docs') | format_docs, 'question': itemgetter('query')} | prompt. Alternative: lambda x: x['key'] - more flexible but less readable. Use case: transform input shape to match prompt template requirements. Cleaner than custom RunnableLambda for simple key extraction.
Multiple retrievers with RunnableParallel or ensemble. Parallel: from langchain_core.runnables import RunnableParallel; retrievers = RunnableParallel(wiki=wiki_retriever, docs=doc_retriever, web=web_retriever); results = retrievers.invoke(query). Returns dict with all results. Merge: def merge_docs(results): return results['wiki'] + results['docs'] + results['web']; chain = retrievers | RunnableLambda(merge_docs) | rerank | prompt | model. Ensemble with EnsembleRetriever: from langchain.retrievers import EnsembleRetriever; ensemble = EnsembleRetriever(retrievers=[bm25_retriever, vector_retriever], weights=[0.5, 0.5]). Reciprocal Rank Fusion: combines scores automatically. Sequential fallback: primary = retriever1; fallback = retriever1.with_fallbacks([retriever2, retriever3]). Conditional retrieval: use RunnableBranch to select retriever based on query type. Reranking: chain = retriever | reranker | top_k_filter. Hybrid search: combine sparse (BM25) + dense (vector) retrievers for better recall. Essential for production RAG systems.
astream_events() streams all chain events for observability. Pattern: async for event in chain.astream_events(input, version='v2'): print(event['event'], event['name'], event.get('data')). Event types: on_chain_start, on_chain_end, on_llm_start, on_llm_end, on_tool_start, on_tool_end, on_retriever_start, on_retriever_end. Filter by event type: if event['event'] == 'on_llm_start': log_prompt(event['data']['input']). Access intermediate outputs: on_chain_end events contain 'output' key. Track tokens: on_llm_end has 'output' with token_usage. Version: must specify version='v2' (newer event format). Include types: astream_events(input, version='v2', include_types=['llm', 'tool']). Exclude: exclude_types=['retriever']. Streaming chunks: on_llm_stream events for token-by-token. Tags filter: include_tags=['critical'], exclude_tags=['debug']. Production use: send events to logging/monitoring system. Debug: inspect full chain execution flow. Better than callbacks for async streaming observability. Essential for complex chain debugging.
RunnableSequence represents chained runnables where output of each is input to next. Created via pipe operator: chain = prompt | model | parser becomes RunnableSequence. Direct instantiation: from langchain_core.runnables import RunnableSequence; seq = RunnableSequence(first=prompt, middle=[transform1, transform2], last=parser). Access components: seq.first returns first runnable, seq.middle returns list of middle runnables, seq.last returns last runnable. Inspect: use seq.get_graph() to visualize chain structure. Modify: cannot modify after creation - create new sequence. Common use: debugging complex chains to identify bottlenecks. Example: seq.first.invoke(input) tests first step only. Streaming: seq preserves streaming if all components support it. Batch: automatically parallelizes across inputs. Type checking: RunnableSequence[InputType, OutputType] for type safety. Most important composition operator in LangChain - used in virtually every chain. Auto-created by | operator, rarely instantiated directly.
Dynamic chains built by composing runnables conditionally. Pattern: def build_chain(use_history: bool): base = prompt | model | parser; return RunnableWithMessageHistory(base, get_history) if use_history else base. Conditional routing with RunnableBranch: branch = RunnableBranch((lambda x: x['type'] == 'sql', sql_chain), (lambda x: x['type'] == 'api', api_chain), default_chain). Dynamic tools: model_with_tools = model.bind_tools(select_tools(user_permissions)). Config-based selection: chain.with_config(configurable={'llm': 'gpt-4'}) then invoke with config={'configurable': {'llm': 'gpt-3.5-turbo'}}. Runtime prompt selection: prompts = {'formal': formal_prompt, 'casual': casual_prompt}; chain = RunnableLambda(lambda x: prompts[x['style']]) | model. Parallel branches: RunnableParallel({k: v for k, v in branches.items() if should_include(k)}). Factory pattern: def make_rag_chain(retriever, reranker=None): chain = retriever; if reranker: chain = chain | reranker; return chain | prompt | model. Essential for multi-tenant apps and A/B testing.
get_graph() visualizes chain structure, get_prompts() extracts prompts. Graph introspection: graph = chain.get_graph(); graph.print_ascii() shows ASCII diagram of chain flow. Useful for debugging complex chains. Graph nodes: graph.nodes shows all components, graph.edges shows connections. Export: graph.to_json() for external visualization tools. Prompt extraction: prompts = chain.get_prompts(config=None) returns list of ChatPromptTemplate and PromptTemplate instances used in chain. Inspect templates: for p in prompts: print(p.messages) to see all message templates. Dynamic prompts: get_prompts(config={'configurable': {'prompt_version': 'v2'}}) gets prompts for specific config. Use case: extract prompts for LangSmith logging, audit for prompt injection risks, version control prompts separately. Limitations: get_prompts() only works with chains containing PromptTemplate/ChatPromptTemplate, not string prompts. Graph depth: shows full nested chain structure including parallel/conditional branches. Essential for production chain monitoring and debugging.
Custom chains extend Runnable base class. Pattern: from langchain_core.runnables import Runnable, RunnableConfig; class MyChain(Runnable): def invoke(self, input: dict, config: RunnableConfig = None) -> dict: # custom logic; return result; def stream(self, input: dict, config: RunnableConfig = None): yield from self._stream_impl(input); async def ainvoke(self, input: dict, config: RunnableConfig = None) -> dict: # async version. Required methods: invoke(), optional: stream(), batch(), ainvoke(), astream(), abatch(). For serialization: inherit from RunnableSerializable, implement is_lc_serializable() -> True and lc_secrets property. Type hints: class MyChain(Runnable[InputType, OutputType]). Access config: callbacks = config.get('callbacks', []); tags = config.get('tags', []); metadata = config.get('metadata', {}). Compose with LCEL: custom_chain | other_runnable works automatically. Override batch(): def batch(self, inputs, config=None): # optimized batch logic. Use RunnableLambda for simple functions: chain = RunnableLambda(my_func). Essential for reusable custom logic that integrates with LCEL ecosystem.
LangServe deploys LCEL chains as REST APIs. Setup: pip install langserve[all]. Create server: from fastapi import FastAPI; from langserve import add_routes; app = FastAPI(); add_routes(app, chain, path='/my_chain'). Endpoints auto-created: /my_chain/invoke (POST), /my_chain/batch (POST), /my_chain/stream (POST), /my_chain/stream_log (POST for intermediate steps), /my_chain/playground (GET for UI). Run: uvicorn main:app --host 0.0.0.0 --port 8000. Client: from langserve import RemoteRunnable; remote_chain = RemoteRunnable('http://localhost:8000/my_chain'); result = remote_chain.invoke(input). Streaming client: for chunk in remote_chain.stream(input): print(chunk). Config support: remote_chain.invoke(input, config={'tags': ['prod']}). Auth: add FastAPI dependencies for API key validation. CORS: app.add_middleware(CORSMiddleware, allow_origins=['*']). Docker: FROM python:3.11; pip install langserve; CMD uvicorn main:app. Playground at http://localhost:8000/my_chain/playground for testing. Essential for deploying LangChain apps with zero API code.
Serialization for LCEL chains requires Serializable base class. Import: from langchain_core.load import dumpd, dumps, load, loads. Serialize to dict: chain_dict = dumpd(chain). Serialize to JSON string: chain_json = dumps(chain, pretty=True). Deserialize: chain = loads(chain_json) from a JSON string, or load(chain_dict) from a dict produced by dumpd. Security: API keys and secrets excluded automatically via lc_secrets attribute. Load secrets: loads(chain_json, secrets_map={'openai_api_key': 'sk-...'}). Custom serializable: class MyRunnable(RunnableSerializable): @classmethod; def is_lc_serializable(cls): return True; @property; def lc_secrets(self) -> dict: return {'api_key': 'API_KEY'}. Limitations: not all chains serializable by default - must explicitly opt-in. ConversationalRetrievalChain not serializable (use LCEL alternative). State: serialization excludes runtime state like message history. Use case: save chain config to version control, deploy same chain across environments, share chain templates. Alternative: save chain construction code, not serialized objects. Production: prefer code-based chain definitions over serialization for maintainability.
LCEL for linear chains, LangGraph for complex agentic workflows. LCEL strengths: linear pipelines (prompt | model | parser), simple RAG chains, stateless request-response, fast prototyping, DAG-only workflows (no loops). LangGraph strengths: multi-turn conversations requiring state, agentic loops with tools, branching logic with cycles, multi-agent systems, human-in-the-loop workflows, complex state management with rollbacks. Architecture: LCEL is Directed Acyclic Graph (one-way, no loops), LangGraph is full graph (supports cycles). State: LCEL requires manual state via RunnablePassthrough, LangGraph has centralized state with history. Example LCEL use case: translate text, classify documents, simple Q&A bot. Example LangGraph use case: research agent that iterates on tasks, collaborative agents, long-running workflows needing checkpoints. Migration path: start with LCEL, migrate to LangGraph when loops/state needed. Complementary: use both - LCEL for subchains inside LangGraph nodes. Performance: LCEL faster for simple chains due to less overhead. Use LCEL if: single LLM call or prompt-model-parser pattern. Use LangGraph if: 'while True' loop in your logic.
Prompt composition with from_messages() and MessagesPlaceholder. Basic: from langchain_core.prompts import ChatPromptTemplate; prompt = ChatPromptTemplate.from_messages([('system', 'You are {role}'), ('human', '{input}')]). Multiple message types: from_messages([('system', system_msg), ('human', human_msg), ('ai', ai_msg)]). Dynamic history with MessagesPlaceholder: from langchain_core.prompts import MessagesPlaceholder; prompt = ChatPromptTemplate.from_messages([('system', 'Be helpful'), MessagesPlaceholder('history'), ('human', '{question}')]). Invoke: prompt.invoke({'history': [HumanMessage('Hi'), AIMessage('Hello')], 'question': 'What?'}). Alternative placeholder syntax: ('placeholder', '{msgs}') instead of MessagesPlaceholder('msgs'). Chain prompts: partial_prompt = ChatPromptTemplate.from_template('Context: {context}'); full_prompt = partial_prompt + ChatPromptTemplate.from_template('Question: {question}'). Reuse templates: base = ChatPromptTemplate.from_messages([('system', '{instructions}')]); qa_prompt = base.partial(instructions='Answer questions'). Prompt implements Runnable: prompt | model | parser works. Essential for dynamic conversational flows and template reuse.
Multi-modal chains use vision models (GPT-4V, GPT-4o, Claude 3) for image+text. Pattern: from langchain_core.messages import HumanMessage; from langchain_openai import ChatOpenAI; model = ChatOpenAI(model='gpt-4o'); message = HumanMessage(content=[{'type': 'text', 'text': 'Describe this image'}, {'type': 'image_url', 'image_url': {'url': 'https://...'}}]); response = model.invoke([message]). Local images: import base64; def encode_image(path): with open(path, 'rb') as f: return base64.b64encode(f.read()).decode(); image_data = f'data:image/jpeg;base64,{encode_image('img.jpg')}'; use in image_url. Multi-modal RAG: retriever returns Document objects with image metadata, extract image URLs, pass to vision model with text context. Chain pattern: chain = retriever | format_images_and_text | vision_model | parser. Structured output: use with_structured_output() with vision models for JSON extraction from images. Batch images: model.batch([message1, message2]). Streaming: model.stream(message) for token-by-token. Use case: document analysis with charts, visual Q&A, receipt extraction, slide deck analysis. Supports PNG, JPEG, WebP formats. Essential for document-heavy RAG systems.
Testing LCEL chains with mocks and fixtures. Mock LLM: from langchain_core.language_models import FakeListLLM; fake_llm = FakeListLLM(responses=['Response 1', 'Response 2']); chain = prompt | fake_llm | parser; result = chain.invoke(input). Custom fake with input capture: class FakeLLMWithHistory(FakeListLLM): def __init__(self, responses): super().__init__(responses=responses); self.inputs = []; def invoke(self, input, config=None): self.inputs.append(input); return super().invoke(input, config). Unit test pattern: import pytest; def test_chain(): chain = build_chain(); result = chain.invoke({'question': 'test'}); assert 'expected' in result. Mock retrievers: write a small stub (e.g., a RunnableLambda that returns fixed Document objects). VCR for real API recording: pip install pytest-recording vcrpy; @pytest.mark.vcr; def test_with_real_llm(): # first run records, subsequent runs replay. Integration tests: use LangSmith datasets for regression testing. Test streaming: assert list(chain.stream(input)) == expected_chunks. Test batch: assert chain.batch([input1, input2]) == [output1, output2]. Mock callbacks: track events with custom CallbackHandler. Essential for CI/CD without API costs.
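A sketch of a unit test with a canned fake LLM; build_chain and the test function are hypothetical names, and no API key or network access is needed:

```python
from langchain_core.language_models import FakeListLLM
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

def build_chain(llm):
    # Inject the model so tests can swap in a fake.
    prompt = ChatPromptTemplate.from_template("Answer briefly: {question}")
    return prompt | llm | StrOutputParser()

def test_chain_returns_canned_response():
    fake_llm = FakeListLLM(responses=["LCEL chains Runnables with |."])
    chain = build_chain(fake_llm)
    result = chain.invoke({"question": "What is LCEL?"})
    assert "LCEL" in result

test_chain_returns_canned_response()   # or run via pytest
```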
Performance optimization via batch(), RunnableParallel, and async. Batching: results = chain.batch([input1, input2, input3], config={'max_concurrency': 5}). Batch is 3-10x faster than loop of invoke() due to API batching and connection pooling. Async batching: results = await chain.abatch(inputs). Parallel execution: parallel = RunnableParallel(task1=chain1, task2=chain2, task3=chain3); results = parallel.invoke(input). Runs truly parallel with asyncio. Async chains: use ainvoke(), astream(), abatch() for concurrent execution. Example: import asyncio; results = await asyncio.gather(*[chain.ainvoke(inp) for inp in inputs]). Map reduce: chain.map() for embarrassingly parallel tasks. Concurrency control: config={'max_concurrency': 10} limits parallel calls to prevent rate limits. Streaming for UX: chain.stream() reduces time-to-first-token even if total time same. Lazy evaluation: LCEL optimizes execution plan automatically. Cache layer: @cache_decorator or LangChain cache (InMemoryCache, SQLiteCache, RedisCache). Batch size tuning: test 10-100 for optimal throughput. Essential for production-scale applications handling concurrent users.
Caching strategies: InMemoryCache, SQLiteCache, RedisCache, GPTCache - enable globally with set_llm_cache. InMemoryCache: from langchain.globals import set_llm_cache; from langchain_community.cache import InMemoryCache; set_llm_cache(InMemoryCache()). Fast but session-only, lost on restart. SQLiteCache: from langchain_community.cache import SQLiteCache; set_llm_cache(SQLiteCache(database_path='.langchain.db')). Persistent disk-based, survives restarts. RedisCache: from langchain_community.cache import RedisCache; from redis import Redis; set_llm_cache(RedisCache(redis_=Redis(host='localhost', port=6379))). Distributed caching for multi-instance production. GPTCache: from langchain_community.cache import GPTCache; set_llm_cache(GPTCache(init_gptcache_func)) - takes an init function from the gptcache package. Semantic caching - matches similar queries, not just exact. Cache key: based on prompt + model + parameters. Exact match only (except GPTCache). Clear cache: keep a reference to the cache object and call .clear(). Per-model caching: pass cache=my_cache (or cache=False to opt out) when constructing the model. Benefits: 50-90% cost reduction, <10ms response for cached, better UX. Use InMemoryCache for dev, SQLiteCache for single-instance prod, RedisCache for multi-instance, GPTCache for semantic similarity. Essential for production cost optimization.
LCEL is official LangChain approach, decorators are third-party library. LCEL: uses pipe operator (chain = prompt | model | parser), declarative syntax, built-in streaming/batching/async, automatic schema validation, official LangChain support, recommended for v0.2+. LangChain decorators: third-party library (not official), uses @llm_prompt decorator to turn functions into chains, Pythonic multiline prompts, syntactic sugar for readability. Example decorator: from langchain_decorators import llm_prompt; @llm_prompt; def summarize(text: str) -> str: '''Summarize: {text}'''. Example LCEL: prompt = PromptTemplate.from_template('Summarize: {text}'); chain = prompt | model. Key difference: LCEL is composition-based (composing Runnables), decorators are function-based (decorating Python functions). Performance: LCEL has built-in optimizations (lazy evaluation, parallel execution), decorators add overhead. Compatibility: LCEL integrates with LangServe, LangSmith, LangGraph; decorators limited. Migration: decorators not maintained for v0.2+. Recommendation: use LCEL for production (official, supported, optimized), decorators only if prefer decorator syntax for prototyping. LCEL is the future of LangChain chains.
Major breaking changes in v0.2 migration released May 2024. Package restructuring: langchain now decoupled from langchain-community, both depend on langchain-core. Integration-agnostic: langchain no longer auto-instantiates specific models (OpenAI, Anthropic, etc.), must import from langchain-openai, langchain-anthropic, etc. Deprecated methods: predict_messages() → invoke(), run() → invoke(), call() → invoke(). LLMChain deprecated: use LCEL (prompt | model | parser). Memory classes deprecated: ConversationBufferMemory, ConversationSummaryMemory → use RunnableWithMessageHistory. Import changes: langchain.chat_models → langchain-openai, langchain-anthropic, etc. Tool calling: functions parameter → bind_tools() method. Output parsers: some deprecated in favor of with_structured_output(). Migration tool: langchain-cli migrate updates imports automatically. Install: pip install langchain-cli; langchain-cli migrate
Common pitfalls: type mismatches, missing config, complex debugging, state management. Type errors: pipe (|) operator requires compatible input/output types. Fix: add type hints Runnable[InputType, OutputType], use RunnableLambda to transform between types. Debug type flow: chain.get_graph().print_ascii() shows expected types. Missing verbose output: LCEL chains don't respect verbose=True. Fix: import langchain; langchain.debug = True for all chains, or use callbacks: chain.invoke(input, config={'callbacks': [StdOutCallbackHandler()]}). Debugging long chains: use astream_events(version='v2') to see all intermediate steps. State errors: LCEL is stateless by default. Fix: use RunnablePassthrough.assign() or RunnableWithMessageHistory. Silent failures: errors in parallel chains may not surface. Fix: wrap in try/except, use .with_fallbacks([fallback_chain]). Performance issues: blocking I/O in sync chains. Fix: use async variants (ainvoke, astream, abatch). Over-complexity: LCEL chains with hundreds of steps hard to debug. Fix: use LangGraph for complex workflows, keep LCEL for simple chains. Streaming not working: one component in chain doesn't support streaming. Fix: check each component supports .stream(). LangSmith integration: set LANGCHAIN_TRACING_V2=true for automatic observability. Essential debugging: debug=True, astream_events, LangSmith, get_graph().
RunnableMap (alias RunnableParallel) runs multiple runnables in parallel, returns dict. Pattern: from langchain_core.runnables import RunnableParallel; parallel = RunnableParallel(summary=summarize_chain, keywords=keyword_chain, sentiment=sentiment_chain); results = parallel.invoke(text). Returns {'summary': '...', 'keywords': [...], 'sentiment': '...'}. Dict syntax shorthand: parallel = {'summary': chain1, 'keywords': chain2} automatically creates RunnableParallel. Input distribution: same input passed to all branches. Different inputs: use RunnableLambda to transform: parallel = RunnableParallel(data=lambda x: x['data'], metadata=lambda x: x['meta']) | next_step. Combining with passthrough: {'original': RunnablePassthrough(), 'processed': processing_chain}. Nested parallel: RunnableParallel(branch1=RunnableParallel(sub1=..., sub2=...), branch2=...). Execution: truly parallel with asyncio, faster than sequential. Error handling: if one fails, entire parallel fails. Use .with_fallbacks() on individual branches. Batch parallel: parallel.batch([input1, input2]) batches each branch. Essential for multi-aspect processing (RAG + classification + summarization simultaneously).
MessagesPlaceholder injects variable-length message lists into prompts. Import: from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder. Basic pattern: prompt = ChatPromptTemplate.from_messages([('system', 'You are helpful'), MessagesPlaceholder('history'), ('human', '{question}')]). Invoke with history: from langchain_core.messages import HumanMessage, AIMessage; result = prompt.invoke({'history': [HumanMessage('Hi'), AIMessage('Hello')], 'question': 'What is LangChain?'}). Variable-length: history can be empty list [] or 100 messages, placeholder adapts. Multiple placeholders: ChatPromptTemplate.from_messages([MessagesPlaceholder('context'), MessagesPlaceholder('history'), ('human', '{input}')]). Optional placeholder: MessagesPlaceholder('history', optional=True) allows invoke without history key. Shorthand syntax: ('placeholder', '{history}') equivalent to MessagesPlaceholder('history'). Limit messages: MessagesPlaceholder('history', n_messages=10) takes last 10 only. Chain with memory: use RunnableWithMessageHistory for automatic history management. Essential for chatbots needing dynamic conversation context.
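A sketch showing the placeholder rendering with and without prior turns; no model call is needed because the prompt alone is invoked:

```python
from langchain_core.messages import AIMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are helpful."),
    MessagesPlaceholder("history", optional=True),   # works with or without history
    ("human", "{question}"),
])

# With prior turns injected into the placeholder:
with_history = prompt.invoke({
    "history": [HumanMessage("Hi"), AIMessage("Hello! How can I help?")],
    "question": "What is LangChain?",
})
print(len(with_history.messages))   # 4: system + two history turns + human

# Empty history also renders cleanly because the placeholder is optional:
print(len(prompt.invoke({"question": "What is LangChain?"}).messages))  # 2
```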
configurable_alternatives() enables runtime component swapping. Pattern: from langchain_core.runnables import ConfigurableField; model = ChatOpenAI(model='gpt-4').configurable_alternatives(ConfigurableField(id='llm'), default_key='openai', anthropic=ChatAnthropic(model='claude-3-sonnet'), google=ChatGoogleGenerativeAI(model='gemini-pro')). Invoke with selection: result = chain.invoke(input, config={'configurable': {'llm': 'anthropic'}}). Default: uses 'openai' if no config specified. Multiple configurables: chain.configurable_fields(temperature=ConfigurableField(id='temp')).configurable_alternatives(...). Chain-level: (prompt | model.configurable_alternatives(...) | parser).invoke(input, config={'configurable': {'llm': 'google'}}). Use cases: A/B testing models, user-selected models in UI, fallback models, cost optimization (cheap model for simple queries). Retriever alternatives: retriever.configurable_alternatives(ConfigurableField(id='search'), vector=vector_retriever, keyword=bm25_retriever). Prompt alternatives: similar pattern for dynamic prompt switching. Config inheritance: alternatives inherit config from parent chain. Production pattern: define alternatives once, select at request time based on user tier, query complexity, or feature flags. Essential for flexible multi-model deployments.
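A sketch of per-request model swapping, assuming both langchain-openai and langchain-anthropic are installed with their API keys set; the model names are placeholders:

```python
from langchain_anthropic import ChatAnthropic
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import ConfigurableField
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4").configurable_alternatives(
    ConfigurableField(id="llm"),
    default_key="openai",
    anthropic=ChatAnthropic(model="claude-3-sonnet-20240229"),
)

chain = ChatPromptTemplate.from_template("{question}") | model | StrOutputParser()

# Default branch (OpenAI):
chain.invoke({"question": "What is LCEL?"})
# Swap the model per request without rebuilding the chain:
chain.invoke(
    {"question": "What is LCEL?"},
    config={"configurable": {"llm": "anthropic"}},
)
```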
Both parse LLM output to structured data, PydanticOutputParser adds validation. JsonOutputParser: from langchain_core.output_parsers import JsonOutputParser; parser = JsonOutputParser(); chain = prompt | model | parser. Extracts JSON from text, returns dict. No validation. Use when: need simple JSON extraction without schema. PydanticOutputParser: from langchain_core.output_parsers import PydanticOutputParser; from pydantic import BaseModel, Field; class Person(BaseModel): name: str = Field(description='Person name'); age: int; parser = PydanticOutputParser(pydantic_object=Person); chain = prompt | model | parser. Returns Person instance, validates types automatically. Use when: need type safety, validation, IDE autocomplete. Prompt formatting: parser.get_format_instructions() returns schema description for prompt. Example: prompt = PromptTemplate(template='Extract person.\n{format_instructions}\nText: {text}', partial_variables={'format_instructions': parser.get_format_instructions()}). Error handling: PydanticOutputParser raises ValidationError on schema mismatch, JsonOutputParser raises on invalid JSON. Alternative v0.2+: model.with_structured_output(Person) - cleaner, uses function calling. Recommendation: use with_structured_output() for v0.2+, parsers for legacy or non-function-calling models. Essential for reliable structured extraction.
RunnableLambda wraps Python functions to make them chainable with LCEL. Basic: from langchain_core.runnables import RunnableLambda; def process(x): return x.upper(); chain = prompt | model | RunnableLambda(process). Decorator syntax: @RunnableLambda; def process(x): return x * 2; chain = input_step | process | output_step. Multiple inputs: def merge(inputs): return inputs['a'] + inputs['b']; merger = RunnableLambda(merge). Async functions: async def async_process(x): await asyncio.sleep(1); return x; chain = prompt | RunnableLambda(async_process). Context access: def with_config(x, config): return f'{x} - {config.get('tags')}'; chain = RunnableLambda(with_config). Limitations: RunnableLambda doesn't preserve streaming - use RunnableGenerator for streaming. Not serializable - for serialization, create custom Runnable subclass. Use cases: data formatting between chain steps, conditional logic, external API calls, custom preprocessing. Simple alternative to full Runnable class. Compose with other runnables: RunnableLambda(func1) | RunnableLambda(func2) | model. Essential for quick custom logic in LCEL chains.
Partial variables pre-fill some prompt variables while leaving others dynamic. Two methods: partial() and partial_variables. Using partial(): prompt = PromptTemplate.from_template('Tell me about {topic} in {language}'); partial_prompt = prompt.partial(language='English'); result = partial_prompt.invoke({'topic': 'AI'}). Using partial_variables at creation: prompt = PromptTemplate(template='Today is {date}. Question: {question}', input_variables=['question'], partial_variables={'date': lambda: datetime.now().strftime('%Y-%m-%d')}). Function-based partials: def get_current_time(): return datetime.now().strftime('%H:%M'); prompt = ChatPromptTemplate.from_messages([('system', 'Current time: {time}'), ('human', '{input}')]).partial(time=get_current_time). Use case: inject context available early (user ID, timestamp) while deferring user input. Multi-stage chains: stage1_prompt = base_prompt.partial(context=retrieved_docs); stage2_result = stage1_prompt.invoke({'question': user_question}). ChatPromptTemplate: chat_prompt.partial(system_context='You are helpful', user_name='Alice'). Essential for reusable prompts with some fixed context and some runtime variables. Reduces prompt passing through intermediate chain steps.
LCEL provides multiple streaming mechanisms: stream(), astream(), astream_log(), astream_events(). Basic streaming: for chunk in chain.stream(input): print(chunk, end='', flush=True). Streams final output incrementally. Async streaming: async for chunk in chain.astream(input): process(chunk). Intermediate results with astream_log(): async for patch in chain.astream_log(input): yields RunLogPatch objects (JSONPatch ops describing run state updates); largely superseded by astream_events(). Recommended v0.2+: astream_events(version='v2'): async for event in chain.astream_events(input, version='v2'): if event['event'] == 'on_chain_end': print(event['data']['output']). Filter intermediate: if event['name'] == 'Retriever': docs = event['data']['output']. Stream from specific components: use include_names=['ChatOpenAI'] or include_types=['llm']. Preserve streaming through chain: all components must support streaming (model.stream(), parser.stream()). RunnableLambda breaks streaming - use RunnableGenerator instead. Streaming to client: LangServe automatically exposes /stream endpoint for deployed chains. Essential for real-time UIs, progressive rendering, reduced perceived latency. Streaming doesn't reduce total time but improves UX significantly.
LCEL provides async methods for concurrent processing: ainvoke(), abatch(), astream(). Async single call: result = await chain.ainvoke(input). Concurrent multiple calls: import asyncio; results = await asyncio.gather(*[chain.ainvoke(inp) for inp in inputs]). Batch method: results = chain.batch([input1, input2, input3]). Automatically parallelizes internally, 3-10x faster than loop. Async batch: results = await chain.abatch(inputs). Max concurrency: results = await chain.abatch(inputs, config={'max_concurrency': 5}). Limits parallel API calls to avoid rate limits. Parallel chains: parallel = RunnableParallel(task1=chain1, task2=chain2); results = await parallel.ainvoke(input). Runs tasks truly concurrent with asyncio. Performance: sync batch uses threadpool, async batch uses asyncio - async generally faster for I/O bound (API calls). Error handling: await asyncio.gather(*tasks, return_exceptions=True) to handle errors without stopping all tasks. Same chain code: chain works in sync (invoke), async (ainvoke), batch, streaming without modifications. Production pattern: use async for web servers (FastAPI, Flask async), sync for scripts. Essential for high-throughput applications and optimal API utilization.
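A sketch contrasting abatch with asyncio.gather, assuming an OpenAI API key; the topics list is illustrative:

```python
import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

chain = (
    ChatPromptTemplate.from_template("One-line summary of {topic}")
    | ChatOpenAI()
    | StrOutputParser()
)

async def main():
    topics = [{"topic": t} for t in ("LCEL", "LangServe", "LangGraph")]

    # abatch handles concurrency internally and respects max_concurrency.
    results = await chain.abatch(topics, config={"max_concurrency": 2})

    # asyncio.gather gives finer control, e.g. collecting exceptions per task.
    gathered = await asyncio.gather(
        *(chain.ainvoke(t) for t in topics), return_exceptions=True
    )
    return results, gathered

results, gathered = asyncio.run(main())
print(results)
```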
LCEL composition operators: pipe (|), RunnableSequence, RunnableParallel, RunnableBranch, RunnablePassthrough. Pipe operator (|): most common, chains components sequentially. chain = prompt | model | parser. Creates RunnableSequence automatically. Dict shorthand for parallel: parallel = {'summary': chain1, 'keywords': chain2} equivalent to RunnableParallel(summary=chain1, keywords=chain2). Function shorthand: plain Python functions are coerced to RunnableLambda when piped with a Runnable. Passthrough shortcuts: {'original': RunnablePassthrough(), 'processed': chain} preserves input while processing. Assign shortcut: RunnablePassthrough.assign(new_field=transformation) adds fields to dict. Lambda shortcut: instead of the RunnableLambda class, use the decorator: @RunnableLambda; def func(x): return x. Branching: RunnableBranch((condition, chain), default) for conditional routing. Composition nesting: (prompt | model).with_retry() | parser.with_fallbacks([fallback_parser]). Method chaining: chain.with_config().with_retry().with_fallbacks(). Type inference: LCEL auto-infers input/output types through chain. Essential shortcuts: use | for sequence, {} for parallel, RunnablePassthrough.assign() for adding fields, RunnableBranch for routing. Keep chains readable - overly nested operators hurt maintainability.
Production LCEL deployment best practices: use LangServe, enable observability, implement retries/fallbacks, optimize performance. LangServe deployment: from langserve import add_routes; add_routes(app, chain, path='/chain'). Auto-creates REST API with streaming support. Observability: set LANGCHAIN_TRACING_V2=true, LANGCHAIN_API_KEY for LangSmith tracing. Track costs, latency, errors in dashboard. Retries: chain.with_retry(stop_after_attempt=3, wait_exponential_jitter=True). Prevents transient failures. Fallbacks: primary_chain.with_fallbacks([cheaper_model_chain, cached_chain]). Graceful degradation. Caching: use RedisCache for distributed systems: from langchain.globals import set_llm_cache; set_llm_cache(RedisCache(redis_=Redis(...))). Rate limiting: config={'max_concurrency': 10} prevents API throttling. Error handling: wrap in try/except, return user-friendly errors, log to monitoring. Async for web servers: use FastAPI with async def endpoints, call await chain.ainvoke(). Monitoring: track p95 latency, error rates, token usage. Use Prometheus + Grafana. Testing: unit tests with FakeListLLM, integration tests with VCR recording. Avoid mixing legacy chains and LCEL - complicates debugging. For complex workflows (loops, branching), prefer LangGraph over LCEL. Essential: tracing, retries, fallbacks, caching, async, monitoring.