Federal Dataset Ingest: Keeping 208 Federal Datasets Fresh at the Edge
A federal regulatory data platform is only as good as its freshness. A compliance team running due diligence on a new vendor needs OFAC's SDN list from this morning, not last week. A journalist investigating an EPA enforcement case needs to know whether it's still open or resolved. The D1 database that stores 208 datasets across 45 agencies is described elsewhere; this post covers the pipeline that keeps it current.
The ingest architecture
Every dataset has a Cloudflare Worker cron trigger. The cron fires once daily for most datasets, more frequently for the highest-stakes sources:
- Every 4 hours: OFAC SDN, OFAC Non-SDN, SAM.gov exclusions, FinCEN 314(a)
- Every 6 hours: HHS-OIG exclusions, SEC enforcement actions, CISA KEV
- Daily at 07:00 UTC: Most datasets — EPA, OSHA, MSHA, DOJ, CFPB, FDA, FINRA, CFTC, PCAOB, IRS, FDIC, OCC, NHTSA, CPSC, FTC, and the 150+ others
- Weekly: Large static datasets that change infrequently — GLEIF LEI universe, USAspending historical awards, NBER county datasets
The cron worker for each dataset is intentionally separate — no shared Worker process. A hung or erroring OFAC ingest doesn't block the EPA ingest. Cloudflare Cron Triggers execute Workers in their own isolated runtime with a 30-minute wall-clock limit per invocation.
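The cadence tiers above map onto standard five-field cron expressions. The following is a minimal sketch, not the platform's actual configuration: the names `Cadence`, `CRON_BY_CADENCE`, `CADENCE_BY_DATASET`, and `cron_for` are hypothetical, the dataset identifiers are illustrative, and the Sunday slot for the weekly tier is an assumption because the schedule above does not name a day.

```typescript
// Hypothetical cadence tiers; the names are illustrative, not the platform's own.
type Cadence = "every_4h" | "every_6h" | "daily" | "weekly"

// Five-field cron expressions (minute hour day-of-month month day-of-week), in UTC.
const CRON_BY_CADENCE: Record<Cadence, string> = {
  every_4h: "0 */4 * * *",
  every_6h: "0 */6 * * *",
  daily: "0 7 * * *", // 07:00 UTC
  weekly: "0 7 * * SUN", // assumption: the weekly day and hour are not stated above
}

// A few datasets from the tiers above; anything unlisted falls back to daily.
const CADENCE_BY_DATASET: Record<string, Cadence> = {
  ofac_sdn: "every_4h",
  sam_exclusions: "every_4h",
  hhs_oig_exclusions: "every_6h",
  cisa_kev: "every_6h",
  gleif_lei: "weekly",
}

// Look up the cron expression for a dataset, defaulting to the daily tier.
function cron_for(dataset_id: string): string {
  const cadence = CADENCE_BY_DATASET[dataset_id] ?? "daily"
  return CRON_BY_CADENCE[cadence]
}
```

Keeping the mapping in one table makes it easy to audit which sources sit in the high-frequency tiers, even though each dataset still runs in its own Worker.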
Three source categories
Federal datasets come in three structural categories, each needing a different ingest approach:
Structured REST APIs
The cleanest sources. CISA KEV, SAM.gov, USAspending, and SEC EDGAR all expose JSON or XML over a documented REST API with pagination and optional lastModified filtering. These are the easiest to keep fresh because the API tells us what changed:
// SAM.gov exclusions: paginated JSON API with delta support
async function ingest_sam_exclusions() {
  const last_ingest = await kv.get("sam_exclusions:last_ingest_at")
  // Request only records modified since last run
  const params = new URLSearchParams({
    api_key: env.SAM_API_KEY,
    isActive: "Yes",
    fromDate: last_ingest ?? "2012-01-01",
    toDate: today_iso(),
    limit: "100",
    offset: "0",
  })
  let page = 0
  while (true) {
    params.set("offset", String(page * 100))
    const resp = await fetch(`https://api.sam.gov/exclusions/v1/search?${params}`)
    // Fail fast on HTTP errors instead of parsing an error body as data
    if (!resp.ok) throw new Error(`SAM.gov request failed: HTTP ${resp.status}`)
    const { totalRecords, exclusionList } = await resp.json()
    for (const record of exclusionList) {
      await d1.prepare(
        "INSERT OR REPLACE INTO sam_exclusions VALUES (?, ?, ?, ?, ?, ?)"
      ).bind(record.uei, record.legalBusinessName, record.exclusionType,
        record.activeDate, record.terminationDate, today_iso()).run()
    }
    // Stop once the current page reaches the reported total
    if ((page + 1) * 100 >= totalRecords) break
    page++
  }
  await kv.put("sam_exclusions:last_ingest_at", today_iso())
}
Bulk file downloads
Most agencies don't have a REST API. They publish a bulk CSV, XML, or ZIP file that contains the full current dataset. OFAC publishes the SDN as a 15MB XML. The EPA ECHO database publishes quarterly CSV dumps. HHS-OIG publishes monthly exclusion lists as a CSV download.
For bulk downloads, we use HTTP conditional GET, sending If-Modified-Since and If-None-Match headers to avoid downloading the full file when nothing changed. Most government file servers respect these correctly:
async function ingest_ofac_sdn() {
  const etag_stored = await kv.get("ofac_sdn:etag")
  const headers: Record<string, string> = {
    "User-Agent": "AI-Analytics-Regulatory-Hub/1.0 (regulatory-data@ai-analytics.org)",
  }
  // Only send If-None-Match when an ETag from a previous run exists
  if (etag_stored) headers["If-None-Match"] = etag_stored
  const resp = await fetch("https://sanctionslistservice.ofac.treas.gov/api/PublicationPreview/exports/SDN_XML.zip", { headers })
  if (resp.status === 304) {
    // No change since last ingest; update timestamp but skip parse
    await kv.put("ofac_sdn:last_check_at", now_iso())
    return { skipped: true }
  }
  // Any other non-2xx status is an error, not a new file
  if (!resp.ok) throw new Error(`OFAC SDN download failed: HTTP ${resp.status}`)
  const etag_new = resp.headers.get("ETag")
  const zip_buf = await resp.arrayBuffer()
  const xml = await unzip_single(zip_buf, "sdn.xml")
  await parse_and_upsert_sdn(xml)
  // The ETag header can be absent; store it only when the server sent one
  if (etag_new) await kv.put("ofac_sdn:etag", etag_new)
  await kv.put("ofac_sdn:last_ingest_at", now_iso())
}
Scraping and unofficial feeds
Some agencies have no API and no stable bulk export. FTC enforcement action pages are HTML. Some OIG exclusion supplements are published as PDFs. PCAOB disciplinary orders are individual HTML pages linked from a JavaScript-rendered list.
For these we maintain lightweight scrapers that run as Cloudflare Workers. The scraper fetches the index page, parses the HTML for new record links, and for each new link fetches and extracts the structured data we need. These are the most fragile ingest pipelines, since an HTML redesign breaks them, so they have more aggressive monitoring and a shorter staleness threshold that triggers an alert.
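The index-page step can be sketched as follows. This is an illustration under stated assumptions, not the scrapers' actual code: `extract_links` and `new_record_links` are hypothetical helpers, the set of previously seen URLs is assumed to be loaded by the caller, and a regular expression stands in for a real HTML parser.

```typescript
// Extract href values from anchor tags and resolve them against the index URL.
// A regex is enough for a sketch; it will miss unusual markup that a real
// HTML parser would handle.
function extract_links(index_html: string, base_url: string): string[] {
  const links = new Set<string>()
  const href_pattern = /<a\s[^>]*?href\s*=\s*["']([^"'#]+)["']/gi
  let match: RegExpExecArray | null
  while ((match = href_pattern.exec(index_html)) !== null) {
    try {
      links.add(new URL(match[1], base_url).toString())
    } catch {
      // Skip hrefs that cannot be resolved to a valid URL
    }
  }
  return Array.from(links)
}

// Keep only links that were not seen on a previous run.
function new_record_links(
  index_html: string,
  base_url: string,
  seen: Set<string>
): string[] {
  return extract_links(index_html, base_url).filter((url) => !seen.has(url))
}
```

Deduplicating through a set before comparing against the seen list means a record linked twice on the index page is fetched only once.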
Schema drift detection
Government datasets change their schemas without notice. A CSV that had 23 columns in October has 25 columns in November. A JSON API adds a new field that breaks a strict parser. An XML schema changes an attribute name from name to entityName.
We detect schema drift by comparing the incoming record structure against a stored schema fingerprint:
function check_schema_drift(
  dataset_id: string,
  incoming_record: Record<string, unknown>,
  stored_schema: StoredSchema
): SchemaDriftResult {
  const incoming_keys = new Set(Object.keys(incoming_record))
  const required_keys = new Set(stored_schema.required_fields)
  // Optional fields are already known, so they must not count as additions
  const known_keys = new Set([
    ...stored_schema.required_fields,
    ...stored_schema.optional_fields,
  ])
  const added = incoming_keys.difference(known_keys)
  const removed = required_keys.difference(incoming_keys)
  if (removed.size > 0) {
    // Missing required fields: likely schema break
    return {
      status: "BREAKING_DRIFT",
      added: [...added],
      removed: [...removed],
    }
  }
  if (added.size > 0) {
    // New fields: additive, non-breaking; update schema fingerprint
    return {
      status: "ADDITIVE_DRIFT",
      added: [...added],
    }
  }
  return { status: "OK" }
}
A BREAKING_DRIFT result halts the ingest, fires an alert, and preserves the last-known-good state in D1 rather than overwriting with broken records. An ADDITIVE_DRIFT result logs the new fields, stores them in D1's flexible extra_fields JSON column, and updates the schema fingerprint in KV.
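One way to route unexpected fields into the extra_fields column is to split each record before writing it. The sketch below assumes the stored fingerprint carries required_fields and optional_fields lists; `split_extra_fields` is a hypothetical helper, and the interface is redeclared here only so the example is self-contained.

```typescript
// Minimal shape of the stored fingerprint; only the field lists are used here.
interface StoredSchema {
  required_fields: string[]
  optional_fields: string[]
}

// Separate fields the schema knows about from unexpected ones, which are
// serialized as JSON for the extra_fields column (null when there are none).
function split_extra_fields(
  record: Record<string, unknown>,
  schema: StoredSchema
): { known: Record<string, unknown>; extra_fields: string | null } {
  const known_keys = new Set([...schema.required_fields, ...schema.optional_fields])
  const known: Record<string, unknown> = {}
  const extra: Record<string, unknown> = {}
  for (const key of Object.keys(record)) {
    if (known_keys.has(key)) known[key] = record[key]
    else extra[key] = record[key]
  }
  const has_extra = Object.keys(extra).length > 0
  return { known, extra_fields: has_extra ? JSON.stringify(extra) : null }
}
```

Splitting this way keeps the typed columns stable while still preserving whatever the agency added, so nothing is lost before the fingerprint is updated.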
Schema fingerprints are stored as JSON in Cloudflare KV:
{
  "dataset_id": "ofac_sdn",
  "required_fields": ["uid", "lastName", "sdnType", "programs", "ids", "addresses"],
  "optional_fields": ["firstName", "title", "remarks", "dateOfBirth", "placeOfBirth"],
  "last_updated": "2025-11-03T00:00:00Z",
  "version": 47
}
Delta detection for bulk sources
For bulk-download datasets, the entire file is new every time — there's no built-in delta. We compute our own deltas using a record-hash approach:
// For each record in the incoming bulk file:
// 1. Compute a stable hash of the record content (normalized, sorted keys)
// 2. Compare against the stored hash for that record's primary key
// 3. Only write to D1 if the hash changed or the record is new
async function upsert_with_delta(
  table: string,
  pk_field: string,
  record: Record<string, unknown>,
  db: D1Database,
  kv: KVNamespace
) {
  const pk = record[pk_field] as string
  const record_hash = await sha256_hex(stable_stringify(record))
  const stored_hash = await kv.get(`${table}:${pk}:hash`)
  if (stored_hash === record_hash) return // no change
  await db.prepare(`INSERT OR REPLACE INTO ${table} VALUES (${placeholders(record)})`)
    .bind(...Object.values(record)).run()
  await kv.put(`${table}:${pk}:hash`, record_hash)
  return "updated"
}
This means daily D1 writes are proportional to what actually changed, not to the full dataset size. OFAC SDN has ~12,000 entries; on a typical day 5–20 records are added, modified, or removed. We write those 5–20, not 12,000.
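The delta check depends on two helpers, stable_stringify and sha256_hex, that are not shown. A possible sketch of both follows. It assumes records contain only JSON-like values (strings, numbers, booleans, null, arrays, plain objects) and that the runtime exposes the Web Crypto API as a global `crypto`; these are illustrative versions rather than the production helpers.

```typescript
// Serialize a value with object keys sorted recursively, so two records with
// the same content produce the same string regardless of key order.
function stable_stringify(value: unknown): string {
  if (Array.isArray(value)) {
    return `[${value.map((item) => stable_stringify(item)).join(",")}]`
  }
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>
    const body = Object.keys(obj)
      .sort()
      .map((key) => `${JSON.stringify(key)}:${stable_stringify(obj[key])}`)
      .join(",")
    return `{${body}}`
  }
  // Primitives; undefined is normalized to null so the output stays valid JSON
  return JSON.stringify(value ?? null)
}

// Hex-encoded SHA-256 digest of a string, computed with the Web Crypto API.
async function sha256_hex(input: string): Promise<string> {
  const bytes = new TextEncoder().encode(input)
  const digest = await crypto.subtle.digest("SHA-256", bytes)
  return Array.from(new Uint8Array(digest))
    .map((b) => b.toString(16).padStart(2, "0"))
    .join("")
}
```

Sorting keys matters because a bulk file may emit the same record with columns in a different order between releases; without normalization, every such record would look modified and be rewritten.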
Error handling and retry budgets
Government data sources go down. data.gov has planned maintenance windows. OFAC occasionally updates their CDN configuration in ways that temporarily break direct downloads. The ingest workers handle this with per-source retry budgets:
- Transient errors (5xx, timeout): Retry with exponential backoff — 2min, 8min, 30min — up to 3 retries per invocation.
- Consecutive failure threshold: If a dataset fails on 3 consecutive scheduled runs, fire a staleness alert (email + Slack). The D1 table continues serving its last-known-good state.
- Critical-list threshold: OFAC SDN, SAM.gov exclusions, and HHS-OIG exclusions have a tighter threshold — 1 consecutive failure fires an immediate alert given their compliance criticality.
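The retry budget and alert thresholds above reduce to two small decisions. The sketch below uses hypothetical names (`RETRY_DELAYS_MIN`, `CRITICAL_DATASETS`, `retry_delay_ms`, `should_alert`) and illustrative dataset identifiers; only the delays and thresholds themselves come from the policy described here.

```typescript
// Backoff delays from the retry policy above, in minutes.
const RETRY_DELAYS_MIN = [2, 8, 30]

// Datasets that alert on the first failed run; identifiers are illustrative.
const CRITICAL_DATASETS = new Set(["ofac_sdn", "sam_exclusions", "hhs_oig_exclusions"])

// Delay before the given retry attempt (1-based), or null once the budget is spent.
function retry_delay_ms(attempt: number): number | null {
  const minutes = RETRY_DELAYS_MIN[attempt - 1]
  return minutes === undefined ? null : minutes * 60 * 1000
}

// Whether a streak of consecutive failed scheduled runs should fire a staleness alert.
function should_alert(dataset_id: string, consecutive_failures: number): boolean {
  const threshold = CRITICAL_DATASETS.has(dataset_id) ? 1 : 3
  return consecutive_failures >= threshold
}
```

Retries happen within one invocation, while the alert decision counts failures across scheduled runs, so the two functions are deliberately independent.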
Staleness is surfaced in the API response for every record via a data_freshness field:
{
  "dataset": "ofac_sdn",
  "last_ingest_at": "2026-01-15T04:00:00Z",
  "last_modified_at": "2026-01-15T10:30:00Z", // from OFAC ETag
  "status": "fresh", // fresh | stale | error
  "records": 12483
}
Timing: when government sources actually update
A practical lesson: not all daily datasets are refreshed at midnight. Federal agencies update their data during business hours, often mid-morning Eastern Time (ET):
- OFAC SDN: Updates typically published 10:00–11:30 AM ET on business days. Our 4-hour cron catches same-day updates by the afternoon run (14:00 ET).
- SEC EDGAR: New filings become searchable on EDGAR within 10–30 minutes of submission. Their full-text search index lags by ~4 hours.
- DOJ press releases: Published throughout the business day, so the 07:00 UTC cron alone would not pick up a release until the following day. The 19:00 UTC run catches releases published earlier in the US business day.
- CISA KEV: Updates can be published at any time, including weekends. The 6-hour cron is appropriate here.
We schedule the primary daily cron at 07:00 UTC (overnight for US time zones) to avoid hitting agency servers during peak hours, then run a second pass at 19:00 UTC (mid-afternoon US) for datasets that update during business hours.
Monitoring and freshness dashboard
Each ingest result writes a row to an ingest_log D1 table:
CREATE TABLE ingest_log (
  id INTEGER PRIMARY KEY,
  dataset_id TEXT NOT NULL,
  started_at TEXT NOT NULL,
  completed_at TEXT,
  status TEXT NOT NULL,              -- success | skipped | error | schema_drift
  records_written INTEGER DEFAULT 0,
  error_message TEXT,
  etag_changed INTEGER DEFAULT 0     -- 1 if source had new content
);
A public freshness endpoint at api.ai-analytics.org/freshness returns the ingest status for every dataset, including the age of the last successful ingest. This lets API consumers check whether the data they're querying is current before relying on it for a compliance decision.
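A consumer-side view of that status could be derived from the age of the last successful ingest. The sketch below is an assumption-heavy illustration: `freshness_status` is a hypothetical function, and the rule that a dataset becomes stale once its age exceeds twice its scheduled interval is our placeholder, since the actual threshold is not stated here.

```typescript
type FreshnessStatus = "fresh" | "stale" | "error"

// Classify a dataset from the age of its last successful ingest.
// Assumption: "stale" means older than twice the scheduled interval.
function freshness_status(
  last_success_at: string | null,
  last_run_failed: boolean,
  interval_hours: number,
  now: Date = new Date()
): { status: FreshnessStatus; age_hours: number | null } {
  // A dataset that has never been ingested successfully is reported as an error
  if (last_success_at === null) return { status: "error", age_hours: null }
  const age_hours = (now.getTime() - Date.parse(last_success_at)) / (60 * 60 * 1000)
  if (last_run_failed) return { status: "error", age_hours }
  return { status: age_hours > 2 * interval_hours ? "stale" : "fresh", age_hours }
}
```

Returning the age alongside the status lets a compliance consumer apply a stricter cutoff than the default when a decision warrants it.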
For the Cloudflare D1 database architecture this pipeline populates: Building the Federal Regulatory Data Hub on Cloudflare D1: 50M+ records at the edge →
For the REST, MCP, and JSON-LD API that serves the ingested data: The Federal Regulatory API: REST, MCP, and JSON-LD for 208 federal datasets →
For the entity bridge that links records across all 197 ingested datasets: Building the cross-agency regulatory entity graph: 50M+ records, one join →
For the compliance risk score that queries across the 30+ enforcement lists this pipeline feeds: Compliance screening across 30+ federal enforcement lists: how the risk score works →