mx_tag: bulk UPDATE via temp-table join (per-domain UPDATE full-scanned 1.49M rows each time)

The real bottleneck was the write, not DNS: each per-domain UPDATE full-scanned
fmcsa_carriers (no functional index on lower(split_part(email,'@',2))). Resolve
all domains concurrently into a list, load a temp table, then ONE join-UPDATE =
single table scan. Tags ~12k domains -> hundreds of thousands of carriers fast.
This commit is contained in:
justin 2026-06-14 21:29:52 -05:00
parent fd1522abee
commit 3fbfcbfaab

View file

@ -97,27 +97,36 @@ def main() -> int:
domains = [r[0] for r in cur.fetchall() if r[0]] domains = [r[0] for r in cur.fetchall() if r[0]]
print(f"resolving MX for {len(domains)} distinct domains (concurrent)...", file=sys.stderr) print(f"resolving MX for {len(domains)} distinct domains (concurrent)...", file=sys.stderr)
# Resolve concurrently and write incrementally as each completes, so a few # Resolve concurrently (DNS is I/O-bound; per-call Resolver = thread-safe).
# slow/hung domains never hold up the whole batch (as_completed, not map).
from concurrent.futures import as_completed from concurrent.futures import as_completed
tagged = 0 resolved: list[tuple[str, str]] = []
with ThreadPoolExecutor(max_workers=60) as ex: with ThreadPoolExecutor(max_workers=60) as ex:
futs = {ex.submit(classify, d): d for d in domains} futs = {ex.submit(classify, d): d for d in domains}
for fut in as_completed(futs): for i, fut in enumerate(as_completed(futs), 1):
dom = futs[fut] dom = futs[fut]
try: try:
prov = fut.result() resolved.append((dom, fut.result()))
except Exception: except Exception:
prov = "no_mx" resolved.append((dom, "no_mx"))
cur.execute(""" if i % 1000 == 0:
UPDATE fmcsa_carriers SET mx_provider = %s print(f" resolved {i}/{len(domains)}", file=sys.stderr)
WHERE lower(split_part(email_address, '@', 2)) = %s
AND mx_provider IS NULL # ONE bulk UPDATE via a temp table + join. The per-domain UPDATE was doing a
""", (prov, dom)) # full 1.49M-row scan EACH time (no functional index on the email-domain
tagged += 1 # expression); the join scans the table once.
if tagged % 300 == 0: print(f"bulk-writing {len(resolved)} domain tags...", file=sys.stderr)
conn.commit() cur.execute("CREATE TEMP TABLE _mx_map (domain text PRIMARY KEY, provider text) ON COMMIT DROP")
print(f" tagged {tagged}/{len(domains)} domains", file=sys.stderr) from psycopg2.extras import execute_values
execute_values(cur, "INSERT INTO _mx_map (domain, provider) VALUES %s ON CONFLICT (domain) DO NOTHING",
resolved, page_size=1000)
cur.execute("""
UPDATE fmcsa_carriers f
SET mx_provider = m.provider
FROM _mx_map m
WHERE lower(split_part(f.email_address, '@', 2)) = m.domain
AND f.mx_provider IS NULL
""")
tagged = cur.rowcount
conn.commit() conn.commit()
# Report the operator distribution of what we just tagged. # Report the operator distribution of what we just tagged.