
Building a Production RAG Application with Clean Architecture
Master Clean Architecture patterns with FastAPI, Lagom DI, React, Qdrant, and self-hosted Ollama. Complete local development guide with working examples.
Your AI application works beautifully in development. You've got embeddings flowing, vector searches returning relevant chunks, and an LLM generating coherent responses. Then requirements change: swap OpenAI for self-hosted Ollama, replace FAISS with Qdrant, add a React frontend. Suddenly, your monolithic codebase becomes a tangled mess of dependencies.
Sound familiar? This is where Clean Architecture transforms chaos into confidence. By inverting dependencies and isolating business logic, you can swap entire infrastructure layers without touching your core application.
In this comprehensive guide, we'll build a production-ready RAG (Retrieval-Augmented Generation) system using a battle-tested stack: FastAPI with Lagom for dependency injection, Qdrant for vector storage, self-hosted Ollama for embeddings and LLM inference, and a modern React + Vite + shadcn/ui frontend powered by TanStack Query and TanStack Table.
What is RAG and When Should You Use It?
Retrieval-Augmented Generation (RAG) is an architectural pattern that grounds LLM responses in your own data. Instead of relying solely on the model's training knowledge, RAG retrieves relevant documents from your corpus and includes them as context in the prompt. The pattern was introduced by Facebook AI Research and has become the standard approach for building knowledge-grounded AI applications.
The RAG Pipeline
User Query
↓
1. RETRIEVE: Search your document corpus
- Convert query to embedding vector
- Find semantically similar chunks
- (Optional) Hybrid search with keywords
↓
2. AUGMENT: Build context-enriched prompt
- Inject retrieved chunks into prompt
- Add system instructions
- Format for your LLM
↓
3. GENERATE: LLM produces grounded response
- Model answers using provided context
- Can cite sources from retrieved docs
- Reduces hallucination risk
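Before diving into the full stack, here is the pipeline reduced to code. This is a minimal sketch with hypothetical Embedder, VectorStore, and LLM interfaces; the rest of this guide builds real implementations behind similar abstractions:
from typing import Protocol

class Embedder(Protocol):
    def embed_query(self, text: str) -> list[float]: ...

class VectorStore(Protocol):
    def query_similar(self, embedding: list[float], k: int) -> list[str]: ...

class LLM(Protocol):
    def generate(self, prompt: str) -> str: ...

def answer(query: str, embedder: Embedder, store: VectorStore, llm: LLM) -> str:
    # 1. RETRIEVE: embed the query and fetch semantically similar chunks
    chunks = store.query_similar(embedder.embed_query(query), k=5)
    # 2. AUGMENT: inject the retrieved chunks into the prompt
    context = "\n\n".join(chunks)
    prompt = f"Answer using only this context:\n{context}\n\nQuestion: {query}"
    # 3. GENERATE: the LLM answers grounded in the retrieved context
    return llm.generate(prompt)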
When to Use RAG
RAG is the right choice when:
- Your data changes frequently — retraining is too slow or expensive
- You need source attribution — users must verify information
- Domain-specific knowledge is required — legal, medical, internal docs
- Privacy matters — data stays in your infrastructure, never sent to training
- Budget constraints — fine-tuning costs more than retrieval infrastructure
Consider alternatives when:
- Your knowledge is static and fits in context — just use long-context models
- Latency is critical — retrieval adds ~100-500ms overhead
- Your corpus is tiny (fewer than 10 docs) — simple keyword search may suffice
Why RAG Beats Fine-Tuning for Most Use Cases
When deciding between RAG and fine-tuning, the tradeoffs become clear once you consider operational reality.
Data freshness is where RAG shines brightest. Add a document today, query it immediately. Fine-tuning requires retraining cycles that can take hours or days.
Cost follows a similar pattern. RAG infrastructure — a vector database and embedding model — is a fixed cost. Fine-tuning demands GPU hours for every update, plus the expertise to manage training runs.
Transparency matters for trust. RAG responses can cite exact source documents. Fine-tuned models bake knowledge into weights where it becomes untraceable.
The hallucination problem is nuanced. RAG grounds responses in retrieved documents, dramatically reducing fabrication. Fine-tuned models can still confidently generate false information — they just do it in your domain's style.
Setup complexity favors RAG for most teams. Standing up a vector database and retrieval pipeline is a week of work. Proper fine-tuning requires dataset curation, hyperparameter tuning, and validation — easily a month or more.
Maintenance seals the decision. RAG lets you add, update, or remove documents without touching the model. Fine-tuning requires retraining whenever your knowledge base changes.
The key insight: RAG separates what the model knows from what information is available. This separation enables the architectural patterns we'll explore throughout this guide.
The Architecture at a Glance
[Figure: Clean Architecture layers (Presentation, Use Cases, Domain, and Infrastructure) with dependency arrows pointing inward toward the domain]
Dependencies flow inward. The domain layer has zero external dependencies. Infrastructure implements domain interfaces. This is the key insight from Robert Martin's Clean Architecture that makes everything swappable.
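In code, the whole idea fits in a few lines. The names below are illustrative placeholders for the real classes built later in this guide; the point is that the use case imports only the domain port, so the concrete store can be swapped at the composition root:
from abc import ABC, abstractmethod

class VectorStore(ABC):  # domain port: defined in the inner layer
    @abstractmethod
    def query_similar(self, embedding: list[float], k: int) -> list[str]: ...

class QdrantVectorStore(VectorStore):  # infrastructure: implements the port
    def query_similar(self, embedding: list[float], k: int) -> list[str]:
        return []  # real implementation appears later in this guide

class RetrieveChunks:  # use case: depends on the port, never on Qdrant
    def __init__(self, store: VectorStore) -> None:
        self._store = store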
Why This Stack Works
Our stack combines battle-tested tools that each solve a specific problem exceptionally well.
FastAPI powers our REST API with its async-first design, automatic OpenAPI documentation, and native Pydantic validation.
For dependency injection, we chose Lagom — a lightweight, type-safe container with seamless FastAPI integration that avoids the complexity of heavier alternatives.
Qdrant serves as our vector database, offering production-ready performance, powerful filtering capabilities, and horizontal scaling when needed.
On the AI side, Ollama handles both LLM inference and embeddings locally, eliminating API costs while maintaining privacy compliance. We leverage LangChain specifically for its proven text chunking strategies and document loaders, though we keep our core architecture framework-agnostic.
The frontend runs on React with Vite for blazing-fast HMR and TypeScript-first tooling.
shadcn/ui provides our component foundation — accessible, deeply customizable, and built on Tailwind CSS. For server state management, TanStack Query delivers intelligent caching, background refetching, and optimistic updates out of the box.
TanStack Table completes the stack with its headless, flexible approach to building feature-rich data tables without fighting against opinionated styling.
Setting Up Local Development
Let's start with a production-ready compose.yml that includes health checks, proper networking, and Docker Compose's new develop.watch feature for hot reload without volume mounting overhead:
services:
qdrant:
image: qdrant/qdrant:v1.12.0
container_name: rag-qdrant
ports:
- '6333:6333'
- '6334:6334'
volumes:
- qdrant_data:/qdrant/storage:rw
environment:
QDRANT__SERVICE__GRPC_PORT: 6334
QDRANT__LOG_LEVEL: INFO
healthcheck:
test: ['CMD', 'wget', '-qO-', 'http://localhost:6333/healthz']
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
networks:
- rag-network
ollama:
image: ollama/ollama:0.4.7
container_name: rag-ollama
ports:
- '11434:11434'
volumes:
- ollama_data:/root/.ollama:rw
environment:
OLLAMA_KEEP_ALIVE: 5m
OLLAMA_NUM_PARALLEL: 2
# Auto-pull models on startup
entrypoint: ['/bin/sh', '-c']
command:
- |
/bin/ollama serve &
sleep 5
ollama pull mxbai-embed-large
ollama pull qwen2.5:0.5b
wait
healthcheck:
test: ['CMD', 'curl', '-f', 'http://localhost:11434/api/tags']
interval: 15s
timeout: 10s
retries: 5
start_period: 60s
deploy:
resources:
reservations:
devices:
- driver: nvidia
count: all
capabilities: [gpu]
networks:
- rag-network
backend:
build:
context: .
dockerfile: Dockerfile
container_name: rag-backend
ports:
- '8421:8421'
environment:
QDRANT_HOST: qdrant
QDRANT_PORT: 6333
OLLAMA_BASE_URL: http://ollama:11434
OLLAMA_MODEL: qwen2.5:0.5b
LOG_LEVEL: INFO
depends_on:
qdrant:
condition: service_healthy
ollama:
condition: service_healthy
healthcheck:
test: ['CMD', 'curl', '-f', 'http://localhost:8421/health']
interval: 10s
timeout: 5s
retries: 3
start_period: 15s
develop:
watch:
# Sync source changes without rebuild
- action: sync
path: ./src
target: /app/src
# Restart on config changes
- action: sync+restart
path: ./config
target: /app/config
# Full rebuild on dependency changes
- action: rebuild
path: ./pyproject.toml
networks:
- rag-network
frontend:
build:
context: ./frontend
dockerfile: Dockerfile.dev
container_name: rag-frontend
ports:
- '5173:5173'
environment:
VITE_API_URL: http://localhost:8421
depends_on:
backend:
condition: service_healthy
develop:
watch:
- action: sync
path: ./frontend/src
target: /app/src
- action: rebuild
path: ./frontend/package.json
networks:
- rag-network
networks:
rag-network:
driver: bridge
volumes:
qdrant_data:
driver: local
ollama_data:
driver: local
Key production improvements:
- Pinned versions — Never use latest in production; pin to specific versions
- Health checks with start_period — Allows containers to initialize before health checks begin
- depends_on with conditions — Services wait for dependencies to be healthy, not just running
- Named network — Explicit bridge network for inter-container communication
- develop.watch actions — Three modes: sync (hot reload), sync+restart (reload + restart process), rebuild (full container rebuild)
- Auto-pull models — Ollama entrypoint pulls required models on first startup
Start the entire stack with hot reload:
docker compose up --watch
Tip: The mxbai-embed-large model produces 1024-dimensional embeddings. Always match your vector store configuration to your embedding model dimensions.
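A cheap way to enforce that match is a startup check. The sketch below assumes the collection already exists with a single unnamed vector configuration; it embeds a probe string via Ollama and compares the result against Qdrant's configured size:
import ollama
from qdrant_client import QdrantClient

def assert_dimensions_match(collection_name: str = "document_chunks") -> None:
    # Embed a probe string to learn the model's output dimension
    embedding = ollama.Client(host="http://localhost:11434").embeddings(
        model="mxbai-embed-large", prompt="dimension probe"
    )["embedding"]
    # Read the collection's configured vector size (single unnamed vector)
    info = QdrantClient(host="localhost", port=6333).get_collection(collection_name)
    configured = info.config.params.vectors.size
    if len(embedding) != configured:
        raise RuntimeError(
            f"Embedding model returns {len(embedding)} dimensions, "
            f"but the collection expects {configured}"
        )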
Kubernetes Tunnel: Port-Forwarding to Cloud GPU
When your team runs Ollama on a GPU-equipped Kubernetes cluster but needs local development, a tunnel container can transparently proxy traffic. The key is that this container joins the same Docker network and uses the same service name (ollama), so your backend connects to http://ollama:11434 regardless of whether it's hitting a local GPU or a cloud cluster.
Override Compose for Kubernetes Tunnel
Create compose.k8s.yml to completely replace the Ollama service:
# compose.k8s.yml - Override for K8s tunnel
services:
ollama:
# Completely replace the local Ollama with a tunnel container
image: !reset null
build:
context: .
dockerfile: Dockerfile.k8s-tunnel
container_name: rag-ollama-tunnel
entrypoint: ['/bin/sh']
command: ['/scripts/k8s-tunnel.sh']
environment:
KUBE_CONTEXT: your-aks-cluster
KUBE_NAMESPACE: ai-services
OLLAMA_SERVICE: ollama-gpu
OLLAMA_PORT: '11434'
# Override volumes completely (remove local ollama_data)
volumes: !override
- ~/.kube:/root/.kube:ro
- ~/.azure:/root/.azure:ro
# Remove GPU requirements for tunnel container
deploy: !reset {}
healthcheck:
test: ['CMD', 'curl', '-sf', 'http://localhost:11434/api/tags']
interval: 10s
timeout: 5s
retries: 10
start_period: 30s
# CRITICAL: Same network as backend - this is why it works
networks:
- rag-network
Why this works:
- The tunnel container gets the service name ollama on rag-network
- Backend resolves http://ollama:11434 via Docker DNS
- Traffic routes to tunnel container → kubectl port-forward → K8s pod
- Zero code changes needed in backend
Tunnel Dockerfile
# Dockerfile.k8s-tunnel
FROM mcr.microsoft.com/azure-cli:cbl-mariner2.0
# Install kubectl via Azure CLI
RUN az aks install-cli --only-show-errors
# Install utilities (curl for healthcheck, jq for JSON parsing)
RUN tdnf install -y curl jq && tdnf clean all
# Copy tunnel script
COPY scripts/k8s-tunnel.sh /scripts/k8s-tunnel.sh
RUN chmod +x /scripts/k8s-tunnel.sh
# Expose same port as real Ollama
EXPOSE 11434
Tunnel Script with Auto-Reconnect
#!/bin/sh
# scripts/k8s-tunnel.sh
# Establishes kubectl port-forward to K8s Ollama with auto-reconnect
set -e
log() { echo "[$(date '+%H:%M:%S')] $*"; }
log "=== Kubernetes Ollama Tunnel ==="
# Validate mounted credentials
if [ ! -f "/root/.kube/config" ]; then
log "ERROR: ~/.kube/config not mounted"
log "Mount with: -v ~/.kube:/root/.kube:ro"
exit 1
fi
if [ ! -d "/root/.azure" ]; then
log "WARNING: ~/.azure not mounted - AKS auth may fail"
log "Run 'az login' on host first"
fi
# Set Kubernetes context
log "Setting context: ${KUBE_CONTEXT:-default}"
kubectl config use-context "${KUBE_CONTEXT:-default}" || {
log "ERROR: Failed to set context. Available contexts:"
kubectl config get-contexts
exit 1
}
# Discover Ollama pod dynamically
discover_pod() {
log "Discovering Ollama pod in namespace: ${KUBE_NAMESPACE}"
POD_NAME=$(kubectl get pods -n "${KUBE_NAMESPACE}" \
-l "app=${OLLAMA_SERVICE}" \
--field-selector=status.phase=Running \
-o jsonpath='{.items[0].metadata.name}' 2>/dev/null)
if [ -z "$POD_NAME" ]; then
log "ERROR: No running pod found for app=${OLLAMA_SERVICE}"
log "Available pods:"
kubectl get pods -n "${KUBE_NAMESPACE}" -l "app=${OLLAMA_SERVICE}"
return 1
fi
log "Found pod: ${POD_NAME}"
# Check GPU allocation
GPU_COUNT=$(kubectl get pod "$POD_NAME" -n "${KUBE_NAMESPACE}" \
-o jsonpath='{.spec.containers[0].resources.limits.nvidia\.com/gpu}' 2>/dev/null)
log "GPU allocation: ${GPU_COUNT:-none}"
return 0
}
# Initial discovery
if ! discover_pod; then
log "Retrying pod discovery in 10s..."
sleep 10
discover_pod || exit 1
fi
# Main loop with auto-reconnect
RETRY_DELAY=5
MAX_RETRY_DELAY=60
while true; do
log "Starting port-forward: localhost:${OLLAMA_PORT} → ${POD_NAME}:${OLLAMA_PORT}"
# Port-forward with 0.0.0.0 to accept connections from other containers
kubectl port-forward "pod/${POD_NAME}" \
--address 0.0.0.0 \
"${OLLAMA_PORT}:${OLLAMA_PORT}" \
-n "${KUBE_NAMESPACE}" &
PF_PID=$!
# Wait for port-forward to be ready
sleep 2
# Verify connection
if curl -sf "http://localhost:${OLLAMA_PORT}/api/tags" > /dev/null 2>&1; then
log "✓ Tunnel established successfully"
RETRY_DELAY=5 # Reset delay on success
wait $PF_PID # Wait for port-forward to exit
else
log "✗ Port-forward failed to establish"
kill $PF_PID 2>/dev/null || true
fi
log "Connection lost. Reconnecting in ${RETRY_DELAY}s..."
sleep $RETRY_DELAY
# Exponential backoff (cap at MAX_RETRY_DELAY)
RETRY_DELAY=$((RETRY_DELAY * 2))
[ $RETRY_DELAY -gt $MAX_RETRY_DELAY ] && RETRY_DELAY=$MAX_RETRY_DELAY
# Re-discover pod (it may have been rescheduled)
discover_pod || {
log "Pod discovery failed, retrying..."
continue
}
done
Usage
# Local development with local GPU
docker compose up --watch
# Local development with cloud GPU (tunnel to AKS)
docker compose -f compose.yml -f compose.k8s.yml up --watch
The backend service connects to http://ollama:11434 in both cases — Docker's DNS resolution handles routing to either the local Ollama container or the tunnel container. This pattern lets teams share expensive GPU resources while maintaining identical local development workflows.
Production tip: For multi-developer teams, consider running Ollama behind a Kubernetes Service with an Ingress. The tunnel approach is ideal for development; for shared staging environments, a proper network endpoint is more reliable.
Type-Safe API Client: OpenAPI to TypeScript
One of the most powerful patterns for full-stack development is generating TypeScript types directly from your FastAPI OpenAPI schema. This creates a single source of truth: define your Pydantic models once in Python, and TypeScript types flow automatically.
Step 1: FastAPI Generates OpenAPI from Pydantic Models
FastAPI automatically generates an OpenAPI 3.0 schema from your Pydantic models and route definitions. The magic happens through Python type hints:
# src/presentation/api/schemas.py
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
class DocumentStatus(str, Enum):
processing = "processing"
ready = "ready"
failed = "failed"
class DocumentResponse(BaseModel):
"""Response model for a single document."""
id: str = Field(..., description="Unique document identifier")
title: str = Field(..., description="Document title")
uploaded_at: datetime = Field(..., description="Upload timestamp")
page_count: int = Field(..., ge=0, description="Number of pages")
status: DocumentStatus = Field(..., description="Processing status")
model_config = {"from_attributes": True}
class PaginatedDocuments(BaseModel):
"""Paginated list of documents."""
items: list[DocumentResponse]
total: int = Field(..., ge=0, description="Total matching documents")
limit: int = Field(..., ge=1, le=100)
offset: int = Field(..., ge=0)
Use these models in your route definitions:
# src/presentation/api/documents_api.py
from fastapi import APIRouter, UploadFile, Query
from typing import Literal
router = APIRouter(prefix="/documents", tags=["documents"])
@router.get("", response_model=PaginatedDocuments)
async def list_documents(
resource_type: Literal["articles", "drafts"] = Query(...),
limit: int = Query(default=10, ge=1, le=100),
offset: int = Query(default=0, ge=0),
) -> PaginatedDocuments:
"""List documents with pagination."""
# Implementation...
pass
@router.post("", response_model=DocumentResponse, status_code=201)
async def upload_document(
file: UploadFile,
title: str = Query(..., min_length=1, max_length=255),
resource_type: Literal["articles", "drafts"] = Query(...),
language: str = Query(default="en", pattern="^[a-z]{2}$"),
) -> DocumentResponse:
"""Upload a new document for processing."""
# Implementation...
pass
FastAPI exposes the schema at /openapi.json. Every Pydantic field, type hint, enum, and docstring becomes part of the schema:
{
"openapi": "3.0.0",
"paths": {
"/documents": {
"get": {
"parameters": [
{
"name": "resource_type",
"in": "query",
"required": true,
"schema": { "enum": ["articles", "drafts"] }
}
],
"responses": {
"200": {
"content": {
"application/json": { "schema": { "$ref": "#/components/schemas/PaginatedDocuments" } }
}
}
}
}
}
},
"components": {
"schemas": {
"DocumentResponse": {
"properties": {
"id": { "type": "string" },
"status": { "enum": ["processing", "ready", "failed"] }
}
}
}
}
}
Step 2: Generate TypeScript Types from OpenAPI
With the schema exposed, we use openapi-typescript to generate TypeScript interfaces. This library parses OpenAPI 3.x schemas and outputs accurate type definitions:
// scripts/generate-types.mjs
import fs from 'node:fs';
import path from 'node:path';
import openapiTS, { astToString } from 'openapi-typescript';
const SCHEMA_URL = process.env.API_URL || 'http://localhost:8421';
const OUTPUT_PATH = './src/lib/api/types.generated.ts';
async function generateTypes() {
console.log(`Fetching OpenAPI schema from ${SCHEMA_URL}/openapi.json`);
const response = await fetch(`${SCHEMA_URL}/openapi.json`);
if (!response.ok) {
throw new Error(`Failed to fetch schema: ${response.status}`);
}
const schema = await response.json();
// Generate TypeScript AST from OpenAPI schema
const ast = await openapiTS(schema, {
exportType: true,
alphabetize: true,
});
// Convert AST to string
const output = astToString(ast);
// Add header comment
const header = `// AUTO-GENERATED — DO NOT EDIT
// Generated from ${SCHEMA_URL}/openapi.json
// Run: npm run generate:types\n\n`;
fs.mkdirSync(path.dirname(OUTPUT_PATH), { recursive: true });
fs.writeFileSync(OUTPUT_PATH, header + output, 'utf-8');
console.log(`Generated types at ${OUTPUT_PATH}`);
}
generateTypes().catch(console.error);
Step 3: Integrate into Build Pipeline
Add scripts to regenerate types before dev and build:
{
"scripts": {
"generate:types": "node scripts/generate-types.mjs",
"dev": "npm run generate:types && vite",
"build": "npm run generate:types && tsc && vite build"
},
"devDependencies": {
"openapi-typescript": "^7.4.0"
}
}
Tip: Run npm run generate:types after any backend schema change. In CI, generate types as a pre-build step to catch mismatches before deployment.
Step 4: Use Generated Types in Your Client
The generated types mirror your Pydantic models exactly. Here's what gets generated:
// src/lib/api/types.generated.ts (auto-generated)
export interface paths {
'/documents': {
get: {
parameters: {
query: {
resource_type: 'articles' | 'drafts';
limit?: number;
offset?: number;
};
};
responses: {
200: {
content: {
'application/json': components['schemas']['PaginatedDocuments'];
};
};
};
};
post: {
requestBody: {
content: {
'multipart/form-data': {
file: Blob;
title: string;
language?: string;
};
};
};
responses: {
201: {
content: {
'application/json': components['schemas']['DocumentResponse'];
};
};
};
};
};
}
export interface components {
schemas: {
DocumentResponse: {
id: string;
title: string;
uploaded_at: string;
page_count: number;
status: 'processing' | 'ready' | 'failed';
};
PaginatedDocuments: {
items: components['schemas']['DocumentResponse'][];
total: number;
limit: number;
offset: number;
};
};
}
Step 5: Build a Type-Safe API Client
With generated types, your API client gets full type safety:
// src/lib/api/client.ts
import type { paths, components } from './types.generated';
type Document = components['schemas']['DocumentResponse'];
type PaginatedDocuments = components['schemas']['PaginatedDocuments'];
const API_BASE = import.meta.env.VITE_API_URL || 'http://localhost:8421';
class ApiError extends Error {
  constructor(public readonly status: number, message: string) {
    super(message);
  }
}
class AuthError extends Error {}
class ApiClient {
private token: string | null = null;
async request<T>(path: string, options: RequestInit = {}): Promise<T> {
const headers: HeadersInit = {
...options.headers,
};
if (this.token) {
headers['Authorization'] = `Bearer ${this.token}`;
}
const response = await fetch(`${API_BASE}${path}`, {
...options,
headers,
credentials: 'include', // For httpOnly cookies
});
if (!response.ok) {
if (response.status === 401) {
// Handle token refresh
await this.refreshToken();
return this.request<T>(path, options);
}
throw new ApiError(response.status, await response.text());
}
return response.json();
}
async getDocuments(resourceType: 'articles' | 'drafts', limit = 10, offset = 0): Promise<PaginatedDocuments> {
const params = new URLSearchParams({
resource_type: resourceType,
limit: String(limit),
offset: String(offset),
});
return this.request<PaginatedDocuments>(`/documents?${params}`);
}
async uploadDocument(resourceType: string, file: File, title: string): Promise<Document> {
const formData = new FormData();
formData.append('file', file);
formData.append('title', title);
formData.append('resource_type', resourceType);
return this.request<Document>('/documents', {
method: 'POST',
body: formData,
});
}
private async refreshToken(): Promise<void> {
const response = await fetch(`${API_BASE}/auth/refresh`, {
method: 'POST',
credentials: 'include',
});
if (!response.ok) {
throw new AuthError('Session expired');
}
const { access_token } = await response.json();
this.token = access_token;
}
}
export const apiClient = new ApiClient();
Key insight: By generating types from OpenAPI, any backend schema change triggers TypeScript compilation errors in your frontend. You'll catch mismatches at build time, not runtime.
Domain Layer: The Untouchable Core
The domain layer contains your business logic and has zero external dependencies. This is non-negotiable.
Defining Ports (Interfaces)
Ports are abstract interfaces that define what your application needs, not how it's implemented:
# src/domain/ports/vector_store.py
from abc import ABC, abstractmethod
from uuid import UUID
from typing import Any
from ..entities.chunks import Chunk, EmbeddedChunk, ScoredChunk
class VectorStore(ABC):
"""Abstract interface for vector database operations."""
@abstractmethod
def store_chunks(self, chunks: list[EmbeddedChunk]) -> None:
"""Store embedded chunks in the vector database."""
pass
@abstractmethod
def query_similar(
self,
embedding: list[float],
k: int,
filter_metadata: dict[str, Any] | None = None,
document_ids: list[UUID] | None = None,
) -> list[ScoredChunk]:
"""Find k most similar chunks to the given embedding.
Pre-filtering by document_ids happens BEFORE similarity search
for performance (equivalent to FAISS IDSelectorArray).
"""
pass
@abstractmethod
def delete_chunks_by_document_id(self, document_id: UUID) -> bool:
"""Delete all chunks belonging to a specific document."""
pass
Notice the docstrings explain what operations do, not how. The domain knows nothing about Qdrant, FAISS, or any specific database.
Defining Entities
Entities are your core business objects with validation logic:
# src/domain/entities/chunks.py
from dataclasses import dataclass
from typing import Any
from uuid import UUID
@dataclass(frozen=True)
class TextChunk:
"""Immutable text chunk with position metadata."""
content: str
chunk_index: int
start_char: int
end_char: int
@dataclass
class Chunk:
"""A text chunk with document lineage."""
id: UUID
page_id: UUID
document_id: UUID
text_chunk: TextChunk
metadata: dict[str, Any]
@dataclass
class EmbeddedChunk:
"""A chunk with its embedding vector."""
chunk: Chunk
embedding_vector: tuple[float, ...] # Immutable for hashability
@dataclass
class ScoredChunk:
"""A chunk with similarity score from vector search."""
chunk: Chunk
score: float
Using dataclass(frozen=True) for TextChunk ensures immutability - critical for debugging and caching.
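The dataclasses above do not enforce any invariants yet. If you want validation at construction time, frozen dataclasses can check their fields in __post_init__; the specific rules below are illustrative, not part of the original entity definitions:
from dataclasses import dataclass

@dataclass(frozen=True)
class TextChunk:
    """Immutable text chunk that validates its own invariants."""
    content: str
    chunk_index: int
    start_char: int
    end_char: int

    def __post_init__(self) -> None:
        # Reading fields is allowed on frozen dataclasses; only assignment is blocked
        if not self.content.strip():
            raise ValueError("chunk content must not be empty")
        if self.chunk_index < 0:
            raise ValueError("chunk_index must be non-negative")
        if self.end_char <= self.start_char:
            raise ValueError("end_char must be greater than start_char")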
Infrastructure Layer: Swappable Implementations
Now we implement the domain ports with concrete infrastructure. The key is that infrastructure knows about domain interfaces, but domain never imports infrastructure.
Qdrant Vector Store Implementation
Qdrant's filtering capabilities are essential for multi-tenant RAG systems. We use payload indexes to enable efficient pre-filtering before vector search:
# src/infrastructure/vector_stores/qdrant_vector_store.py
from uuid import UUID
from typing import Any
from qdrant_client import QdrantClient
from qdrant_client.models import (
Distance, FieldCondition, Filter, MatchAny,
MatchValue, PointStruct, VectorParams,
)
from src.domain.ports.vector_store import VectorStore
from src.domain.entities.chunks import Chunk, EmbeddedChunk, ScoredChunk
class QdrantVectorStore(VectorStore):
"""Production Qdrant implementation with pre-filtering."""
def __init__(
self,
client: QdrantClient,
collection_name: str = "document_chunks",
dimension: int = 1024,
):
self.client = client
self.collection_name = collection_name
self.dimension = dimension
self._ensure_collection_exists()
def _ensure_collection_exists(self) -> None:
"""Create collection with payload indexes for filtering."""
collections = self.client.get_collections().collections
if not any(c.name == self.collection_name for c in collections):
self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=self.dimension,
distance=Distance.COSINE
),
)
# Create indexes for efficient filtering
self._create_payload_indexes()
def _create_payload_indexes(self) -> None:
"""Without indexes, Qdrant scans all payloads - extremely slow at scale."""
for field, schema in [
("document_id", "keyword"),
("page_id", "keyword"),
("chunk_index", "integer"),
]:
self.client.create_payload_index(
collection_name=self.collection_name,
field_name=field,
field_schema=schema,
)
def store_chunks(self, chunks: list[EmbeddedChunk]) -> None:
"""Batch upsert with wait=True for data pipeline consistency."""
if not chunks:
return
points = [
PointStruct(
                id=str(chunk.chunk.id),
                vector=list(chunk.embedding_vector),
                payload={
                    "chunk_id": str(chunk.chunk.id),
"document_id": str(chunk.chunk.document_id),
"chunk_text": chunk.chunk.text_chunk.content,
"chunk_index": chunk.chunk.text_chunk.chunk_index,
**chunk.chunk.metadata,
}
)
for chunk in chunks
]
self.client.upsert(
collection_name=self.collection_name,
points=points,
wait=True, # Critical: ensures consistency before returning
)
def query_similar(
self,
embedding: list[float],
k: int,
filter_metadata: dict[str, Any] | None = None,
document_ids: list[UUID] | None = None,
) -> list[ScoredChunk]:
"""Pre-filter by document_ids before vector search."""
qdrant_filter = None
if document_ids or filter_metadata:
must_conditions = []
if document_ids:
# Pre-filter to specific documents (like FAISS IDSelectorArray)
must_conditions.append(
FieldCondition(
key="document_id",
match=MatchAny(any=[str(did) for did in document_ids]),
)
)
if filter_metadata:
for field, value in filter_metadata.items():
must_conditions.append(
FieldCondition(key=field, match=MatchValue(value=value))
)
qdrant_filter = Filter(must=must_conditions)
results = self.client.search(
collection_name=self.collection_name,
query_vector=embedding,
query_filter=qdrant_filter,
limit=k,
with_payload=True,
)
return [
ScoredChunk(
chunk=self._payload_to_chunk(point.payload),
score=point.score
)
for point in results
if point.payload
]
Pitfall: Always create payload indexes in Qdrant. Without them, every filtered query scans the full payload set, turning fast index-assisted filtering into a linear scan over the whole collection.
Ollama Embedder Implementation
We use mxbai-embed-large for embeddings — a 334M parameter model that produces 1024-dimensional vectors with excellent retrieval quality. Ollama's Python client provides a clean interface:
# src/infrastructure/embeddings/ollama_embedder.py
import logging
import ollama
from src.domain.entities.chunks import Chunk, EmbeddedChunk
from src.domain.ports.embedder import Embedder, EmbeddingError
logger = logging.getLogger(__name__)
class OllamaEmbedder(Embedder):
"""Self-hosted embedding generation via Ollama."""
def __init__(
self,
base_url: str = "http://localhost:11434",
model_name: str = "mxbai-embed-large",
):
self._model_name = model_name
self._client = ollama.Client(host=base_url)
logger.info(f"Initialized OllamaEmbedder with '{model_name}'")
def embed(self, chunks: list[Chunk]) -> list[EmbeddedChunk]:
"""Generate embeddings for a batch of chunks."""
if not chunks:
return []
try:
embedded_chunks = []
for chunk in chunks:
# Ollama processes single inputs
response = self._client.embeddings(
model=self._model_name,
prompt=chunk.text_chunk.content
)
embedding_vector = tuple(response["embedding"])
embedded_chunks.append(
EmbeddedChunk(chunk=chunk, embedding_vector=embedding_vector)
)
logger.info(
f"Generated {len(embedded_chunks)} embeddings with "
f"{len(embedded_chunks[0].embedding_vector)} dimensions"
)
return embedded_chunks
except Exception as e:
logger.error(f"Embedding failed: {e}")
raise EmbeddingError(f"Embedding generation failed: {e}") from e
Tip: For production, implement batch embedding with rate limiting. Ollama's API processes one input at a time, so parallelization requires careful orchestration.
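One straightforward way to add that parallelism, sketched under the assumption that a small thread pool around the synchronous client is acceptable (this is not the article's implementation), is to bound concurrency to match OLLAMA_NUM_PARALLEL:
from concurrent.futures import ThreadPoolExecutor

import ollama

from src.domain.entities.chunks import Chunk, EmbeddedChunk

def embed_batch(
    chunks: list[Chunk],
    client: ollama.Client,
    model: str = "mxbai-embed-large",
    max_workers: int = 2,  # keep in line with OLLAMA_NUM_PARALLEL
) -> list[EmbeddedChunk]:
    def embed_one(chunk: Chunk) -> EmbeddedChunk:
        response = client.embeddings(model=model, prompt=chunk.text_chunk.content)
        return EmbeddedChunk(chunk=chunk, embedding_vector=tuple(response["embedding"]))

    # executor.map preserves input order, so results line up with the input chunks
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(embed_one, chunks))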
Dependency Injection with Lagom
Lagom is a lightweight DI container that integrates seamlessly with FastAPI through its FastApiIntegration. Unlike heavier alternatives, Lagom uses Python's type hints for automatic dependency resolution:
# src/lagom_dependencies.py
from pathlib import Path
from lagom import Container
from lagom.integrations.fast_api import FastApiIntegration
from qdrant_client import QdrantClient
# Domain ports
from src.domain.ports.vector_store import VectorStore
from src.domain.ports.embedder import Embedder
from src.domain.ports.retriever import Retriever
from src.domain.ports.llm_client import LLMClient
from src.domain.repositories.document_repository import DocumentRepository
# Infrastructure implementations
from src.infrastructure.vector_stores.qdrant_vector_store import QdrantVectorStore
from src.infrastructure.embeddings.ollama_embedder import OllamaEmbedder
from src.infrastructure.retrievers import (
HybridRetriever, VectorStoreRetriever, KeywordRetriever
)
from src.infrastructure.llm_clients.ollama_client import OllamaClient
from src.infrastructure.config.settings import Settings
def create_container(settings: Settings | None = None) -> Container:
"""Create and configure the Lagom DI container."""
container = Container()
if settings is None:
settings = Settings()
container[Settings] = settings
# Shared Qdrant client (single connection pool)
qdrant_client = QdrantClient(
host=settings.qdrant_host,
port=settings.qdrant_port
)
# Register implementations for domain ports
container[Embedder] = OllamaEmbedder(
base_url=settings.ollama_base_url,
model_name="mxbai-embed-large"
)
container[VectorStore] = QdrantVectorStore(
client=qdrant_client,
collection_name=f"{settings.collection_prefix}chunks",
dimension=1024,
)
container[LLMClient] = lambda: OllamaClient(
settings.ollama_base_url,
settings.ollama_model
)
# Hybrid retriever combining semantic + keyword search
def _provide_retriever(c: Container) -> Retriever:
vector_retriever = VectorStoreRetriever(
embedder=c[Embedder],
vector_store=c[VectorStore],
k=10,
)
        keyword_retriever = KeywordRetriever(
            # Note: KeywordSearchIndex must also be imported and registered
            # in this container for this binding to resolve
            keyword_index=c[KeywordSearchIndex],
k=10,
)
return HybridRetriever(
dense_retriever=vector_retriever,
sparse_retriever=keyword_retriever,
dense_k=10,
sparse_k=10,
max_total_results=15,
)
container[Retriever] = _provide_retriever
return container
def setup_lagom_dependencies() -> FastApiIntegration:
"""Configure dependencies for FastAPI."""
container = create_container()
return FastApiIntegration(container)
# FastAPI integration handle
deps = setup_lagom_dependencies()
Using it in API endpoints:
# src/presentation/api/chat_api.py
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from src.lagom_dependencies import deps
from src.use_cases.chat_conversation import SendChatMessage
router = APIRouter(prefix="/chat", tags=["chat"])
@router.post("/message")
async def send_message(
message: str,
use_case: SendChatMessage = deps.depends(SendChatMessage),
) -> StreamingResponse:
"""Send a message and receive a streaming response."""
result = await use_case.execute(message)
return StreamingResponse(
result.response_stream,
media_type="text/plain"
)
Pitfall: Don't instantiate dependencies inside endpoint functions. Use deps.depends() to let Lagom manage the lifecycle. This ensures proper connection pooling and resource cleanup.
Hybrid Retrieval: Dense + Sparse Search
For RAG applications, pure semantic search often misses exact keyword matches (acronyms, technical terms). Hybrid search combines both approaches. While Qdrant supports native hybrid queries with sparse vectors, we implement a composition pattern that allows swapping retrieval strategies:
# src/infrastructure/retrievers/hybrid_retriever.py
from uuid import UUID
from src.domain.entities import ScoredChunk, SourceDocument
from src.domain.ports.retriever import RetrievalResult, Retriever
class HybridRetriever(Retriever):
"""Combines dense (semantic) and sparse (keyword) retrieval."""
def __init__(
self,
dense_retriever: Retriever,
sparse_retriever: Retriever,
dense_k: int = 10,
sparse_k: int = 10,
max_total_results: int = 15,
):
self._dense = dense_retriever
self._sparse = sparse_retriever
self._dense_k = dense_k
self._sparse_k = sparse_k
self._max_results = max_total_results
def retrieve(
self,
query: str,
document_ids: list[UUID] | None = None
) -> RetrievalResult:
"""Retrieve using both methods, merge, and deduplicate."""
if not query.strip():
return RetrievalResult(chunks=[], sources=[])
# Run both retrievers (could parallelize with asyncio)
dense_results = self._dense.retrieve(query, document_ids)
sparse_results = self._sparse.retrieve(query, document_ids)
# Merge with dense priority (semantic relevance first)
combined = self._merge_and_deduplicate(
dense_results.chunks[:self._dense_k],
sparse_results.chunks[:self._sparse_k],
)
# Deduplicate source documents
sources = self._deduplicate_sources(
dense_results.sources + sparse_results.sources
)
return RetrievalResult(
chunks=combined[:self._max_results],
sources=sources
)
def _merge_and_deduplicate(
self,
dense_chunks: list[ScoredChunk],
sparse_chunks: list[ScoredChunk],
) -> list[ScoredChunk]:
"""Dense results first, then unique sparse results."""
seen_ids = {c.chunk.id for c in dense_chunks}
merged = list(dense_chunks)
for chunk in sparse_chunks:
if chunk.chunk.id not in seen_ids:
merged.append(chunk)
return merged
Insight: BM25 (sparse) excels at exact matches like "HIPAA compliance" while embeddings (dense) capture semantic similarity. The combination provides better recall than either alone.
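If you want a more principled merge than strict dense priority, reciprocal rank fusion (RRF) is a common alternative that could replace _merge_and_deduplicate. A sketch (the k=60 smoothing constant is conventional, not taken from the article's code):
from collections import defaultdict

from src.domain.entities import ScoredChunk

def rrf_merge(
    dense: list[ScoredChunk],
    sparse: list[ScoredChunk],
    k: int = 60,
) -> list[ScoredChunk]:
    """Score each chunk by 1 / (k + rank) in every list that contains it."""
    first_seen: dict = {}
    fused_scores: defaultdict = defaultdict(float)
    for results in (dense, sparse):
        for rank, scored in enumerate(results, start=1):
            chunk_id = scored.chunk.id
            first_seen.setdefault(chunk_id, scored)
            fused_scores[chunk_id] += 1.0 / (k + rank)
    return sorted(
        first_seen.values(),
        key=lambda s: fused_scores[s.chunk.id],
        reverse=True,
    )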
React Frontend with TanStack
Streaming Responses: Why Mutations Need Manual Handling
TanStack Query excels at caching request/response pairs, but streaming responses don't fit this model. The library's mental model assumes requests return complete data that can be cached and invalidated. Here's why streaming needs special handling:
Why useQuery doesn't work for streaming:
- Queries expect a single resolved value to cache
- Streams are continuous data flows without a "final" value
- Caching partial streams creates inconsistent states
Why useMutation needs manual handling:
- Mutations are designed for fire-and-forget operations
- They don't have built-in support for progressive updates
- The onSuccess callback only fires after the entire response completes
The Solution: Manual Fetch with State Management
// src/hooks/use-chat-stream.ts
import { useState, useCallback, useRef } from 'react';
import { useQueryClient } from '@tanstack/react-query';
import type { SourceDocument, ChatMessage } from '@/lib/api/types.generated';
interface StreamState {
isStreaming: boolean;
content: string;
sources: SourceDocument[];
error: Error | null;
}
interface ChatStreamEvent {
event: 'sources' | 'content' | 'done' | 'error';
data?: string | SourceDocument[];
}
export function useChatStream(sessionId: string) {
const queryClient = useQueryClient();
const abortControllerRef = useRef<AbortController | null>(null);
const [state, setState] = useState<StreamState>({
isStreaming: false,
content: '',
sources: [],
error: null,
});
const sendMessage = useCallback(
async (message: string) => {
// Cancel any existing stream
abortControllerRef.current?.abort();
abortControllerRef.current = new AbortController();
setState({
isStreaming: true,
content: '',
sources: [],
error: null,
});
try {
const response = await fetch('/api/chat/message', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ session_id: sessionId, message }),
signal: abortControllerRef.current.signal,
});
if (!response.ok) {
throw new Error(`HTTP ${response.status}`);
}
const reader = response.body?.getReader();
if (!reader) throw new Error('No response body');
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
// Parse Server-Sent Events format
const lines = buffer.split('\n');
buffer = lines.pop() || ''; // Keep incomplete line in buffer
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const event: ChatStreamEvent = JSON.parse(line.slice(6));
switch (event.event) {
case 'sources':
setState((prev) => ({
...prev,
sources: event.data as SourceDocument[],
}));
break;
case 'content':
setState((prev) => ({
...prev,
content: prev.content + (event.data as string),
}));
break;
case 'done':
// Invalidate chat history to include new message
queryClient.invalidateQueries({
queryKey: ['chat', 'history', sessionId],
});
break;
}
}
}
} catch (error) {
if ((error as Error).name !== 'AbortError') {
setState((prev) => ({
...prev,
error: error as Error,
}));
}
} finally {
setState((prev) => ({ ...prev, isStreaming: false }));
}
},
[sessionId, queryClient],
);
const cancel = useCallback(() => {
abortControllerRef.current?.abort();
}, []);
return {
...state,
sendMessage,
cancel,
};
}
Using the Stream Hook in Components
// src/components/chat/chat-interface.tsx
import { useChatStream } from '@/hooks/use-chat-stream';
import { useQuery } from '@tanstack/react-query';
import { chatKeys, fetchChatHistory } from '@/queries/chat';
export function ChatInterface({ sessionId }: { sessionId: string }) {
const { content, sources, isStreaming, error, sendMessage, cancel } = useChatStream(sessionId);
// Cached chat history (non-streaming)
const { data: history } = useQuery({
queryKey: chatKeys.history(sessionId),
queryFn: () => fetchChatHistory(sessionId),
});
const handleSubmit = (message: string) => {
sendMessage(message);
};
return (
<div className='flex flex-col h-full'>
{/* Message history from cache */}
<div className='flex-1 overflow-y-auto'>
{history?.messages.map((msg) => (
<ChatMessage key={msg.id} message={msg} />
))}
{/* Live streaming message */}
{isStreaming && (
<div className='animate-pulse'>
{sources.length > 0 && <SourcesList sources={sources} />}
<MarkdownContent content={content} />
</div>
)}
</div>
<ChatInput onSubmit={handleSubmit} disabled={isStreaming} onCancel={isStreaming ? cancel : undefined} />
</div>
);
}
Key insight: Keep streaming state local to the component, but invalidate cached queries when the stream completes. This hybrid approach gives you real-time updates during streaming and proper cache consistency afterward.
Query Client Configuration
// src/lib/query-client.ts
import { QueryClient } from '@tanstack/react-query';
export const queryClient = new QueryClient({
defaultOptions: {
queries: {
staleTime: 1000 * 60 * 5, // 5 minutes
gcTime: 1000 * 60 * 30, // 30 minutes (formerly cacheTime)
retry: 1,
refetchOnWindowFocus: false,
},
mutations: {
retry: 0,
},
},
});
Query Key Factory Pattern
Structured query keys enable surgical cache invalidation:
// src/queries/documents.ts
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query';
import { fetchDocuments, uploadDocument, deleteDocument } from '@/lib/api/documents';
// Query key factory - enables precise cache invalidation
export const documentKeys = {
all: ['documents'] as const,
lists: () => [...documentKeys.all, 'list'] as const,
list: (type: string, page: number, limit: number, filters?: object) =>
[...documentKeys.lists(), type, page, limit, filters] as const,
};
// Paginated query with keepPreviousData to prevent UI flash
export function useDocuments(
resourceType: 'articles' | 'drafts',
page: number,
limit = 10,
filters?: DocumentFilters,
) {
const offset = (page - 1) * limit;
return useQuery({
queryKey: documentKeys.list(resourceType, page, limit, filters),
queryFn: () => fetchDocuments(resourceType, limit, offset, filters),
placeholderData: keepPreviousData, // Smooth pagination
});
}
// Upload with automatic cache invalidation
export function useUploadDocument() {
const queryClient = useQueryClient();
return useMutation({
mutationFn: ({ resourceType, file, title, language = 'en' }) =>
uploadDocument(resourceType, file, title, language),
onSuccess: (_, variables) => {
// Invalidate all document lists to refetch
queryClient.invalidateQueries({
queryKey: documentKeys.lists(),
});
},
});
}
Tip: Use keepPreviousData for paginated queries. It shows stale data while fetching, preventing jarring loading states during page navigation.
Data Table with TanStack Table and shadcn/ui
// src/components/documents/document-table.tsx
import { Table, TableBody, TableCell, TableHead, TableHeader, TableRow } from '@/components/ui/table';
import { Checkbox } from '@/components/ui/checkbox';
import { Button } from '@/components/ui/button';
import { Badge } from '@/components/ui/badge';
import { Trash2, Download, Loader2 } from 'lucide-react';
interface DocumentTableProps {
documents: Document[];
total: number;
currentPage: number;
pageSize: number;
onPageChange: (page: number) => void;
onDelete: (ids: string[]) => void;
isLoading?: boolean;
isFetching?: boolean;
selectedIds: Set<string>;
onSelectionChange: (ids: Set<string>) => void;
}
export function DocumentTable({
documents,
total,
currentPage,
pageSize,
onPageChange,
onDelete,
isLoading,
isFetching,
selectedIds,
onSelectionChange,
}: DocumentTableProps) {
const totalPages = Math.ceil(total / pageSize);
const toggleSelection = (id: string) => {
const newSet = new Set(selectedIds);
if (newSet.has(id)) {
newSet.delete(id);
} else {
newSet.add(id);
}
onSelectionChange(newSet);
};
const allCurrentPageSelected = documents.length > 0 && documents.every((d) => selectedIds.has(d.id));
const toggleAllOnPage = () => {
const newSet = new Set(selectedIds);
if (allCurrentPageSelected) {
documents.forEach((d) => newSet.delete(d.id));
} else {
documents.forEach((d) => newSet.add(d.id));
}
onSelectionChange(newSet);
};
if (isLoading) {
return <DocumentTableSkeleton />;
}
return (
<div className='space-y-4'>
{/* Selection actions */}
<div className='flex items-center justify-between'>
<div className='flex items-center gap-2'>
{selectedIds.size > 0 && <Badge variant='secondary'>{selectedIds.size} selected</Badge>}
</div>
<Button
variant='destructive'
size='sm'
onClick={() => onDelete(Array.from(selectedIds))}
disabled={selectedIds.size === 0}>
<Trash2 className='mr-2 h-4 w-4' />
Delete selected
</Button>
</div>
{/* Table with opacity transition during refetch */}
<div className={`rounded-md border transition-opacity ${isFetching && !isLoading ? 'opacity-60' : ''}`}>
<Table>
<TableHeader>
<TableRow>
<TableHead className='w-12'>
<Checkbox
checked={allCurrentPageSelected}
onCheckedChange={toggleAllOnPage}
aria-label='Select all'
/>
</TableHead>
<TableHead>Document</TableHead>
<TableHead>Date</TableHead>
</TableRow>
</TableHeader>
<TableBody>
{documents.map((doc) => (
<TableRow key={doc.id}>
<TableCell>
<Checkbox
checked={selectedIds.has(doc.id)}
onCheckedChange={() => toggleSelection(doc.id)}
/>
</TableCell>
<TableCell className='font-medium'>{doc.title}</TableCell>
<TableCell>{formatDate(doc.uploaded_at)}</TableCell>
</TableRow>
))}
</TableBody>
</Table>
</div>
<Pagination currentPage={currentPage} totalPages={totalPages} onPageChange={onPageChange} />
</div>
);
}
Streaming Responses: The Right Way
For chat interfaces, streaming responses are essential for perceived performance. We use Server-Sent Events (SSE) with FastAPI's StreamingResponse. Here's the pattern:
# src/use_cases/generate_answer_with_sources.py
from collections.abc import AsyncIterator
from dataclasses import dataclass
from typing import Literal
from src.domain.entities import SourceDocument
from src.domain.ports import Generator, Retriever
@dataclass
class SourcesEvent:
data: list[SourceDocument]
event: Literal["sources"] = "sources"
@dataclass
class ContentEvent:
data: str
event: Literal["content"] = "content"
@dataclass
class DoneEvent:
event: Literal["done"] = "done"
ChatStreamEvent = SourcesEvent | ContentEvent | DoneEvent
class GenerateAnswerWithSources:
"""Orchestrates RAG with streaming response."""
def __init__(self, retriever: Retriever, generator: Generator):
self._retriever = retriever
self._generator = generator
async def execute(self, query: str) -> AsyncIterator[ChatStreamEvent]:
"""
1. Retrieve sources → emit sources event
2. Format augmented prompt
3. Stream content events from LLM
4. Emit completion event
"""
# 1. Retrieve and emit sources first
result = self._retriever.retrieve(query)
yield SourcesEvent(data=result.sources)
# 2. Format prompt with retrieved context
prompt = self._format_prompt(query, result)
# 3. Stream LLM response
async for chunk in self._generator.generate(prompt):
yield ContentEvent(data=chunk)
# 4. Signal completion
yield DoneEvent()
Pitfall: Always "prime" the stream before returning a
StreamingResponse. This catches LLM errors before headers are sent:
@router.post("/message")
async def send_message(message: str, use_case: SendChatMessage = deps.depends(SendChatMessage)):
stream = await use_case.execute(message)
# Prime the stream to catch errors early
try:
first_chunk = await stream.__anext__()
except Exception as e:
raise HTTPException(status_code=502, detail=str(e))
async def primed_stream():
if first_chunk is not None:
yield first_chunk
async for chunk in stream:
yield chunk
return StreamingResponse(primed_stream(), media_type="text/plain")
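One piece the snippets above leave implicit is how the dataclass events become the data: lines the frontend parses. Here is a sketch of that serialization, assuming asdict-friendly events with plain JSON payloads (UUIDs or datetimes would need a custom encoder) and the text/event-stream media type that SSE expects:
import json
from collections.abc import AsyncIterator
from dataclasses import asdict

from fastapi.responses import StreamingResponse

async def to_sse(events: AsyncIterator) -> AsyncIterator[str]:
    """Serialize each ChatStreamEvent as a single Server-Sent Events line."""
    async for event in events:
        payload = asdict(event)  # e.g. {"event": "content", "data": "..."}
        yield f"data: {json.dumps(payload)}\n\n"

def as_streaming_response(events: AsyncIterator) -> StreamingResponse:
    return StreamingResponse(to_sse(events), media_type="text/event-stream")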
Common Pitfalls and Solutions
1. Qdrant Collection Not Found
Problem: "Collection 'chunks' doesn't exist" on first query.
Solution: Always ensure collections exist on startup with retry logic:
def _ensure_collection_exists(self) -> None:
max_retries = 5
retry_delay = 1
for attempt in range(max_retries):
try:
collections = self.client.get_collections().collections
if not any(c.name == self.collection_name for c in collections):
self.client.create_collection(...)
return
except Exception:
time.sleep(retry_delay)
retry_delay *= 2
raise VectorStoreError("Failed to create collection")
2. Embedding Dimension Mismatch
Problem: "Vector dimension mismatch: expected 1024, got 768"
Solution: Match embedding model to vector store config:
# Both must use same dimension
container[Embedder] = OllamaEmbedder(model_name="mxbai-embed-large") # 1024 dims
container[VectorStore] = QdrantVectorStore(dimension=1024)
3. TanStack Query Cache Staleness
Problem: Uploaded document doesn't appear in list.
Solution: Use query key factory for precise invalidation:
// Invalidate all lists, not just current page
queryClient.invalidateQueries({
queryKey: documentKeys.lists(), // Matches all list queries
});
4. Lagom Circular Dependencies
Problem: "Circular dependency detected"
Solution: Use factory functions for late binding:
# Instead of direct instantiation
container[Retriever] = _provide_retriever # Factory function
def _provide_retriever(c: Container) -> Retriever:
return HybridRetriever(
dense_retriever=VectorStoreRetriever(
embedder=c[Embedder], # Resolved at call time
vector_store=c[VectorStore],
),
...
)
Benefits and Tradeoffs
Benefits
- Swappability: Replace Qdrant with Pinecone without touching use cases
- Testability: Mock domain ports for fast unit tests (see the sketch after this list)
- Onboarding: New developers understand architecture in hours, not days
- Local-first: Full stack runs on a laptop without cloud dependencies
- Type safety: End-to-end TypeScript/Python typing catches errors at compile time
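Here is what that testability looks like in practice: a unit test for HybridRetriever with in-memory fakes standing in for Qdrant and the keyword index. The make_scored_chunk helper and the pytest-style test are illustrative sketches, not code from the project:
from uuid import uuid4

from src.domain.entities.chunks import Chunk, ScoredChunk, TextChunk
from src.domain.ports.retriever import RetrievalResult
from src.infrastructure.retrievers.hybrid_retriever import HybridRetriever

def make_scored_chunk(text: str, score: float) -> ScoredChunk:
    chunk = Chunk(
        id=uuid4(),
        page_id=uuid4(),
        document_id=uuid4(),
        text_chunk=TextChunk(content=text, chunk_index=0, start_char=0, end_char=len(text)),
        metadata={},
    )
    return ScoredChunk(chunk=chunk, score=score)

class FakeRetriever:
    """In-memory stand-in for the dense or sparse retriever port."""
    def __init__(self, chunks: list[ScoredChunk]) -> None:
        self._chunks = chunks

    def retrieve(self, query: str, document_ids=None) -> RetrievalResult:
        return RetrievalResult(chunks=self._chunks, sources=[])

def test_hybrid_merge_prefers_dense_and_deduplicates() -> None:
    shared = make_scored_chunk("shared", 0.9)
    dense_only = make_scored_chunk("dense", 0.8)
    sparse_only = make_scored_chunk("sparse", 0.7)
    retriever = HybridRetriever(
        dense_retriever=FakeRetriever([shared, dense_only]),
        sparse_retriever=FakeRetriever([shared, sparse_only]),
    )
    result = retriever.retrieve("query")
    assert [c.chunk.id for c in result.chunks] == [
        shared.chunk.id,
        dense_only.chunk.id,
        sparse_only.chunk.id,
    ]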
Tradeoffs
- Initial complexity: More files and abstractions than a monolith
- Learning curve: Team must understand DI and Clean Architecture
- Overhead: Lagom adds ~2ms per request for resolution (negligible)
- Self-hosted maintenance: Ollama requires GPU resources and model updates
Key Takeaways
- Invert dependencies: Domain defines interfaces; infrastructure implements them
- Use Lagom for FastAPI: Lightweight DI with native integration
- Pre-filter in Qdrant: Always create payload indexes for filtering
- Hybrid retrieval wins: Combine semantic and keyword search for better recall
- Prime streaming responses: Catch LLM errors before sending headers
- Query key factories: Enable surgical cache invalidation in TanStack Query
Ready to build your own production RAG system? Start with the domain layer - define your ports and entities before writing a single line of infrastructure code. The upfront investment pays dividends in maintainability and confidence.
For more on sharing types between frontend and backend in a monorepo, check out our guide on Type-Safe Shared Packages with Turborepo.