diff --git a/README.md b/README.md
index 61127e2..d4236b8 100644
--- a/README.md
+++ b/README.md
@@ -7,7 +7,7 @@ It is intended that the plugins and skills provided in this repository, are adap
## Components
- `./text_2_sql` contains three Multi-Shot implementations for Text2SQL generation and querying which can be used to answer questions backed by a database as a knowledge base. A **prompt based** and **vector based** approach are shown, both of which exhibit great performance in answering SQL queries. Additionally, a further iteration on the vector based approach is shown which uses a **query cache** to further speed up generation. With these plugins, your RAG application can now access and pull data from any SQL table exposed to it to answer questions.
-- `./adi_function_app` contains code for linking **Azure Document Intelligence** with AI Search to process complex documents with charts and images, and uses **multi-modal models (gpt4o)** to interpret and understand these. With this custom skill, the RAG application can **draw insights from complex charts** and images during the vector search.
+- `./adi_function_app` contains code for linking **Azure Document Intelligence** with AI Search to process complex documents with charts and images, and uses **multi-modal models (gpt4o)** to interpret and understand these. With this custom skill, the RAG application can **draw insights from complex charts** and images during the vector search. This function app also contains a **Semantic Text Chunking** method that aims to intelligently group similar sentences, retaining figures and tables together, whilst separating out distinct sentences.
- `./deploy_ai_search` provides an easy Python based utility for deploying an index, indexer and corresponding skillset for AI Search and for Text2SQL.
The above components have been successfully used on production RAG projects to increase the quality of responses.
diff --git a/adi_function_app/README.md b/adi_function_app/README.md
index b34c5a7..673d8a6 100644
--- a/adi_function_app/README.md
+++ b/adi_function_app/README.md
@@ -24,13 +24,21 @@ Once the Markdown is obtained, several steps are carried out:
1. **Extraction of images / charts**. The figures identified are extracted from the original document and passed to a multi-modal model (gpt4o in this case) for analysis. We obtain a description and summary of the chart / image to infer the meaning of the figure. This allows us to index and perform RAG analysis on the information that is visually obtainable from a chart, without it being explicitly mentioned in the surrounding text. The information is added back into the original chart.
-2. **Cleaning of Markdown**. The final markdown content is cleaned of any characters or unsupported Markdown elements that we do not want in the chunk e.g. non-relevant images.
+2. **Chunking**. The extracted content is chunked according to the chosen chunking strategy. This function app supports two chunking methods, **page wise** and **semantic chunking**. Page wise chunking is performed natively by Azure Document Intelligence. For semantic chunking, we include a custom chunker that splits the text with the following strategy:
-Page wise analysis in ADI is used to avoid splitting tables / figures across multiple chunks, when the chunking is performed.
+    - Splits the text into sentences.
+    - Groups sentences that relate to a table or figure, so they are not split apart from their context.
+    - Semantically groups sentences if the similarity is above the threshold, starting from the start of the text.
+    - Semantically groups sentences if the similarity is above the threshold, starting from the end of the text.
+    - Removes any empty chunks.
-The properties returned from the ADI Custom Skill are then used to perform the following skills:
+    This chunking method aims to improve on page wise chunking, whilst still keeping related sentences together. When tested, this method showed a significant performance improvement over straight page wise chunking, without splitting up relevant context. A short usage sketch of the chunker follows this list.
-- Pre-vectorisation cleaning. This stage is important as we extract the section information in this step from the headers in the document. Additionally, we remove any Markdown tags or characters that would cause an embedding error.
+3. **Cleaning of Markdown**. The final markdown content is cleaned of any characters or unsupported Markdown elements that we do not want in the chunk e.g. non-relevant images.
+
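+For illustration, a minimal sketch of invoking the semantic chunker directly (assuming the `adi_function_app` folder is on the Python path and the `en_core_web_md` spaCy model from `requirements.txt` is installed):
+
+```python
+import asyncio
+
+from semantic_text_chunker import SemanticTextChunker
+
+
+async def chunk_example() -> list[str]:
+    # Parameter names match SemanticTextChunker in semantic_text_chunker.py; values are illustrative.
+    chunker = SemanticTextChunker(
+        num_surrounding_sentences=1,
+        similarity_threshold=0.8,
+        max_chunk_tokens=500,
+        min_chunk_tokens=50,
+    )
+    text = (
+        "# Overview\n\n"
+        "The first sentence introduces the topic. The second sentence expands on it. "
+        "A final sentence moves on to a different subject."
+    )
+    # chunk() is async and returns the list of chunk strings.
+    return await chunker.chunk(text)
+
+
+chunks = asyncio.run(chunk_example())
+print(chunks)
+```
+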
+The properties returned from the ADI Custom Skill and Chunking are then used to perform the following skills:
+
+- Mark up cleaning. This stage is important as we extract the section information from the headers in the document, and build the figure ID / URI pairs for any figures in the chunk. Additionally, we remove any Markdown tags or characters that would cause an embedding error. A sketch of the record shape is shown after this list.
- Keyphrase extraction
- Vectorisation
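+
+For reference, a sketch of the record shape the mark up cleaner skill consumes and produces (field names follow `mark_up_cleaner.py`; the values are illustrative placeholders):
+
+```python
+# Input record sent to the /mark_up_cleaner endpoint (AI Search custom skill format).
+cleaner_input = {
+    "recordId": "0",
+    "data": {
+        "chunk": "# Overview\nSome **markdown** text about the document.",
+        "figure_storage_prefix": "https://<storage-account>.blob.core.windows.net/figures",
+    },
+}
+
+# Output record returned by process_mark_up_cleaner.
+cleaner_output = {
+    "recordId": "0",
+    "data": {
+        "marked_up_chunk": "...",  # the original chunk, mark up included
+        "sections": ["..."],       # section titles extracted from the markdown headers
+        "figures": [],             # figure_id / figure_uri pairs built from the figure mark up
+        "cleaned_chunk": "...",    # the chunk with mark up removed, ready for embedding
+    },
+    "errors": None,
+    "warnings": None,
+}
+```
+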
@@ -49,18 +57,24 @@ The Figure 4 content has been interpreted and added into the extracted chunk to
## Provided Notebooks \& Utilities
-- `./ai_search_with_adi_function_app` provides a pre-built Python function app that communicates with Azure Document Intelligence, Azure OpenAI etc to perform the Markdown conversion, extraction of figures, figure understanding and corresponding cleaning of Markdown.
+- `./function_app` provides a pre-built Python function app that communicates with Azure Document Intelligence, Azure OpenAI etc to perform the Markdown conversion, extraction of figures, figure understanding and corresponding cleaning of Markdown.
- `./rag_with_ai_search.ipynb` provides an example of how to utilise the AI Search plugin to query the index.
## Deploying AI Search Setup
To deploy the pre-built index and associated indexer / skillset setup, see instructions in `./deploy_ai_search/README.md`.
-## ADI Custom Skill
+## Custom Skills
+
+Deploy the associated function app and the required resources. To use with an index, either use the utility to configure an indexer in the provided form, or integrate the skills with your skillset pipeline.
+
+### ADI Custom Skill
-Deploy the associated function app and required resources. You can then experiment with the custom skill by sending an HTTP request in the AI Search JSON format to the `/adi_2_ai_search` HTTP endpoint.
+You can then experiment with the custom skill by sending an HTTP request in the AI Search JSON format to the `/adi_2_ai_search` HTTP endpoint. The `chunk_by_page` header controls the chunking technique *(page wise or not)*.
-To use with an index, either use the utility to configure a indexer in the provided form, or integrate the skill with your skillset pipeline.
+### Semantic Chunker Skill
+
+You can then test the chunking by sending an HTTP request in the AI Search JSON format to the `/semantic_text_chunker` HTTP endpoint. The headers control the different chunking parameters *(num_surrounding_sentences, similarity_threshold, max_chunk_tokens, min_chunk_tokens)*.
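+
+A minimal sketch of calling the chunking skill over HTTP with `aiohttp` (illustrative only; replace the placeholder host and function key with the values from your deployment):
+
+```python
+import asyncio
+
+import aiohttp
+
+
+async def call_semantic_text_chunker() -> dict:
+    # AI Search custom skill payload: a list of records under "values".
+    payload = {
+        "values": [
+            {
+                "recordId": "0",
+                "data": {
+                    "content": "# Introduction\nThe first sentence. The second sentence."
+                },
+            }
+        ]
+    }
+    # Chunking parameters are passed as HTTP headers (header values are strings).
+    headers = {
+        "x-functions-key": "<function-key>",
+        "num_surrounding_sentences": "1",
+        "similarity_threshold": "0.8",
+        "max_chunk_tokens": "500",
+        "min_chunk_tokens": "50",
+    }
+    url = "https://<function-app-name>.azurewebsites.net/api/semantic_text_chunker"
+    async with aiohttp.ClientSession() as session:
+        async with session.post(url, json=payload, headers=headers) as response:
+            return await response.json()
+
+
+print(asyncio.run(call_semantic_text_chunker()))
+```
+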
### Deployment Steps
@@ -72,11 +86,15 @@ To use with an index, either use the utility to configure a indexer in the provi
#### function_app.py
-`./indexer/ai_search_with_adi_function_app.py` contains the HTTP entrypoints for the ADI skill and the other provided utility skills.
+`./indexer/function_app.py` contains the HTTP entrypoints for the ADI skill and the other provided utility skills.
+
+#### semantic_text_chunker.py
-#### adi_2_aisearch
+`./semantic_text_chunker.py` contains the code to chunk the text semantically, whilst grouping similar sentences.
-`./indexer/adi_2_aisearch.py` contains the methods for content extraction with ADI. The key methods are:
+#### adi_2_ai_search.py
+
+`./indexer/adi_2_ai_search.py` contains the methods for content extraction with ADI. The key methods are:
##### analyse_document
@@ -183,8 +201,6 @@ If `chunk_by_page` header is `False`:
}
```
-**Page wise analysis in ADI is recommended to avoid splitting tables / figures across multiple chunks, when the chunking is performed.**
-
## Other Provided Custom Skills
Due to an AI Search product limitation (AI Search cannot connect to AI Services behind Private Endpoints), we provide a Custom Key Phrase Extraction Skill that will work within a Private Endpoint environment.
diff --git a/adi_function_app/adi_2_ai_search.py b/adi_function_app/adi_2_ai_search.py
index 3b7c509..154b63b 100644
--- a/adi_function_app/adi_2_ai_search.py
+++ b/adi_function_app/adi_2_ai_search.py
@@ -23,7 +23,6 @@
async def build_and_clean_markdown_for_response(
markdown_text: str,
- figures: dict,
page_no: int = None,
remove_irrelevant_figures=False,
):
@@ -39,28 +38,33 @@ async def build_and_clean_markdown_for_response(
str: The cleaned Markdown text.
"""
- output_dict = {}
- comment_patterns = r"||||"
- cleaned_text = re.sub(comment_patterns, "", markdown_text, flags=re.DOTALL)
+    # Pattern to match the comment start `<!--`
+    comment_start_pattern = r"<!--"
+
+    # Pattern to match the comment end `-->`
+    comment_end_pattern = r"-->"
+
+ # Using re.sub to remove comments
+ cleaned_text = re.sub(
+ f"{comment_start_pattern}.*?{comment_end_pattern}", "", markdown_text
+ )
# Remove irrelevant figures
if remove_irrelevant_figures:
- irrelevant_figure_pattern = r"\s*"
+        irrelevant_figure_pattern = r"<figure[^>]*>.*?Irrelevant Image.*?</figure>"
cleaned_text = re.sub(
irrelevant_figure_pattern, "", cleaned_text, flags=re.DOTALL
)
logging.info(f"Cleaned Text: {cleaned_text}")
- output_dict["content"] = cleaned_text
-
- output_dict["figures"] = figures
-
# add page number when chunk by page is enabled
if page_no is not None:
+ output_dict = {}
+ output_dict["content"] = cleaned_text
output_dict["pageNumber"] = page_no
-
- return output_dict
+ return output_dict
+ else:
+ return cleaned_text
def update_figure_description(
@@ -323,23 +327,15 @@ async def process_figures_from_extracted_content(
)
)
- figure_ids = [
- figure_processing_data[0] for figure_processing_data in figure_processing_datas
- ]
logging.info("Running image understanding tasks")
figure_descriptions = await asyncio.gather(*figure_understanding_tasks)
logging.info("Finished image understanding tasks")
logging.info(f"Image Descriptions: {figure_descriptions}")
logging.info("Running image upload tasks")
- figure_uris = await asyncio.gather(*figure_upload_tasks)
+ await asyncio.gather(*figure_upload_tasks)
logging.info("Finished image upload tasks")
- figures = [
- {"figureId": figure_id, "figureUri": figure_uri}
- for figure_id, figure_uri in zip(figure_ids, figure_uris)
- ]
-
running_offset = 0
for figure_processing_data, figure_description in zip(
figure_processing_datas, figure_descriptions
@@ -355,7 +351,7 @@ async def process_figures_from_extracted_content(
)
running_offset += desc_offset
- return markdown_content, figures
+ return markdown_content
def create_page_wise_content(result: AnalyzeResult) -> list:
@@ -586,8 +582,7 @@ async def process_adi_2_ai_search(record: dict, chunk_by_page: bool = False) ->
):
build_and_clean_markdown_for_response_tasks.append(
build_and_clean_markdown_for_response(
- extracted_page_content[0],
- extracted_page_content[1],
+ extracted_page_content,
page_number,
True,
)
@@ -609,10 +604,7 @@ async def process_adi_2_ai_search(record: dict, chunk_by_page: bool = False) ->
else:
markdown_content = result.content
- (
- extracted_content,
- figures,
- ) = await process_figures_from_extracted_content(
+            extracted_content = await process_figures_from_extracted_content(
result,
operation_id,
container_and_blob,
@@ -622,7 +614,7 @@ async def process_adi_2_ai_search(record: dict, chunk_by_page: bool = False) ->
)
cleaned_result = await build_and_clean_markdown_for_response(
- extracted_content, figures, remove_irrelevant_figures=True
+ extracted_content, remove_irrelevant_figures=True
)
except Exception as e:
logging.error(e)
diff --git a/adi_function_app/function_app.py b/adi_function_app/function_app.py
index cca6005..5188023 100644
--- a/adi_function_app/function_app.py
+++ b/adi_function_app/function_app.py
@@ -6,8 +6,9 @@
import asyncio
from adi_2_ai_search import process_adi_2_ai_search
-from pre_embedding_cleaner import process_pre_embedding_cleaner
+from mark_up_cleaner import process_mark_up_cleaner
from key_phrase_extraction import process_key_phrase_extraction
+from semantic_text_chunker import process_semantic_text_chunker, SemanticTextChunker
logging.basicConfig(level=logging.DEBUG)
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
@@ -50,8 +51,8 @@ async def adi_2_ai_search(req: func.HttpRequest) -> func.HttpResponse:
)
-@app.route(route="pre_embedding_cleaner", methods=[func.HttpMethod.POST])
-async def pre_embedding_cleaner(req: func.HttpRequest) -> func.HttpResponse:
+@app.route(route="mark_up_cleaner", methods=[func.HttpMethod.POST])
+async def mark_up_cleaner(req: func.HttpRequest) -> func.HttpResponse:
"""HTTP trigger for data cleanup function.
Args:
@@ -73,9 +74,65 @@ async def pre_embedding_cleaner(req: func.HttpRequest) -> func.HttpResponse:
record_tasks = []
+ for value in values:
+ record_tasks.append(asyncio.create_task(process_mark_up_cleaner(value)))
+
+ results = await asyncio.gather(*record_tasks)
+ logging.debug("Results: %s", results)
+ cleaned_tasks = {"values": results}
+
+ return func.HttpResponse(
+ json.dumps(cleaned_tasks), status_code=200, mimetype="application/json"
+ )
+
+
+@app.route(route="semantic_text_chunker", methods=[func.HttpMethod.POST])
+async def semantic_text_chunker(req: func.HttpRequest) -> func.HttpResponse:
+ """HTTP trigger for text chunking function.
+
+ Args:
+ req (func.HttpRequest): The HTTP request object.
+
+ Returns:
+ func.HttpResponse: The HTTP response object."""
+ logging.info("Python HTTP trigger text chunking function processed a request.")
+
+ try:
+ req_body = req.get_json()
+ values = req_body.get("values")
+
+ semantic_text_chunker_config = req.headers
+
+        # Header values arrive as strings, so cast them to the expected types
+        num_surrounding_sentences = int(
+            semantic_text_chunker_config.get("num_surrounding_sentences", 1)
+        )
+        similarity_threshold = float(
+            semantic_text_chunker_config.get("similarity_threshold", 0.8)
+        )
+        max_chunk_tokens = int(
+            semantic_text_chunker_config.get("max_chunk_tokens", 500)
+        )
+        min_chunk_tokens = int(
+            semantic_text_chunker_config.get("min_chunk_tokens", 50)
+        )
+
+ except ValueError:
+ return func.HttpResponse(
+ "Please valid Custom Skill Payload in the request body", status_code=400
+ )
+ else:
+ logging.debug("Input Values: %s", values)
+
+ record_tasks = []
+
+ semantic_text_chunker = SemanticTextChunker(
+ num_surrounding_sentences=num_surrounding_sentences,
+ similarity_threshold=similarity_threshold,
+ max_chunk_tokens=max_chunk_tokens,
+ min_chunk_tokens=min_chunk_tokens,
+ )
+
for value in values:
record_tasks.append(
- asyncio.create_task(process_pre_embedding_cleaner(value))
+ asyncio.create_task(
+ process_semantic_text_chunker(value, semantic_text_chunker)
+ )
)
results = await asyncio.gather(*record_tasks)
@@ -83,7 +140,7 @@ async def pre_embedding_cleaner(req: func.HttpRequest) -> func.HttpResponse:
cleaned_tasks = {"values": results}
return func.HttpResponse(
- json.dumps(cleaned_tasks), status_code=200, mimetype="application/json"
+        json.dumps(cleaned_tasks), status_code=200, mimetype="application/json"
)
diff --git a/adi_function_app/pre_embedding_cleaner.py b/adi_function_app/mark_up_cleaner.py
similarity index 66%
rename from adi_function_app/pre_embedding_cleaner.py
rename to adi_function_app/mark_up_cleaner.py
index 5c787e6..c59b196 100644
--- a/adi_function_app/pre_embedding_cleaner.py
+++ b/adi_function_app/mark_up_cleaner.py
@@ -5,28 +5,35 @@
import re
-def get_sections(cleaned_text: str) -> list:
+def get_sections(text: str) -> list:
"""
- Returns the section details from the content
+ Returns the section details from the content.
Args:
- cleaned_text: The input text
+ text: The input text
Returns:
list: The sections related to text
-
"""
- combined_pattern = r"(.*?)\n===|\n#+\s*(.*?)\n"
- doc_metadata = re.findall(combined_pattern, cleaned_text, re.DOTALL)
- doc_metadata = [match for group in doc_metadata for match in group if match]
+ # Updated regex pattern to capture markdown headers like ### Header
+    combined_pattern = r"(?:^|\n)#+\s*(.*?)(?=\n)"
+ doc_metadata = re.findall(combined_pattern, text, re.DOTALL)
return clean_sections(doc_metadata)
def clean_sections(sections: list) -> list:
- """Cleans the sections by removing special characters and extra white spaces."""
- cleanedSections = [re.sub(r"[=#]", "", match).strip() for match in sections]
+ """
+ Cleans the sections by removing special characters and extra white spaces.
+ """
+ cleaned_sections = [re.sub(r"[=#]", "", match).strip() for match in sections]
+ return cleaned_sections
- return cleanedSections
+
+def extract_figure_ids(text: str) -> list:
+ # Regex pattern to capture FigureId values
+    figure_id_pattern = r'FigureId="([^"]+)"'
+
+    # Assumes figures are marked up with a FigureId="..." attribute
+    return re.findall(figure_id_pattern, text)
@@ -52,7 +59,9 @@ def remove_markdown_tags(text: str, tag_patterns: dict) -> str:
return text
-def clean_text_with_section_extraction(src_text: str) -> tuple[str, str]:
+def clean_text_and_extract_metadata(
+ src_text: str, figure_storage_prefix: str
+) -> tuple[str, str]:
"""This function performs following cleanup activities on the text, remove all unicode characters
remove line spacing,remove stop words, normalize characters
@@ -62,23 +71,35 @@ def clean_text_with_section_extraction(src_text: str) -> tuple[str, str]:
Returns:
str: The clean text."""
+ return_record = {}
+
try:
logging.info(f"Input text: {src_text}")
if len(src_text) == 0:
logging.error("Input text is empty")
raise ValueError("Input text is empty")
+ return_record["marked_up_chunk"] = src_text
+ return_record["sections"] = get_sections(src_text)
+
+ figure_ids = extract_figure_ids(src_text)
+
+ figures = []
+ for figure_id in figure_ids:
+ figure_uri = f"{figure_storage_prefix}/{figure_id}.png"
+ figures.append({"figure_id": figure_id, "figure_uri": figure_uri})
+
+ return_record["figures"] = figures
+
# Define specific patterns for each tag
tag_patterns = {
"figurecontent": r"",
- "figure": r"(.*?)",
+ "figure": r"(.*?)",
"figures": r"\(figures/\d+\)(.*?)\(figures/\d+\)",
"figcaption": r"(.*?)",
}
cleaned_text = remove_markdown_tags(src_text, tag_patterns)
- sections = get_sections(cleaned_text)
-
# Updated regex to keep Unicode letters, punctuation, whitespace, currency symbols, and percentage signs,
# while also removing non-printable characters
cleaned_text = re.sub(r"[^\p{L}\p{P}\s\p{Sc}%\x20-\x7E]", "", cleaned_text)
@@ -87,13 +108,15 @@ def clean_text_with_section_extraction(src_text: str) -> tuple[str, str]:
if len(cleaned_text) == 0:
logging.error("Cleaned text is empty")
raise ValueError("Cleaned text is empty")
+ else:
+ return_record["cleaned_chunk"] = cleaned_text
except Exception as e:
- logging.error(f"An error occurred in clean_text: {e}")
+ logging.error(f"An error occurred in clean_text_and_extract_metadata: {e}")
return ""
- return cleaned_text, sections
+ return return_record
-async def process_pre_embedding_cleaner(record: dict) -> dict:
+async def process_mark_up_cleaner(record: dict) -> dict:
"""Cleanup the data using standard python libraries.
Args:
@@ -114,19 +137,9 @@ async def process_pre_embedding_cleaner(record: dict) -> dict:
"warnings": None,
}
- # scenarios when page by chunking is enabled
- if isinstance(record["data"]["chunk"], dict):
- (
- cleaned_record["data"]["cleanedChunk"],
- cleaned_record["data"]["sections"],
- ) = clean_text_with_section_extraction(record["data"]["chunk"]["content"])
- cleaned_record["data"]["chunk"] = record["data"]["chunk"]["content"]
- else:
- (
- cleaned_record["data"]["cleanedChunk"],
- cleaned_record["data"]["sections"],
- ) = clean_text_with_section_extraction(record["data"]["chunk"])
- cleaned_record["data"]["chunk"] = record["data"]["chunk"]
+ cleaned_record["data"] = clean_text_and_extract_metadata(
+ record["data"]["chunk"], record["data"]["figure_storage_prefix"]
+ )
except Exception as e:
logging.error("string cleanup Error: %s", e)
diff --git a/adi_function_app/requirements.txt b/adi_function_app/requirements.txt
index b97a6d6..ca11d4d 100644
--- a/adi_function_app/requirements.txt
+++ b/adi_function_app/requirements.txt
@@ -19,3 +19,7 @@ azure-ai-vision-imageanalysis
PyMuPDF
aiohttp
Pillow
+numpy
+spacy
+en-core-web-md @ https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.7.1/en_core_web_md-3.7.1.tar.gz
+tiktoken
diff --git a/adi_function_app/semantic_text_chunker.py b/adi_function_app/semantic_text_chunker.py
new file mode 100644
index 0000000..6cbc889
--- /dev/null
+++ b/adi_function_app/semantic_text_chunker.py
@@ -0,0 +1,481 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+import logging
+import json
+import re
+import tiktoken
+import spacy
+import numpy as np
+
+logging.basicConfig(level=logging.INFO)
+
+
+class SemanticTextChunker:
+ def __init__(
+ self,
+ num_surrounding_sentences: int = 1,
+ similarity_threshold: float = 0.8,
+ max_chunk_tokens: int = 200,
+ min_chunk_tokens: int = 50,
+ ):
+ self.num_surrounding_sentences = num_surrounding_sentences
+ self.similarity_threshold = similarity_threshold
+ self.max_chunk_tokens = max_chunk_tokens
+ self.min_chunk_tokens = min_chunk_tokens
+ try:
+ self._nlp_model = spacy.load("en_core_web_md")
+ except IOError as e:
+ raise ValueError("Spacy model 'en_core_web_md' not found.") from e
+
+ def sentence_contains_figure_or_table_ending(self, text: str):
+ return "" in text or "" in text
+
+    def sentence_contains_figure_or_table(self, text: str):
+        return (
+            ("<figure" in text or "</figure>" in text)
+            or ("<table" in text or "</table>" in text)
+        )
+
+    def sentence_is_complete_figure_or_table(self, text: str):
+        return ("<figure" in text and "</figure>" in text) or (
+            "<table" in text and "</table>" in text
+        )
+
+ def num_tokens_from_string(self, string: str) -> int:
+ """Gets the number of tokens in a string using a specific encoding.
+
+ Args:
+ string: The input string.
+
+ Returns:
+ int: The number of tokens in the string."""
+
+ encoding = tiktoken.get_encoding("cl100k_base")
+
+ return len(encoding.encode(string))
+
+ async def chunk(self, text: str) -> list[dict]:
+ """Attempts to chunk the text by:
+ Splitting into sentences
+ Grouping sentences that contain figures and tables
+            Merging semantically similar chunks
+
+ Args:
+ text (str): The set of text to chunk
+
+ Returns:
+ list(str): The list of chunks"""
+
+ sentences = self.split_into_sentences(text)
+
+ (
+ grouped_sentences,
+ is_table_or_figure_map,
+ ) = self.group_figures_and_tables_into_sentences(sentences)
+
+ forward_pass_chunks, new_is_table_or_figure_map = self.merge_chunks(
+ grouped_sentences, is_table_or_figure_map
+ )
+
+ logging.info(
+ f"""Number of Forward pass chunks: {
+ len(forward_pass_chunks)}"""
+ )
+ logging.info(f"Forward pass chunks: {forward_pass_chunks}")
+
+ backwards_pass_chunks, _ = self.merge_chunks(
+ forward_pass_chunks, new_is_table_or_figure_map, forwards_direction=False
+ )
+
+ reversed_backwards_pass_chunks = list(reversed(backwards_pass_chunks))
+
+ logging.info(
+ f"""Number of Backaward pass chunks: {
+ len(reversed_backwards_pass_chunks)}"""
+ )
+ logging.info(f"Backward pass chunks: {reversed_backwards_pass_chunks}")
+
+ cleaned_final_chunks = []
+ for chunk in reversed_backwards_pass_chunks:
+ stripped_chunk = chunk.strip()
+ if len(stripped_chunk) > 0:
+ cleaned_final_chunks.append(stripped_chunk)
+
+ logging.info(f"Number of final chunks: {len(cleaned_final_chunks)}")
+ logging.info(f"Chunks: {cleaned_final_chunks}")
+
+ return cleaned_final_chunks
+
+ def filter_empty_figures(self, text):
+        # Regular expression to match <figure></figure> tags with only newlines or spaces in between
+        pattern = r"<figure>\s*</figure>"
+
+ # Replace any matches of the pattern with an empty string
+ filtered_text = re.sub(pattern, "", text)
+
+ return filtered_text
+
+ def clean_new_lines(self, text):
+ # Remove single newlines surrounded by < and >
+ cleaned_text = re.sub(r"(?<=>)(\n)(?=<)", "", text)
+
+ # Replace all other single newlines with space
+ cleaned_text = re.sub(r"(? list[str]:
+ """Splits a set of text into a list of sentences uses the Spacy NLP model.
+
+ Args:
+ text (str): The set of text to chunk
+
+ Returns:
+ list(str): The extracted sentences
+ """
+
+ cleaned_text = self.clean_new_lines(text)
+
+        # Filter out empty <figure>...</figure> tags
+ cleaned_text = self.filter_empty_figures(cleaned_text)
+
+ doc = self._nlp_model(cleaned_text)
+
+ tag_split_sentences = []
+ # Pattern to match the closing and opening tag junctions with whitespace in between
+ split_pattern = r"(\s*
]*>|\s*]*>)"
+ for sent in doc.sents:
+ split_result = re.split(split_pattern, sent.text)
+ for part in split_result:
+ # Match the junction and split it into two parts
+ if re.match(split_pattern, part):
+ # Split at the first whitespace
+ tag_split = part.split(" ", 1)
+                    # Add the closing tag (e.g., </table>)
+                    tag_split_sentences.append(tag_split[0])
+ if len(tag_split) > 1:
+ # Add the rest of the string with leading space
+ tag_split_sentences.append(" " + tag_split[1])
+ else:
+ tag_split_sentences.append(part)
+
+ # Now apply a split pattern against markdown headings
+ heading_split_sentences = []
+
+ # Iterate through each sentence in tag_split_sentences
+ for sent in tag_split_sentences:
+ # Use re.split to split on \n\n and headings, but keep \n\n in the result
+ split_result = re.split(r"(\n\n|#+ .*)", sent)
+
+ # Extend the result with the correctly split parts, retaining \n\n before the heading
+ for part in split_result:
+ if part.strip(): # Only add non-empty parts
+ heading_split_sentences.append(part)
+
+ return heading_split_sentences
+
+ def group_figures_and_tables_into_sentences(self, sentences: list[str]):
+ grouped_sentences = []
+ holding_sentences = []
+
+ is_table_or_figure_map = []
+
+ is_grouped_sentence = False
+ for current_sentence in sentences:
+ if is_grouped_sentence is False:
+ if self.sentence_is_complete_figure_or_table(current_sentence):
+ grouped_sentences.append(current_sentence)
+ is_table_or_figure_map.append(True)
+ elif self.sentence_contains_figure_or_table(current_sentence):
+ is_grouped_sentence = True
+ holding_sentences.append(current_sentence)
+ else:
+ grouped_sentences.append(current_sentence)
+ is_table_or_figure_map.append(False)
+ else:
+ # check for ending case
+ if self.sentence_contains_figure_or_table_ending(current_sentence):
+ holding_sentences.append(current_sentence)
+
+ full_sentence = " ".join(holding_sentences)
+ grouped_sentences.append(full_sentence)
+ holding_sentences = []
+
+ is_grouped_sentence = False
+ is_table_or_figure_map.append(True)
+ else:
+ holding_sentences.append(current_sentence)
+
+ assert len(holding_sentences) == 0, "Holding sentences should be empty"
+
+ return grouped_sentences, is_table_or_figure_map
+
+ def look_ahead_and_behind_sentences(
+ self, total_sentences, is_table_or_figure_map, current_sentence_index
+ ):
+ is_table_or_figure_ahead = False
+ is_table_or_figure_behind = False
+
+ distance_to_next_figure = self.num_surrounding_sentences
+
+ if current_sentence_index < self.num_surrounding_sentences:
+ is_table_or_figure_behind = is_table_or_figure_map[0]
+ else:
+ is_table_or_figure_behind = is_table_or_figure_map[
+ current_sentence_index - self.num_surrounding_sentences
+ ]
+
+ surround_sentences_gap_to_test = self.num_surrounding_sentences
+ if current_sentence_index + self.num_surrounding_sentences >= total_sentences:
+ is_table_or_figure_ahead = is_table_or_figure_map[-1]
+ surround_sentences_gap_to_test = total_sentences - current_sentence_index
+ else:
+ is_table_or_figure_ahead = is_table_or_figure_map[
+ current_sentence_index + self.num_surrounding_sentences
+ ]
+
+ for (
+ next_sentence_is_table_or_figure_index,
+ next_sentence_is_table_or_figure,
+ ) in enumerate(
+ is_table_or_figure_map[
+ current_sentence_index : current_sentence_index
+ + surround_sentences_gap_to_test
+ ]
+ ):
+ if next_sentence_is_table_or_figure:
+ distance_to_next_figure = next_sentence_is_table_or_figure_index
+
+ return (
+ is_table_or_figure_ahead,
+ is_table_or_figure_behind,
+ min(surround_sentences_gap_to_test, distance_to_next_figure),
+ )
+
+ def merge_similar_chunks(self, current_sentence, current_chunk, forwards_direction):
+ new_chunk = None
+
+ def retrieve_current_chunk_up_to_n(n):
+ if forwards_direction:
+ return " ".join(current_chunk[:-n])
+ else:
+ return " ".join(reversed(current_chunk[:-n]))
+
+ def retrieve_current_chunks_from_n(n):
+ if forwards_direction:
+ return " ".join(current_chunk[n:])
+ else:
+ return " ".join(reversed(current_chunk[:-n]))
+
+ def retrive_current_chunk_at_n(n):
+ if forwards_direction:
+ return current_chunk[n]
+ else:
+ return current_chunk[n]
+
+ current_chunk_tokens = self.num_tokens_from_string(" ".join(current_chunk))
+
+ if len(current_chunk) >= 2 and current_chunk_tokens >= self.min_chunk_tokens:
+ logging.debug("Comparing chunks")
+ cosine_sim = self.sentence_similarity(
+ retrieve_current_chunks_from_n(-2), current_sentence
+ )
+ if (
+ cosine_sim < self.similarity_threshold
+ or current_chunk_tokens >= self.max_chunk_tokens
+ ):
+ if len(current_chunk) > 2:
+ new_chunk = retrieve_current_chunk_up_to_n(1)
+ current_chunk = [retrive_current_chunk_at_n(-1)]
+ else:
+ new_chunk = retrive_current_chunk_at_n(0)
+ current_chunk = [retrive_current_chunk_at_n(1)]
+ else:
+ logging.debug("Chunk too small to compare")
+
+ return new_chunk, current_chunk
+
+ def is_markdown_heading(self, text):
+ return text.strip().startswith("#")
+
+ def merge_chunks(self, sentences, is_table_or_figure_map, forwards_direction=True):
+ chunks = []
+ current_chunk = []
+
+ total_sentences = len(sentences)
+ index = 0
+
+ def retrieve_current_chunk():
+ if forwards_direction:
+ return " ".join(current_chunk)
+ else:
+ return " ".join(reversed(current_chunk))
+
+ new_is_table_or_figure_map = []
+ while index < total_sentences:
+ if forwards_direction is False:
+ current_sentence_index = total_sentences - index - 1
+ else:
+ current_sentence_index = index
+
+ current_sentence = sentences[current_sentence_index]
+
+ if len(current_sentence.strip()) == 0:
+ index += 1
+ continue
+
+ # Detect if table or figure
+ if is_table_or_figure_map[current_sentence_index]:
+ if forwards_direction:
+ if len(current_chunk) > 0:
+ current_chunk.append(current_sentence)
+ chunks.append(retrieve_current_chunk())
+ new_is_table_or_figure_map.append(True)
+ current_chunk = []
+ else:
+ current_chunk.append(current_sentence)
+ else:
+ # On the backwards pass we don't want to add to the table chunk
+ chunks.append(retrieve_current_chunk())
+ new_is_table_or_figure_map.append(True)
+ current_chunk = [current_sentence]
+
+ index += 1
+ continue
+ elif forwards_direction:
+                # Look ahead to see if a figure or table is coming up
+ # We only do this on the forward pass
+ (
+ is_table_or_figure_ahead,
+ is_table_or_figure_behind,
+ min_of_distance_to_next_figure_or_num_surrounding_sentences,
+ ) = self.look_ahead_and_behind_sentences(
+ total_sentences, is_table_or_figure_map, current_sentence_index
+ )
+
+ if is_table_or_figure_behind:
+                    # Check if Markdown heading
+ if self.is_markdown_heading(current_sentence):
+ # Start new chunk
+ chunks.append(retrieve_current_chunk())
+ new_is_table_or_figure_map.append(False)
+ current_chunk = [current_sentence]
+ else:
+ # Finish off
+ current_chunk.append(current_sentence)
+ chunks.append(retrieve_current_chunk())
+ new_is_table_or_figure_map.append(False)
+ current_chunk = []
+
+ index += 1
+ continue
+ elif is_table_or_figure_ahead:
+ # Add to the ahead chunk
+ chunks.append(retrieve_current_chunk())
+ new_is_table_or_figure_map.append(False)
+ if forwards_direction:
+ current_chunk = sentences[
+ current_sentence_index : current_sentence_index
+ + min_of_distance_to_next_figure_or_num_surrounding_sentences
+ ]
+ else:
+ current_chunk = sentences[
+ current_sentence_index : current_sentence_index
+ - min_of_distance_to_next_figure_or_num_surrounding_sentences : -1
+ ]
+ index += min_of_distance_to_next_figure_or_num_surrounding_sentences
+ continue
+
+            # Now group semantically
+ num_tokens = self.num_tokens_from_string(current_sentence)
+
+ if num_tokens >= self.max_chunk_tokens:
+ chunks.append(current_sentence)
+ new_is_table_or_figure_map.append(False)
+ else:
+ current_chunk.append(current_sentence)
+
+ new_chunk, current_chunk = self.merge_similar_chunks(
+ current_sentence,
+ current_chunk,
+ forwards_direction=forwards_direction,
+ )
+
+ if new_chunk is not None:
+ chunks.append(new_chunk)
+ new_is_table_or_figure_map.append(False)
+
+ index += 1
+
+ if len(current_chunk) > 0:
+ final_chunk = retrieve_current_chunk()
+ chunks.append(final_chunk)
+
+ new_is_table_or_figure_map.append(
+ self.sentence_contains_figure_or_table(final_chunk)
+ )
+
+ return chunks, new_is_table_or_figure_map
+
+ def sentence_similarity(self, text_1, text_2):
+ vec1 = self._nlp_model(text_1).vector
+ vec2 = self._nlp_model(text_2).vector
+
+ dot_product = np.dot(vec1, vec2)
+ magnitude = np.linalg.norm(vec1) * np.linalg.norm(vec2)
+ similarity = dot_product / magnitude if magnitude != 0 else 0.0
+
+ logging.debug(
+ f"""Similarity between '{text_1}' and '{
+ text_2}': {similarity}"""
+ )
+ return similarity
+
+
+async def process_semantic_text_chunker(record: dict, text_chunker) -> dict:
+ """Chunk the data.
+
+ Args:
+        record (dict): The record to chunk.
+
+    Returns:
+        dict: The chunked record."""
+
+ try:
+ json_str = json.dumps(record, indent=4)
+
+ logging.info(f"Chunking Input: {json_str}")
+
+ cleaned_record = {
+ "recordId": record["recordId"],
+ "data": {},
+ "errors": None,
+ "warnings": None,
+ }
+
+ # scenarios when page by chunking is enabled
+ cleaned_record["data"]["chunks"] = await text_chunker.chunk(
+ record["data"]["content"]
+ )
+
+ except Exception as e:
+ logging.error("Chunking Error: %s", e)
+ return {
+ "recordId": record["recordId"],
+ "data": {},
+ "errors": [
+ {
+ "message": "Failed to chunk data. Check function app logs for more details of exact failure."
+ }
+ ],
+ "warnings": None,
+ }
+ json_str = json.dumps(cleaned_record, indent=4)
+
+ logging.info(f"Chunking output: {json_str}")
+
+ return cleaned_record
diff --git a/deploy_ai_search/.env b/deploy_ai_search/.env
index e738621..af194da 100644
--- a/deploy_ai_search/.env
+++ b/deploy_ai_search/.env
@@ -3,6 +3,7 @@ FunctionApp__Key=
FunctionApp__PreEmbeddingCleaner__FunctionName=pre_embedding_cleaner
FunctionApp__ADI__FunctionName=adi_2_ai_search
FunctionApp__KeyPhraseExtractor__FunctionName=key_phrase_extractor
+FunctionApp__SemanticTextChunker__FunctionName=semantic_text_chunker
FunctionApp__AppRegistrationResourceId=
IdentityType= # system_assigned or user_assigned or key
AIService__AzureSearchOptions__Endpoint=
diff --git a/deploy_ai_search/ai_search.py b/deploy_ai_search/ai_search.py
index 6d63bac..3da8f43 100644
--- a/deploy_ai_search/ai_search.py
+++ b/deploy_ai_search/ai_search.py
@@ -196,7 +196,7 @@ def get_data_source(self) -> SearchIndexerDataSourceConnection:
return data_source_connection
- def get_pre_embedding_cleaner_skill(self, context, source) -> WebApiSkill:
+ def get_mark_up_cleaner_skill(self, context, source) -> WebApiSkill:
"""Get the custom skill for data cleanup.
Args:
@@ -215,66 +215,112 @@ def get_pre_embedding_cleaner_skill(self, context, source) -> WebApiSkill:
batch_size = 16
degree_of_parallelism = 16
- pre_embedding_cleaner_skill_inputs = [
- InputFieldMappingEntry(name="chunk", source=source)
+ mark_up_cleaner_skill_inputs = [
+ InputFieldMappingEntry(name="chunk", source=source),
+ InputFieldMappingEntry(
+ name="figure_storage_prefix", source="/document/metadata_storage_path"
+ ),
]
- pre_embedding_cleaner_skill_outputs = [
- OutputFieldMappingEntry(name="cleanedChunk", target_name="cleanedChunk"),
+ mark_up_cleaner_skill_outputs = [
+ OutputFieldMappingEntry(name="cleaned_chunk", target_name="cleaned_chunk"),
OutputFieldMappingEntry(name="chunk", target_name="chunk"),
OutputFieldMappingEntry(name="sections", target_name="sections"),
]
- pre_embedding_cleaner_skill = WebApiSkill(
- name="Pre Embedding Cleaner Skill",
+ mark_up_cleaner_skill = WebApiSkill(
+ name="Mark Up Cleaner Skill",
description="Skill to clean the data before sending to embedding",
context=context,
- uri=self.environment.get_custom_skill_function_url("pre_embedding_cleaner"),
+ uri=self.environment.get_custom_skill_function_url("mark_up_cleaner"),
timeout="PT230S",
batch_size=batch_size,
degree_of_parallelism=degree_of_parallelism,
http_method="POST",
- inputs=pre_embedding_cleaner_skill_inputs,
- outputs=pre_embedding_cleaner_skill_outputs,
+ inputs=mark_up_cleaner_skill_inputs,
+ outputs=mark_up_cleaner_skill_outputs,
)
if self.environment.identity_type != IdentityType.KEY:
- pre_embedding_cleaner_skill.auth_identity = (
+ mark_up_cleaner_skill.auth_identity = (
self.environment.function_app_app_registration_resource_id
)
if self.environment.identity_type == IdentityType.USER_ASSIGNED:
- pre_embedding_cleaner_skill.auth_identity = (
+ mark_up_cleaner_skill.auth_identity = (
self.environment.ai_search_user_assigned_identity
)
- return pre_embedding_cleaner_skill
+ return mark_up_cleaner_skill
- def get_text_split_skill(self, context, source) -> SplitSkill:
+ def get_text_split_skill(
+ self,
+ context,
+ source,
+ num_surrounding_sentences: int = 1,
+ similarity_threshold: float = 0.8,
+ max_chunk_tokens: int = 200,
+ min_chunk_tokens: int = 50,
+    ) -> WebApiSkill:
"""Get the skill for text split.
Args:
-----
context (str): The context of the skill
- inputs (List[InputFieldMappingEntry]): The inputs of the skill
- outputs (List[OutputFieldMappingEntry]): The outputs of the skill
+ source (str): The source of the skill
+ num_surrounding_sentences (int, optional): The number of surrounding sentences. Defaults to 1.
+ similarity_threshold (float, optional): The similarity threshold. Defaults to 0.8.
+            max_chunk_tokens (int, optional): The maximum number of tokens. Defaults to 200.
+            min_chunk_tokens (int, optional): The minimum number of tokens. Defaults to 50.
Returns:
--------
WebApiSkill: The skill for text split"""
- text_split_skill = SplitSkill(
- name="Text Split Skill",
- description="Skill to split the text before sending to embedding",
+ if self.test:
+ batch_size = 2
+ degree_of_parallelism = 2
+ else:
+ batch_size = 16
+ degree_of_parallelism = 16
+
+ semantic_text_chunker_skill_inputs = [
+ InputFieldMappingEntry(name="content", source=source)
+ ]
+
+ semantic_text_chunker_skill_outputs = [
+ OutputFieldMappingEntry(name="chunks", target_name="chunks"),
+ ]
+
+ semantic_text_chunker_skill = WebApiSkill(
+ name="Mark Up Cleaner Skill",
+ description="Skill to clean the data before sending to embedding",
context=context,
- text_split_mode="pages",
- maximum_page_length=2000,
- page_overlap_length=500,
- inputs=[InputFieldMappingEntry(name="text", source=source)],
- outputs=[OutputFieldMappingEntry(name="textItems", target_name="pages")],
+ uri=self.environment.get_custom_skill_function_url("semantic_text_chunker"),
+ timeout="PT230S",
+ batch_size=batch_size,
+ degree_of_parallelism=degree_of_parallelism,
+ http_method="POST",
+ http_headers={
+ "num_surrounding_sentences": num_surrounding_sentences,
+ "similarity_threshold": similarity_threshold,
+ "max_chunk_tokens": max_chunk_tokens,
+ "min_chunk_tokens": min_chunk_tokens,
+ },
+ inputs=semantic_text_chunker_skill_inputs,
+ outputs=semantic_text_chunker_skill_outputs,
)
- return text_split_skill
+ if self.environment.identity_type != IdentityType.KEY:
+ semantic_text_chunker_skill.auth_identity = (
+ self.environment.function_app_app_registration_resource_id
+ )
+
+ if self.environment.identity_type == IdentityType.USER_ASSIGNED:
+ semantic_text_chunker_skill.auth_identity = (
+ self.environment.ai_search_user_assigned_identity
+ )
+
+ return semantic_text_chunker_skill
def get_adi_skill(self, chunk_by_page=False) -> WebApiSkill:
"""Get the custom skill for adi.
@@ -297,7 +343,7 @@ def get_adi_skill(self, chunk_by_page=False) -> WebApiSkill:
if chunk_by_page:
output = [
- OutputFieldMappingEntry(name="extracted_content", target_name="pages")
+ OutputFieldMappingEntry(name="extracted_content", target_name="chunks")
]
else:
output = [
diff --git a/deploy_ai_search/environment.py b/deploy_ai_search/environment.py
index e431304..8c35b6b 100644
--- a/deploy_ai_search/environment.py
+++ b/deploy_ai_search/environment.py
@@ -198,12 +198,19 @@ def function_app_app_registration_resource_id(self) -> str:
return os.environ.get("FunctionApp__AppRegistrationResourceId")
@property
- def function_app_pre_embedding_cleaner_route(self) -> str:
+ def function_app_mark_up_cleaner_route(self) -> str:
"""
This function returns function app data cleanup function name
"""
return os.environ.get("FunctionApp__PreEmbeddingCleaner__FunctionName")
+ @property
+ def function_app_semantic_text_chunker_route(self) -> str:
+ """
+ This function returns function app semantic text chunker name
+ """
+ return os.environ.get("FunctionApp__SemanticTextChunker__FunctionName")
+
@property
def function_app_adi_route(self) -> str:
"""
@@ -243,12 +250,14 @@ def get_custom_skill_function_url(self, skill_type: str):
"""
Get the function app url that is hosting the custom skill
"""
- if skill_type == "pre_embedding_cleaner":
- route = self.function_app_pre_embedding_cleaner_route
+ if skill_type == "mark_up_cleaner":
+ route = self.function_app_mark_up_cleaner_route
elif skill_type == "adi":
route = self.function_app_adi_route
elif skill_type == "key_phrase_extraction":
route = self.function_app_key_phrase_extractor_route
+ elif skill_type == "semantic_text_chunker":
+ route = self.function_app_semantic_text_chunker_route
else:
raise ValueError(f"Invalid skill type: {skill_type}")
diff --git a/deploy_ai_search/rag_documents.py b/deploy_ai_search/rag_documents.py
index c9ebffd..7d63856 100644
--- a/deploy_ai_search/rag_documents.py
+++ b/deploy_ai_search/rag_documents.py
@@ -171,22 +171,22 @@ def get_skills(self) -> list:
"/document", "/document/extracted_content/content"
)
- pre_embedding_cleaner_skill = self.get_pre_embedding_cleaner_skill(
- "/document/pages/*", "/document/pages/*"
+ mark_up_cleaner_skill = self.get_mark_up_cleaner_skill(
+ "/document/chunks/*", "/document/chunks/*/content"
)
key_phrase_extraction_skill = self.get_key_phrase_extraction_skill(
- "/document/pages/*", "/document/pages/*/cleanedChunk"
+ "/document/chunks/*", "/document/chunks/*/cleaned_chunk"
)
embedding_skill = self.get_vector_skill(
- "/document/pages/*", "/document/pages/*/cleanedChunk"
+ "/document/chunks/*", "/document/chunks/*/cleaned_chunk"
)
if self.enable_page_by_chunking:
skills = [
adi_skill,
- pre_embedding_cleaner_skill,
+ mark_up_cleaner_skill,
key_phrase_extraction_skill,
embedding_skill,
]
@@ -194,7 +194,7 @@ def get_skills(self) -> list:
skills = [
adi_skill,
text_split_skill,
- pre_embedding_cleaner_skill,
+ mark_up_cleaner_skill,
key_phrase_extraction_skill,
embedding_skill,
]
@@ -204,28 +204,29 @@ def get_skills(self) -> list:
def get_index_projections(self) -> SearchIndexerIndexProjection:
"""This function returns the index projections for rag document."""
mappings = [
- InputFieldMappingEntry(name="Chunk", source="/document/pages/*/chunk"),
+ InputFieldMappingEntry(name="Chunk", source="/document/chunks/*/chunk"),
InputFieldMappingEntry(
name="ChunkEmbedding",
- source="/document/pages/*/vector",
+ source="/document/chunks/*/vector",
),
InputFieldMappingEntry(name="Title", source="/document/Title"),
InputFieldMappingEntry(name="SourceUri", source="/document/SourceUri"),
InputFieldMappingEntry(
- name="Keywords", source="/document/pages/*/keywords"
+ name="Keywords", source="/document/chunks/*/keywords"
),
InputFieldMappingEntry(
- name="Sections", source="/document/pages/*/sections"
+ name="Sections", source="/document/chunks/*/sections"
),
InputFieldMappingEntry(
name="Figures",
- source_context="/document/pages/*/figures/*",
+ source_context="/document/chunks/*/figures/*",
inputs=[
InputFieldMappingEntry(
- name="FigureId", source="/document/pages/*/figures/*/figureId"
+ name="FigureId", source="/document/chunks/*/figures/*/figure_id"
),
InputFieldMappingEntry(
- name="FigureUri", source="/document/pages/*/figures/*/figureUri"
+ name="FigureUri",
+ source="/document/chunks/*/figures/*/figure_uri",
),
],
),
@@ -238,7 +239,7 @@ def get_index_projections(self) -> SearchIndexerIndexProjection:
mappings.extend(
[
InputFieldMappingEntry(
- name="PageNumber", source="/document/pages/*/pageNumber"
+ name="PageNumber", source="/document/chunks/*/pageNumber"
)
]
)
@@ -248,7 +249,7 @@ def get_index_projections(self) -> SearchIndexerIndexProjection:
SearchIndexerIndexProjectionSelector(
target_index_name=self.index_name,
parent_key_field_name="Id",
- source_context="/document/pages/*",
+ source_context="/document/chunks/*",
mappings=mappings,
),
],