From b5773c6b4a81f5bb8ee778a9650995ebca8f82bd Mon Sep 17 00:00:00 2001 From: Sean Whalen Date: Wed, 24 Dec 2025 14:37:38 -0500 Subject: [PATCH] Fix etree import to type checkers don't complain --- parsedmarc/__init__.py | 49 ++++++++++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 14 deletions(-) diff --git a/parsedmarc/__init__.py b/parsedmarc/__init__.py index 8851f70..b65d10d 100644 --- a/parsedmarc/__init__.py +++ b/parsedmarc/__init__.py @@ -4,7 +4,19 @@ from __future__ import annotations -from typing import Dict, List, Any, Union, Optional, IO, Callable +from typing import ( + Dict, + List, + Any, + Union, + Optional, + IO, + Callable, + BinaryIO, + Protocol, + runtime_checkable, + cast, +) import binascii import email @@ -27,7 +39,7 @@ from io import BytesIO, StringIO import mailparser import xmltodict from expiringdict import ExpiringDict -from lxml import etree +import lxml.etree as etree from mailsuite.smtp import send_email from parsedmarc.log import logger @@ -847,7 +859,14 @@ def parse_aggregate_report_xml( raise InvalidAggregateReport("Unexpected error: {0}".format(error.__str__())) -def extract_report(content: Union[bytes, str, IO[Any]]) -> str: +@runtime_checkable +class _ReadableSeekable(Protocol): + def read(self, n: int = -1) -> bytes: ... + def seek(self, offset: int, whence: int = 0) -> int: ... + def tell(self) -> int: ... + + +def extract_report(content: Union[bytes, bytearray, memoryview, str, BinaryIO]) -> str: """ Extracts text from a zip or gzip file, as a base64-encoded string, file-like object, or bytes. @@ -860,19 +879,22 @@ def extract_report(content: Union[bytes, str, IO[Any]]) -> str: str: The extracted text """ - file_object = None + file_object: Optional[_ReadableSeekable] = None try: if isinstance(content, str): try: file_object = BytesIO(b64decode(content)) except binascii.Error: return content - elif type(content) is bytes: - file_object = BytesIO(content) + elif isinstance(content, (bytes, bytearray, memoryview)): + file_object = BytesIO(bytes(content)) else: - file_object = content + file_object = cast(_ReadableSeekable, content) header = file_object.read(6) + if isinstance(header, str): + raise ParserError("File objects must be opened in binary (rb) mode") + file_object.seek(0) if header.startswith(MAGIC_ZIP): _zip = zipfile.ZipFile(file_object) @@ -884,19 +906,18 @@ def extract_report(content: Union[bytes, str, IO[Any]]) -> str: elif header.startswith(MAGIC_XML) or header.startswith(MAGIC_JSON): report = file_object.read().decode(errors="ignore") else: - file_object.close() raise ParserError("Not a valid zip, gzip, json, or xml file") - file_object.close() - except UnicodeDecodeError: - if file_object: - file_object.close() raise ParserError("File objects must be opened in binary (rb) mode") except Exception as error: - if file_object: - file_object.close() raise ParserError("Invalid archive file: {0}".format(error.__str__())) + finally: + if file_object and hasattr(file_object, "close"): + try: + file_object.close() + except Exception: + pass return report