> **Note:** Academic project (Master's program). Not production‑hardened. Expect rough edges.
This project implements an end‑to‑end, near real‑time data pipeline that ingests raw US election tweets, enriches them with sentiment scores using Stanford CoreNLP, stores them in MongoDB, republishes processed results to Kafka, and streams them live to a web UI for interactive visualization (distribution, temporal evolution, filtering, and per‑tweet inspection).
```mermaid
%% Election-Tweets Real-time Sentiment Pipeline
flowchart TD
    A[Kaggle Dataset / JSON Files] --> B[PySpark / pandas preprocessing]
    B --> C[tweet_producer.py]
    C -->|raw tweets| K1[(Kafka<br>topic: election-tweets)]
    subgraph ST [Apache Storm Topology]
        direction LR
        SP[KafkaSpout] --> JP[JsonParsingBolt]
        JP --> M1[MongoBolt<br>raw persistence]
        M1 --> SB[SentimentBolt<br>CoreNLP analysis]
        SB --> M2[MongoBolt<br>update sentiment]
        M2 --> KB[KafkaBolt<br>emit processed]
    end
    K1 --> ST
    KB --> K2[(Kafka<br>topic: election-tweets-processed)]
    K2 --> WS[WebSocket API<br>Node.js]
    WS -->|event: tweet_sentiment| FE[React + Vite Frontend<br>Charts & Lists]
```
| Layer | Technology | Purpose |
|---|---|---|
| Ingestion | Python (PySpark) | Reads & shuffles dataset; streams rows to Kafka |
| Messaging | Apache Kafka | Backbone for raw & processed tweet streams |
| Processing | Apache Storm | Parallel JSON parsing, persistence & NLP sentiment scoring |
| NLP | Stanford CoreNLP (Java) | Sentence‑level sentiment + numeric score 0‑4 |
| Storage | MongoDB Atlas (or local) | Raw + sentiment‑enriched tweet documents |
| Streaming API | Node.js (Express + socket.io) | Bridges Kafka → WebSockets |
| Frontend | React + Vite + Tailwind + Recharts | Real‑time visual dashboards |
- `data/tweet_producer.py` loads the two candidate subsets (Biden / Trump), tags each tweet with its candidate, shuffles them, and sends JSON messages to the Kafka topic `election-tweets` (see the producer sketch after this list).
- Storm topology (`TwitterStormTopology`):
  - `KafkaSpout` consumes raw messages.
  - `JsonParsingBolt` extracts the configured fields.
  - The first `MongoBolt` inserts the raw structured tweet.
  - `SentimentBolt` cleans the text, computes a sentiment label + score using CoreNLP, and emits JSON.
  - The second `MongoBolt` updates the existing MongoDB document with the sentiment fields.
  - `KafkaBolt` publishes the final enriched JSON to `election-tweets-processed`.
- The WebSocket server (`api/server.js` or `api/server.py`) consumes the processed topic and emits `tweet_sentiment` events to clients.
- The frontend (`web/`) connects via Socket.IO, maintains an in‑memory dataset, and derives:
  - Distribution pie chart
  - Sentiment‑over‑time line chart
  - Candidate comparison / filters
  - Live tweet list with badges & scores
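
A minimal sketch of the producer loop (assuming `pandas` and `kafka-python`; the file paths and column names are illustrative, not necessarily those used in `data/tweet_producer.py`):

```python
import json

import pandas as pd
from kafka import KafkaProducer

# Illustrative inputs; the real script's paths/columns may differ.
# convert_dates=False keeps timestamps as strings so json.dumps works below.
biden = pd.read_json("data/biden_tweets.json", lines=True, convert_dates=False)
trump = pd.read_json("data/trump_tweets.json", lines=True, convert_dates=False)
biden["candidate"] = "biden"
trump["candidate"] = "trump"

# Shuffle the combined subsets so the stream interleaves both candidates.
tweets = pd.concat([biden, trump]).sample(frac=1).reset_index(drop=True)

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
for record in tweets.to_dict(orient="records"):
    producer.send("election-tweets", value=record)
producer.flush()
```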
sentiment: "Positive:0.75" (label + normalized score 0–1 truncated to 2 decimals).
sentiment_score: Integer 0‑4 (Very Negative → Very Positive). Both are stored / emitted.
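
Downstream consumers can split the combined field like this (a sketch; `parse_sentiment` is an illustrative helper, not part of the codebase):

```python
def parse_sentiment(value: str) -> tuple[str, float]:
    """Split a combined 'Label:score' string, e.g. 'Positive:0.82'."""
    label, score = value.rsplit(":", 1)
    return label, float(score)

label, score = parse_sentiment("Positive:0.82")
assert (label, score) == ("Positive", 0.82)
```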
A fully processed message looks like:

```json
{
  "tweet_id": "1321234567890",
  "tweet": "Infrastructure plan could create jobs!",
  "user_name": "Jane Doe",
  "user_handle": "janed",
  "createdAt": "2020-10-29T14:31:00Z",
  "processed_at": 1733965223123,
  "sentiment": "Positive:0.82",
  "sentiment_score": 4,
  "candidate": "biden"
}
```

Repository layout:

```
api/                 # WebSocket bridges (Node.js + optional Python)
data/                # Dataset processing & Kafka producer scripts
src/main/java/...    # Storm topology & bolts (Kafka ↔ MongoDB ↔ CoreNLP)
web/                 # React + Vite client (real-time dashboards)
```
Install / provision locally (or via containers):
- Java 11+ & Maven (for Storm & CoreNLP components)
- Apache Kafka & Zookeeper running on localhost:9092
- MongoDB Atlas cluster or local MongoDB instance
- Python 3.9+ (pandas, kafka-python, pyspark if preprocessing)
- Node.js 18+ / pnpm (or npm) for frontend
- Sufficient RAM (CoreNLP can be memory hungry)
Update credentials & brokers:
MongoBolt.java: Replace the hardcodedconnectionStringwith an environment variable (recommended). Example placeholder:MONGODB_URI.- Kafka broker list (currently
localhost:9092) can be externalized similarly.
```env
MONGODB_URI=mongodb+srv://<user>:<pass>@cluster0.example.mongodb.net/?retryWrites=true&w=majority
KAFKA_BROKERS=localhost:9092
RAW_TOPIC=election-tweets
PROCESSED_TOPIC=election-tweets-processed
```
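
On the Python side, the producer could consume these like so (a sketch; the current scripts may still hardcode values):

```python
import os

# Fall back to local defaults for brokers/topics; fail fast on the secret.
brokers = os.environ.get("KAFKA_BROKERS", "localhost:9092")
raw_topic = os.environ.get("RAW_TOPIC", "election-tweets")
mongo_uri = os.environ["MONGODB_URI"]  # raises KeyError if unset
```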
- Clone & enter repository.
- Start Kafka & Zookeeper.
- Create topics (if auto‑create disabled):
kafka-topics --create --topic election-tweets --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092
kafka-topics --create --topic election-tweets-processed --partitions 1 --replication-factor 1 --bootstrap-server localhost:9092- (Optional) Preprocess raw CSV with Spark: run
data/data_processor.pyor adapt for both candidates. - Start the Python tweet producer:
pip install -r data/requirements.txt # (create this if missing: pandas kafka-python)
python data/tweet_producer.py- Build & run the Storm topology (local mode):
mvn -q -DskipTests package
java -cp target/tweets-sentiment-analysis-1.0-SNAPSHOT.jar org.octopustech.TwitterStormTopology- Start WebSocket bridge:
cd api
npm install
node server.js- Run frontend:
cd web
pnpm install # or npm install
pnpm dev # or npm run dev- Open the displayed Vite dev URL (default http://localhost:5173) and observe live updates.
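
If you opt for the Python bridge (`api/server.py`), the Kafka → Socket.IO relay might look roughly like this (a sketch assuming `kafka-python`, `python-socketio`, and `eventlet`; the port is illustrative):

```python
import eventlet

eventlet.monkey_patch()  # make the blocking Kafka consumer cooperative

import json

import socketio
from kafka import KafkaConsumer

sio = socketio.Server(cors_allowed_origins="*")
app = socketio.WSGIApp(sio)

def relay():
    consumer = KafkaConsumer(
        "election-tweets-processed",
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
    for message in consumer:
        # Same event name the frontend subscribes to.
        sio.emit("tweet_sentiment", message.value)

eventlet.spawn(relay)
eventlet.wsgi.server(eventlet.listen(("0.0.0.0", 3001)), app)
```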
- Manually produce a tiny synthetic tweet JSON into `election-tweets` and verify it propagates to the UI (see the snippet below).
- Check the MongoDB collection `tweets_db.election_sentiments` for inserted & updated documents.
- Monitor Storm logs for processing latency & errors (including memory warnings from CoreNLP).
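
A quick end‑to‑end smoke test (a sketch assuming `kafka-python` and `pymongo`; the sample values are made up):

```python
import json

from kafka import KafkaProducer
from pymongo import MongoClient

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
producer.send("election-tweets", value={
    "tweet_id": "test-0001",
    "tweet": "Smoke test tweet!",
    "user_name": "Test User",
    "user_handle": "testuser",
    "createdAt": "2020-10-29T14:31:00Z",
    "candidate": "biden",
})
producer.flush()

# Once the topology has processed it, the document should carry sentiment fields.
client = MongoClient("mongodb://localhost:27017")
print(client["tweets_db"]["election_sentiments"].find_one({"tweet_id": "test-0001"}))
```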
- Remove hardcoded Mongo credentials; load from environment / secrets manager.
- Add input validation & rate limiting on WebSocket layer.
- Introduce schema versioning (e.g., Avro or JSON Schema with registry) for Kafka messages.
- Sanitize / anonymize user data where required by policy.
- Increase Kafka partitions for parallel ingestion (adjust Storm spout parallelism accordingly).
- The CoreNLP pipeline object is already cached and reused; for higher throughput, consider lighter sentiment models (e.g., DL‑based).
- Consider Flink / Kafka Streams if exactly-once semantics or windowing aggregations become needed.
- Batch MongoDB writes or switch to the bulk API for higher ingest volume (see the sketch below).
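
A bulk‑update sketch with `pymongo` (illustrative; the Java bolts would use the driver's equivalent bulk API, and the batch values here are made up):

```python
from pymongo import MongoClient, UpdateOne

coll = MongoClient("mongodb://localhost:27017")["tweets_db"]["election_sentiments"]

# A batch of enriched tweets (made-up values for illustration).
batch = [
    {"tweet_id": "t1", "sentiment": "Positive:0.82", "sentiment_score": 4},
    {"tweet_id": "t2", "sentiment": "Negative:0.31", "sentiment_score": 1},
]

# One round trip instead of one update per tweet.
ops = [
    UpdateOne(
        {"tweet_id": t["tweet_id"]},
        {"$set": {"sentiment": t["sentiment"], "sentiment_score": t["sentiment_score"]}},
    )
    for t in batch
]
if ops:
    coll.bulk_write(ops, ordered=False)
```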
- Docker Compose orchestration (Kafka, Zookeeper, Mongo, Web, Storm Nimbus/Supervisors).
- Replace CoreNLP with a lightweight transformer (DistilBERT) via ONNX for speed.
- Add candidate comparison time‑series + geographic heatmap (if location resolved).
- Implement backend REST API for historical queries (pagination, filters).
- Add unit tests for text cleaning & sentiment mapping logic (see the example below).
- Introduce CI workflow (build + lint + vulnerability scan).
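
As an example, the 0–4 score → label mapping could be pinned down like this (a sketch; `score_to_label` is a hypothetical Python port of the bolt's mapping, not existing code):

```python
import pytest

LABELS = ["Very Negative", "Negative", "Neutral", "Positive", "Very Positive"]

def score_to_label(score: int) -> str:
    # Hypothetical counterpart of SentimentBolt's mapping.
    if not 0 <= score <= 4:
        raise ValueError(f"score out of range: {score}")
    return LABELS[score]

def test_extremes():
    assert score_to_label(0) == "Very Negative"
    assert score_to_label(4) == "Very Positive"

def test_out_of_range():
    with pytest.raises(ValueError):
        score_to_label(5)
```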
- Sentiment only analyses the first sentence (an implementation detail of `SentimentBolt`).
- Cleaning removes all non‑alphabetic characters, which may distort hashtag semantics (see the illustration below).
- No retry/backoff logic for transient MongoDB failures.
- No handling of schema evolution for Kafka messages.
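
To illustrate the hashtag distortion (the regex is an assumption about the cleaning step, not the exact code):

```python
import re

def clean(text: str) -> str:
    # Assumed rule: keep letters and spaces only.
    return re.sub(r"[^A-Za-z ]", "", text)

print(clean("#Vote2020 for @JoeBiden!"))  # -> "Vote for JoeBiden"
```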
US Election 2020 Tweets (Kaggle):
Kaggle Dataset Link
- Stanford CoreNLP for sentiment pipeline.
- Apache Storm & Kafka communities.
- Kaggle dataset contributors.
| Concern | Status |
|---|---|
| Externalized secrets | Needs improvement |
| Basic pipeline works locally | Yes (manual steps) |
| Real-time UI update | Via Socket.IO events |
| Persistence | MongoDB inserts + updates |
| Fault tolerance | Minimal (no retries/backpressure tuning) |
