mirror of
https://github.com/paperless-ngx/paperless-ngx.git
synced 2026-03-20 07:55:57 +00:00
Refactor RasterisedDocumentParser to ParserProtocol interface
- Add RasterisedDocumentParser to registry.register_defaults() - Update parser class: remove DocumentParser inheritance, add Protocol class attrs/classmethods/properties, context-manager lifecycle - Add read_file_handle_unicode_errors() to shared parsers/utils.py - Replace inline unicode-error-handling with shared utility call Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -195,6 +195,7 @@ class ParserRegistry:
|
||||
"""
|
||||
from paperless.parsers.mail import MailDocumentParser
|
||||
from paperless.parsers.remote import RemoteDocumentParser
|
||||
from paperless.parsers.tesseract import RasterisedDocumentParser
|
||||
from paperless.parsers.text import TextDocumentParser
|
||||
from paperless.parsers.tika import TikaDocumentParser
|
||||
|
||||
@@ -202,6 +203,7 @@ class ParserRegistry:
|
||||
self.register_builtin(RemoteDocumentParser)
|
||||
self.register_builtin(TikaDocumentParser)
|
||||
self.register_builtin(MailDocumentParser)
|
||||
self.register_builtin(RasterisedDocumentParser)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Discovery
|
||||
|
||||
@@ -1,13 +1,17 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import Self
|
||||
|
||||
from django.conf import settings
|
||||
from PIL import Image
|
||||
|
||||
from documents.parsers import DocumentParser
|
||||
from documents.parsers import ParseError
|
||||
from documents.parsers import make_thumbnail_from_pdf
|
||||
from documents.utils import maybe_override_pixel_limit
|
||||
@@ -16,6 +20,28 @@ from paperless.config import OcrConfig
|
||||
from paperless.models import ArchiveFileChoices
|
||||
from paperless.models import CleanChoices
|
||||
from paperless.models import ModeChoices
|
||||
from paperless.parsers.utils import read_file_handle_unicode_errors
|
||||
from paperless.version import __full_version_str__
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import datetime
|
||||
from types import TracebackType
|
||||
|
||||
from paperless.parsers import MetadataEntry
|
||||
from paperless.parsers import ParserContext
|
||||
|
||||
logger = logging.getLogger("paperless.parsing.tesseract")
|
||||
|
||||
_SUPPORTED_MIME_TYPES: dict[str, str] = {
|
||||
"application/pdf": ".pdf",
|
||||
"image/jpeg": ".jpg",
|
||||
"image/png": ".png",
|
||||
"image/tiff": ".tif",
|
||||
"image/gif": ".gif",
|
||||
"image/bmp": ".bmp",
|
||||
"image/webp": ".webp",
|
||||
"image/heic": ".heic",
|
||||
}
|
||||
|
||||
|
||||
class NoTextFoundException(Exception):
|
||||
@@ -26,79 +52,123 @@ class RtlLanguageException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class RasterisedDocumentParser(DocumentParser):
|
||||
class RasterisedDocumentParser:
|
||||
"""
|
||||
This parser uses Tesseract to try and get some text out of a rasterised
|
||||
image, whether it's a PDF, or other graphical format (JPEG, TIFF, etc.)
|
||||
"""
|
||||
|
||||
logging_name = "paperless.parsing.tesseract"
|
||||
name: str = "Paperless-ngx Tesseract OCR Parser"
|
||||
version: str = __full_version_str__
|
||||
author: str = "Paperless-ngx Contributors"
|
||||
url: str = "https://github.com/paperless-ngx/paperless-ngx"
|
||||
|
||||
def get_settings(self) -> OcrConfig:
|
||||
"""
|
||||
This parser uses the OCR configuration settings to parse documents
|
||||
"""
|
||||
return OcrConfig()
|
||||
# ------------------------------------------------------------------
|
||||
# Class methods
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_page_count(self, document_path, mime_type):
|
||||
page_count = None
|
||||
if mime_type == "application/pdf":
|
||||
try:
|
||||
import pikepdf
|
||||
@classmethod
|
||||
def supported_mime_types(cls) -> dict[str, str]:
|
||||
return _SUPPORTED_MIME_TYPES
|
||||
|
||||
with pikepdf.Pdf.open(document_path) as pdf:
|
||||
page_count = len(pdf.pages)
|
||||
except Exception as e:
|
||||
self.log.warning(
|
||||
f"Unable to determine PDF page count {document_path}: {e}",
|
||||
)
|
||||
return page_count
|
||||
@classmethod
|
||||
def score(
|
||||
cls,
|
||||
mime_type: str,
|
||||
filename: str,
|
||||
path: Path | None = None,
|
||||
) -> int | None:
|
||||
if mime_type in _SUPPORTED_MIME_TYPES:
|
||||
return 10
|
||||
return None
|
||||
|
||||
def extract_metadata(self, document_path, mime_type):
|
||||
result = []
|
||||
if mime_type == "application/pdf":
|
||||
import pikepdf
|
||||
# ------------------------------------------------------------------
|
||||
# Properties
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
namespace_pattern = re.compile(r"\{(.*)\}(.*)")
|
||||
@property
|
||||
def can_produce_archive(self) -> bool:
|
||||
return True
|
||||
|
||||
pdf = pikepdf.open(document_path)
|
||||
meta = pdf.open_metadata()
|
||||
for key, value in meta.items():
|
||||
if isinstance(value, list):
|
||||
value = " ".join([str(e) for e in value])
|
||||
value = str(value)
|
||||
try:
|
||||
m = namespace_pattern.match(key)
|
||||
if m is None: # pragma: no cover
|
||||
continue
|
||||
namespace = m.group(1)
|
||||
key_value = m.group(2)
|
||||
try:
|
||||
namespace.encode("utf-8")
|
||||
key_value.encode("utf-8")
|
||||
except UnicodeEncodeError as e: # pragma: no cover
|
||||
self.log.debug(f"Skipping metadata key {key}: {e}")
|
||||
continue
|
||||
result.append(
|
||||
{
|
||||
"namespace": namespace,
|
||||
"prefix": meta.REVERSE_NS[namespace],
|
||||
"key": key_value,
|
||||
"value": value,
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
self.log.warning(
|
||||
f"Error while reading metadata {key}: {value}. Error: {e}",
|
||||
)
|
||||
return result
|
||||
@property
|
||||
def requires_pdf_rendition(self) -> bool:
|
||||
return False
|
||||
|
||||
def get_thumbnail(self, document_path, mime_type, file_name=None):
|
||||
return make_thumbnail_from_pdf(
|
||||
self.archive_path or document_path,
|
||||
self.tempdir,
|
||||
self.logging_group,
|
||||
# ------------------------------------------------------------------
|
||||
# Lifecycle
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def __init__(self, logging_group: object = None) -> None:
|
||||
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
|
||||
self.tempdir = Path(
|
||||
tempfile.mkdtemp(prefix="paperless-", dir=settings.SCRATCH_DIR),
|
||||
)
|
||||
self.settings = OcrConfig()
|
||||
self.archive_path: Path | None = None
|
||||
self.text: str | None = None
|
||||
self.date: datetime.datetime | None = None
|
||||
self.log = logger
|
||||
|
||||
def __enter__(self) -> Self:
|
||||
return self
|
||||
|
||||
def __exit__(
|
||||
self,
|
||||
exc_type: type[BaseException] | None,
|
||||
exc_val: BaseException | None,
|
||||
exc_tb: TracebackType | None,
|
||||
) -> None:
|
||||
logger.debug("Cleaning up temporary directory %s", self.tempdir)
|
||||
shutil.rmtree(self.tempdir, ignore_errors=True)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Core parsing interface
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def configure(self, context: ParserContext) -> None:
|
||||
pass
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Result accessors
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_text(self) -> str | None:
|
||||
return self.text
|
||||
|
||||
def get_date(self) -> datetime.datetime | None:
|
||||
return self.date
|
||||
|
||||
def get_archive_path(self) -> Path | None:
|
||||
return self.archive_path
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Thumbnail, page count, and metadata
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def get_thumbnail(self, document_path: Path, mime_type: str) -> Path:
|
||||
return make_thumbnail_from_pdf(
|
||||
self.archive_path or Path(document_path),
|
||||
self.tempdir,
|
||||
)
|
||||
|
||||
def get_page_count(self, document_path: Path, mime_type: str) -> int | None:
|
||||
if mime_type == "application/pdf":
|
||||
from paperless.parsers.utils import get_page_count_for_pdf
|
||||
|
||||
return get_page_count_for_pdf(Path(document_path), log=self.log)
|
||||
return None
|
||||
|
||||
def extract_metadata(
|
||||
self,
|
||||
document_path: Path,
|
||||
mime_type: str,
|
||||
) -> list[MetadataEntry]:
|
||||
if mime_type != "application/pdf":
|
||||
return []
|
||||
|
||||
from paperless.parsers.utils import extract_pdf_metadata
|
||||
|
||||
return extract_pdf_metadata(Path(document_path), log=self.log)
|
||||
|
||||
def is_image(self, mime_type) -> bool:
|
||||
return mime_type in [
|
||||
@@ -163,7 +233,7 @@ class RasterisedDocumentParser(DocumentParser):
|
||||
and sidecar_file.is_file()
|
||||
and self.settings.mode != "redo"
|
||||
):
|
||||
text = self.read_file_handle_unicode_errors(sidecar_file)
|
||||
text = read_file_handle_unicode_errors(sidecar_file)
|
||||
|
||||
if "[OCR skipped on page" not in text:
|
||||
# This happens when there's already text in the input file.
|
||||
@@ -196,7 +266,7 @@ class RasterisedDocumentParser(DocumentParser):
|
||||
],
|
||||
logger=self.log,
|
||||
)
|
||||
text = self.read_file_handle_unicode_errors(Path(tmp.name))
|
||||
text = read_file_handle_unicode_errors(Path(tmp.name))
|
||||
|
||||
return post_process_text(text)
|
||||
|
||||
@@ -218,8 +288,6 @@ class RasterisedDocumentParser(DocumentParser):
|
||||
*,
|
||||
safe_fallback=False,
|
||||
):
|
||||
if TYPE_CHECKING:
|
||||
assert isinstance(self.settings, OcrConfig)
|
||||
ocrmypdf_args = {
|
||||
"input_file_or_options": input_file,
|
||||
"output_file": output_file,
|
||||
@@ -330,7 +398,13 @@ class RasterisedDocumentParser(DocumentParser):
|
||||
|
||||
return ocrmypdf_args
|
||||
|
||||
def parse(self, document_path: Path, mime_type, file_name=None) -> None:
|
||||
def parse(
|
||||
self,
|
||||
document_path: Path,
|
||||
mime_type: str,
|
||||
*,
|
||||
produce_archive: bool = True,
|
||||
) -> None:
|
||||
# This forces tesseract to use one core per page.
|
||||
os.environ["OMP_THREAD_LIMIT"] = "1"
|
||||
VALID_TEXT_LENGTH = 50
|
||||
|
||||
@@ -20,6 +20,34 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger("paperless.parsers.utils")
|
||||
|
||||
|
||||
def read_file_handle_unicode_errors(
|
||||
filepath: Path,
|
||||
log: logging.Logger | None = None,
|
||||
) -> str:
|
||||
"""Read a file as UTF-8 text, replacing invalid bytes rather than raising.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filepath:
|
||||
Absolute path to the file to read.
|
||||
log:
|
||||
Logger to use for warnings. Falls back to the module-level logger
|
||||
when omitted.
|
||||
|
||||
Returns
|
||||
-------
|
||||
str
|
||||
File content as a string, with any invalid UTF-8 sequences replaced
|
||||
by the Unicode replacement character.
|
||||
"""
|
||||
_log = log or logger
|
||||
try:
|
||||
return filepath.read_text(encoding="utf-8")
|
||||
except UnicodeDecodeError as e:
|
||||
_log.warning("Unicode error during text reading, continuing: %s", e)
|
||||
return filepath.read_bytes().decode("utf-8", errors="replace")
|
||||
|
||||
|
||||
def get_page_count_for_pdf(
|
||||
document_path: Path,
|
||||
log: logging.Logger | None = None,
|
||||
|
||||
Reference in New Issue
Block a user