Article crafted from experience, then written down using AI
Retrieval-Augmented Generation (RAG) grounds an LLM's answer in retrieved documents, reducing hallucination and enabling access to private or current knowledge.
Why "hybrid"? No single retrieval method is optimal for all queries:
| Retrieval Type | Strengths | Weaknesses |
|---|---|---|
| Dense (vector) | Semantic similarity, paraphrase matching, multilingual | Misses exact keyword matches; computationally expensive to build |
| Lexical (BM25/FTS) | Exact term matches, product codes, names, IDs | No semantic understanding; vocabulary mismatch problem |
| Hybrid (both) | Best of both worlds | Requires merging and reranking stages |
The production-grade pipeline adds two further stages after retrieval:
| Layer | Technology | Notes |
|---|---|---|
| LLM | GPT-5 (gpt-5) | Temperature = 0 for grounded QA |
| Embedding | text-embedding-3-large | Dimension = 3072 |
| Vector DB | PostgreSQL + pgvector | HNSW index for ANN at 10M+ scale |
| Lexical Search | PostgreSQL Full Text Search | BM25-approximate via ts_rank; see §3.2 |
| Hybrid Merge | Reciprocal Rank Fusion (RRF) | k=60 is the standard constant |
| Reranking | Cohere rerank-v3.5 | Cross-encoder; far higher precision than bi-encoder |
| Orchestration | Python 3.12+ / LangChain LCEL | Two variants covered |
| Storage | PostgreSQL (chunks + metadata) | Single store for simplicity |
Chunks are embedded into high-dimensional vectors. At query time, the query is also embedded, and we find the nearest neighbours by cosine similarity.
Query: "What causes inflation?"
        ↓
Embedding Model
        ↓
[0.12, -0.87, 0.44 ... ] (3072-dim vector)
        ↓
pgvector HNSW index
        ↓
Approximate Nearest Neighbours
        ↓
Top-100 semantically similar chunks
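To make the nearest-neighbour step concrete, here is a minimal sketch of exact top-k search by cosine similarity in plain Python. The three-dimensional toy vectors and the names `cosine_similarity`, `top_k`, and `toy_chunks` are illustrative assumptions; a real deployment would use 3072-dimensional embeddings and let the HNSW index approximate this search rather than scanning every chunk.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def top_k(query_vec: list[float],
          chunk_vecs: dict[str, list[float]],
          k: int = 2) -> list[tuple[str, float]]:
    """Exact (brute-force) nearest neighbours; HNSW approximates this."""
    scored = [(cid, cosine_similarity(query_vec, vec))
              for cid, vec in chunk_vecs.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Hypothetical toy embeddings standing in for real chunk vectors.
toy_chunks = {
    "chunk_a": [1.0, 0.0, 0.0],
    "chunk_b": [0.9, 0.1, 0.0],
    "chunk_c": [0.0, 1.0, 0.0],
}
query = [1.0, 0.05, 0.0]
nearest = top_k(query, toy_chunks, k=2)
print(nearest)
```

The brute-force scan is linear in the number of chunks, which is why an ANN index becomes necessary at the scale discussed here.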
pgvector operators:
| Operator | Metric |
|---|---|
| <=> | Cosine distance (use for normalised embeddings) |
| <-> | L2 (Euclidean) distance |
| <#> | Negative inner product |
For text-embedding-3-large, use cosine distance (<=>).
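The relationship between the three metrics can be checked numerically. The sketch below reimplements them in plain Python (the function names are illustrative, not pgvector APIs) and shows that for unit-length vectors cosine distance equals one minus the inner product, and squared L2 distance equals twice the cosine distance, so all three produce the same ranking on normalised embeddings.

```python
import math


def inner_product(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def l2_distance(a: list[float], b: list[float]) -> float:
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def cosine_distance(a: list[float], b: list[float]) -> float:
    norm_a = math.sqrt(inner_product(a, a))
    norm_b = math.sqrt(inner_product(b, b))
    return 1.0 - inner_product(a, b) / (norm_a * norm_b)


def normalise(v: list[float]) -> list[float]:
    length = math.sqrt(inner_product(v, v))
    return [x / length for x in v]


a = normalise([3.0, 4.0])   # -> [0.6, 0.8]
b = normalise([4.0, 3.0])   # -> [0.8, 0.6]

cos_dist = cosine_distance(a, b)      # what <=> computes
l2_dist = l2_distance(a, b)           # what <-> computes
neg_ip = -inner_product(a, b)         # what <#> computes
print(cos_dist, l2_dist, neg_ip)
```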
HNSW vs IVFFlat:
| Index | 10M Scale | Build Time | Query Speed |
|---|---|---|---|
| HNSW | ✅ Recommended | Slower | Very fast, high recall |
| IVFFlat | Needs careful tuning | Faster | Acceptable |
⚠️ Important clarification: The original material references "pg_textsearch (BM25)". This needs disambiguation:
PostgreSQL's built-in FTS (tsvector / tsquery / ts_rank) is NOT true BM25. Its scoring function approximates relevance ranking but does not implement the full BM25 formula, which weights terms by inverse document frequency and accounts for term frequency saturation and document length normalisation.
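For reference, the two BM25 properties named above can be seen in a few lines of Python. This is a sketch of the commonly used Okapi BM25 per-term score with a Lucene-style IDF; the function name and the defaults `k1=1.2` and `b=0.75` are conventional illustrative choices, not values taken from any particular PostgreSQL extension.

```python
import math


def bm25_term_score(tf: int, doc_len: int, avg_doc_len: float,
                    n_docs: int, docs_with_term: int,
                    k1: float = 1.2, b: float = 0.75) -> float:
    """Score contribution of one query term for one document."""
    # Rare terms get a higher weight than common ones.
    idf = math.log(1 + (n_docs - docs_with_term + 0.5) / (docs_with_term + 0.5))
    # Documents longer than average are penalised (controlled by b).
    length_norm = 1 - b + b * (doc_len / avg_doc_len)
    # The tf fraction saturates: it can never exceed (k1 + 1).
    return idf * (tf * (k1 + 1)) / (tf + k1 * length_norm)


common = dict(avg_doc_len=100.0, n_docs=1000, docs_with_term=50)

# Term frequency saturation: the 20th occurrence adds far less than the 2nd.
gain_early = (bm25_term_score(2, 100, **common)
              - bm25_term_score(1, 100, **common))
gain_late = (bm25_term_score(20, 100, **common)
             - bm25_term_score(19, 100, **common))

# Length normalisation: same tf, shorter document scores higher.
short_doc = bm25_term_score(3, 50, **common)
long_doc = bm25_term_score(3, 500, **common)
print(gain_early, gain_late, short_doc, long_doc)
```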
Options for true BM25 in PostgreSQL:
| Extension | True BM25? | Notes |
|---|---|---|
| pg_bm25 / ParadeDB | ✅ Yes | CREATE INDEX USING bm25; Tantivy-based |
| PostgreSQL built-in FTS | ❌ No (approximation) | ts_rank; fine for most use cases |
| Elasticsearch | ✅ Yes | External service; operationally heavier |
For this guide, we cover both:
- PostgreSQL built-in FTS (ts_rank): simpler, no extra extension
- pg_bm25 / ParadeDB: true BM25 scoring
How PostgreSQL FTS works:
-- Text → tsvector (lexemes)
SELECT to_tsvector('english', 'The quick brown fox jumps');
-- Result: 'brown':3 'fox':4 'jump':5 'quick':2
-- Query parsing
SELECT plainto_tsquery('english', 'quick fox');
-- Result: 'quick' & 'fox'
-- Match + rank
SELECT chunk_text, ts_rank(search_vector, plainto_tsquery('english', 'quick fox')) AS score
FROM document_chunks
WHERE search_vector @@ plainto_tsquery('english', 'quick fox')
ORDER BY score DESC;
True BM25 with pg_bm25 (ParadeDB):
-- Install extension (ParadeDB distribution)
CREATE EXTENSION IF NOT EXISTS pg_bm25;
-- Create BM25 index
CREATE INDEX chunks_bm25_idx ON document_chunks
USING bm25(id, chunk_text)
WITH (key_field='id', text_fields='{"chunk_text": {}}');
-- BM25 search
SELECT id, chunk_text, paradedb.score(id)
FROM document_chunks
WHERE chunk_text @@@ 'quick fox'
ORDER BY paradedb.score(id) DESC
LIMIT 100;
RRF is a rank aggregation algorithm that combines multiple ranked lists without needing calibrated scores.
Formula:
RRF_score(d) = Σ_i 1 / (k + rank_i(d))
Where:
k = smoothing constant (default 60)
rank_i(d) = rank of document d in list i (1-indexed)
Why k=60? It was empirically determined in the original 2009 paper (Cormack et al.) to work well across diverse retrieval tasks. It dampens the extreme influence of the very top rank.
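The dampening effect of k can be illustrated with a short calculation. The sketch below (helper names are illustrative) compares how much more a rank-1 hit contributes than a rank-10 hit for k=0 and for k=60.

```python
def rrf_contribution(rank: int, k: int) -> float:
    """Contribution of a single list to a document's RRF score."""
    return 1.0 / (k + rank)


def top_vs_tenth_ratio(k: int) -> float:
    """How many times more a rank-1 hit is worth than a rank-10 hit."""
    return rrf_contribution(1, k) / rrf_contribution(10, k)


ratio_k0 = top_vs_tenth_ratio(0)     # (1/1) / (1/10)
ratio_k60 = top_vs_tenth_ratio(60)   # (1/61) / (1/70)
print(ratio_k0, ratio_k60)
```

With k=0 the top rank is worth ten times the tenth rank; with k=60 the ratio drops to 70/61, roughly 1.15, so agreement between lists matters more than a single first place.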
Example walkthrough:
Dense Results (top-5):      Lexical Results (top-5):
Rank 1 → Doc A              Rank 1 → Doc C
Rank 2 → Doc B              Rank 2 → Doc A
Rank 3 → Doc C              Rank 3 → Doc D
Rank 4 → Doc D              Rank 4 → Doc E
Rank 5 → Doc E              Rank 5 → Doc B
RRF scores (k=60):
Doc A: 1/(60+1) + 1/(60+2) = 0.01639 + 0.01613 = 0.03252 ← Highest
Doc C: 1/(60+3) + 1/(60+1) = 0.01587 + 0.01639 = 0.03226
Doc B: 1/(60+2) + 1/(60+5) = 0.01613 + 0.01538 = 0.03151
Doc D: 1/(60+4) + 1/(60+3) = 0.01563 + 0.01587 = 0.03150
Doc E: 1/(60+5) + 1/(60+4) = 0.01538 + 0.01563 = 0.03101
Final merged order: A → C → B → D → E
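The walkthrough can be reproduced with a few lines of Python. This is a minimal sketch operating on lists of document labels; the function name `rrf` and the label-only input are simplifications for illustration.

```python
from collections import defaultdict


def rrf(ranked_lists: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Fuse ranked lists of document labels with Reciprocal Rank Fusion."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in ranked_lists:
        # Ranks are 1-indexed, as in the formula.
        for rank, doc in enumerate(ranking, start=1):
            scores[doc] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


dense = ["A", "B", "C", "D", "E"]
lexical = ["C", "A", "D", "E", "B"]

merged = rrf([dense, lexical])
order = [doc for doc, _ in merged]
for doc, score in merged:
    print(f"Doc {doc}: {score:.5f}")
```

Note that Doc B and Doc D differ only in the fifth decimal place, which shows how RRF rewards consistent mid-list placement almost as much as one strong rank.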
After RRF, we have ~100–150 candidates. Most are relevant but not precisely ranked. A cross-encoder reranker fixes this.
Bi-encoder (embedding retrieval) vs Cross-encoder (reranker):
Bi-encoder (fast, approximate):
Query ──→ Encoder ──→ q_vec
Doc   ──→ Encoder ──→ d_vec
                        ↓
          cosine_sim(q_vec, d_vec)
[Encodes independently: fast but less accurate]
Cross-encoder (slow, precise):
[Query + Doc] ──→ Encoder ──→ Relevance Score
[Sees both together: models the query-document interaction directly]
The cross-encoder considers the full query-document interaction, catching nuances that bi-encoders miss. It's too slow for full-corpus search, but well suited to re-scoring 100–150 candidates.
Before Rerank (RRF top-10 example):
1. "France has 68 million people"
2. "Paris is the capital of France" ← Should be #1
3. "Germany borders France to the east"
...
After Cohere Rerank (query: "What is the capital of France?"):
1. "Paris is the capital of France" ✅
2. "France has 68 million people"
3. "Germany borders France to the east"
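The shape of the rerank stage, scoring each (query, document) pair jointly and keeping the best few, can be sketched without any model. In the toy below, `toy_pair_score` is a hypothetical stand-in (Jaccard word overlap) for a real cross-encoder; it only illustrates the control flow, and its scores and ordering beyond the top hit should not be read as what Cohere would return.

```python
import re


def tokens(text: str) -> set[str]:
    """Lower-cased alphanumeric word set."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def toy_pair_score(query: str, doc: str) -> float:
    """Hypothetical stand-in for a cross-encoder: Jaccard word overlap."""
    q, d = tokens(query), tokens(doc)
    return len(q & d) / len(q | d) if (q | d) else 0.0


def rerank(query: str, candidates: list[str], pair_scorer,
           top_n: int) -> list[tuple[str, float]]:
    """Score every (query, doc) pair jointly, then keep the best top_n."""
    scored = [(doc, pair_scorer(query, doc)) for doc in candidates]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:top_n]


candidates = [
    "France has 68 million people",
    "Paris is the capital of France",
    "Germany borders France to the east",
]
best = rerank("What is the capital of France?", candidates,
              toy_pair_score, top_n=1)
print(best)
```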
──────────────────────── INGESTION PATH ────────────────────────
Raw Documents
     │
     ▼
Chunker (300–500 tokens, 20% overlap)
     │
     ├──────────────────────────┐
     │                          │
     ▼                          ▼
OpenAI Embeddings          to_tsvector()
text-embedding-3-large     (FTS lexemes)
(dim=3072)
     │                          │
     └──────────────┬───────────┘
                    ▼
PostgreSQL (document_chunks)
  ├── embedding (vector/HNSW)
  ├── search_vector (tsvector/GIN)
  └── metadata (JSONB)
────────────────────────────────────────────────────────────────
────────────────────────── QUERY PATH ──────────────────────────
User Question
     │
     ▼
[Optional] Query Rewrite (GPT-5)
     │   Disambiguate, expand acronyms, fix typos
     │
     ▼
[Optional] Multi-Query Expansion
     │   Generate 3–5 variations for recall boost
     │
     ├──────────────────────────┐
     │                          │
     ▼                          ▼
Dense Search               Lexical Search
(pgvector HNSW)            (PostgreSQL FTS)
Top-100 chunks             Top-100 chunks
     │                          │
     └──────────────┬───────────┘
                    ▼
       Reciprocal Rank Fusion (RRF)
                    │
            Top 100–150 chunks
                    │
                    ▼
      Cohere Rerank API (rerank-v3.5)
                    │
              Top 10 chunks
                    │
                    ▼
       Context Assembly + Citations
                    │
                    ▼
          GPT-5 (temperature=0)
                    │
                    ▼
   Grounded Answer with Chunk Citations
────────────────────────────────────────────────────────────────
| Stage | Count | Rationale |
|---|---|---|
| Dense Search candidates | 100 | High recall; ANN is fast enough |
| Lexical Search candidates | 100 | High recall for keyword matching |
| After RRF merge | 100–150 | Union of both result sets (at most 200; overlap between the lists reduces it) |
| After Cohere Rerank | 10–20 | High precision shortlist |
| Sent to GPT-5 prompt | 5–10 | Fits context window; avoids noise |
-- pgvector extension (must be installed on server first)
CREATE EXTENSION IF NOT EXISTS vector;
-- Optional: true BM25 via ParadeDB
-- CREATE EXTENSION IF NOT EXISTS pg_bm25;
For large corpora, maintain a three-level hierarchy:
-- Level 1: Source documents
CREATE TABLE documents (
id BIGSERIAL PRIMARY KEY,
source_uri TEXT NOT NULL,
title TEXT,
doc_type TEXT, -- 'pdf', 'html', 'docx', etc.
ingested_at TIMESTAMPTZ DEFAULT NOW(),
metadata JSONB
);
-- Level 2: Sections / headings (optional but useful for navigation)
CREATE TABLE document_sections (
id BIGSERIAL PRIMARY KEY,
document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
section_num INT,
heading TEXT,
metadata JSONB
);
-- Level 3: Chunks (the retrieval unit)
CREATE TABLE document_chunks (
id BIGSERIAL PRIMARY KEY,
document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
section_id BIGINT REFERENCES document_sections(id),
chunk_number INT NOT NULL,
chunk_text TEXT NOT NULL,
token_count INT,
embedding vector(3072), -- text-embedding-3-large output
search_vector TSVECTOR, -- FTS index column
metadata JSONB, -- arbitrary key-value pairs
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- ── Vector Index (HNSW) ──────────────────────────────────────────
-- HNSW is preferred over IVFFlat at 10M+ scale.
-- m=16, ef_construction=64 are good starting defaults.
-- Tune ef_search at query time for recall/latency tradeoff.
-- pgvector can index at most 2,000 dimensions on the vector type, so the
-- 3072-dim column is indexed through a half-precision cast (halfvec,
-- up to 4,000 dimensions; requires pgvector 0.7.0+). Queries must order
-- by the same expression, embedding::halfvec(3072), to use this index.
CREATE INDEX idx_chunks_embedding
ON document_chunks
USING hnsw ((embedding::halfvec(3072)) halfvec_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- ── Full Text Search Index (GIN) ─────────────────────────────────
CREATE INDEX idx_chunks_fts
ON document_chunks
USING gin(search_vector);
-- ── Supporting Indexes ───────────────────────────────────────────
CREATE INDEX idx_chunks_document_id ON document_chunks(document_id);
CREATE INDEX idx_chunks_metadata ON document_chunks USING gin(metadata);
-- ── BM25 Index (ParadeDB / pg_bm25 only) ─────────────────────────
-- Uncomment if using ParadeDB:
-- CREATE INDEX idx_chunks_bm25
-- ON document_chunks
-- USING bm25(id, chunk_text)
-- WITH (key_field='id', text_fields='{"chunk_text": {}}');
The two lexical retrieval paths have fundamentally different indexing mechanisms. Choose one path and apply only its schema/index/query pattern. Do not mix them.
| | PostgreSQL FTS (ts_rank) | pg_bm25 / ParadeDB (true BM25) |
|---|---|---|
| Extra column needed? | ✅ Yes: search_vector TSVECTOR | ❌ No: index is on chunk_text directly |
| Trigger needed? | ✅ Yes: to keep search_vector in sync | ❌ No: index maintains itself |
| Index type | GIN on search_vector | BM25 on chunk_text (Tantivy engine) |
| Query operator | @@ with plainto_tsquery | @@@ with plain string |
| Scoring function | ts_rank (approximation) | paradedb.score(id) (true BM25) |
| When to use | Simpler setup, no extra extension | When precise BM25 scoring is required |
The search_vector column is a pre-computed TSVECTOR. A trigger keeps it automatically in sync whenever chunk_text is inserted or updated.
-- ── Trigger function ─────────────────────────────────────────────
CREATE OR REPLACE FUNCTION update_search_vector()
RETURNS TRIGGER AS $$
BEGIN
NEW.search_vector := to_tsvector('english', COALESCE(NEW.chunk_text, ''));
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- ── Attach trigger to the table ──────────────────────────────────
-- Fires BEFORE INSERT or any UPDATE that touches chunk_text.
-- No manual population needed; every write is handled automatically.
CREATE TRIGGER trg_update_search_vector
BEFORE INSERT OR UPDATE OF chunk_text
ON document_chunks
FOR EACH ROW EXECUTE FUNCTION update_search_vector();
With this trigger in place, your INSERT statement needs no to_tsvector() call; the column is filled automatically:
-- ✅ Clean insert: trigger handles search_vector
INSERT INTO document_chunks (document_id, chunk_number, chunk_text, embedding, metadata)
VALUES (%s, %s, %s, %s, %s);
-- ❌ Unnecessary: don't pass search_vector manually when the trigger is active
-- INSERT INTO document_chunks (..., search_vector) VALUES (..., to_tsvector('english', %s));
Query using FTS:
SELECT
id,
chunk_text,
ts_rank(search_vector, plainto_tsquery('english', %s), 32) AS score
FROM document_chunks
WHERE search_vector @@ plainto_tsquery('english', %s)
ORDER BY score DESC
LIMIT 100;
pg_bm25 builds its BM25 index directly on the raw chunk_text column using the Tantivy search engine underneath. There is no search_vector column, no TSVECTOR, and no trigger involved at all.
-- ── Extension (requires ParadeDB PostgreSQL distribution) ────────
CREATE EXTENSION IF NOT EXISTS pg_bm25;
-- ── BM25 index on the raw text column ────────────────────────────
-- key_field: the primary key column (for score retrieval)
-- text_fields: which columns to index for full-text BM25 search
CREATE INDEX idx_chunks_bm25
ON document_chunks
USING bm25(id, chunk_text)
WITH (key_field='id', text_fields='{"chunk_text": {}}');
With pg_bm25, inserts are identical to any normal insert, with no special handling:
-- ✅ Normal insert: pg_bm25 index updates automatically (like any B-tree)
INSERT INTO document_chunks (document_id, chunk_number, chunk_text, embedding, metadata)
VALUES (%s, %s, %s, %s, %s);
Query using true BM25:
-- @@@ is the ParadeDB full-text match operator
-- paradedb.score(id) returns the BM25 relevance score for each row
SELECT id, chunk_text, paradedb.score(id) AS bm25_score
FROM document_chunks
WHERE chunk_text @@@ %s -- plain query string, no tsquery conversion
ORDER BY paradedb.score(id) DESC
LIMIT 100;
If using Path A (FTS), your document_chunks table includes search_vector:
CREATE TABLE document_chunks (
id BIGSERIAL PRIMARY KEY,
document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
section_id BIGINT REFERENCES document_sections(id),
chunk_number INT NOT NULL,
chunk_text TEXT NOT NULL,
token_count INT,
embedding vector(3072),
search_vector TSVECTOR, -- ← FTS path only
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
If using Path B (pg_bm25), drop search_vector entirely; it serves no purpose:
CREATE TABLE document_chunks (
id BIGSERIAL PRIMARY KEY,
document_id BIGINT REFERENCES documents(id) ON DELETE CASCADE,
section_id BIGINT REFERENCES document_sections(id),
chunk_number INT NOT NULL,
chunk_text TEXT NOT NULL, -- ← BM25 index built directly on this
token_count INT,
embedding vector(3072),
metadata JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
pip install psycopg2-binary # PostgreSQL driver
pip install openai # OpenAI SDK (embeddings + GPT-5)
pip install cohere # Cohere reranking
pip install tiktoken # Token counting
pip install langchain # Optional: LCEL orchestration
pip install langchain-openai # Optional: LangChain OpenAI integration
pip install numpy # RRF computation
import os
import psycopg2
import openai
import cohere
import tiktoken
# ── Clients ───────────────────────────────────────────────────────
openai_client = openai.OpenAI(api_key=os.environ["OPENAI_API_KEY"])
cohere_client = cohere.Client(api_key=os.environ["COHERE_API_KEY"])
DB_CONFIG = {
"host": os.environ.get("PG_HOST", "localhost"),
"port": int(os.environ.get("PG_PORT", 5432)),
"dbname": os.environ.get("PG_DB", "ragdb"),
"user": os.environ.get("PG_USER", "postgres"),
"password": os.environ.get("PG_PASSWORD", ""),
}
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIM = 3072
RERANK_MODEL = "rerank-v3.5"
LLM_MODEL = "gpt-5"
CHUNK_SIZE_TOKENS = 400 # target tokens per chunk
CHUNK_OVERLAP = 0.20 # 20% overlap
def chunk_text(text: str, max_tokens: int = CHUNK_SIZE_TOKENS,
               overlap_ratio: float = CHUNK_OVERLAP) -> list[str]:
    """
    Token-aware sliding window chunker.
    Returns a list of text chunks.
    """
    # text-embedding-3-large is tokenised with cl100k_base
    # (encoding_for_model("gpt-4o") would return o200k_base instead).
    enc = tiktoken.get_encoding("cl100k_base")
    tokens = enc.encode(text)
    overlap = int(max_tokens * overlap_ratio)
    step = max_tokens - overlap
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))
        chunk_toks = tokens[start:end]
        chunks.append(enc.decode(chunk_toks))
        if end == len(tokens):
            break
        start += step
    return chunks
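To see what the sliding window does without a tokenizer, the sketch below computes only the (start, end) token offsets the chunker would produce. The helper name `window_bounds` and the 1,000-token example are illustrative assumptions; the guard against a non-positive step is an addition not present in the chunker itself.

```python
def window_bounds(n_tokens: int, max_tokens: int,
                  overlap_ratio: float) -> list[tuple[int, int]]:
    """Return the [start, end) token offsets of each sliding-window chunk."""
    overlap = int(max_tokens * overlap_ratio)
    step = max_tokens - overlap
    if step <= 0:
        # An overlap of 100% or more would never advance the window.
        raise ValueError("overlap_ratio must leave a positive step")
    bounds = []
    start = 0
    while start < n_tokens:
        end = min(start + max_tokens, n_tokens)
        bounds.append((start, end))
        if end == n_tokens:
            break
        start += step
    return bounds


# 400-token chunks with 20% overlap -> 80 shared tokens, step of 320.
bounds = window_bounds(n_tokens=1000, max_tokens=400, overlap_ratio=0.20)
print(bounds)
```

Each chunk shares its first 80 tokens with the tail of the previous one, which is what keeps a sentence that straddles a boundary retrievable from at least one chunk.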
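def embed(texts: list[str], batch_size: int = 500) -> list[list[float]]:
    """
    Batch embed a list of texts using text-embedding-3-large.
    OpenAI supports up to 2048 inputs per call; keep batches ≤ 500
    for safety.
    """
    vectors: list[list[float]] = []
    # Send the inputs in slices so large documents never exceed the limit.
    for start in range(0, len(texts), batch_size):
        response = openai_client.embeddings.create(
            model=EMBEDDING_MODEL,
            input=texts[start:start + batch_size],
        )
        vectors.extend(item.embedding for item in response.data)
    return vectors
def embed_single(text: str) -> list[float]:
    return embed([text])[0]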
INSERT_SQL = """
INSERT INTO document_chunks
(document_id, chunk_number, chunk_text, token_count, embedding, metadata)
VALUES
(%s, %s, %s, %s, %s, %s)
ON CONFLICT DO NOTHING
"""
import json
def ingest_document(conn, document_id: int, text: str,
                    metadata: dict | None = None):
    """
    Chunk, embed, and insert a document's chunks into PostgreSQL.
    The search_vector column is handled by the trigger.
    """
    # Same encoding as the embedding model (cl100k_base), so token_count
    # matches what the embedding API sees.
    enc = tiktoken.get_encoding("cl100k_base")
    chunks = chunk_text(text)
    # Embed all chunks up front rather than one request per chunk
    embeddings = embed(chunks)
    metadata_json = json.dumps(metadata or {})
    with conn.cursor() as cur:
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            token_count = len(enc.encode(chunk))
            cur.execute(INSERT_SQL, (
                document_id,
                i,
                chunk,
                token_count,
                embedding,  # psycopg2 serialises Python list → pgvector
                metadata_json,
            ))
    conn.commit()
    print(f"Ingested {len(chunks)} chunks for document {document_id}")
def dense_search(conn, query_embedding: list[float],
                 k: int = 100) -> list[tuple]:
    """
    Approximate nearest neighbour search using HNSW cosine distance.
    Returns list of (id, chunk_text, cosine_similarity) tuples.
    Note: set ef_search for query-time recall tuning:
        SET hnsw.ef_search = 200;  -- higher = better recall, slower
    """
    with conn.cursor() as cur:
        # Optionally set ef_search for this session
        cur.execute("SET hnsw.ef_search = 200;")
        # pgvector can index at most 2,000 dimensions on the vector type, so
        # an HNSW index over 3072-dim embeddings has to be built on
        # embedding::halfvec(3072); ordering by that same expression lets
        # the planner use such an index.
        cur.execute("""
            SELECT
                id,
                chunk_text,
                1 - (embedding::halfvec(3072) <=> %s::vector::halfvec(3072)) AS cosine_similarity
            FROM document_chunks
            ORDER BY embedding::halfvec(3072) <=> %s::vector::halfvec(3072)
            LIMIT %s
        """, (query_embedding, query_embedding, k))
        return cur.fetchall()  # [(id, chunk_text, similarity), ...]
def lexical_search(conn, query: str, k: int = 100) -> list[tuple]:
    """
    Full-text search using PostgreSQL tsvector + ts_rank.
    ts_rank is not BM25, but it is good enough for most workloads.
    For true BM25, replace this with a pg_bm25 / ParadeDB query.
    """
    with conn.cursor() as cur:
        cur.execute("""
            SELECT
                id,
                chunk_text,
                ts_rank(
                    search_vector,
                    plainto_tsquery('english', %s),
                    32  -- normalization flag 32: rank / (rank + 1), scales into 0..1
                ) AS fts_score
            FROM document_chunks
            WHERE search_vector @@ plainto_tsquery('english', %s)
            ORDER BY fts_score DESC
            LIMIT %s
        """, (query, query, k))
        return cur.fetchall()  # [(id, chunk_text, fts_score), ...]
True BM25 variant (pg_bm25 / ParadeDB):
def lexical_search_bm25(conn, query: str, k: int = 100) -> list[tuple]:
    with conn.cursor() as cur:
        cur.execute("""
            SELECT id, chunk_text, paradedb.score(id) AS bm25_score
            FROM document_chunks
            WHERE chunk_text @@@ %s
            ORDER BY paradedb.score(id) DESC
            LIMIT %s
        """, (query, k))
        return cur.fetchall()
from collections import defaultdict
def rrf_merge(dense_results: list[tuple],
              lexical_results: list[tuple],
              k: int = 60) -> list[tuple]:
    """
    Reciprocal Rank Fusion.
    RRF_score(d) = Σ_i 1 / (k + rank_i(d))
    k=60 is the empirically validated default from Cormack et al., 2009.
    Parameters:
        dense_results:   list of (id, chunk_text, score) from pgvector
        lexical_results: list of (id, chunk_text, score) from FTS/BM25
        k:               smoothing constant (default 60)
    Returns:
        Merged + sorted list of (id, chunk_text, rrf_score) tuples.
    """
    scores: dict[int, float] = defaultdict(float)
    docs: dict[int, tuple] = {}
    # Accumulate RRF scores from dense results (1-indexed ranks)
    for rank, row in enumerate(dense_results, start=1):
        doc_id = row[0]
        docs[doc_id] = row
        scores[doc_id] += 1.0 / (k + rank)
    # Accumulate RRF scores from lexical results
    for rank, row in enumerate(lexical_results, start=1):
        doc_id = row[0]
        docs[doc_id] = row  # lexical row overwrites; same doc_id
        scores[doc_id] += 1.0 / (k + rank)
    # Sort by descending RRF score
    merged_ids = sorted(scores, key=lambda d: scores[d], reverse=True)
    return [(doc_id, docs[doc_id][1], scores[doc_id])
            for doc_id in merged_ids]
def cohere_rerank(question: str, candidates: list[tuple],
                  top_n: int = 10) -> list[tuple]:
    """
    Rerank candidates using Cohere cross-encoder.
    candidates: list of (id, chunk_text, score) from RRF
    Returns: top_n reranked (id, chunk_text, relevance_score) tuples.
    """
    if not candidates:
        return []
    texts = [row[1] for row in candidates]
    response = cohere_client.rerank(
        query=question,
        documents=texts,
        top_n=top_n,
        model=RERANK_MODEL,
        return_documents=True,
    )
    reranked = []
    for result in response.results:
        # result.index points back into the candidates list we sent
        original_row = candidates[result.index]
        relevance_score = result.relevance_score
        reranked.append((original_row[0], original_row[1], relevance_score))
    return reranked
def build_context(chunks: list[tuple]) -> str:
    """
    Assemble numbered context blocks for the LLM prompt.
    chunks: list of (id, chunk_text, score)
    """
    parts = []
    for chunk_id, chunk_text, _ in chunks:
        parts.append(f"[Chunk {chunk_id}]\n{chunk_text.strip()}")
    return "\n\n---\n\n".join(parts)
SYSTEM_PROMPT = """You are a grounded question-answering assistant.
Rules:
1. Answer ONLY using the supplied context below.
2. Do NOT use any external knowledge or prior training data.
3. Cite the chunk IDs that support each statement (e.g., [Chunk 145]).
4. If the answer is not present in the context, respond exactly:
"I don't have enough information to answer this question."
5. Be concise and factually precise.
"""
def build_prompt(question: str, context: str) -> str:
    return f"""{SYSTEM_PROMPT}
=== CONTEXT ===
{context}
=== QUESTION ===
{question}
=== ANSWER ==="""
def generate_answer(question: str, context: str) -> str:
    response = openai_client.chat.completions.create(
        model=LLM_MODEL,
        temperature=0,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion:\n{question}"},
        ],
    )
    return response.choices[0].message.content
def hybrid_rag_query(question: str, conn,
                     dense_k: int = 100,
                     lexical_k: int = 100,
                     rrf_k: int = 60,
                     rerank_n: int = 10) -> dict:
    """
    Full hybrid RAG query pipeline (non-LangChain).
    Returns:
        {
            "question": str,
            "answer": str,
            "chunks_used": list of (id, text, score),
            "context": str,
        }
    """
    # 1. Embed the question
    query_embedding = embed_single(question)
    # 2. Dual retrieval
    dense_results = dense_search(conn, query_embedding, k=dense_k)
    lexical_results = lexical_search(conn, question, k=lexical_k)
    # 3. Merge with RRF
    merged = rrf_merge(dense_results, lexical_results, k=rrf_k)
    # 4. Rerank with Cohere
    top_chunks = cohere_rerank(question, merged, top_n=rerank_n)
    # 5. Build context
    context = build_context(top_chunks)
    # 6. Generate answer
    answer = generate_answer(question, context)
    return {
        "question": question,
        "answer": answer,
        "chunks_used": top_chunks,
        "context": context,
    }
# ── Usage ─────────────────────────────────────────────────────────
if __name__ == "__main__":
    conn = psycopg2.connect(**DB_CONFIG)
    result = hybrid_rag_query(
        question="What is the capital of France?",
        conn=conn,
    )
    print(result["answer"])
    conn.close()
LangChain Expression Language (LCEL) uses the | (pipe) operator to chain Runnable objects:
chain = step_1 | step_2 | step_3
result = chain.invoke(input)
# Equivalent to:
# result = step_3.invoke(step_2.invoke(step_1.invoke(input)))
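How the pipe operator can compose steps is easy to show with Python's `__or__` hook. The `Step` class below is a hypothetical toy written for this explanation, not LangChain's implementation; real Runnables add batching, streaming, and async on top of the same idea.

```python
from typing import Any, Callable


class Step:
    """Toy stand-in for a Runnable: wraps a function, supports `|`."""

    def __init__(self, func: Callable[[Any], Any]):
        self.func = func

    def invoke(self, value: Any) -> Any:
        return self.func(value)

    def __or__(self, other: "Step") -> "Step":
        # a | b builds a new Step that runs a first, then feeds b.
        return Step(lambda value: other.invoke(self.invoke(value)))


strip = Step(str.strip)
lower = Step(str.lower)
words = Step(str.split)

chain = strip | lower | words
result = chain.invoke("  What Causes Inflation?  ")
print(result)
```

Because `|` is left-associative, `strip | lower | words` first builds `(strip | lower)` and then pipes that into `words`, which matches the nested `invoke` calls shown above.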
Key Runnable types:
| Type | Purpose |
|---|---|
| RunnableLambda | Wrap any Python function as a Runnable |
| RunnableParallel | Run multiple Runnables in parallel |
| RunnablePassthrough | Pass input unchanged (useful for merging) |
| ChatPromptTemplate | Build prompt templates |
| ChatOpenAI | LangChain LLM wrapper |
from langchain_core.runnables import (
RunnableLambda,
RunnableParallel,
RunnablePassthrough,
)
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
# LangChain-compatible LLM and embeddings
llm = ChatOpenAI(model=LLM_MODEL, temperature=0)
embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL)
# ── Step 1: Query Rewrite ─────────────────────────────────────────
rewrite_prompt = ChatPromptTemplate.from_template("""
You are a search query optimizer.
Rewrite the following question to be more precise and search-friendly.
Return ONLY the rewritten query, nothing else.
Original: {question}
Rewritten:""")
query_rewriter = (
rewrite_prompt
| llm
| StrOutputParser()
)
# ── Step 2: Multi-Query Expansion ─────────────────────────────────
multi_query_prompt = ChatPromptTemplate.from_template("""
Generate 3 different search query variations for the question below.
Return one query per line. No numbering or bullets.
Question: {question}
Queries:""")
multi_query_expander = (
multi_query_prompt
| llm
| StrOutputParser()
| RunnableLambda(lambda text: [q.strip() for q in text.strip().split("\n") if q.strip()])
)
# ── Step 3: Hybrid Retriever ──────────────────────────────────────
def hybrid_retrieve(question: str, conn, dense_k=100,
                    lexical_k=100) -> tuple[list[tuple], list[tuple]]:
    """Runs dense + lexical search for a single query.
    Returns (dense_results, lexical_results)."""
    q_emb = embed_single(question)
    dense = dense_search(conn, q_emb, k=dense_k)
    lexical = lexical_search(conn, question, k=lexical_k)
    return dense, lexical
def multi_query_retrieve(questions: list[str], conn) -> list[tuple]:
    """
    Runs hybrid retrieval for multiple query variants,
    then performs a single RRF over all results.
    """
    all_dense = []
    all_lexical = []
    for q in questions:
        d, l = hybrid_retrieve(q, conn)
        all_dense.extend(d)
        all_lexical.extend(l)
    # Deduplicate by chunk id before merging (first occurrence wins)
    seen_dense = {}
    seen_lexical = {}
    for row in all_dense:
        seen_dense.setdefault(row[0], row)
    for row in all_lexical:
        seen_lexical.setdefault(row[0], row)
    return rrf_merge(
        list(seen_dense.values()),
        list(seen_lexical.values()),
    )
# ── Step 4: Reranker ──────────────────────────────────────────────
def rerank_step(data: dict) -> dict:
    question = data["question"]
    candidates = data["candidates"]
    top_chunks = cohere_rerank(question, candidates, top_n=10)
    return {**data, "top_chunks": top_chunks}
# ── Step 5: Context + Prompt Assembly ─────────────────────────────
RAG_PROMPT = ChatPromptTemplate.from_template("""
You are a grounded question-answering assistant.
Answer ONLY using the supplied context.
Do NOT use external knowledge.
Cite chunk IDs for every claim (e.g., [Chunk 145]).
If the answer is not in the context, say:
"I don't have enough information to answer."
=== CONTEXT ===
{context}
=== QUESTION ===
{question}
=== ANSWER ===
""")
def build_lcel_chain(conn):
    """
    Build and return the full LCEL hybrid RAG chain.
    Requires a live psycopg2 connection.
    """
    # Step 1: Rewrite query
    rewrite = RunnableLambda(
        lambda x: {"question": query_rewriter.invoke({"question": x["question"]}),
                   "original": x["question"]}
    )
    # Step 2: Expand into multiple queries
    expand = RunnableLambda(
        lambda x: {
            **x,
            "queries": [x["question"]] + multi_query_expander.invoke({"question": x["question"]})
        }
    )
    # Step 3: Retrieve candidates
    retrieve = RunnableLambda(
        lambda x: {
            **x,
            "candidates": multi_query_retrieve(x["queries"], conn)
        }
    )
    # Step 4: Rerank
    rerank = RunnableLambda(rerank_step)
    # Step 5: Build context string
    build_ctx = RunnableLambda(
        lambda x: {**x, "context": build_context(x["top_chunks"])}
    )
    # Step 6: Generate answer
    generate = RunnableLambda(
        lambda x: RAG_PROMPT.invoke({
            "question": x["original"],
            "context": x["context"],
        })
    ) | llm | StrOutputParser()
    # ── Full chain ────────────────────────────────────────────────
    chain = rewrite | expand | retrieve | rerank | build_ctx | generate
    return chain
# ── Conceptual summary ────────────────────────────────────────────
#
# chain = (
# query_rewriter
# | multi_query_expander
# | hybrid_retriever
# | rrf_merger
# | cohere_reranker
# | context_builder
# | rag_prompt
# | llm
# | StrOutputParser()
# )
# ── Build and invoke ──────────────────────────────────────────────
conn = psycopg2.connect(**DB_CONFIG)
chain = build_lcel_chain(conn)
result = chain.invoke({"question": "What is the capital of France?"})
print(result)
# ── Streaming (LCEL supports streaming natively) ──────────────────
for token in chain.stream({"question": "Explain the water cycle."}):
    print(token, end="", flush=True)
conn.close()
| Feature | Pure Python | LangChain LCEL |
|---|---|---|
| Streaming | Manual | Built-in via .stream() |
| Async | Manual asyncio | Built-in via .ainvoke() |
| Observability | Manual logging | LangSmith integration |
| Parallelism | Manual threading | RunnableParallel |
| Composability | Function calls | Pipe operator (\|) |
| Testability | Easy unit tests | Easy mock injection |
| Control | Maximum | Slightly abstracted |
| Debugging | Straightforward | Requires LCEL knowledge |
Key principles:
SYSTEM_PROMPT = """You are a precise, grounded QA assistant.
Strict rules:
1. Use ONLY the context provided below to answer.
2. Do NOT draw on your training data or outside knowledge.
3. For every factual claim, cite supporting chunk IDs: [Chunk 145].
4. If the answer cannot be found in the context, respond exactly:
"I don't have enough information to answer this question."
5. Never guess, infer, or extrapolate beyond what the context states.
6. Keep your answer concise and factually accurate."""
def build_context_with_metadata(chunks: list[tuple],
                                metadata: list[dict] | None = None) -> str:
    """
    Build rich context with optional source metadata.
    """
    parts = []
    for i, (chunk_id, chunk_text, score) in enumerate(chunks):
        meta_str = ""
        if metadata and i < len(metadata):
            m = metadata[i]
            src = m.get("source", "unknown")
            page = m.get("page", "")
            meta_str = f"Source: {src}" + (f", Page: {page}" if page else "")
        block = f"[Chunk {chunk_id}]"
        if meta_str:
            block += f" ({meta_str})"
        block += f"\n{chunk_text.strip()}"
        parts.append(block)
    return "\n\n---\n\n".join(parts)
def validate_answer(answer: str, context: str) -> dict:
    """
    Basic heuristic checks on the generated answer.
    In production, use an LLM-as-judge or Cohere groundedness API.
    """
    no_info_phrase = "I don't have enough information"
    has_citations = "[Chunk" in answer
    refused = no_info_phrase in answer
    return {
        "has_citations": has_citations,
        "refused": refused,
        "needs_review": not has_citations and not refused,
    }
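One inexpensive check that goes a step beyond "has a citation" is to confirm that every cited chunk ID actually appears in the supplied context. The sketch below assumes the `[Chunk <id>]` citation format used in the prompts above; the names `CITATION_RE` and `check_citations` are hypothetical helpers, and this remains a heuristic rather than a groundedness guarantee.

```python
import re

# Matches the "[Chunk 145]" markers used in both context and answers.
CITATION_RE = re.compile(r"\[Chunk (\d+)\]")


def check_citations(answer: str, context: str) -> dict:
    """Compare chunk IDs cited in the answer with those in the context."""
    cited = {int(m) for m in CITATION_RE.findall(answer)}
    available = {int(m) for m in CITATION_RE.findall(context)}
    unknown = cited - available
    return {
        "cited": sorted(cited),
        "unknown": sorted(unknown),          # cited but never supplied
        "all_known": bool(cited) and not unknown,
    }


context = "[Chunk 145]\nParis is the capital of France.\n\n---\n\n[Chunk 9]\nFrance has 68 million people."
good = check_citations("Paris is the capital [Chunk 145].", context)
bad = check_citations("Paris is the capital [Chunk 777].", context)
print(good, bad)
```

An answer that cites an ID not present in the context is a strong signal that the model fabricated the citation and should be routed to review.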
| Parameter | Recommended | Notes |
|---|---|---|
| Chunk size | 300–500 tokens | Larger = more context per chunk but less precise |
| Overlap | 15–20% | Prevents answer split across chunk boundary |
| Splitter | Token-aware | Character splitters can break mid-sentence |
| Hierarchy | Doc → Section → Chunk | Enables parent-doc retrieval if needed |
-- Index-time parameters (set during CREATE INDEX)
-- m = number of connections per layer (default 16)
-- ef_construction = candidate list size during construction (default 64)
-- Higher values → better recall, slower build, more memory
-- The vector type can be indexed only up to 2,000 dimensions, so the
-- 3072-dim column is indexed via a halfvec cast (pgvector 0.7.0+).
CREATE INDEX idx_chunks_embedding
ON document_chunks
USING hnsw ((embedding::halfvec(3072)) halfvec_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Query-time parameter
-- ef_search = candidate list size during search
-- Higher → better recall, slower query
-- Default: 40; increase to 100–200 for production recall targets
SET hnsw.ef_search = 200;
At 10M+ chunks, use PgBouncer or pgpool-II:
App Servers → PgBouncer (transaction mode) → PostgreSQL
Recommended PgBouncer settings:
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 20
min_pool_size = 5
| Corpus Size | Approach |
|---|---|
| < 1M chunks | Single PostgreSQL node, IVFFlat index |
| 1M–10M chunks | Single PostgreSQL node, HNSW index, PgBouncer |
| 10M–50M chunks | PostgreSQL read replicas for search workload |
| 50M+ chunks | pgvector on Citus (distributed), or dedicated vector DB (Weaviate, Qdrant) |
| Stage | Optimisation |
|---|---|
| Embedding | Batch ingestion, cache frequent queries |
| Cohere Rerank | Only rerank the top-N from RRF; rerank cost and latency grow with the number of candidates sent |
| GPT-5 | Send only 5–10 chunks; temperature=0 for consistency |
| PostgreSQL | Partition document_chunks by document_id range for large corpora |
Misconception: PostgreSQL's built-in full-text search (ts_rank) implements the BM25 ranking algorithm.
Reality: ts_rank is a custom scoring approximation. It uses no corpus-wide statistics such as inverse document frequency and does not implement BM25's term frequency saturation or its document length normalisation. For true BM25, you need pg_bm25 (ParadeDB) or an external engine like Elasticsearch. For most RAG workloads ts_rank is sufficient, but do not conflate the two when precision of ranking matters.
Misconception: Python's enumerate() can be used as-is to generate ranks for the RRF formula.
Reality: The RRF formula uses 1-based ranks. enumerate() defaults to 0-based, which inflates the score of the top document (1/(60+0) = 0.0167 instead of the correct 1/(60+1) = 0.0164). Always pass start=1:
# scores maps doc_id -> fused score; doc_id is read from each row
# Wrong: 0-indexed ranks
for rank, row in enumerate(dense_results):
    scores[doc_id] += 1 / (k + rank)
# Correct: 1-indexed, as per Cormack et al., 2009
for rank, row in enumerate(dense_results, start=1):
    scores[doc_id] += 1.0 / (k + rank)
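For reference, a self-contained sketch of the full fusion step follows. It assumes each retriever returns an ordered list of unique document IDs; the name `rrf_merge` and the sample IDs are hypothetical.

```python
from collections import defaultdict


def rrf_merge(ranked_lists, k=60):
    """Fuse several ranked lists of doc IDs with Reciprocal Rank Fusion.

    Each list is assumed to be ordered best-first and to contain no
    duplicate IDs. Returns (doc_id, score) pairs, best first.
    """
    scores = defaultdict(float)
    for ranked in ranked_lists:
        # start=1 gives the 1-based ranks the formula expects
        for rank, doc_id in enumerate(ranked, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


dense = ["a", "b", "c"]
lexical = ["b", "d", "a"]
fused = rrf_merge([dense, lexical])
print([doc_id for doc_id, _ in fused])  # ['b', 'a', 'd', 'c']
```

Document `b` wins because it ranks highly in both lists (1/62 + 1/61), ahead of `a` (1/61 + 1/63), which is first in one list but third in the other.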
Misconception: pg_bm25 (ParadeDB) requires a pre-computed column and a trigger, similar to the search_vector TSVECTOR pattern used in PostgreSQL FTS.
Reality: pg_bm25 builds its index directly on the raw chunk_text column using the Tantivy engine. There is no extra column, no trigger, and no to_tsvector() call involved. A normal INSERT is all that is needed; the index maintains itself like any standard B-tree index.
Misconception: The Cohere rerank API always returns the document text alongside scores, so return_documents need not be specified.
Reality: Since Cohere API v2, only indices and relevance scores are returned by default. Document text must be explicitly requested:
# Fragile: document text may not be returned
response = cohere_client.rerank(query=question, documents=texts, top_n=10, model="rerank-v3.5")
# Explicit and safe
response = cohere_client.rerank(query=question, documents=texts, top_n=10,
                                model="rerank-v3.5", return_documents=True)
Misconception: psycopg2 will correctly serialise a Python list[float] as a pgvector vector type without any special setup.
Reality: While this often works incidentally, the correct approach is to register the pgvector type adapter explicitly at connection time. Without it, behaviour can be inconsistent across driver versions:
import psycopg2
from pgvector.psycopg2 import register_vector
conn = psycopg2.connect(**DB_CONFIG)
register_vector(conn)  # ensures correct vector type serialisation
Misconception: Since Cohere reranking is so accurate, you can skip broad retrieval and just fetch 10–20 candidates directly.
Reality: The reranker is a precision tool, not a recall tool. It can only re-order what it is given; it cannot surface documents that were never retrieved in the first place. If you retrieve too few candidates, high-quality chunks may never reach the reranker. The correct approach is always broad retrieval (100+ per retriever) followed by narrow reranking.
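A toy sketch makes the recall ceiling visible. The `relevance` dictionary is a hypothetical stand-in for the scores a cross-encoder would assign, and the document IDs are made up. No real reranker is called here.

```python
def retrieve_then_rerank(ranked_ids, relevance, fetch_n, top_n):
    """Keep the first fetch_n retrieved IDs, then re-order them by relevance.

    ranked_ids : first-stage retrieval order (best first)
    relevance  : hypothetical stand-in for reranker scores, doc_id -> float
    """
    candidates = ranked_ids[:fetch_n]
    # The reranker only ever sees `candidates`; anything cut here is lost.
    reranked = sorted(candidates, key=lambda d: relevance.get(d, 0.0), reverse=True)
    return reranked[:top_n]


ranked_ids = [f"doc{i}" for i in range(1, 101)]  # doc1 .. doc100
relevance = {"doc40": 0.99, "doc3": 0.60}        # the best chunk sits at rank 40

narrow = retrieve_then_rerank(ranked_ids, relevance, fetch_n=20, top_n=10)
broad = retrieve_then_rerank(ranked_ids, relevance, fetch_n=100, top_n=10)
```

With `fetch_n=20` the best chunk (`doc40`) never reaches the reranker, so `narrow` is led by `doc3`. With `fetch_n=100` the reranker can promote `doc40` to first place.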
Misconception: A small temperature (e.g. 0.3–0.7) makes the LLM's answers more natural and readable without meaningfully increasing hallucination.
Reality: In a grounded RAG system, any temperature above 0 increases the probability of the model deviating from the retrieved context. The model may blend retrieved facts with parametric memory in unpredictable ways. For production QA over a closed corpus, always use temperature=0.
┌────────────────────────────────────────────────────────────────┐
│                  HYBRID RAG: QUICK REFERENCE                   │
├────────────────────────────────────────────────────────────────┤
│ CHUNKING                                                       │
│   Size:       300-500 tokens                                   │
│   Overlap:    15-20%                                           │
│   Method:     Token-aware sliding window                       │
├────────────────────────────────────────────────────────────────┤
│ EMBEDDING                                                      │
│   Model:      text-embedding-3-large                           │
│   Dim:        3072                                             │
│   Op:         cosine distance (<=>)                            │
├────────────────────────────────────────────────────────────────┤
│ PGVECTOR INDEX                                                 │
│   Type:       HNSW                                             │
│   Params:     m=16, ef_construction=64                         │
│   Runtime:    SET hnsw.ef_search = 200;                        │
├────────────────────────────────────────────────────────────────┤
│ FTS INDEX                                                      │
│   Type:       GIN on tsvector                                  │
│   Function:   plainto_tsquery / ts_rank                        │
│   True BM25:  pg_bm25 / ParadeDB (optional)                    │
├────────────────────────────────────────────────────────────────┤
│ RETRIEVAL SIZES                                                │
│   Dense:      top-100                                          │
│   Lexical:    top-100                                          │
│   RRF output: top 100-150                                      │
│   Cohere out: top 10                                           │
│   To LLM:     top 5-10                                         │
├────────────────────────────────────────────────────────────────┤
│ RRF                                                            │
│   Formula:    1 / (k + rank_i) summed over retrievers          │
│   k:          60 (Cormack default)                             │
│   Ranks:      1-indexed                                        │
├────────────────────────────────────────────────────────────────┤
│ COHERE RERANK                                                  │
│   Model:      rerank-v3.5                                      │
│   Type:       Cross-encoder (query + doc together)             │
│   Input:      100-150 candidates                               │
│   Output:     top-10                                           │
├────────────────────────────────────────────────────────────────┤
│ LLM GENERATION                                                 │
│   Model:      GPT-5 (gpt-5 / verify model string)              │
│   Temp:       0 (deterministic)                                │
│   Prompt:     Context-only, cite chunk IDs                     │
│   Fallback:   "I don't have enough information."               │
├────────────────────────────────────────────────────────────────┤
│ PIPELINE ORDER                                                 │
│   Question → Rewrite → Multi-Query → Dense+Lexical             │
│   → RRF → Cohere Rerank → Context Build → GPT-5 → Answer       │
└────────────────────────────────────────────────────────────────┘