PDFファイルを使ったRAGに挑戦(2)
今回は、以下の記事の続きです。
Markdownを使ったRAG
単純にテキストを抽出しただけでは文書の構造情報が失われてしまうため、PDFからMarkdown形式に変換することで構造を保ちながらLLMへ入力する方法を試してみました。
PDFをMarkdownに変換
PDFファイルをMarkdownに変換するため、Azure AI Document IntelligenceのAPIを利用しました。このAPIは、テキスト、表、図の抽出、レイアウト情報の取得などが可能で、結果をMarkdown形式で出力することもできます。

事前にAzureでmy_keyとendpointを取得しておきます。
次に、必要なライブラリをインストールします
pip install azure-ai-documentintelligence --pre
具体的な変換コードは以下の通りです。
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import (
AnalyzeDocumentRequest,
ContentFormat,
AnalyzeResult,
)
import base64
from pathlib import Path
# 定数の定義
def analyze_document(input_file,output_file):
model = "prebuilt-layout"
client = DocumentIntelligenceClient(
endpoint=my_endpoint, credential=AzureKeyCredential(my_key)
)
# Markdown形式の内容をファイルに保存
with open(input_file, "rb") as pdf_file:
pdf_data = pdf_file.read() # PDFファイル全体をバイナリデータとして読み込む
pdf_base64 = base64.b64encode(pdf_data).decode('utf-8')
poller = client.begin_analyze_document(
model,
AnalyzeDocumentRequest(bytes_source = pdf_base64),
output_content_format=ContentFormat.MARKDOWN,
)
result: AnalyzeResult = poller.result()
with open(output_file, "w", encoding="utf-8") as md_file:
md_file.write(f"Here's the full content in format {result.content_format}:\n\n")
md_file.write(result.content)
print(f"Markdown content has been written to {output_file}")
Markdownのコンテキスト化
得られたMarkdownファイルを使って、LLMに質問を投げかけます。今回はclaude-3-5-sonnet-20241022
を使用しました。
Claudeには入力token数の制限があるため、Markdownファイルを適切に分割する必要があります。また、コスト削減のために「キャッシュプロンプト」機能を利用しています。キャッシュプロンプトとは、LLMに送るプロンプトの共通部分をキャッシュさせることで、トークン消費を削減できる機能です。
今回は、同じファイルに対する質問を連続して行う実装にしています。ただし、キャッシュの寿命は約5分で、毎分処理できるトークン数にも上限があるため、適切な時間調整が必要です。
!pip install anthropic
import os
from anthropic import Anthropic
import re
from typing import List, Tuple
import math
import time
class MarkdownProcessor:
def __init__(self, api_key: str, target_chunk_size: int = 170000): # max 200k tokensなので、170k以下になるように分割
self.anthropic = Anthropic(api_key=api_key)
self.target_chunk_size = target_chunk_size
self.content = ""
self.sections = []
def load_markdown_file(self, file_path: str) -> None:
with open(file_path, 'r', encoding='utf-8') as file:
self.content = file.read()
def split_into_sections(self) -> List[Tuple[str, str]]:
lines = self.content.split('\n')
current_section = []
current_title = ""
sections = []
for line in lines:
header_match = re.match(r'^(#{1,3})\s+(.+)$', line) ## markdownの見出しを検出して、分割点とする。
if header_match:
if current_section:
sections.append((current_title, '\n'.join(current_section)))
current_title = line
current_section = [line]
else:
current_section.append(line)
if current_section:
sections.append((current_title, '\n'.join(current_section)))
return sections
def optimize_chunks(self, sections: List[Tuple[str, str]]) -> List[str]:
total_content_size = sum(len(content) for _, content in sections)
optimal_chunk_count = math.ceil(total_content_size / self.target_chunk_size)
chunks = []
current_chunk = []
current_chunk_size = 0
for title, content in sections:
section_size = len(content)
if section_size > self.target_chunk_size:
if current_chunk:
chunks.append('\n\n'.join(current_chunk))
current_chunk = []
current_chunk_size = 0
paragraphs = content.split('\n\n')
temp_chunk = []
temp_size = 0
for para in paragraphs:
if temp_size + len(para) > self.target_chunk_size:
chunks.append('\n\n'.join(temp_chunk))
temp_chunk = [para]
temp_size = len(para)
else:
temp_chunk.append(para)
temp_size += len(para)
if temp_chunk:
chunks.append('\n\n'.join(temp_chunk))
elif current_chunk_size + section_size > self.target_chunk_size:
chunks.append('\n\n'.join(current_chunk))
current_chunk = [content]
current_chunk_size = section_size
else:
current_chunk.append(content)
current_chunk_size += section_size
if current_chunk:
chunks.append('\n\n'.join(current_chunk))
print(f"Total sections: {len(sections)}")
print(f"Total chunks: {len(chunks)}")
return chunks
def retry_with_timeout(self, func, max_retries, delay):
for attempt in range(max_retries):
try:
return func()
except Exception as e:
if attempt == max_retries - 1: # 最後の試行で失敗した場合
raise e
print(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds... Error: {e}")
time.sleep(delay)
def create_message(self, chunk, i, question):
chunk_prompt = f"""
提示されたdocumentは、企業の報告書の一部です。markdown形式で記述されており、元のdocumentの{i+1}番目のpartです。
このpartに含まれる利用者の質問{question}に関連する情報を、すべて抽出してください。
関連する情報がない場合には、「関連情報なし」と出力してください。
"""
response = self.anthropic.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
temperature=0.0,
system=[
{
"type": "text",
"text": "<document>" + chunk + "</document>",
"cache_control": {"type": "ephemeral"}
},
],
messages=[
{
"role": "user",
"content": chunk_prompt,
}
],
extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"}
)
print("cache_creation_input_tokens-", response.usage.cache_creation_input_tokens) ## 新たなキャッシュを作ったtoken数
print("cache_read_input_tokens-", response.usage.cache_read_input_tokens) ## キャッシュから読み取ったtoken数
print("input_tokens-", response.usage.input_tokens)
return response
def process_content(self, question: str) -> str:
sections = self.split_into_sections()
chunks = self.optimize_chunks(sections)
all_responses = []
for i, chunk in enumerate(chunks):
try:
response = self.retry_with_timeout(
lambda: self.create_message(chunk, i, question),
max_retries=5,
delay=90
)
all_responses.append(response.content)
except Exception as e:
print(f"Error processing chunk {i+1} after all retries: {e}")
all_responses.append(str(e))
time.sleep(70)
# 全てのレスポンスを処理して返す
work_responses = []
for i, word in enumerate(all_responses):
work_responses.append(f"Part {i+1}. {word} :")
str_all_responses = " ".join(work_responses)
return str_all_responses
ファイル分割の際には、
- Markdownの見出しを利用して適切な分割点を見つける
- できるだけ均等に分割する
の2点に気をつけています。
また、プロンプトでは、ドキュメントの部分と、質問の部分とを分離し、前者をキャッシュに入れるようにしています。