Social media ingestion at scale: collecting 58M posts per day from 47 platform schemas
The NLP pipeline that identifies coordinated campaigns, election-day anomalies, and censorship-correlated posting patterns runs on data collected from 47 social media platforms. The downstream processing — 80 NLP workers, 2.4M posts per hour, MinHash deduplication distributed across Redis — gets the attention in write-ups about this stack. The upstream problem of actually getting those posts into the Kafka topic in the first place is less visible but equally complex.
This post covers the collection side: how 47 different platform schemas become a single CanonicalPost format, how we manage rate limits without getting banned, when we use each of the three collection strategies, and how language detection runs at ingest time rather than as a downstream processing step.
The canonical post schema
Normalizing 47 platform schemas into a single format is the first engineering constraint the pipeline imposes. Every platform calls a post something different (tweet, toot, post, status, reel, update), uses different timestamp formats, represents the author differently, and encodes retweets/reposts/shares in incompatible ways. The CanonicalPost struct resolves this:
    from dataclasses import dataclass
    from datetime import datetime
    from typing import List, Optional

    @dataclass
    class CanonicalPost:
        post_id: str                      # platform:original_id (e.g., "twitter:1234567890")
        platform: str                     # twitter | reddit | mastodon | telegram | ...
        content: str                      # plain text, HTML stripped
        author_id: str                    # platform-scoped: "twitter:@username"
        timestamp: datetime               # always UTC
        language: str                     # ISO 639-1, from FastText lid.176
        language_conf: float              # FastText confidence score
        is_repost: bool
        original_post_id: Optional[str]   # for reposts
        reply_to_id: Optional[str]
        url: Optional[str]                # canonical URL if publicly accessible
        content_hash: str                 # SHA-256 of normalized(content)
        collection_strategy: str          # api | activitypub | rss
        # Populated downstream by NLP workers
        entity_mentions: Optional[List[str]] = None
        sentiment_score: Optional[float] = None
        coordination_flags: Optional[List[str]] = None

The content_hash is computed over normalized content (Unicode NFC, whitespace collapsed, URLs replaced with a placeholder token) rather than raw text. This lets the deduplication layer match the same post across platforms even when trailing whitespace or URL formats differ between syndication sources.
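A minimal sketch of that normalization, assuming a literal `<URL>` placeholder and a simple `https?://` URL pattern (the article names neither, so both are illustrative), could look like this:

```python
import hashlib
import re
import unicodedata

URL_PLACEHOLDER = "<URL>"                 # assumed token; the real one is not named
_URL_RE = re.compile(r"https?://\S+")     # assumed URL pattern
_WS_RE = re.compile(r"\s+")


def normalize_content(text: str) -> str:
    """Apply NFC, replace URLs with a placeholder, collapse whitespace."""
    text = unicodedata.normalize("NFC", text)
    text = _URL_RE.sub(URL_PLACEHOLDER, text)
    return _WS_RE.sub(" ", text).strip()


def hash_content(text: str) -> str:
    """SHA-256 hex digest of the normalized text."""
    return hashlib.sha256(normalize_content(text).encode("utf-8")).hexdigest()
```

With this sketch, two copies of a post that differ only in line breaks, trailing whitespace, or the shortened link they carry hash to the same value, while any change to the visible wording produces a different hash.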
Three-tier collection strategy
Each platform falls into one of three collection tiers based on what's available:
| Tier | Method | Platforms | Posts/day |
|---|---|---|---|
| 1 | Official API | Twitter/X, Reddit, YouTube, LinkedIn, Facebook (Page API) | ~31M |
| 2 | ActivityPub federation | Mastodon, Lemmy, Misskey, Pixelfed, PeerTube (26 instances) | ~4M |
| 3 | RSS / public feed | Telegram public channels, Truth Social, GETTR, Gab, Parler, Substack | ~23M |
The tier determines both the collection implementation and the reliability characteristics. Tier 1 platforms give us structured JSON with stable schemas; Tier 3 platforms require feed and HTML parsing that breaks when the platform updates its front end. We accept a higher maintenance burden from Tier 3 because the platforms in that tier are specifically where information operations and election-adjacent content face less moderation.
Tier 1: official API collection
Twitter/X API v2 with Academic Research access gives us search results and filtered stream access. The filtered stream endpoint pushes matching tweets in real time rather than requiring polling; we use it for election keywords and censorship terms, and fall back to polling for historical backfill. The API uses per-endpoint rate limits; we budget them using a token-bucket rate limiter per credential:
    import asyncio
    import time

    class TokenBucketRateLimiter:
        def __init__(self, rate: float, burst: int):
            self.rate = rate              # tokens per second
            self.burst = burst            # max burst size
            self.tokens = float(burst)
            self.last_refill = time.monotonic()

        async def acquire(self, n: int = 1):
            if n > self.burst:
                # The bucket never holds more than `burst` tokens, so this would wait forever
                raise ValueError("cannot acquire more tokens than the burst size")
            while True:
                now = time.monotonic()
                elapsed = now - self.last_refill
                # Refill in proportion to elapsed time, capped at the burst size
                self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
                self.last_refill = now
                if self.tokens >= n:
                    self.tokens -= n
                    return
                # Sleep until enough tokens should have accumulated, then re-check
                await asyncio.sleep((n - self.tokens) / self.rate)

    # Twitter search: 300 requests per 15 minutes per credential
    twitter_limiter = TokenBucketRateLimiter(rate=300/900, burst=10)

We run 3 Twitter credentials in rotation, with a circuit breaker that trips on HTTP 429 and pauses all traffic to that credential for 15 minutes. The Reddit API (OAuth2 client credentials) allows 60 requests/minute; we collect r/all and approximately 1,200 target subreddits relevant to political content and censorship discussion.
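The rotation and circuit breaker can be sketched as follows. This is an illustration under assumptions, not the production implementation: `CredentialRotator`, `report_status`, and `pick` are hypothetical names, and the clock is injectable only so the cooldown can be exercised without waiting.

```python
import time
from typing import Callable, Dict, List, Optional

COOLDOWN_SECONDS = 15 * 60  # pause after an HTTP 429, as described above


class CredentialRotator:
    """Round-robin over credentials, skipping any that are cooling down."""

    def __init__(self, credentials: List[str],
                 clock: Callable[[], float] = time.monotonic):
        self._credentials = list(credentials)
        self._clock = clock
        self._paused_until: Dict[str, float] = {}  # credential -> earliest reuse time
        self._next = 0

    def report_status(self, credential: str, status_code: int) -> None:
        """Trip the breaker for this credential when the API answers 429."""
        if status_code == 429:
            self._paused_until[credential] = self._clock() + COOLDOWN_SECONDS

    def pick(self) -> Optional[str]:
        """Return the next usable credential, or None if all are paused."""
        now = self._clock()
        for _ in range(len(self._credentials)):
            cred = self._credentials[self._next]
            self._next = (self._next + 1) % len(self._credentials)
            if self._paused_until.get(cred, 0.0) <= now:
                return cred
        return None
```

A collector would call `pick()` before each request, pass the credential's own rate limiter through `acquire()`, and feed the response status back via `report_status()`. When `pick()` returns `None`, every credential is paused and the collector has to wait.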
Tier 2: ActivityPub federation
The fediverse presents a different collection challenge: there is no central API. We run a listener Mastodon account on a self-hosted instance that federates with 26 target instances, receiving posts via the ActivityPub protocol without polling. The listener receives Create activities from federated instances as they publish:
    # ActivityPub inbox handler (FastAPI)
    from fastapi import FastAPI, Request

    app = FastAPI()

    @app.post("/inbox")
    async def activitypub_inbox(request: Request, body: dict):
        if body.get("type") != "Create":
            return {"status": "ignored"}
        obj = body.get("object", {})
        # "object" can also be a bare URL string; only embedded objects are handled
        if not isinstance(obj, dict) or obj.get("type") not in ("Note", "Article"):
            return {"status": "ignored"}
        text = strip_html(obj.get("content", ""))
        language, language_conf = detect_language(text)  # returns (code, confidence)
        reply_to = obj.get("inReplyTo")
        post = CanonicalPost(
            post_id=f"activitypub:{obj['id']}",
            platform=extract_platform(obj['id']),   # from URL domain
            content=text,
            author_id=f"activitypub:{obj.get('attributedTo')}",
            timestamp=parse_iso(obj["published"]),
            language=language,
            language_conf=language_conf,
            # Boosts arrive as Announce activities, which are ignored above,
            # so anything delivered as a Create is an original post or a reply.
            is_repost=False,
            original_post_id=None,
            reply_to_id=f"activitypub:{reply_to}" if reply_to else None,
            url=obj.get("url"),                     # may be absent
            collection_strategy="activitypub",
            content_hash=hash_content(text),        # hash the stripped text, not raw HTML
        )
        await kafka_producer.send("posts.raw", post)
        return {"status": "ok"}

The 26 federated instances were selected for coverage of political discussion in specific language communities. We do not attempt to collect the entire fediverse, only instances where the content is relevant to the monitoring mission. Instance selection is reviewed quarterly; instances that shut down or change federation policy are replaced.
Tier 3: RSS and public feed scraping
Telegram public channels publish an RSS feed at https://t.me/s/{channel} that we parse with feedparser. The feed does not include full post content for media posts; we fetch the linked Telegram web view only for posts flagged by keyword filters (election terms, censorship terms, cross-reference with active Voidly incidents). For Truth Social and GETTR, the RSS feed is available but unofficial and breaks without notice; we maintain a per-platform schema version hash and alert when the hash changes.
    TIER3_COLLECTORS = {
        "telegram": {
            "strategy": "rss",
            "url_template": "https://t.me/s/{channel_id}/rss",
            "poll_interval": 120,                   # seconds
            "channels": TELEGRAM_CHANNEL_LIST,      # ~8,400 monitored channels
        },
        "truth_social": {
            "strategy": "rss",
            "url_template": "https://truthsocial.com/@{handle}/feed.rss",
            "poll_interval": 300,
            "schema_hash": "a3f8c...",              # alert if schema changes
        },
        "gettr": {
            "strategy": "rss",
            "url_template": "https://gettr.com/user/{handle}/feed.xml",
            "poll_interval": 300,
            "schema_hash": "b7e1d...",
        },
    }

The 8,400 Telegram channels are polled in parallel with a 120-second interval, using a single connection pool shared across all channel pollers. We stagger poll start times using a hash of the channel ID to spread load: each channel starts polling at time_now + (hash(channel_id) % poll_interval) seconds to avoid the thundering-herd pattern of all 8,400 channels firing simultaneously.
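The stagger calculation is small enough to sketch directly. One caveat: Python's built-in `hash()` is salted per process for strings unless `PYTHONHASHSEED` is fixed, so offsets derived from it change on every restart. The version below swaps in `zlib.crc32` to keep each channel's offset stable. That substitution and the function name `poll_offset_seconds` are assumptions for illustration.

```python
import zlib


def poll_offset_seconds(channel_id: str, poll_interval: int) -> int:
    """Stable per-channel delay in [0, poll_interval) before the first poll."""
    # crc32 is deterministic across processes, unlike built-in hash() on str
    return zlib.crc32(channel_id.encode("utf-8")) % poll_interval


def first_poll_time(now: float, channel_id: str, poll_interval: int) -> float:
    """Absolute time of a channel's first poll; later polls repeat every interval."""
    return now + poll_offset_seconds(channel_id, poll_interval)
```

Because the offset depends only on the channel ID and the interval, a restarted collector resumes the same spread of start times rather than reshuffling all 8,400 channels.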
Language detection at ingest
We run FastText lid.176 on every post before publishing to Kafka, not as a downstream NLP step. The reason is partitioning: our Kafka topics partition by language_code, which lets NLP worker pools be specialized by language (we run dedicated worker pools for Arabic, Chinese, Persian, and Russian, each with language-specific models). If language detection happened downstream, the Kafka consumer would need to re-route messages between language partitions, which complicates the consumer offset tracking.
    import fasttext

    # Load once, share across all collector threads
    _lid_model = fasttext.load_model("lid.176.ftz")

    def detect_language(text: str) -> tuple[str, float]:
        if len(text.strip()) < 20:
            return "und", 0.0             # undetermined for very short text
        # FastText predicts on a single line, so newlines are replaced first
        text_clean = text.replace("\n", " ")[:500]
        labels, probs = _lid_model.predict(text_clean, k=1)
        lang = labels[0].replace("__label__", "")
        return lang, float(probs[0])

    # Kafka partition key: first 2 bytes of SHA-256 of language code
    # -> consistent hashing to language-specific partitions

FastText lid.176 runs at approximately 2.1μs per post on a single CPU core, adding negligible latency to the collection loop. At 670 posts/second (2.4M/hour), the language detection step uses about 0.15% of a single core. Posts with confidence below 0.6 go to an und (undetermined) partition rather than being assigned a guessed language; the NLP pipeline handles und with a language-agnostic model.
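The confidence cutoff can be expressed as a small routing step between detection and publishing. This is a sketch only: the function name `partition_language` is hypothetical, and the article does not say where in the collector the 0.6 threshold is actually applied.

```python
UND = "und"
MIN_CONFIDENCE = 0.6  # threshold stated in the text


def partition_language(lang: str, confidence: float) -> str:
    """Return the language code to use as the Kafka partition key.

    Detections below the threshold are routed to the undetermined
    partition instead of being trusted.
    """
    return lang if confidence >= MIN_CONFIDENCE else UND
```

Short posts already come back from `detect_language` as `("und", 0.0)`, so they take the same path as low-confidence detections.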
Kafka topic design
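All collectors publish to a single posts.raw topic with 64 partitions, using language_code as the partition key. Within one consumer group, Kafka assigns each partition to at most one consumer, so 64 partitions support at most 64 actively consuming instances per group. At peak load (election periods), we scale the NLP worker pool to 80 instances; any instances beyond 64 in the same group receive no partitions and act as standby capacity rather than added throughput.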
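Kafka assigns each partition to at most one consumer within a consumer group, which matters once the worker count exceeds the partition count. The sketch below uses a simplified round-robin assignment (not Kafka's actual assignor, and the worker names are hypothetical) to show how 80 workers in a single group would map onto 64 partitions.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified round-robin: every partition goes to exactly one consumer."""
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


workers = [f"nlp-worker-{i}" for i in range(80)]
assignment = assign_partitions(64, workers)
idle = [w for w, parts in assignment.items() if not parts]
print(len(idle))  # 16 workers hold no partition
```

Under this model, raising throughput beyond 64 parallel consumers requires more partitions or additional consumer groups. Adding workers to the same group does not help.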
    # Kafka producer configuration
    from dataclasses import asdict
    from datetime import datetime

    import msgpack
    from kafka import KafkaProducer

    def _pack_default(obj):
        # msgpack cannot pack datetime by default; send the timestamp as ISO 8601
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"cannot serialize {type(obj).__name__}")

    producer = KafkaProducer(
        bootstrap_servers=KAFKA_BROKERS,
        value_serializer=lambda v: msgpack.packb(
            asdict(v), default=_pack_default, use_bin_type=True),
        key_serializer=lambda k: k.encode(),
        compression_type='lz4',       # ~35% size reduction at minimal CPU cost
        batch_size=32_768,            # 32KB batches
        linger_ms=10,                 # 10ms batching window
        acks='all',                   # all ISR replicas must ack
        retries=10,
        max_in_flight_requests_per_connection=5,
    )

    def publish_post(post: CanonicalPost):
        producer.send(
            topic='posts.raw',
            key=post.language,        # partition key
            value=post,
            headers=[
                ('platform', post.platform.encode()),
                ('collection_strategy', post.collection_strategy.encode()),
            ],
        )

Content deduplication at ingest
The same post often appears across multiple collection paths: a Telegram channel posts something that gets quoted on Twitter, then the original Telegram post and the Twitter quote both enter the pipeline. We deduplicate using a two-level strategy before publishing to Kafka.
First, exact-duplicate detection uses a Redis SET with the content_hash and a 24-hour TTL. Before publishing, each collector checks SISMEMBER content_hashes {content_hash} and skips if the hash is present. This catches verbatim reposts even across platforms.
Second, near-duplicate detection for coordinated content uses the MinHash LSH described in the processing article. This runs downstream rather than at ingest, because MinHash comparison requires the full corpus of recent hashes; the ingest pipeline only does exact deduplication. Near-duplicate clustering is a post-ingest NLP step that adds a coordination_cluster_id to matched posts in TimescaleDB.
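One detail of the exact-duplicate check deserves care: members of a Redis set do not expire individually, because a TTL applies to the whole key. A separate SISMEMBER followed by a write also leaves a window in which two collectors can both see the hash as new. The sketch below shows an alternative that gives each hash its own 24-hour expiry and makes the check-and-mark atomic, using one string key per hash written with SET NX EX. It is a swapped-in technique, not the SISMEMBER approach described above; the key prefix and function name are assumptions, and `client` is expected to be a redis-py client.

```python
DEDUP_TTL_SECONDS = 24 * 60 * 60  # 24-hour window from the text


def first_seen(client, content_hash: str, ttl: int = DEDUP_TTL_SECONDS) -> bool:
    """Return True if this hash was not seen within the TTL window, and mark it.

    Uses SET key value NX EX ttl, which writes only when the key is absent.
    redis-py returns True on a write and None when the key already exists.
    """
    key = f"content_hash:{content_hash}"  # assumed key layout
    return bool(client.set(key, 1, nx=True, ex=ttl))
```

A collector would publish only when `first_seen(...)` returns `True`, so two collectors that receive the same post at the same moment cannot both pass the check.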
Platform schema drift
The most operationally expensive part of maintaining 47 collectors is schema drift. Platforms change their RSS feed structure, add required OAuth scopes, modify API response formats, or simply go down. We detect schema drift by hashing the structure (field names and types, not values) of incoming API responses and alerting when the hash changes.
The alert goes to an on-call rotation that triages within 4 hours. Schema changes that affect the content, author_id, or timestamp fields cause the collector to enter circuit-breaker mode and stop publishing until a schema migration is deployed. Changes to auxiliary fields (engagement counts, media attachments) are logged but not circuit-breaking.
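A structural hash of this kind can be sketched as follows. The function names and the way lists are summarized are assumptions for illustration; the article does not specify how the production hash is computed.

```python
import hashlib
import json
from typing import Any


def structure_of(value: Any) -> Any:
    """Replace every value with its type name, keeping field names and nesting."""
    if isinstance(value, dict):
        return {k: structure_of(v) for k, v in value.items()}
    if isinstance(value, list):
        # Describe a list by the distinct shapes of its elements, ignoring length
        shapes = {json.dumps(structure_of(v), sort_keys=True) for v in value}
        return ["list", sorted(shapes)]
    return type(value).__name__


def schema_hash(response: dict) -> str:
    """SHA-256 over field names and types only; values do not affect the hash."""
    canonical = json.dumps(structure_of(response), sort_keys=True)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Two responses with the same fields and types hash identically regardless of their values or key order, while a renamed field or a string that used to be an integer changes the hash. A nullable field also changes the hash when it flips between a value and null, so a real deployment would likely need to treat such fields specially.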
Related technical articles:
- How we process 2.4M social-media posts per hour
- NLP pipeline for real-time sentiment analysis at scale
- Building a digital-footprint reconnaissance pipeline for OSINT investigations
- Named entity extraction and disambiguation in the OSINT pipeline: 58M posts per day, 15,000 entity mentions per hour