diff --git a/parsedmarc/resources/maps/find_bad_utf8.py b/parsedmarc/resources/maps/find_bad_utf8.py index 0852ad9..90ddb0e 100755 --- a/parsedmarc/resources/maps/find_bad_utf8.py +++ b/parsedmarc/resources/maps/find_bad_utf8.py @@ -17,7 +17,10 @@ Generated by GPT-5 Use at your own risk. # UTF-8 scanning # ------------------------- -def scan_line_for_utf8_errors(line_bytes: bytes, line_no: int, base_offset: int, context: int): + +def scan_line_for_utf8_errors( + line_bytes: bytes, line_no: int, base_offset: int, context: int +): """ Scan one line of raw bytes for UTF-8 decoding errors. Returns a list of dicts describing each error. @@ -37,17 +40,19 @@ def scan_line_for_utf8_errors(line_bytes: bytes, line_no: int, base_offset: int, start_ctx = max(0, abs_index_in_line - context) end_ctx = min(len(line_bytes), abs_index_in_line + 1 + context) ctx_bytes = line_bytes[start_ctx:end_ctx] - bad_byte = line_bytes[abs_index_in_line:abs_index_in_line+1] + bad_byte = line_bytes[abs_index_in_line : abs_index_in_line + 1] col = abs_index_in_line + 1 # 1-based byte column - results.append({ - "line": line_no, - "column": col, - "abs_offset": abs_offset, - "bad_byte_hex": bad_byte.hex(), - "context_hex": ctx_bytes.hex(), - "context_preview": ctx_bytes.decode("utf-8", errors="replace"), - }) + results.append( + { + "line": line_no, + "column": col, + "abs_offset": abs_offset, + "bad_byte_hex": bad_byte.hex(), + "context_hex": ctx_bytes.hex(), + "context_preview": ctx_bytes.decode("utf-8", errors="replace"), + } + ) # Move past the offending byte and continue pos = abs_index_in_line + 1 return results @@ -68,8 +73,10 @@ def scan_file_for_utf8_errors(path: str, context: int, limit: int): results = scan_line_for_utf8_errors(line, line_no, total_offset, context) for r in results: errors_found += 1 - print(f"[ERROR {errors_found}] Line {r['line']}, Column {r['column']}, " - f"Absolute byte offset {r['abs_offset']}") + print( + f"[ERROR {errors_found}] Line {r['line']}, Column {r['column']}, " + f"Absolute byte offset {r['abs_offset']}" + ) print(f" Bad byte: 0x{r['bad_byte_hex']}") print(f" Context (hex): {r['context_hex']}") print(f" Context (preview): {r['context_preview']}") @@ -90,6 +97,7 @@ def scan_file_for_utf8_errors(path: str, context: int, limit: int): # Whole-file conversion # ------------------------- + def detect_encoding_text(path: str) -> Tuple[str, str]: """ Use charset-normalizer to detect file encoding. @@ -98,7 +106,10 @@ def detect_encoding_text(path: str) -> Tuple[str, str]: try: from charset_normalizer import from_path except ImportError: - print("Please install charset-normalizer: pip install charset-normalizer", file=sys.stderr) + print( + "Please install charset-normalizer: pip install charset-normalizer", + file=sys.stderr, + ) sys.exit(4) matches = from_path(path) @@ -155,6 +166,7 @@ def verify_utf8_file(path: str) -> Tuple[bool, str]: # Targeted single-byte fixer # ------------------------- + def iter_lines_with_offsets(b: bytes): """ Yield (line_bytes, line_start_abs_offset). Preserves LF/CRLF/CR in bytes. @@ -162,7 +174,7 @@ def iter_lines_with_offsets(b: bytes): start = 0 for i, byte in enumerate(b): if byte == 0x0A: # LF - yield b[start:i+1], start + yield b[start : i + 1], start start = i + 1 if start < len(b): yield b[start:], start @@ -192,10 +204,12 @@ def repair_mixed_utf8_line(line: bytes, base_offset: int, fallback_chain: List[s except UnicodeDecodeError as e: # Append the valid prefix before the error if e.start > 0: - out_fragments.append(line[pos:pos + e.start].decode("utf-8", errors="strict")) + out_fragments.append( + line[pos : pos + e.start].decode("utf-8", errors="strict") + ) bad_index = pos + e.start # absolute index in 'line' - bad_slice = line[bad_index:bad_index + 1] # FIX EXACTLY ONE BYTE + bad_slice = line[bad_index : bad_index + 1] # FIX EXACTLY ONE BYTE # Decode that single byte using the first working fallback decoded = None @@ -216,15 +230,17 @@ def repair_mixed_utf8_line(line: bytes, base_offset: int, fallback_chain: List[s # Log the fix col_1based = bad_index + 1 # byte-based column - fixes.append({ - "line_base_offset": base_offset, - "line": None, # caller fills line number - "column": col_1based, - "abs_offset": base_offset + bad_index, - "bad_bytes_hex": bad_slice.hex(), - "used_encoding": used_enc, - "replacement_preview": decoded - }) + fixes.append( + { + "line_base_offset": base_offset, + "line": None, # caller fills line number + "column": col_1based, + "abs_offset": base_offset + bad_index, + "bad_bytes_hex": bad_slice.hex(), + "used_encoding": used_enc, + "replacement_preview": decoded, + } + ) # Advance exactly one byte past the offending byte and continue pos = bad_index + 1 @@ -232,7 +248,13 @@ def repair_mixed_utf8_line(line: bytes, base_offset: int, fallback_chain: List[s return "".join(out_fragments), fixes -def targeted_fix_to_utf8(src_path: str, out_path: str, fallback_chain: List[str], dry_run: bool, max_fixes: int): +def targeted_fix_to_utf8( + src_path: str, + out_path: str, + fallback_chain: List[str], + dry_run: bool, + max_fixes: int, +): with open(src_path, "rb") as fb: data = fb.read() @@ -261,10 +283,12 @@ def targeted_fix_to_utf8(src_path: str, out_path: str, fallback_chain: List[str] # Log fixes for f in fixes: total_fixes += 1 - print(f"[FIX {total_fixes}] Line {f['line']}, Column {f['column']}, Abs offset {f['abs_offset']}") + print( + f"[FIX {total_fixes}] Line {f['line']}, Column {f['column']}, Abs offset {f['abs_offset']}" + ) print(f" Bad bytes: 0x{f['bad_bytes_hex']}") print(f" Used encoding: {f['used_encoding']}") - preview = f['replacement_preview'].replace("\r", "\\r").replace("\n", "\\n") + preview = f["replacement_preview"].replace("\r", "\\r").replace("\n", "\\n") if len(preview) > 40: preview = preview[:40] + "…" print(f" Replacement preview: {preview}") @@ -299,6 +323,7 @@ def targeted_fix_to_utf8(src_path: str, out_path: str, fallback_chain: List[str] # CLI # ------------------------- + def main(): ap = argparse.ArgumentParser( description=( @@ -310,26 +335,60 @@ def main(): formatter_class=argparse.RawTextHelpFormatter, ) ap.add_argument("path", help="Path to the CSV/text file") - ap.add_argument("--context", type=int, default=20, help="Bytes of context to show around errors (default: 20)") - ap.add_argument("--limit", type=int, default=100, help="Max errors to report during scan (0 = unlimited)") - ap.add_argument("--skip-scan", action="store_true", help="Skip initial scan for speed") + ap.add_argument( + "--context", + type=int, + default=20, + help="Bytes of context to show around errors (default: 20)", + ) + ap.add_argument( + "--limit", + type=int, + default=100, + help="Max errors to report during scan (0 = unlimited)", + ) + ap.add_argument( + "--skip-scan", action="store_true", help="Skip initial scan for speed" + ) # Whole-file convert - ap.add_argument("--convert", action="store_true", - help="Convert entire file to UTF-8 using auto/forced encoding " - "(in-place by default; creates '.bak').") - ap.add_argument("--encoding", help="Force source encoding for --convert or first fallback for --fix") - ap.add_argument("--output", help="Write to this path instead of in-place (no .bak is created in that case)") + ap.add_argument( + "--convert", + action="store_true", + help="Convert entire file to UTF-8 using auto/forced encoding " + "(in-place by default; creates '.bak').", + ) + ap.add_argument( + "--encoding", + help="Force source encoding for --convert or first fallback for --fix", + ) + ap.add_argument( + "--output", + help="Write to this path instead of in-place (no .bak is created in that case)", + ) # Targeted fix - ap.add_argument("--fix", action="store_true", - help="Fix only invalid byte(s) via fallback encodings " - "(in-place by default; creates '.bak').") - ap.add_argument("--fallbacks", - help="Comma-separated fallback encodings (default: cp1252,iso-8859-1,iso-8859-15)") - ap.add_argument("--dry-run", action="store_true", - help="(fix) Print fixes but do not write or create a .bak") - ap.add_argument("--max-fixes", type=int, default=0, help="(fix) Stop after N fixes (0 = unlimited)") + ap.add_argument( + "--fix", + action="store_true", + help="Fix only invalid byte(s) via fallback encodings " + "(in-place by default; creates '.bak').", + ) + ap.add_argument( + "--fallbacks", + help="Comma-separated fallback encodings (default: cp1252,iso-8859-1,iso-8859-15)", + ) + ap.add_argument( + "--dry-run", + action="store_true", + help="(fix) Print fixes but do not write or create a .bak", + ) + ap.add_argument( + "--max-fixes", + type=int, + default=0, + help="(fix) Stop after N fixes (0 = unlimited)", + ) args = ap.parse_args() path = args.path