diff --git a/src/paperless/parsers/text.py b/src/paperless/parsers/text.py index 301c67149..43cb0020a 100644 --- a/src/paperless/parsers/text.py +++ b/src/paperless/parsers/text.py @@ -20,6 +20,7 @@ from PIL import Image from PIL import ImageDraw from PIL import ImageFont +from paperless.parsers.utils import read_file_handle_unicode_errors from paperless.version import __full_version_str__ if TYPE_CHECKING: @@ -183,7 +184,7 @@ class TextDocumentParser: documents.parsers.ParseError If the file cannot be read. """ - self._text = self._read_text(document_path) + self._text = read_file_handle_unicode_errors(document_path, log=logger) # ------------------------------------------------------------------ # Result accessors @@ -295,30 +296,3 @@ class TextDocumentParser: Always ``[]`` — plain text files carry no structured metadata. """ return [] - - # ------------------------------------------------------------------ - # Private helpers - # ------------------------------------------------------------------ - - def _read_text(self, filepath: Path) -> str: - """Read file content, replacing invalid UTF-8 bytes rather than failing. - - Parameters - ---------- - filepath: - Path to the file to read. - - Returns - ------- - str - File content as a string. - """ - try: - return filepath.read_text(encoding="utf-8") - except UnicodeDecodeError as exc: - logger.warning( - "Unicode error reading %s, replacing bad bytes: %s", - filepath, - exc, - ) - return filepath.read_bytes().decode("utf-8", errors="replace") diff --git a/src/paperless/parsers/utils.py b/src/paperless/parsers/utils.py index 8cc4630bf..0257ab736 100644 --- a/src/paperless/parsers/utils.py +++ b/src/paperless/parsers/utils.py @@ -8,6 +8,7 @@ share implementation. from __future__ import annotations +import codecs import logging import re import tempfile @@ -114,7 +115,7 @@ def read_file_handle_unicode_errors( filepath: Path, log: logging.Logger | None = None, ) -> str: - """Read a file as UTF-8 text, replacing invalid bytes rather than raising. + """Read a file as text, detecting encoding via BOM and stripping NUL bytes. Parameters ---------- @@ -127,15 +128,27 @@ def read_file_handle_unicode_errors( Returns ------- str - File content as a string, with any invalid UTF-8 sequences replaced - by the Unicode replacement character. + File content as a string, with NUL bytes removed so the result is + safe to store in PostgreSQL text fields. """ _log = log or logger + raw = filepath.read_bytes() + + if raw.startswith((codecs.BOM_UTF16_LE, codecs.BOM_UTF16_BE)): + encoding = "utf-16" + elif raw.startswith(codecs.BOM_UTF8): + encoding = "utf-8-sig" + else: + encoding = "utf-8" + try: - return filepath.read_text(encoding="utf-8") + text = raw.decode(encoding) except UnicodeDecodeError as e: _log.warning("Unicode error during text reading, continuing: %s", e) - return filepath.read_bytes().decode("utf-8", errors="replace") + text = raw.decode("utf-8", errors="replace") + + # PostgreSQL rejects NUL (0x00) bytes in text fields + return text.replace("\x00", "") def get_page_count_for_pdf( diff --git a/src/paperless/tests/test_parser_utils.py b/src/paperless/tests/test_parser_utils.py index ca6d9e6fe..c6bb3e34a 100644 --- a/src/paperless/tests/test_parser_utils.py +++ b/src/paperless/tests/test_parser_utils.py @@ -2,13 +2,50 @@ from __future__ import annotations +import codecs from pathlib import Path from paperless.parsers.utils import is_tagged_pdf +from paperless.parsers.utils import read_file_handle_unicode_errors SAMPLES = Path(__file__).parent / "samples" / "tesseract" +class TestReadFileHandleUnicodeErrors: + def test_plain_utf8(self, tmp_path: Path) -> None: + f = tmp_path / "plain.txt" + f.write_bytes(b"hello world") + assert read_file_handle_unicode_errors(f) == "hello world" + + def test_utf8_bom(self, tmp_path: Path) -> None: + f = tmp_path / "bom.txt" + f.write_bytes(codecs.BOM_UTF8 + b"hello") + assert read_file_handle_unicode_errors(f) == "hello" + + def test_utf16_le(self, tmp_path: Path) -> None: + f = tmp_path / "utf16le.txt" + f.write_bytes(codecs.BOM_UTF16_LE + "hello".encode("utf-16-le")) + assert read_file_handle_unicode_errors(f) == "hello" + + def test_utf16_be(self, tmp_path: Path) -> None: + f = tmp_path / "utf16be.txt" + f.write_bytes(codecs.BOM_UTF16_BE + "hello".encode("utf-16-be")) + assert read_file_handle_unicode_errors(f) == "hello" + + def test_nul_bytes_stripped(self, tmp_path: Path) -> None: + f = tmp_path / "null-bytes.txt" + f.write_bytes(b"foo\x00bar") + assert read_file_handle_unicode_errors(f) == "foobar" + + def test_invalid_utf8_replaced(self, tmp_path: Path) -> None: + f = tmp_path / "bad.txt" + f.write_bytes(b"ok\x80\x81bad") + result = read_file_handle_unicode_errors(f) + assert "ok" in result + assert "bad" in result + assert "\x00" not in result + + class TestIsTaggedPdf: def test_tagged_pdf_returns_true(self) -> None: assert is_tagged_pdf(SAMPLES / "simple-digital.pdf") is True