
Federal Dataset Ingest: Keeping 208 Federal Datasets Fresh at the Edge

· 8 min read · AI Analytics
Regulatory data · Infrastructure · Data engineering · Cloudflare

A federal regulatory data platform is only as good as its freshness. A compliance team running due diligence on a new vendor needs OFAC's SDN list from this morning, not last week. A journalist investigating an EPA enforcement case needs to know whether it's still open or resolved. The D1 database that stores 208 datasets across 45 agencies is described elsewhere; this post covers the pipeline that keeps it current.

The ingest architecture

Every dataset has a Cloudflare Worker cron trigger. The cron fires once daily for most datasets, more frequently for the highest-stakes sources:

  • Every 4 hours: OFAC SDN, OFAC Non-SDN, SAM.gov exclusions, FinCEN 314(a)
  • Every 6 hours: HHS-OIG exclusions, SEC enforcement actions, CISA KEV
  • Daily at 07:00 UTC: Most datasets — EPA, OSHA, MSHA, DOJ, CFPB, FDA, FINRA, CFTC, PCAOB, IRS, FDIC, OCC, NHTSA, CPSC, FTC, and the 150+ others
  • Weekly: Large static datasets that change infrequently — GLEIF LEI universe, USAspending historical awards, NBER county datasets

The cron worker for each dataset is intentionally separate — no shared Worker process. A hung or erroring OFAC ingest doesn't block the EPA ingest. Cloudflare Cron Triggers execute Workers in their own isolated runtime with a 30-minute wall-clock limit per invocation.
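
The schedule tiers above map naturally onto cron expressions. The following is a minimal sketch, not the production configuration: the tier names are hypothetical, and the Sunday slot for the weekly tier is an assumption, since the post does not say which day the weekly jobs run. Check the Cron Triggers documentation for the exact day-of-week syntax before reusing the weekly expression.

```typescript
// Hypothetical tier names; the cron strings use five-field cron syntax.
type Tier = "every_4h" | "every_6h" | "daily" | "weekly";

const CRON_BY_TIER: Record<Tier, string> = {
  every_4h: "0 */4 * * *", // OFAC SDN, OFAC Non-SDN, SAM.gov exclusions, FinCEN 314(a)
  every_6h: "0 */6 * * *", // HHS-OIG exclusions, SEC enforcement actions, CISA KEV
  daily: "0 7 * * *",      // 07:00 UTC, most datasets
  weekly: "0 7 * * SUN",   // assumption: Sunday 07:00 UTC for large static datasets
};

// Scheduled invocations per week for each tier, useful when sizing
// request quotas against upstream agency servers.
const RUNS_PER_WEEK: Record<Tier, number> = {
  every_4h: 6 * 7,
  every_6h: 4 * 7,
  daily: 7,
  weekly: 1,
};
```

Because every dataset has its own Worker, each Worker would carry exactly one of these expressions in its trigger configuration.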

Three source categories

Federal datasets come in three structural categories, each needing a different ingest approach:

Structured REST APIs

The cleanest sources. CISA KEV, SAM.gov, USAspending, and SEC EDGAR all expose JSON or XML over a documented REST API with pagination and optional lastModified filtering. These are the easiest to keep fresh because the API tells us what changed:

// SAM.gov exclusions: paginated JSON API with delta support
async function ingest_sam_exclusions() {
  const last_ingest = await kv.get("sam_exclusions:last_ingest_at")

  // Request only records modified since last run
  const params = new URLSearchParams({
    api_key: env.SAM_API_KEY,
    isActive: "Yes",
    fromDate: last_ingest ?? "2012-01-01",
    toDate: today_iso(),
    limit: "100",
    offset: "0",
  })

  let page = 0
  while (true) {
    params.set("offset", String(page * 100))
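    const resp = await fetch(`https://api.sam.gov/exclusions/v1/search?${params}`)
    // Fail loudly on HTTP errors instead of parsing an error body as data
    if (!resp.ok) throw new Error(`SAM.gov request failed: ${resp.status}`)
    const { totalRecords, exclusionList } = await resp.json()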

    for (const record of exclusionList) {
      await d1.prepare(
        "INSERT OR REPLACE INTO sam_exclusions VALUES (?, ?, ?, ?, ?, ?)"
      ).bind(record.uei, record.legalBusinessName, record.exclusionType,
              record.activeDate, record.terminationDate, today_iso()).run()
    }

    if ((page + 1) * 100 >= totalRecords) break
    page++
  }

  await kv.put("sam_exclusions:last_ingest_at", today_iso())
}

Bulk file downloads

Most agencies don't have a REST API. They publish a bulk CSV, XML, or ZIP file that contains the full current dataset. OFAC publishes the SDN as a 15MB XML. The EPA ECHO database publishes quarterly CSV dumps. HHS-OIG publishes monthly exclusion lists as a CSV download.

For bulk downloads, we use HTTP conditional GET, sending If-Modified-Since and If-None-Match headers to avoid downloading the full file when nothing changed. Most government file servers respect these correctly:

async function ingest_ofac_sdn() {
  const etag_stored = await kv.get("ofac_sdn:etag")

  // Send If-None-Match only when a previous ETag exists; an empty value is not a valid validator
  const headers = {
    "User-Agent": "AI-Analytics-Regulatory-Hub/1.0 (regulatory-data@ai-analytics.org)",
  }
  if (etag_stored) headers["If-None-Match"] = etag_stored

  const resp = await fetch("https://sanctionslistservice.ofac.treas.gov/api/PublicationPreview/exports/SDN_XML.zip", { headers })

  if (resp.status === 304) {
    // No change since last ingest; update timestamp but skip parse
    await kv.put("ofac_sdn:last_check_at", now_iso())
    return { skipped: true }
  }

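  // Anything other than 200 at this point is a failed download, not new data
  if (!resp.ok) throw new Error(`OFAC SDN download failed: ${resp.status}`)

  const etag_new = resp.headers.get("ETag")
  const zip_buf = await resp.arrayBuffer()
  const xml = await unzip_single(zip_buf, "sdn.xml")

  await parse_and_upsert_sdn(xml)
  // Store the new ETag only if the server actually sent one
  if (etag_new) await kv.put("ofac_sdn:etag", etag_new)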
  await kv.put("ofac_sdn:last_ingest_at", now_iso())
}
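
The text mentions both If-None-Match and If-Modified-Since, while the example sends only the ETag validator. A small helper can build whichever conditional headers are available. This is a sketch under assumptions: the `Validators` shape and the function name are hypothetical, and the stored Last-Modified value is assumed to be the exact string the server returned on the previous download.

```typescript
// Hypothetical shape for validators saved from the previous successful download.
interface Validators {
  etag: string | null;
  last_modified: string | null; // verbatim Last-Modified header value
}

// Build conditional GET headers, omitting any validator we do not have.
// An absent validator must be left out entirely rather than sent empty.
function conditional_headers(v: Validators): Record<string, string> {
  const headers: Record<string, string> = {};
  if (v.etag) headers["If-None-Match"] = v.etag;
  if (v.last_modified) headers["If-Modified-Since"] = v.last_modified;
  return headers;
}
```

On a first run with no stored validators the helper returns an empty object, so the request degrades to a plain GET and the full file is downloaded.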

Scraping and unofficial feeds

Some agencies have no API and no stable bulk export. FTC enforcement action pages are HTML. Some OIG exclusion supplements are published as PDFs. PCAOB disciplinary orders are individual HTML pages linked from a JavaScript-rendered list.

For these we maintain lightweight scrapers that run as Cloudflare Workers. The scraper fetches the index page, parses the HTML for new record links, and for each new link fetches and extracts the structured data we need. These are the most fragile ingest pipelines, since an HTML redesign breaks them, so they have more aggressive monitoring and a shorter staleness threshold that triggers an alert.
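
The "parse the index page for new record links" step might look roughly like the sketch below. It is an illustration only: the function name is hypothetical, and it uses a regular expression for brevity, whereas a production scraper would more likely use a streaming HTML parser, which the post does not describe.

```typescript
// Return absolute URLs found in anchor tags that are not already in `seen`.
// Regex-based extraction is an assumption made for brevity; it will miss
// links that are injected by JavaScript after page load.
function extract_new_links(html: string, base_url: string, seen: Set<string>): string[] {
  const found: string[] = [];
  const href_pattern = /<a\s[^>]*href\s*=\s*["']([^"']+)["']/gi;
  let match: RegExpExecArray | null;
  while ((match = href_pattern.exec(html)) !== null) {
    const href = match[1];
    if (href === undefined) continue;
    let absolute: string;
    try {
      absolute = new URL(href, base_url).href; // resolve relative links
    } catch {
      continue; // skip malformed hrefs instead of failing the whole run
    }
    if (!seen.has(absolute) && !found.includes(absolute)) found.push(absolute);
  }
  return found;
}
```

The `seen` set would be loaded from whatever store tracks already-ingested record URLs, so a run only fetches detail pages that have appeared since the previous run.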

Schema drift detection

Government datasets change their schemas without notice. A CSV that had 23 columns in October has 25 columns in November. A JSON API adds a new field that breaks a strict parser. An XML schema changes an attribute name from name to entityName.

We detect schema drift by comparing the incoming record structure against a stored schema fingerprint:

function check_schema_drift(
  dataset_id: string,
  incoming_record: Record<string, unknown>,
  stored_schema: StoredSchema
): SchemaDriftResult {
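  const incoming_keys = new Set(Object.keys(incoming_record))
  const required_keys = new Set(stored_schema.required_fields)
  // Optional fields are already known, so they must not be reported as drift
  const known_keys = new Set([
    ...stored_schema.required_fields,
    ...(stored_schema.optional_fields ?? []),
  ])

  const added = incoming_keys.difference(known_keys)
  const removed = required_keys.difference(incoming_keys)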

  if (removed.size > 0) {
    // Missing required fields — likely schema break
    return {
      status: "BREAKING_DRIFT",
      added: [...added],
      removed: [...removed],
    }
  }

  if (added.size > 0) {
    // New fields — additive, non-breaking; update schema fingerprint
    return {
      status: "ADDITIVE_DRIFT",
      added: [...added],
    }
  }

  return { status: "OK" }
}

A BREAKING_DRIFT result halts the ingest, fires an alert, and preserves the last-known-good state in D1 rather than overwriting with broken records. An ADDITIVE_DRIFT result logs the new fields, stores them in D1's flexible extra_fields JSON column, and updates the schema fingerprint in KV.

Schema fingerprints are stored as JSON in Cloudflare KV:

{
  "dataset_id": "ofac_sdn",
  "required_fields": ["uid", "lastName", "sdnType", "programs", "ids", "addresses"],
  "optional_fields": ["firstName", "title", "remarks", "dateOfBirth", "placeOfBirth"],
  "last_updated": "2025-11-03T00:00:00Z",
  "version": 47
}
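
Updating the fingerprint after an ADDITIVE_DRIFT result could be done with a pure function like the one below. This is a sketch, not the pipeline's actual code: it assumes newly seen fields are recorded as optional and that the version counter increments by one per change, neither of which the post states explicitly.

```typescript
// Mirrors the fingerprint JSON shown above.
interface SchemaFingerprint {
  dataset_id: string;
  required_fields: string[];
  optional_fields: string[];
  last_updated: string;
  version: number;
}

// Return a new fingerprint with previously unseen fields added as optional.
// The input is never mutated; if nothing is new, the same object is returned.
function with_added_fields(
  fingerprint: SchemaFingerprint,
  added: string[],
  now_iso: string
): SchemaFingerprint {
  const optional = [...fingerprint.optional_fields];
  for (const field of added) {
    const known = fingerprint.required_fields.includes(field) || optional.includes(field);
    if (!known) optional.push(field);
  }
  if (optional.length === fingerprint.optional_fields.length) return fingerprint;
  return {
    ...fingerprint,
    optional_fields: optional,
    last_updated: now_iso,
    version: fingerprint.version + 1, // assumption: one bump per schema change
  };
}
```

Keeping the update pure makes it easy to log the before and after fingerprints side by side before writing the new one to KV.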

Delta detection for bulk sources

For bulk-download datasets, the entire file is new every time — there's no built-in delta. We compute our own deltas using a record-hash approach:

// For each record in the incoming bulk file:
// 1. Compute a stable hash of the record content (normalized, sorted keys)
// 2. Compare against the stored hash for that record's primary key
// 3. Only write to D1 if the hash changed or the record is new

async function upsert_with_delta(
  table: string,
  pk_field: string,
  record: Record<string, unknown>,
  db: D1Database,
  kv: KVNamespace
) {
  const pk = record[pk_field] as string
  const record_hash = await sha256_hex(stable_stringify(record))
  const stored_hash = await kv.get(`${table}:${pk}:hash`)

  if (stored_hash === record_hash) return "unchanged"  // no change, skip the D1 write

  await db.prepare(`INSERT OR REPLACE INTO ${table} VALUES (${placeholders(record)})`)
    .bind(...Object.values(record)).run()

  await kv.put(`${table}:${pk}:hash`, record_hash)
  return "updated"
}
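
The `stable_stringify` helper is called above but not shown. One possible implementation, offered as a sketch rather than the pipeline's actual helper, serializes objects with their keys sorted so that two records with the same content always produce the same string regardless of field order in the source file.

```typescript
// Deterministic JSON-like serialization: object keys are sorted recursively,
// array order is preserved because array order is usually meaningful.
function stable_stringify(value: unknown): string {
  if (value === undefined) return "null"; // treat missing values like JSON null
  if (Array.isArray(value)) {
    return "[" + value.map((item) => stable_stringify(item)).join(",") + "]";
  }
  if (value !== null && typeof value === "object") {
    const obj = value as Record<string, unknown>;
    const keys = Object.keys(obj).sort();
    const parts = keys.map((key) => JSON.stringify(key) + ":" + stable_stringify(obj[key]));
    return "{" + parts.join(",") + "}";
  }
  return JSON.stringify(value); // strings, numbers, booleans, null
}
```

Any normalization that should not count as a change, such as trimming whitespace or unifying date formats, would need to happen before this step, since the hash is sensitive to every character of the output.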

This means daily D1 writes are proportional to what actually changed, not to the full dataset size. OFAC SDN has ~12,000 entries; on a typical day 5–20 records are added, modified, or removed. We write those 5–20, not 12,000.
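
Per-record hashing detects additions and modifications, but a record that disappears from the bulk file never reaches `upsert_with_delta`. The post does not show how removals are handled; one way to detect them, sketched here with a hypothetical function name, is to compare the set of stored primary keys against the keys seen in the incoming file.

```typescript
// Return primary keys that exist in storage but are absent from the
// incoming bulk file, i.e. records the source has removed.
function find_removed_keys(stored_pks: string[], incoming_pks: string[]): string[] {
  const incoming = new Set<string>(incoming_pks);
  return stored_pks.filter((pk) => !incoming.has(pk));
}
```

Whether removed records are then deleted or only marked inactive is a design choice. For sanctions lists, keeping the row with a removal date is often more useful than deleting it, because consumers may need to know that an entity was listed in the past.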

Error handling and retry budgets

Government data sources go down. data.gov has planned maintenance windows. OFAC occasionally updates their CDN configuration in ways that temporarily break direct downloads. The ingest workers handle this with per-source retry budgets:

  • Transient errors (5xx, timeout): Retry with exponential backoff — 2min, 8min, 30min — up to 3 retries per invocation.
  • Consecutive failure threshold: If a dataset fails on 3 consecutive scheduled runs, fire a staleness alert (email + Slack). The D1 table continues serving its last-known-good state.
  • Critical-list threshold: OFAC SDN, SAM.gov exclusions, and HHS-OIG exclusions have a tighter threshold — 1 consecutive failure fires an immediate alert given their compliance criticality.
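
The retry schedule and alert thresholds listed above can be expressed as two small functions. This is a sketch under assumptions: the dataset identifiers in the critical set are illustrative (`hhs_oig_exclusions` in particular is a guessed id), and the function names are hypothetical.

```typescript
// Backoff delays from the list above, in minutes, one entry per retry.
const BACKOFF_MINUTES = [2, 8, 30];

// Assumed dataset ids for the critical lists; only the list membership
// comes from the text, the exact id strings are illustrative.
const CRITICAL_DATASETS = new Set<string>(["ofac_sdn", "sam_exclusions", "hhs_oig_exclusions"]);

// Delay before retry number `attempt` (1-based), or null once the
// per-invocation budget of three retries is exhausted.
function retry_delay_ms(attempt: number): number | null {
  const minutes = BACKOFF_MINUTES[attempt - 1];
  return minutes === undefined ? null : minutes * 60 * 1000;
}

// Critical lists alert on the first failed scheduled run; all other
// datasets alert after three consecutive failed runs.
function should_alert(dataset_id: string, consecutive_failures: number): boolean {
  const threshold = CRITICAL_DATASETS.has(dataset_id) ? 1 : 3;
  return consecutive_failures >= threshold;
}
```

Note that the three delays add up to 40 minutes, so the final retry only fits if the invocation's wall-clock budget allows it; otherwise the last attempt effectively rolls over to the next scheduled run.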

Staleness is surfaced in the API response for every record via a data_freshness field:

{
  "dataset": "ofac_sdn",
  "last_ingest_at": "2026-01-15T04:00:00Z",
  "last_modified_at": "2026-01-15T10:30:00Z",  // as reported by the source (an ETag itself carries no timestamp)
  "status": "fresh",  // fresh | stale | error
  "records": 12483
}
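
How the `status` value is derived is not spelled out in the post. A plausible rule, sketched below with hypothetical names and a caller-supplied age limit, is to report an error when the most recent run failed, stale when the last successful ingest is older than the dataset's allowed age, and fresh otherwise.

```typescript
type FreshnessStatus = "fresh" | "stale" | "error";

// Derive the status string from the last successful ingest time.
// `max_age_hours` is an assumed per-dataset setting, e.g. a small multiple
// of the cron interval; the post does not give the real thresholds.
function freshness_status(
  last_ingest_at: string,
  now: Date,
  max_age_hours: number,
  last_run_failed: boolean
): FreshnessStatus {
  if (last_run_failed) return "error";
  const age_hours = (now.getTime() - Date.parse(last_ingest_at)) / 3600000;
  return age_hours > max_age_hours ? "stale" : "fresh";
}
```

For a dataset on the 4-hour schedule, an age limit of 8 hours would tolerate one missed run before the status flips to stale.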

Timing: when government sources actually update

A practical lesson: not all daily datasets are refreshed at midnight. Federal agencies update their data during business hours, often mid-morning Eastern Time (ET):

  • OFAC SDN: Updates typically published 10:00–11:30 AM ET on business days. Our 4-hour cron catches same-day updates by the afternoon run (14:00 ET).
  • SEC EDGAR: New filings become searchable on EDGAR within 10–30 minutes of submission. Their full-text search index lags by ~4 hours.
  • DOJ press releases: Published throughout the business day; our 07:00 UTC cron misses same-day DOJ releases until the next day. The 19:00 UTC run catches US business day releases.
  • CISA KEV: Updates can be published at any time, including weekends. The 6-hour cron is appropriate here.

We schedule the primary daily cron at 07:00 UTC (overnight for US time zones) to avoid hitting agency servers during peak hours, then run a second pass at 19:00 UTC (mid-afternoon US) for datasets that update during business hours.
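
Because cron triggers are defined in UTC while agencies publish on Eastern Time, the local hour of each run shifts by one when daylight saving time starts or ends. The sketch below, with a hypothetical function name, converts a UTC timestamp to the Eastern wall-clock hour using the standard `Intl.DateTimeFormat` API.

```typescript
// Return the wall-clock hour (0-23) in America/New_York for a UTC timestamp.
function utc_to_eastern_hour(iso_utc: string): number {
  const formatter = new Intl.DateTimeFormat("en-US", {
    timeZone: "America/New_York",
    hour: "numeric",
    hour12: false,
  });
  // Some runtimes render midnight as "24" when hour12 is false,
  // so normalize the result into the 0-23 range.
  return Number(formatter.format(new Date(iso_utc))) % 24;
}
```

With this, the 19:00 UTC pass lands at 14:00 ET in winter and 15:00 ET in summer, and the 07:00 UTC pass at 02:00 or 03:00 ET, which is worth keeping in mind when judging whether a run falls after a source's usual publication window.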

Monitoring and freshness dashboard

Each ingest result writes a row to an ingest_log D1 table:

CREATE TABLE ingest_log (
  id           INTEGER PRIMARY KEY,
  dataset_id   TEXT NOT NULL,
  started_at   TEXT NOT NULL,
  completed_at TEXT,
  status       TEXT NOT NULL,  -- success | skipped | error | schema_drift
  records_written INTEGER DEFAULT 0,
  error_message TEXT,
  etag_changed  INTEGER DEFAULT 0  -- 1 if source had new content
);

A public freshness endpoint at api.ai-analytics.org/freshness returns the ingest status for every dataset, including the age of the last successful ingest. This lets API consumers check whether the data they're querying is current before relying on it for a compliance decision.
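
The per-dataset summary behind such an endpoint could be derived from `ingest_log` rows along the following lines. This is a sketch: the row interface covers only the columns it needs, the output shape is an assumption rather than the endpoint's documented format, and only rows with status `success` are counted as successful ingests.

```typescript
// Subset of the ingest_log columns used by the summary.
interface IngestLogRow {
  dataset_id: string;
  completed_at: string | null;
  status: "success" | "skipped" | "error" | "schema_drift";
}

// Assumed output shape; the real endpoint's format is not shown in the post.
interface FreshnessEntry {
  last_success_at: string;
  age_hours: number;
}

// Find the most recent successful ingest per dataset and compute its age.
function summarize_freshness(rows: IngestLogRow[], now: Date): Record<string, FreshnessEntry> {
  const summary: Record<string, FreshnessEntry> = {};
  for (const row of rows) {
    if (row.status !== "success" || row.completed_at === null) continue;
    const current = summary[row.dataset_id];
    const completed_ms = Date.parse(row.completed_at);
    if (current && Date.parse(current.last_success_at) >= completed_ms) continue;
    summary[row.dataset_id] = {
      last_success_at: row.completed_at,
      age_hours: (now.getTime() - completed_ms) / 3600000,
    };
  }
  return summary;
}
```

A consumer making a compliance decision can then compare `age_hours` against its own tolerance instead of trusting a single fresh or stale flag.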


For the Cloudflare D1 database architecture this pipeline populates: Building the Federal Regulatory Data Hub on Cloudflare D1: 50M+ records at the edge →

For the REST, MCP, and JSON-LD API that serves the ingested data: The Federal Regulatory API: REST, MCP, and JSON-LD for 208 federal datasets →

For the entity bridge that links records across all 208 ingested datasets: Building the cross-agency regulatory entity graph: 50M+ records, one join →

For the compliance risk score that queries across the 30+ enforcement lists this pipeline feeds: Compliance screening across 30+ federal enforcement lists: how the risk score works →
