Add find_bad_utf8.py

This commit is contained in:
Sean Whalen
2025-08-17 15:55:47 -04:00
parent ed25526d59
commit 39347cb244
+429
View File
@@ -0,0 +1,429 @@
#!/usr/bin/env python3
import argparse
import codecs
import os
import sys
import shutil
from typing import List, Tuple
"""
Locates and optionally corrects bad UTF-8 bytes in a file.
Generated by GPT-5. Use at your own risk.
"""
# -------------------------
# UTF-8 scanning
# -------------------------
def scan_line_for_utf8_errors(line_bytes: bytes, line_no: int, base_offset: int, context: int):
    """
    Find every byte in a single raw line that breaks strict UTF-8 decoding.

    line_bytes:  the raw bytes of the line (including any line terminator).
    line_no:     line number copied verbatim into each report.
    base_offset: absolute file offset of the first byte of the line.
    context:     number of bytes to show on each side of the bad byte.

    Returns a list of dicts, one per offending byte, with the keys
    "line", "column" (1-based byte column), "abs_offset", "bad_byte_hex",
    "context_hex" and "context_preview".
    """
    findings = []
    line_len = len(line_bytes)
    cursor = 0
    while cursor < line_len:
        try:
            line_bytes[cursor:].decode("utf-8", "strict")
        except UnicodeDecodeError as exc:
            # exc.start is relative to the slice that was decoded
            bad_at = cursor + exc.start
        else:
            # The remainder of the line is clean: nothing more to report
            break
        window_lo = max(0, bad_at - context)
        window_hi = min(line_len, bad_at + 1 + context)
        window = line_bytes[window_lo:window_hi]
        findings.append({
            "line": line_no,
            "column": bad_at + 1,  # 1-based byte column
            "abs_offset": base_offset + bad_at,
            "bad_byte_hex": line_bytes[bad_at:bad_at + 1].hex(),
            "context_hex": window.hex(),
            "context_preview": window.decode("utf-8", errors="replace"),
        })
        # Resume scanning immediately after the offending byte
        cursor = bad_at + 1
    return findings
def scan_file_for_utf8_errors(path: str, context: int, limit: int):
    """
    Scan a file line by line and print a report for each invalid UTF-8 byte.

    path:    file to scan (opened in binary mode).
    context: bytes of context shown around each bad byte.
    limit:   stop after this many errors; 0 means unlimited.

    Returns the number of errors reported. When the limit is hit the
    function returns immediately without printing the final summary.
    """
    cap = float("inf") if limit == 0 else limit
    count = 0
    offset = 0  # absolute offset of the start of the current line
    with open(path, "rb") as handle:
        # Iterating a binary file yields LF-terminated lines, like readline()
        for number, raw in enumerate(handle, start=1):
            for hit in scan_line_for_utf8_errors(raw, number, offset, context):
                count += 1
                print(f"[ERROR {count}] Line {hit['line']}, Column {hit['column']}, "
                      f"Absolute byte offset {hit['abs_offset']}")
                print(f" Bad byte: 0x{hit['bad_byte_hex']}")
                print(f" Context (hex): {hit['context_hex']}")
                print(f" Context (preview): {hit['context_preview']}")
                print()
                if count >= cap:
                    print(f"Reached limit of {limit} errors. Stopping.")
                    return count
            offset += len(raw)
    if count:
        print(f"Found {count} invalid UTF-8 byte(s).")
    else:
        print("No invalid UTF-8 bytes found. 🎉")
    return count
# -------------------------
# Whole-file conversion
# -------------------------
def detect_encoding_text(path: str) -> Tuple[str, str]:
    """
    Guess the encoding of a file with charset-normalizer and decode it.

    Returns (encoding_name, decoded_text). If the detector yields no usable
    match, the file is decoded strictly as cp1252 instead.

    Exits the process with status 4 when charset-normalizer is not installed
    and with status 5 when neither detection nor the cp1252 fallback works.
    """
    try:
        from charset_normalizer import from_path
    except ImportError:
        print("Please install charset-normalizer: pip install charset-normalizer", file=sys.stderr)
        sys.exit(4)
    best = from_path(path).best()
    if best is not None and best.encoding is not None:
        return best.encoding, str(best)
    # No usable detection result: try the common Western single-byte codec
    with open(path, "rb") as fb:
        raw = fb.read()
    try:
        text = raw.decode("cp1252", errors="strict")
    except UnicodeDecodeError:
        print("Unable to detect encoding reliably.", file=sys.stderr)
        sys.exit(5)
    return "cp1252", text
def convert_to_utf8(src_path: str, out_path: str, src_encoding: str = None) -> str:
    """
    Re-encode a whole file as UTF-8.

    src_path:     file to read.
    out_path:     file to write (may be the same as src_path; the source is
                  read completely before the output is opened).
    src_encoding: encoding to decode with; when empty/None the encoding is
                  auto-detected via detect_encoding_text().

    Returns the name of the encoding that was used for decoding.
    Exits with status 6 for an unknown encoding name and status 7 when the
    file cannot be decoded with the requested encoding.
    """
    if not src_encoding:
        used, text = detect_encoding_text(src_path)
    else:
        with open(src_path, "rb") as fb:
            raw = fb.read()
        try:
            text = raw.decode(src_encoding, errors="strict")
        except LookupError:
            print(f"Unknown encoding: {src_encoding}", file=sys.stderr)
            sys.exit(6)
        except UnicodeDecodeError as e:
            print(f"Decoding failed with {src_encoding}: {e}", file=sys.stderr)
            sys.exit(7)
        used = src_encoding
    # newline="" keeps the original line terminators untouched
    with open(out_path, "w", encoding="utf-8", newline="") as fw:
        fw.write(text)
    return used
def verify_utf8_file(path: str) -> Tuple[bool, str]:
    """
    Check that the whole file decodes as strict UTF-8.

    Returns (True, "") when it does, otherwise (False, <decoder message>).
    """
    with open(path, "rb") as fb:
        payload = fb.read()
    try:
        payload.decode("utf-8", errors="strict")
    except UnicodeDecodeError as exc:
        return False, str(exc)
    return True, ""
# -------------------------
# Targeted single-byte fixer
# -------------------------
def iter_lines_with_offsets(b: bytes):
    """
    Yield (line_bytes, line_start_abs_offset) for each line of *b*.

    Lines are split on LF (0x0A) only and keep their terminator, so CRLF
    endings stay intact. A bare CR does not end a line; it simply remains
    inside the line's bytes. A final chunk without a trailing LF is yielded
    as the last line. Empty input yields nothing.
    """
    total = len(b)
    start = 0
    while start < total:
        # bytes.find searches at C speed instead of a per-byte Python loop
        newline_at = b.find(b"\n", start)
        end = total if newline_at == -1 else newline_at + 1
        yield b[start:end], start
        start = end
def detect_probable_fallbacks() -> List[str]:
    """Return the default fallback encodings, in the order they are tried."""
    # Sensible defaults for Western/Portuguese text
    western_codecs = ("cp1252", "iso-8859-1", "iso-8859-15")
    return list(western_codecs)
def repair_mixed_utf8_line(line: bytes, base_offset: int, fallback_chain: List[str]):
    """
    Decode a line as strict UTF-8, re-decoding only the individual bytes
    that fail with the first fallback encoding that accepts them.

    Valid UTF-8 around a bad byte is never re-interpreted, which prevents
    mojibake in text that was already correct.

    line:           raw bytes of one line.
    base_offset:    absolute file offset of the first byte of the line.
    fallback_chain: encoding names tried in order for each bad byte; if none
                    works, latin-1 is used (it accepts every byte value).

    Returns (repaired_text, fixes). Each fix is a dict with the keys
    "line_base_offset", "line" (None, to be filled in by the caller),
    "column" (1-based byte column), "abs_offset", "bad_bytes_hex",
    "used_encoding" and "replacement_preview".
    """
    pieces: List[str] = []
    repairs = []
    total = len(line)
    cursor = 0
    while cursor < total:
        try:
            tail = line[cursor:].decode("utf-8", "strict")
        except UnicodeDecodeError as exc:
            bad_at = cursor + exc.start  # index within 'line'
        else:
            # Everything from the cursor onwards is valid: done
            pieces.append(tail)
            break
        # Keep the valid text that precedes the bad byte
        if bad_at > cursor:
            pieces.append(line[cursor:bad_at].decode("utf-8", errors="strict"))
        culprit = line[bad_at:bad_at + 1]  # exactly one byte is repaired
        for candidate in fallback_chain:
            try:
                replacement = culprit.decode(candidate, errors="strict")
            except Exception:
                # Unknown codec or byte not mapped by it: try the next one
                continue
            codec_used = candidate
            break
        else:
            # latin-1 maps every byte to the code point of the same value
            replacement = culprit.decode("latin-1")
            codec_used = "latin-1 (fallback)"
        pieces.append(replacement)
        repairs.append({
            "line_base_offset": base_offset,
            "line": None,  # caller fills line number
            "column": bad_at + 1,  # byte-based column
            "abs_offset": base_offset + bad_at,
            "bad_bytes_hex": culprit.hex(),
            "used_encoding": codec_used,
            "replacement_preview": replacement,
        })
        # Continue right after the repaired byte
        cursor = bad_at + 1
    return "".join(pieces), repairs
def targeted_fix_to_utf8(src_path: str, out_path: str, fallback_chain: List[str], dry_run: bool, max_fixes: int):
    """
    Repair only the invalid UTF-8 bytes of a file and write the result.

    src_path:       file to read (read completely before anything is written,
                    so out_path may be the same file).
    out_path:       file to write; ignored when dry_run is true.
    fallback_chain: encodings tried, in order, for each invalid byte.
    dry_run:        report the fixes but do not write any file.
    max_fixes:      stop after this many fixes; 0 means unlimited.

    Returns the number of fixes reported (and, unless dry_run, applied).

    When the max_fixes limit stops processing early, exactly the counted
    fixes are applied and every byte after the last fix is copied to the
    output unchanged. The file is never truncated, but the untouched tail
    may still contain invalid UTF-8.

    Exits with status 3 if the repaired text cannot be encoded as UTF-8.
    """
    with open(src_path, "rb") as fb:
        data = fb.read()
    total_fixes = 0
    repaired_lines: List[str] = []
    max_val = max_fixes if max_fixes != 0 else float("inf")
    consumed = 0  # number of leading bytes of 'data' covered by repaired_lines
    limit_reached = False
    for line_no, (line_bytes, base_offset) in enumerate(iter_lines_with_offsets(data), start=1):
        consumed = base_offset + len(line_bytes)
        # Fast path: keep lines that are already valid UTF-8
        try:
            repaired_lines.append(line_bytes.decode("utf-8", errors="strict"))
            continue
        except UnicodeDecodeError:
            pass
        fixed_text, fixes = repair_mixed_utf8_line(
            line_bytes, base_offset, fallback_chain=fallback_chain
        )
        applied = 0  # fixes of this line that were counted before the limit hit
        for f in fixes:
            f["line"] = line_no
            total_fixes += 1
            applied += 1
            print(f"[FIX {total_fixes}] Line {f['line']}, Column {f['column']}, Abs offset {f['abs_offset']}")
            print(f" Bad bytes: 0x{f['bad_bytes_hex']}")
            print(f" Used encoding: {f['used_encoding']}")
            preview = f['replacement_preview'].replace("\r", "\\r").replace("\n", "\\n")
            if len(preview) > 40:
                preview = preview[:40]
            print(f" Replacement preview: {preview}")
            print()
            if total_fixes >= max_val:
                print(f"Reached max fixes limit ({max_fixes}). Stopping scan.")
                limit_reached = True
                break
        if limit_reached and applied < len(fixes):
            # The limit was hit in the middle of this line: keep only the part
            # up to and including the last counted fix. The 1-based column of
            # that fix equals the index of the first byte after it.
            cut = fixes[applied - 1]["column"]
            fixed_text, _ = repair_mixed_utf8_line(
                line_bytes[:cut], base_offset, fallback_chain=fallback_chain
            )
            consumed = base_offset + cut
        repaired_lines.append(fixed_text)
        if limit_reached:
            break
    if dry_run:
        print(f"Dry run complete. Detected {total_fixes} fix(es). No file written.")
        return total_fixes
    # Join and verify result can be encoded to UTF-8
    repaired_text = "".join(repaired_lines)
    try:
        encoded = repaired_text.encode("utf-8", errors="strict")
    except UnicodeEncodeError as e:
        print(f"Internal error: repaired text not valid UTF-8: {e}", file=sys.stderr)
        sys.exit(3)
    # Bytes that were never processed because the fix limit stopped the loop.
    # They are passed through verbatim so the output is not truncated.
    remainder = data[consumed:]
    with open(out_path, "wb") as fw:
        fw.write(encoded)
        fw.write(remainder)
    if limit_reached and remainder:
        print(f"Note: {len(remainder)} byte(s) after the last fix were copied unchanged.")
    print(f"Fixed file written to: {out_path}")
    print(f"Total fixes applied: {total_fixes}")
    return total_fixes
# -------------------------
# CLI
# -------------------------
def _create_backup(path: str) -> str:
    """Copy *path* to '<path>.bak' (with metadata) and return the backup path."""
    backup_path = path + ".bak"
    shutil.copy2(path, backup_path)
    print(f"Backup created: {backup_path}")
    return backup_path


def main():
    """
    Command-line entry point.

    Parses the arguments, optionally scans the file for invalid UTF-8, and
    then runs either whole-file conversion (--convert) or the targeted
    single-byte repair (--fix). Without --output the input file is edited in
    place after a '<input>.bak' backup has been created.

    Exit statuses used here: 2 (input file not found), 8 (output failed
    UTF-8 verification), 9 (--convert and --fix given together). The
    conversion and fix helpers may exit with their own statuses.
    """
    ap = argparse.ArgumentParser(
        description=(
            "Scan for invalid UTF-8; optionally convert whole file or fix only invalid bytes.\n\n"
            "By default, --convert and --fix **edit the input file in place** and create a backup "
            "named '<input>.bak' before writing. If you pass --output, the original file is left "
            "unchanged and no backup is created. Use --dry-run to preview fixes without writing."
        ),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    ap.add_argument("path", help="Path to the CSV/text file")
    ap.add_argument("--context", type=int, default=20, help="Bytes of context to show around errors (default: 20)")
    ap.add_argument("--limit", type=int, default=100, help="Max errors to report during scan (0 = unlimited)")
    ap.add_argument("--skip-scan", action="store_true", help="Skip initial scan for speed")
    # Whole-file convert
    ap.add_argument("--convert", action="store_true",
                    help="Convert entire file to UTF-8 using auto/forced encoding "
                         "(in-place by default; creates '<input>.bak').")
    ap.add_argument("--encoding", help="Force source encoding for --convert or first fallback for --fix")
    ap.add_argument("--output", help="Write to this path instead of in-place (no .bak is created in that case)")
    # Targeted fix
    ap.add_argument("--fix", action="store_true",
                    help="Fix only invalid byte(s) via fallback encodings "
                         "(in-place by default; creates '<input>.bak').")
    ap.add_argument("--fallbacks",
                    help="Comma-separated fallback encodings (default: cp1252,iso-8859-1,iso-8859-15)")
    ap.add_argument("--dry-run", action="store_true",
                    help="(fix) Print fixes but do not write or create a .bak")
    ap.add_argument("--max-fixes", type=int, default=0, help="(fix) Stop after N fixes (0 = unlimited)")
    args = ap.parse_args()
    path = args.path
    if not os.path.isfile(path):
        print(f"File not found: {path}", file=sys.stderr)
        sys.exit(2)
    # Reject conflicting modes up front so an invalid invocation does not
    # first run (and print the report of) a potentially long scan.
    if args.convert and args.fix:
        print("Choose either --convert or --fix (not both).", file=sys.stderr)
        sys.exit(9)
    # Optional scan first
    if not args.skip_scan:
        scan_file_for_utf8_errors(path, context=args.context, limit=args.limit)
    if not args.convert and not args.fix:
        if args.skip_scan:
            # Neither a scan nor a modification was requested
            print("No action selected (use --convert or --fix).")
        # Otherwise the user only wanted a scan
        return
    # Determine output path and backup behavior
    # In-place by default: create '<input>.bak' before overwriting.
    if args.output:
        out_path = args.output
        in_place = False
    else:
        out_path = path
        in_place = True
    # CONVERT mode
    if args.convert:
        print("\n[CONVERT MODE] Converting file to UTF-8...")
        if in_place:
            # Create backup before overwriting original
            _create_backup(path)
        used = convert_to_utf8(path, out_path, src_encoding=args.encoding)
        print(f"Source encoding used: {used}")
        print(f"Saved UTF-8 file as: {out_path}")
        ok, err = verify_utf8_file(out_path)
        if ok:
            print("Verification: output is valid UTF-8 ✅")
        else:
            print(f"Verification failed: {err}")
            sys.exit(8)
        return
    # FIX mode (targeted, single-byte)
    if args.fix:
        print("\n[FIX MODE] Fixing only invalid bytes to UTF-8...")
        if args.dry_run:
            # Dry-run: never write or create backup
            out_path_effective = os.devnull
            in_place_effective = False
        else:
            out_path_effective = out_path
            in_place_effective = in_place
        # Build fallback chain (if --encoding provided, try it first)
        if args.fallbacks:
            fallback_chain = [e.strip() for e in args.fallbacks.split(",") if e.strip()]
        else:
            fallback_chain = detect_probable_fallbacks()
        if args.encoding and args.encoding not in fallback_chain:
            fallback_chain = [args.encoding] + fallback_chain
        if in_place_effective:
            # Create backup before overwriting original (only when actually writing)
            _create_backup(path)
        fix_count = targeted_fix_to_utf8(
            path,
            out_path_effective,
            fallback_chain=fallback_chain,
            dry_run=args.dry_run,
            max_fixes=args.max_fixes,
        )
        if not args.dry_run:
            ok, err = verify_utf8_file(out_path_effective)
            if ok:
                print("Verification: output is valid UTF-8 ✅")
                print(f"Fix mode completed — {fix_count} byte(s) corrected.")
            else:
                print(f"Verification failed: {err}")
                sys.exit(8)
        return
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()