Haystack's component protocol is powerful but poorly explained. Here is how to write custom components that work the first time.

Why Custom Components

Haystack ships with a rich library of built-in components: document stores, retrievers, generators, routers, and rankers. For most standard RAG and agent pipelines, these cover everything you need. When they do not -- when you need a custom data source, a proprietary API, a bespoke ranking algorithm, or specialised preprocessing -- you write a custom component.

The component protocol is elegant but terse in the docs. This article covers the full pattern so your custom components slot into any Haystack pipeline without friction.

The Component Protocol

A Haystack component is any Python class decorated with @component that implements a run() method. Inputs come from the parameters of run() and their type annotations; outputs are declared with the @component.output_types decorator. These named inputs and outputs are what other components can connect to.

import re
from typing import List

from haystack import component, Document

@component
class TextCleaner:
    """Cleans and normalises text in Documents before indexing."""

    def __init__(self, lowercase: bool = True, remove_urls: bool = True):
        # __init__ is for configuration -- not for I/O
        self.lowercase = lowercase
        self.remove_urls = remove_urls

    @component.output_types(documents=List[Document])
    def run(self, documents: List[Document]) -> dict:
        # run() receives named inputs and returns a dict of named outputs.
        # Its parameters define the inputs; output_types above declares the outputs.
        cleaned = []
        for doc in documents:
            text = doc.content or ""
            if self.lowercase:
                text = text.lower()
            if self.remove_urls:
                text = re.sub(r'https?://\S+', '', text)
            cleaned.append(Document(content=text.strip(), meta=doc.meta))
        return {"documents": cleaned}  # key must match output_types declaration
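Every key in the dict returned by run() should be a name declared in @component.output_types. A misspelled key is not attached to any connection, and a declared output that is left out of the dict sends nothing downstream, so components that depend on it do not run. For a single-output component like this one, always return the declared key.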
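A mistyped output key can be hard to spot by eye. One way to catch it in unit tests is to compare the keys run() actually returned against the names you declared. The sketch below uses only the standard library; `compare_output_keys` is a hypothetical helper, not part of Haystack, and the declared names are passed in by hand rather than read from the component.

```python
from typing import Dict, Iterable, Set, Tuple


def compare_output_keys(
    declared: Iterable[str], result: Dict[str, object]
) -> Tuple[Set[str], Set[str]]:
    """Hypothetical test helper (not a Haystack API).

    Returns (undeclared, missing):
      undeclared -- keys present in the result but never declared
      missing    -- declared names absent from the result
    """
    declared_set = set(declared)
    returned_set = set(result)
    return returned_set - declared_set, declared_set - returned_set


# A result with a typo: "document" instead of the declared "documents"
undeclared, missing = compare_output_keys(["documents"], {"document": []})
print(undeclared)  # {'document'}
print(missing)     # {'documents'}
```

In a test, asserting that `undeclared` is empty catches typos. Whether a non-empty `missing` is a problem depends on whether the component is meant to leave some outputs out.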

Multiple Outputs and Conditional Routing

Components can have multiple outputs, allowing pipelines to branch based on the component's decision. This is how Haystack builds conditional routing without if-else logic in the pipeline definition.

import re

from haystack import component

@component
class QueryClassifier:
    """Routes queries to different retrieval paths based on query type."""

    @component.output_types(
        factual_query=str,
        conversational_query=str,
        out_of_scope_query=str,
    )
    def run(self, query: str) -> dict:
        # Match whole words so that "hi" does not fire on "which" or "this"
        words = set(re.findall(r"[a-z]+", query.lower()))

        # Simple keyword-based routing (replace with LLM for production).
        # Return only the chosen output: an absent key sends nothing downstream,
        # whereas a None value may be delivered like any other value.
        if words & {"what", "when", "who", "define", "explain"}:
            return {"factual_query": query}
        elif words & {"hi", "hello", "thanks", "bye"}:
            return {"conversational_query": query}
        else:
            return {"out_of_scope_query": query}
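The classifier's outputs are then connected to separate pipeline branches:

from haystack import Pipeline
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

store = InMemoryDocumentStore()

# Connect multi-output component to different pipeline branches
pipeline = Pipeline()
pipeline.add_component("classifier", QueryClassifier())
pipeline.add_component("factual_retriever", InMemoryBM25Retriever(document_store=store))
# OpenAIGenerator accepts a plain string prompt. OpenAIChatGenerator expects a
# list of ChatMessage objects, so a str output cannot be connected to it directly.
# By default the generator reads the OPENAI_API_KEY environment variable.
pipeline.add_component("chat_responder", OpenAIGenerator())

# Connect specific outputs to specific downstream inputs
pipeline.connect("classifier.factual_query", "factual_retriever.query")
pipeline.connect("classifier.conversational_query", "chat_responder.prompt")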

Optional Inputs and Defaults

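Give a run() parameter a default value to make it an optional input; annotating it as Optional[...] documents that None is an accepted value. The pipeline can then run the component even when nothing is connected to that input, provided run() handles the None case.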

from typing import Optional, List
from haystack import component, Document
 
@component
class MetadataFilter:
    @component.output_types(documents=List[Document])
    def run(
        self,
        documents: List[Document],
        filter_key: Optional[str] = None,   # optional -- filter only if provided
        filter_value: Optional[str] = None,
    ) -> dict:
        if filter_key is None or filter_value is None:
            # Pass through unchanged when no filter specified
            return {"documents": documents}
 
        filtered = [
            doc for doc in documents
            if doc.meta.get(filter_key) == filter_value
        ]
        return {"documents": filtered}
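The rule at work here is that a run() parameter with a default value is optional, while one without a default must be supplied. The following sketch illustrates that split with the standard library's inspect module on a plain stand-in class. It shows the idea only and is not how Haystack itself is implemented; `MetadataFilterSketch` and `split_inputs` are hypothetical names.

```python
import inspect
from typing import List, Optional, Tuple


class MetadataFilterSketch:
    """Plain-Python stand-in with the same run() signature as MetadataFilter."""

    def run(
        self,
        documents: List[dict],
        filter_key: Optional[str] = None,
        filter_value: Optional[str] = None,
    ) -> dict:
        return {"documents": documents}


def split_inputs(run_method) -> Tuple[List[str], List[str]]:
    """Split run() parameters into (mandatory, optional) by presence of a default."""
    mandatory, optional = [], []
    for name, param in inspect.signature(run_method).parameters.items():
        if name == "self":
            continue
        if param.default is inspect.Parameter.empty:
            mandatory.append(name)
        else:
            optional.append(name)
    return mandatory, optional


mandatory, optional = split_inputs(MetadataFilterSketch.run)
print(mandatory)  # ['documents']
print(optional)   # ['filter_key', 'filter_value']
```

A parameter annotated Optional[str] but given no default would still land in the mandatory list, which is why the default value matters more than the annotation.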

Stateful Components

Some components need to maintain state between pipeline runs (e.g. a cache, a counter, a connection pool). Store state as instance attributes -- a pipeline keeps the component instances you add to it and reuses them on every run, so attributes persist between calls.

import hashlib

from haystack import component
 
@component
class CachingEmbedder:
    """Wraps an embedder with a simple in-memory cache."""
 
    def __init__(self, embedder):
        self.embedder = embedder
        self._cache: dict[str, list[float]] = {}
        self.cache_hits = 0
        self.cache_misses = 0
 
    @component.output_types(embedding=list)
    def run(self, text: str) -> dict:
        cache_key = hashlib.md5(text.encode()).hexdigest()
 
        if cache_key in self._cache:
            self.cache_hits += 1
            return {"embedding": self._cache[cache_key]}
 
        self.cache_misses += 1
        result = self.embedder.run(text=text)
        self._cache[cache_key] = result["embedding"]
        return {"embedding": result["embedding"]}
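Because the component instance outlives individual runs, the plain dict used for `_cache` keeps growing for as long as new texts arrive. If memory is a concern, a size-limited mapping could take its place. The sketch below is one possible approach, using collections.OrderedDict with least-recently-used eviction; `BoundedCache` and `max_items` are hypothetical names, not part of Haystack.

```python
from collections import OrderedDict
from typing import List, Optional


class BoundedCache:
    """Hypothetical LRU-style cache for embeddings, keyed by a string."""

    def __init__(self, max_items: int = 1000):
        if max_items < 1:
            raise ValueError("max_items must be at least 1")
        self.max_items = max_items
        self._items: "OrderedDict[str, List[float]]" = OrderedDict()

    def get(self, key: str) -> Optional[List[float]]:
        if key not in self._items:
            return None
        self._items.move_to_end(key)  # mark as most recently used
        return self._items[key]

    def put(self, key: str, value: List[float]) -> None:
        self._items[key] = value
        self._items.move_to_end(key)
        if len(self._items) > self.max_items:
            self._items.popitem(last=False)  # evict the least recently used entry

    def __len__(self) -> int:
        return len(self._items)


cache = BoundedCache(max_items=2)
cache.put("a", [0.1])
cache.put("b", [0.2])
cache.get("a")          # "a" is now the most recently used
cache.put("c", [0.3])   # evicts "b", the least recently used
print(cache.get("b"))   # None
print(len(cache))       # 2
```

Inside CachingEmbedder, run() would call cache.get(cache_key) before invoking the wrapped embedder and cache.put(cache_key, embedding) after it.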

Testing Custom Components

Test components in isolation before adding them to a pipeline. run() is a plain Python method -- no Haystack-specific test setup required.

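from haystack import Document

# TextCleaner and QueryClassifier are the classes defined earlier in this article
def test_text_cleaner_lowercases():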
    cleaner = TextCleaner(lowercase=True, remove_urls=False)
    docs = [Document(content="Hello World")]
    result = cleaner.run(documents=docs)
    assert result["documents"][0].content == "hello world"
 
def test_text_cleaner_removes_urls():
    cleaner = TextCleaner(lowercase=False, remove_urls=True)
    docs = [Document(content="Visit https://example.com for more")]
    result = cleaner.run(documents=docs)
    assert "https" not in result["documents"][0].content
 
def test_query_classifier_routes_factual():
    classifier = QueryClassifier()
    result = classifier.run(query="What is the capital of France?")
    assert result["factual_query"] == "What is the capital of France?"
    assert result.get("conversational_query") is None  # the unused branch carries no value

Quick Reference

  • Decorate with @component and implement run() with typed parameters
  • Declare outputs with @component.output_types -- keys must match exactly what run() returns
  • Use Optional[type] = None for inputs that are not always provided
  • Return only the selected output in multi-output components -- an omitted output sends nothing downstream
  • Store state as instance attributes -- components are long-lived
  • Test run() as a plain Python function -- no special test harness needed