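"""Extract magazine metadata from PDF covers.

Converts the first page of each PDF in an input directory to an image,
sends it to the Claude vision API to extract the title, date, publisher,
and issue number, and writes the results to a JSON file. Optionally
writes the extracted metadata back into the PDFs via PDFMetadataWriter.
"""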
import argparse
import base64
import json
import logging
import os
import shutil  # added: used to remove the temporary working directory at cleanup
import sys
import tempfile
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List

import anthropic
from pdf2image import convert_from_path
from tqdm import tqdm

from metadata_writer import PDFMetadataWriter


class PDFProcessor:
    def __init__(self, input_dir: str, output_dir: str, api_key: str, logger: logging.Logger = None):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.temp_dir = Path(tempfile.mkdtemp())
        self.client = anthropic.Anthropic(api_key=api_key)
        self.logger = logger or self._setup_default_logger()

    def _setup_default_logger(self) -> logging.Logger:
        """Set up a default logger if none is provided."""
        logger = logging.getLogger('PDFProcessor')
        logger.setLevel(logging.INFO)
        return logger

    def encode_image(self, image_path: str) -> str:
        """Convert an image to base64 for the API."""
        with open(image_path, 'rb') as image_file:
            return base64.b64encode(image_file.read()).decode('utf-8')

    def analyze_image(self, image_path: str) -> Dict:
        """Analyze a single cover image using the Claude vision API."""
        try:
            self.logger.debug(f"Analyzing image: {image_path}")
            message = self.client.messages.create(
                model="claude-3-opus-20240229",
                max_tokens=1000,
                messages=[{
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": """Analyze this magazine cover and extract the following metadata:
1. Magazine Title
2. Issue Date/Publication Date
3. Publisher
4. Issue Number

Format your response as JSON with these exact keys:
{
    "title": string,
    "date": string,
    "publisher": string,
    "issue_number": string,
    "confidence": "high|medium|low"
}

If any field cannot be determined, use null. Set confidence based on how clear the information is."""
                        },
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/jpeg",
                                "data": self.encode_image(image_path)
                            }
                        }
                    ]
                }]
            )

            # Parse the JSON response. The model is instructed to return bare
            # JSON, but strip a Markdown code fence defensively if one appears.
            response_text = message.content[0].text.strip()
            if response_text.startswith("```"):
                response_text = response_text.split("```")[1]
                if response_text.startswith("json"):
                    response_text = response_text[len("json"):]
            metadata = json.loads(response_text)
            self.logger.debug(f"Successfully extracted metadata from {image_path}")
            return metadata

        except Exception as e:
            self.logger.error(f"Error analyzing image {image_path}: {str(e)}")
            return {
                "title": None,
                "date": None,
                "publisher": None,
                "issue_number": None,
                "confidence": "error"
            }
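
    # Illustrative example of a successful return value (values invented):
    #   {"title": "Example Monthly", "date": "March 1987",
    #    "publisher": "Example Press", "issue_number": "42",
    #    "confidence": "high"}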
    def process_pdfs(self, pdf_files: List[Path] = None) -> List[Dict]:
        """Process an explicit list of PDFs, or all PDFs in the input directory."""
        if pdf_files is None:
            pdf_files = list(self.input_dir.glob('*.pdf'))
        results = []

        # Set up the progress bar
        pbar = tqdm(pdf_files, desc="Processing PDFs", unit="file")

        for pdf_path in pbar:
            try:
                # Update the progress bar description
                pbar.set_description(f"Processing {pdf_path.name}")

                result = self.process_single_pdf(pdf_path)
                results.append(result)

                # Update the progress bar postfix with the confidence rating
                confidence = result.get('metadata', {}).get('confidence', 'unknown')
                pbar.set_postfix(confidence=confidence)

                # Small delay to respect API rate limits
                time.sleep(1)

            except Exception as e:
                self.logger.error(f"Error processing {pdf_path}: {str(e)}")
                results.append({
                    'pdf_path': str(pdf_path),
                    'status': 'error',
                    'error': str(e)
                })

        # Save results to JSON
        results_file = self.output_dir / 'processing_results.json'
        with open(results_file, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=4, ensure_ascii=False)

        self.logger.info(f"Results saved to {results_file}")
        return results
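
    # Each entry in processing_results.json is shaped like one of (illustrative):
    #   {"pdf_path": ..., "image_path": ..., "metadata": {...},
    #    "status": "completed", "processed_at": "<ISO timestamp>"}
    #   {"pdf_path": ..., "status": "error", "error": "<message>"}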
    def process_single_pdf(self, pdf_path: Path) -> Dict:
        """Process a single PDF file."""
        self.logger.info(f"Processing: {pdf_path}")

        # Convert the first page to an image
        self.logger.debug(f"Converting first page of {pdf_path} to image")
        images = convert_from_path(pdf_path, first_page=1, last_page=1)
        if not images:
            raise RuntimeError("Could not extract first page")

        # Save the first-page image
        first_page = images[0]
        image_path = self.temp_dir / f"{pdf_path.stem}_page1.jpg"
        first_page.save(str(image_path), 'JPEG')
        self.logger.debug(f"Saved first page as image: {image_path}")

        # Analyze the image
        metadata = self.analyze_image(str(image_path))

        return {
            'pdf_path': str(pdf_path),
            'image_path': str(image_path),
            'metadata': metadata,
            'status': 'completed',
            'processed_at': datetime.now().isoformat()
        }


def setup_logging(output_dir: Path, debug: bool = False) -> logging.Logger:
    """Set up logging configuration."""
    logger = logging.getLogger('PDFProcessor')
    logger.setLevel(logging.DEBUG if debug else logging.INFO)

    # Console shows debug output only when --debug is set; the log file
    # always captures everything at DEBUG level.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG if debug else logging.INFO)

    file_handler = logging.FileHandler(
        output_dir / f'pdf_processor_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
    )
    file_handler.setLevel(logging.DEBUG)

    # Formatters: terse on the console, timestamped in the file
    console_formatter = logging.Formatter('%(message)s')
    file_formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    console_handler.setFormatter(console_formatter)
    file_handler.setFormatter(file_formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)

    return logger


def main():
    parser = argparse.ArgumentParser(
        description='Process PDFs to extract magazine metadata using the Claude vision API',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Process all PDFs in the current directory
  python pdf_processor.py -i . -o results

  # Process specific PDFs matching a custom pattern
  python pdf_processor.py -i /path/to/pdfs -o /path/to/output --pattern "magazine_*.pdf"

  # Test mode with a single file
  python pdf_processor.py -i /path/to/pdfs -o /path/to/output --test
'''
    )

    parser.add_argument('-i', '--input-dir',
                        required=True,
                        help='Directory containing PDF files')
    parser.add_argument('-o', '--output-dir',
                        required=True,
                        help='Directory for output files')
    parser.add_argument('--pattern',
                        default='*.pdf',
                        help='Glob pattern for PDF files (default: *.pdf)')
    parser.add_argument('--api-key',
                        help='Anthropic API key (alternative to env variable)')
    parser.add_argument('--test',
                        action='store_true',
                        help='Test mode: process only the first PDF file')
    parser.add_argument('--skip-existing',
                        action='store_true',
                        help='Skip PDFs that already have results in the output directory '
                             '(note: not yet implemented)')
    parser.add_argument('--no-cleanup',
                        action='store_true',
                        help='Keep temporary image files after processing')
    parser.add_argument('--debug',
                        action='store_true',
                        help='Enable debug logging')
    parser.add_argument('--write-metadata',
                        action='store_true',
                        help='Write extracted metadata back to PDF files')
    parser.add_argument('--no-backup',
                        action='store_true',
                        help='Skip creating backups when writing metadata')

    args = parser.parse_args()

    # Validate directories
    input_dir = Path(args.input_dir)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    logger = setup_logging(output_dir, args.debug)

    if not input_dir.exists():
        logger.error(f"Error: Input directory '{input_dir}' does not exist")
        sys.exit(1)

    # Get the API key
    api_key = args.api_key or os.getenv('ANTHROPIC_API_KEY')
    if not api_key:
        logger.error("Error: No API key provided. Either set the ANTHROPIC_API_KEY "
                     "environment variable or use the --api-key option")
        sys.exit(1)

    try:
        logger.info("Starting PDF processing...")
        # Initialize the processor, sharing the configured logger
        processor = PDFProcessor(str(input_dir), str(output_dir), api_key, logger)

        # Resolve the file list, honoring --pattern and --test
        if args.test:
            logger.info("Running in test mode - will process only the first PDF file")
            pdf_files = list(input_dir.glob(args.pattern))[:1]
        else:
            pdf_files = list(input_dir.glob(args.pattern))

        if not pdf_files:
            logger.error(f"No PDF files found in '{input_dir}' matching pattern '{args.pattern}'")
            sys.exit(1)

        logger.info(f"Found {len(pdf_files)} PDF files to process")

        # Process the resolved file list (previously the --pattern and --test
        # selection was computed here but ignored by process_pdfs)
        results = processor.process_pdfs(pdf_files)

        if args.write_metadata:
            logger.info("Writing metadata back to PDF files...")
            writer = PDFMetadataWriter(logger)
            stats = writer.batch_write_metadata(results, backup=not args.no_backup)

            logger.info("\nMetadata Writing Results:")
            logger.info(f"Successfully updated: {stats['success_count']} files")
            logger.info(f"Failed to update: {stats['failure_count']} files")

            if stats['failure_count'] > 0:
                logger.info("\nFailed files:")
                for failed_file in stats['failure']:
                    logger.info(f"  - {failed_file}")

        # Clean up temporary files unless --no-cleanup was specified
        if not args.no_cleanup:
            logger.info("Cleaning up temporary files...")
            for result in results:
                if 'image_path' in result:
                    try:
                        Path(result['image_path']).unlink()
                    except Exception as e:
                        logger.warning(f"Could not delete temporary file {result['image_path']}: {e}")
            # Also remove the temporary working directory itself
            shutil.rmtree(processor.temp_dir, ignore_errors=True)

        logger.info(f"\nProcessed {len(results)} PDF files")
        logger.info(f"Results saved to: {processor.output_dir}/processing_results.json")

    except KeyboardInterrupt:
        logger.info("\nOperation cancelled by user")
        sys.exit(1)
    except Exception as e:
        logger.error(f"Error: {str(e)}")
        sys.exit(1)


if __name__ == "__main__":
    main()