Technical writing

Dataset freshness monitoring in the Federal Regulatory Data Hub: staleness detection, per-source thresholds, and multi-channel alerting

· 9 min read· AI Analytics
Regulatory dataInfrastructureCloudflareData engineering

The Federal Regulatory Data Hub ingests 208 federal datasets on schedules ranging from every 10 minutes (EDGAR 8-K filings) to once daily (SAM.gov exclusions). The ingest pipeline handles the ETL; the query layer routes requests across the eight D1 shards. Between those two systems sits a monitoring layer that answers a different question: did the ingest actually run, and is the data we are serving current? This post covers how that monitoring layer works — per-source freshness thresholds, the D1 ingest audit table, the cron worker that evaluates staleness, OFAC-specific publication monitoring, ingest error classification, and the public status surface.

Why staleness is a compliance risk

Staleness in a regulatory data platform is not an operational hygiene problem — it is a product correctness problem. A compliance team screening a new counterparty against the OFAC SDN list needs the list from this morning. If OFAC published a major sanctions package at 10:15am ET and our ingest failed silently, the compliance team may clear a newly sanctioned entity during the window between the OFAC publication and our next successful ingest. That window creates direct liability.

The risk is asymmetric. A stale EDGAR 8-K index for an hour is a minor inconvenience — the underlying SEC EDGAR site is accessible, the information exists, the delay is unlikely to affect a time-sensitive decision. A stale OFAC SDN list for three hours on the morning OFAC publishes a Russia-linked package is a material compliance gap. Staleness thresholds and alert severity must reflect this asymmetry, not treat all 197 sources uniformly.

Per-source staleness thresholds

Each of the 197 source datasets carries a freshness configuration that encodes three values: the expected update cadence (how often the source publishes new data), the maximum tolerated staleness before an alert fires, and the criticality tier that determines which channels receive the alert. A representative subset:

// freshness-config.ts
export type AlertChannel = 'slack' | 'email' | 'pagerduty';
export type Criticality  = 'critical' | 'high' | 'medium';

export interface FreshnessConfig {
  source_id:               string;
  expected_cadence_minutes: number;
  max_stale_minutes:       number;
  criticality:             Criticality;
  alert_channels:          AlertChannel[];
}

export const FRESHNESS_CONFIG: FreshnessConfig[] = [
  {
    source_id:               'ofac_sdn',
    expected_cadence_minutes: 60,
    max_stale_minutes:       90,
    criticality:             'critical',
    alert_channels:          ['slack', 'email', 'pagerduty'],
  },
  {
    source_id:               'sam_exclusions',
    expected_cadence_minutes: 1440,
    max_stale_minutes:       1560,
    criticality:             'high',
    alert_channels:          ['slack', 'email'],
  },
  {
    source_id:               'edgar_8k',
    expected_cadence_minutes: 10,
    max_stale_minutes:       20,
    criticality:             'high',
    alert_channels:          ['slack', 'email'],
  },
  {
    source_id:               'fda_enforcement',
    expected_cadence_minutes: 1440,
    max_stale_minutes:       1560,
    criticality:             'medium',
    alert_channels:          ['slack'],
  },
  {
    source_id:               'cisa_kev',
    expected_cadence_minutes: 60,
    max_stale_minutes:       90,
    criticality:             'critical',
    alert_channels:          ['slack', 'email', 'pagerduty'],
  },
  {
    source_id:               'usaspending',
    expected_cadence_minutes: 1440,
    max_stale_minutes:       1560,
    criticality:             'medium',
    alert_channels:          ['slack'],
  },
  // ... 191 additional sources
];

The gap between expected_cadence_minutes and max_stale_minutes is intentional. For OFAC SDN (cadence: 60, max stale: 90), the threshold allows one missed polling cycle before alerting — a transient network timeout should not immediately page the on-call team. For EDGAR 8-K (cadence: 10, max stale: 20), the window is tight because the ingest is event-driven and a 20-minute gap indicates a systemic problem, not a transient one. For daily-cadence datasets, the extra 120 minutes of buffer absorbs government sources that occasionally publish several hours late.

The D1 dataset_ingests table

Every ingest run — successful or failed — writes a row to a dedicated audit table in the monitoring D1 database. The schema captures the outcome of each ingest attempt with enough detail to reconstruct what happened during a retrospective:

-- monitoring.sql: ingest audit table
CREATE TABLE dataset_ingests (
  source_id         TEXT    NOT NULL,
  ingest_at         INTEGER NOT NULL,  -- unix epoch seconds
  record_count      INTEGER NOT NULL,
  delta_added       INTEGER NOT NULL,
  delta_modified    INTEGER NOT NULL,
  delta_removed     INTEGER NOT NULL,
  ingest_duration_ms INTEGER NOT NULL,
  error             TEXT    NULL       -- NULL on success, error class on failure
);

CREATE INDEX di_source_time ON dataset_ingests (source_id, ingest_at DESC);

The delta_* columns record the record-level diff from each ingest cycle (added, modified, and removed record counts). A row with error IS NULL and delta_added = 0 and delta_modified = 0 means the source published a new file that was identical to the previous version — a normal outcome for stable datasets between update cycles. A row with error IS NOT NULL means the ingest failed and the previous data remains in the serving layer unchanged.

The staleness check query aggregates this table by source, computing elapsed minutes since the last successful ingest:

-- staleness check query
SELECT
  source_id,
  MAX(ingest_at)                                      AS last_ingest,
  (strftime('%s', 'now') - MAX(ingest_at)) / 60       AS stale_minutes
FROM dataset_ingests
WHERE error IS NULL
GROUP BY source_id;

The WHERE error IS NULL filter is critical: it measures staleness from the last successful ingest, not the last ingest attempt. A source that has failed five consecutive ingest attempts still shows its staleness accumulating from the last success — which is the correct signal for the monitoring system. The failed attempts are visible in the raw table for debugging.

Cloudflare Cron Trigger for staleness checks

A dedicated Cloudflare Worker runs the staleness check on a 5-minute cron schedule (*/5 * * * *). It queries the staleness view, joins against the in-memory FRESHNESS_CONFIG, identifies sources that have exceeded their max_stale_minutes threshold, and routes alerts to the appropriate channels:

// staleness-worker.ts
export interface Env {
  DB_MONITORING: D1Database;
  KV_ALERT_STATE: KVNamespace;
  SLACK_WEBHOOK_URL: string;
  PAGERDUTY_ROUTING_KEY: string;
  ALERT_EMAIL_TO: string;
  SENDGRID_API_KEY: string;
}

export default {
  async scheduled(_event: ScheduledEvent, env: Env): Promise<void> {
    await checkStaleness(env);
  },
};

async function checkStaleness(env: Env): Promise<void> {
  const { results } = await env.DB_MONITORING
    .prepare(`
      SELECT
        source_id,
        MAX(ingest_at)                                AS last_ingest,
        (strftime('%s', 'now') - MAX(ingest_at)) / 60 AS stale_minutes
      FROM dataset_ingests
      WHERE error IS NULL
      GROUP BY source_id
    `)
    .all<{ source_id: string; last_ingest: number; stale_minutes: number }>();

  const stalenessMap = new Map(results.map(r => [r.source_id, r]));

  for (const config of FRESHNESS_CONFIG) {
    const row = stalenessMap.get(config.source_id);

    // Source has never successfully ingested — treat as maximally stale
    const staleMinutes = row ? row.stale_minutes : Infinity;

    if (staleMinutes > config.max_stale_minutes) {
      await sendAlert(config, staleMinutes, env);
    }
  }
}

The cron fires every 5 minutes and takes approximately 80ms to run across all 197 sources. Cloudflare Workers cron triggers have a minimum interval of 1 minute; 5 minutes gives the staleness check enough granularity to catch a EDGAR ingest failure (max stale: 20 minutes) within two polling cycles, while keeping D1 query load low.

Multi-channel alerting

The sendAlert() function routes alerts to the channels specified in FRESHNESS_CONFIG, applying severity-based logic for each channel. KV-backed deduplication ensures that a source that remains stale for multiple cron cycles does not flood alert channels:

// alert.ts
interface AlertState {
  first_alerted_at: number;   // unix epoch seconds
  last_alerted_at:  number;
  alert_count:      number;
}

async function sendAlert(
  config: FreshnessConfig,
  staleMinutes: number,
  env: Env,
): Promise<void> {
  const kvKey    = `alert:${config.source_id}`;
  const now      = Math.floor(Date.now() / 1000);
  const existing = await env.KV_ALERT_STATE.get<AlertState>(kvKey, 'json');

  // Slack: send on first alert for 'high' and 'critical' sources;
  // suppress subsequent Slack messages unless the source recovers and re-stales.
  if (config.alert_channels.includes('slack')) {
    const slackSent = existing !== null;
    if (!slackSent) {
      await postSlack(config, staleMinutes, env.SLACK_WEBHOOK_URL);
    }
  }

  // Email: for 'critical' sources, resend every 30 minutes until resolved.
  if (config.alert_channels.includes('email') && config.criticality === 'critical') {
    const minutesSinceLastEmail = existing
      ? (now - existing.last_alerted_at) / 60
      : Infinity;
    if (minutesSinceLastEmail >= 30) {
      await sendEmail(config, staleMinutes, env);
    }
  }

  // PagerDuty: fire only when stale_minutes exceeds 2× max threshold —
  // a BGP-like critical condition indicating a systemic ingest failure.
  if (config.alert_channels.includes('pagerduty')) {
    const criticalThreshold = config.max_stale_minutes * 2;
    if (staleMinutes > criticalThreshold) {
      const pgKey = `pd:${config.source_id}`;
      const pdSent = await env.KV_ALERT_STATE.get(pgKey);
      if (!pdSent) {
        await triggerPagerDuty(config, staleMinutes, env.PAGERDUTY_ROUTING_KEY);
        // PagerDuty deduplication key expires after 4 hours; auto-resolves on recovery.
        await env.KV_ALERT_STATE.put(pgKey, '1', { expirationTtl: 14400 });
      }
    }
  }

  // Update KV alert state for deduplication
  const state: AlertState = {
    first_alerted_at: existing?.first_alerted_at ?? now,
    last_alerted_at:  now,
    alert_count:      (existing?.alert_count ?? 0) + 1,
  };
  // Alert state expires after 2 hours; expiry signals recovery (clears dedup window).
  await env.KV_ALERT_STATE.put(kvKey, JSON.stringify(state), { expirationTtl: 7200 });
}

The KV TTL on the alert state is load-bearing. When a stale source recovers — the next successful ingest clears the staleness — the KV key for that source is deleted explicitly by the ingest worker on successful completion. If the ingest worker crashes before the explicit delete, the 2-hour TTL ensures the alert state expires naturally, allowing the next alert cycle to re-alert if the source is still stale rather than suppressing alerts indefinitely.

OFAC-specific monitoring

OFAC publishes its SDN list at approximately 10:00am ET on business days. The publication is not scheduled to the minute — it can arrive anywhere from 9:45am to 1:30pm. The cron-based staleness check handles the case where our ingest fails, but it cannot distinguish between "OFAC hasn't published yet" and "OFAC published and we failed to ingest." A separate OFAC publication monitor handles this.

The monitor polls the OFAC SDN XML publication endpoint every 10 minutes using conditional GET (comparing ETags). When a new ETag is detected, an immediate ingest is triggered without waiting for the next cron window. When 90 minutes have passed since the expected publish time and the ETag has not changed, a "OFAC publish delay" alert fires — because OFAC sometimes publishes late, and the team needs to verify that the delay is on OFAC's side, not ours:

// ofac-monitor.ts
const OFAC_SDN_URL    = 'https://www.treasury.gov/ofac/downloads/sdn_advanced.xml';
const PUBLISH_HOUR_ET = 10;  // 10am ET expected publish time
const DELAY_ALERT_MIN = 90;  // alert if no new ETag 90min after expected publish

async function checkOfacEtag(env: Env): Promise<void> {
  const storedEtag = await env.KV_ALERT_STATE.get('ofac:etag');

  const response = await fetch(OFAC_SDN_URL, {
    method: 'HEAD',
    headers: storedEtag ? { 'If-None-Match': storedEtag } : {},
  });

  if (response.status === 200) {
    const newEtag = response.headers.get('ETag');

    if (newEtag && newEtag !== storedEtag) {
      // New publication detected — trigger ingest immediately.
      await env.KV_ALERT_STATE.put('ofac:etag', newEtag);
      await env.KV_ALERT_STATE.put('ofac:last_publish_at',
        String(Math.floor(Date.now() / 1000)));
      await triggerImmediateIngest('ofac_sdn', env);
    }
  }

  // Publish delay check: only on business days after expected publish time.
  const nowET          = toEasternTime(new Date());
  const isBusinessDay  = nowET.getDay() >= 1 && nowET.getDay() <= 5;
  const pastPublishTime = nowET.getHours() >= PUBLISH_HOUR_ET;

  if (isBusinessDay && pastPublishTime) {
    const lastPublishAt = await env.KV_ALERT_STATE.get('ofac:last_publish_at');
    const lastPublish   = lastPublishAt ? parseInt(lastPublishAt, 10) : 0;
    const startOfToday  = getEtStartOfDay(nowET);

    // If no publish today yet, compute minutes since expected publish time.
    if (lastPublish < startOfToday) {
      const expectedPublishEpoch = startOfToday + PUBLISH_HOUR_ET * 3600;
      const minutesSinceExpected =
        (Math.floor(Date.now() / 1000) - expectedPublishEpoch) / 60;

      if (minutesSinceExpected > DELAY_ALERT_MIN) {
        const delayKey = `ofac:delay_alert:${nowET.toDateString()}`;
        const alreadySent = await env.KV_ALERT_STATE.get(delayKey);
        if (!alreadySent) {
          await postSlack(
            { source_id: 'ofac_sdn', criticality: 'critical' } as FreshnessConfig,
            minutesSinceExpected,
            env.SLACK_WEBHOOK_URL,
            { message: 'OFAC publish delay — no new SDN ETag after expected publish time.' },
          );
          await env.KV_ALERT_STATE.put(delayKey, '1', { expirationTtl: 86400 });
        }
      }
    }
  }
}

The distinction between an OFAC publish delay and an ingest failure matters for the resolution playbook. A publish delay means OFAC is late — no action required on our side until it arrives. An ingest failure on an already-published ETag means our pipeline is broken and needs immediate investigation. The monitoring layer surfaces both conditions with different alert text so the on-call team knows which playbook to follow.

Ingest error classification

Not all ingest failures are equal. A network timeout is transient and will likely resolve on retry. An authentication failure means a credential has expired and will not resolve until manually rotated. A schema drift error means the government source changed its data format and requires a developer fix, not an ops response. Routing these to the same alert channel would bury the actionable signals in noise.

Failed ingests are classified into five error classes before the error is written todataset_ingests.error:

// error-classifier.ts
export type IngestErrorClass =
  | 'network_timeout'   // HTTP fetch timed out — transient, retry
  | 'auth_failure'      // 401/403 — credential expired, trigger rotation alert
  | 'schema_drift'      // response valid but field structure changed — dev alert
  | 'empty_response'    // 200 OK but zero-length or whitespace-only body — source outage
  | 'parse_failure';    // response body present but could not be parsed — dev alert

export function classifyIngestError(
  httpStatus:   number | null,
  errorMessage: string,
  bodyLength:   number,
  parseSuccess: boolean,
): IngestErrorClass {
  if (httpStatus === null) {
    return 'network_timeout';
  }
  if (httpStatus === 401 || httpStatus === 403) {
    return 'auth_failure';
  }
  if (httpStatus === 200 && bodyLength === 0) {
    return 'empty_response';
  }
  if (httpStatus === 200 && !parseSuccess) {
    // Distinguish schema_drift (known format but unexpected structure)
    // from parse_failure (cannot parse at all).
    const isKnownFormat = errorMessage.includes('unexpected field') ||
                          errorMessage.includes('missing required column') ||
                          errorMessage.includes('column count mismatch');
    return isKnownFormat ? 'schema_drift' : 'parse_failure';
  }
  return 'network_timeout';  // fallback for 5xx, connection reset, etc.
}

The error class drives the alert routing:

  • network_timeout / empty_response: ops alert via Slack; auto-resolves when the next ingest succeeds. No human action required unless it persists beyond the max staleness threshold.
  • auth_failure: immediate Slack alert with a link to the credential rotation runbook; PagerDuty for OFAC and CISA KEV because these cannot go stale without manual intervention.
  • schema_drift: developer alert via Slack in the #data-eng channel, not the ops channel; not paged because a schema drift does not resolve itself and requires a code change.
  • parse_failure: developer alert; also triggers a sample payload capture to R2 storage so the engineer can inspect the raw response body without re-fetching.

The status page surface

A public-facing JSON endpoint at api.ai-analytics.org/status exposes per-dataset freshness status. The response enumerates every source with its last successful ingest timestamp, elapsed staleness in minutes, and a status label computed against the freshness config:

// Example response: GET api.ai-analytics.org/status
{
  "generated_at": "2025-04-17T14:32:00Z",
  "datasets": [
    {
      "source_id":     "ofac_sdn",
      "last_ingest_at": "2025-04-17T14:18:44Z",
      "stale_minutes": 13,
      "status":        "fresh",
      "record_count":  14923,
      "criticality":   "critical"
    },
    {
      "source_id":     "cisa_kev",
      "last_ingest_at": "2025-04-17T13:45:22Z",
      "stale_minutes": 47,
      "status":        "fresh",
      "record_count":  1102,
      "criticality":   "critical"
    },
    {
      "source_id":     "edgar_8k",
      "last_ingest_at": "2025-04-17T14:29:01Z",
      "stale_minutes": 3,
      "status":        "fresh",
      "record_count":  5841200,
      "criticality":   "high"
    }
  ]
}

The status field uses four values: fresh (stale_minutes below threshold), stale (threshold exceeded for non-critical sources), critical (threshold exceeded for critical sources), and error (last ingest row carries a non-null error field). The Cloudflare Worker that generates this response queries the same D1 staleness view used by the cron check and joins it against an in-memory copy of FRESHNESS_CONFIG at request time:

// status-handler.ts
export async function handleStatusRequest(env: Env): Promise<Response> {
  const { results } = await env.DB_MONITORING
    .prepare(`
      SELECT
        source_id,
        MAX(ingest_at)                                AS last_ingest,
        (strftime('%s', 'now') - MAX(ingest_at)) / 60 AS stale_minutes,
        (SELECT record_count FROM dataset_ingests di2
         WHERE di2.source_id = di.source_id
           AND di2.error IS NULL
         ORDER BY di2.ingest_at DESC LIMIT 1)         AS record_count,
        (SELECT error FROM dataset_ingests di3
         WHERE di3.source_id = di.source_id
         ORDER BY di3.ingest_at DESC LIMIT 1)         AS latest_error
      FROM dataset_ingests di
      WHERE error IS NULL
      GROUP BY source_id
    `)
    .all<{
      source_id:     string;
      last_ingest:   number;
      stale_minutes: number;
      record_count:  number;
      latest_error:  string | null;
    }>();

  const rowMap = new Map(results.map(r => [r.source_id, r]));

  const datasets = FRESHNESS_CONFIG.map(config => {
    const row          = rowMap.get(config.source_id);
    const staleMinutes = row ? row.stale_minutes : Infinity;
    const latestError  = row ? row.latest_error : null;

    let status: 'fresh' | 'stale' | 'critical' | 'error';
    if (latestError) {
      status = 'error';
    } else if (staleMinutes > config.max_stale_minutes) {
      status = config.criticality === 'critical' ? 'critical' : 'stale';
    } else {
      status = 'fresh';
    }

    return {
      source_id:      config.source_id,
      last_ingest_at: row ? new Date(row.last_ingest * 1000).toISOString() : null,
      stale_minutes:  row ? row.stale_minutes : null,
      status,
      record_count:   row?.record_count ?? null,
      criticality:    config.criticality,
    };
  });

  return new Response(
    JSON.stringify({ generated_at: new Date().toISOString(), datasets }),
    {
      headers: {
        'Content-Type': 'application/json',
        'Cache-Control': 'public, max-age=60',
        'Access-Control-Allow-Origin': '*',
      },
    },
  );
}

A companion endpoint at api.ai-analytics.org/coverage exposes the same per-dataset record counts without the staleness metadata — this endpoint is intended for the coverage page on ai-analytics.org and is updated at each ingest cycle. Because record counts update daily and the status endpoint already carries them, the coverage response is a lightweight projection cached with a 30-minute TTL.


For the ingest pipeline whose success is monitored here: Federal Dataset Ingest →

For the query layer that serves data from these ingested datasets: The Federal Regulatory Data Hub query layer →

For the change alert webhooks that fire when record content changes: Federal Regulatory Data Hub change alerts →

For the Cloudflare D1 architecture that the monitoring queries run against: Building the Federal Regulatory Data Hub on Cloudflare D1 →