
DEV Community

Amito Vrito


Why I Modelled My LLM Pipeline as a DAG Instead of a Chain — and What I Found Out

The problem with chains in production

Every major Python LLM framework gives you the same primitive: a chain.

LangChain's LCEL. LlamaIndex's pipeline. Haystack's components. They all model your pipeline as a linear sequence of steps — input flows through A, then B, then C, output comes out the end.

For a hello-world RAG demo, that's fine. For a production system, you hit a wall fast.

What chains can't express cleanly

Here's a real pipeline I needed to build:

  1. Classify the incoming query
  2. Based on classification: route to either semantic search, keyword search, or both in parallel
  3. If both: merge and re-rank results
  4. Generate response from ranked context
  5. If any retrieval stage fails: surface a clear error, don't silently continue

Try expressing that as a chain. You end up with nested chains, manual asyncio.gather() calls outside the framework, try/except blocks swallowing exceptions to keep the chain going, and no clean way to express "step D depends on both B and C."
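For concreteness, here's roughly what that workaround ends up looking like. This is a sketch with dummy step functions, not any framework's real API; the point is the manual gather and the exception swallowing:

```python
import asyncio

# Dummy step functions standing in for chain components.
async def classify(query: str) -> str:
    return "hybrid"  # pretend the classifier picked both retrievers

async def semantic_search(query: str) -> list[str]:
    return ["doc_a", "doc_b"]

async def keyword_search(query: str) -> list[str]:
    return ["doc_b", "doc_c"]

async def run_chain(query: str) -> list[str]:
    route = await classify(query)
    if route != "hybrid":
        return await semantic_search(query)  # other routes elided
    # The chain can't express this two-way join, so you drop down to
    # raw asyncio outside the framework.
    try:
        sem, kw = await asyncio.gather(
            semantic_search(query), keyword_search(query)
        )
    except Exception:
        # Swallowing the error to keep the chain alive: exactly the
        # silent degradation described above.
        sem, kw = [], []
    return sem + kw

print(asyncio.run(run_chain("q")))  # ['doc_a', 'doc_b', 'doc_b', 'doc_c']
```

Every branch and join lives in hand-written control flow the framework knows nothing about, which is the problem.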

The abstraction is fighting you.

DAGs are the right model

A directed acyclic graph expresses all of this naturally.

Nodes are tasks. Edges are data dependencies. Execution is topologically ordered — a node fires when all its upstream dependencies have resolved.

from synapsekit import Pipeline

pipeline = Pipeline()
pipeline.add_node("classify", ClassifierNode())
pipeline.add_node("semantic_search", RAGNode(store=vector_store))
pipeline.add_node("keyword_search", BM25Node(index=bm25_index))
pipeline.add_node("rerank", RerankerNode())
pipeline.add_node("generate", LLMNode(model="gpt-4o"))

pipeline.add_edge("classify", "semantic_search")
pipeline.add_edge("classify", "keyword_search")
pipeline.add_edge("semantic_search", "rerank")
pipeline.add_edge("keyword_search", "rerank")
pipeline.add_edge("rerank", "generate")

result = await pipeline.run(query="explain async-native design")

semantic_search and keyword_search run concurrently. rerank waits for both. generate waits for rerank. The execution engine handles ordering. You describe the dependencies.
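The scheduling idea itself is standard. Here's a minimal level-by-level executor, Kahn's algorithm plus asyncio.gather, to show what an engine like this does under the hood (run_dag and the toy nodes are illustrative, not SynapseKit's internals):

```python
import asyncio
from collections import defaultdict

async def run_dag(nodes, edges, inputs):
    """Minimal level-by-level DAG executor (Kahn's algorithm + gather).

    nodes: {name: async callable taking the dict of results so far}
    edges: list of (upstream, downstream) pairs
    """
    indegree = {n: 0 for n in nodes}
    downstream = defaultdict(list)
    for u, v in edges:
        indegree[v] += 1
        downstream[u].append(v)

    results = {"__input__": inputs}
    ready = [n for n, d in indegree.items() if d == 0]
    while ready:
        # Everything in `ready` has no unresolved dependency:
        # run the whole level concurrently.
        outs = await asyncio.gather(*(nodes[n](results) for n in ready))
        next_ready = []
        for name, out in zip(ready, outs):
            results[name] = out
            for v in downstream[name]:
                indegree[v] -= 1
                if indegree[v] == 0:
                    next_ready.append(v)
        ready = next_ready
    return results

# Toy diamond: a and b run concurrently, c joins them.
async def double(r): return r["__input__"] * 2
async def inc(r): return r["__input__"] + 1
async def combine(r): return r["a"] + r["b"]

out = asyncio.run(run_dag(
    {"a": double, "b": inc, "c": combine},
    [("a", "c"), ("b", "c")],
    10,
))  # out["c"] == 31
```

Nothing in the toy nodes mentions concurrency; the edge list alone determines what runs in parallel.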

Failure propagation that actually works

In a chain, a failed step either kills the whole pipeline or gets swallowed silently.

In a DAG, failure has semantics. If semantic_search fails, rerank — which depends on it — is cancelled. generate — which depends on rerank — is also cancelled. You get a clear error naming the failed node and its dependents.

No silent degradation. Failure is explicit and traceable.
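The cancellation rule is just graph reachability: a failed node takes down every transitive dependent. A standalone sketch of that semantics (not SynapseKit's actual implementation), using the edges from the pipeline above:

```python
from collections import defaultdict, deque

def cancelled_by(failed, edges):
    """Nodes that must be cancelled when `failed` fails: everything
    reachable from it along dependency edges (transitive dependents)."""
    downstream = defaultdict(list)
    for u, v in edges:
        downstream[u].append(v)
    seen, queue = set(), deque([failed])
    while queue:
        for v in downstream[queue.popleft()]:
            if v not in seen:
                seen.add(v)
                queue.append(v)
    return seen

edges = [
    ("classify", "semantic_search"), ("classify", "keyword_search"),
    ("semantic_search", "rerank"), ("keyword_search", "rerank"),
    ("rerank", "generate"),
]
# semantic_search failing cancels rerank and generate
print(cancelled_by("semantic_search", edges))
```

The error report then names the failed node plus exactly this set, which is what makes the failure traceable.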

The async piece

Nodes with no dependency relationship between them run concurrently automatically. The execution engine handles asyncio.gather() at each topological level. You write individual async node functions. The graph handles orchestration.

class SemanticSearchNode(Node):
    async def execute(self, inputs):
        results = await self.vector_store.asearch(inputs["query"])
        return {"results": results}

No manual gather calls. The graph structure encodes the parallelism.

Is it worth the complexity?

For simple A → B → C pipelines with no branching: a chain is fine.

The moment you have parallel retrieval, conditional routing, or stages where failure isolation matters — the chain abstraction costs more than it saves.

SynapseKit is the framework I built around this model:
https://github.com/SynapseKit/SynapseKit
API Doc: https://synapsekit.github.io/synapsekit-docs/

10k PyPI downloads since launch. The engineers who need this know exactly why.


What does your production RAG architecture look like? Drop it in the comments.
