new-site/scripts/workers/usf_factor_monitor.py
justin f8cd37ac8c Initial commit — Performance West telecom compliance platform
Includes: API (Express/TypeScript), Astro site, Python workers,
document generators, FCC compliance tools, Canada CRTC formation,
Ansible infrastructure, and deployment scripts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-27 06:54:22 -05:00

739 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
USF Contribution Factor Monitor.
Daily cron that watches the USAC quarterly contribution factor page and
emails every customer with an FCC carrier when a new factor is announced
so they can update the USF line on their customer bills.
Why this exists
---------------
The FCC/USAC publishes a new federal Universal Service Fund contribution
factor each quarter via Public Notice, typically ~14 days before the
quarter starts. Customers who pass USF through to their end users must
update their billing systems the week it changes — small carriers often
miss this and end up under- or over-recovering.
We can't predict the exact day USAC posts, so we poll daily (9 AM
Chicago local is the recommended cron slot — FCC Public Notices usually
drop during East Coast business hours) and upsert each quarter into
``usf_contribution_factors`` (migration 049). When a quarter we haven't
seen before appears, we send a one-shot notification to every FCC
carrier customer and record ``notified_at`` so we don't repeat.
Source: https://www.usac.org/service-providers/making-payments/contribution-factors/
Usage
-----
python -m scripts.workers.usf_factor_monitor # one-shot
python -m scripts.workers.usf_factor_monitor --dry-run
CRON: 0 9 * * * python -m scripts.workers.usf_factor_monitor
Environment variables
---------------------
DATABASE_URL — PostgreSQL connection string
SMTP_HOST/PORT/USER/PASS — outbound mail (same vars as other workers)
FROM_EMAIL — defaults to notifications@performancewest.net
ADMIN_EMAIL — BCC target; defaults to ops@performancewest.net
USF_MONITOR_DRY_RUN — if truthy, skip sending emails
USAC_FACTORS_URL — override source URL (tests / staging)
"""
from __future__ import annotations
import argparse
import logging
import os
import re
import smtplib
import sys
import urllib.request
from collections import defaultdict
from datetime import date, datetime
from decimal import Decimal, InvalidOperation
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Iterable
import psycopg2
import psycopg2.extras
USAC_FACTORS_URL = os.environ.get(
"USAC_FACTORS_URL",
"https://www.usac.org/service-providers/making-payments/contribution-factors/",
)
# FCC VoIP safe-harbor percentage (interstate portion of VoIP revenue).
# Per 2026 Form 499-A Instructions p. 41. Interconnected VoIP providers
# that cannot separate interstate from intrastate usage may elect to
# treat this percentage of their revenue as interstate for USF purposes.
# Set by FCC; historically stable at 64.9% since the 2006 Interim
# Contribution Methodology Order (FCC 06-94).
VOIP_SAFE_HARBOR_PCT = Decimal(os.environ.get("VOIP_SAFE_HARBOR_PCT", "64.9"))
DATABASE_URL = os.environ.get("DATABASE_URL", "")
SMTP_HOST = os.environ.get("SMTP_HOST", "co.carrierone.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_USER = os.environ.get("SMTP_USER", "")
SMTP_PASS = os.environ.get("SMTP_PASS", "")
FROM_EMAIL = os.environ.get("FROM_EMAIL", "notifications@performancewest.net")
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL", "ops@performancewest.net")
log = logging.getLogger("usf_factor_monitor")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
# ─── Fetch + parse ────────────────────────────────────────────────────────
# Matches quarterly labels as they appear on the USAC page.
# Examples seen on the real page: "April June 2026", "January March 2026",
# "October December 2025". Support abbreviated month names too in case the
# page styling changes.
_QUARTER_LABEL_RE = re.compile(
r"(?P<start>January|April|July|October|Jan|Apr|Jul|Oct)"
r"\s*[-\u2013\u2014]\s*"
r"(?P<end>March|June|September|December|Mar|Jun|Sep|Dec)"
r"\s+(?P<year>\d{4})",
re.IGNORECASE,
)
_MONTH_TO_QUARTER = {
"Jan": 1, "January": 1,
"Apr": 2, "April": 2,
"Jul": 3, "July": 3,
"Oct": 4, "October": 4,
}
_QUARTER_DATE_RANGES = {
1: (date(1, 1, 1), date(1, 3, 31)), # placeholder year
2: (date(1, 4, 1), date(1, 6, 30)),
3: (date(1, 7, 1), date(1, 9, 30)),
4: (date(1, 10, 1), date(1, 12, 31)),
}
def _percent_to_decimal(text: str) -> Decimal | None:
text = (text or "").strip().rstrip("%").strip()
if not text:
return None
try:
return Decimal(text)
except InvalidOperation:
return None
def fetch_factor_rows(url: str = USAC_FACTORS_URL) -> list[dict]:
"""Return a list of {year, quarter, factor_pct, circularity_pct, notice} dicts."""
from bs4 import BeautifulSoup
req = urllib.request.Request(
url,
headers={"User-Agent": "Mozilla/5.0 (PerformanceWest USF Monitor)"},
)
with urllib.request.urlopen(req, timeout=30) as resp:
html = resp.read().decode("utf-8", errors="replace")
soup = BeautifulSoup(html, "lxml")
rows: list[dict] = []
for table in soup.find_all("table"):
# Identify the factors table by looking for the expected header cells.
headers = [
(h.get_text(strip=True) or "").lower()
for h in table.find_all(["th", "td"], limit=6)
]
header_text = " ".join(headers)
if "contribution factor" not in header_text:
continue
for tr in table.find_all("tr"):
cells = tr.find_all(["td", "th"])
if len(cells) < 2:
continue
label_text = cells[0].get_text(" ", strip=True)
m = _QUARTER_LABEL_RE.search(label_text)
if not m:
continue
year = int(m.group("year"))
start_name = m.group("start").title()
# Normalize abbreviations to full names so the lookup hits
start_name = {"Jan": "January", "Apr": "April", "Jul": "July", "Oct": "October"}.get(start_name, start_name)
quarter = _MONTH_TO_QUARTER[start_name]
factor = _percent_to_decimal(cells[1].get_text(" ", strip=True))
if factor is None:
continue
circ = None
if len(cells) >= 3:
circ = _percent_to_decimal(cells[2].get_text(" ", strip=True))
notice = ""
for cell in cells[3:]:
link = cell.find("a")
if link:
notice = link.get("href", "") or cell.get_text(" ", strip=True)
break
rows.append({
"year": year,
"quarter": quarter,
"factor_pct": factor,
"circularity_pct": circ,
"fcc_public_notice": notice,
})
return rows
# ─── Upsert + new-quarter detection ───────────────────────────────────────
def upsert_factors(conn, rows: Iterable[dict]) -> list[dict]:
"""Upsert each factor row and return the ones that were NEW (not previously in DB)."""
new_rows: list[dict] = []
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
for row in rows:
start_m, _ = _QUARTER_DATE_RANGES[row["quarter"]]
end_m = _QUARTER_DATE_RANGES[row["quarter"]][1]
eff_start = date(row["year"], start_m.month, 1)
eff_end = date(row["year"], end_m.month, end_m.day)
cur.execute(
"""
SELECT id, factor_pct, notified_at
FROM usf_contribution_factors
WHERE year = %s AND quarter = %s
""",
(row["year"], row["quarter"]),
)
existing = cur.fetchone()
if existing is None:
cur.execute(
"""
INSERT INTO usf_contribution_factors
(year, quarter, effective_start, effective_end,
factor_pct, circularity_pct, fcc_public_notice)
VALUES (%s, %s, %s, %s, %s, %s, %s)
RETURNING id
""",
(
row["year"], row["quarter"], eff_start, eff_end,
row["factor_pct"], row["circularity_pct"],
row["fcc_public_notice"] or None,
),
)
row_with_eff = dict(row, effective_start=eff_start, effective_end=eff_end)
new_rows.append(row_with_eff)
log.info(
"Discovered new USF factor: %s Q%s = %s%%",
row["year"], row["quarter"], row["factor_pct"],
)
elif Decimal(existing["factor_pct"]) != Decimal(row["factor_pct"]):
# USAC sometimes corrects a posted factor; update but treat as
# a new notification event so customers hear about the fix.
cur.execute(
"""
UPDATE usf_contribution_factors
SET factor_pct = %s,
circularity_pct = %s,
fcc_public_notice = %s,
notified_at = NULL
WHERE id = %s
""",
(
row["factor_pct"], row["circularity_pct"],
row["fcc_public_notice"] or None, existing["id"],
),
)
row_with_eff = dict(row, effective_start=eff_start, effective_end=eff_end)
new_rows.append(row_with_eff)
log.info(
"USF factor CORRECTED: %s Q%s %s%%%s%%",
row["year"], row["quarter"],
existing["factor_pct"], row["factor_pct"],
)
conn.commit()
return new_rows
def fetch_previous_factor(conn, year: int, quarter: int) -> dict | None:
"""Return the factor immediately before (year, quarter) chronologically."""
prev_quarter = quarter - 1
prev_year = year
if prev_quarter == 0:
prev_quarter = 4
prev_year = year - 1
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
SELECT year, quarter, factor_pct, circularity_pct
FROM usf_contribution_factors
WHERE year = %s AND quarter = %s
""",
(prev_year, prev_quarter),
)
return cur.fetchone()
# ─── Customer lookup ──────────────────────────────────────────────────────
def fetch_fcc_carrier_recipients(conn) -> dict[str, dict]:
"""Return {email_lower: {name, entities: [ {legal_name, frn, filer_id_499} ]}}.
Pulls paid compliance customers and any telecom_entity with a
customer contact email. FCC jurisdiction only — CRTC carriers are
excluded because USF doesn't apply to them.
"""
recipients: dict[str, dict] = {}
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"""
SELECT DISTINCT
te.id,
te.legal_name,
te.frn,
te.filer_id_499,
LOWER(COALESCE(
(SELECT co.customer_email
FROM compliance_orders co
WHERE co.telecom_entity_id = te.id
AND co.payment_status = 'paid'
ORDER BY co.created_at DESC
LIMIT 1),
te.contact_email
)) AS email,
COALESCE(
(SELECT co.customer_name
FROM compliance_orders co
WHERE co.telecom_entity_id = te.id
AND co.payment_status = 'paid'
ORDER BY co.created_at DESC
LIMIT 1),
te.contact_name
) AS customer_name
FROM telecom_entities te
WHERE te.jurisdiction = 'FCC'
AND te.active = TRUE
"""
)
for row in cur.fetchall():
email = (row.get("email") or "").strip()
if not email or "@" not in email:
continue
bucket = recipients.setdefault(
email,
{"name": row.get("customer_name") or "", "entities": []},
)
bucket["entities"].append({
"legal_name": row.get("legal_name") or "",
"frn": row.get("frn") or "",
"filer_id_499": row.get("filer_id_499") or "",
})
return recipients
# ─── Email ────────────────────────────────────────────────────────────────
_EMAIL_HTML = """\
<!DOCTYPE html>
<html>
<head><meta charset="utf-8"></head>
<body style="font-family:Arial,Helvetica,sans-serif;color:#1f2937;background:#f4f4f4;margin:0;padding:0;">
<table width="100%" cellpadding="0" cellspacing="0" style="max-width:640px;margin:24px auto;background:#ffffff;border-radius:8px;overflow:hidden;">
<tr><td style="background:#1a2744;padding:24px 32px;">
<h1 style="margin:0;color:#ffffff;font-size:22px;">Performance West Inc.</h1>
<p style="margin:4px 0 0;color:#8fa4c8;font-size:13px;">FCC/USAC Regulatory Notice</p>
</td></tr>
<tr><td style="padding:28px 32px;">
<h2 style="margin:0 0 12px;color:#1a2744;font-size:19px;">
New USF Contribution Factor — {year} Q{quarter}
</h2>
<p style="margin:0 0 16px;font-size:15px;">Hi {customer_name},</p>
<p style="margin:0 0 16px;font-size:15px;">
USAC has published the federal Universal Service Fund contribution factor
effective <strong>{effective_start_human}</strong> through
<strong>{effective_end_human}</strong>. If you pass USF charges through
to your end users, update your billing system this week.
</p>
<table width="100%" cellpadding="10" cellspacing="0" style="border:1px solid #e2e8f0;border-radius:6px;margin-bottom:20px;">
<tr style="background:#f8fafc;">
<th align="left" style="font-size:12px;color:#475569;text-transform:uppercase;letter-spacing:.04em;">Quarter</th>
<th align="right" style="font-size:12px;color:#475569;text-transform:uppercase;letter-spacing:.04em;">Factor</th>
<th align="right" style="font-size:12px;color:#475569;text-transform:uppercase;letter-spacing:.04em;">Circularity</th>
</tr>
<tr>
<td style="font-size:15px;"><strong>{year} Q{quarter}</strong> (new)</td>
<td align="right" style="font-size:18px;font-weight:700;color:#059669;">{factor_pct_display}</td>
<td align="right" style="font-size:14px;">{circularity_display}</td>
</tr>
{previous_row}
{change_row}
</table>
<h3 style="margin:18px 0 8px;color:#1a2744;font-size:15px;">Your carriers on file</h3>
<ul style="margin:0 0 18px 20px;font-size:14px;">
{carrier_list}
</ul>
<h3 style="margin:18px 0 8px;color:#1a2744;font-size:15px;">What to do this week</h3>
<ul style="margin:0 0 18px 20px;font-size:14px;">
<li>Update the USF surcharge line on customer invoices to <strong>{factor_pct_display}</strong>, applied to the interstate/international portion of telecom revenue.</li>
<li>If you use the circularity factor to back out USF-on-USF, switch to <strong>{circularity_display}</strong>.</li>
<li>Confirm your billing system uses an effective date of <strong>{effective_start_human}</strong> — charges billed before that date still use the prior quarter's factor.</li>
<li>Note: Under 47 CFR &sect; 54.712(a), the customer-facing surcharge cannot exceed the contribution factor.</li>
</ul>
<h3 style="margin:18px 0 8px;color:#1a2744;font-size:15px;">
How to charge this on your customer bills (interconnected VoIP — Safe Harbor)
</h3>
<p style="margin:0 0 10px;font-size:14px;">
USF is only assessed on the <strong>interstate and international</strong>
portion of your telecom revenue — not the intrastate portion. For
traditional wireline carriers, "interstate vs. intrastate" is determined
by the physical endpoints of the call. For <strong>interconnected VoIP</strong>,
the endpoints aren't reliably knowable because calls are IP-routed and
the called party's location can change with their device. Rather than
require every VoIP provider to run a traffic study, the FCC lets you
elect the <strong>VoIP Safe Harbor</strong> under 47 CFR &sect; 54.706(d) /
FCC 06-94:
</p>
<p style="margin:0 0 10px;font-size:14px;padding:10px 14px;background:#f8fafc;border-left:3px solid #2d6cdf;">
<strong>Treat {safe_harbor_pct_display} of your VoIP revenue as interstate</strong> — no
traffic study required. The remaining <strong>{safe_harbor_intrastate_display}</strong>
is treated as intrastate and is not subject to federal USF.
</p>
<p style="margin:0 0 10px;font-size:14px;">
The customer-facing surcharge calculation — apply it to the
<em>assessable</em> portion of each VoIP line item, not the whole bill:
</p>
<table width="100%" cellpadding="10" cellspacing="0" style="border:1px solid #e2e8f0;border-radius:6px;margin-bottom:16px;background:#fffbea;">
<tr>
<td style="font-size:14px;font-family:'Courier New',monospace;">
USF surcharge = (VoIP line amount) &times; {safe_harbor_pct_display} (safe harbor)
&times; <strong>{factor_pct_display}</strong> (new factor)
</td>
</tr>
</table>
<p style="margin:0 0 10px;font-size:14px;"><strong>Worked example</strong> on a $100 monthly VoIP charge:</p>
<ul style="margin:0 0 16px 20px;font-size:14px;">
<li>Assessable portion: $100 &times; {safe_harbor_pct_display} = <strong>${example_assessable}</strong></li>
<li>USF surcharge: ${example_assessable} &times; {factor_pct_display} = <strong>${example_surcharge}</strong>{example_prior_note}</li>
<li>Line on customer bill: <em>"Federal Universal Service Fund"</em> = <strong>${example_surcharge}</strong></li>
</ul>
<p style="margin:0 0 10px;font-size:13px;color:#475569;">
<strong>Alternative if you DO have a traffic study:</strong> you can use
your actual interstate percentage instead of {safe_harbor_pct_display}. You must keep
the study on file for USAC audits (retained for 5 years per
47 CFR &sect; 54.711(a)). Small interconnected VoIP providers typically
stay on the safe harbor because the study cost exceeds the surcharge
savings. Your election is made once per filing year on 499-A Line 418.
</p>
<p style="margin:0 0 10px;font-size:13px;color:#475569;">
<strong>State-level USF:</strong> many states also assess their own USF
on the intrastate portion (the {safe_harbor_intrastate_display} that isn't interstate).
This notice only covers the federal factor — check your state PUC for
intrastate surcharges.
</p>
{notice_block}
<p style="margin:20px 0 0;font-size:13px;color:#64748b;">
Source: <a href="https://www.usac.org/service-providers/making-payments/contribution-factors/" style="color:#2d6cdf;">USAC Contribution Factors</a>.
This notice is sent automatically whenever USAC posts a new factor. Questions? Reply to this email.
</p>
</td></tr>
<tr><td style="background:#f0f0f0;padding:14px 32px;text-align:center;">
<p style="margin:0;color:#999;font-size:11px;">&copy; {current_year} Performance West Inc.</p>
</td></tr>
</table>
</body>
</html>
"""
def _quarter_start_human(year: int, quarter: int) -> str:
month = _QUARTER_DATE_RANGES[quarter][0].month
return date(year, month, 1).strftime("%B %-d, %Y")
def _quarter_end_human(year: int, quarter: int) -> str:
end = _QUARTER_DATE_RANGES[quarter][1]
return date(year, end.month, end.day).strftime("%B %-d, %Y")
def render_email(
*,
customer_name: str,
carriers: list[dict],
factor_row: dict,
previous_row: dict | None,
voip_safe_harbor_pct: Decimal = VOIP_SAFE_HARBOR_PCT,
) -> tuple[str, str]:
"""Return (subject, html_body)."""
year = factor_row["year"]
quarter = factor_row["quarter"]
factor_pct = Decimal(factor_row["factor_pct"])
circ_pct = factor_row.get("circularity_pct")
factor_display = f"{factor_pct:.1f}%"
circ_display = f"{Decimal(circ_pct):.4f}%" if circ_pct is not None else "n/a"
# Safe-harbor math — computed at render time so the quarterly factor
# and any future change to VOIP_SAFE_HARBOR_PCT flow through cleanly.
safe_harbor_display = f"{voip_safe_harbor_pct:.1f}%"
safe_harbor_intrastate = (Decimal("100") - voip_safe_harbor_pct).quantize(Decimal("0.1"))
safe_harbor_intrastate_display = f"{safe_harbor_intrastate}%"
# Worked example on a $100 monthly VoIP charge.
example_base = Decimal("100.00")
example_assessable = (
example_base * voip_safe_harbor_pct / Decimal("100")
).quantize(Decimal("0.01"))
example_surcharge = (
example_assessable * factor_pct / Decimal("100")
).quantize(Decimal("0.01"))
# If we know the prior factor, show what the surcharge WAS for contrast.
example_prior_note = ""
if previous_row:
prev_factor = Decimal(previous_row["factor_pct"])
prev_surcharge = (
example_assessable * prev_factor / Decimal("100")
).quantize(Decimal("0.01"))
delta = example_surcharge - prev_surcharge
sign = "+" if delta > 0 else ""
example_prior_note = (
f' <span style="color:#64748b;">'
f'(prior quarter was ${prev_surcharge}; {sign}${delta} per $100 billed)</span>'
)
previous_html = ""
change_html = ""
if previous_row:
prev_pct = Decimal(previous_row["factor_pct"])
previous_html = (
f'<tr><td style="font-size:14px;color:#64748b;">'
f'{previous_row["year"]} Q{previous_row["quarter"]} (prior)</td>'
f'<td align="right" style="font-size:14px;color:#64748b;">{prev_pct:.1f}%</td>'
f'<td align="right" style="font-size:14px;color:#64748b;">'
f'{Decimal(previous_row["circularity_pct"]):.4f}%</td></tr>'
if previous_row.get("circularity_pct") is not None
else f'<tr><td style="font-size:14px;color:#64748b;">'
f'{previous_row["year"]} Q{previous_row["quarter"]} (prior)</td>'
f'<td align="right" style="font-size:14px;color:#64748b;">{prev_pct:.1f}%</td>'
f'<td align="right" style="font-size:14px;color:#64748b;">n/a</td></tr>'
)
delta = factor_pct - prev_pct
bps = int(delta * 100) # percentage-point → basis-point display
direction = "increase" if bps > 0 else ("decrease" if bps < 0 else "unchanged")
color = "#b91c1c" if bps > 0 else ("#065f46" if bps < 0 else "#475569")
change_html = (
f'<tr><td style="font-size:13px;color:{color};" colspan="3">'
f'<strong>Change: {delta:+.1f} pp ({direction})</strong> vs. prior quarter</td></tr>'
)
else:
change_html = (
'<tr><td style="font-size:13px;color:#475569;" colspan="3">'
'Prior quarter factor not on file — delta unavailable.</td></tr>'
)
carrier_items = "".join(
f'<li><strong>{c["legal_name"]}</strong>'
+ (f' &mdash; FRN {c["frn"]}' if c.get("frn") else "")
+ (f', Filer ID {c["filer_id_499"]}' if c.get("filer_id_499") else "")
+ "</li>"
for c in carriers
) or "<li>(none on file — contact us if this is unexpected)</li>"
notice_block = ""
if factor_row.get("fcc_public_notice"):
href = factor_row["fcc_public_notice"]
notice_block = (
f'<p style="font-size:13px;color:#475569;">FCC Public Notice: '
f'<a href="{href}" style="color:#2d6cdf;">{href}</a></p>'
)
html = _EMAIL_HTML.format(
year=year,
quarter=quarter,
customer_name=customer_name or "there",
effective_start_human=_quarter_start_human(year, quarter),
effective_end_human=_quarter_end_human(year, quarter),
factor_pct_display=factor_display,
circularity_display=circ_display,
previous_row=previous_html,
change_row=change_html,
carrier_list=carrier_items,
notice_block=notice_block,
current_year=datetime.utcnow().year,
safe_harbor_pct_display=safe_harbor_display,
safe_harbor_intrastate_display=safe_harbor_intrastate_display,
example_assessable=str(example_assessable),
example_surcharge=str(example_surcharge),
example_prior_note=example_prior_note,
)
direction_word = ""
if previous_row:
prev_pct = Decimal(previous_row["factor_pct"])
if factor_pct > prev_pct:
direction_word = " (up)"
elif factor_pct < prev_pct:
direction_word = " (down)"
else:
direction_word = " (unchanged)"
subject = f"USF Contribution Factor {year} Q{quarter}: {factor_display}{direction_word}"
return subject, html
def send_email(*, to_email: str, subject: str, html_body: str, dry_run: bool) -> bool:
if dry_run:
log.info("[DRY RUN] would send to %s: %s", to_email, subject)
return True
if not SMTP_USER or not SMTP_PASS:
log.warning("SMTP not configured — skipping email to %s", to_email)
return False
msg = MIMEMultipart("alternative")
msg["Subject"] = subject
msg["From"] = FROM_EMAIL
msg["To"] = to_email
msg["Bcc"] = ADMIN_EMAIL
msg.attach(MIMEText(html_body, "html"))
try:
with smtplib.SMTP(SMTP_HOST, SMTP_PORT) as server:
if SMTP_PORT != 25:
server.starttls()
server.login(SMTP_USER, SMTP_PASS)
server.sendmail(FROM_EMAIL, [to_email, ADMIN_EMAIL], msg.as_string())
log.info("Sent USF factor notice to %s", to_email)
return True
except Exception as exc:
log.error("Failed to send to %s: %s", to_email, exc)
return False
def mark_notified(conn, year: int, quarter: int, recipient_count: int) -> None:
with conn.cursor() as cur:
cur.execute(
"""
UPDATE usf_contribution_factors
SET notified_at = NOW(),
notification_count = notification_count + %s
WHERE year = %s AND quarter = %s
""",
(recipient_count, year, quarter),
)
conn.commit()
# ─── Main ─────────────────────────────────────────────────────────────────
def run_once(dry_run: bool = False) -> dict:
"""Fetch, upsert, notify. Returns summary dict for cron log."""
summary = {
"rows_seen": 0,
"new_factors": 0,
"recipients_notified": 0,
"emails_sent": 0,
"errors": 0,
}
if not DATABASE_URL:
log.error("DATABASE_URL not set — aborting")
summary["errors"] += 1
return summary
try:
rows = fetch_factor_rows(USAC_FACTORS_URL)
except Exception as exc:
log.error("Could not fetch USAC factors page: %s", exc)
summary["errors"] += 1
return summary
summary["rows_seen"] = len(rows)
if not rows:
log.warning("USAC factors page returned no parseable rows")
return summary
conn = psycopg2.connect(DATABASE_URL)
try:
new_rows = upsert_factors(conn, rows)
summary["new_factors"] = len(new_rows)
if not new_rows:
log.info("No new factors to announce — %d rows on page", len(rows))
return summary
# Notify only for factors we haven't already emailed about.
for factor_row in new_rows:
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
cur.execute(
"SELECT notified_at FROM usf_contribution_factors "
"WHERE year = %s AND quarter = %s",
(factor_row["year"], factor_row["quarter"]),
)
record = cur.fetchone()
if record and record.get("notified_at"):
log.info(
"Already notified for %s Q%s; skipping",
factor_row["year"], factor_row["quarter"],
)
continue
previous = fetch_previous_factor(
conn, factor_row["year"], factor_row["quarter"]
)
recipients = fetch_fcc_carrier_recipients(conn)
summary["recipients_notified"] += len(recipients)
for email, bucket in recipients.items():
subject, html = render_email(
customer_name=bucket["name"],
carriers=bucket["entities"],
factor_row=factor_row,
previous_row=previous,
)
if send_email(
to_email=email, subject=subject,
html_body=html, dry_run=dry_run,
):
summary["emails_sent"] += 1
else:
summary["errors"] += 1
if not dry_run:
mark_notified(
conn, factor_row["year"], factor_row["quarter"],
len(recipients),
)
finally:
conn.close()
log.info(
"USF monitor cycle: rows_seen=%s new=%s recipients=%s sent=%s errors=%s",
summary["rows_seen"], summary["new_factors"],
summary["recipients_notified"], summary["emails_sent"], summary["errors"],
)
return summary
def main() -> None:
parser = argparse.ArgumentParser(description="USF contribution factor monitor.")
parser.add_argument(
"--dry-run",
action="store_true",
default=bool(os.environ.get("USF_MONITOR_DRY_RUN")),
help="Parse + upsert but don't send emails.",
)
args = parser.parse_args()
run_once(dry_run=args.dry_run)
if __name__ == "__main__":
main()