Technical writing
Voidly real-time anomaly scorer: ML inference in the streaming pipeline at 50,000 events per second
The batch ONNX inference service processes probe results with p99 latency under 50ms — sufficient for the analytical pipeline but too slow for the real-time incident detection path. When a country experiences a sudden onset of censorship (an election-day social media block, a coup-related internet shutdown), every additional minute of latency between probe observation and anomaly alert costs operational value for the NGOs and journalists who depend on Voidly's data.
The real-time anomaly scorer embeds ONNX Runtime directly inside an Apache Flink streaming job, scoring probe results in-stream at 50,000 events per second with end-to-end latency (probe result arrival to scored anomaly record in Kafka) under 100ms p99. This article covers the Flink operator design, thread-local ONNX session management, Kafka partition alignment, and the backpressure mechanism.
Flink job topology
The streaming job reads from the voidly.probe.results Kafka topic and writes scored anomaly events to voidly.anomalies.realtime. The pipeline topology:
// flink/src/main/java/net/voidly/scorer/AnomalyScorerJob.java
public class AnomalyScorerJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(16); // 16 task slots; each holds one ONNX session
// Source: Kafka probe results topic
KafkaSource<ProbeResult> source = KafkaSource.<ProbeResult>builder()
.setBootstrapServers(KAFKA_BROKERS)
.setTopics("voidly.probe.results")
.setGroupId("voidly-realtime-scorer")
.setStartingOffsets(OffsetsInitializer.latest())
.setDeserializer(new ProbeResultDeserializer())
.build();
DataStream<ProbeResult> probeResults = env.fromSource(
source,
WatermarkStrategy.<ProbeResult>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((r, t) -> r.getTsEpochMs()),
"KafkaProbeResults"
);
DataStream<ScoredAnomaly> scored = probeResults
// Partition by (country_code, asn) to co-locate state for the
// same network segment on the same task slot.
.keyBy(r -> r.getCountryCode() + ":" + r.getAsn())
// Feature extraction (stateful: needs 1-hour rolling window per key)
.process(new FeatureExtractionOperator())
.name("FeatureExtraction")
.uid("feature-extraction")
// ONNX inference (stateless: each record scored independently)
.map(new OnnxScoringOperator())
.name("OnnxScoring")
.uid("onnx-scoring")
// Anomaly threshold + deduplication (stateful: suppresses repeated alerts)
.keyBy(s -> s.getCountryCode() + ":" + s.getAsn() + ":" + s.getDomain())
.process(new AnomalyThresholdOperator())
.name("AnomalyThreshold")
.uid("anomaly-threshold");
// Sink: Kafka anomalies topic
KafkaSink<ScoredAnomaly> sink = KafkaSink.<ScoredAnomaly>builder()
.setBootstrapServers(KAFKA_BROKERS)
.setRecordSerializer(new ScoredAnomalySerializer("voidly.anomalies.realtime"))
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
scored.sinkTo(sink).name("KafkaAnomalieSink").uid("anomaly-sink");
env.execute("VoidlyRealtimeAnomalyScorer");
}
}Thread-local ONNX session management
The OnnxScoringOperator is a Flink MapFunction — it receives oneFeatureVector and returns one ScoredAnomaly. Each Flink task slot runs in a single JVM thread, so an ONNX Runtime session can be initialized once per task slot using Java's ThreadLocal, avoiding per-record session creation overhead:
// flink/src/main/java/net/voidly/scorer/OnnxScoringOperator.java
import ai.onnxruntime.*;
public class OnnxScoringOperator implements MapFunction<FeatureVector, ScoredAnomaly>, Serializable {
private static final long serialVersionUID = 1L;
// Model bytes loaded once from classpath; shared (read-only) across threads.
// The OrtSession wraps these bytes per-thread below.
private static final byte[] MODEL_BYTES = loadModelBytes("/models/classifier-v2.1.onnx");
private static final ThreadLocal<OrtSession> SESSION = ThreadLocal.withInitial(() -> {
try {
OrtEnvironment env = OrtEnvironment.getEnvironment();
OrtSession.SessionOptions opts = new OrtSession.SessionOptions();
opts.setIntraOpNumThreads(1); // single-threaded per Flink task slot
opts.setInterOpNumThreads(1);
opts.setOptimizationLevel(OrtSession.SessionOptions.OptLevel.ALL_OPT);
return env.createSession(MODEL_BYTES, opts);
} catch (OrtException e) {
throw new RuntimeException("Failed to initialize ONNX session", e);
}
});
@Override
public ScoredAnomaly map(FeatureVector features) throws Exception {
OrtSession session = SESSION.get();
// Build input tensors (one tensor per named input to match ONNX graph)
OrtEnvironment env = OrtEnvironment.getEnvironment();
Map<String, OnnxTensor> inputs = new HashMap<>();
float[][] dnsNxdomain = {{ features.getDnsNxdomain() ? 1.0f : 0.0f }};
inputs.put("dns_nxdomain", OnnxTensor.createTensor(env, dnsNxdomain));
// ... remaining 11 feature tensors ...
try (OrtSession.Result result = session.run(inputs)) {
// Output 0: predicted label [1][1] int64
// Output 1: probability [1][2] float32 (zipmap=False)
long[][] labels = (long[][]) result.get(0).getValue();
float[][] probs = (float[][]) result.get(1).getValue();
float censorProb = probs[0][1]; // P(class=1) = P(censored)
boolean isAnomaly = censorProb > ANOMALY_THRESHOLD; // 0.72
return new ScoredAnomaly(
features.getMeasurementId(),
features.getTs(),
features.getCountryCode(),
features.getAsn(),
features.getDomain(),
censorProb,
isAnomaly,
"v2.1"
);
} finally {
inputs.values().forEach(t -> { try { t.close(); } catch (Exception ignored) {} });
}
}
}The ONNX Runtime Java bindings are not thread-safe for OrtSession construction but are thread-safe for OrtSession.run() once constructed. UsingThreadLocal isolates construction per thread while allowing the read-only model bytes to be shared across all 16 task slots, saving ~250 MB of heap (16 threads × 18 MB model = 288 MB without sharing, vs. 18 MB with shared bytes plus 16 × ~8 MB session state).
Kafka partition alignment
The FeatureExtractionOperator is a stateful Flink operator that maintains a 1-hour rolling window of raw probe results per (country_code, asn) key to compute aggregate features like country_block_rate_7d and domain_block_rate_30d. State size grows proportionally to the number of distinct keys assigned to each task slot.
The Kafka topic voidly.probe.results uses 64 partitions withcountry_code + asn as the message key. Flink assigns Kafka partitions to task slots sequentially. With 16 Flink task slots and 64 Kafka partitions, each task slot processes exactly 4 Kafka partitions. As long as the Kafka partitioning and Flink keyBy both use the samecountry_code + asn hash, all records for a given key land on the same task slot, eliminating cross-slot state access.
The Kafka producer in the ingestion pipeline hashes the partition key consistently:
// ingestion/src/kafka_producer.rs
pub fn probe_result_partition_key(result: &ProbeResult) -> String {
// Same hash used by Flink's keyBy: Flink's KeySelector hashes the string key
// using MurmurHash3 mod num_partitions. We use the same compound key.
format!("{}:{}", result.country_code, result.asn)
}
// Kafka producer config:
// topic: voidly.probe.results
// num_partitions: 64
// key: probe_result_partition_key(result)
// partitioner: murmur2 (Kafka default)Backpressure mechanism
ONNX inference is the slowest operator in the pipeline. At sustained 50,000 events/sec and with 16 task slots, each slot processes 3,125 events/sec. Single-event ONNX inference latency is approximately 0.4ms, giving a theoretical capacity of 2,500 events/sec per task slot — below the required 3,125. The gap is closed by mini-batching: each task slot accumulates incoming records for up to 5ms or until 16 records have arrived, then scores the mini-batch in a single ONNX run() call. At batch size 16, inference throughput per slot is approximately 4,000 events/sec, providing 28% headroom.
Flink's built-in backpressure mechanism handles transient overload: if the ONNX operator cannot keep up, Flink throttles the upstream Kafka source by pausing partition consumption. The max.poll.interval.ms on the Kafka consumer is set to 120 seconds (the default of 5 minutes would allow a very long GC pause to trigger a consumer group rebalance).
| Metric | Steady state | Peak (election-day spike) |
|---|---|---|
| Throughput (events/sec) | 18,000 | 47,000 |
| End-to-end latency p50 (ms) | 28 | 61 |
| End-to-end latency p99 (ms) | 54 | 97 |
| ONNX task slot CPU utilization | 38% | 91% |
| Kafka consumer lag (messages) | < 50 | ~2,400 (peak, recovers in < 3 min) |
Related writing
Voidly real-time pipeline covers the broader Flink streaming architecture that the anomaly scorer operates within, including the Kafka topic topology and the incident state machine that consumes scored anomaly events.
Voidly incident resolution describes the downstream process that aggregates scored anomaly events from this pipeline into verified censorship incidents with analyst-reviewed resolution decisions.