Source code for tools.pdf_generator

"""Advanced PDF Generator Tool.

Creates complex PDFs from JSON specifications with text, images, tables,
headers, links, and various styling options across multiple pages.
"""

from __future__ import annotations

import asyncio
import io
import json
import logging
import os
import re
import tempfile
import uuid
from typing import Dict, List, Optional, Any, Union, Tuple, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

try:
    from reportlab.lib import colors
    from reportlab.lib.pagesizes import letter, A4
    from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
    from reportlab.lib.units import inch
    from reportlab.pdfgen import canvas
    from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, Image, PageBreak
    from reportlab.platypus.flowables import KeepTogether, Flowable
    import httpx
    from PIL import Image as PILImage
except ImportError as e:
    logging.error(f"Required dependencies not installed: {e}")
    colors = None
    canvas = None
    SimpleDocTemplate = None
    Flowable = object

logger = logging.getLogger(__name__)


class LinkFlowable(Flowable):
    """Flowable that renders text through a ``Paragraph`` and makes it a URL link.

    Attributes:
        text: The text handed to the inner ``Paragraph``.
        url: The URL attached to the drawn area.
        style: Paragraph style used for the text; the sample stylesheet's
            ``Normal`` style when none is supplied.
        width: Width returned by the most recent ``wrap`` call (0 before that).
        height: Height returned by the most recent ``wrap`` call (0 before that).
        paragraph: The ``Paragraph`` built from ``text`` and ``style``.
    """

    def __init__(self, text, url, style=None):
        """Initialize the instance.

        Args:
            text: Text content to display.
            url: URL string the link area points at.
            style: Optional paragraph style; defaults to ``Normal``.
        """
        Flowable.__init__(self)
        self.text = text
        self.url = url
        self.style = style or getSampleStyleSheet()['Normal']
        self.width = 0
        self.height = 0
        self.paragraph = Paragraph(self.text, self.style)

    def wrap(self, availWidth, availHeight):
        """Wrap the inner paragraph and remember the resulting size.

        Args:
            availWidth: Width offered by the layout engine.
            availHeight: Height offered by the layout engine.

        Returns:
            The ``(width, height)`` pair reported by the inner paragraph.
        """
        self.width, self.height = self.paragraph.wrap(availWidth, availHeight)
        return self.width, self.height

    def draw(self):
        """Draw the paragraph and attach the URL to the area it occupies."""
        self.paragraph.drawOn(self.canv, 0, 0)
        # The rectangle is expressed in this flowable's own coordinates (the
        # paragraph is drawn at 0, 0 on the same canvas).  ``linkURL`` treats
        # the rectangle as absolute page space unless ``relative=1``, so the
        # flag is required for the link to sit on top of the text.
        self.canv.linkURL(self.url, (0, 0, self.width, self.height), relative=1)
class PDFGenerator:
    """Render a JSON-style PDF specification into PDF bytes with ReportLab.

    Attributes:
        page_size: ReportLab page-size tuple, ``A4`` or ``letter``.
        width: Page width, unpacked from ``page_size``.
        height: Page height, unpacked from ``page_size``.
        styles: Sample stylesheet extended with the ``CustomHeader``,
            ``CustomSubHeader`` and ``CustomBody`` paragraph styles.
    """

    # Alignment keywords accepted in element specs, mapped to the numeric
    # codes assigned to ``ParagraphStyle.alignment``.
    _ALIGNMENTS = {
        'left': 0, 'l': 0,
        'center': 1, 'c': 1,
        'right': 2, 'r': 2,
        'justify': 4, 'j': 4,
    }

    # (is_bold, is_italic) -> concrete font name, for the font families that
    # are handled explicitly.  Any other family gets a generic suffix.
    _FONT_FAMILIES = {
        'helvetica': {
            (True, True): 'Helvetica-BoldOblique',
            (True, False): 'Helvetica-Bold',
            (False, True): 'Helvetica-Oblique',
            (False, False): 'Helvetica',
        },
        'times': {
            (True, True): 'Times-BoldItalic',
            (True, False): 'Times-Bold',
            (False, True): 'Times-Italic',
            (False, False): 'Times-Roman',
        },
        'courier': {
            (True, True): 'Courier-BoldOblique',
            (True, False): 'Courier-Bold',
            (False, True): 'Courier-Oblique',
            (False, False): 'Courier',
        },
    }

    def __init__(self, page_size: str = "A4"):
        """Initialize the instance.

        Args:
            page_size (str): ``"A4"`` (case-insensitive) selects A4; any other
                value selects US letter.

        Raises:
            ImportError: If the PDF dependencies failed to import.
        """
        if not all([colors, canvas, SimpleDocTemplate]):
            raise ImportError("Required PDF generation dependencies not available")
        # str() so a non-string value taken from a spec selects letter instead
        # of raising AttributeError on .upper().
        self.page_size = A4 if str(page_size).upper() == "A4" else letter
        self.width, self.height = self.page_size
        self.styles = getSampleStyleSheet()
        self.styles.add(ParagraphStyle(
            name='CustomHeader', parent=self.styles['Heading1'],
            fontSize=24, spaceAfter=30, alignment=1))
        self.styles.add(ParagraphStyle(
            name='CustomSubHeader', parent=self.styles['Heading2'],
            fontSize=18, spaceAfter=20))
        self.styles.add(ParagraphStyle(
            name='CustomBody', parent=self.styles['Normal'],
            fontSize=12, spaceAfter=12))

    def _resolve_alignment(self, value, default):
        """Internal helper: map an alignment keyword to its numeric code.

        Args:
            value: Alignment keyword such as ``"center"`` or ``"c"``.
            default: Code returned when the keyword is not recognised.
        """
        return self._ALIGNMENTS.get(str(value).lower(), default)

    @staticmethod
    def _positive_number(value):
        """Internal helper: return ``value`` as a positive float, else ``None``."""
        try:
            number = float(value)
        except (TypeError, ValueError):
            return None
        return number if number > 0 else None

    def _parse_color(self, color_spec):
        """Internal helper: parse color.

        Args:
            color_spec: A colour name (red, blue, green, black, white, gray)
                or a list of 0-255 channel values ``[r, g, b]`` /
                ``[r, g, b, a]``.

        Returns:
            A ReportLab colour; black for anything that cannot be parsed.
        """
        if isinstance(color_spec, str):
            named = {
                'red': colors.red, 'blue': colors.blue, 'green': colors.green,
                'black': colors.black, 'white': colors.white, 'gray': colors.gray,
            }
            return named.get(color_spec.lower(), colors.black)
        if isinstance(color_spec, (list, tuple)) and len(color_spec) >= 3:
            try:
                # Clamp each channel to 0-255 before scaling to 0-1.
                channels = [min(255.0, max(0.0, float(c))) / 255.0 for c in color_spec[:4]]
            except (TypeError, ValueError):
                return colors.black
            if len(channels) == 3:
                channels.append(1.0)  # opaque when no alpha is given
            return colors.Color(*channels)
        return colors.black

    def _download_image(self, url):
        """Internal helper: download image.

        Args:
            url: URL string; it must pass ``assert_safe_http_url``.

        Returns:
            The response body as bytes, or ``None`` if the URL is rejected or
            the request fails.
        """
        from tools._safe_http import assert_safe_http_url
        try:
            url = assert_safe_http_url(str(url).strip())
        except ValueError as exc:
            logger.error("Blocked PDF image URL: %s", exc)
            return None
        try:
            with httpx.Client(timeout=10) as client:
                response = client.get(url)
                response.raise_for_status()
                return response.content
        except Exception as e:
            logger.error("Failed to download image from %s: %s", url, e)
            return None

    def _get_font_name(self, base_font, is_bold, is_italic):
        """Internal helper: get font name.

        Args:
            base_font: Font family name (matched case-insensitively against
                helvetica, times and courier).
            is_bold: Truthy to request the bold variant.
            is_italic: Truthy to request the italic variant.

        Returns:
            The variant name from the family table, or ``base_font`` with a
            ``-Bold`` / ``-Italic`` / ``-BoldItalic`` suffix for other families.
        """
        base_font = str(base_font)
        # Normalise to real booleans: the table is keyed on (bool, bool) and
        # spec values may be arbitrary truthy objects.
        key = (bool(is_bold), bool(is_italic))
        variants = self._FONT_FAMILIES.get(base_font.lower())
        if variants is not None:
            return variants[key]
        if key == (True, True):
            suffix = '-BoldItalic'
        elif key[0]:
            suffix = '-Bold'
        elif key[1]:
            suffix = '-Italic'
        else:
            suffix = ''
        return base_font + suffix

    def _create_text_element(self, content, style_spec):
        """Internal helper: create text element.

        Args:
            content: Text (ReportLab paragraph markup) to render.
            style_spec: Element dict; reads ``parse_markdown_links``,
                ``base_style``, ``size``/``font_size``, ``color``,
                ``font_style``, ``bold``, ``italic``, ``font`` and ``align``.

        Returns:
            A ``Paragraph``, or a list of flowables when
            ``parse_markdown_links`` is set.
        """
        content = '' if content is None else str(content)
        if style_spec.get('parse_markdown_links', False):
            return self._parse_markdown_links(content, style_spec)

        base_style_name = style_spec.get('base_style', 'Normal')
        if not isinstance(base_style_name, str) or base_style_name not in self.styles:
            base_style_name = 'Normal'
        # Work on a child style: the stylesheet entry is shared by every
        # paragraph, so editing it in place would leak this element's
        # overrides into all later elements.
        style = ParagraphStyle(
            name=f'{base_style_name}-element',
            parent=self.styles[base_style_name],
        )

        if 'size' in style_spec or 'font_size' in style_spec:
            style.fontSize = style_spec.get('size', style_spec.get('font_size', style.fontSize))
        if 'color' in style_spec:
            style.textColor = self._parse_color(style_spec['color'])

        font_style = str(style_spec.get('font_style', '')).upper()
        is_bold = style_spec.get('bold', False) or 'B' in font_style
        is_italic = style_spec.get('italic', False) or 'I' in font_style
        base_font = style_spec.get('font', 'Helvetica')
        style.fontName = self._get_font_name(base_font, is_bold, is_italic)

        if 'align' in style_spec:
            style.alignment = self._resolve_alignment(style_spec['align'], 0)
        return Paragraph(content, style)

    def _parse_markdown_links(self, text, style_spec):
        """Internal helper: parse markdown links.

        Splits ``text`` on ``[label](url)`` links.  Non-blank text between
        links becomes a ``Normal`` paragraph and each link becomes a
        ``LinkFlowable``.

        Args:
            text: Text content that may contain markdown links.
            style_spec: Element dict; only ``size``/``font_size`` is read,
                and it is applied to the link style.

        Returns:
            A non-empty list of flowables.
        """
        link_pattern = r'\[([^\]]+)\]\(([^)]+)\)'
        normal = self.styles['Normal']

        # The link style does not depend on the match, so build it once.
        link_style = ParagraphStyle(
            name='MarkdownLink', parent=normal,
            textColor=colors.blue, underline=True)
        if 'font_size' in style_spec or 'size' in style_spec:
            link_style.fontSize = style_spec.get(
                'size', style_spec.get('font_size', link_style.fontSize))

        elements = []
        last_end = 0
        for match in re.finditer(link_pattern, text):
            pre_text = text[last_end:match.start()]
            if pre_text.strip():
                elements.append(Paragraph(pre_text, normal))
            elements.append(LinkFlowable(match.group(1), match.group(2), link_style))
            last_end = match.end()

        post_text = text[last_end:]
        if post_text.strip():
            elements.append(Paragraph(post_text, normal))
        return elements or [Paragraph(text, normal)]

    def _create_title_element(self, title_spec):
        """Internal helper: create title element.

        Args:
            title_spec: Element dict; reads ``text``, ``font_size`` (default
                28) and ``align`` (default centred).

        Returns:
            A ``Paragraph``, or ``None`` when ``text`` is empty.
        """
        text = title_spec.get('text', '')
        if not text:
            return None
        title_style = ParagraphStyle(
            name='Title', parent=self.styles['Heading1'],
            fontSize=title_spec.get('font_size', 28),
            spaceAfter=40, alignment=1, fontName='Helvetica-Bold')
        if 'align' in title_spec:
            title_style.alignment = self._resolve_alignment(title_spec['align'], 1)
        return Paragraph(text, title_style)

    def _create_header_element(self, header_spec):
        """Internal helper: create header element.

        Args:
            header_spec: Element dict; reads ``text``, ``level`` (clamped to
                1-6, default 1), ``font_size`` and ``align``.

        Returns:
            A ``Paragraph``, or ``None`` when ``text`` is empty.
        """
        text = header_spec.get('text', '')
        if not text:
            return None
        try:
            level = int(header_spec.get('level', 1))
        except (TypeError, ValueError):
            level = 1  # unusable level: treat as a top-level header
        level = max(1, min(6, level))
        style_map = {1: 'Heading1', 2: 'Heading2', 3: 'Heading3',
                     4: 'Heading4', 5: 'Heading5', 6: 'Normal'}
        header_style = ParagraphStyle(
            name=f'Header{level}',
            parent=self.styles[style_map.get(level, 'Heading1')],
            spaceAfter=20 - level * 2)
        if 'font_size' in header_spec:
            header_style.fontSize = header_spec['font_size']
        if 'align' in header_spec:
            header_style.alignment = self._resolve_alignment(header_spec['align'], 0)
        return Paragraph(text, header_style)

    def _create_link_element(self, link_spec):
        """Internal helper: create link element.

        Args:
            link_spec: Element dict; reads ``text``, ``url`` and ``font_size``.

        Returns:
            A ``LinkFlowable``, or ``None`` when ``text`` or ``url`` is empty.
        """
        text = link_spec.get('text', '')
        url = link_spec.get('url', '')
        if not text or not url:
            return None
        link_style = ParagraphStyle(
            name='Link', parent=self.styles['Normal'],
            textColor=colors.blue, underline=True)
        if 'font_size' in link_spec:
            link_style.fontSize = link_spec['font_size']
        return LinkFlowable(text, url, link_style)

    def _create_image_element(self, image_spec):
        """Internal helper: create image element.

        The image is taken from ``url``, ``base64`` or ``path`` (first one
        present), re-encoded as PNG in memory and wrapped in an ``Image``
        flowable.  When only one of ``width``/``height`` is given, the other
        is scaled to keep the aspect ratio.

        Args:
            image_spec: Element dict; reads ``url``, ``base64``, ``path``,
                ``width``, ``height`` and ``caption``.

        Returns:
            An ``Image``, a ``KeepTogether`` of image plus caption, or
            ``None`` if the image cannot be loaded.
        """
        try:
            image_data = None
            if 'url' in image_spec:
                image_data = self._download_image(image_spec['url'])
            elif 'base64' in image_spec:
                import base64
                image_data = base64.b64decode(image_spec['base64'])
            elif 'path' in image_spec:
                # NOTE(review): this opens whatever local path the spec names;
                # restrict it if specs can come from untrusted callers.
                with open(image_spec['path'], 'rb') as f:
                    image_data = f.read()
            if not image_data:
                return None

            # Keep the PNG in memory rather than in a temp file: a temp file
            # would have to outlive this method, because ReportLab may not
            # read a file-name image until the document is built.
            pil_image = PILImage.open(io.BytesIO(image_data))
            png_buffer = io.BytesIO()
            pil_image.save(png_buffer, 'PNG')
            png_buffer.seek(0)

            width = self._positive_number(image_spec.get('width'))
            height = self._positive_number(image_spec.get('height'))
            pixel_width, pixel_height = pil_image.size
            if pixel_width and pixel_height:
                if width and not height:
                    height = width * pixel_height / pixel_width
                elif height and not width:
                    width = height * pixel_width / pixel_height

            img = Image(png_buffer, width=width, height=height)

            caption = image_spec.get('caption')
            if caption:
                caption_style = ParagraphStyle(
                    name='ImageCaption', parent=self.styles['Normal'],
                    fontSize=10, spaceBefore=5, fontName='Helvetica-Oblique')
                return KeepTogether([img, Paragraph(caption, caption_style)])
            return img
        except Exception as e:
            logger.error("Failed to create image element: %s", e)
            return None

    def _create_table_element(self, table_spec):
        """Internal helper: create table element.

        Args:
            table_spec: Element dict; reads ``headers`` and ``data``.  When
                ``headers`` is empty the first data row is used as the header.

        Returns:
            A styled ``Table``, or ``None`` when there is nothing to show or
            the table cannot be built.
        """
        try:
            headers = table_spec.get('headers', [])
            data = table_spec.get('data', [])
            if not headers and not data:
                return None
            if not headers:
                # Promote the first row to header; slicing yields [] when it
                # was the only row.
                headers, data = data[0], data[1:]

            table_data = [headers] + data if headers else data
            table = Table(table_data)

            style_commands = []
            if headers:
                style_commands.extend([
                    ('BACKGROUND', (0, 0), (-1, 0), colors.grey),
                    ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
                    ('ALIGN', (0, 0), (-1, 0), 'CENTER'),
                    ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
                    ('FONTSIZE', (0, 0), (-1, 0), 14),
                    ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
                    ('BACKGROUND', (0, 1), (-1, -1), colors.beige),
                ])
            style_commands.append(('GRID', (0, 0), (-1, -1), 1, colors.black))
            table.setStyle(TableStyle(style_commands))
            return table
        except Exception as e:
            logger.error("Failed to create table element: %s", e)
            return None

    def generate_pdf(self, pdf_spec):
        """Generate pdf.

        Args:
            pdf_spec: Dict with a ``pages`` list; each page is a dict with an
                ``elements`` list of element dicts keyed by ``type``.

        Returns:
            The PDF as bytes, or ``None`` if generation fails (the error is
            logged).
        """
        try:
            # Element types whose builder takes the element dict and returns
            # a flowable or None.
            builders = {
                'title': self._create_title_element,
                'header': self._create_header_element,
                'image': self._create_image_element,
                'table': self._create_table_element,
                'link': self._create_link_element,
            }

            story = []
            pages = pdf_spec.get('pages', [{"elements": []}])
            last_index = len(pages) - 1
            for index, page_spec in enumerate(pages):
                for element in page_spec.get('elements', []):
                    element_type = element.get('type', '').lower()
                    if element_type == 'newpage':
                        story.append(PageBreak())
                        continue  # no trailing spacer after an explicit break
                    if element_type in ('text', 'paragraph'):
                        content = element.get('text', element.get('content', ''))
                        text_elem = self._create_text_element(content, element)
                        if isinstance(text_elem, list):
                            story.extend(text_elem)
                        else:
                            story.append(text_elem)
                    elif element_type == 'spacer':
                        story.append(Spacer(1, element.get('height', 12)))
                    elif element_type in builders:
                        elem = builders[element_type](element)
                        if elem:
                            story.append(elem)
                    story.append(Spacer(1, 12))
                # Compare by position, not by value: a page whose content
                # equals the last page's must still get its page break.
                if index != last_index:
                    story.append(PageBreak())

            with io.BytesIO() as buffer:
                doc = SimpleDocTemplate(
                    buffer, pagesize=self.page_size,
                    rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18)
                doc.build(story)
                return buffer.getvalue()
        except Exception as e:
            logger.error("Failed to generate PDF: %s", e, exc_info=True)
            return None
def _validate_pdf_spec(pdf_spec):
    """Internal helper: validate pdf spec.

    A top-level ``elements`` key is accepted as shorthand for a single page:
    it is moved, in place, into ``pdf_spec['pages']``.

    Args:
        pdf_spec: The parsed specification (mutated when the ``elements``
            shorthand is used).

    Returns:
        ``(True, "Valid")`` when the structure is acceptable, otherwise
        ``(False, reason)``.
    """
    if not isinstance(pdf_spec, dict):
        return False, "PDF specification must be a dictionary"

    if 'elements' in pdf_spec:
        pdf_spec['pages'] = [{"elements": pdf_spec.pop('elements')}]
    if 'pages' not in pdf_spec:
        return False, "PDF specification must contain 'pages' key"

    pages = pdf_spec['pages']
    if not isinstance(pages, list):
        return False, "'pages' must be a list"

    supported = {'title', 'header', 'paragraph', 'text', 'image', 'table',
                 'spacer', 'newpage', 'link'}
    for i, page in enumerate(pages):
        if not isinstance(page, dict) or 'elements' not in page:
            return False, f"Page {i} must be a dictionary with 'elements' key"
        elements = page['elements']
        # Reject non-lists up front so a null or scalar value is reported as
        # a validation error instead of raising while iterating.
        if not isinstance(elements, list):
            return False, f"'elements' on page {i} must be a list"
        for j, element in enumerate(elements):
            if not isinstance(element, dict) or 'type' not in element:
                return False, f"Element {j} on page {i} must have 'type' key"
            element_type = element['type']
            # A non-string type has no .lower(); report it as unsupported.
            if not isinstance(element_type, str) or element_type.lower() not in supported:
                return False, f"Unsupported element type '{element_type}'"
    return True, "Valid"


TOOL_NAME = "generate_pdf"
TOOL_DESCRIPTION = (
    "Generate a PDF from a JSON specification with pages containing "
    "text, titles, headers, images, tables, links, spacers, and page breaks. "
    "Returns the file path to the generated PDF."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "pdf_spec": {
            "type": "string",
            "description": (
                "JSON string containing PDF specification with 'pages' array. "
                "Element types: title, header, paragraph/text, image, table, spacer, link, newpage."
            ),
        },
        "filename": {
            "type": "string",
            "description": "Optional filename for the PDF (defaults to auto-generated).",
        },
    },
    "required": ["pdf_spec"],
}
def _resolve_output_filename(filename, title):
    """Internal helper: return a bare ``*.pdf`` file name for the output.

    Args:
        filename: Caller-supplied name, possibly empty or ``None``.
        title: Value of the spec's ``title`` key, used when no usable
            ``filename`` is given.
    """
    candidate = ''
    if filename:
        # Keep only the final path component (either separator style) so the
        # name cannot point outside the directory it is later joined onto.
        candidate = os.path.basename(str(filename).replace('\\', '/')).strip()
        if candidate in ('.', '..'):
            candidate = ''
    if not candidate:
        raw_title = str(title) if title else 'generated_pdf'
        safe_title = "".join(
            c for c in raw_title if c.isalnum() or c in (' ', '-', '_')
        ).rstrip()
        candidate = f"{safe_title or 'generated_pdf'}_{uuid.uuid4().hex[:8]}.pdf"
    if not candidate.lower().endswith('.pdf'):
        candidate += '.pdf'
    return candidate


async def run(pdf_spec: str, filename: str = None, ctx: ToolContext | None = None) -> str:
    """Execute this tool and return the result.

    Parses and validates the JSON specification, builds the PDF in a worker
    thread and writes it into a freshly created temporary directory.

    Args:
        pdf_spec (str): JSON text of the PDF specification.
        filename (str): Optional output file name.  Only its final path
            component is used; ``.pdf`` is appended when missing.
        ctx (ToolContext | None): Tool execution context; not used here.

    Returns:
        str: ``"PDF generated: <path> (<size> bytes)"`` on success, otherwise
        a message starting with ``"ERROR:"``.
    """
    try:
        try:
            # Large payloads are parsed off the event loop.
            if len(pdf_spec) >= 256 * 1024:
                spec_dict = await asyncio.to_thread(json.loads, pdf_spec)
            else:
                spec_dict = json.loads(pdf_spec)
        except json.JSONDecodeError as e:
            return f"ERROR: Invalid JSON specification: {e}"

        is_valid, error_msg = _validate_pdf_spec(spec_dict)
        if not is_valid:
            return f"ERROR: Invalid PDF specification: {error_msg}"

        output_name = _resolve_output_filename(
            filename, spec_dict.get('title', 'generated_pdf'))
        page_size = spec_dict.get('page_size', 'A4')

        def _build_pdf() -> tuple[str, int]:
            """Internal helper: render the PDF and write it to disk.

            Returns:
                tuple[str, int]: The output path and the number of bytes written.
            """
            generator = PDFGenerator(page_size=page_size)
            pdf_content = generator.generate_pdf(spec_dict)
            if not pdf_content:
                raise RuntimeError("Failed to generate PDF content")
            tmp_dir = tempfile.mkdtemp()
            filepath = os.path.join(tmp_dir, output_name)
            with open(filepath, 'wb') as f:
                f.write(pdf_content)
            return filepath, len(pdf_content)

        filepath, size = await asyncio.to_thread(_build_pdf)
        return f"PDF generated: {filepath} ({size} bytes)"
    except Exception as e:
        logger.error("Unexpected error in PDF generation: %s", e, exc_info=True)
        return f"ERROR: Unexpected error occurred: {e}"