mx_tag: incremental write via as_completed (no straggler hold-up)

This commit is contained in:
justin 2026-06-14 21:24:46 -05:00
parent 60e6dc5d19
commit 5fd187a001

View file

@ -98,28 +98,27 @@ def main() -> int:
domains = [r[0] for r in cur.fetchall() if r[0]]
print(f"resolving MX for {len(domains)} distinct domains (concurrent)...", file=sys.stderr)
# Resolve concurrently -- DNS is I/O-bound, so a thread pool gives a huge
# speedup over the serial 3s-timeout-per-domain path.
results: dict[str, str] = {}
done = 0
with ThreadPoolExecutor(max_workers=40) as ex:
for dom, prov in zip(domains, ex.map(classify, domains)):
results[dom] = prov
done += 1
if done % 500 == 0:
print(f" resolved {done}/{len(domains)}", file=sys.stderr)
# Batch-write the tags.
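# Resolve concurrently and write each result as soon as its lookup completes
# (as_completed, not map), so a few slow domains no longer delay the writes
# for domains that have already resolved. as_completed is called without a
# timeout, so the loop still finishes only after every lookup has returned.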
from concurrent.futures import as_completed
tagged = 0
for dom, prov in results.items():
cur.execute("""
UPDATE fmcsa_carriers SET mx_provider = %s
WHERE lower(split_part(email_address, '@', 2)) = %s
AND mx_provider IS NULL
""", (prov, dom))
tagged += 1
if tagged % 500 == 0:
conn.commit()
with ThreadPoolExecutor(max_workers=60) as ex:
futs = {ex.submit(classify, d): d for d in domains}
for fut in as_completed(futs):
dom = futs[fut]
try:
prov = fut.result()
except Exception:
prov = "no_mx"
cur.execute("""
UPDATE fmcsa_carriers SET mx_provider = %s
WHERE lower(split_part(email_address, '@', 2)) = %s
AND mx_provider IS NULL
""", (prov, dom))
tagged += 1
if tagged % 300 == 0:
conn.commit()
print(f" tagged {tagged}/{len(domains)} domains", file=sys.stderr)
conn.commit()
# Report the operator distribution of what we just tagged.