Skip to content

Handlers Module

Overview

File format handlers for different document types.

File type-specific handlers package.

Modules:

Name Description
csv

CSV file handler for text extraction.

doc

DOC file handler for text extraction.

docx

DOCX file handler for comprehensive text extraction.

html

HTML file handler for text extraction.

json

JSON file handler for text extraction.

md

Markdown (.md) file handler for text extraction.

pdf

PDF file handler for text extraction.

rtf

RTF file handler for text extraction.

txt

TXT file handler for text extraction.

xml

XML file handler for text extraction.

zip

ZIP file handler for text extraction.

Modules

csv

CSV file handler for text extraction.

Classes:

Name Description
CSVHandler

Handler for extracting text from CSV files.

Classes

CSVHandler

Bases: FileTypeHandler

Handler for extracting text from CSV files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/csv.py
class CSVHandler(FileTypeHandler):
    """Turn a CSV file into plain text, one output line per CSV record."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Read ``file_path`` as CSV and return its rows as text.

        The fields of each row are joined with ", " and the rows are joined
        with newlines. The file is decoded with ``config["encoding"]`` when
        that key is present, otherwise with UTF-8.

        Raises:
            ExtractionError: If opening, decoding or parsing the file fails.
        """
        try:
            options = config or {}
            with open(
                file_path,
                "r",
                encoding=options.get("encoding", "utf-8"),
                newline="",  # let the csv module handle line endings itself
            ) as handle:
                lines = [", ".join(fields) for fields in csv.reader(handle)]
            return "\n".join(lines)
        except Exception as e:
            raise ExtractionError(f"CSV extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Await :meth:`extract` executed through ``asyncio.to_thread``."""
        import asyncio

        offloaded = asyncio.to_thread(self.extract, file_path, config)
        return await offloaded
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/csv.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Read ``file_path`` as CSV and return its rows as text.

    The fields of each row are joined with ", " and the rows are joined with
    newlines. The file is decoded with ``config["encoding"]`` when that key
    is present, otherwise with UTF-8.

    Raises:
        ExtractionError: If opening, decoding or parsing the file fails.
    """
    try:
        options = config or {}
        with open(
            file_path,
            "r",
            encoding=options.get("encoding", "utf-8"),
            newline="",  # let the csv module handle line endings itself
        ) as handle:
            lines = [", ".join(fields) for fields in csv.reader(handle)]
        return "\n".join(lines)
    except Exception as e:
        raise ExtractionError(f"CSV extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/csv.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Await the synchronous ``extract`` executed through ``asyncio.to_thread``.

    Both arguments are forwarded to ``self.extract`` unchanged, and whatever
    it returns or raises is passed on to the awaiting caller.
    """
    import asyncio

    offloaded = asyncio.to_thread(self.extract, file_path, config)
    return await offloaded

doc

DOC file handler for text extraction.

Classes:

Name Description
DOCHandler

Handler for extracting text from DOC files with fallback options.

Classes

DOCHandler

Bases: FileTypeHandler

Handler for extracting text from DOC files with fallback options.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/doc.py
class DOCHandler(FileTypeHandler):
    """Handler for extracting text from DOC files with fallback options.

    Extraction first shells out to the ``antiword`` command. If that command
    cannot be started, or its error output points at a missing dependency,
    the handler falls back to python-docx and finally to scanning the raw
    bytes of the file for runs of printable ASCII.
    """

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Extract text from a DOC file.

        Args:
            file_path: Path to the DOC file.
            config: Optional settings. Only forwarded to the fallback
                extractor, which currently does not read it.

        Returns:
            The extracted text.

        Raises:
            ExtractionError: If antiword fails for a reason that does not
                trigger the fallback, or if every fallback strategy fails.
        """
        try:
            return self._extract_with_antiword(file_path)
        except FileNotFoundError:
            # Raised when the antiword executable cannot be started, and
            # deliberately by _extract_with_antiword to request the fallback.
            return self._extract_with_fallback(file_path, config)
        except ExtractionError:
            # Already carries a meaningful message; do not wrap it again.
            raise
        except Exception as e:
            raise ExtractionError(f"DOC extraction failed: {e}") from e

    def _extract_with_antiword(self, file_path: Path) -> str:
        """Extract text by running the ``antiword`` command.

        Raises:
            FileNotFoundError: If antiword cannot be started, or its error
                output mentions libreoffice / a missing file (the caller
                treats this as "use the fallback").
            ExtractionError: On timeout, any other non-zero exit, or empty
                output.
        """
        import subprocess

        try:
            result = subprocess.run(
                ["antiword", str(file_path)],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=True,
                timeout=30,  # do not hang forever on a malformed document
            )
        except subprocess.TimeoutExpired as e:
            raise ExtractionError("antiword extraction timed out") from e
        except subprocess.CalledProcessError as e:
            # Decode leniently: a strict decode could raise UnicodeDecodeError
            # here and hide the actual antiword failure.
            error_msg = e.stderr.decode(errors="replace") if e.stderr else str(e)
            lowered = error_msg.lower()
            # Check if the error is due to missing libreoffice dependency
            if "libreoffice" in lowered or "no such file or directory" in lowered:
                # Trigger fallback by raising FileNotFoundError
                raise FileNotFoundError(
                    "antiword requires libreoffice which is not available"
                ) from e
            raise ExtractionError(f"antiword extraction failed: {error_msg}") from e

        content = result.stdout.decode("utf-8").strip()
        if not content:
            raise ExtractionError("antiword returned empty content")
        return content

    def _extract_with_fallback(
        self,
        file_path: Path,
        config: Optional[dict] = None,
    ) -> str:
        """Fallback extraction methods when antiword is not available.

        Tries python-docx first, then a raw scan for printable ASCII runs.
        ``config`` is accepted for signature compatibility and is not read.

        Raises:
            ExtractionError: If neither strategy yields any text.
        """
        import re

        # Try python-docx (works for some DOC files)
        try:
            from docx import Document

            doc = Document(file_path)
            text = "\n".join(paragraph.text for paragraph in doc.paragraphs)
            if text.strip():
                return text
        except Exception:
            # Best effort: python-docx may be missing or reject the file.
            pass

        # Try reading as binary and looking for text patterns
        try:
            with open(file_path, "rb") as f:
                content = f.read()

            # Simple heuristic: every run of at least four printable ASCII
            # bytes (0x20-0x7E) counts as text. A single regex scan replaces
            # a per-byte Python loop and yields the same runs.
            runs = re.findall(rb"[\x20-\x7e]{4,}", content)
            result = " ".join(run.decode("ascii") for run in runs)
            if result.strip():
                return f"[Extracted using fallback method - may contain formatting artifacts]\n{result}"

        except Exception:
            # Best effort: fall through to the final error below.
            pass

        # If all methods fail
        raise ExtractionError(
            "DOC extraction failed. Please install 'antiword' command for better DOC support: "
            "sudo apt-get install antiword (Ubuntu/Debian) or brew install antiword (macOS)"
        )

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` through ``asyncio.to_thread`` and await it."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/doc.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Extract text from a DOC file, preferring antiword over the fallback.

    ``self._extract_with_antiword`` is tried first. A ``FileNotFoundError``
    from it switches to ``self._extract_with_fallback``; an
    ``ExtractionError`` is re-raised untouched; any other exception is
    wrapped in an ``ExtractionError``.
    """
    try:
        extracted = self._extract_with_antiword(file_path)
    except FileNotFoundError:
        # antiword is unavailable: hand over to the alternative extractors.
        return self._extract_with_fallback(file_path, config)
    except ExtractionError:
        raise
    except Exception as e:
        raise ExtractionError(f"DOC extraction failed: {e}")
    return extracted
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/doc.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Await the synchronous ``extract`` executed through ``asyncio.to_thread``.

    Both arguments are forwarded to ``self.extract`` unchanged, and whatever
    it returns or raises is passed on to the awaiting caller.
    """
    import asyncio

    offloaded = asyncio.to_thread(self.extract, file_path, config)
    return await offloaded

docx

DOCX file handler for comprehensive text extraction.

This handler extracts text from:

- Document paragraphs
- Tables and cells
- Headers and footers
- Text boxes and shapes
- Footnotes and endnotes (if available)

Classes:

Name Description
DOCXHandler

Enhanced handler for comprehensive text extraction from DOCX files.

Classes

DOCXHandler

Bases: FileTypeHandler

Enhanced handler for comprehensive text extraction from DOCX files.

This handler provides complete text extraction from Microsoft Word documents,
including all document elements such as paragraphs, tables, headers, footers,
text boxes, and footnotes. It's designed to handle complex document layouts
commonly found in resumes, reports, and structured documents.

Features:
    - Extracts text from document body paragraphs
    - Processes table content with cell-by-cell extraction
    - Captures header and footer text from all sections
    - Attempts to extract text from embedded text boxes and shapes
    - Handles footnotes and endnotes when available
    - Deduplicates repeated content
    - Cleans and normalizes extracted text

Example:
    >>> handler = DOCXHandler()
    >>> text = handler.extract(Path("document.docx"))
    >>> print(text)
    "Document title\nParagraph content...\nTable data | Column 2..."

    >>> # Async extraction
    >>> text = await handler.extract_async(Path("document.docx"))

Methods:

Name Description
extract

Extract text from a DOCX file with comprehensive content capture.

extract_async

Asynchronously extract text from a DOCX file.

Source code in textxtract/handlers/docx.py
class DOCXHandler(FileTypeHandler):
    """Enhanced handler for comprehensive text extraction from DOCX files.

    This handler provides complete text extraction from Microsoft Word documents,
    including all document elements such as paragraphs, tables, headers, footers,
    text boxes, and footnotes. It's designed to handle complex document layouts
    commonly found in resumes, reports, and structured documents.

    Features:
        - Extracts text from document body paragraphs
        - Processes table content with cell-by-cell extraction
        - Captures header and footer text from all sections
        - Attempts to extract text from embedded text boxes and shapes
        - Handles footnotes and endnotes when available
        - Deduplicates repeated content
        - Cleans and normalizes extracted text

    Example:
        >>> handler = DOCXHandler()
        >>> text = handler.extract(Path("document.docx"))
        >>> print(text)
        "Document title\\nParagraph content...\\nTable data | Column 2..."

        >>> # Async extraction
        >>> text = await handler.extract_async(Path("document.docx"))
    """

    def _clean_text(self, text: str) -> str:
        """Clean and normalize extracted text.

        Args:
            text (str): Raw text to be cleaned.

        Returns:
            str: Cleaned text; an empty string when ``text`` is empty.

        Note:
            - Replaces runs of two or more dots with a space
            - Collapses every run of whitespace to a single space
            - Removes whitespace in front of ``. ! ? , : ;``
            - Strips leading and trailing whitespace
        """
        # Imported locally so this helper works regardless of which
        # module-level imports the file has.
        import re

        if not text:
            return ""

        # Remove dot runs first (likely leader dots / formatting artifacts) so
        # the spaces they leave behind are collapsed by the next step.
        text = re.sub(r'\.{2,}', ' ', text)
        # Normalize whitespace (spaces, tabs, newlines) to single spaces
        text = re.sub(r'\s+', ' ', text)
        # Clean up spacing around punctuation (remove spaces before punctuation)
        text = re.sub(r'\s+([.!?,:;])', r'\1', text)
        return text.strip()

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Extract text from a DOCX file with comprehensive content capture.

        Collects text from body paragraphs, tables, section headers and
        footers, footnotes/endnotes (when the document object exposes them)
        and text boxes, in that order.

        Args:
            file_path (Path): Path to the DOCX file to extract text from.
            config (Optional[dict], optional): Configuration options for extraction.
                Currently not used but reserved for future enhancements.

        Returns:
            str: Extracted and cleaned text, one collected part per line.
                Returns empty string if no text is found.

        Raises:
            ExtractionError: If the file cannot be read or processed, or if the
                python-docx library is not available.

        Note:
            - A text is emitted only the first time it is seen, across all sources
            - Table rows are formatted with " | " between non-empty cells
            - Footnotes, endnotes and text boxes are wrapped in labeled brackets
            - A line break is inserted after ``. ! ?`` when followed by
              whitespace and an uppercase ASCII letter; a dot directly followed
              by a capital (e.g. "John.Smith@example.com", "U.S.A") is left intact
        """
        try:
            from docx import Document
            import re

            # Load the document
            doc = Document(file_path)
            text_parts = []
            processed_text = set()  # Track processed text to avoid duplicates

            def collect(paragraphs, sink, template="{}"):
                """Append each non-empty, not-yet-seen paragraph text to ``sink``."""
                for paragraph in paragraphs:
                    text = paragraph.text.strip()
                    if text and text not in processed_text:
                        sink.append(template.format(text))
                        processed_text.add(text)

            # Main document paragraphs
            collect(doc.paragraphs, text_parts)

            # Tables: one output line per row that has any new text
            for table in doc.tables:
                for row in table.rows:
                    row_text = []
                    for cell in row.cells:
                        cell_paragraphs = []
                        collect(cell.paragraphs, cell_paragraphs)
                        if cell_paragraphs:
                            row_text.append(" ".join(cell_paragraphs))
                    if row_text:
                        # Join cell contents with pipe separator for table structure
                        text_parts.append(" | ".join(row_text))

            # Headers and footers across all document sections
            for section in doc.sections:
                if section.header:
                    collect(section.header.paragraphs, text_parts)
                if section.footer:
                    collect(section.footer.paragraphs, text_parts)

            # Footnotes and endnotes (may not be available in all documents)
            try:
                if hasattr(doc, 'footnotes'):
                    for footnote in doc.footnotes:
                        collect(footnote.paragraphs, text_parts, "[Footnote: {}]")
                if hasattr(doc, 'endnotes'):
                    for endnote in doc.endnotes:
                        collect(endnote.paragraphs, text_parts, "[Endnote: {}]")
            except Exception:
                # Footnote/endnote extraction is optional - continue if it fails
                pass

            # Text boxes: walk the body XML and read the text nodes found
            # inside any ``txbxContent`` element.
            try:
                for element in doc.element.body.iter():
                    tag = element.tag
                    # Comments / processing instructions have a non-string tag;
                    # skip them instead of aborting the whole text box pass.
                    if not isinstance(tag, str) or not tag.endswith('}txbxContent'):
                        continue
                    for node in element.iter():
                        node_tag = node.tag
                        if (
                            isinstance(node_tag, str)
                            and node_tag.endswith('}t')
                            and node.text
                        ):
                            text = node.text.strip()
                            if text and text not in processed_text:
                                text_parts.append(f"[TextBox: {text}]")
                                processed_text.add(text)
            except Exception:
                # Text box extraction is optional - continue if it fails
                pass

            if not text_parts:
                return ""

            # Clean each text part and filter out empty content
            cleaned_parts = [self._clean_text(part) for part in text_parts if part.strip()]
            result = "\n".join(cleaned_parts)

            # Add sentence breaks. Whitespace is required between the
            # punctuation and the capital so e-mail addresses, initials and
            # acronyms are not split in the middle.
            result = re.sub(r'([.!?])\s+([A-Z])', r'\1\n\2', result)
            return result.strip()

        except Exception as e:
            raise ExtractionError(f"DOCX extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Asynchronously extract text from a DOCX file.

        Args:
            file_path (Path): Path to the DOCX file to extract text from.
            config (Optional[dict], optional): Configuration options for extraction.
                Currently not used but reserved for future enhancements.

        Returns:
            str: The value returned by :meth:`extract`.

        Raises:
            ExtractionError: If the file cannot be read or processed, or if the
                python-docx library is not available.

        Note:
            This method uses asyncio.to_thread() to run the synchronous extraction
            in a separate thread, making it suitable for async/await usage patterns.
        """
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)

Extract text from a DOCX file with comprehensive content capture.

Performs thorough text extraction from all available document elements including body text, tables, headers, footers, and embedded content.

Parameters:

Name Type Description Default
file_path Path

Path to the DOCX file to extract text from.

required
config Optional[dict]

Configuration options for extraction. Currently not used but reserved for future enhancements.

None

Returns:

Name Type Description
str str

Extracted and cleaned text from the document with proper formatting. Returns empty string if no text is found.

Raises:

Type Description
ExtractionError

If the file cannot be read or processed, or if the python-docx library is not available.

Note
  • Text is deduplicated to avoid repeated content from overlapping elements
  • Table content is formatted with pipe separators between columns
  • Special content (footnotes, text boxes) is labeled with descriptive tags
  • Sentence breaks are automatically inserted for better readability
Source code in textxtract/handlers/docx.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Extract text from a DOCX file with comprehensive content capture.

    Collects text from body paragraphs, tables, section headers and
    footers, footnotes/endnotes (when the document object exposes them)
    and text boxes, in that order.

    Args:
        file_path (Path): Path to the DOCX file to extract text from.
        config (Optional[dict], optional): Configuration options for extraction.
            Currently not used but reserved for future enhancements.

    Returns:
        str: Extracted and cleaned text, one collected part per line.
            Returns empty string if no text is found.

    Raises:
        ExtractionError: If the file cannot be read or processed, or if the
            python-docx library is not available.

    Note:
        - A text is emitted only the first time it is seen, across all sources
        - Table rows are formatted with " | " between non-empty cells
        - Footnotes, endnotes and text boxes are wrapped in labeled brackets
        - A line break is inserted after ``. ! ?`` when followed by
          whitespace and an uppercase ASCII letter; a dot directly followed
          by a capital (e.g. "John.Smith@example.com", "U.S.A") is left intact
    """
    try:
        from docx import Document
        import re

        # Load the document
        doc = Document(file_path)
        text_parts = []
        processed_text = set()  # Track processed text to avoid duplicates

        def collect(paragraphs, sink, template="{}"):
            """Append each non-empty, not-yet-seen paragraph text to ``sink``."""
            for paragraph in paragraphs:
                text = paragraph.text.strip()
                if text and text not in processed_text:
                    sink.append(template.format(text))
                    processed_text.add(text)

        # Main document paragraphs
        collect(doc.paragraphs, text_parts)

        # Tables: one output line per row that has any new text
        for table in doc.tables:
            for row in table.rows:
                row_text = []
                for cell in row.cells:
                    cell_paragraphs = []
                    collect(cell.paragraphs, cell_paragraphs)
                    if cell_paragraphs:
                        row_text.append(" ".join(cell_paragraphs))
                if row_text:
                    # Join cell contents with pipe separator for table structure
                    text_parts.append(" | ".join(row_text))

        # Headers and footers across all document sections
        for section in doc.sections:
            if section.header:
                collect(section.header.paragraphs, text_parts)
            if section.footer:
                collect(section.footer.paragraphs, text_parts)

        # Footnotes and endnotes (may not be available in all documents)
        try:
            if hasattr(doc, 'footnotes'):
                for footnote in doc.footnotes:
                    collect(footnote.paragraphs, text_parts, "[Footnote: {}]")
            if hasattr(doc, 'endnotes'):
                for endnote in doc.endnotes:
                    collect(endnote.paragraphs, text_parts, "[Endnote: {}]")
        except Exception:
            # Footnote/endnote extraction is optional - continue if it fails
            pass

        # Text boxes: walk the body XML and read the text nodes found
        # inside any ``txbxContent`` element.
        try:
            for element in doc.element.body.iter():
                tag = element.tag
                # Comments / processing instructions have a non-string tag;
                # skip them instead of aborting the whole text box pass.
                if not isinstance(tag, str) or not tag.endswith('}txbxContent'):
                    continue
                for node in element.iter():
                    node_tag = node.tag
                    if (
                        isinstance(node_tag, str)
                        and node_tag.endswith('}t')
                        and node.text
                    ):
                        text = node.text.strip()
                        if text and text not in processed_text:
                            text_parts.append(f"[TextBox: {text}]")
                            processed_text.add(text)
        except Exception:
            # Text box extraction is optional - continue if it fails
            pass

        if not text_parts:
            return ""

        # Clean each text part and filter out empty content
        cleaned_parts = [self._clean_text(part) for part in text_parts if part.strip()]
        result = "\n".join(cleaned_parts)

        # Add sentence breaks. Whitespace is required between the
        # punctuation and the capital so e-mail addresses, initials and
        # acronyms are not split in the middle.
        result = re.sub(r'([.!?])\s+([A-Z])', r'\1\n\2', result)
        return result.strip()

    except Exception as e:
        raise ExtractionError(f"DOCX extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)

Asynchronously extract text from a DOCX file.

Provides non-blocking text extraction by running the synchronous extraction method in a separate thread.

Parameters:

Name Type Description Default
file_path Path

Path to the DOCX file to extract text from.

required
config Optional[dict]

Configuration options for extraction. Currently not used but reserved for future enhancements.

None

Returns:

Name Type Description
str str

Extracted and cleaned text from the document with proper formatting. Returns empty string if no text is found.

Raises:

Type Description
ExtractionError

If the file cannot be read or processed, or if the python-docx library is not available.

Note

This method uses asyncio.to_thread() to run the synchronous extraction in a thread pool, making it suitable for async/await usage patterns.

Source code in textxtract/handlers/docx.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Extract text from a DOCX file without blocking the event loop.

    The synchronous ``extract`` method is handed to ``asyncio.to_thread()``
    and its outcome is awaited.

    Args:
        file_path (Path): Path to the DOCX file to extract text from.
        config (Optional[dict], optional): Forwarded unchanged to ``extract``.

    Returns:
        str: Whatever ``extract`` returns for the same arguments.

    Raises:
        ExtractionError: Propagated from ``extract`` when the file cannot be
            read or processed.
    """
    import asyncio

    offloaded = asyncio.to_thread(self.extract, file_path, config)
    return await offloaded

html

HTML file handler for text extraction.

Classes:

Name Description
HTMLHandler

Handler for extracting text from HTML files.

Classes

HTMLHandler

Bases: FileTypeHandler

Handler for extracting text from HTML files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/html.py
class HTMLHandler(FileTypeHandler):
    """Extract the text content of HTML files with BeautifulSoup."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Parse ``file_path`` as HTML and return ``get_text()`` of the result.

        The file is decoded with ``config["encoding"]`` when that key is
        present, otherwise with UTF-8, and parsed with the "html.parser"
        backend.

        Raises:
            ExtractionError: If beautifulsoup4 is missing or the file cannot
                be read or parsed.
        """
        try:
            try:
                from bs4 import BeautifulSoup as make_soup
            except ImportError:
                raise ExtractionError(
                    "beautifulsoup4 package is not installed. Install with 'pip install text-extractor[html]'"
                )
            read_text = file_path.read_text
            markup = read_text(encoding=(config or {}).get("encoding", "utf-8"))
            return make_soup(markup, "html.parser").get_text()
        except Exception as e:
            raise ExtractionError(f"HTML extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Await :meth:`extract` executed through ``asyncio.to_thread``."""
        import asyncio

        offloaded = asyncio.to_thread(self.extract, file_path, config)
        return await offloaded
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/html.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Parse ``file_path`` as HTML and return ``get_text()`` of the result.

    The file is decoded with ``config["encoding"]`` when that key is present,
    otherwise with UTF-8, and parsed with the "html.parser" backend.

    Raises:
        ExtractionError: If beautifulsoup4 is missing or the file cannot be
            read or parsed.
    """
    try:
        try:
            from bs4 import BeautifulSoup as make_soup
        except ImportError:
            raise ExtractionError(
                "beautifulsoup4 package is not installed. Install with 'pip install text-extractor[html]'"
            )
        read_text = file_path.read_text
        markup = read_text(encoding=(config or {}).get("encoding", "utf-8"))
        return make_soup(markup, "html.parser").get_text()
    except Exception as e:
        raise ExtractionError(f"HTML extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/html.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Await the synchronous ``extract`` executed through ``asyncio.to_thread``.

    Both arguments are forwarded to ``self.extract`` unchanged, and whatever
    it returns or raises is passed on to the awaiting caller.
    """
    import asyncio

    offloaded = asyncio.to_thread(self.extract, file_path, config)
    return await offloaded

json

JSON file handler for text extraction.

Classes:

Name Description
JSONHandler

Handler for extracting text from JSON files.

Classes

JSONHandler

Bases: FileTypeHandler

Handler for extracting text from JSON files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/json.py
class JSONHandler(FileTypeHandler):
    """Render JSON files as pretty-printed text."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Load ``file_path`` as JSON and return it re-serialized.

        The file is decoded with ``config["encoding"]`` when that key is
        present, otherwise with UTF-8. The output uses a two-space indent
        and keeps non-ASCII characters unescaped.

        Raises:
            ExtractionError: If the file cannot be read or is not valid JSON.
        """
        try:
            settings = config or {}
            with open(
                file_path, "r", encoding=settings.get("encoding", "utf-8")
            ) as source:
                parsed = json.load(source)
            # Pretty print JSON as text
            return json.dumps(parsed, indent=2, ensure_ascii=False)
        except Exception as e:
            raise ExtractionError(f"JSON extraction failed: {e}")

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Await :meth:`extract` executed through ``asyncio.to_thread``."""
        import asyncio

        offloaded = asyncio.to_thread(self.extract, file_path, config)
        return await offloaded
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/json.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Load ``file_path`` as JSON and return it re-serialized.

    The file is decoded with ``config["encoding"]`` when that key is present,
    otherwise with UTF-8. The output uses a two-space indent and keeps
    non-ASCII characters unescaped.

    Raises:
        ExtractionError: If the file cannot be read or is not valid JSON.
    """
    try:
        settings = config or {}
        with open(
            file_path, "r", encoding=settings.get("encoding", "utf-8")
        ) as source:
            parsed = json.load(source)
        # Pretty print JSON as text
        return json.dumps(parsed, indent=2, ensure_ascii=False)
    except Exception as e:
        raise ExtractionError(f"JSON extraction failed: {e}")
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/json.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronously extract JSON text via the blocking ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path of the file to read.
        config: Optional settings, forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending

md

Markdown (.md) file handler for text extraction.

Classes:

Name Description
MDHandler

Handler for extracting text from Markdown files.

Classes

MDHandler

Bases: FileTypeHandler

Handler for extracting text from Markdown files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/md.py
class MDHandler(FileTypeHandler):
    """Handler for extracting text from Markdown files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Render a Markdown file and return its plain-text content.

        The file is converted to HTML with the ``markdown`` package and the
        tags are then stripped with BeautifulSoup. When ``bs4`` is not
        importable, the raw Markdown source is returned instead.

        Args:
            file_path: Path to the Markdown file.
            config: Optional settings; the ``"encoding"`` key selects the text
                encoding used to read the file (default ``"utf-8"``).

        Returns:
            The extracted plain text, or the unmodified Markdown source when
            ``bs4`` is unavailable.

        Raises:
            ExtractionError: If the ``markdown`` package is missing, or if
                reading or converting the file fails.
        """
        # Resolve the dependency outside the main try block so the install
        # hint is raised as-is instead of being re-wrapped as
        # "MD extraction failed: ...".
        try:
            import markdown
        except ImportError as e:
            raise ExtractionError(
                "markdown package is not installed. Install with 'pip install text-extractor[md]'"
            ) from e

        try:
            text = file_path.read_text(encoding=(config or {}).get("encoding", "utf-8"))
            html = markdown.markdown(text)
            # Remove HTML tags (best effort, fallback to raw text)
            try:
                from bs4 import BeautifulSoup
            except ImportError:
                return text
            return BeautifulSoup(html, "html.parser").get_text()
        except Exception as e:
            raise ExtractionError(f"MD extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/md.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Render a Markdown file and return its plain-text content.

    The file is converted to HTML with the ``markdown`` package and the tags
    are then stripped with BeautifulSoup. When ``bs4`` is not importable, the
    raw Markdown source is returned instead.

    Args:
        file_path: Path to the Markdown file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding used to read the file (default ``"utf-8"``).

    Returns:
        The extracted plain text, or the unmodified Markdown source when
        ``bs4`` is unavailable.

    Raises:
        ExtractionError: If the ``markdown`` package is missing, or if
            reading or converting the file fails.
    """
    # Resolve the dependency outside the main try block so the install hint
    # is raised as-is instead of being re-wrapped as "MD extraction failed: ...".
    try:
        import markdown
    except ImportError as e:
        raise ExtractionError(
            "markdown package is not installed. Install with 'pip install text-extractor[md]'"
        ) from e

    try:
        text = file_path.read_text(encoding=(config or {}).get("encoding", "utf-8"))
        html = markdown.markdown(text)
        # Remove HTML tags (best effort, fallback to raw text)
        try:
            from bs4 import BeautifulSoup
        except ImportError:
            return text
        return BeautifulSoup(html, "html.parser").get_text()
    except Exception as e:
        raise ExtractionError(f"MD extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/md.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronously extract Markdown text via the blocking ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path of the file to read.
        config: Optional settings, forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending

pdf

PDF file handler for text extraction.

Classes:

Name Description
PDFHandler

Handler for extracting text from PDF files with improved error handling.

Classes

PDFHandler

Bases: FileTypeHandler

Handler for extracting text from PDF files with improved error handling.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/pdf.py
class PDFHandler(FileTypeHandler):
    """Handler for extracting text from PDF files with improved error handling."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Extract the text of every page of a PDF using PyMuPDF.

        Pages without extractable text contribute a placeholder line:
        ``[Page N: Contains images but no extractable text]`` when the page
        holds an image block, otherwise ``[Page N: Empty page]``.

        Args:
            file_path: Path to the PDF file.
            config: Accepted for interface parity; not read by this handler.

        Returns:
            The per-page texts (or placeholders) joined with newlines.

        Raises:
            ExtractionError: If PyMuPDF is not installed or an unexpected
                error occurs while reading the document.
            InvalidFileError: If the file is empty or corrupted, or if no
                page yields any extractable text.
        """
        # Import before the main try block: the except clauses below refer to
        # ``fitz``, which would be an unbound local (UnboundLocalError) if the
        # import had failed inside that block.
        try:
            import fitz  # PyMuPDF
        except ImportError as e:
            raise ExtractionError(
                "PyMuPDF package is not installed. Install with 'pip install text-extractor[pdf]'"
            ) from e

        try:
            extracted_text = []
            empty_pages = 0

            doc = fitz.open(file_path)
            try:
                for page_num, page in enumerate(doc, start=1):
                    page_text = page.get_text("text").strip()
                    if page_text:
                        extracted_text.append(page_text)
                        continue

                    empty_pages += 1
                    # Inspect the structured layout to tell image-only pages
                    # apart from genuinely blank ones (block type 1 = image).
                    layout = page.get_text("dict")
                    has_images = bool(layout) and any(
                        block.get("type") == 1
                        for block in layout.get("blocks", [])
                    )
                    if has_images:
                        extracted_text.append(
                            f"[Page {page_num}: Contains images but no extractable text]"
                        )
                    else:
                        extracted_text.append(f"[Page {page_num}: Empty page]")
            finally:
                # Release the document even when a page fails to parse.
                doc.close()

            # Every page adds exactly one entry, so equality means that all
            # entries are placeholders and nothing was extracted.
            if empty_pages == len(extracted_text):
                raise InvalidFileError(
                    f"PDF contains {empty_pages} empty pages with no extractable text. "
                    "This may be a scanned PDF that requires OCR."
                )

            return "\n".join(extracted_text)

        # NOTE(review): EmptyFileError is believed to derive from
        # FileDataError in PyMuPDF, so it has to be matched first to be
        # reachable — confirm against the pinned PyMuPDF version.
        except fitz.EmptyFileError as e:
            raise InvalidFileError("PDF file is empty") from e
        except fitz.FileDataError as e:
            raise InvalidFileError(f"Invalid or corrupted PDF file: {e}") from e
        except (ExtractionError, InvalidFileError):
            raise
        except Exception as e:
            raise ExtractionError(f"PDF extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/pdf.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Extract the text of every page of a PDF using PyMuPDF.

    Pages without extractable text contribute a placeholder line:
    ``[Page N: Contains images but no extractable text]`` when the page holds
    an image block, otherwise ``[Page N: Empty page]``.

    Args:
        file_path: Path to the PDF file.
        config: Accepted for interface parity; not read by this handler.

    Returns:
        The per-page texts (or placeholders) joined with newlines.

    Raises:
        ExtractionError: If PyMuPDF is not installed or an unexpected error
            occurs while reading the document.
        InvalidFileError: If the file is empty or corrupted, or if no page
            yields any extractable text.
    """
    # Import before the main try block: the except clauses below refer to
    # ``fitz``, which would be an unbound local (UnboundLocalError) if the
    # import had failed inside that block.
    try:
        import fitz  # PyMuPDF
    except ImportError as e:
        raise ExtractionError(
            "PyMuPDF package is not installed. Install with 'pip install text-extractor[pdf]'"
        ) from e

    try:
        extracted_text = []
        empty_pages = 0

        doc = fitz.open(file_path)
        try:
            for page_num, page in enumerate(doc, start=1):
                page_text = page.get_text("text").strip()
                if page_text:
                    extracted_text.append(page_text)
                    continue

                empty_pages += 1
                # Inspect the structured layout to tell image-only pages
                # apart from genuinely blank ones (block type 1 = image).
                layout = page.get_text("dict")
                has_images = bool(layout) and any(
                    block.get("type") == 1
                    for block in layout.get("blocks", [])
                )
                if has_images:
                    extracted_text.append(
                        f"[Page {page_num}: Contains images but no extractable text]"
                    )
                else:
                    extracted_text.append(f"[Page {page_num}: Empty page]")
        finally:
            # Release the document even when a page fails to parse.
            doc.close()

        # Every page adds exactly one entry, so equality means that all
        # entries are placeholders and nothing was extracted.
        if empty_pages == len(extracted_text):
            raise InvalidFileError(
                f"PDF contains {empty_pages} empty pages with no extractable text. "
                "This may be a scanned PDF that requires OCR."
            )

        return "\n".join(extracted_text)

    # NOTE(review): EmptyFileError is believed to derive from FileDataError
    # in PyMuPDF, so it has to be matched first to be reachable — confirm
    # against the pinned PyMuPDF version.
    except fitz.EmptyFileError as e:
        raise InvalidFileError("PDF file is empty") from e
    except fitz.FileDataError as e:
        raise InvalidFileError(f"Invalid or corrupted PDF file: {e}") from e
    except (ExtractionError, InvalidFileError):
        raise
    except Exception as e:
        raise ExtractionError(f"PDF extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/pdf.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronously extract PDF text via the blocking ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path of the file to read.
        config: Optional settings, forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending

rtf

RTF file handler for text extraction.

Classes:

Name Description
RTFHandler

Handler for extracting text from RTF files.

Classes

RTFHandler

Bases: FileTypeHandler

Handler for extracting text from RTF files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/rtf.py
class RTFHandler(FileTypeHandler):
    """Handler for extracting text from RTF files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Read an RTF file and return its content converted to plain text.

        Args:
            file_path: Path to the RTF file.
            config: Optional settings; the ``"encoding"`` key selects the text
                encoding used to read the file (default ``"utf-8"``).

        Returns:
            The result of ``striprtf``'s ``rtf_to_text`` on the file content.

        Raises:
            ExtractionError: If the ``striprtf`` package is missing, or if
                reading or converting the file fails.
        """
        # Resolve the dependency outside the main try block so the install
        # hint is raised as-is instead of being re-wrapped as
        # "RTF extraction failed: ...".
        try:
            from striprtf.striprtf import rtf_to_text
        except ImportError as e:
            raise ExtractionError(
                "striprtf package is not installed. Install with 'pip install text-extractor[rtf]'"
            ) from e

        try:
            with open(
                file_path, "r", encoding=(config or {}).get("encoding", "utf-8")
            ) as f:
                rtf_content = f.read()
            return rtf_to_text(rtf_content)
        except Exception as e:
            raise ExtractionError(f"RTF extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/rtf.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Read an RTF file and return its content converted to plain text.

    Args:
        file_path: Path to the RTF file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding used to read the file (default ``"utf-8"``).

    Returns:
        The result of ``striprtf``'s ``rtf_to_text`` on the file content.

    Raises:
        ExtractionError: If the ``striprtf`` package is missing, or if
            reading or converting the file fails.
    """
    # Resolve the dependency outside the main try block so the install hint
    # is raised as-is instead of being re-wrapped as "RTF extraction failed: ...".
    try:
        from striprtf.striprtf import rtf_to_text
    except ImportError as e:
        raise ExtractionError(
            "striprtf package is not installed. Install with 'pip install text-extractor[rtf]'"
        ) from e

    try:
        with open(
            file_path, "r", encoding=(config or {}).get("encoding", "utf-8")
        ) as f:
            rtf_content = f.read()
        return rtf_to_text(rtf_content)
    except Exception as e:
        raise ExtractionError(f"RTF extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/rtf.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronously extract RTF text via the blocking ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path of the file to read.
        config: Optional settings, forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending

txt

TXT file handler for text extraction.

Classes:

Name Description
TXTHandler

Handler for extracting text from TXT files.

Classes

TXTHandler

Bases: FileTypeHandler

Handler for extracting text from TXT files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/txt.py
class TXTHandler(FileTypeHandler):
    """Handler for extracting text from TXT files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Return the full content of a plain-text file.

        Args:
            file_path: Path to the text file.
            config: Optional settings; the ``"encoding"`` key selects the text
                encoding used to read the file (default ``"utf-8"``).

        Returns:
            The decoded file content.

        Raises:
            ExtractionError: If the file cannot be read or decoded. The
                original exception is attached as ``__cause__``.
        """
        encoding = (config or {}).get("encoding", "utf-8")
        try:
            return file_path.read_text(encoding=encoding)
        except Exception as e:
            # Chain explicitly so the root cause stays visible in tracebacks.
            raise ExtractionError(f"TXT extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/txt.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Return the full content of a plain-text file.

    Args:
        file_path: Path to the text file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding used to read the file (default ``"utf-8"``).

    Returns:
        The decoded file content.

    Raises:
        ExtractionError: If the file cannot be read or decoded. The original
            exception is attached as ``__cause__``.
    """
    encoding = (config or {}).get("encoding", "utf-8")
    try:
        return file_path.read_text(encoding=encoding)
    except Exception as e:
        # Chain explicitly so the root cause stays visible in tracebacks.
        raise ExtractionError(f"TXT extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/txt.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronously read a text file via the blocking ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path of the file to read.
        config: Optional settings, forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending

xml

XML file handler for text extraction.

Classes:

Name Description
XMLHandler

Handler for extracting text from XML files.

Classes

XMLHandler

Bases: FileTypeHandler

Handler for extracting text from XML files.

Methods:

Name Description
extract
extract_async
Source code in textxtract/handlers/xml.py
class XMLHandler(FileTypeHandler):
    """Handler for extracting text from XML files."""

    def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
        """Parse an XML file and return all of its text nodes joined by spaces.

        Args:
            file_path: Path to the XML file.
            config: Optional settings; the ``"encoding"`` key selects the text
                encoding used to open the file (default ``"utf-8"``).

        Returns:
            Every node matched by the XPath ``//text()``, joined with a
            single space.

        Raises:
            ExtractionError: If the ``lxml`` package is missing, or if
                reading or parsing the file fails.
        """
        # Resolve the dependency outside the main try block so the install
        # hint is raised as-is instead of being re-wrapped as
        # "XML extraction failed: ...".
        try:
            from lxml import etree
        except ImportError as e:
            raise ExtractionError(
                "lxml package is not installed. Install with 'pip install text-extractor[xml]'"
            ) from e

        try:
            encoding = (config or {}).get("encoding", "utf-8")
            with open(file_path, "r", encoding=encoding) as f:
                tree = etree.parse(f)
            return " ".join(tree.xpath("//text()"))
        except Exception as e:
            raise ExtractionError(f"XML extraction failed: {e}") from e

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> str:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/xml.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> str:
    """Parse an XML file and return all of its text nodes joined by spaces.

    Args:
        file_path: Path to the XML file.
        config: Optional settings; the ``"encoding"`` key selects the text
            encoding used to open the file (default ``"utf-8"``).

    Returns:
        Every node matched by the XPath ``//text()``, joined with a single
        space.

    Raises:
        ExtractionError: If the ``lxml`` package is missing, or if reading
            or parsing the file fails.
    """
    # Resolve the dependency outside the main try block so the install hint
    # is raised as-is instead of being re-wrapped as "XML extraction failed: ...".
    try:
        from lxml import etree
    except ImportError as e:
        raise ExtractionError(
            "lxml package is not installed. Install with 'pip install text-extractor[xml]'"
        ) from e

    try:
        encoding = (config or {}).get("encoding", "utf-8")
        with open(file_path, "r", encoding=encoding) as f:
            tree = etree.parse(f)
        return " ".join(tree.xpath("//text()"))
    except Exception as e:
        raise ExtractionError(f"XML extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/xml.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> str:
    """Asynchronously extract XML text via the blocking ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path of the file to read.
        config: Optional settings, forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending

zip

ZIP file handler for text extraction.

Classes:

Name Description
ZIPHandler

Handler for extracting text from ZIP archives with security checks.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
# Module-level logger; ZIPHandler reports skipped and failed archive members through it.
logger = getLogger('textxtract.handlers.zip')

Classes

ZIPHandler

Bases: FileTypeHandler

Handler for extracting text from ZIP archives with security checks.

Methods:

Name Description
extract
extract_async

Attributes:

Name Type Description
MAX_EXTRACT_SIZE
MAX_FILES
Source code in textxtract/handlers/zip.py
class ZIPHandler(FileTypeHandler):
    """Handler for extracting text from ZIP archives with security checks."""

    MAX_EXTRACT_SIZE = 1024 * 1024 * 1024  # 1GB total
    MAX_FILES = 1000  # Maximum files to process
    MAX_FILE_SIZE = 100 * 1024 * 1024  # 100MB per file

    def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
        """Extract text from every supported member of a ZIP archive.

        Each member is checked against the file-count, path-safety and size
        limits, written to a temporary file, and passed to the handler that
        the registry returns for its suffix. Members that are unsupported or
        fail to extract are logged and skipped.

        Args:
            file_path: Path to the ZIP archive.
            config: Optional settings, forwarded unchanged to each member's
                handler.

        Returns:
            One entry per successfully processed member, in archive order.

        Raises:
            ExtractionError: If the archive itself cannot be opened or read.
        """
        extracted_texts = []
        total_size = 0
        file_count = 0

        try:
            # Imported lazily (presumably to avoid a circular import — TODO
            # confirm), but only once per call instead of once per member.
            from textxtract.core.registry import registry

            with zipfile.ZipFile(file_path, "r") as zip_file:
                for file_info in zip_file.infolist():
                    if file_info.is_dir():
                        continue

                    # Security checks
                    if file_count >= self.MAX_FILES:
                        logger.warning("Maximum file limit reached in ZIP archive")
                        break

                    # Check for path traversal
                    if self._is_unsafe_path(file_info.filename):
                        logger.warning("Skipping unsafe path: %s", file_info.filename)
                        continue

                    # Check file size
                    if file_info.file_size > self.MAX_FILE_SIZE:
                        logger.warning(
                            "Skipping large file: %s (%d bytes)",
                            file_info.filename,
                            file_info.file_size,
                        )
                        continue

                    total_size += file_info.file_size
                    if total_size > self.MAX_EXTRACT_SIZE:
                        logger.warning("Total extract size limit reached")
                        break

                    file_count += 1

                    try:
                        suffix = Path(file_info.filename).suffix.lower()
                        if not registry.is_supported(suffix):
                            logger.debug(
                                "Unsupported file type: %s", file_info.filename
                            )
                            continue

                        handler = registry.get_handler(suffix)
                        file_bytes = zip_file.read(file_info)

                        temp_file = tempfile.NamedTemporaryFile(
                            delete=False, suffix=suffix
                        )
                        # Bind the path before writing so the cleanup below
                        # also runs when the write itself fails.
                        temp_path = Path(temp_file.name)
                        try:
                            with temp_file:
                                temp_file.write(file_bytes)
                            # NOTE(review): if the registry maps ".zip" to this
                            # handler, ``text`` is a list here and the result
                            # becomes nested — confirm whether that is intended.
                            text = handler.extract(temp_path, config)
                            extracted_texts.append(text)
                            logger.debug(
                                "Extracted text from %s", file_info.filename
                            )
                        except Exception as e:
                            logger.warning(
                                "Failed to extract text from %s: %s",
                                file_info.filename,
                                e,
                            )
                        finally:
                            temp_path.unlink(missing_ok=True)

                    except Exception as e:
                        logger.warning(
                            "Error processing file %s: %s", file_info.filename, e
                        )
                        continue

            logger.info(
                "Extracted text from %d files in ZIP archive", len(extracted_texts)
            )
            return extracted_texts

        except Exception as e:
            raise ExtractionError(f"ZIP extraction failed: {e}") from e

    def _is_unsafe_path(self, path: str) -> bool:
        """Check if a path contains unsafe elements.

        A path is unsafe when it is absolute (leading ``/`` or a drive
        letter such as ``C:``) or when any of its components is ``..``.
        """
        # Normalize path separators
        normalized = path.replace("\\", "/")

        # Absolute POSIX-style path
        if normalized.startswith("/"):
            return True

        # Path traversal: compare whole components so that ordinary names
        # such as "notes..txt" are not rejected.
        if ".." in normalized.split("/"):
            return True

        # Check for absolute paths on Windows
        if len(normalized) > 1 and normalized[1] == ":":
            return True

        return False

    async def extract_async(
        self, file_path: Path, config: Optional[dict] = None
    ) -> List[str]:
        """Run :meth:`extract` in a worker thread and await its result."""
        import asyncio

        return await asyncio.to_thread(self.extract, file_path, config)
Attributes
MAX_EXTRACT_SIZE class-attribute instance-attribute
MAX_EXTRACT_SIZE = 1024 * 1024 * 1024  # 1GB cap on the summed declared sizes of processed archive members
MAX_FILES class-attribute instance-attribute
MAX_FILES = 1000  # Maximum number of archive members processed per extract() call
Functions
extract
extract(file_path, config=None)
Source code in textxtract/handlers/zip.py
def extract(self, file_path: Path, config: Optional[dict] = None) -> List[str]:
    """Extract text from every supported member of a ZIP archive.

    Each member is checked against the file-count, path-safety and size
    limits, written to a temporary file, and passed to the handler that the
    registry returns for its suffix. Members that are unsupported or fail to
    extract are logged and skipped.

    Args:
        file_path: Path to the ZIP archive.
        config: Optional settings, forwarded unchanged to each member's
            handler.

    Returns:
        One entry per successfully processed member, in archive order.

    Raises:
        ExtractionError: If the archive itself cannot be opened or read.
    """
    max_member_size = 100 * 1024 * 1024  # 100MB per file
    extracted_texts = []
    total_size = 0
    file_count = 0

    try:
        # Imported lazily (presumably to avoid a circular import — TODO
        # confirm), but only once per call instead of once per member.
        from textxtract.core.registry import registry

        with zipfile.ZipFile(file_path, "r") as zip_file:
            for file_info in zip_file.infolist():
                if file_info.is_dir():
                    continue

                # Security checks
                if file_count >= self.MAX_FILES:
                    logger.warning("Maximum file limit reached in ZIP archive")
                    break

                # Check for path traversal
                if self._is_unsafe_path(file_info.filename):
                    logger.warning("Skipping unsafe path: %s", file_info.filename)
                    continue

                # Check file size
                if file_info.file_size > max_member_size:
                    logger.warning(
                        "Skipping large file: %s (%d bytes)",
                        file_info.filename,
                        file_info.file_size,
                    )
                    continue

                total_size += file_info.file_size
                if total_size > self.MAX_EXTRACT_SIZE:
                    logger.warning("Total extract size limit reached")
                    break

                file_count += 1

                try:
                    suffix = Path(file_info.filename).suffix.lower()
                    if not registry.is_supported(suffix):
                        logger.debug(
                            "Unsupported file type: %s", file_info.filename
                        )
                        continue

                    handler = registry.get_handler(suffix)
                    file_bytes = zip_file.read(file_info)

                    temp_file = tempfile.NamedTemporaryFile(
                        delete=False, suffix=suffix
                    )
                    # Bind the path before writing so the cleanup below also
                    # runs when the write itself fails.
                    temp_path = Path(temp_file.name)
                    try:
                        with temp_file:
                            temp_file.write(file_bytes)
                        # NOTE(review): if the registry maps ".zip" to this
                        # handler, ``text`` is a list here and the result
                        # becomes nested — confirm whether that is intended.
                        text = handler.extract(temp_path, config)
                        extracted_texts.append(text)
                        logger.debug(
                            "Extracted text from %s", file_info.filename
                        )
                    except Exception as e:
                        logger.warning(
                            "Failed to extract text from %s: %s",
                            file_info.filename,
                            e,
                        )
                    finally:
                        temp_path.unlink(missing_ok=True)

                except Exception as e:
                    logger.warning(
                        "Error processing file %s: %s", file_info.filename, e
                    )
                    continue

        logger.info(
            "Extracted text from %d files in ZIP archive", len(extracted_texts)
        )
        return extracted_texts

    except Exception as e:
        raise ExtractionError(f"ZIP extraction failed: {e}") from e
extract_async async
extract_async(file_path, config=None)
Source code in textxtract/handlers/zip.py
async def extract_async(
    self, file_path: Path, config: Optional[dict] = None
) -> List[str]:
    """Asynchronously extract archive text via the blocking ``extract``.

    The synchronous ``self.extract`` call is handed to ``asyncio.to_thread``
    and its result is awaited.

    Args:
        file_path: Path of the archive to read.
        config: Optional settings, forwarded unchanged to ``extract``.

    Returns:
        Whatever ``self.extract(file_path, config)`` returns.
    """
    from asyncio import to_thread

    pending = to_thread(self.extract, file_path, config)
    return await pending