Data Ingestion
Building Data Ingestion from Scratch
Introduction¶
Data ingestion is a crucial first step in building effective Retrieval-Augmented Generation (RAG) systems. It involves the process of collecting, processing, and storing data in a format that can be efficiently retrieved and used by the RAG model. This README provides an overview of the data ingestion process for RAG systems.
Importance of Data Ingestion in RAG¶
Effective data ingestion is essential for RAG systems because it:
- Determines the quality and relevance of information available for retrieval.
- Affects the system's ability to understand and process queries accurately.
- Impacts the overall performance and efficiency of the RAG pipeline.
- Enables the system to handle diverse data sources and formats.
Key Steps in Data Ingestion¶
The data ingestion process typically involves the following steps:
flowchart TB A[Data Collection] --> B[Data Cleaning] B --> C[Document Splitting] C --> D[Metadata Extraction] D --> E[Embedding Generation] E --> F[Indexing and Storage]
Data Collection: Gathering information from various sources such as databases, APIs, web scraping, or file systems.
Data Cleaning: Preprocessing the collected data to remove noise, handle missing values, and standardize formats.
Document Splitting: Breaking down large documents into smaller, manageable chunks for more effective retrieval.
Metadata Extraction: Identifying and extracting relevant metadata from the documents to enhance retrieval capabilities.
Embedding Generation: Creating vector representations of the text chunks to enable semantic search.
Indexing and Storage: Organizing and storing the processed data in a format optimized for quick retrieval, often using vector databases or search engines.
Challenges in Data Ingestion¶
- Handling diverse data formats and sources
- Ensuring data quality and consistency
- Managing large volumes of data efficiently
- Updating and maintaining the knowledge base
- Balancing between chunk size and semantic coherence
Best Practices¶
- Data Quality: Implement robust data cleaning and validation processes.
- Scalability: Design the ingestion pipeline to handle growing data volumes.
- Metadata Enrichment: Extract and store relevant metadata to improve retrieval accuracy.
- Incremental Updates: Develop mechanisms for efficiently updating the knowledge base.
- Monitoring: Implement logging and monitoring to track ingestion performance and data quality.
Conclusion¶
A well-designed data ingestion process is fundamental to the success of a RAG system. It ensures that the information retrieved is accurate, relevant, and up-to-date, ultimately leading to better-quality responses from the language model.
In the following sections, we'll explore other crucial components of RAG systems, including data chunking, embedding generation, and retrieval mechanisms.
If you're opening this Notebook on colab, you will probably need to install LlamaIndex 🦙.
!!pip install llama-index
!pip install llama-index-llms-openai
!pip install llama-index-embeddings-openai
!pip install llama-index-vector-stores-qdrant
!pip -q install python-dotenv
!pip install -U qdrant_client fastembed
!pip install pymupdf
OpenAI¶
You will need an OpenAI api key for this tutorial. Login to your platform.openai.com account, click on your profile picture in the upper right corner, and choose 'API Keys' from the menu. Create an API key for this tutorial and save it. You will need it below.
Set your OpenAI api key, and Pinecone api key and environment in the file we created.
import os
from dotenv import load_dotenv
load_dotenv()
Setting up Vector Database¶
We will be using qDrant as the Vector database There are 4 ways to initialize qdrant
- Inmemory
client = qdrant_client.QdrantClient(location=":memory:")
- Disk
client = qdrant_client.QdrantClient(path="./data")
- Self hosted or Docker
client = qdrant_client.QdrantClient(
# url="http://<host>:<port>"
host="localhost",port=6333
)
- Qdrant cloud
client = qdrant_client.QdrantClient(
url=QDRANT_CLOUD_ENDPOINT,
api_key=QDRANT_API_KEY,
)
for this notebook we will be using qdrant cloud
import qdrant_client
# LlamaIndex core imports
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import Settings
# LlamaIndex vector store import
from llama_index.vector_stores.qdrant import QdrantVectorStore
# creating a qdrant client instance
client = qdrant_client.QdrantClient(
# you can use :memory: mode for fast and light-weight experiments,
# it does not require to have Qdrant deployed anywhere
# but requires qdrant-client >= 1.1.1
# location=":memory:"
# otherwise set Qdrant instance address with:
# url=QDRANT_CLOUD_ENDPOINT,
# otherwise set Qdrant instance with host and port:
host="localhost",
port=6333
# set API KEY for Qdrant Cloud
# api_key=QDRANT_API_KEY,
# path="./db/"
)
vector_store = QdrantVectorStore(client=client, collection_name="01_Data_Ingestion")
Build an Ingestion Pipeline from Scratch¶
We show how to build an ingestion pipeline as mentioned in the introduction.
Note that steps (2) and (3) can be handled via our NodeParser
abstractions, which handle splitting and node creation.
For the purposes of this tutorial, we show you how to create these objects manually.
1. Load Data¶
!mkdir data
!wget --user-agent "Mozilla" "https://arxiv.org/pdf/2307.09288.pdf" -O "../data/llama2.pdf"
--2023-10-13 01:45:14-- https://arxiv.org/pdf/2307.09288.pdf Resolving arxiv.org (arxiv.org)... 128.84.21.199 Connecting to arxiv.org (arxiv.org)|128.84.21.199|:443... connected. HTTP request sent, awaiting response... 200 OK Length: 13661300 (13M) [application/pdf] Saving to: ‘data/llama2.pdf’ data/llama2.pdf 100%[===================>] 13.03M 7.59MB/s in 1.7s 2023-10-13 01:45:16 (7.59 MB/s) - ‘data/llama2.pdf’ saved [13661300/13661300]
import fitz
file_path = "../data/llama2.pdf"
doc = fitz.open(file_path)
2. Use a Text Splitter to Split Documents¶
Here we import our SentenceSplitter
to split document texts into smaller chunks, while preserving paragraphs/sentences as much as possible.
from llama_index.core.node_parser import SentenceSplitter
text_parser = SentenceSplitter(
chunk_size=1024,
# separator=" ",
)
text_chunks = []
# maintain relationship with source doc index, to help inject doc metadata in (3)
doc_idxs = []
for doc_idx, page in enumerate(doc):
page_text = page.get_text("text")
cur_text_chunks = text_parser.split_text(page_text)
text_chunks.extend(cur_text_chunks)
doc_idxs.extend([doc_idx] * len(cur_text_chunks))
3. Manually Construct Nodes from Text Chunks¶
We convert each chunk into a TextNode
object, a low-level data abstraction in LlamaIndex that stores content but also allows defining metadata + relationships with other Nodes.
We inject metadata from the document into each node.
This essentially replicates logic in our SentenceSplitter
.
from llama_index.core.schema import TextNode
nodes = []
for idx, text_chunk in enumerate(text_chunks):
node = TextNode(
text=text_chunk,
)
src_doc_idx = doc_idxs[idx]
src_page = doc[src_doc_idx]
nodes.append(node)
print(nodes[0].metadata)
# print a sample node
print(nodes[0].get_content(metadata_mode="all"))
[Optional] 4. Extract Metadata from each Node¶
We extract metadata from each Node using our Metadata extractors.
This will add more metadata to each Node.
from llama_index.core.extractors import (
QuestionsAnsweredExtractor,
TitleExtractor,
)
from llama_index.core.ingestion import IngestionPipeline
from llama_index.llms.openai import OpenAI
llm = OpenAI(model="gpt-3.5-turbo")
extractors = [
TitleExtractor(nodes=5, llm=llm),
QuestionsAnsweredExtractor(questions=3, llm=llm),
]
pipeline = IngestionPipeline(
transformations=extractors,
)
nodes = await pipeline.arun(nodes=nodes, in_place=False)
print(nodes[0].metadata)
5. Generate Embeddings for each Node¶
Generate document embeddings for each Node using our OpenAI embedding model (text-embedding-ada-002
).
Store these on the embedding
property on each Node.
from llama_index.embeddings.openai import OpenAIEmbedding
embed_model = OpenAIEmbedding()
for node in nodes:
node_embedding = embed_model.get_text_embedding(
node.get_content(metadata_mode="all")
)
node.embedding = node_embedding
6. Load Nodes into a Vector Store¶
We now insert these nodes into our PineconeVectorStore
.
NOTE: We skip the VectorStoreIndex abstraction, which is a higher-level abstraction that handles ingestion as well. We use VectorStoreIndex
in the next section to fast-track retrieval/querying.
vector_store.add(nodes)
Retrieve and Query from the Vector Store¶
Now that our ingestion is complete, we can retrieve/query this vector store.
NOTE: We can use our high-level VectorStoreIndex
abstraction here. See the next section to see how to define retrieval at a lower-level!
from llama_index.core import VectorStoreIndex
from llama_index.core import StorageContext
index = VectorStoreIndex.from_vector_store(vector_store)
query_engine = index.as_query_engine()
query_str = "Can you tell me about the key concepts for safety finetuning"
response = query_engine.query(query_str)
print(str(response))