Data Chunking
Implmenting Chunking From Scratch¶
In [2]:
Copied!
import numpy as np
from typing import List, Callable, Optional
from abc import ABC, abstractmethod
import re
import tiktoken
# Load the text file
with open('../data/paul_graham_essay.txt', 'r') as file:
text = file.read()
print(f"Loaded text with {len(text)} characters")
import numpy as np
from typing import List, Callable, Optional
from abc import ABC, abstractmethod
import re
import tiktoken
# Load the text file
with open('../data/paul_graham_essay.txt', 'r') as file:
text = file.read()
print(f"Loaded text with {len(text)} characters")
Loaded text with 75014 characters
In [3]:
Copied!
class BaseChunker(ABC):
@abstractmethod
def split_text(self, text: str) -> List[str]:
pass
class TextSplitter(BaseChunker, ABC):
def __init__(
self,
chunk_size: int = 4000,
chunk_overlap: int = 200,
length_function: Callable[[str], int] = len,
) -> None:
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function
def _merge_splits(self, splits: List[str], separator: str) -> List[str]:
docs = []
current_doc = []
total = 0
for d in splits:
_len = self._length_function(d)
if total + _len > self._chunk_size:
if total > self._chunk_size:
print(f"Created a chunk of size {total}, which is longer than the specified {self._chunk_size}")
if current_doc:
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
# Keep on popping if:
# - we have a larger chunk than in the chunk overlap
# - or if we still have any chunks and the length is long
while total > self._chunk_overlap or (
total + _len > self._chunk_size and total > 0
):
total -= self._length_function(current_doc[0])
current_doc = current_doc[1:]
current_doc.append(d)
total += _len
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
return docs
def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
text = separator.join(docs)
text = text.strip()
if text == "":
return None
else:
return text
class BaseChunker(ABC):
@abstractmethod
def split_text(self, text: str) -> List[str]:
pass
class TextSplitter(BaseChunker, ABC):
def __init__(
self,
chunk_size: int = 4000,
chunk_overlap: int = 200,
length_function: Callable[[str], int] = len,
) -> None:
self._chunk_size = chunk_size
self._chunk_overlap = chunk_overlap
self._length_function = length_function
def _merge_splits(self, splits: List[str], separator: str) -> List[str]:
docs = []
current_doc = []
total = 0
for d in splits:
_len = self._length_function(d)
if total + _len > self._chunk_size:
if total > self._chunk_size:
print(f"Created a chunk of size {total}, which is longer than the specified {self._chunk_size}")
if current_doc:
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
# Keep on popping if:
# - we have a larger chunk than in the chunk overlap
# - or if we still have any chunks and the length is long
while total > self._chunk_overlap or (
total + _len > self._chunk_size and total > 0
):
total -= self._length_function(current_doc[0])
current_doc = current_doc[1:]
current_doc.append(d)
total += _len
doc = self._join_docs(current_doc, separator)
if doc is not None:
docs.append(doc)
return docs
def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
text = separator.join(docs)
text = text.strip()
if text == "":
return None
else:
return text
Fixed Token Chunker¶
In [8]:
Copied!
class FixedTokenChunker(TextSplitter):
def __init__(self, chunk_size: int = 100, chunk_overlap: int = 0):
super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
self._tokenizer = tiktoken.get_encoding("cl100k_base")
def split_text(self, text: str) -> List[str]:
tokens = self._tokenizer.encode(text)
chunks = []
for i in range(0, len(tokens), self._chunk_size - self._chunk_overlap):
chunk = self._tokenizer.decode(tokens[i:i + self._chunk_size])
chunks.append(chunk)
return chunks
# Use FixedTokenChunker
fixed_chunker = FixedTokenChunker(chunk_size=100, chunk_overlap=0)
fixed_chunks = fixed_chunker.split_text(text)
print(f"FixedTokenChunker: {len(fixed_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
class FixedTokenChunker(TextSplitter):
def __init__(self, chunk_size: int = 100, chunk_overlap: int = 0):
super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
self._tokenizer = tiktoken.get_encoding("cl100k_base")
def split_text(self, text: str) -> List[str]:
tokens = self._tokenizer.encode(text)
chunks = []
for i in range(0, len(tokens), self._chunk_size - self._chunk_overlap):
chunk = self._tokenizer.decode(tokens[i:i + self._chunk_size])
chunks.append(chunk)
return chunks
# Use FixedTokenChunker
fixed_chunker = FixedTokenChunker(chunk_size=100, chunk_overlap=0)
fixed_chunks = fixed_chunker.split_text(text)
print(f"FixedTokenChunker: {len(fixed_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
FixedTokenChunker: 166 chunks First 5 chunks: Chunk 1: ---------------------------------------------------------------------------------------------------- What I Worked On February 2021 Before college the two main things I worked on, outside of school... Chunk 2: ---------------------------------------------------------------------------------------------------- was then called "data processing." This was in 9th grade, so I was 13 or 14. The school district's ... Chunk 3: ---------------------------------------------------------------------------------------------------- we used was an early version of Fortran. You had to type programs on punch cards, then stack them i... Chunk 4: ---------------------------------------------------------------------------------------------------- punched cards, and I didn't have any data stored on punched cards. The only other option was to do ... Chunk 5: ---------------------------------------------------------------------------------------------------- . On a machine without time-sharing, this was a social as well as a technical error, as the data cen...
Recursive Token Chunking¶
In [9]:
Copied!
class RecursiveTokenChunker(TextSplitter):
def __init__(
self,
chunk_size: int = 100,
chunk_overlap: int = 0,
separators: Optional[List[str]] = None,
):
super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
self._separators = separators or ["\n\n", "\n", ".", "?", "!", " ", ""]
def split_text(self, text: str) -> List[str]:
return self._split_text(text, self._separators)
def _split_text(self, text: str, separators: List[str]) -> List[str]:
final_chunks = []
separator = separators[-1]
new_separators = []
for i, _s in enumerate(separators):
if _s == "":
separator = _s
break
if re.search(re.escape(_s), text):
separator = _s
new_separators = separators[i + 1:]
break
splits = re.split(f"({re.escape(separator)})", text)
splits = [s for s in splits if s != ""]
_good_splits = []
for s in splits:
if self._length_function(s) < self._chunk_size:
_good_splits.append(s)
else:
if _good_splits:
merged_text = self._merge_splits(_good_splits, "")
final_chunks.extend(merged_text)
_good_splits = []
if not new_separators:
final_chunks.append(s)
else:
other_info = self._split_text(s, new_separators)
final_chunks.extend(other_info)
if _good_splits:
merged_text = self._merge_splits(_good_splits, "")
final_chunks.extend(merged_text)
return final_chunks
# Use RecursiveTokenChunker
recursive_chunker = RecursiveTokenChunker(chunk_size=100, chunk_overlap=0)
recursive_chunks = recursive_chunker.split_text(text)
print(f"RecursiveTokenChunker: {len(recursive_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
class RecursiveTokenChunker(TextSplitter):
def __init__(
self,
chunk_size: int = 100,
chunk_overlap: int = 0,
separators: Optional[List[str]] = None,
):
super().__init__(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
self._separators = separators or ["\n\n", "\n", ".", "?", "!", " ", ""]
def split_text(self, text: str) -> List[str]:
return self._split_text(text, self._separators)
def _split_text(self, text: str, separators: List[str]) -> List[str]:
final_chunks = []
separator = separators[-1]
new_separators = []
for i, _s in enumerate(separators):
if _s == "":
separator = _s
break
if re.search(re.escape(_s), text):
separator = _s
new_separators = separators[i + 1:]
break
splits = re.split(f"({re.escape(separator)})", text)
splits = [s for s in splits if s != ""]
_good_splits = []
for s in splits:
if self._length_function(s) < self._chunk_size:
_good_splits.append(s)
else:
if _good_splits:
merged_text = self._merge_splits(_good_splits, "")
final_chunks.extend(merged_text)
_good_splits = []
if not new_separators:
final_chunks.append(s)
else:
other_info = self._split_text(s, new_separators)
final_chunks.extend(other_info)
if _good_splits:
merged_text = self._merge_splits(_good_splits, "")
final_chunks.extend(merged_text)
return final_chunks
# Use RecursiveTokenChunker
recursive_chunker = RecursiveTokenChunker(chunk_size=100, chunk_overlap=0)
recursive_chunks = recursive_chunker.split_text(text)
print(f"RecursiveTokenChunker: {len(recursive_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
RecursiveTokenChunker: 1274 chunks First 5 chunks: Chunk 1: ---------------------------------------------------------------------------------------------------- What I Worked On February 2021 Before college the two main things I worked on, outside of school... Chunk 2: ---------------------------------------------------------------------------------------------------- was then called "data processing." This was in 9th grade, so I was 13 or 14. The school district's ... Chunk 3: ---------------------------------------------------------------------------------------------------- we used was an early version of Fortran. You had to type programs on punch cards, then stack them i... Chunk 4: ---------------------------------------------------------------------------------------------------- punched cards, and I didn't have any data stored on punched cards. The only other option was to do ... Chunk 5: ---------------------------------------------------------------------------------------------------- . On a machine without time-sharing, this was a social as well as a technical error, as the data cen...
In [ ]:
Copied!
!pip install openai
!pip install anthropic
!pip install backoff
!pip install openai
!pip install anthropic
!pip install backoff
In [10]:
Copied!
import os
import openai
from getpass import getpass
if not (openai_api_key := os.getenv("OPENAI_API_KEY")):
openai_api_key = getpass("🔑 Enter your OpenAI API key: ")
openai.api_key = openai_api_key
os.environ["OPENAI_API_KEY"] = openai_api_key
import os
import openai
from getpass import getpass
if not (openai_api_key := os.getenv("OPENAI_API_KEY")):
openai_api_key = getpass("🔑 Enter your OpenAI API key: ")
openai.api_key = openai_api_key
os.environ["OPENAI_API_KEY"] = openai_api_key
Cluster Semantic Chunking¶
In [11]:
Copied!
import numpy as np
from openai import OpenAI
def get_openai_embedding_function(api_key):
client = OpenAI(api_key=api_key)
def embedding_function(texts):
response = client.embeddings.create(input=texts, model="text-embedding-3-small")
return [item.embedding for item in response.data]
return embedding_function
class ClusterSemanticChunker(BaseChunker):
def __init__(self, embedding_function, max_chunk_size=400, min_chunk_size=50):
self.splitter = RecursiveTokenChunker(chunk_size=min_chunk_size, chunk_overlap=0)
self.max_cluster = max_chunk_size // min_chunk_size
self.embedding_function = embedding_function
def _get_similarity_matrix(self, sentences):
BATCH_SIZE = 500
N = len(sentences)
embedding_matrix = None
for i in range(0, N, BATCH_SIZE):
batch_sentences = sentences[i:i+BATCH_SIZE]
embeddings = self.embedding_function(batch_sentences)
batch_embedding_matrix = np.array(embeddings)
if embedding_matrix is None:
embedding_matrix = batch_embedding_matrix
else:
embedding_matrix = np.concatenate((embedding_matrix, batch_embedding_matrix), axis=0)
similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
return similarity_matrix
def _calculate_reward(self, matrix, start, end):
sub_matrix = matrix[start:end+1, start:end+1]
return np.sum(sub_matrix)
def _optimal_segmentation(self, matrix, max_cluster_size):
mean_value = np.mean(matrix[np.triu_indices(matrix.shape[0], k=1)])
matrix = matrix - mean_value
np.fill_diagonal(matrix, 0)
n = matrix.shape[0]
dp = np.zeros(n)
segmentation = np.zeros(n, dtype=int)
for i in range(n):
for size in range(1, max_cluster_size + 1):
if i - size + 1 >= 0:
reward = self._calculate_reward(matrix, i - size + 1, i)
adjusted_reward = reward
if i - size >= 0:
adjusted_reward += dp[i - size]
if adjusted_reward > dp[i]:
dp[i] = adjusted_reward
segmentation[i] = i - size + 1
clusters = []
i = n - 1
while i >= 0:
start = segmentation[i]
clusters.append((start, i))
i = start - 1
clusters.reverse()
return clusters
def split_text(self, text: str) -> List[str]:
sentences = self.splitter.split_text(text)
similarity_matrix = self._get_similarity_matrix(sentences)
clusters = self._optimal_segmentation(similarity_matrix, max_cluster_size=self.max_cluster)
docs = [' '.join(sentences[start:end+1]) for start, end in clusters]
return docs
# Use ClusterSemanticChunker
# api_key = "your_openai_api_key_here" # Replace with your actual OpenAI API key
embedding_function = get_openai_embedding_function(api_key = openai.api_key)
cluster_chunker = ClusterSemanticChunker(embedding_function=embedding_function, max_chunk_size=400, min_chunk_size=50)
cluster_chunks = cluster_chunker.split_text(text)
print(f"ClusterSemanticChunker: {len(cluster_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
import numpy as np
from openai import OpenAI
def get_openai_embedding_function(api_key):
client = OpenAI(api_key=api_key)
def embedding_function(texts):
response = client.embeddings.create(input=texts, model="text-embedding-3-small")
return [item.embedding for item in response.data]
return embedding_function
class ClusterSemanticChunker(BaseChunker):
def __init__(self, embedding_function, max_chunk_size=400, min_chunk_size=50):
self.splitter = RecursiveTokenChunker(chunk_size=min_chunk_size, chunk_overlap=0)
self.max_cluster = max_chunk_size // min_chunk_size
self.embedding_function = embedding_function
def _get_similarity_matrix(self, sentences):
BATCH_SIZE = 500
N = len(sentences)
embedding_matrix = None
for i in range(0, N, BATCH_SIZE):
batch_sentences = sentences[i:i+BATCH_SIZE]
embeddings = self.embedding_function(batch_sentences)
batch_embedding_matrix = np.array(embeddings)
if embedding_matrix is None:
embedding_matrix = batch_embedding_matrix
else:
embedding_matrix = np.concatenate((embedding_matrix, batch_embedding_matrix), axis=0)
similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
return similarity_matrix
def _calculate_reward(self, matrix, start, end):
sub_matrix = matrix[start:end+1, start:end+1]
return np.sum(sub_matrix)
def _optimal_segmentation(self, matrix, max_cluster_size):
mean_value = np.mean(matrix[np.triu_indices(matrix.shape[0], k=1)])
matrix = matrix - mean_value
np.fill_diagonal(matrix, 0)
n = matrix.shape[0]
dp = np.zeros(n)
segmentation = np.zeros(n, dtype=int)
for i in range(n):
for size in range(1, max_cluster_size + 1):
if i - size + 1 >= 0:
reward = self._calculate_reward(matrix, i - size + 1, i)
adjusted_reward = reward
if i - size >= 0:
adjusted_reward += dp[i - size]
if adjusted_reward > dp[i]:
dp[i] = adjusted_reward
segmentation[i] = i - size + 1
clusters = []
i = n - 1
while i >= 0:
start = segmentation[i]
clusters.append((start, i))
i = start - 1
clusters.reverse()
return clusters
def split_text(self, text: str) -> List[str]:
sentences = self.splitter.split_text(text)
similarity_matrix = self._get_similarity_matrix(sentences)
clusters = self._optimal_segmentation(similarity_matrix, max_cluster_size=self.max_cluster)
docs = [' '.join(sentences[start:end+1]) for start, end in clusters]
return docs
# Use ClusterSemanticChunker
# api_key = "your_openai_api_key_here" # Replace with your actual OpenAI API key
embedding_function = get_openai_embedding_function(api_key = openai.api_key)
cluster_chunker = ClusterSemanticChunker(embedding_function=embedding_function, max_chunk_size=400, min_chunk_size=50)
cluster_chunks = cluster_chunker.split_text(text)
print(f"ClusterSemanticChunker: {len(cluster_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
ClusterSemanticChunker: 680 chunks First 5 chunks: Chunk 1: ---------------------------------------------------------------------------------------------------- What I Worked On February 2021 Before college the two main things I worked on, outside of school... Chunk 2: ---------------------------------------------------------------------------------------------------- was then called "data processing." This was in 9th grade, so I was 13 or 14. The school district's ... Chunk 3: ---------------------------------------------------------------------------------------------------- we used was an early version of Fortran. You had to type programs on punch cards, then stack them i... Chunk 4: ---------------------------------------------------------------------------------------------------- punched cards, and I didn't have any data stored on punched cards. The only other option was to do ... Chunk 5: ---------------------------------------------------------------------------------------------------- . On a machine without time-sharing, this was a social as well as a technical error, as the data cen...
LLM Semantic Chunking¶
In [16]:
Copied!
import anthropic
import backoff
from tqdm import tqdm
class AnthropicClient:
def __init__(self, model_name, api_key):
self.client = anthropic.Anthropic(api_key=api_key)
self.model_name = model_name
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
def create_message(self, system_prompt, messages, max_tokens=1000, temperature=1.0):
try:
message = self.client.messages.create(
model=self.model_name,
max_tokens=max_tokens,
temperature=temperature,
system=system_prompt,
messages=messages
)
return message.content[0].text
except Exception as e:
print(f"Error occurred: {e}, retrying...")
raise e
class OpenAIClient:
def __init__(self, model_name, api_key):
self.client = OpenAI(api_key=api_key)
self.model_name = model_name
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
def create_message(self, system_prompt, messages, max_tokens=1000, temperature=1.0):
try:
gpt_messages = [
{"role": "system", "content": system_prompt}
] + messages
completion = self.client.chat.completions.create(
model=self.model_name,
max_tokens=max_tokens,
messages=gpt_messages,
temperature=temperature
)
return completion.choices[0].message.content
except Exception as e:
print(f"Error occurred: {e}, retrying...")
raise e
class LLMSemanticChunker(BaseChunker):
def __init__(self, organisation="openai", api_key=None, model_name=None):
if organisation == "openai":
if model_name is None:
model_name = "gpt-4"
self.client = OpenAIClient(model_name, api_key=api_key)
elif organisation == "anthropic":
if model_name is None:
model_name = "claude-3-opus-20240229"
self.client = AnthropicClient(model_name, api_key=api_key)
else:
raise ValueError("Invalid organisation. Please choose either 'openai' or 'anthropic'.")
self.splitter = RecursiveTokenChunker(chunk_size=50, chunk_overlap=0)
def get_prompt(self, chunked_input, current_chunk=0, invalid_response=None):
messages = [
{
"role": "system",
"content": (
"You are an assistant specialized in splitting text into thematically consistent sections. "
"The text has been divided into chunks, each marked with <|start_chunk_X|> and <|end_chunk_X|> tags, where X is the chunk number. "
"Your task is to identify the points where splits should occur, such that consecutive chunks of similar themes stay together. "
"Respond with a list of chunk IDs where you believe a split should be made. For example, if chunks 1 and 2 belong together but chunk 3 starts a new topic, you would suggest a split after chunk 2. THE CHUNKS MUST BE IN ASCENDING ORDER."
"Your response should be in the form: 'split_after: 3, 5'."
)
},
{
"role": "user",
"content": (
"CHUNKED_TEXT: " + chunked_input + "\n\n"
"Respond only with the IDs of the chunks where you believe a split should occur. YOU MUST RESPOND WITH AT LEAST ONE SPLIT. THESE SPLITS MUST BE IN ASCENDING ORDER AND EQUAL OR LARGER THAN: " + str(current_chunk)+"." + (f"\n\The previous response of {invalid_response} was invalid. DO NOT REPEAT THIS ARRAY OF NUMBERS. Please try again." if invalid_response else "")
)
},
]
return messages
def split_text(self, text):
import re
chunks = self.splitter.split_text(text)
split_indices = []
current_chunk = 0
with tqdm(total=len(chunks), desc="Processing chunks") as pbar:
while True:
if current_chunk >= len(chunks) - 4:
break
token_count = 0
chunked_input = ''
for i in range(current_chunk, len(chunks)):
token_count += len(chunks[i].split())
chunked_input += f"<|start_chunk_{i+1}|>{chunks[i]}<|end_chunk_{i+1}|>"
if token_count > 800:
break
messages = self.get_prompt(chunked_input, current_chunk)
while True:
result_string = self.client.create_message(messages[0]['content'], messages[1:], max_tokens=200, temperature=0.2)
split_after_line = [line for line in result_string.split('\n') if 'split_after:' in line][0]
numbers = re.findall(r'\d+', split_after_line)
numbers = list(map(int, numbers))
if not (numbers != sorted(numbers) or any(number < current_chunk for number in numbers)):
break
else:
messages = self.get_prompt(chunked_input, current_chunk, numbers)
print("Response: ", result_string)
print("Invalid response. Please try again.")
split_indices.extend(numbers)
current_chunk = numbers[-1]
pbar.update(current_chunk - pbar.n)
chunks_to_split_after = [i - 1 for i in split_indices]
docs = []
current_chunk = ''
for i, chunk in enumerate(chunks):
current_chunk += chunk + ' '
if i in chunks_to_split_after:
docs.append(current_chunk.strip())
current_chunk = ''
if current_chunk:
docs.append(current_chunk.strip())
return docs
# Use LLMSemanticChunker
# api_key = "your_openai_api_key_here" # Replace with your actual OpenAI API key
api_key = openai.api_key
llm_chunker = LLMSemanticChunker(organisation="openai", api_key=api_key)
llm_chunks = llm_chunker.split_text(text)
print(f"LLMSemanticChunker: {len(llm_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
import anthropic
import backoff
from tqdm import tqdm
class AnthropicClient:
def __init__(self, model_name, api_key):
self.client = anthropic.Anthropic(api_key=api_key)
self.model_name = model_name
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
def create_message(self, system_prompt, messages, max_tokens=1000, temperature=1.0):
try:
message = self.client.messages.create(
model=self.model_name,
max_tokens=max_tokens,
temperature=temperature,
system=system_prompt,
messages=messages
)
return message.content[0].text
except Exception as e:
print(f"Error occurred: {e}, retrying...")
raise e
class OpenAIClient:
def __init__(self, model_name, api_key):
self.client = OpenAI(api_key=api_key)
self.model_name = model_name
@backoff.on_exception(backoff.expo, Exception, max_tries=3)
def create_message(self, system_prompt, messages, max_tokens=1000, temperature=1.0):
try:
gpt_messages = [
{"role": "system", "content": system_prompt}
] + messages
completion = self.client.chat.completions.create(
model=self.model_name,
max_tokens=max_tokens,
messages=gpt_messages,
temperature=temperature
)
return completion.choices[0].message.content
except Exception as e:
print(f"Error occurred: {e}, retrying...")
raise e
class LLMSemanticChunker(BaseChunker):
def __init__(self, organisation="openai", api_key=None, model_name=None):
if organisation == "openai":
if model_name is None:
model_name = "gpt-4"
self.client = OpenAIClient(model_name, api_key=api_key)
elif organisation == "anthropic":
if model_name is None:
model_name = "claude-3-opus-20240229"
self.client = AnthropicClient(model_name, api_key=api_key)
else:
raise ValueError("Invalid organisation. Please choose either 'openai' or 'anthropic'.")
self.splitter = RecursiveTokenChunker(chunk_size=50, chunk_overlap=0)
def get_prompt(self, chunked_input, current_chunk=0, invalid_response=None):
messages = [
{
"role": "system",
"content": (
"You are an assistant specialized in splitting text into thematically consistent sections. "
"The text has been divided into chunks, each marked with <|start_chunk_X|> and <|end_chunk_X|> tags, where X is the chunk number. "
"Your task is to identify the points where splits should occur, such that consecutive chunks of similar themes stay together. "
"Respond with a list of chunk IDs where you believe a split should be made. For example, if chunks 1 and 2 belong together but chunk 3 starts a new topic, you would suggest a split after chunk 2. THE CHUNKS MUST BE IN ASCENDING ORDER."
"Your response should be in the form: 'split_after: 3, 5'."
)
},
{
"role": "user",
"content": (
"CHUNKED_TEXT: " + chunked_input + "\n\n"
"Respond only with the IDs of the chunks where you believe a split should occur. YOU MUST RESPOND WITH AT LEAST ONE SPLIT. THESE SPLITS MUST BE IN ASCENDING ORDER AND EQUAL OR LARGER THAN: " + str(current_chunk)+"." + (f"\n\The previous response of {invalid_response} was invalid. DO NOT REPEAT THIS ARRAY OF NUMBERS. Please try again." if invalid_response else "")
)
},
]
return messages
def split_text(self, text):
import re
chunks = self.splitter.split_text(text)
split_indices = []
current_chunk = 0
with tqdm(total=len(chunks), desc="Processing chunks") as pbar:
while True:
if current_chunk >= len(chunks) - 4:
break
token_count = 0
chunked_input = ''
for i in range(current_chunk, len(chunks)):
token_count += len(chunks[i].split())
chunked_input += f"<|start_chunk_{i+1}|>{chunks[i]}<|end_chunk_{i+1}|>"
if token_count > 800:
break
messages = self.get_prompt(chunked_input, current_chunk)
while True:
result_string = self.client.create_message(messages[0]['content'], messages[1:], max_tokens=200, temperature=0.2)
split_after_line = [line for line in result_string.split('\n') if 'split_after:' in line][0]
numbers = re.findall(r'\d+', split_after_line)
numbers = list(map(int, numbers))
if not (numbers != sorted(numbers) or any(number < current_chunk for number in numbers)):
break
else:
messages = self.get_prompt(chunked_input, current_chunk, numbers)
print("Response: ", result_string)
print("Invalid response. Please try again.")
split_indices.extend(numbers)
current_chunk = numbers[-1]
pbar.update(current_chunk - pbar.n)
chunks_to_split_after = [i - 1 for i in split_indices]
docs = []
current_chunk = ''
for i, chunk in enumerate(chunks):
current_chunk += chunk + ' '
if i in chunks_to_split_after:
docs.append(current_chunk.strip())
current_chunk = ''
if current_chunk:
docs.append(current_chunk.strip())
return docs
# Use LLMSemanticChunker
# api_key = "your_openai_api_key_here" # Replace with your actual OpenAI API key
api_key = openai.api_key
llm_chunker = LLMSemanticChunker(organisation="openai", api_key=api_key)
llm_chunks = llm_chunker.split_text(text)
print(f"LLMSemanticChunker: {len(llm_chunks)} chunks")
print("\nFirst 5 chunks:")
for i, chunk in enumerate(fixed_chunks[:5], 1):
print(f"\nChunk {i}:\n","-"*100)
print(chunk[:100] + "..." if len(chunk) > 100 else chunk)
Processing chunks: 100%|█████████▉| 2427/2428 [01:22<00:00, 29.49it/s]
LLMSemanticChunker: 355 chunks First 5 chunks: Chunk 1: ---------------------------------------------------------------------------------------------------- What I Worked On February 2021 Before college the two main things I worked on, outside of school... Chunk 2: ---------------------------------------------------------------------------------------------------- was then called "data processing." This was in 9th grade, so I was 13 or 14. The school district's ... Chunk 3: ---------------------------------------------------------------------------------------------------- we used was an early version of Fortran. You had to type programs on punch cards, then stack them i... Chunk 4: ---------------------------------------------------------------------------------------------------- punched cards, and I didn't have any data stored on punched cards. The only other option was to do ... Chunk 5: ---------------------------------------------------------------------------------------------------- . On a machine without time-sharing, this was a social as well as a technical error, as the data cen...
In [18]:
Copied!
Output saved to llm_semantic_chunker_output.json
Kamradt Modified Chunker¶
In [19]:
Copied!
class KamradtModifiedChunker(BaseChunker):
def __init__(self, avg_chunk_size=400, min_chunk_size=50, embedding_function=None):
self.splitter = RecursiveTokenChunker(
chunk_size=min_chunk_size,
chunk_overlap=0,
)
self.avg_chunk_size = avg_chunk_size
if embedding_function is None:
embedding_function = get_openai_embedding_function(api_key) # Use the same API key as before
self.embedding_function = embedding_function
def combine_sentences(self, sentences, buffer_size=1):
for i in range(len(sentences)):
combined_sentence = ''
for j in range(i - buffer_size, i + 1 + buffer_size):
if 0 <= j < len(sentences):
combined_sentence += sentences[j]['sentence'] + ' '
sentences[i]['combined_sentence'] = combined_sentence.strip()
return sentences
def calculate_cosine_distances(self, sentences):
BATCH_SIZE = 500
distances = []
embedding_matrix = None
for i in range(0, len(sentences), BATCH_SIZE):
batch_sentences = sentences[i:i+BATCH_SIZE]
batch_sentences = [sentence['combined_sentence'] for sentence in batch_sentences]
embeddings = self.embedding_function(batch_sentences)
batch_embedding_matrix = np.array(embeddings)
if embedding_matrix is None:
embedding_matrix = batch_embedding_matrix
else:
embedding_matrix = np.concatenate((embedding_matrix, batch_embedding_matrix), axis=0)
norms = np.linalg.norm(embedding_matrix, axis=1, keepdims=True)
embedding_matrix = embedding_matrix / norms
similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
for i in range(len(sentences) - 1):
similarity = similarity_matrix[i, i + 1]
distance = 1 - similarity
distances.append(distance)
sentences[i]['distance_to_next'] = distance
return distances, sentences
def split_text(self, text):
sentences_strips = self.splitter.split_text(text)
sentences = [{'sentence': x, 'index': i} for i, x in enumerate(sentences_strips)]
sentences = self.combine_sentences(sentences, 3)
distances, sentences = self.calculate_cosine_distances(sentences)
total_tokens = sum(len(sentence['sentence'].split()) for sentence in sentences)
number_of_cuts = total_tokens // self.avg_chunk_size
lower_limit, upper_limit = 0.0, 1.0
distances_np = np.array(distances)
while upper_limit - lower_limit > 1e-6:
threshold = (upper_limit + lower_limit) / 2.0
num_points_above_threshold = np.sum(distances_np > threshold)
if num_points_above_threshold > number_of_cuts:
lower_limit = threshold
else:
upper_limit = threshold
indices_above_thresh = [i for i, x in enumerate(distances) if x > threshold]
chunks = []
start_index = 0
for index in indices_above_thresh:
group = sentences[start_index:index + 1]
combined_text = ' '.join([d['sentence'] for d in group])
chunks.append(combined_text)
start_index = index + 1
if start_index < len(sentences):
combined_text = ' '.join([d['sentence'] for d in sentences[start_index:]])
chunks.append(combined_text)
return chunks
# Use KamradtModifiedChunker
kamradt_chunker = KamradtModifiedChunker(avg_chunk_size=300, min_chunk_size=50)
kamradt_chunks = kamradt_chunker.split_text(text)
print(f"KamradtModifiedChunker: {len(kamradt_chunks)} chunks")
print("First chunk:", kamradt_chunks[0][:100] + "..." if len(kamradt_chunks[0]) > 100 else kamradt_chunks[0])
class KamradtModifiedChunker(BaseChunker):
def __init__(self, avg_chunk_size=400, min_chunk_size=50, embedding_function=None):
self.splitter = RecursiveTokenChunker(
chunk_size=min_chunk_size,
chunk_overlap=0,
)
self.avg_chunk_size = avg_chunk_size
if embedding_function is None:
embedding_function = get_openai_embedding_function(api_key) # Use the same API key as before
self.embedding_function = embedding_function
def combine_sentences(self, sentences, buffer_size=1):
for i in range(len(sentences)):
combined_sentence = ''
for j in range(i - buffer_size, i + 1 + buffer_size):
if 0 <= j < len(sentences):
combined_sentence += sentences[j]['sentence'] + ' '
sentences[i]['combined_sentence'] = combined_sentence.strip()
return sentences
def calculate_cosine_distances(self, sentences):
BATCH_SIZE = 500
distances = []
embedding_matrix = None
for i in range(0, len(sentences), BATCH_SIZE):
batch_sentences = sentences[i:i+BATCH_SIZE]
batch_sentences = [sentence['combined_sentence'] for sentence in batch_sentences]
embeddings = self.embedding_function(batch_sentences)
batch_embedding_matrix = np.array(embeddings)
if embedding_matrix is None:
embedding_matrix = batch_embedding_matrix
else:
embedding_matrix = np.concatenate((embedding_matrix, batch_embedding_matrix), axis=0)
norms = np.linalg.norm(embedding_matrix, axis=1, keepdims=True)
embedding_matrix = embedding_matrix / norms
similarity_matrix = np.dot(embedding_matrix, embedding_matrix.T)
for i in range(len(sentences) - 1):
similarity = similarity_matrix[i, i + 1]
distance = 1 - similarity
distances.append(distance)
sentences[i]['distance_to_next'] = distance
return distances, sentences
def split_text(self, text):
sentences_strips = self.splitter.split_text(text)
sentences = [{'sentence': x, 'index': i} for i, x in enumerate(sentences_strips)]
sentences = self.combine_sentences(sentences, 3)
distances, sentences = self.calculate_cosine_distances(sentences)
total_tokens = sum(len(sentence['sentence'].split()) for sentence in sentences)
number_of_cuts = total_tokens // self.avg_chunk_size
lower_limit, upper_limit = 0.0, 1.0
distances_np = np.array(distances)
while upper_limit - lower_limit > 1e-6:
threshold = (upper_limit + lower_limit) / 2.0
num_points_above_threshold = np.sum(distances_np > threshold)
if num_points_above_threshold > number_of_cuts:
lower_limit = threshold
else:
upper_limit = threshold
indices_above_thresh = [i for i, x in enumerate(distances) if x > threshold]
chunks = []
start_index = 0
for index in indices_above_thresh:
group = sentences[start_index:index + 1]
combined_text = ' '.join([d['sentence'] for d in group])
chunks.append(combined_text)
start_index = index + 1
if start_index < len(sentences):
combined_text = ' '.join([d['sentence'] for d in sentences[start_index:]])
chunks.append(combined_text)
return chunks
# Use KamradtModifiedChunker
kamradt_chunker = KamradtModifiedChunker(avg_chunk_size=300, min_chunk_size=50)
kamradt_chunks = kamradt_chunker.split_text(text)
print(f"KamradtModifiedChunker: {len(kamradt_chunks)} chunks")
print("First chunk:", kamradt_chunks[0][:100] + "..." if len(kamradt_chunks[0]) > 100 else kamradt_chunks[0])
KamradtModifiedChunker: 49 chunks First chunk: What I Worked On February 2021 Before college the two main things I worked on, outside of school, w...
In [20]:
Copied!
import json
# After running the LLMSemanticChunker
output = {
"chunker": "LLMSemanticChunker",
"total_chunks": len(llm_chunks),
"first_5_chunks": [
{
"chunk_number": i,
"content": chunk
}
for i, chunk in enumerate(kamradt_chunks, 1)
]
}
# Save the output as a JSON file
with open('llm_semantic_chunker_output.json', 'w') as f:
json.dump(output, f, indent=2)
print("Output saved to llm_semantic_chunker_output.json")
import json
# After running the LLMSemanticChunker
output = {
"chunker": "LLMSemanticChunker",
"total_chunks": len(llm_chunks),
"first_5_chunks": [
{
"chunk_number": i,
"content": chunk
}
for i, chunk in enumerate(kamradt_chunks, 1)
]
}
# Save the output as a JSON file
with open('llm_semantic_chunker_output.json', 'w') as f:
json.dump(output, f, indent=2)
print("Output saved to llm_semantic_chunker_output.json")
Output saved to llm_semantic_chunker_output.json
Llama Index Based Chunking¶
In [ ]:
Copied!
# Standard library imports
import logging
import sys
import os
# Third-party imports
from dotenv import load_dotenv
from IPython.display import Markdown, display
# Qdrant client import
import qdrant_client
# LlamaIndex core imports
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import Settings
# LlamaIndex vector store import
from llama_index.vector_stores.qdrant import QdrantVectorStore
# Embedding model imports
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
# LLM import
from llama_index.llms.openai import OpenAI
# Load environment variables
load_dotenv()
# Get OpenAI API key from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Set the embedding model
# Option 1: Use FastEmbed with BAAI/bge-base-en-v1.5 model (default)
Settings.embed_model = FastEmbedEmbedding(model_name="BAAI/bge-base-en-v1.5")
# Option 2: Use OpenAI's embedding model (commented out)
# If you want to use OpenAI's embedding model, uncomment the following line:
# Settings.embed_model = OpenAIEmbedding(embed_batch_size=10, api_key=OPENAI_API_KEY)
# Qdrant configuration (commented out)
# If you're using Qdrant, uncomment and set these variables:
# QDRANT_CLOUD_ENDPOINT = os.getenv("QDRANT_CLOUD_ENDPOINT")
# QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
# Note: Remember to add QDRANT_CLOUD_ENDPOINT and QDRANT_API_KEY to your .env file if using Qdrant Hosted version
# Standard library imports
import logging
import sys
import os
# Third-party imports
from dotenv import load_dotenv
from IPython.display import Markdown, display
# Qdrant client import
import qdrant_client
# LlamaIndex core imports
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core import Settings
# LlamaIndex vector store import
from llama_index.vector_stores.qdrant import QdrantVectorStore
# Embedding model imports
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.embeddings.openai import OpenAIEmbedding
# LLM import
from llama_index.llms.openai import OpenAI
# Load environment variables
load_dotenv()
# Get OpenAI API key from environment variables
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
# Set the embedding model
# Option 1: Use FastEmbed with BAAI/bge-base-en-v1.5 model (default)
Settings.embed_model = FastEmbedEmbedding(model_name="BAAI/bge-base-en-v1.5")
# Option 2: Use OpenAI's embedding model (commented out)
# If you want to use OpenAI's embedding model, uncomment the following line:
# Settings.embed_model = OpenAIEmbedding(embed_batch_size=10, api_key=OPENAI_API_KEY)
# Qdrant configuration (commented out)
# If you're using Qdrant, uncomment and set these variables:
# QDRANT_CLOUD_ENDPOINT = os.getenv("QDRANT_CLOUD_ENDPOINT")
# QDRANT_API_KEY = os.getenv("QDRANT_API_KEY")
# Note: Remember to add QDRANT_CLOUD_ENDPOINT and QDRANT_API_KEY to your .env file if using Qdrant Hosted version
In [ ]:
Copied!
# lets loading the documents using SimpleDirectoryReader
from llama_index.core import Document
reader = SimpleDirectoryReader("../data/" , recursive=True)
documents = reader.load_data(show_progress=True)
# combining all the documents into a single document for later chunking and splitting
documents = Document(text="\n\n".join([doc.text for doc in documents]))
# lets loading the documents using SimpleDirectoryReader
from llama_index.core import Document
reader = SimpleDirectoryReader("../data/" , recursive=True)
documents = reader.load_data(show_progress=True)
# combining all the documents into a single document for later chunking and splitting
documents = Document(text="\n\n".join([doc.text for doc in documents]))
In [ ]:
Copied!
## ingesting data into vector database
## lets set up an ingestion pipeline
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.node_parser import MarkdownNodeParser
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.ingestion import IngestionPipeline
pipeline = IngestionPipeline(
transformations=[
# MarkdownNodeParser(include_metadata=True),
# TokenTextSplitter(chunk_size=500, chunk_overlap=20),
SentenceSplitter(chunk_size=1024, chunk_overlap=20),
# SemanticSplitterNodeParser(buffer_size=1, breakpoint_percentile_threshold=95 , embed_model=Settings.embed_model),
Settings.embed_model,
]
)
# Ingest directly into a vector db
nodes = pipeline.run(documents=[documents] , show_progress=True)
print("Number of chunks added to vector DB :",len(nodes))
## ingesting data into vector database
## lets set up an ingestion pipeline
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.node_parser import MarkdownNodeParser
from llama_index.core.node_parser import SemanticSplitterNodeParser
from llama_index.core.ingestion import IngestionPipeline
pipeline = IngestionPipeline(
transformations=[
# MarkdownNodeParser(include_metadata=True),
# TokenTextSplitter(chunk_size=500, chunk_overlap=20),
SentenceSplitter(chunk_size=1024, chunk_overlap=20),
# SemanticSplitterNodeParser(buffer_size=1, breakpoint_percentile_threshold=95 , embed_model=Settings.embed_model),
Settings.embed_model,
]
)
# Ingest directly into a vector db
nodes = pipeline.run(documents=[documents] , show_progress=True)
print("Number of chunks added to vector DB :",len(nodes))
In [ ]:
Copied!
print(nodes)
print(nodes)
Reference:
Smith, Brandon, and Anton Troynikov. "Evaluating Chunking Strategies for Retrieval." Chroma, July 2024. https://research.trychroma.com/evaluating-chunking.