Update code formatting

This commit is contained in:
Sean Whalen
2025-08-17 16:01:45 -04:00
parent 39347cb244
commit 01630bb61c
+103 -44
View File
@@ -17,7 +17,10 @@ Generated by GPT-5 Use at your own risk.
# UTF-8 scanning
# -------------------------
def scan_line_for_utf8_errors(line_bytes: bytes, line_no: int, base_offset: int, context: int):
def scan_line_for_utf8_errors(
line_bytes: bytes, line_no: int, base_offset: int, context: int
):
"""
Scan one line of raw bytes for UTF-8 decoding errors.
Returns a list of dicts describing each error.
@@ -37,17 +40,19 @@ def scan_line_for_utf8_errors(line_bytes: bytes, line_no: int, base_offset: int,
start_ctx = max(0, abs_index_in_line - context)
end_ctx = min(len(line_bytes), abs_index_in_line + 1 + context)
ctx_bytes = line_bytes[start_ctx:end_ctx]
bad_byte = line_bytes[abs_index_in_line:abs_index_in_line+1]
bad_byte = line_bytes[abs_index_in_line : abs_index_in_line + 1]
col = abs_index_in_line + 1 # 1-based byte column
results.append({
"line": line_no,
"column": col,
"abs_offset": abs_offset,
"bad_byte_hex": bad_byte.hex(),
"context_hex": ctx_bytes.hex(),
"context_preview": ctx_bytes.decode("utf-8", errors="replace"),
})
results.append(
{
"line": line_no,
"column": col,
"abs_offset": abs_offset,
"bad_byte_hex": bad_byte.hex(),
"context_hex": ctx_bytes.hex(),
"context_preview": ctx_bytes.decode("utf-8", errors="replace"),
}
)
# Move past the offending byte and continue
pos = abs_index_in_line + 1
return results
@@ -68,8 +73,10 @@ def scan_file_for_utf8_errors(path: str, context: int, limit: int):
results = scan_line_for_utf8_errors(line, line_no, total_offset, context)
for r in results:
errors_found += 1
print(f"[ERROR {errors_found}] Line {r['line']}, Column {r['column']}, "
f"Absolute byte offset {r['abs_offset']}")
print(
f"[ERROR {errors_found}] Line {r['line']}, Column {r['column']}, "
f"Absolute byte offset {r['abs_offset']}"
)
print(f" Bad byte: 0x{r['bad_byte_hex']}")
print(f" Context (hex): {r['context_hex']}")
print(f" Context (preview): {r['context_preview']}")
@@ -90,6 +97,7 @@ def scan_file_for_utf8_errors(path: str, context: int, limit: int):
# Whole-file conversion
# -------------------------
def detect_encoding_text(path: str) -> Tuple[str, str]:
"""
Use charset-normalizer to detect file encoding.
@@ -98,7 +106,10 @@ def detect_encoding_text(path: str) -> Tuple[str, str]:
try:
from charset_normalizer import from_path
except ImportError:
print("Please install charset-normalizer: pip install charset-normalizer", file=sys.stderr)
print(
"Please install charset-normalizer: pip install charset-normalizer",
file=sys.stderr,
)
sys.exit(4)
matches = from_path(path)
@@ -155,6 +166,7 @@ def verify_utf8_file(path: str) -> Tuple[bool, str]:
# Targeted single-byte fixer
# -------------------------
def iter_lines_with_offsets(b: bytes):
"""
Yield (line_bytes, line_start_abs_offset). Preserves LF/CRLF/CR in bytes.
@@ -162,7 +174,7 @@ def iter_lines_with_offsets(b: bytes):
start = 0
for i, byte in enumerate(b):
if byte == 0x0A: # LF
yield b[start:i+1], start
yield b[start : i + 1], start
start = i + 1
if start < len(b):
yield b[start:], start
@@ -192,10 +204,12 @@ def repair_mixed_utf8_line(line: bytes, base_offset: int, fallback_chain: List[s
except UnicodeDecodeError as e:
# Append the valid prefix before the error
if e.start > 0:
out_fragments.append(line[pos:pos + e.start].decode("utf-8", errors="strict"))
out_fragments.append(
line[pos : pos + e.start].decode("utf-8", errors="strict")
)
bad_index = pos + e.start # absolute index in 'line'
bad_slice = line[bad_index:bad_index + 1] # FIX EXACTLY ONE BYTE
bad_slice = line[bad_index : bad_index + 1] # FIX EXACTLY ONE BYTE
# Decode that single byte using the first working fallback
decoded = None
@@ -216,15 +230,17 @@ def repair_mixed_utf8_line(line: bytes, base_offset: int, fallback_chain: List[s
# Log the fix
col_1based = bad_index + 1 # byte-based column
fixes.append({
"line_base_offset": base_offset,
"line": None, # caller fills line number
"column": col_1based,
"abs_offset": base_offset + bad_index,
"bad_bytes_hex": bad_slice.hex(),
"used_encoding": used_enc,
"replacement_preview": decoded
})
fixes.append(
{
"line_base_offset": base_offset,
"line": None, # caller fills line number
"column": col_1based,
"abs_offset": base_offset + bad_index,
"bad_bytes_hex": bad_slice.hex(),
"used_encoding": used_enc,
"replacement_preview": decoded,
}
)
# Advance exactly one byte past the offending byte and continue
pos = bad_index + 1
@@ -232,7 +248,13 @@ def repair_mixed_utf8_line(line: bytes, base_offset: int, fallback_chain: List[s
return "".join(out_fragments), fixes
def targeted_fix_to_utf8(src_path: str, out_path: str, fallback_chain: List[str], dry_run: bool, max_fixes: int):
def targeted_fix_to_utf8(
src_path: str,
out_path: str,
fallback_chain: List[str],
dry_run: bool,
max_fixes: int,
):
with open(src_path, "rb") as fb:
data = fb.read()
@@ -261,10 +283,12 @@ def targeted_fix_to_utf8(src_path: str, out_path: str, fallback_chain: List[str]
# Log fixes
for f in fixes:
total_fixes += 1
print(f"[FIX {total_fixes}] Line {f['line']}, Column {f['column']}, Abs offset {f['abs_offset']}")
print(
f"[FIX {total_fixes}] Line {f['line']}, Column {f['column']}, Abs offset {f['abs_offset']}"
)
print(f" Bad bytes: 0x{f['bad_bytes_hex']}")
print(f" Used encoding: {f['used_encoding']}")
preview = f['replacement_preview'].replace("\r", "\\r").replace("\n", "\\n")
preview = f["replacement_preview"].replace("\r", "\\r").replace("\n", "\\n")
if len(preview) > 40:
preview = preview[:40] + "…"
print(f" Replacement preview: {preview}")
@@ -299,6 +323,7 @@ def targeted_fix_to_utf8(src_path: str, out_path: str, fallback_chain: List[str]
# CLI
# -------------------------
def main():
ap = argparse.ArgumentParser(
description=(
@@ -310,26 +335,60 @@ def main():
formatter_class=argparse.RawTextHelpFormatter,
)
ap.add_argument("path", help="Path to the CSV/text file")
ap.add_argument("--context", type=int, default=20, help="Bytes of context to show around errors (default: 20)")
ap.add_argument("--limit", type=int, default=100, help="Max errors to report during scan (0 = unlimited)")
ap.add_argument("--skip-scan", action="store_true", help="Skip initial scan for speed")
ap.add_argument(
"--context",
type=int,
default=20,
help="Bytes of context to show around errors (default: 20)",
)
ap.add_argument(
"--limit",
type=int,
default=100,
help="Max errors to report during scan (0 = unlimited)",
)
ap.add_argument(
"--skip-scan", action="store_true", help="Skip initial scan for speed"
)
# Whole-file convert
ap.add_argument("--convert", action="store_true",
help="Convert entire file to UTF-8 using auto/forced encoding "
"(in-place by default; creates '<input>.bak').")
ap.add_argument("--encoding", help="Force source encoding for --convert or first fallback for --fix")
ap.add_argument("--output", help="Write to this path instead of in-place (no .bak is created in that case)")
ap.add_argument(
"--convert",
action="store_true",
help="Convert entire file to UTF-8 using auto/forced encoding "
"(in-place by default; creates '<input>.bak').",
)
ap.add_argument(
"--encoding",
help="Force source encoding for --convert or first fallback for --fix",
)
ap.add_argument(
"--output",
help="Write to this path instead of in-place (no .bak is created in that case)",
)
# Targeted fix
ap.add_argument("--fix", action="store_true",
help="Fix only invalid byte(s) via fallback encodings "
"(in-place by default; creates '<input>.bak').")
ap.add_argument("--fallbacks",
help="Comma-separated fallback encodings (default: cp1252,iso-8859-1,iso-8859-15)")
ap.add_argument("--dry-run", action="store_true",
help="(fix) Print fixes but do not write or create a .bak")
ap.add_argument("--max-fixes", type=int, default=0, help="(fix) Stop after N fixes (0 = unlimited)")
ap.add_argument(
"--fix",
action="store_true",
help="Fix only invalid byte(s) via fallback encodings "
"(in-place by default; creates '<input>.bak').",
)
ap.add_argument(
"--fallbacks",
help="Comma-separated fallback encodings (default: cp1252,iso-8859-1,iso-8859-15)",
)
ap.add_argument(
"--dry-run",
action="store_true",
help="(fix) Print fixes but do not write or create a .bak",
)
ap.add_argument(
"--max-fixes",
type=int,
default=0,
help="(fix) Stop after N fixes (0 = unlimited)",
)
args = ap.parse_args()
path = args.path