PDFファイルを使ったRAGに挑戦(2)

今回は、以下の記事の続きです。

PDFファイルを使ったRAGに挑戦(1)

今ではLLMを使う場合にRAG(Retrieval-augmented generation)の技術は必須となってきています。理屈は理解できているのですが、本当に正しく知識をつかえて言えるのか心配…

Markdownを使ったRAG

単純にテキストを抽出しただけでは文書の構造情報が失われてしまうため、PDFからMarkdown形式に変換することで構造を保ちながらLLMへ入力する方法を試してみました。

PDFをMarkdownに変換

PDFファイルをMarkdownに変換するため、Azure AI Document IntelligenceのAPIを利用しました。このAPIは、テキスト、表、図の抽出、レイアウト情報の取得などが可能で、結果をMarkdown形式で出力することもできます。

事前にAzureでmy_keyとendpointを取得しておきます。

次に、必要なライブラリをインストールします

pip install azure-ai-documentintelligence --pre

具体的な変換コードは以下の通りです。

from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import (
    AnalyzeDocumentRequest,
    ContentFormat,
    AnalyzeResult,
)
import base64
from pathlib import Path


# 定数の定義

def analyze_document(input_file,output_file):

    model = "prebuilt-layout"
    client = DocumentIntelligenceClient(
        endpoint=my_endpoint, credential=AzureKeyCredential(my_key)
    )

    # Markdown形式の内容をファイルに保存
    with open(input_file, "rb") as pdf_file:
      pdf_data = pdf_file.read()  # PDFファイル全体をバイナリデータとして読み込む

    pdf_base64 = base64.b64encode(pdf_data).decode('utf-8')

    poller = client.begin_analyze_document(
        model,
        AnalyzeDocumentRequest(bytes_source = pdf_base64),
        output_content_format=ContentFormat.MARKDOWN,
    )

    result: AnalyzeResult = poller.result()


    with open(output_file, "w", encoding="utf-8") as md_file:
        md_file.write(f"Here's the full content in format {result.content_format}:\n\n")
        md_file.write(result.content)

    print(f"Markdown content has been written to {output_file}")

Markdownのコンテキスト化 

得られたMarkdownファイルを使って、LLMに質問を投げかけます。今回はclaude-3-5-sonnet-20241022を使用しました。

Claudeには入力token数の制限があるため、Markdownファイルを適切に分割する必要があります。また、コスト削減のために「キャッシュプロンプト」機能を利用しています。キャッシュプロンプトとは、LLMに送るプロンプトの共通部分をキャッシュさせることで、トークン消費を削減できる機能です。

今回は、同じファイルに対する質問を連続して行う実装にしています。ただし、キャッシュの寿命は約5分で、毎分処理できるトークン数にも上限があるため、適切な時間調整が必要です。

!pip install anthropic

import os
from anthropic import Anthropic
import re
from typing import List, Tuple
import math
import time

class MarkdownProcessor:
    def __init__(self, api_key: str, target_chunk_size: int = 170000):     # max 200k tokensなので、170k以下になるように分割
        self.anthropic = Anthropic(api_key=api_key)
        self.target_chunk_size = target_chunk_size
        self.content = ""
        self.sections = []

    def load_markdown_file(self, file_path: str) -> None:
        with open(file_path, 'r', encoding='utf-8') as file:
            self.content = file.read()

    def split_into_sections(self) -> List[Tuple[str, str]]:
        lines = self.content.split('\n')
        current_section = []
        current_title = ""
        sections = []

        for line in lines:
            header_match = re.match(r'^(#{1,3})\s+(.+)$', line)      ## markdownの見出しを検出して、分割点とする。
            if header_match:
                if current_section:
                    sections.append((current_title, '\n'.join(current_section)))
                current_title = line
                current_section = [line]
            else:
                current_section.append(line)

        if current_section:
            sections.append((current_title, '\n'.join(current_section)))

        return sections

    def optimize_chunks(self, sections: List[Tuple[str, str]]) -> List[str]:
        total_content_size = sum(len(content) for _, content in sections)
        optimal_chunk_count = math.ceil(total_content_size / self.target_chunk_size)
        chunks = []
        current_chunk = []
        current_chunk_size = 0

        for title, content in sections:
            section_size = len(content)

            if section_size > self.target_chunk_size:
                if current_chunk:
                    chunks.append('\n\n'.join(current_chunk))
                    current_chunk = []
                    current_chunk_size = 0

                paragraphs = content.split('\n\n')
                temp_chunk = []
                temp_size = 0

                for para in paragraphs:
                    if temp_size + len(para) > self.target_chunk_size:
                        chunks.append('\n\n'.join(temp_chunk))
                        temp_chunk = [para]
                        temp_size = len(para)
                    else:
                        temp_chunk.append(para)
                        temp_size += len(para)

                if temp_chunk:
                    chunks.append('\n\n'.join(temp_chunk))

            elif current_chunk_size + section_size > self.target_chunk_size:
                chunks.append('\n\n'.join(current_chunk))
                current_chunk = [content]
                current_chunk_size = section_size
            else:
                current_chunk.append(content)
                current_chunk_size += section_size

        if current_chunk:
            chunks.append('\n\n'.join(current_chunk))

        print(f"Total sections: {len(sections)}")
        print(f"Total chunks: {len(chunks)}")

        return chunks

    def retry_with_timeout(self, func, max_retries, delay):
        for attempt in range(max_retries):
            try:
                return func()
            except Exception as e:
                if attempt == max_retries - 1:  # 最後の試行で失敗した場合
                    raise e
                print(f"Attempt {attempt + 1} failed. Retrying in {delay} seconds... Error: {e}")
                time.sleep(delay)

    def create_message(self, chunk, i, question):
        chunk_prompt = f"""
        提示されたdocumentは、企業の報告書の一部です。markdown形式で記述されており、元のdocumentの{i+1}番目のpartです。
        このpartに含まれる利用者の質問{question}に関連する情報を、すべて抽出してください。
        関連する情報がない場合には、「関連情報なし」と出力してください。
        """

        response = self.anthropic.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            temperature=0.0,
            system=[
                {
                    "type": "text",
                    "text": "<document>" + chunk + "</document>",
                    "cache_control": {"type": "ephemeral"}        
                },
            ],
            messages=[
                {
                    "role": "user",
                    "content": chunk_prompt,               
                }
            ],
            extra_headers={"anthropic-beta": "prompt-caching-2024-07-31"}
          )

        print("cache_creation_input_tokens-", response.usage.cache_creation_input_tokens) ## 新たなキャッシュを作ったtoken数
        print("cache_read_input_tokens-", response.usage.cache_read_input_tokens)          ## キャッシュから読み取ったtoken数
        print("input_tokens-", response.usage.input_tokens)

        return response

    def process_content(self, question: str) -> str:
        sections = self.split_into_sections()
        chunks = self.optimize_chunks(sections)
        all_responses = []

        for i, chunk in enumerate(chunks):
            try:
                response = self.retry_with_timeout(
                    lambda: self.create_message(chunk, i, question),
                    max_retries=5,
                    delay=90
                )

                all_responses.append(response.content)
            except Exception as e:
                print(f"Error processing chunk {i+1} after all retries: {e}")
                all_responses.append(str(e))

            time.sleep(70)

        # 全てのレスポンスを処理して返す
        work_responses = []
        for i, word in enumerate(all_responses):
            work_responses.append(f"Part {i+1}. {word} :")
        str_all_responses = " ".join(work_responses)

        return str_all_responses


ファイル分割の際には、

  • Markdownの見出しを利用して適切な分割点を見つける
  • できるだけ均等に分割する

の2点に気をつけています。

また、プロンプトでは、ドキュメントの部分と、質問の部分とを分離し、前者をキャッシュに入れるようにしています。