diff --git a/scripts/mx_tag_carriers.py b/scripts/mx_tag_carriers.py index 6d3a6a6..cec97f1 100644 --- a/scripts/mx_tag_carriers.py +++ b/scripts/mx_tag_carriers.py @@ -98,28 +98,27 @@ def main() -> int: domains = [r[0] for r in cur.fetchall() if r[0]] print(f"resolving MX for {len(domains)} distinct domains (concurrent)...", file=sys.stderr) - # Resolve concurrently -- DNS is I/O-bound, so a thread pool gives a huge - # speedup over the serial 3s-timeout-per-domain path. - results: dict[str, str] = {} - done = 0 - with ThreadPoolExecutor(max_workers=40) as ex: - for dom, prov in zip(domains, ex.map(classify, domains)): - results[dom] = prov - done += 1 - if done % 500 == 0: - print(f" resolved {done}/{len(domains)}", file=sys.stderr) - - # Batch-write the tags. + # Resolve concurrently and write incrementally as each completes, so a few + # slow/hung domains never hold up the whole batch (as_completed, not map). + from concurrent.futures import as_completed tagged = 0 - for dom, prov in results.items(): - cur.execute(""" - UPDATE fmcsa_carriers SET mx_provider = %s - WHERE lower(split_part(email_address, '@', 2)) = %s - AND mx_provider IS NULL - """, (prov, dom)) - tagged += 1 - if tagged % 500 == 0: - conn.commit() + with ThreadPoolExecutor(max_workers=60) as ex: + futs = {ex.submit(classify, d): d for d in domains} + for fut in as_completed(futs): + dom = futs[fut] + try: + prov = fut.result() + except Exception: + prov = "no_mx" + cur.execute(""" + UPDATE fmcsa_carriers SET mx_provider = %s + WHERE lower(split_part(email_address, '@', 2)) = %s + AND mx_provider IS NULL + """, (prov, dom)) + tagged += 1 + if tagged % 300 == 0: + conn.commit() + print(f" tagged {tagged}/{len(domains)} domains", file=sys.stderr) conn.commit() # Report the operator distribution of what we just tagged.