Sample RAG implementations

Refer to Retrieval-Augmented Generation for details on RAG.

RAG Implementation 1
import os
from google import genai
from google.genai import types

# --- 1. SETUP ---
# Ensure your GEMINI_API_KEY is set in your environment
try:
    client = genai.Client()
except Exception as e:
    print("Error initializing Gemini Client. Is GEMINI_API_KEY set?")
    print(e)
    exit()

# --- 2. THE SIMULATED KNOWLEDGE BASE ---
# In a real RAG system, this text would be loaded from a vector database after chunking.
# We simulate chunks using a list of strings (our "knowledge base").
KNOWLEDGE_BASE = [
    "Fact 1: The capital of NEW_COUNTRY is NEW_CAPITAL, which is located on the NEW_RIVER River.",
    "Fact 2: Python was created by Guido van Rossum and first released in 1991.",
    "Fact 3: The fastest land animal is the cheetah, capable of speeds up to 120 km/h.",
    "Fact 4: The human body has 206 bones, though babies are born with about 300.",
    "Fact 5: The Apollo 11 mission first landed humans on the moon in 1969.",
    "Fact 6: RAG stands for Retrieval-Augmented Generation, a technique that uses external data.",
]

def retrieve_context(query: str, knowledge_base: list[str]) -> str:
    """
    SIMULATED RETRIEVAL: Finds relevant "chunks" based on simple keyword matching.
    
    In a real RAG system, this would use embeddings (vectors) for semantic search.
    """
    print(f"-> Retrieving context for: '{query}'")
    relevant_chunks = []
    
    # Simple tokenization and normalization for matching
    query_keywords = set(query.lower().split())
    print("SLV:", type(query_keywords), query_keywords)
    for chunk in knowledge_base:
        # Check if any significant query keyword exists in the chunk
        if any(keyword in chunk.lower() for keyword in query_keywords if len(keyword) > 3):
            relevant_chunks.append(chunk)

    if relevant_chunks:
        print(f"-> Found {len(relevant_chunks)} relevant chunk(s).")
        print("SLV: Relevant chunks are: ", relevant_chunks)
        # Combine the relevant chunks into a single string for the prompt
        context = "\n".join(relevant_chunks)
        return context
    else:
        print("-> No relevant context found using keyword matching.")
        return ""

def generate_response(query: str, context: str) -> str:
    """
    GENERATION: Uses the retrieved context to answer the user query.
    """
    if not context:
        return "I cannot answer the question, as no relevant context was retrieved from the knowledge base."

    # Construct the augmented prompt
    prompt = (
        "You are an expert Q&A system. Use ONLY the provided context to answer the query.\n\n"
        f"CONTEXT:\n---\n{context}\n---\n\n"
        f"QUERY: {query}"
    )

    try:
        response = client.models.generate_content(
            model='gemini-2.5-flash',
            contents=prompt
        )
        # Uncomment the line below to inspect the full response object returned by the API
        # print(f"Response type: {type(response)}, response: {response}")
        return response.text
    except Exception as e:
        return f"An API error occurred during generation: {e}"

# --- 3. MAIN RAG EXECUTION ---
if __name__ == "__main__":
    
    # --- Example 1: Successful Retrieval ---
    query1 = "Which year was Python first released and who developed it?"
    print("\n" + "="*50)
    print(f"RUNNING RAG QUERY 1: {query1}")
    
    retrieved_context1 = retrieve_context(query1, KNOWLEDGE_BASE)
    final_answer1 = generate_response(query1, retrieved_context1)
    
    print("\nFINAL ANSWER 1:")
    print(final_answer1)
    
    # --- Example 2: Retrieval for an unrelated topic (Demonstrates grounding limit) ---
    query2 = "What is the capital of Spain?"
    print("\n" + "="*50)
    print(f"RUNNING RAG QUERY 2: {query2}")

    retrieved_context2 = retrieve_context(query2, KNOWLEDGE_BASE)
    final_answer2 = generate_response(query2, retrieved_context2)
    
    print("\nFINAL ANSWER 2:")
    print(final_answer2)
    
    # --- Example 3: Retrieval of a fact unique to this knowledge base ---
    query3 = "What is the capital of NEW_COUNTRY?"
    print("\n" + "="*50)
    print(f"RUNNING RAG QUERY 3: {query3}")
    
    retrieved_context3 = retrieve_context(query3, KNOWLEDGE_BASE)
    final_answer3 = generate_response(query3, retrieved_context3)
    
    print("\nFINAL ANSWER 3:")
    print(final_answer3)
    
"""
Creating a simple RAG (Retrieval-Augmented Generation) function without external documents or a vector database requires simulating the retrieval process using **string matching** against the reference text embedded directly in the script.

This is a **rudimentary simulation** because real RAG relies on semantic (meaning-based) search using vectors, not simple keyword matching. However, this script demonstrates the core **Retrieve-Then-Generate** logic.

### Python Script: Simple In-Script RAG Simulation

Save the above code as `simple_rag_inscript.py`.

### Explanation of the Simulated RAG Process

1.  **Retrieve:** The `retrieve_context` function simulates the retrieval phase.
      * It takes the user's `query` and the `KNOWLEDGE_BASE` list.
      * Instead of performing a vector search, it uses basic Python string operations to find **keyword overlaps** between the query and the knowledge chunks.
      * The relevant matching chunks are combined into a single `context` string.
      * *Note: This is the weakest link compared to real RAG, as it misses synonyms and semantic relevance.*
2.  **Augment:** The `generate_response` function takes the original `query` and the `context` string.
      * It **augments** (enhances) the prompt given to the Gemini model by inserting the retrieved context into a clear instruction format.
      * The prompt instructs the model to use **ONLY** the provided context, which forces the model to ground its answer in the external data.
3.  **Generate:** The Gemini model receives the augmented prompt and generates a factual, grounded response based on the included context.

This script clearly separates the two core RAG stages: **retrieval** (which identifies relevant data) and **generation** (which uses that data to form the answer).

"""
RAG Implementation 2
print("Importing Modules...")
import os
# FAISS (Facebook AI Similarity Search) is an open-source library developed by Facebook AI Research (FAIR)
# for efficient similarity search and clustering of high-dimensional vectors. 
# A "FAISS index" refers to the data structure created and managed by FAISS to enable this efficient similarity search. 
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
from google import genai
from google.genai import types
print("Completed Importing Modules")

# --- 0. CONFIGURATION AND INITIALIZATION ---
# NOTE: This uses an open-source model locally for embeddings.
EMBEDDING_MODEL_NAME = 'sentence-transformers/all-MiniLM-L6-v2'
LLM_MODEL_NAME = 'gemini-2.5-flash'
K_CHUNKS = 2 # Number of top relevant chunks to retrieve

# --- 1. SIMULATED KNOWLEDGE BASE (Text File Content) ---
# In a real application, this would be loaded from a large text file or PDF.
# We simulate text extracted from a document.
RAW_TEXT = """
The Solar Cruiser program was approved in 2024 with a primary goal of testing solar-sail technology. 
The lead engineer for the project is Dr. Anya Sharma. The total budget allocated for the first phase 
of the Solar Cruiser is $500 million, focusing heavily on materials science and deployment systems.
Deployment is scheduled for Q1 2026. The funding was provided by the National Aerospace Foundation.
A secondary component involves using AI for navigation corrections, reducing reliance on ground control.
The foundational AI platform, 'Aura,' was developed by a specialized aerospace contractor.
"""

def chunk_text(text: str) -> list[str]:
    """Simple chunking function: splits text by sentence."""
    # A more robust solution would use LangChain's RecursiveCharacterTextSplitter
    return [s.strip() for s in text.split('.') if s.strip()]
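
# Hedged sketch of the more robust chunking mentioned above, kept commented out so this
# script has no extra dependency. It assumes the optional `langchain-text-splitters`
# package is installed; the chunk_size/chunk_overlap values are illustrative, not tuned.
#
# from langchain_text_splitters import RecursiveCharacterTextSplitter
#
# def chunk_text_recursive(text: str, chunk_size: int = 200, chunk_overlap: int = 20) -> list[str]:
#     splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
#     return splitter.split_text(text)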

class RAGSystem:
    def __init__(self):
        # Initialize the local embedding model
        print("1. Loading Embedding Model...")
        self.embed_model = SentenceTransformer(EMBEDDING_MODEL_NAME)
        
        # Initialize the Gemini client
        if 'GEMINI_API_KEY' not in os.environ:
            raise ValueError("GEMINI_API_KEY environment variable not set.")
        self.llm_client = genai.Client()

        # Components for Indexing
        self.text_chunks = []
        self.faiss_index = None

    def create_index(self, raw_text: str):
        """
        INDEXING PHASE: Chunking, Embedding, and Storing Vectors.
        """
        # 1. Chunking
        self.text_chunks = chunk_text(raw_text)
        print(f"   -> Chunked text into {len(self.text_chunks)} segments.")

        # 2. Embedding
        print("   -> Generating Embeddings...")
        embeddings = self.embed_model.encode(self.text_chunks)
        embeddings = embeddings.astype('float32')
        
        # 3. Indexing (FAISS)
        dimension = embeddings.shape[1]
        print(f"   -> Creating FAISS Index (Dim: {dimension})...")
        self.faiss_index = faiss.IndexFlatL2(dimension)
        self.faiss_index.add(embeddings)
        print("   Indexing complete.")

    def retrieve_context(self, query: str, k: int) -> str:
        """
        RETRIEVAL PHASE: Embed Query, Search Index, Get Chunks.
        """
        if self.faiss_index is None:
            raise RuntimeError("Index not created. Run create_index first.")

        # 1. Embed Query
        query_vector = self.embed_model.encode([query]).astype('float32')
        
        # 2. Search FAISS Index (L2 distance for similarity search)
        distances, indices = self.faiss_index.search(query_vector, k)
        
        # 3. Get Chunks (Retrieve text corresponding to the top indices)
        retrieved_chunks = [self.text_chunks[i] for i in indices[0]]
        
        print(f"\n2. Retrieved {len(retrieved_chunks)} relevant chunk(s).")
        for i, chunk in enumerate(retrieved_chunks):
            print(f"   [Chunk {i+1}]: {chunk}")

        # Combine chunks into a single context string
        return "\n".join(retrieved_chunks)

    def generate_response(self, query: str, context: str) -> str:
        """
        GENERATION PHASE: Augment Prompt and Call LLM.
        """
        print("\n3. Generating response with Gemini...")
        
        # 1. Augment Prompt
        prompt = (
            "You are an expert Q&A system. Use ONLY the following retrieved context "
            "to answer the question. If the information is not in the context, state "
            "that you cannot answer based on the provided data.\n\n"
            f"CONTEXT:\n---\n{context}\n---\n\n"
            f"QUERY: {query}"
        )

        # 2. Call LLM
        response = self.llm_client.models.generate_content(
            model=LLM_MODEL_NAME,
            contents=prompt,
        )
        return response.text

# --- 2. EXECUTION ---
if __name__ == "__main__":
    
    rag_pipeline = RAGSystem()
    
    # 1. INDEXING
    print("--- START INDEXING ---")
    rag_pipeline.create_index(RAW_TEXT)
    print("--- INDEXING COMPLETE ---\n")

    # 2. RAG QUERY
    user_query = "Who is the lead engineer for Solar Cruiser, and what is the project's budget?"
    print("="*50)
    print(f"USER QUERY: {user_query}")
    print("="*50)
    
    # Retrieval Phase
    context = rag_pipeline.retrieve_context(user_query, K_CHUNKS)
    
    # Generation Phase
    final_answer = rag_pipeline.generate_response(user_query, context)
    
    print("\n\n--- FINAL GROUNDED ANSWER (from Gemini) ---")
    print(final_answer)
    print("------------------------------------------")


"""    
    
This script demonstrates a fundamental **Retrieval-Augmented Generation (RAG)** pipeline using a 
**local embedding model** (`sentence-transformers`) and a lightweight **vector database** (`FAISS`) to index text, 
perform semantic search, and then use the retrieved context to ground a response from the Gemini API.

This approach isolates the three core RAG steps: **Indexing**, **Retrieval**, and **Generation**.
"""