Technical writing

Social media ingestion at scale: collecting 58M posts per day from 47 platform schemas

· 9 min read· AI Analytics
NLPInfrastructureKafkaOSINT

The NLP pipeline that identifies coordinated campaigns, election-day anomalies, and censorship-correlated posting patterns runs on data collected from 47 social media platforms. The downstream processing — 80 NLP workers, 2.4M posts per hour, MinHash deduplication distributed across Redis — gets the attention in write-ups about this stack. The upstream problem of actually getting those posts into the Kafka topic in the first place is less visible but equally complex.

This post covers the collection side: how 47 different platform schemas become a single CanonicalPost format, how we manage rate limits without getting banned, when we use each of the three collection strategies, and how language detection runs at ingest time rather than as a downstream processing step.

The canonical post schema

Normalizing 47 platform schemas into a single format is the first engineering constraint the pipeline imposes. Every platform calls a post something different (tweet, toot, post, status, reel, update), uses different timestamp formats, represents the author differently, and encodes retweets/reposts/shares in incompatible ways. The CanonicalPost struct resolves this:

@dataclass
class CanonicalPost:
    post_id: str        # platform:original_id (e.g., "twitter:1234567890")
    platform: str       # twitter | reddit | mastodon | telegram | ...
    content: str        # plain text, HTML stripped
    author_id: str      # platform-scoped: "twitter:@username"
    timestamp: datetime # always UTC
    language: str       # ISO 639-1, from FastText lid.176
    language_conf: float  # FastText confidence score
    is_repost: bool
    original_post_id: Optional[str]  # for reposts
    reply_to_id: Optional[str]
    url: Optional[str]  # canonical URL if publicly accessible
    content_hash: str   # SHA-256 of normalized(content)
    collection_strategy: str  # api | activitypub | rss

    # Populated downstream by NLP workers
    entity_mentions: Optional[List[str]] = None
    sentiment_score: Optional[float] = None
    coordination_flags: Optional[List[str]] = None

The content_hash is computed over normalized content (Unicode NFC, whitespace collapsed, URLs replaced with a placeholder token) rather than raw text. This lets the deduplication layer identify near-identical posts across platforms even when trailing whitespace or URL formats differ between syndication sources.

Three-tier collection strategy

Each platform falls into one of three collection tiers based on what's available:

TierMethodPlatformsPosts/day
1Official APITwitter/X, Reddit, YouTube, LinkedIn, Facebook (Page API)~31M
2ActivityPub federationMastodon, Lemmy, Misskey, Pixelfed, PeerTube (26 instances)~4M
3RSS / public feedTelegram public channels, Truth Social, GETTR, Gab, Parler, Substack~23M

The tier determines both the collection implementation and the reliability characteristics. Tier 1 platforms give us structured JSON with stable schemas; Tier 3 platforms require HTML parsing that breaks when the platform updates its front end. We accept higher maintenance burden from Tier 3 because the platforms in that tier are specifically where information operations and election-adjacent content are less moderated.

Tier 1: official API collection

Twitter/X API v2 with Academic Research access gives us search results and filtered stream access. The filtered stream endpoint pushes matching tweets in real time rather than requiring polling; we use it for election keywords and censorship terms, and fall back to polling for historical backfill. The API uses per-endpoint rate limits; we budget them using a token-bucket rate limiter per credential:

import asyncio, time

class TokenBucketRateLimiter:
    def __init__(self, rate: float, burst: int):
        self.rate = rate     # tokens per second
        self.burst = burst   # max burst size
        self.tokens = burst
        self.last_refill = time.monotonic()

    async def acquire(self, n: int = 1):
        while True:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= n:
                self.tokens -= n
                return
            await asyncio.sleep((n - self.tokens) / self.rate)

# Twitter search: 300 requests per 15 minutes per credential
twitter_limiter = TokenBucketRateLimiter(rate=300/900, burst=10)

We run 3 Twitter credentials in rotation, with a circuit breaker that trips on HTTP 429 and pauses all traffic to that credential for 15 minutes. Reddit API (OAuth2 client credentials) allows 60 requests/minute; we collect r/all and approximately 1,200 target subreddits relevant to political content and censorship discussion.

Tier 2: ActivityPub federation

The fediverse presents a different collection challenge: there is no central API. We run a listener Mastodon account on a self-hosted instance that federates with 26 target instances, receiving posts via the ActivityPub protocol without polling. The listener receives Create activities from federated instances as they publish:

# ActivityPub inbox handler (FastAPI)
@app.post("/inbox")
async def activitypub_inbox(request: Request, body: dict):
    if body.get("type") != "Create":
        return {"status": "ignored"}
    obj = body.get("object", {})
    if obj.get("type") not in ("Note", "Article"):
        return {"status": "ignored"}

    post = CanonicalPost(
        post_id=f"activitypub:{obj['id']}",
        platform=extract_platform(obj['id']),  # from URL domain
        content=strip_html(obj.get("content", "")),
        author_id=f"activitypub:{obj.get('attributedTo')}",
        timestamp=parse_iso(obj["published"]),
        language=detect_language(obj.get("content", "")),
        is_repost="inReplyTo" not in obj and "reblog" in body,
        collection_strategy="activitypub",
        content_hash=hash_content(obj.get("content", "")),
    )
    await kafka_producer.send("posts.raw", post)
    return {"status": "ok"}

The 26 federated instances were selected for coverage of political discussion in specific language communities. We do not attempt to collect the entire fediverse, only instances where the content is relevant to the monitoring mission. Instance selection is reviewed quarterly; instances that shut down or change federation policy are replaced.

Tier 3: RSS and public feed scraping

Telegram public channels publish an RSS feed at https://t.me/s/{channel} that we parse with feedparser. The feed does not include full post content for media posts; we fetch the linked Telegram web view only for posts flagged by keyword filters (election terms, censorship terms, cross-reference with active Voidly incidents). For Truth Social and GETTR, the RSS feed is available but unofficial and breaks without notice; we maintain a per-platform schema version hash and alert when the hash changes.

TIER3_COLLECTORS = {
    "telegram": {
        "strategy": "rss",
        "url_template": "https://t.me/s/{channel_id}/rss",
        "poll_interval": 120,   # seconds
        "channels": TELEGRAM_CHANNEL_LIST,  # ~8,400 monitored channels
    },
    "truth_social": {
        "strategy": "rss",
        "url_template": "https://truthsocial.com/@{handle}/feed.rss",
        "poll_interval": 300,
        "schema_hash": "a3f8c...",  # alert if schema changes
    },
    "gettr": {
        "strategy": "rss",
        "url_template": "https://gettr.com/user/{handle}/feed.xml",
        "poll_interval": 300,
        "schema_hash": "b7e1d...",
    },
}

The 8,400 Telegram channels are polled in parallel with a 120-second interval, using a single connection pool shared across all channel pollers. We stagger poll start times using a hash of the channel ID to spread load: each channel starts polling at time_now + (hash(channel_id) % poll_interval) seconds to avoid the thundering-herd pattern of all 8,400 channels firing simultaneously.

Language detection at ingest

We run FastText lid.176 on every post before publishing to Kafka, not as a downstream NLP step. The reason is partitioning: our Kafka topics partition by language_code, which lets NLP worker pools be specialized by language (we run dedicated worker pools for Arabic, Chinese, Persian, and Russian, each with language-specific models). If language detection happened downstream, the Kafka consumer would need to re-route messages between language partitions, which complicates the consumer offset tracking.

import fasttext

# Load once, share across all collector threads
_lid_model = fasttext.load_model("lid.176.ftz")

def detect_language(text: str) -> tuple[str, float]:
    if len(text.strip()) < 20:
        return "und", 0.0   # undetermined for very short text
    text_clean = text.replace("
", " ")[:500]
    labels, probs = _lid_model.predict(text_clean, k=1)
    lang = labels[0].replace("__label__", "")
    return lang, float(probs[0])

# Kafka partition key: first 2 bytes of SHA-256 of language code
# -> consistent hashing to language-specific partitions

FastText lid.176 runs at approximately 2.1μs per post on a single CPU core, adding negligible latency to the collection loop. At 670 posts/second (2.4M/hour), the language detection step uses about 0.15% of a single core. Posts with confidence below 0.6 go to an und (undetermined) partition rather than being assigned a guessed language; the NLP pipeline handles und with a language-agnostic model.

Kafka topic design

All collectors publish to a single posts.raw topic with 64 partitions, using language_code as the partition key. The 64-partition count supports 64 independent consumer instances; at peak load (election periods), we scale the NLP worker pool to 80 instances using the oversubscription headroom.

# Kafka producer configuration
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BROKERS,
    value_serializer=lambda v: msgpack.packb(asdict(v), use_bin_type=True),
    key_serializer=lambda k: k.encode(),
    compression_type='lz4',        # ~35% size reduction at minimal CPU cost
    batch_size=32_768,             # 32KB batches
    linger_ms=10,                  # 10ms batching window
    acks='all',                    # all ISR replicas must ack
    retries=10,
    max_in_flight_requests_per_connection=5,
)

def publish_post(post: CanonicalPost):
    producer.send(
        topic='posts.raw',
        key=post.language,            # partition key
        value=post,
        headers=[
            ('platform', post.platform.encode()),
            ('collection_strategy', post.collection_strategy.encode()),
        ],
    )

Content deduplication at ingest

The same post often appears across multiple collection paths: a Telegram channel posts something that gets quoted on Twitter, then the original Telegram post and the Twitter quote both enter the pipeline. We deduplicate using a two-level strategy before publishing to Kafka.

First, exact-duplicate detection uses a Redis SET with the content_hash and a 24-hour TTL. Before publishing, each collector checks SISMEMBER content_hashes {content_hash} and skips if the hash is present. This catches verbatim reposts even across platforms.

Second, near-duplicate detection for coordinated content uses the MinHash LSH described in the processing article. This runs downstream rather than at ingest, because MinHash comparison requires the full corpus of recent hashes; the ingest pipeline only does exact deduplication. Near-duplicate clustering is a post-ingest NLP step that adds a coordination_cluster_id to matched posts in TimescaleDB.

Platform schema drift

The most operationally expensive part of maintaining 47 collectors is schema drift. Platforms change their RSS feed structure, add required OAuth scopes, modify API response formats, or simply go down. We detect schema drift by hashing the structure (field names and types, not values) of incoming API responses and alerting when the hash changes.

The alert goes to an on-call rotation that triages within 4 hours. Schema changes that affect the content, author_id, or timestamp fields cause the collector to enter circuit-breaker mode and stop publishing until a schema migration is deployed. Changes to auxiliary fields (engagement counts, media attachments) are logged but not circuit-breaking.


Related technical articles: