langchain_lcel_fundamentals 50 Q&As

LangChain LCEL Fundamentals FAQ & Answers

50 expert LangChain LCEL fundamentals answers researched from official documentation. Every answer cites authoritative sources you can verify.

A

LCEL (LangChain Expression Language) uses | pipe operator for chaining. Basic pattern: chain = prompt | model | parser. Example: from langchain_core.prompts import ChatPromptTemplate; from langchain_openai import ChatOpenAI; from langchain_core.output_parsers import StrOutputParser; prompt = ChatPromptTemplate.from_template('Tell me a joke about {topic}'); model = ChatOpenAI(model='gpt-4'); parser = StrOutputParser(); chain = prompt | model | parser; result = chain.invoke({'topic': 'programming'}). Each component must implement Runnable interface. Pipes automatically handle input/output types - prompt outputs messages, model outputs AIMessage, parser extracts string. Async version: await chain.ainvoke(). Advantages over LLMChain (deprecated v0.2): type safety, streaming support, better parallelization. Use .with_config() for runtime options like callbacks, tags, metadata.
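
Below is a minimal, runnable version of that pattern as a sketch; it assumes langchain-openai is installed and OPENAI_API_KEY is set in the environment.

from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_template("Tell me a joke about {topic}")
model = ChatOpenAI(model="gpt-4")
parser = StrOutputParser()

# The | operator composes the three Runnables into a single RunnableSequence
chain = prompt | model | parser

result = chain.invoke({"topic": "programming"})
print(result)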

99% confidence
A

Runnable is the base interface for all LCEL components. Core methods: invoke(input), stream(input), batch(inputs), and async variants (ainvoke, astream, abatch) - only invoke() must be implemented; the others have default implementations built on it that you can override. Create custom: from langchain_core.runnables import RunnableLambda; def my_func(x): return x.upper(); runnable = RunnableLambda(my_func); result = runnable.invoke('hello'). For class-based: from langchain_core.runnables import Runnable; class MyRunnable(Runnable): def invoke(self, input, config=None): return self.process(input); def process(self, x): return x * 2. Runnables composable via | operator. Use RunnablePassthrough for identity, RunnableBranch for conditionals. Config parameter passes runtime options (callbacks, recursion_limit). Type hints: Runnable[InputType, OutputType]. Streaming: yield chunks in stream() method. Essential for custom LangChain components in v0.2+.
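
A short sketch of both approaches (a function wrapped in RunnableLambda, and a class-based Runnable); class and variable names are illustrative, and only invoke() is implemented since the other methods fall back to defaults.

from langchain_core.runnables import Runnable, RunnableLambda

# Function-based: wrap any callable
upper = RunnableLambda(lambda x: x.upper())
print(upper.invoke("hello"))  # "HELLO"

# Class-based: implement invoke(); stream/batch/async use default implementations
class Doubler(Runnable[int, int]):
    def invoke(self, input: int, config=None, **kwargs) -> int:
        return input * 2

# Composes with other Runnables via |
chain = Doubler() | RunnableLambda(lambda x: x + 1)
print(chain.invoke(5))  # 11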

99% confidence
A

LCEL streaming uses .stream() method instead of .invoke(). Pattern: for chunk in chain.stream(input): print(chunk, end='', flush=True). Async: async for chunk in chain.astream(input). Streaming works through entire chain - if model streams tokens, parser receives them incrementally. Example with ChatOpenAI: from langchain_openai import ChatOpenAI; from langchain_core.prompts import ChatPromptTemplate; from langchain_core.output_parsers import StrOutputParser; prompt = ChatPromptTemplate.from_template('{question}'); model = ChatOpenAI(streaming=True); parser = StrOutputParser(); chain = prompt | model | parser; for token in chain.stream({'question': 'Explain LCEL'}): print(token, end=''). Streaming intermediate steps: use .astream_events() (v0.2+) to see all chain events. Custom streaming: yield chunks from a generator wrapped in RunnableGenerator (RunnableLambda collects its input and does not stream). Streaming with tools: partial tool calls arrive as tool_call_chunks on streamed AIMessageChunk objects. Essential for real-time UIs and reducing perceived latency.

99% confidence
A

RunnablePassthrough passes input through unchanged or adds fields. Two modes: identity (passthrough) and assignment (add keys). Identity: from langchain_core.runnables import RunnablePassthrough; chain = RunnablePassthrough() | some_function. Assignment pattern: RunnablePassthrough.assign(new_key=lambda x: process(x)). Example combining both: from langchain_core.prompts import ChatPromptTemplate; from langchain_core.runnables import RunnablePassthrough; prompt = ChatPromptTemplate.from_template('Context: {context}\n\nQuestion: {question}'); chain = {'context': retriever, 'question': RunnablePassthrough()} | prompt | model. Common pattern - preserve original input while adding fields: chain = RunnablePassthrough.assign(embedding=embedder, metadata=lambda x: get_meta(x)) | downstream. Access original: use itemgetter from operator for dict keys. Use case: RAG chains where you need both retrieved context and original question. Often used together with RunnableParallel (formerly called RunnableMap in v0.1).
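
A self-contained sketch of both modes; the "retriever" below is just a RunnableLambda returning canned text (a stand-in, not a real retriever), so the example runs without a vector store or API key.

from langchain_core.runnables import RunnableLambda, RunnablePassthrough

fake_retriever = RunnableLambda(lambda q: f"(docs retrieved for: {q})")

# Identity mode: fan the same input into two keys of a dict
setup = {"context": fake_retriever, "question": RunnablePassthrough()} | RunnableLambda(lambda d: d)
print(setup.invoke("What is LCEL?"))
# {'context': '(docs retrieved for: What is LCEL?)', 'question': 'What is LCEL?'}

# Assignment mode: keep the existing keys and add new ones
enrich = RunnablePassthrough.assign(length=lambda d: len(d["question"]))
print(enrich.invoke({"question": "What is LCEL?"}))
# {'question': 'What is LCEL?', 'length': 13}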

99% confidence
A

bind_tools() attaches tools to models supporting function calling. Pattern: model_with_tools = model.bind_tools([tool1, tool2]); response = model_with_tools.invoke('use the calculator'). Tool definition: from langchain_core.tools import tool; @tool def multiply(a: int, b: int) -> int: '''Multiply two numbers'''; return a * b. Models return AIMessage with tool_calls attribute. Extract: if response.tool_calls: for call in response.tool_calls: tool_name = call['name']; args = call['args']. Execute tools: from langchain_core.messages import ToolMessage; tool_output = multiply.invoke(args); tool_msg = ToolMessage(content=str(tool_output), tool_call_id=call['id']). Complete agentic loop: while True: response = model.invoke(messages); if not response.tool_calls: break; execute tools, append results to messages. Replaces deprecated functions parameter (pre-v0.2). Use with_structured_output() for Pydantic schema binding.
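
A compact sketch of the full tool-calling round trip described above; the model name is illustrative and an OpenAI API key is assumed.

from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

@tool
def multiply(a: int, b: int) -> int:
    """Multiply two numbers."""
    return a * b

model_with_tools = ChatOpenAI(model="gpt-4o-mini").bind_tools([multiply])

messages = [HumanMessage(content="What is 6 times 7?")]
ai_msg = model_with_tools.invoke(messages)
messages.append(ai_msg)

# Execute each requested tool call and feed the result back as a ToolMessage
for call in ai_msg.tool_calls:
    result = multiply.invoke(call["args"])
    messages.append(ToolMessage(content=str(result), tool_call_id=call["id"]))

final = model_with_tools.invoke(messages)
print(final.content)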

99% confidence
A

Use RunnableParallel or dict syntax for parallel execution. Explicit RunnableParallel: from langchain_core.runnables import RunnableParallel; parallel = RunnableParallel(summary=chain1, keywords=chain2, sentiment=chain3); results = parallel.invoke(input). Returns dict with all results. Dict shorthand: {'summary': chain1, 'keywords': chain2} is coerced to a RunnableParallel when piped to another runnable (a bare dict has no .invoke()). Parallel with passthrough: {'original': RunnablePassthrough(), 'processed': processing_chain}. Execution: branches run concurrently - a thread pool backs invoke()/batch(), asyncio backs ainvoke()/abatch(). Example RAG with multiple retrievers: retrievers = RunnableParallel(wiki=wiki_retriever, docs=doc_retriever); combined = retrievers | merge_contexts | prompt | model. Error handling: if one branch fails, the entire parallel call fails - attach .with_fallbacks() to individual branches for graceful degradation. Max concurrency: config={'max_concurrency': 5}. Streaming yields per-branch chunks as they become available.
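
A runnable illustration of the fan-out behavior using plain RunnableLambda branches, so it executes without any API calls.

from langchain_core.runnables import RunnableLambda, RunnableParallel, RunnablePassthrough

parallel = RunnableParallel(
    upper=RunnableLambda(lambda x: x.upper()),
    length=RunnableLambda(lambda x: len(x)),
    original=RunnablePassthrough(),
)
print(parallel.invoke("hello lcel"))
# {'upper': 'HELLO LCEL', 'length': 10, 'original': 'hello lcel'}

# Dict shorthand is equivalent once it is piped into a chain
chain = {"upper": RunnableLambda(str.upper), "original": RunnablePassthrough()} | RunnableLambda(lambda d: d["upper"])
print(chain.invoke("hello"))  # "HELLO"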

99% confidence
A

Use .with_retry() for automatic retries. Pattern: chain_with_retry = chain.with_retry(stop_after_attempt=3, wait_exponential_jitter=True). Built on the tenacity library (already a langchain-core dependency). Custom retry logic: from tenacity import retry, stop_after_attempt, wait_fixed; @retry(stop=stop_after_attempt(3), wait=wait_fixed(2)); def invoke_with_retry(chain, input): return chain.invoke(input). Error handling with try-except: from langchain_core.runnables import RunnableLambda; def safe_invoke(x): try: return risky_chain.invoke(x); except Exception as e: return {'error': str(e)}; safe_chain = RunnableLambda(safe_invoke). Fallback chains: chain_with_fallback = primary_chain.with_fallbacks([fallback_chain1, fallback_chain2]). Note: RunnableConfig has no max_retries or timeout keys - use .with_retry() for retries and asyncio.wait_for (or model-level request timeouts) for time limits. Use callbacks for error logging: chain.with_config(callbacks=[error_logger]).
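
A sketch of retries plus fallbacks using local stand-in functions instead of model calls, so the retry path can be observed without an API key; note the corrected with_retry parameters.

from langchain_core.runnables import RunnableLambda

attempts = {"n": 0}

def flaky(x):
    # Fails twice, then succeeds - stands in for a transient API error
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("transient failure")
    return f"ok after {attempts['n']} attempts"

resilient = RunnableLambda(flaky).with_retry(stop_after_attempt=3, wait_exponential_jitter=True)
print(resilient.invoke("input"))  # "ok after 3 attempts"

def always_fails(x):
    raise RuntimeError("primary model down")

# Fallbacks: if the primary fails, alternatives are tried in order
primary = RunnableLambda(always_fails)
fallback = RunnableLambda(lambda x: "served by fallback")
print(primary.with_fallbacks([fallback]).invoke("input"))  # "served by fallback"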

99% confidence
A

Three core Runnable methods with distinct use cases. invoke(input): synchronous single execution, returns complete result. stream(input): returns iterator yielding incremental results (tokens, chunks). batch(inputs): processes multiple inputs efficiently, returns list of results. Example: model.invoke('hi') returns full response. model.stream('hi') yields tokens one by one. model.batch(['hi', 'hello', 'hey']) returns [response1, response2, response3] with internal batching optimization. Async variants: ainvoke, astream, abatch for concurrent execution. Performance: batch() ~3-10x faster than loop of invoke() calls (concurrent requests, connection pooling). Streaming essential for UIs - reduces time-to-first-token. Memory: stream uses O(1), invoke/batch use O(n). Runtime config applies to all methods: model.with_config(tags=['prod']).invoke('hi'); call-time kwargs such as stop sequences are bound with model.bind(stop=['\n']). Chain methods: (prompt | model).batch([input1, input2]) batches through entire chain. Use abatch with asyncio.gather for max throughput.

99% confidence
A

RunnableBranch routes inputs based on conditions. Pattern: from langchain_core.runnables import RunnableBranch; branch = RunnableBranch((condition1, chain1), (condition2, chain2), default_chain). Conditions are functions taking input, returning bool. Example: def is_question(x): return '?' in x; branch = RunnableBranch((is_question, qa_chain), (lambda x: 'help' in x.lower(), help_chain), general_chain). Advanced pattern with structured input: branch = RunnableBranch((lambda x: x['type'] == 'code', code_chain), (lambda x: x['type'] == 'text', text_chain), error_chain); result = branch.invoke({'type': 'code', 'content': '...'}). Lazy evaluation - only matched chain executes. Combine with RunnablePassthrough to preserve input: {'original': RunnablePassthrough(), 'routed': branch}. Use case: multi-intent routing, error handling, language detection. Alternative: nested if-else with RunnableLambda for simple cases.
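
A self-contained routing sketch; the three "chains" are simple lambdas so the branch logic is easy to verify locally.

from langchain_core.runnables import RunnableBranch, RunnableLambda

qa_chain = RunnableLambda(lambda x: f"QA handler: {x}")
help_chain = RunnableLambda(lambda x: f"Help handler: {x}")
general_chain = RunnableLambda(lambda x: f"General handler: {x}")

branch = RunnableBranch(
    (lambda x: "?" in x, qa_chain),                # first matching condition wins
    (lambda x: "help" in x.lower(), help_chain),
    general_chain,                                 # default when nothing matches
)

print(branch.invoke("What is LCEL?"))   # QA handler: ...
print(branch.invoke("I need help"))     # Help handler: ...
print(branch.invoke("Good morning"))    # General handler: ...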

99% confidence
A

Use set_debug(True) and callbacks for visibility. Basic debug: import langchain; langchain.debug = True; then run chain - prints all intermediate steps. Programmatic: from langchain.globals import set_debug, set_verbose; set_debug(True); set_verbose(True). Callbacks for custom logging: from langchain.callbacks import StdOutCallbackHandler; chain.invoke(input, config={'callbacks': [StdOutCallbackHandler()]}). Stream events for detailed tracing: async for event in chain.astream_events(input, version='v2'): print(event). Event types: on_chain_start, on_llm_start, on_tool_start, on_chain_end. LangSmith integration (production): export LANGCHAIN_TRACING_V2=true; export LANGCHAIN_API_KEY=...; automatic tracing to LangSmith dashboard. Print intermediate: chain = step1 | RunnableLambda(lambda x: print(f'After step1: {x}') or x) | step2. Type errors: check Runnable[InputType, OutputType] signatures. Use .get_graph() to visualize: chain.get_graph().print_ascii().

99% confidence
A

RAG pattern with LCEL: retriever | prompt | model | parser. Setup: from langchain_community.vectorstores import FAISS; from langchain_openai import OpenAIEmbeddings, ChatOpenAI; from langchain_core.prompts import ChatPromptTemplate; from langchain_core.runnables import RunnablePassthrough; from langchain_core.output_parsers import StrOutputParser. Vectorstore: vectorstore = FAISS.from_texts(['doc1', 'doc2'], OpenAIEmbeddings()). Retriever: retriever = vectorstore.as_retriever(search_kwargs={'k': 4}). Chain: prompt = ChatPromptTemplate.from_template('Context: {context}\n\nQuestion: {question}\n\nAnswer:'); chain = {'context': retriever | format_docs, 'question': RunnablePassthrough()} | prompt | ChatOpenAI() | StrOutputParser(). Helper: def format_docs(docs): return '\n\n'.join([d.page_content for d in docs]). Invoke: answer = chain.invoke('What is...?'). Advanced - return sources alongside the answer: chain = {'context': retriever | format_docs, 'question': RunnablePassthrough()} | RunnablePassthrough.assign(answer=prompt | ChatOpenAI() | StrOutputParser()). The output dict then contains context, question, and answer.
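
A fuller sketch of the basic RAG chain above; it assumes faiss-cpu, langchain-community, and langchain-openai are installed with an OpenAI key set, and the indexed documents are placeholders.

from langchain_community.vectorstores import FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

vectorstore = FAISS.from_texts(
    ["LCEL composes Runnables with the | operator.", "LangServe deploys chains as REST APIs."],
    OpenAIEmbeddings(),
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

def format_docs(docs):
    # Join retrieved Documents into a single context string
    return "\n\n".join(d.page_content for d in docs)

prompt = ChatPromptTemplate.from_template("Context: {context}\n\nQuestion: {question}\n\nAnswer:")

chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | ChatOpenAI()
    | StrOutputParser()
)

print(chain.invoke("How do I compose Runnables in LCEL?"))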

99% confidence
A

with_structured_output() forces models to return Pydantic objects. Pattern: from pydantic import BaseModel, Field; from langchain_openai import ChatOpenAI; class Person(BaseModel): name: str = Field(description='Person name'); age: int = Field(description='Person age'); model = ChatOpenAI(model='gpt-4'); structured_model = model.with_structured_output(Person); result = structured_model.invoke('John is 25 years old'); print(result.name, result.age). Works with OpenAI, Anthropic, Google models supporting function calling. Chain example: chain = prompt | model.with_structured_output(ResponseSchema) | processing_step. Validation: Pydantic validates automatically, raises ValidationError on schema mismatch. Optional fields: use Optional[str] or default values. Lists: class People(BaseModel): people: List[Person]. For models with native tool calling it largely supersedes PydanticOutputParser and OutputFixingParser, which remain available for other models. Use method='json_mode' parameter for models without function calling. Essential for type-safe agent outputs.
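
A minimal structured-output sketch with a hypothetical Person schema; the model name is illustrative and an OpenAI key is assumed.

from typing import Optional
from pydantic import BaseModel, Field
from langchain_openai import ChatOpenAI

class Person(BaseModel):
    name: str = Field(description="The person's name")
    age: int = Field(description="The person's age in years")
    occupation: Optional[str] = None  # optional fields are allowed

structured_model = ChatOpenAI(model="gpt-4o-mini").with_structured_output(Person)

person = structured_model.invoke("John is a 25 year old carpenter.")
print(person.name, person.age, person.occupation)  # validated Person instance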

99% confidence
A

Messages are core to LCEL chat chains. Types: HumanMessage (user), AIMessage (model), SystemMessage (instructions), ToolMessage (tool results). Import: from langchain_core.messages import HumanMessage, AIMessage, SystemMessage, ToolMessage. Usage in chains: messages = [SystemMessage(content='You are helpful'), HumanMessage(content='Hi')]; response = model.invoke(messages). AIMessage has tool_calls attribute when model uses tools. ToolMessage links back: tool_msg = ToolMessage(content=str(result), tool_call_id=call['id'], name='tool_name'). Message history: messages.append(response); messages.append(tool_msg). Convert role/content pairs: from langchain_core.messages import convert_to_messages; msgs = convert_to_messages([('user', 'hi'), ('assistant', 'hello')]); plain strings are treated as human messages. Access content: msg.content, msg.additional_kwargs, msg.response_metadata. ChatPromptTemplate converts to messages automatically. State management: pass message list through chain with RunnablePassthrough. Critical for agentic loops and multi-turn conversations.

99% confidence
A

ChatMessageHistory stores conversation state for LCEL chains. Pattern: from langchain_core.chat_history import InMemoryChatMessageHistory; from langchain_core.runnables.history import RunnableWithMessageHistory; store = {}; def get_session_history(session_id: str): if session_id not in store: store[session_id] = InMemoryChatMessageHistory(); return store[session_id]; chain_with_history = RunnableWithMessageHistory(chain, get_session_history, input_messages_key='input', history_messages_key='history'); response = chain_with_history.invoke({'input': 'Hi, I am Bob'}, config={'configurable': {'session_id': 'user123'}}). Persistent storage: from langchain_community.chat_message_histories import RedisChatMessageHistory or FileChatMessageHistory. Prompt must include history: ChatPromptTemplate.from_messages([('system', 'You are helpful'), MessagesPlaceholder('history'), ('human', '{input}')]). Trimming: use trim_messages for token limits. State access: history.messages returns list. Clear: history.clear(). Replaces deprecated ConversationBufferMemory (v0.1). Essential for stateful chatbots.
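
A condensed, runnable version of the history wiring above (in-memory store keyed by session_id); assumes an OpenAI key.

from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are helpful."),
    MessagesPlaceholder("history"),
    ("human", "{input}"),
])
chain = prompt | ChatOpenAI()

store = {}
def get_session_history(session_id: str):
    # One InMemoryChatMessageHistory per session id
    if session_id not in store:
        store[session_id] = InMemoryChatMessageHistory()
    return store[session_id]

chain_with_history = RunnableWithMessageHistory(
    chain, get_session_history,
    input_messages_key="input", history_messages_key="history",
)

config = {"configurable": {"session_id": "user123"}}
chain_with_history.invoke({"input": "Hi, I am Bob"}, config=config)
print(chain_with_history.invoke({"input": "What is my name?"}, config=config).content)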

99% confidence
A

Tool errors must be caught and returned as ToolMessage. Pattern: from langchain_core.messages import ToolMessage; try: result = tool.invoke(args); tool_msg = ToolMessage(content=str(result), tool_call_id=call_id); except Exception as e: tool_msg = ToolMessage(content=f'Error: {str(e)}', tool_call_id=call_id, additional_kwargs={'error': True}). Model receives error and can retry or adjust. Validation before invocation: if 'required_param' not in args: return ToolMessage(content='Missing required_param', tool_call_id=call_id). Timeout handling: use asyncio.wait_for for async tools: try: result = await asyncio.wait_for(tool.ainvoke(args), timeout=5.0); except asyncio.TimeoutError: return ToolMessage(content='Tool timeout', tool_call_id=call_id). Loop max iterations: for i in range(max_iterations): if i == max_iterations - 1: break loop, return partial result. Use RunnableBranch to route error messages to fallback chain. Critical: always return ToolMessage even on error - breaking message sequence corrupts conversation state.

99% confidence
A

Use the config parameter for runtime control, plus asyncio for hard timeouts. Concurrency for batch: chain.batch(inputs, config={'max_concurrency': 5}). Limits parallel executions to 5. Recursion limit: chain.invoke(input, config={'recursion_limit': 20}). Prevents infinite loops in recursive chains. Note: RunnableConfig has no timeout key - enforce wall-clock limits with asyncio: import asyncio; try: result = await asyncio.wait_for(chain.ainvoke(input), timeout=5.0); except asyncio.TimeoutError: handle_timeout(). Model-level request timeouts: ChatOpenAI(timeout=30) bounds each API call. Multiple configs: config={'max_concurrency': 3, 'recursion_limit': 15}. Per-runnable config: chain_limited = chain.with_config(max_concurrency=3). RunnableConfig type: from langchain_core.runnables import RunnableConfig; config = RunnableConfig(max_concurrency=5, recursion_limit=20). Apply to parallel chains: parallel_chain.invoke(input, config={'max_concurrency': 10}). Essential for production: prevents hanging chains, controls resource usage, enforces SLAs.
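
A sketch of the two mechanisms that do apply here - asyncio.wait_for for wall-clock timeouts and max_concurrency for throttling - with a locally simulated slow step so it runs without a model.

import asyncio
from langchain_core.runnables import RunnableLambda

async def slow_step(x):
    await asyncio.sleep(2)   # stands in for a slow model call
    return f"done: {x}"

chain = RunnableLambda(slow_step)

async def main():
    # Hard timeout around a single invocation
    try:
        await asyncio.wait_for(chain.ainvoke("task"), timeout=0.5)
    except asyncio.TimeoutError:
        print("timed out")

    # Throttle a batch: at most 2 concurrent executions
    results = await chain.abatch(["a", "b", "c", "d"], config={"max_concurrency": 2})
    print(results)

asyncio.run(main())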

99% confidence
A

Use callbacks for cost tracking. Pattern: from langchain.callbacks import get_openai_callback; with get_openai_callback() as cb: result = chain.invoke(input); print(f'Tokens: {cb.total_tokens}, Cost: ${cb.total_cost}'). Works with OpenAI models. Custom callback: from langchain.callbacks.base import BaseCallbackHandler; class CostTracker(BaseCallbackHandler): def __init__(self): self.total_tokens = 0; def on_llm_end(self, response, **kwargs): self.total_tokens += response.llm_output.get('token_usage', {}).get('total_tokens', 0). Use: tracker = CostTracker(); chain.invoke(input, config={'callbacks': [tracker]}). For streaming: callback captures tokens as they arrive. Multi-model chains: separate callbacks per model. Finer-grained accounting: get_openai_callback also exposes prompt_tokens, completion_tokens, total_tokens. Budget enforcement: raise exception if cb.total_cost > budget. LangSmith integration: automatic cost tracking in dashboard. Essential for production cost monitoring and optimization.
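
A sketch of a custom token-tracking callback, wired to FakeListLLM so it runs offline; the fake model reports no token_usage, so the printed total is 0, while real models populate that field.

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.language_models import FakeListLLM

class TokenTracker(BaseCallbackHandler):
    """Accumulates total token usage reported by LLM calls."""
    def __init__(self):
        self.total_tokens = 0

    def on_llm_end(self, response, **kwargs):
        usage = (response.llm_output or {}).get("token_usage", {})
        self.total_tokens += usage.get("total_tokens", 0)

tracker = TokenTracker()
llm = FakeListLLM(responses=["stub answer"])  # real models report token_usage here
llm.invoke("hello", config={"callbacks": [tracker]})
print(tracker.total_tokens)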

99% confidence
A

LangChain supports local models via Ollama and llama.cpp. Ollama: from langchain_community.llms import Ollama; model = Ollama(model='llama2', base_url='http://localhost:11434'); chain = prompt | model | parser; result = chain.invoke(input). ChatOllama for chat: from langchain_community.chat_models import ChatOllama; model = ChatOllama(model='llama2'). llama.cpp (local GGUF files): from langchain_community.llms import LlamaCpp; model = LlamaCpp(model_path='/path/to/model.gguf', n_ctx=2048, n_threads=4); chain = prompt | model. LM Studio exposes an OpenAI-compatible server, so ChatOpenAI(base_url='http://localhost:1234/v1', api_key='lm-studio') also works. Streaming: both support .stream() method. Tool calling: use ChatOllama with models supporting function calling (llama3.1+). Config: temperature, top_p via model parameters. Performance: n_threads for CPU, n_gpu_layers for GPU acceleration. GGUF format recommended. Callback: track token/second with callbacks. Prompt format: some models need specific templates - check model docs. Use .bind() for model-specific parameters. No API keys needed. Essential for privacy-sensitive applications and offline deployment.

99% confidence
A

Custom parsers transform model output. Pattern: from langchain_core.output_parsers import BaseOutputParser; class MyParser(BaseOutputParser[dict]): def parse(self, text: str) -> dict: import json; return json.loads(text.strip()). Use: chain = prompt | model | MyParser(); result = chain.invoke(input). JSON extraction with error handling: class JsonParser(BaseOutputParser[dict]): def parse(self, text: str) -> dict: try: return json.loads(text); except: import re; match = re.search(r'{.*}', text, re.DOTALL); return json.loads(match.group()) if match else {}. Pydantic-based: from pydantic import BaseModel; class Output(BaseModel): answer: str; class PydanticParser(BaseOutputParser[Output]): def parse(self, text: str) -> Output: return Output(answer=text.strip()). Streaming parser: subclass BaseTransformOutputParser and implement _transform for token-by-token processing. Use RunnableLambda for simple parsing: parser = RunnableLambda(lambda x: x.content.upper()). Composable: chain = prompt | model | parser1 | parser2. Essential for structured extraction and validation.
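
A self-contained sketch of a lenient JSON parser wired to FakeListLLM, so it runs without an API key; the class name is illustrative.

import json
import re
from langchain_core.language_models import FakeListLLM
from langchain_core.output_parsers import BaseOutputParser

class LenientJsonParser(BaseOutputParser[dict]):
    """Extract the first {...} block from the text and parse it as JSON."""
    def parse(self, text: str) -> dict:
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            match = re.search(r"\{.*\}", text, re.DOTALL)
            return json.loads(match.group()) if match else {}

llm = FakeListLLM(responses=['Sure! Here you go: {"answer": 42}'])
chain = llm | LenientJsonParser()
print(chain.invoke("ignored"))  # {'answer': 42}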

99% confidence
A

State management uses RunnablePassthrough.assign() pattern. Basic state: from langchain_core.runnables import RunnablePassthrough; chain = RunnablePassthrough.assign(count=lambda x: x.get('count', 0) + 1) | next_step. Accumulator pattern: chain = RunnablePassthrough.assign(results=lambda x: x.get('results', []) + [x['current']]). Multi-step state: chain = RunnablePassthrough.assign(step1_result=step1) | RunnablePassthrough.assign(step2_result=step2). Access previous state: each step receives full state dict. Conditional state updates: use RunnableBranch to update state based on conditions. Message state for chat: messages = []; def add_message(x): messages.append(x['msg']); return {'messages': messages}; chain = RunnableLambda(add_message) | next_step. Persistent state: external store (Redis, DynamoDB) with getter/setter wrapped in RunnableLambda. Type-safe state: from typing import TypedDict; class State(TypedDict): count: int; results: List[str]. Essential for multi-turn agents and stateful workflows.

99% confidence
A

Agentic loop pattern: bind tools, invoke, execute tools, repeat. Setup: from langchain_core.messages import HumanMessage, AIMessage, ToolMessage; from langchain_core.tools import tool; @tool def search(query: str): return f'Results for {query}'; model_with_tools = model.bind_tools([search]). Loop: messages = [HumanMessage(content='Find info on X')]; for i in range(max_iterations): response = model_with_tools.invoke(messages); messages.append(response); if not response.tool_calls: break; for call in response.tool_calls: result = search.invoke(call['args']); messages.append(ToolMessage(content=str(result), tool_call_id=call['id'])). LCEL chain version: use while loop with RunnablePassthrough for state. Error handling: wrap tool execution in try/except, return ToolMessage with error. Max iterations: prevent infinite loops. Early exit: check for specific conditions in response. State tracking: track iteration count, tool usage, costs. Alternative: use LangGraph for complex agent graphs. Essential for autonomous agents that use tools iteratively.
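
A trimmed version of that loop; a dict maps tool names to implementations so several tools could be routed the same way. The model name is illustrative and an OpenAI key is assumed.

from langchain_core.messages import HumanMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI

@tool
def search(query: str) -> str:
    """Look up information about a topic."""
    return f"Results for {query}"

tools = {"search": search}
model_with_tools = ChatOpenAI(model="gpt-4o-mini").bind_tools(list(tools.values()))

messages = [HumanMessage(content="Find info on LCEL and summarize it.")]
for _ in range(5):  # max-iterations guard against infinite loops
    response = model_with_tools.invoke(messages)
    messages.append(response)
    if not response.tool_calls:
        break
    for call in response.tool_calls:
        result = tools[call["name"]].invoke(call["args"])
        messages.append(ToolMessage(content=str(result), tool_call_id=call["id"]))

print(messages[-1].content)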

99% confidence
A

RunnableConfig controls runtime behavior. Import: from langchain_core.runnables import RunnableConfig. Create: config = RunnableConfig(tags=['production'], metadata={'user_id': '123'}, callbacks=[my_callback], max_concurrency=5, recursion_limit=20). Apply: chain.invoke(input, config=config). Tags for filtering: config={'tags': ['api', 'v2']} - useful in LangSmith. Metadata: arbitrary dict for tracking: config={'metadata': {'session_id': 'abc', 'version': '1.0'}}. Callbacks: list of callback handlers: config={'callbacks': [ConsoleCallbackHandler(), CustomLogger()]}. Configurable fields: chain.with_config(configurable={'llm_model': 'gpt-4'}) then select at runtime: chain.invoke(input, config={'configurable': {'llm_model': 'gpt-3.5-turbo'}}). Merge configs: from langchain_core.runnables.config import merge_configs; merged = merge_configs(config1, config2) (RunnableConfig is a TypedDict, so there is no .merge() method and no timeout key). Access in custom runnables: def invoke(self, input, config: RunnableConfig): tags = config.get('tags', []). Essential for multi-tenant apps, A/B testing, feature flags.

99% confidence
A

operator.itemgetter extracts dict keys for LCEL chains. Import: from operator import itemgetter, attrgetter. Basic: from langchain_core.runnables import RunnablePassthrough; chain = {'question': itemgetter('q'), 'context': retriever} | prompt | model. Extracts 'q' key from input dict. Multiple keys: itemgetter('key1', 'key2') returns tuple. Nested access: itemgetter only reads top-level keys - for x['user']['preferences']['language'] use a lambda instead. Attrgetter for objects: attrgetter('user.name') extracts a (possibly dotted) attribute. Combine with RunnablePassthrough: chain = RunnablePassthrough.assign(formatted=itemgetter('raw_data') | format_function). Common pattern - extract specific fields: {'context': itemgetter('docs') | format_docs, 'question': itemgetter('query')} | prompt. Alternative: lambda x: x['key'] - more flexible but less readable. Use case: transform input shape to match prompt template requirements. Cleaner than custom RunnableLambda for simple key extraction.

99% confidence
A

Multiple retrievers with RunnableParallel or ensemble. Parallel: from langchain_core.runnables import RunnableParallel; retrievers = RunnableParallel(wiki=wiki_retriever, docs=doc_retriever, web=web_retriever); results = retrievers.invoke(query). Returns dict with all results. Merge: def merge_docs(results): return results['wiki'] + results['docs'] + results['web']; chain = retrievers | RunnableLambda(merge_docs) | rerank | prompt | model. Ensemble with EnsembleRetriever: from langchain.retrievers import EnsembleRetriever; ensemble = EnsembleRetriever(retrievers=[bm25_retriever, vector_retriever], weights=[0.5, 0.5]). Reciprocal Rank Fusion: combines scores automatically. Sequential fallback: primary = retriever1; fallback = retriever1.with_fallbacks([retriever2, retriever3]). Conditional retrieval: use RunnableBranch to select retriever based on query type. Reranking: chain = retriever | reranker | top_k_filter. Hybrid search: combine sparse (BM25) + dense (vector) retrievers for better recall. Essential for production RAG systems.

99% confidence
A

astream_events() streams all chain events for observability. Pattern: async for event in chain.astream_events(input, version='v2'): print(event['event'], event['name'], event.get('data')). Event types: on_chain_start, on_chain_end, on_llm_start, on_llm_end, on_tool_start, on_tool_end, on_retriever_start, on_retriever_end. Filter by event type: if event['event'] == 'on_llm_start': log_prompt(event['data']['input']). Access intermediate outputs: on_chain_end events contain 'output' key. Track tokens: on_llm_end has 'output' with token_usage. Version: must specify version='v2' (newer event format). Include types: astream_events(input, version='v2', include_types=['llm', 'tool']). Exclude: exclude_types=['retriever']. Streaming chunks: on_llm_stream events for token-by-token. Tags filter: include_tags=['critical'], exclude_tags=['debug']. Production use: send events to logging/monitoring system. Debug: inspect full chain execution flow. Better than callbacks for async streaming observability. Essential for complex chain debugging.
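
A short async sketch that filters the event stream for model token chunks and the final chain output; assumes an OpenAI key, and note that chat models emit on_chat_model_stream events.

import asyncio
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

chain = ChatPromptTemplate.from_template("{question}") | ChatOpenAI() | StrOutputParser()

async def main():
    async for event in chain.astream_events({"question": "Explain LCEL in one line"}, version="v2"):
        kind = event["event"]
        if kind == "on_chat_model_stream":
            # token-by-token chunks from the model
            print(event["data"]["chunk"].content, end="", flush=True)
        elif kind == "on_chain_end" and event["name"] == "RunnableSequence":
            print("\nfinal output:", event["data"]["output"])

asyncio.run(main())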

99% confidence
A

RunnableSequence represents chained runnables where output of each is input to next. Created via pipe operator: chain = prompt | model | parser becomes RunnableSequence. Direct instantiation: from langchain_core.runnables import RunnableSequence; seq = RunnableSequence(first=prompt, middle=[transform1, transform2], last=parser). Access components: seq.first returns first runnable, seq.middle returns list of middle runnables, seq.last returns last runnable. Inspect: use seq.get_graph() to visualize chain structure. Modify: cannot modify after creation - create new sequence. Common use: debugging complex chains to identify bottlenecks. Example: seq.first.invoke(input) tests first step only. Streaming: seq preserves streaming if all components support it. Batch: automatically parallelizes across inputs. Type checking: RunnableSequence[InputType, OutputType] for type safety. Most important composition operator in LangChain - used in virtually every chain. Auto-created by | operator, rarely instantiated directly.

99% confidence
A

Dynamic chains built by composing runnables conditionally. Pattern: def build_chain(use_history: bool): base = prompt | model | parser; return RunnableWithMessageHistory(base, get_history) if use_history else base. Conditional routing with RunnableBranch: branch = RunnableBranch((lambda x: x['type'] == 'sql', sql_chain), (lambda x: x['type'] == 'api', api_chain), default_chain). Dynamic tools: model_with_tools = model.bind_tools(select_tools(user_permissions)). Config-based selection: chain.with_config(configurable={'llm': 'gpt-4'}) then invoke with config={'configurable': {'llm': 'gpt-3.5-turbo'}}. Runtime prompt selection: prompts = {'formal': formal_prompt, 'casual': casual_prompt}; chain = RunnableLambda(lambda x: prompts[x['style']]) | model. Parallel branches: RunnableParallel({k: v for k, v in branches.items() if should_include(k)}). Factory pattern: def make_rag_chain(retriever, reranker=None): chain = retriever; if reranker: chain = chain | reranker; return chain | prompt | model. Essential for multi-tenant apps and A/B testing.

99% confidence
A

get_graph() visualizes chain structure, get_prompts() extracts prompts. Graph introspection: graph = chain.get_graph(); graph.print_ascii() shows ASCII diagram of chain flow. Useful for debugging complex chains. Graph nodes: graph.nodes shows all components, graph.edges shows connections. Export: graph.to_json() for external visualization tools. Prompt extraction: prompts = chain.get_prompts(config=None) returns list of ChatPromptTemplate and PromptTemplate instances used in chain. Inspect templates: for p in prompts: print(p.messages) to see all message templates. Dynamic prompts: get_prompts(config={'configurable': {'prompt_version': 'v2'}}) gets prompts for specific config. Use case: extract prompts for LangSmith logging, audit for prompt injection risks, version control prompts separately. Limitations: get_prompts() only works with chains containing PromptTemplate/ChatPromptTemplate, not string prompts. Graph depth: shows full nested chain structure including parallel/conditional branches. Essential for production chain monitoring and debugging.

99% confidence
A

Custom chains extend Runnable base class. Pattern: from langchain_core.runnables import Runnable, RunnableConfig; class MyChain(Runnable): def invoke(self, input: dict, config: RunnableConfig = None) -> dict: # custom logic; return result; def stream(self, input: dict, config: RunnableConfig = None): yield from self._stream_impl(input); async def ainvoke(self, input: dict, config: RunnableConfig = None) -> dict: # async version. Required methods: invoke(), optional: stream(), batch(), ainvoke(), astream(), abatch(). For serialization: inherit from RunnableSerializable, implement is_lc_serializable() -> True and lc_secrets property. Type hints: class MyChain(Runnable[InputType, OutputType]). Access config: callbacks = config.get('callbacks', []); tags = config.get('tags', []); metadata = config.get('metadata', {}). Compose with LCEL: custom_chain | other_runnable works automatically. Override batch(): def batch(self, inputs, config=None): # optimized batch logic. Use RunnableLambda for simple functions: chain = RunnableLambda(my_func). Essential for reusable custom logic that integrates with LCEL ecosystem.

99% confidence
A

LangServe deploys LCEL chains as REST APIs. Setup: pip install langserve[all]. Create server: from fastapi import FastAPI; from langserve import add_routes; app = FastAPI(); add_routes(app, chain, path='/my_chain'). Endpoints auto-created: /my_chain/invoke (POST), /my_chain/batch (POST), /my_chain/stream (POST), /my_chain/stream_log (POST for intermediate steps), /my_chain/playground (GET for UI). Run: uvicorn main:app --host 0.0.0.0 --port 8000. Client: from langserve import RemoteRunnable; remote_chain = RemoteRunnable('http://localhost:8000/my_chain'); result = remote_chain.invoke(input). Streaming client: for chunk in remote_chain.stream(input): print(chunk). Config support: remote_chain.invoke(input, config={'tags': ['prod']}). Auth: add FastAPI dependencies for API key validation. CORS: app.add_middleware(CORSMiddleware, allow_origins=['*']). Docker: FROM python:3.11; pip install langserve; CMD uvicorn main:app. Playground at http://localhost:8000/my_chain/playground for testing. Essential for deploying LangChain apps with zero API code.
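
A minimal server/client sketch following the steps above; assumes langserve[all], fastapi, uvicorn, and langchain-openai are installed, and the /joke path is arbitrary.

# server.py
from fastapi import FastAPI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langserve import add_routes

chain = ChatPromptTemplate.from_template("Tell me a joke about {topic}") | ChatOpenAI() | StrOutputParser()

app = FastAPI(title="LCEL demo")
add_routes(app, chain, path="/joke")  # exposes /joke/invoke, /joke/stream, /joke/playground

# run with: uvicorn server:app --host 0.0.0.0 --port 8000

# client side (separate process):
# from langserve import RemoteRunnable
# remote = RemoteRunnable("http://localhost:8000/joke")
# print(remote.invoke({"topic": "databases"}))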

99% confidence
A

Serialization for LCEL chains requires Serializable base class. Import: from langchain_core.load import dumpd, dumps, load, loads. Serialize to dict: chain_dict = dumpd(chain). Serialize to JSON string: chain_json = dumps(chain, pretty=True). Deserialize: chain = loads(chain_json) for a JSON string, or load(chain_dict) for a dict produced by dumpd. Security: API keys and secrets excluded automatically via lc_secrets attribute. Load secrets: loads(chain_json, secrets_map={'openai_api_key': 'sk-...'}). Custom serializable: class MyRunnable(RunnableSerializable): @classmethod; def is_lc_serializable(cls) -> bool: return True; @property; def lc_secrets(self) -> dict: return {'api_key': 'API_KEY'}. Limitations: not all chains serializable by default - must explicitly opt-in. ConversationalRetrievalChain not serializable (use LCEL alternative). State: serialization excludes runtime state like message history. Use case: save chain config to version control, deploy same chain across environments, share chain templates. Alternative: save chain construction code, not serialized objects. Production: prefer code-based chain definitions over serialization for maintainability.

99% confidence
A

LCEL for linear chains, LangGraph for complex agentic workflows. LCEL strengths: linear pipelines (prompt | model | parser), simple RAG chains, stateless request-response, fast prototyping, DAG-only workflows (no loops). LangGraph strengths: multi-turn conversations requiring state, agentic loops with tools, branching logic with cycles, multi-agent systems, human-in-the-loop workflows, complex state management with rollbacks. Architecture: LCEL is Directed Acyclic Graph (one-way, no loops), LangGraph is full graph (supports cycles). State: LCEL requires manual state via RunnablePassthrough, LangGraph has centralized state with history. Example LCEL use case: translate text, classify documents, simple Q&A bot. Example LangGraph use case: research agent that iterates on tasks, collaborative agents, long-running workflows needing checkpoints. Migration path: start with LCEL, migrate to LangGraph when loops/state needed. Complementary: use both - LCEL for subchains inside LangGraph nodes. Performance: LCEL faster for simple chains due to less overhead. Use LCEL if: single LLM call or prompt-model-parser pattern. Use LangGraph if: 'while True' loop in your logic.

99% confidence
A

Prompt composition with from_messages() and MessagesPlaceholder. Basic: from langchain_core.prompts import ChatPromptTemplate; prompt = ChatPromptTemplate.from_messages([('system', 'You are {role}'), ('human', '{input}')]). Multiple message types: from_messages([('system', system_msg), ('human', human_msg), ('ai', ai_msg)]). Dynamic history with MessagesPlaceholder: from langchain_core.prompts import MessagesPlaceholder; prompt = ChatPromptTemplate.from_messages([('system', 'Be helpful'), MessagesPlaceholder('history'), ('human', '{question}')]). Invoke: prompt.invoke({'history': [HumanMessage('Hi'), AIMessage('Hello')], 'question': 'What?'}). Alternative placeholder syntax: ('placeholder', '{msgs}') instead of MessagesPlaceholder('msgs'). Chain prompts: partial_prompt = ChatPromptTemplate.from_template('Context: {context}'); full_prompt = partial_prompt + ChatPromptTemplate.from_template('Question: {question}'). Reuse templates: base = ChatPromptTemplate.from_messages([('system', '{instructions}')]); qa_prompt = base.partial(instructions='Answer questions'). Prompt implements Runnable: prompt | model | parser works. Essential for dynamic conversational flows and template reuse.

99% confidence
A

Multi-modal chains use vision models (GPT-4V, GPT-4o, Claude 3) for image+text. Pattern: from langchain_core.messages import HumanMessage; from langchain_openai import ChatOpenAI; model = ChatOpenAI(model='gpt-4o'); message = HumanMessage(content=[{'type': 'text', 'text': 'Describe this image'}, {'type': 'image_url', 'image_url': {'url': 'https://...'}}]); response = model.invoke([message]). Local images: import base64; def encode_image(path): with open(path, 'rb') as f: return base64.b64encode(f.read()).decode(); image_data = f'data:image/jpeg;base64,{encode_image('img.jpg')}'; use in image_url. Multi-modal RAG: retriever returns Document objects with image metadata, extract image URLs, pass to vision model with text context. Chain pattern: chain = retriever | format_images_and_text | vision_model | parser. Structured output: use with_structured_output() with vision models for JSON extraction from images. Batch images: model.batch([message1, message2]). Streaming: model.stream(message) for token-by-token. Use case: document analysis with charts, visual Q&A, receipt extraction, slide deck analysis. Supports PNG, JPEG, WebP formats. Essential for document-heavy RAG systems.

99% confidence
A

Testing LCEL chains with mocks and fixtures. Mock LLM: from langchain_core.language_models import FakeListLLM; fake_llm = FakeListLLM(responses=['Response 1', 'Response 2']); chain = prompt | fake_llm | parser; result = chain.invoke(input). Custom fake with input capture: class FakeLLMWithHistory(FakeListLLM): def __init__(self, responses): super().__init__(responses=responses); self.inputs = []; def invoke(self, input, config=None): self.inputs.append(input); return super().invoke(input, config). Unit test pattern: import pytest; def test_chain(): chain = build_chain(); result = chain.invoke({'question': 'test'}); assert 'expected' in result. Mock retrievers: subclass BaseRetriever (or wrap a RunnableLambda) so it returns a fixed list of Documents. VCR for real API recording: pip install pytest-recording vcrpy; @pytest.mark.vcr; def test_with_real_llm(): # first run records, subsequent runs replay. Integration tests: use LangSmith datasets for regression testing. Test streaming: assert list(chain.stream(input)) == expected_chunks. Test batch: assert chain.batch([input1, input2]) == [output1, output2]. Mock callbacks: track events with custom CallbackHandler. Essential for CI/CD without API costs.
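
A runnable pytest-style sketch using the fake chat model shipped in langchain-core, so CI needs no API key; build_chain and the test names are illustrative.

from langchain_core.language_models import FakeListChatModel
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

def build_chain(llm):
    prompt = ChatPromptTemplate.from_template("Answer briefly: {question}")
    return prompt | llm | StrOutputParser()

def test_chain_returns_fake_response():
    fake_llm = FakeListChatModel(responses=["LCEL composes Runnables."])
    chain = build_chain(fake_llm)
    result = chain.invoke({"question": "What is LCEL?"})
    assert "LCEL" in result

def test_chain_batches_in_order():
    fake_llm = FakeListChatModel(responses=["one", "two"])
    chain = build_chain(fake_llm)
    # max_concurrency=1 keeps the fake model's canned responses deterministic
    results = chain.batch([{"question": "a"}, {"question": "b"}], config={"max_concurrency": 1})
    assert results == ["one", "two"]

if __name__ == "__main__":
    test_chain_returns_fake_response()
    test_chain_batches_in_order()
    print("all tests passed")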

99% confidence
A

Performance optimization via batch(), RunnableParallel, and async. Batching: results = chain.batch([input1, input2, input3], config={'max_concurrency': 5}). Batch is 3-10x faster than loop of invoke() due to concurrent requests and connection pooling. Async batching: results = await chain.abatch(inputs). Parallel execution: parallel = RunnableParallel(task1=chain1, task2=chain2, task3=chain3); results = parallel.invoke(input). Runs branches concurrently (thread pool for sync, asyncio for async). Async chains: use ainvoke(), astream(), abatch() for concurrent execution. Example: import asyncio; results = await asyncio.gather(*[chain.ainvoke(inp) for inp in inputs]). Map reduce: chain.map() for embarrassingly parallel tasks. Concurrency control: config={'max_concurrency': 10} limits parallel calls to prevent rate limits. Streaming for UX: chain.stream() reduces time-to-first-token even if total time same. LCEL schedules parallel and batch steps for you, so explicit thread management is rarely needed. Cache layer: functools.lru_cache on deterministic helpers, or LangChain's LLM cache (InMemoryCache, SQLiteCache, RedisCache). Batch size tuning: test 10-100 for optimal throughput. Essential for production-scale applications handling concurrent users.

99% confidence
A

Caching strategies: InMemoryCache, SQLiteCache, RedisCache, GPTCache. Enable globally with set_llm_cache: from langchain.globals import set_llm_cache; from langchain_community.cache import InMemoryCache; set_llm_cache(InMemoryCache()). Fast but session-only, lost on restart. SQLiteCache: from langchain_community.cache import SQLiteCache; set_llm_cache(SQLiteCache(database_path='.langchain.db')). Persistent disk-based, survives restarts. RedisCache: from langchain_community.cache import RedisCache; from redis import Redis; set_llm_cache(RedisCache(redis_=Redis(host='localhost', port=6379))). Distributed caching for multi-instance production. GPTCache: from langchain_community.cache import GPTCache; set_llm_cache(GPTCache(init_gptcache_func)), where init_gptcache_func initializes a gptcache instance. Semantic caching - matches similar queries, not just exact. Cache key: based on prompt + model + parameters. Exact match only (except GPTCache). Clear cache: call .clear() on the cache instance. Per-model caching: pass cache=my_cache when constructing the model (e.g. ChatOpenAI(cache=...)); there is no cache key in RunnableConfig. Benefits: 50-90% cost reduction, <10ms response for cached, better UX. Use InMemoryCache for dev, SQLiteCache for single-instance prod, RedisCache for multi-instance, GPTCache for semantic similarity. Essential for production cost optimization.
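
A small demonstration of the set_llm_cache API mentioned above; the timing printout simply shows that the second identical call is answered from the in-memory cache. Assumes an OpenAI key, and the model name is illustrative.

import time
from langchain_core.caches import InMemoryCache
from langchain_core.globals import set_llm_cache
from langchain_openai import ChatOpenAI

set_llm_cache(InMemoryCache())  # process-wide LLM cache
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)

start = time.time()
model.invoke("Tell me a one-line fact about caching.")
print("first call:", round(time.time() - start, 2), "s")

start = time.time()
model.invoke("Tell me a one-line fact about caching.")  # identical prompt -> cache hit
print("second call:", round(time.time() - start, 2), "s")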

99% confidence
A

LCEL is official LangChain approach, decorators are third-party library. LCEL: uses pipe operator (chain = prompt | model | parser), declarative syntax, built-in streaming/batching/async, automatic schema validation, official LangChain support, recommended for v0.2+. LangChain decorators: third-party library (not official), uses @llm_prompt decorator to turn functions into chains, Pythonic multiline prompts, syntactic sugar for readability. Example decorator: from langchain_decorators import llm_prompt; @llm_prompt; def summarize(text: str) -> str: '''Summarize: {text}'''. Example LCEL: prompt = PromptTemplate.from_template('Summarize: {text}'); chain = prompt | model. Key difference: LCEL is composition-based (composing Runnables), decorators are function-based (decorating Python functions). Performance: LCEL has built-in optimizations (lazy evaluation, parallel execution), decorators add overhead. Compatibility: LCEL integrates with LangServe, LangSmith, LangGraph; decorators limited. Migration: decorators not maintained for v0.2+. Recommendation: use LCEL for production (official, supported, optimized), decorators only if prefer decorator syntax for prototyping. LCEL is the future of LangChain chains.

99% confidence
A

Major breaking changes in v0.2 migration released May 2024. Package restructuring: langchain now decoupled from langchain-community, both depend on langchain-core. Integration-agnostic: langchain no longer auto-instantiates specific models (OpenAI, Anthropic, etc.), must import from langchain-openai, langchain-anthropic, etc. Deprecated methods: predict_messages() → invoke(), run() → invoke(), __call__() → invoke(). LLMChain deprecated: use LCEL (prompt | model | parser). Memory classes deprecated: ConversationBufferMemory, ConversationSummaryMemory → use RunnableWithMessageHistory. Import changes: langchain.chat_models → langchain-openai, langchain-anthropic, etc. Tool calling: functions parameter → bind_tools() method. Output parsers: some deprecated in favor of with_structured_output(). Migration tool: langchain-cli migrate updates imports automatically. Install: pip install langchain-cli; langchain-cli migrate . Version pinning: pin langchain-core and langchain to same minor version. Testing: comprehensive test suite required after migration. Timeline: v0.1 legacy support limited, migrate ASAP. Largely backward compatible but requires import updates. Use migration guide at https://python.langchain.com/docs/versions/v0_2/ for complete checklist.

99% confidence
A

Common pitfalls: type mismatches, missing config, complex debugging, state management. Type errors: pipe (|) operator requires compatible input/output types. Fix: add type hints Runnable[InputType, OutputType], use RunnableLambda to transform between types. Debug type flow: chain.get_graph().print_ascii() shows expected types. Missing verbose output: LCEL chains don't respect verbose=True. Fix: import langchain; langchain.debug = True for all chains, or use callbacks: chain.invoke(input, config={'callbacks': [StdOutCallbackHandler()]}). Debugging long chains: use astream_events(version='v2') to see all intermediate steps. State errors: LCEL is stateless by default. Fix: use RunnablePassthrough.assign() or RunnableWithMessageHistory. Silent failures: errors in parallel chains may not surface. Fix: wrap in try/except, use .with_fallbacks([fallback_chain]). Performance issues: blocking I/O in sync chains. Fix: use async variants (ainvoke, astream, abatch). Over-complexity: LCEL chains with hundreds of steps hard to debug. Fix: use LangGraph for complex workflows, keep LCEL for simple chains. Streaming not working: one component in chain doesn't support streaming. Fix: check each component supports .stream(). LangSmith integration: set LANGCHAIN_TRACING_V2=true for automatic observability. Essential debugging: debug=True, astream_events, LangSmith, get_graph().

99% confidence
A

RunnableMap (alias of RunnableParallel) runs multiple runnables in parallel, returns dict. Pattern: from langchain_core.runnables import RunnableParallel; parallel = RunnableParallel(summary=summarize_chain, keywords=keyword_chain, sentiment=sentiment_chain); results = parallel.invoke(text). Returns {'summary': '...', 'keywords': [...], 'sentiment': '...'}. Dict syntax shorthand: {'summary': chain1, 'keywords': chain2} is coerced to a RunnableParallel automatically when piped into a chain. Input distribution: same input passed to all branches. Different inputs: use RunnableLambda to transform: parallel = RunnableParallel(data=lambda x: x['data'], metadata=lambda x: x['meta']) | next_step. Combining with passthrough: {'original': RunnablePassthrough(), 'processed': processing_chain}. Nested parallel: RunnableParallel(branch1=RunnableParallel(sub1=..., sub2=...), branch2=...). Execution: branches run concurrently (thread pool for invoke, asyncio for ainvoke), faster than sequential. Error handling: if one fails, entire parallel fails. Use .with_fallbacks() on individual branches. Batch parallel: parallel.batch([input1, input2]) batches each branch. Essential for multi-aspect processing (RAG + classification + summarization simultaneously).

99% confidence
A

MessagesPlaceholder injects variable-length message lists into prompts. Import: from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder. Basic pattern: prompt = ChatPromptTemplate.from_messages([('system', 'You are helpful'), MessagesPlaceholder('history'), ('human', '{question}')]). Invoke with history: from langchain_core.messages import HumanMessage, AIMessage; result = prompt.invoke({'history': [HumanMessage('Hi'), AIMessage('Hello')], 'question': 'What is LangChain?'}). Variable-length: history can be empty list [] or 100 messages, placeholder adapts. Multiple placeholders: ChatPromptTemplate.from_messages([MessagesPlaceholder('context'), MessagesPlaceholder('history'), ('human', '{input}')]). Optional placeholder: MessagesPlaceholder('history', optional=True) allows invoke without history key. Shorthand syntax: ('placeholder', '{history}') equivalent to MessagesPlaceholder('history'). Limit messages: MessagesPlaceholder('history', n_messages=10) takes last 10 only. Chain with memory: use RunnableWithMessageHistory for automatic history management. Essential for chatbots needing dynamic conversation context.

99% confidence
A

configurable_alternatives() enables runtime component swapping. Pattern: from langchain_core.runnables import ConfigurableField; model = ChatOpenAI(model='gpt-4').configurable_alternatives(ConfigurableField(id='llm'), default_key='openai', anthropic=ChatAnthropic(model='claude-3-sonnet'), google=ChatGoogleGenerativeAI(model='gemini-pro')). Invoke with selection: result = chain.invoke(input, config={'configurable': {'llm': 'anthropic'}}). Default: uses 'openai' if no config specified. Multiple configurables: chain.configurable_fields(temperature=ConfigurableField(id='temp')).configurable_alternatives(...). Chain-level: (prompt | model.configurable_alternatives(...) | parser).invoke(input, config={'configurable': {'llm': 'google'}}). Use cases: A/B testing models, user-selected models in UI, fallback models, cost optimization (cheap model for simple queries). Retriever alternatives: retriever.configurable_alternatives(ConfigurableField(id='search'), vector=vector_retriever, keyword=bm25_retriever). Prompt alternatives: similar pattern for dynamic prompt switching. Config inheritance: alternatives inherit config from parent chain. Production pattern: define alternatives once, select at request time based on user tier, query complexity, or feature flags. Essential for flexible multi-model deployments.

99% confidence
A

Both parse LLM output to structured data, PydanticOutputParser adds validation. JsonOutputParser: from langchain_core.output_parsers import JsonOutputParser; parser = JsonOutputParser(); chain = prompt | model | parser. Extracts JSON from text, returns dict. No validation. Use when: need simple JSON extraction without schema. PydanticOutputParser: from langchain_core.output_parsers import PydanticOutputParser; from pydantic import BaseModel, Field; class Person(BaseModel): name: str = Field(description='Person name'); age: int; parser = PydanticOutputParser(pydantic_object=Person); chain = prompt | model | parser. Returns Person instance, validates types automatically. Use when: need type safety, validation, IDE autocomplete. Prompt formatting: parser.get_format_instructions() returns schema description for prompt. Example: prompt = PromptTemplate(template='Extract person.\n{format_instructions}\nText: {text}', partial_variables={'format_instructions': parser.get_format_instructions()}). Error handling: PydanticOutputParser raises ValidationError on schema mismatch, JsonOutputParser raises on invalid JSON. Alternative v0.2+: model.with_structured_output(Person) - cleaner, uses function calling. Recommendation: use with_structured_output() for v0.2+, parsers for legacy or non-function-calling models. Essential for reliable structured extraction.

99% confidence
A

RunnableLambda wraps Python functions to make them chainable with LCEL. Basic: from langchain_core.runnables import RunnableLambda; def process(x): return x.upper(); chain = prompt | model | RunnableLambda(process). Decorator syntax: @RunnableLambda; def process(x): return x * 2; chain = input_step | process | output_step. Multiple inputs: def merge(inputs): return inputs['a'] + inputs['b']; merger = RunnableLambda(merge). Async functions: async def async_process(x): await asyncio.sleep(1); return x; chain = prompt | RunnableLambda(async_process). Context access: def with_config(x, config): return f'{x} - {config.get('tags')}'; chain = RunnableLambda(with_config). Limitations: RunnableLambda doesn't preserve streaming - use RunnableGenerator for streaming. Not serializable - for serialization, create custom Runnable subclass. Use cases: data formatting between chain steps, conditional logic, external API calls, custom preprocessing. Simple alternative to full Runnable class. Compose with other runnables: RunnableLambda(func1) | RunnableLambda(func2) | model. Essential for quick custom logic in LCEL chains.

99% confidence
A

Partial variables pre-fill some prompt variables while leaving others dynamic. Two methods: partial() and partial_variables. Using partial(): prompt = PromptTemplate.from_template('Tell me about {topic} in {language}'); partial_prompt = prompt.partial(language='English'); result = partial_prompt.invoke({'topic': 'AI'}). Using partial_variables at creation: prompt = PromptTemplate(template='Today is {date}. Question: {question}', input_variables=['question'], partial_variables={'date': lambda: datetime.now().strftime('%Y-%m-%d')}). Function-based partials: def get_current_time(): return datetime.now().strftime('%H:%M'); prompt = ChatPromptTemplate.from_messages([('system', 'Current time: {time}'), ('human', '{input}')]).partial(time=get_current_time). Use case: inject context available early (user ID, timestamp) while deferring user input. Multi-stage chains: stage1_prompt = base_prompt.partial(context=retrieved_docs); stage2_result = stage1_prompt.invoke({'question': user_question}). ChatPromptTemplate: chat_prompt.partial(system_context='You are helpful', user_name='Alice'). Essential for reusable prompts with some fixed context and some runtime variables. Reduces prompt passing through intermediate chain steps.

99% confidence
A

LCEL provides multiple streaming mechanisms: stream(), astream(), astream_log(), astream_events(). Basic streaming: for chunk in chain.stream(input): print(chunk, end='', flush=True). Streams final output incrementally. Async streaming: async for chunk in chain.astream(input): process(chunk). Intermediate results with astream_log(): async for patch in chain.astream_log(input): each RunLogPatch carries jsonpatch ops describing run state - a lower-level API than astream_events. Recommended v0.2+: astream_events(version='v2'): async for event in chain.astream_events(input, version='v2'): if event['event'] == 'on_chain_end': print(event['data']['output']). Filter intermediate: if event['name'] == 'Retriever': docs = event['data']['output']. Stream from specific components: use include_names=['ChatOpenAI'] or include_types=['llm']. Preserve streaming through chain: all components must support streaming (model.stream(), parser.stream()). RunnableLambda breaks streaming - use RunnableGenerator instead. Streaming to client: LangServe automatically exposes /stream endpoint for deployed chains. Essential for real-time UIs, progressive rendering, reduced perceived latency. Streaming doesn't reduce total time but improves UX significantly.

99% confidence
A

LCEL provides async methods for concurrent processing: ainvoke(), abatch(), astream(). Async single call: result = await chain.ainvoke(input). Concurrent multiple calls: import asyncio; results = await asyncio.gather(*[chain.ainvoke(inp) for inp in inputs]). Batch method: results = chain.batch([input1, input2, input3]). Automatically parallelizes internally, 3-10x faster than loop. Async batch: results = await chain.abatch(inputs). Max concurrency: results = await chain.abatch(inputs, config={'max_concurrency': 5}). Limits parallel API calls to avoid rate limits. Parallel chains: parallel = RunnableParallel(task1=chain1, task2=chain2); results = await parallel.ainvoke(input). Runs tasks truly concurrent with asyncio. Performance: sync batch uses threadpool, async batch uses asyncio - async generally faster for I/O bound (API calls). Error handling: await asyncio.gather(*tasks, return_exceptions=True) to handle errors without stopping all tasks. Same chain code: chain works in sync (invoke), async (ainvoke), batch, streaming without modifications. Production pattern: use async for web servers (FastAPI, Flask async), sync for scripts. Essential for high-throughput applications and optimal API utilization.

99% confidence
A

LCEL composition operators: pipe (|), RunnableSequence, RunnableParallel, RunnableBranch, RunnablePassthrough. Pipe operator (|): most common, chains components sequentially. chain = prompt | model | parser. Creates RunnableSequence automatically. Dict shorthand for parallel: parallel = {'summary': chain1, 'keywords': chain2} is equivalent to RunnableParallel(summary=chain1, keywords=chain2) when used inside a chain. Coercion rules: dicts become RunnableParallel and plain callables become RunnableLambda; lists are not auto-coerced. Passthrough shortcuts: {'original': RunnablePassthrough(), 'processed': chain} preserves input while processing. Assign shortcut: RunnablePassthrough.assign(new_field=transformation) adds fields to dict. Lambda shortcut: instead of RunnableLambda class, use decorator: @RunnableLambda; def func(x): return x. Branching: RunnableBranch((condition, chain), default) for conditional routing. Composition nesting: (prompt | model).with_retry() | parser.with_fallbacks([fallback_parser]). Method chaining: chain.with_config().with_retry().with_fallbacks(). Type inference: LCEL auto-infers input/output types through chain. Essential shortcuts: use | for sequence, {} for parallel, RunnablePassthrough.assign() for adding fields, RunnableBranch for routing. Keep chains readable - overly nested operators hurt maintainability.

99% confidence
A

Production LCEL deployment best practices: use LangServe, enable observability, implement retries/fallbacks, optimize performance. LangServe deployment: from langserve import add_routes; add_routes(app, chain, path='/chain'). Auto-creates REST API with streaming support. Observability: set LANGCHAIN_TRACING_V2=true, LANGCHAIN_API_KEY for LangSmith tracing. Track costs, latency, errors in dashboard. Retries: chain.with_retry(stop_after_attempt=3, wait_exponential_jitter=True). Prevents transient failures. Fallbacks: primary_chain.with_fallbacks([cheaper_model_chain, cached_chain]). Graceful degradation. Caching: use RedisCache for distributed systems: set_llm_cache(RedisCache(...)). Rate limiting: config={'max_concurrency': 10} prevents API throttling. Error handling: wrap in try/except, return user-friendly errors, log to monitoring. Async for web servers: use FastAPI with async def endpoints, call await chain.ainvoke(). Monitoring: track p95 latency, error rates, token usage. Use Prometheus + Grafana. Testing: unit tests with FakeListLLM, integration tests with VCR recording. Avoid mixing legacy chains and LCEL - complicates debugging. For complex workflows (loops, branching), prefer LangGraph over LCEL. Essential: tracing, retries, fallbacks, caching, async, monitoring.

99% confidence