#!/usr/bin/env python3
"""
Locate and optionally correct bad UTF-8 bytes in a file.

Three modes are offered on the command line:

* scan (default): report every byte that is not valid UTF-8;
* ``--convert``: re-decode the whole file from a detected or forced
  encoding and rewrite it as UTF-8;
* ``--fix``: replace only the offending bytes, decoding each one with a
  chain of single-byte fallback encodings.

Generated by GPT-5. Use at your own risk.
"""

import argparse
import codecs
import os
import shutil
import sys
from typing import Any, Dict, Iterator, List, Optional, Tuple


# -------------------------
# UTF-8 scanning
# -------------------------

def scan_line_for_utf8_errors(
    line_bytes: bytes, line_no: int, base_offset: int, context: int
) -> List[Dict[str, Any]]:
    """
    Scan one line of raw bytes for UTF-8 decoding errors.

    Args:
        line_bytes: The raw bytes of the line (including any line ending).
        line_no: 1-based line number, copied into each result.
        base_offset: Absolute byte offset of the start of the line in the file.
        context: Number of bytes to show on each side of a bad byte.
            Negative values are treated as 0.

    Returns:
        A list of dicts, one per offending byte, with the keys ``line``,
        ``column`` (1-based byte column), ``abs_offset``, ``bad_byte_hex``,
        ``context_hex`` and ``context_preview``.
    """
    context = max(0, context)
    results: List[Dict[str, Any]] = []
    pos = 0
    while pos < len(line_bytes):
        try:
            line_bytes[pos:].decode("utf-8", errors="strict")
            break  # the rest of the line is clean
        except UnicodeDecodeError as e:
            # e.start is relative to the slice we tried to decode.
            bad_index = pos + e.start
            start_ctx = max(0, bad_index - context)
            end_ctx = min(len(line_bytes), bad_index + 1 + context)
            ctx_bytes = line_bytes[start_ctx:end_ctx]
            bad_byte = line_bytes[bad_index:bad_index + 1]

            results.append({
                "line": line_no,
                "column": bad_index + 1,  # 1-based byte column
                "abs_offset": base_offset + bad_index,
                "bad_byte_hex": bad_byte.hex(),
                "context_hex": ctx_bytes.hex(),
                "context_preview": ctx_bytes.decode("utf-8", errors="replace"),
            })
            # Move past the offending byte and continue
            pos = bad_index + 1
    return results


def scan_file_for_utf8_errors(path: str, context: int, limit: int) -> int:
    """
    Print a report of every invalid UTF-8 byte in ``path``.

    Args:
        path: File to scan; it is read in binary mode, line by line.
        context: Bytes of context to show around each error.
        limit: Stop after this many errors; 0 (or a negative value) means
            unlimited.

    Returns:
        The number of errors reported.
    """
    errors_found = 0
    limit_val = limit if limit > 0 else float("inf")
    total_offset = 0

    with open(path, "rb") as f:
        # Iterating a binary file yields LF-terminated chunks, like readline().
        for line_no, line in enumerate(f, start=1):
            for r in scan_line_for_utf8_errors(line, line_no, total_offset, context):
                errors_found += 1
                print(f"[ERROR {errors_found}] Line {r['line']}, Column {r['column']}, "
                      f"Absolute byte offset {r['abs_offset']}")
                print(f"  Bad byte: 0x{r['bad_byte_hex']}")
                print(f"  Context (hex): {r['context_hex']}")
                print(f"  Context (preview): {r['context_preview']}")
                print()
                if errors_found >= limit_val:
                    print(f"Reached limit of {limit} errors. Stopping.")
                    return errors_found
            total_offset += len(line)

    if errors_found == 0:
        print("No invalid UTF-8 bytes found. 🎉")
    else:
        print(f"Found {errors_found} invalid UTF-8 byte(s).")
    return errors_found


# -------------------------
# Whole-file conversion
# -------------------------

def detect_encoding_text(path: str) -> Tuple[str, str]:
    """
    Use charset-normalizer to detect the encoding of ``path``.

    Returns:
        ``(encoding_name, decoded_text)``. When detection yields nothing the
        file is decoded as cp1252 instead.

    Exits the process with status 4 when charset-normalizer is not installed
    and with status 5 when neither detection nor the cp1252 fallback works.
    """
    try:
        from charset_normalizer import from_path
    except ImportError:
        print("Please install charset-normalizer: pip install charset-normalizer", file=sys.stderr)
        sys.exit(4)

    match = from_path(path).best()
    if match is None or match.encoding is None:
        # Fallback heuristic for Western single-byte text
        with open(path, "rb") as fb:
            data = fb.read()
        try:
            return "cp1252", data.decode("cp1252", errors="strict")
        except UnicodeDecodeError:
            print("Unable to detect encoding reliably.", file=sys.stderr)
            sys.exit(5)

    return match.encoding, str(match)


def convert_to_utf8(src_path: str, out_path: str, src_encoding: Optional[str] = None) -> str:
    """
    Convert an entire file to UTF-8 (re-decoding everything).

    Args:
        src_path: File to read.
        out_path: File to write; may be the same as ``src_path`` because the
            source is read completely before the output is opened.
        src_encoding: Encoding to decode with; auto-detected when omitted.

    Returns:
        The name of the encoding that was actually used.

    Exits the process with status 6 for an unknown ``src_encoding`` and with
    status 7 when the data cannot be decoded with it.
    """
    if src_encoding:
        with open(src_path, "rb") as fb:
            data = fb.read()
        try:
            text = data.decode(src_encoding, errors="strict")
        except LookupError:
            print(f"Unknown encoding: {src_encoding}", file=sys.stderr)
            sys.exit(6)
        except UnicodeDecodeError as e:
            print(f"Decoding failed with {src_encoding}: {e}", file=sys.stderr)
            sys.exit(7)
        used = src_encoding
    else:
        used, text = detect_encoding_text(src_path)

    # newline="" keeps the original line endings untouched.
    with open(out_path, "w", encoding="utf-8", newline="") as fw:
        fw.write(text)
    return used


def verify_utf8_file(path: str) -> Tuple[bool, str]:
    """
    Check that ``path`` decodes as strict UTF-8.

    Returns:
        ``(True, "")`` on success, otherwise ``(False, error_message)``.
    """
    try:
        with open(path, "rb") as fb:
            fb.read().decode("utf-8", errors="strict")
        return True, ""
    except UnicodeDecodeError as e:
        return False, str(e)


# -------------------------
# Targeted single-byte fixer
# -------------------------

def iter_lines_with_offsets(b: bytes) -> Iterator[Tuple[bytes, int]]:
    """
    Yield ``(line_bytes, line_start_abs_offset)`` for each line in ``b``.

    Lines end at LF (0x0A) only; the line ending bytes, including the CR of a
    CRLF pair, stay part of the yielded line. A bare CR does not end a line.
    A final line without a trailing LF is yielded too.
    """
    start = 0
    total = len(b)
    while start < total:
        newline_at = b.find(b"\n", start)
        end = total if newline_at == -1 else newline_at + 1
        yield b[start:end], start
        start = end


def detect_probable_fallbacks() -> List[str]:
    """Return the default fallback encodings, in the order they are tried."""
    # Good defaults for Western/Portuguese text
    return ["cp1252", "iso-8859-1", "iso-8859-15"]


def repair_mixed_utf8_line(
    line: bytes,
    base_offset: int,
    fallback_chain: List[str],
    max_fixes: int = 0,
) -> Tuple[str, List[Dict[str, Any]]]:
    """
    Strictly validate UTF-8 and fix *only* the exact offending byte when an
    error occurs. This avoids touching adjacent valid UTF-8 (prevents
    mojibake like 'Ã©').

    Args:
        line: Raw bytes of one line.
        base_offset: Absolute byte offset of the start of the line.
        fallback_chain: Encodings tried in order for each bad byte; latin-1
            is used when none of them can decode it.
        max_fixes: Maximum number of bytes to fix in this line; 0 (or a
            negative value) means unlimited. Once the budget is used up the
            rest of the line is decoded with ``errors="surrogateescape"``,
            so encoding the returned text with the same error handler
            reproduces the unfixed bytes exactly.

    Returns:
        ``(text, fixes)`` where ``fixes`` holds one dict per replaced byte.
        The ``line`` key of each dict is ``None``; the caller fills it in.
    """
    budget = max_fixes if max_fixes > 0 else None
    out_fragments: List[str] = []
    fixes: List[Dict[str, Any]] = []
    pos = 0
    n = len(line)

    while pos < n:
        if budget is not None and len(fixes) >= budget:
            # Budget exhausted: carry the remaining bytes through untouched.
            out_fragments.append(line[pos:].decode("utf-8", errors="surrogateescape"))
            break

        try:
            out_fragments.append(line[pos:].decode("utf-8", errors="strict"))
            break
        except UnicodeDecodeError as e:
            bad_index = pos + e.start  # absolute index in 'line'

            # Append the valid prefix before the error
            if e.start > 0:
                out_fragments.append(line[pos:bad_index].decode("utf-8", errors="strict"))

            bad_slice = line[bad_index:bad_index + 1]  # FIX EXACTLY ONE BYTE

            # Decode that single byte using the first working fallback
            decoded = None
            used_enc = None
            for enc in fallback_chain:
                try:
                    decoded = bad_slice.decode(enc, errors="strict")
                except (UnicodeError, LookupError):
                    # Byte undefined in this encoding, or unknown encoding name.
                    continue
                used_enc = enc
                break
            if decoded is None:
                # latin-1 always succeeds (byte->same code point)
                decoded = bad_slice.decode("latin-1")
                used_enc = "latin-1 (fallback)"

            out_fragments.append(decoded)

            # Log the fix
            fixes.append({
                "line_base_offset": base_offset,
                "line": None,  # caller fills line number
                "column": bad_index + 1,  # 1-based, byte-based column
                "abs_offset": base_offset + bad_index,
                "bad_bytes_hex": bad_slice.hex(),
                "used_encoding": used_enc,
                "replacement_preview": decoded,
            })

            # Advance exactly one byte past the offending byte and continue
            pos = bad_index + 1

    return "".join(out_fragments), fixes


def targeted_fix_to_utf8(
    src_path: str,
    out_path: str,
    fallback_chain: List[str],
    dry_run: bool,
    max_fixes: int,
) -> int:
    """
    Fix only the invalid bytes of ``src_path`` and write the result.

    Args:
        src_path: File to read (completely, before anything is written).
        out_path: File to write; ignored when ``dry_run`` is true.
        fallback_chain: Encodings used to decode each bad byte.
        dry_run: Report the fixes without writing any file.
        max_fixes: Stop fixing after this many bytes; 0 (or a negative
            value) means unlimited. Everything after the last fix is copied
            to the output byte-for-byte, so no content is lost, but the
            output may then still contain invalid UTF-8.

    Returns:
        The number of bytes fixed (or that would be fixed in a dry run).
    """
    with open(src_path, "rb") as fb:
        data = fb.read()

    limit = max_fixes if max_fixes > 0 else 0
    total_fixes = 0
    repaired_chunks: List[bytes] = []

    for line_no, (line_bytes, base_offset) in enumerate(iter_lines_with_offsets(data), start=1):
        # Fast path: keep lines that are already valid UTF-8
        try:
            line_bytes.decode("utf-8", errors="strict")
        except UnicodeDecodeError:
            pass
        else:
            repaired_chunks.append(line_bytes)
            continue

        remaining = limit - total_fixes if limit else 0
        fixed_text, fixes = repair_mixed_utf8_line(
            line_bytes, base_offset, fallback_chain=fallback_chain, max_fixes=remaining
        )
        # surrogateescape restores any bytes the repair left unfixed.
        repaired_chunks.append(fixed_text.encode("utf-8", errors="surrogateescape"))

        # Log fixes
        for f in fixes:
            f["line"] = line_no
            total_fixes += 1
            print(f"[FIX {total_fixes}] Line {f['line']}, Column {f['column']}, Abs offset {f['abs_offset']}")
            print(f"  Bad bytes: 0x{f['bad_bytes_hex']}")
            print(f"  Used encoding: {f['used_encoding']}")
            preview = f['replacement_preview'].replace("\r", "\\r").replace("\n", "\\n")
            if len(preview) > 40:
                preview = preview[:40] + "…"
            print(f"  Replacement preview: {preview}")
            print()

        if limit and total_fixes >= limit:
            print(f"Reached max fixes limit ({max_fixes}). Stopping scan.")
            # Keep the rest of the file verbatim instead of dropping it.
            repaired_chunks.append(data[base_offset + len(line_bytes):])
            break

    if dry_run:
        print(f"Dry run complete. Detected {total_fixes} fix(es). No file written.")
        return total_fixes

    # Binary write: line endings and any unfixed bytes pass through unchanged.
    with open(out_path, "wb") as fw:
        fw.write(b"".join(repaired_chunks))

    print(f"Fixed file written to: {out_path}")
    print(f"Total fixes applied: {total_fixes}")
    return total_fixes


# -------------------------
# CLI
# -------------------------

def _known_encodings(names: List[str]) -> List[str]:
    """Return ``names`` without unknown codecs, warning about each one dropped."""
    known = []
    for name in names:
        try:
            codecs.lookup(name)
        except LookupError:
            print(f"Warning: ignoring unknown fallback encoding: {name}", file=sys.stderr)
        else:
            known.append(name)
    return known


def main():
    """
    Command-line entry point: scan, then optionally convert or fix the file.

    Exit statuses: 2 file not found, 8 output failed UTF-8 verification,
    9 both --convert and --fix given; the conversion helpers may exit with
    4 to 7.
    """
    ap = argparse.ArgumentParser(
        description=(
            "Scan for invalid UTF-8; optionally convert whole file or fix only invalid bytes.\n\n"
            "By default, --convert and --fix **edit the input file in place** and create a backup "
            "named '.bak' before writing. If you pass --output, the original file is left "
            "unchanged and no backup is created. Use --dry-run to preview fixes without writing."
        ),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    ap.add_argument("path", help="Path to the CSV/text file")
    ap.add_argument("--context", type=int, default=20, help="Bytes of context to show around errors (default: 20)")
    ap.add_argument("--limit", type=int, default=100, help="Max errors to report during scan (0 = unlimited)")
    ap.add_argument("--skip-scan", action="store_true", help="Skip initial scan for speed")

    # Whole-file convert
    ap.add_argument("--convert", action="store_true",
                    help="Convert entire file to UTF-8 using auto/forced encoding "
                         "(in-place by default; creates '.bak').")
    ap.add_argument("--encoding", help="Force source encoding for --convert or first fallback for --fix")
    ap.add_argument("--output", help="Write to this path instead of in-place (no .bak is created in that case)")

    # Targeted fix
    ap.add_argument("--fix", action="store_true",
                    help="Fix only invalid byte(s) via fallback encodings "
                         "(in-place by default; creates '.bak').")
    ap.add_argument("--fallbacks",
                    help="Comma-separated fallback encodings (default: cp1252,iso-8859-1,iso-8859-15)")
    ap.add_argument("--dry-run", action="store_true",
                    help="(fix) Print fixes but do not write or create a .bak")
    ap.add_argument("--max-fixes", type=int, default=0, help="(fix) Stop after N fixes (0 = unlimited)")

    args = ap.parse_args()
    path = args.path

    if not os.path.isfile(path):
        print(f"File not found: {path}", file=sys.stderr)
        sys.exit(2)

    # Optional scan first
    if not args.skip_scan:
        scan_file_for_utf8_errors(path, context=args.context, limit=args.limit)

    # Mode selection guards
    if args.convert and args.fix:
        print("Choose either --convert or --fix (not both).", file=sys.stderr)
        sys.exit(9)
    if not args.convert and not args.fix:
        if args.skip_scan:
            print("No action selected (use --convert or --fix).")
        # Otherwise the user only wanted a scan
        return

    # Determine output path and backup behavior
    # In-place by default: create '.bak' before overwriting.
    if args.output:
        out_path = args.output
        # --output pointing at the input itself would overwrite the original,
        # so treat it as in-place and keep a backup.
        in_place = os.path.exists(out_path) and os.path.samefile(path, out_path)
    else:
        out_path = path
        in_place = True

    # CONVERT mode
    if args.convert:
        print("\n[CONVERT MODE] Converting file to UTF-8...")
        if in_place:
            # Create backup before overwriting original
            backup_path = path + ".bak"
            shutil.copy2(path, backup_path)
            print(f"Backup created: {backup_path}")
        used = convert_to_utf8(path, out_path, src_encoding=args.encoding)
        print(f"Source encoding used: {used}")
        print(f"Saved UTF-8 file as: {out_path}")
        ok, err = verify_utf8_file(out_path)
        if ok:
            print("Verification: output is valid UTF-8 ✅")
        else:
            print(f"Verification failed: {err}")
            sys.exit(8)
        return

    # FIX mode (targeted, single-byte)
    print("\n[FIX MODE] Fixing only invalid bytes to UTF-8...")
    if args.dry_run:
        # Dry-run: never write or create backup
        out_path_effective = os.devnull
        in_place_effective = False
    else:
        out_path_effective = out_path
        in_place_effective = in_place

    # Build fallback chain (if --encoding provided, try it first)
    if args.fallbacks:
        fallback_chain = [e.strip() for e in args.fallbacks.split(",") if e.strip()]
    else:
        fallback_chain = detect_probable_fallbacks()
    if args.encoding and args.encoding not in fallback_chain:
        fallback_chain = [args.encoding] + fallback_chain
    # A misspelled encoding would otherwise be skipped without any notice.
    fallback_chain = _known_encodings(fallback_chain)

    if in_place_effective:
        # Create backup before overwriting original (only when actually writing)
        backup_path = path + ".bak"
        shutil.copy2(path, backup_path)
        print(f"Backup created: {backup_path}")

    fix_count = targeted_fix_to_utf8(
        path,
        out_path_effective,
        fallback_chain=fallback_chain,
        dry_run=args.dry_run,
        max_fixes=args.max_fixes,
    )

    if not args.dry_run:
        ok, err = verify_utf8_file(out_path_effective)
        if ok:
            print("Verification: output is valid UTF-8 ✅")
            print(f"Fix mode completed — {fix_count} byte(s) corrected.")
        else:
            print(f"Verification failed: {err}")
            if args.max_fixes > 0 and fix_count >= args.max_fixes:
                print("Note: --max-fixes was reached; remaining invalid bytes were left unchanged.")
            sys.exit(8)


if __name__ == "__main__":
    main()