Source code for metabeeai.metabeeai_llm.llm_pipeline

import argparse
import asyncio
import json
import os
import sys
import time

import yaml

from metabeeai.metabeeai_llm.json_multistage_qa import ask_json as ask_json_async
from metabeeai.metabeeai_llm.json_multistage_qa import format_to_list as format_to_list_async


def ask_json(question_text, json_path):
    """
    Blocking convenience wrapper around the async ``ask_json`` coroutine.

    Builds the coroutine ``ask_json_async(question_text, json_path)`` and
    drives it to completion with ``asyncio.run``, returning whatever the
    coroutine returned. Because it uses ``asyncio.run``, it must not be
    called from code that is already running inside an event loop.
    """
    pending = ask_json_async(question_text, json_path)
    return asyncio.run(pending)


def format_to_list(question, text, model="gpt-4o-mini"):
    """
    Blocking convenience wrapper around the async ``format_to_list`` coroutine.

    Forwards ``question``, ``text`` and ``model`` positionally to
    ``format_to_list_async`` and runs the coroutine to completion with
    ``asyncio.run``, returning its result unchanged. Must not be called from
    code that is already running inside an event loop.
    """
    pending = format_to_list_async(question, text, model)
    return asyncio.run(pending)


# ------------------------------------------------------------------------------
# Hierarchical Questions Dictionary
# ------------------------------------------------------------------------------
# Use {placeholder} format syntax in any question that should be parameterized.

# Lazy load questions to avoid import-time errors
# Module-level cache for the parsed questions file; filled on first use.
_QUESTIONS = None


def _get_questions():
    """
    Lazy loads the questions.yml file when first accessed.

    The file is looked up next to this module and parsed with
    ``yaml.safe_load``. The parsed object is cached in the module-level
    ``_QUESTIONS`` so later calls do not touch the disk again (as long as the
    parsed value is not ``None``).

    Returns:
        The parsed content of ``questions.yml``.

    Raises:
        OSError: If ``questions.yml`` cannot be opened.
    """
    global _QUESTIONS
    if _QUESTIONS is None:
        # Get the directory where this script is located
        script_dir = os.path.dirname(os.path.abspath(__file__))
        questions_path = os.path.join(script_dir, "questions.yml")

        # Decode explicitly as UTF-8: without an encoding argument open() uses
        # the locale's preferred encoding, so non-ASCII question text could be
        # read differently (or fail to decode) from one machine to another.
        with open(questions_path, "r", encoding="utf-8") as file:
            _QUESTIONS = yaml.safe_load(file)

    return _QUESTIONS


# ------------------------------------------------------------------------------
# Helper Function: get_answer
# ------------------------------------------------------------------------------
async def get_answer(question_text, json_path, relevance_model=None, answer_model=None):
    """
    Ask one question via ``ask_json_async`` and normalise the reply.

    Args:
        question_text: The question to ask
        json_path: Path to the JSON file containing text chunks
        relevance_model: Forwarded to ``ask_json_async`` as ``relevance_model``
        answer_model: Forwarded to ``ask_json_async`` as ``answer_model``

    Returns:
        A dict with exactly the keys ``answer``, ``reason`` and ``chunk_ids``.
        When the underlying reply is a dict, the three values are copied from
        it (missing keys become ``""``, ``""`` and ``[]``). Any other reply is
        stringified into ``answer`` (empty string if falsy) with a generic
        ``reason`` and no chunk ids.
    """
    raw = await ask_json_async(
        question_text,
        json_path,
        relevance_model=relevance_model,
        answer_model=answer_model,
    )

    # Non-dict replies carry no structure, so wrap them in the standard shape.
    if not isinstance(raw, dict):
        return {
            "answer": str(raw) if raw else "",
            "reason": "Answer generated from available information",
            "chunk_ids": [],
        }

    # Keep only the three required fields, each with its own fallback value.
    field_defaults = (("answer", ""), ("reason", ""), ("chunk_ids", []))
    return {field: raw.get(field, fallback) for field, fallback in field_defaults}


# ------------------------------------------------------------------------------
# Generic Recursive Function to Process a Hierarchical Question Tree
# ------------------------------------------------------------------------------
async def process_question_tree(tree, json_path, context=None, relevance_model=None, answer_model=None):
    """
    Walk a nested question structure and replace every question with its answer.

    Args:
        tree: The question tree (dict, list, str, or any other value)
        json_path: Path to the JSON file containing text chunks
        context: Mapping used to fill ``{placeholder}`` fields in question text
        relevance_model: Forwarded to ``get_answer`` as ``relevance_model``
        answer_model: Forwarded to ``get_answer`` as ``answer_model``

    How each node type is handled:

    - ``str``: formatted with the context and asked as a question.
    - ``list``: each element is processed in order; a list is returned.
    - ``dict`` with a ``"question"`` key: a leaf; the question is formatted
      and asked.
    - any other ``dict``: each value is processed under the same key, except
      for the special key ``"list"``. Its value must provide ``"question"``,
      ``"endpoint_name"`` and ``"for_each"``: the question is asked, the
      answer text is turned into items with ``format_to_list_async``, and the
      ``"for_each"`` subtree is processed once per item with the item stored
      in the context under ``endpoint_name``. The results are keyed by item.
    - anything else is returned unchanged.
    """
    ctx = {} if context is None else context
    models = {"relevance_model": relevance_model, "answer_model": answer_model}

    # A bare string is itself a question template.
    if isinstance(tree, str):
        return await get_answer(tree.format(**ctx), json_path, **models)

    if isinstance(tree, list):
        answers = []
        for node in tree:
            answers.append(await process_question_tree(node, json_path, ctx, **models))
        return answers

    # Scalars (numbers, None, ...) carry no question and pass through as-is.
    if not isinstance(tree, dict):
        return tree

    # A dict holding "question" is a leaf node.
    if "question" in tree:
        return await get_answer(tree["question"].format(**ctx), json_path, **models)

    collected = {}
    for name, subtree in tree.items():
        if name != "list":
            collected[name] = await process_question_tree(subtree, json_path, ctx, **models)
            continue

        # "list" node: ask for the items first, then expand the for_each
        # subtree once per item.
        list_question = subtree["question"].format(**ctx)
        endpoint = subtree["endpoint_name"]
        list_answer = await get_answer(list_question, json_path, **models)
        formatted = await format_to_list_async(list_question, list_answer["answer"])
        entries = formatted["answer"]

        per_item = {}
        collected[name] = per_item
        for entry in entries:
            # Copy so sibling items do not see each other's placeholder value.
            child_ctx = ctx.copy()
            child_ctx[endpoint] = entry
            per_item[entry] = await process_question_tree(subtree["for_each"], json_path, child_ctx, **models)
    return collected


# ------------------------------------------------------------------------------
# Main Function: Retrieve All Answers Based on the Questions Dictionary
# ------------------------------------------------------------------------------
async def get_literature_answers(json_path, relevance_model=None, answer_model=None):
    """
    Answer the whole question tree for a single paper.

    Loads the question tree through ``_get_questions`` and hands it to
    ``process_question_tree`` together with the model choices.

    Args:
        json_path: Path to the JSON file containing text chunks
        relevance_model: Model to use for chunk selection (defaults to config)
        answer_model: Model to use for answer generation and reflection (defaults to config)

    Returns:
        Whatever ``process_question_tree`` returns for the loaded tree.
    """
    return await process_question_tree(
        _get_questions(),
        json_path,
        relevance_model=relevance_model,
        answer_model=answer_model,
    )
# ------------------------------------------------------------------------------
# Main Execution
# ------------------------------------------------------------------------------
def merge_json_in_the_folder(folder_path, overwrite=False):
    """
    Merge the chunks of every JSON file in a folder into ``merged.json``.

    Each ``*.json`` file in ``folder_path`` is expected to contain
    ``{"data": {"chunks": [...]}}`` where every chunk has a ``"chunk_type"``.
    Chunks whose type is ``"figure"`` or ``"marginalia"`` are dropped; all
    others are concatenated and written to ``<folder_path>/merged.json`` in
    the same ``{"data": {"chunks": [...]}}`` layout.

    Args:
        folder_path: Folder holding the JSON files (with or without a
            trailing path separator).
        overwrite: When False (default) and ``merged.json`` already exists, a
            message is printed and nothing is written.

    Returns:
        None. The result is written to disk.

    Raises:
        KeyError: If a JSON file lacks ``data``/``chunks`` or a chunk lacks
            ``chunk_type``.
    """
    merged_name = "merged.json"
    # os.path.join instead of string concatenation: concatenation only worked
    # when folder_path ended with a separator, otherwise the output landed
    # next to the folder as "<folder>merged.json".
    merged_path = os.path.join(folder_path, merged_name)

    if not overwrite and os.path.exists(merged_path):
        print("The file already exists. Set 'overwrite=True' to overwrite.")
        return

    excluded_types = ("figure", "marginalia")
    chunks_kept = []
    # Sorted so the merged chunk order is reproducible; os.listdir returns
    # entries in arbitrary order. The sort is plain lexicographic.
    for file_name in sorted(os.listdir(folder_path)):
        if not file_name.endswith(".json"):
            continue
        # Skip the previous output: re-reading it when overwriting would
        # append every already-merged chunk a second time.
        # NOTE(review): other derived files in the same folder (e.g. the
        # merged_v2.json that process_papers reads) are still picked up —
        # confirm whether they should be excluded as well.
        if file_name == merged_name:
            continue

        with open(os.path.join(folder_path, file_name), "r", encoding="utf-8") as f:
            page = json.load(f)
        chunks_kept.extend(chunk for chunk in page["data"]["chunks"] if chunk["chunk_type"] not in excluded_types)

    with open(merged_path, "w", encoding="utf-8") as f:
        json.dump({"data": {"chunks": chunks_kept}}, f, indent=2)
def _discover_paper_folders(base_dir, start_folder=None, end_folder=None):
    """Return the sorted, non-hidden subdirectory names of ``base_dir`` within the optional inclusive range."""
    folders = sorted(
        name for name in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, name)) and not name.startswith(".")
    )
    # Bounds are compared as plain strings, i.e. lexicographically.
    if start_folder:
        folders = [name for name in folders if name >= start_folder]
    if end_folder:
        folders = [name for name in folders if name <= end_folder]
    return folders


def _load_existing_answers(answers_path):
    """Return the answers already stored at ``answers_path``, or ``{}`` when absent or unreadable."""
    if not os.path.exists(answers_path):
        return {}
    try:
        with open(answers_path, "r", encoding="utf-8") as f:
            existing_data = json.load(f)
        # Handle both old format (direct dict) and new format (with QUESTIONS key)
        if "QUESTIONS" in existing_data:
            existing_answers = existing_data["QUESTIONS"]
        else:
            existing_answers = existing_data
        print(f" 📝 Found existing answers with {len(existing_answers)} question(s)")
        return existing_answers
    except Exception as e:
        # Deliberate best effort: an unreadable old file must not discard the
        # freshly generated answers, so report it and carry on without it.
        print(f" ⚠️ Could not read existing answers: {e}")
        return {}


def _append_log_line(log_file, line):
    """Append one line of text to the progress log."""
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(line)


async def process_papers(
    base_dir=None,
    paper_folders=None,
    overwrite_merged=False,
    relevance_model=None,
    answer_model=None,
    start_folder=None,
    end_folder=None,
):
    """
    Processes papers in the specified directory.

    For every selected paper folder that contains ``pages/merged_v2.json``,
    the full question tree is answered and the result is written to
    ``<paper>/answers.json`` under a top-level ``"QUESTIONS"`` key. Keys found
    in an existing ``answers.json`` that are absent from the new result are
    preserved. Completions and failures are appended to
    ``<base_dir>/processing_log.txt``; folders that are skipped (missing
    directory, ``pages/`` or ``merged_v2.json``) are only reported on stdout.

    Args:
        base_dir: Base directory containing paper folders (defaults to the
            ``papers_dir`` config parameter)
        paper_folders: List of specific paper folder names to process (defaults to all folders)
        overwrite_merged: Accepted for interface compatibility; this function
            does not currently use it.
        relevance_model: Model to use for chunk selection (defaults to config)
        answer_model: Model to use for answer generation and reflection (defaults to config)
        start_folder: Optional start folder (inclusive); only applied when
            ``paper_folders`` is None
        end_folder: Optional end folder (inclusive); only applied when
            ``paper_folders`` is None

    Returns:
        None. Progress is printed and results are written to disk.
    """
    import contextlib
    import io
    import logging

    # Import centralized configuration if base_dir not provided
    if base_dir is None:
        from metabeeai.config import get_config_param

        base_dir = get_config_param("papers_dir")

    # Validate base directory
    if not os.path.exists(base_dir):
        print(f"Error: Base directory '{base_dir}' not found")
        return

    # Add trailing slash if missing
    if not base_dir.endswith("/"):
        base_dir += "/"

    # If no specific folders provided, get all subdirectories
    if paper_folders is None:
        paper_folders = _discover_paper_folders(base_dir, start_folder, end_folder)

    total_papers = len(paper_folders)
    completed_papers = 0
    failed_papers = []

    # Create progress log file
    log_file = os.path.join(base_dir, "processing_log.txt")

    print(f"🚀 Starting pipeline: {total_papers} papers to process")
    print(f"📁 Papers directory: {base_dir}")
    print(f"📝 Progress log: {log_file}")
    print("=" * 60)

    root_logger = logging.getLogger()

    for index, paper_folder in enumerate(paper_folders):
        paper_path = os.path.join(base_dir, paper_folder)

        # Show overall progress. "remaining" counts the papers not yet
        # visited (including this one); deriving it from the success counter
        # would keep skipped and failed papers listed as remaining forever.
        remaining = total_papers - index
        print(f"\n📊 Progress: {completed_papers}/{total_papers} completed, {remaining} remaining")
        print(f"🔄 Processing paper {paper_folder}...")

        # Skip if the paper directory doesn't exist
        if not os.path.exists(paper_path):
            print(f"⏭️ Skipping {paper_folder} - directory not found")
            continue

        try:
            pages_path = os.path.join(paper_path, "pages/")
            if not os.path.exists(pages_path):
                print(f"⏭️ Skipping {paper_folder} - pages directory not found")
                continue

            # Check if merged_v2.json exists
            json_path = os.path.join(pages_path, "merged_v2.json")
            if not os.path.exists(json_path):
                print(f"⏭️ Skipping {paper_folder} - merged_v2.json not found")
                continue

            # Process the paper with progress tracking
            questions = _get_questions()
            print(f" 📖 Processing {len(questions)} questions...")

            # Silence stdout, stderr and sub-ERROR root logging while the
            # answers are generated; everything is restored afterwards even
            # if the call raises.
            original_log_level = root_logger.level
            root_logger.setLevel(logging.ERROR)
            try:
                with contextlib.redirect_stdout(io.StringIO()), contextlib.redirect_stderr(io.StringIO()):
                    literature_answers = await get_literature_answers(
                        json_path, relevance_model=relevance_model, answer_model=answer_model
                    )
            finally:
                root_logger.setLevel(original_log_level)

            # Merge with existing answers.json if it exists
            answers_path = os.path.join(paper_path, "answers.json")
            existing_answers = _load_existing_answers(answers_path)

            # New answers win on shared keys; old keys that were not
            # regenerated are carried over rather than deleted.
            if existing_answers:
                for key, value in existing_answers.items():
                    literature_answers.setdefault(key, value)
                print(f" 🔄 Merged answers: {len(literature_answers)} total question(s)")

            # Save the merged results in QUESTIONS format
            output_data = {"QUESTIONS": literature_answers}
            with open(answers_path, "w", encoding="utf-8") as f:
                json.dump(output_data, f, indent=2)

            completed_papers += 1
            print(f" ✅ Paper {paper_folder} completed successfully")

            # Log completion
            _append_log_line(log_file, f"{paper_folder}: COMPLETED at {time.strftime('%Y-%m-%d %H:%M:%S')}\n")

        except Exception as e:
            # Per-paper boundary: one failing paper must not stop the batch.
            print(f" ❌ Error processing paper {paper_folder}: {e}")
            failed_papers.append(paper_folder)

            # Log failure
            _append_log_line(log_file, f"{paper_folder}: FAILED at {time.strftime('%Y-%m-%d %H:%M:%S')} - {e}\n")

    # Final summary
    print("\n" + "=" * 60)
    print("🎉 PIPELINE COMPLETED!")
    print(f"✅ Successfully processed: {completed_papers}/{total_papers} papers")
    if failed_papers:
        print(f"❌ Failed papers: {', '.join(failed_papers)}")
    print(f"📝 Detailed log: {log_file}")


def main(argv=None):
    """
    Main entry point.

    Parses the command-line options, optionally exports the config path via
    the ``METABEEAI_CONFIG_FILE`` environment variable, applies a model preset
    for any model option left unset, and runs ``process_papers``.

    Args:
        argv: Argument list to parse; defaults to ``sys.argv[1:]``.
    """
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser(description="Process paper folders to extract literature answers")

    # YAML config file path (sets METABEEAI_CONFIG_FILE for downstream lookups)
    parser.add_argument(
        "--config",
        "--config-file",
        dest="config",
        type=str,
        default=None,
        help="Path to config YAML file (overrides METABEEAI_CONFIG_FILE and defaults)",
    )

    # Base directory and selection of folders
    parser.add_argument(
        "--dir", type=str, default=None, help="Base directory containing paper folders (default: auto-detect from config)"
    )
    parser.add_argument(
        "--papers",
        type=str,
        nargs="+",
        default=None,
        help=("Specific paper IDs to process (e.g., 283C6B42 3ZHNVADM). " "If not specified, all folders will be processed."),
    )
    parser.add_argument(
        "--start",
        type=str,
        default=None,
        help="Start processing from this paper ID (alphanumeric, optional; only applies when --papers is not set)",
    )
    parser.add_argument(
        "--end",
        type=str,
        default=None,
        help="End processing at this paper ID (alphanumeric, optional; only applies when --papers is not set)",
    )
    parser.add_argument("--overwrite", action="store_true", help="Overwrite existing merged.json files")

    # Models
    parser.add_argument(
        "--relevance-model",
        type=str,
        default=None,
        help="Model to use for chunk selection (e.g., 'openai/gpt-4o-mini', 'openai/gpt-4o'). Default: from config",
    )
    parser.add_argument(
        "--answer-model",
        type=str,
        default=None,
        help=(
            "Model to use for answer generation and reflection (e.g., 'openai/gpt-4o-mini', 'openai/gpt-4o'). "
            "Default: from config"
        ),
    )

    # Preset selector (fast/balanced/quality)
    parser.add_argument(
        "--preset",
        type=str,
        choices=["fast", "balanced", "quality"],
        default=None,
        help="Use predefined configuration preset: 'fast', 'balanced', or 'quality'",
    )

    args = parser.parse_args(argv)

    # Respect provided config file for downstream lookups
    if args.config:
        os.environ["METABEEAI_CONFIG_FILE"] = args.config

    # Handle predefined configurations
    if args.preset:
        from metabeeai.metabeeai_llm.pipeline_config import BALANCED_CONFIG, FAST_CONFIG, QUALITY_CONFIG

        config_map = {"fast": FAST_CONFIG, "balanced": BALANCED_CONFIG, "quality": QUALITY_CONFIG}
        selected_config = config_map[args.preset]

        # Override model arguments with config values if not explicitly provided
        if args.relevance_model is None:
            args.relevance_model = selected_config["relevance_model"]
        if args.answer_model is None:
            args.answer_model = selected_config["answer_model"]

        print(f"🔧 Using {args.preset.upper()} configuration:")
        print(f" Relevance Model: {args.relevance_model}")
        print(f" Answer Model: {args.answer_model}")
        print(f" Description: {selected_config['description']}")

    asyncio.run(
        process_papers(
            base_dir=args.dir,
            paper_folders=args.papers,
            overwrite_merged=args.overwrite,
            relevance_model=args.relevance_model,
            answer_model=args.answer_model,
            start_folder=args.start,
            end_folder=args.end,
        )
    )


if __name__ == "__main__":
    main(sys.argv[1:])