Haystack's component protocol is powerful but poorly explained. Here is how to write custom components that work the first time.
Why Custom Components
Haystack ships with a rich library of built-in components: document stores, retrievers, generators, routers, and rankers. For most standard RAG and agent pipelines, these cover everything you need. When they do not -- when you need a custom data source, a proprietary API, a bespoke ranking algorithm, or specialised preprocessing -- you write a custom component.
The component protocol is elegant but terse in the docs. This article covers the full pattern so your custom components slot into any Haystack pipeline without friction.
The Component Protocol
A Haystack component is any Python class decorated with @component that implements a run() method. The decorator inspects the parameters and type annotations of run() to determine the component's inputs, and the @component.output_types declaration to determine its outputs -- these are what other components can connect to.
from haystack import component, Document
from haystack.core.component.types import Greedy
from typing import Optional, List
@component
class TextCleaner:
"""Cleans and normalises text in Documents before indexing."""
def __init__(self, lowercase: bool = True, remove_urls: bool = True):
# __init__ is for configuration -- not for I/O
self.lowercase = lowercase
self.remove_urls = remove_urls
@component.output_types(documents=List[Document])
def run(self, documents: List[Document]) -> dict:
# run() receives named inputs and returns a dict of named outputs
# The type annotations on run() define the component's I/O contract
import re
cleaned = []
for doc in documents:
text = doc.content or ""
if self.lowercase:
text = text.lower()
if self.remove_urls:
text = re.sub(r'https?://\S+', '', text)
cleaned.append(Document(content=text.strip(), meta=doc.meta))
return {"documents": cleaned} # key must match output_types declarationThe dict returned by run() must have exactly the keys declared in @component.output_types. Extra keys are ignored; missing keys cause a runtime error. Always match them exactly.Multiple Outputs and Conditional Routing
Components can have multiple outputs, allowing pipelines to branch based on the component's decision. This is how Haystack builds conditional routing without if-else logic in the pipeline definition.
from haystack import component
@component
class QueryClassifier:
    """Routes queries to different retrieval paths based on query type.

    Classification is keyword-based and matches whole words only, so a
    keyword such as "hi" does not fire on "which" or "shipping". Factual
    keywords are checked before conversational ones.

    NOTE(review): unused outputs are returned as None. In Haystack 2.x a
    None value appears to still be delivered to connected components;
    omitting the key is the usual way to skip a branch -- confirm against
    the Haystack version in use before relying on this for routing.
    """

    # Keyword sets are compared against the lower-cased words of the query.
    _FACTUAL_WORDS = frozenset({"what", "when", "who", "define", "explain"})
    _CONVERSATIONAL_WORDS = frozenset({"hi", "hello", "thanks", "bye"})

    @component.output_types(
        factual_query=str,
        conversational_query=str,
        out_of_scope_query=str,
    )
    def run(self, query: str) -> dict:
        """Classify ``query`` and place it on exactly one output.

        Args:
            query: The raw user query.

        Returns:
            A dict with the keys ``factual_query``, ``conversational_query``
            and ``out_of_scope_query``; the selected key holds ``query`` and
            the other two hold None.
        """
        import re

        # Simple keyword-based routing (replace with LLM for production)
        # \w+ splits on punctuation, so "What's" still yields the word "what".
        words = set(re.findall(r"\w+", query.lower()))
        if words & self._FACTUAL_WORDS:
            return {"factual_query": query, "conversational_query": None, "out_of_scope_query": None}
        if words & self._CONVERSATIONAL_WORDS:
            return {"factual_query": None, "conversational_query": query, "out_of_scope_query": None}
        return {"factual_query": None, "conversational_query": None, "out_of_scope_query": query}

from haystack import Pipeline
# Connect multi-output component to different pipeline branches
pipeline = Pipeline()
pipeline.add_component("classifier", QueryClassifier())
pipeline.add_component("factual_retriever", InMemoryBM25Retriever(document_store=store))
pipeline.add_component("chat_responder", OpenAIChatGenerator())
# Connect specific outputs to specific downstream inputs
pipeline.connect("classifier.factual_query", "factual_retriever.query")
pipeline.connect("classifier.conversational_query", "chat_responder.messages")Optional Inputs and Defaults
Give a run() parameter a default value -- typically Optional[type] = None -- to mark an input that is not always required. Haystack handles None inputs gracefully when you design for them.
from typing import Optional, List
from haystack import component, Document
@component
class MetadataFilter:
@component.output_types(documents=List[Document])
def run(
self,
documents: List[Document],
filter_key: Optional[str] = None, # optional -- filter only if provided
filter_value: Optional[str] = None,
) -> dict:
if filter_key is None or filter_value is None:
# Pass through unchanged when no filter specified
return {"documents": documents}
filtered = [
doc for doc in documents
if doc.meta.get(filter_key) == filter_value
]
return {"documents": filtered}Stateful Components
Some components need to maintain state between pipeline runs (e.g. a cache, a counter, a connection pool). Store state as instance attributes -- Haystack components are long-lived objects.
from haystack import component
from functools import lru_cache
import hashlib
@component
class CachingEmbedder:
"""Wraps an embedder with a simple in-memory cache."""
def __init__(self, embedder):
self.embedder = embedder
self._cache: dict[str, list[float]] = {}
self.cache_hits = 0
self.cache_misses = 0
@component.output_types(embedding=list)
def run(self, text: str) -> dict:
cache_key = hashlib.md5(text.encode()).hexdigest()
if cache_key in self._cache:
self.cache_hits += 1
return {"embedding": self._cache[cache_key]}
self.cache_misses += 1
result = self.embedder.run(text=text)
self._cache[cache_key] = result["embedding"]
return {"embedding": result["embedding"]}Testing Custom Components
Test components in isolation before adding them to a pipeline. run() is a plain Python method -- no Haystack-specific test setup required.
def test_text_cleaner_lowercases():
    """TextCleaner lower-cases content when only ``lowercase`` is enabled."""
    text_cleaner = TextCleaner(lowercase=True, remove_urls=False)
    output = text_cleaner.run(documents=[Document(content="Hello World")])
    first_document = output["documents"][0]
    assert first_document.content == "hello world"
def test_text_cleaner_removes_urls():
    """TextCleaner strips URLs when only ``remove_urls`` is enabled."""
    text_cleaner = TextCleaner(lowercase=False, remove_urls=True)
    source_documents = [Document(content="Visit https://example.com for more")]
    output = text_cleaner.run(documents=source_documents)
    cleaned_text = output["documents"][0].content
    assert "https" not in cleaned_text
def test_query_classifier_routes_factual():
classifier = QueryClassifier()
result = classifier.run(query="What is the capital of France?")
assert result["factual_query"] == "What is the capital of France?"
assert result["conversational_query"] is NoneQuick Reference
- Decorate with @component and implement run() with typed parameters
- Declare outputs with @component.output_types -- keys must match exactly what run() returns
- Use Optional[type] = None for inputs that are not always provided
- Return None for unused outputs in multi-output components
- Store state as instance attributes -- components are long-lived
- Test run() as a plain Python function -- no special test harness needed