Running the demo

The demo performs real-time sentiment analysis over recent time windows (e.g. the last 5 minutes) on tweets containing a given word. The quality of the sentiment analysis itself is not the focus of the project, so an off-the-shelf model from NLTK ("Vader") is used.

To run the demo these steps are needed:

  1. Deploy a Kafka/Hadoop/Spark infrastructure.
  2. Create a Kafka topic for incoming data and another one for the final results of the sentiment analysis.
  3. Start the Twitter producer to generate the data.
  4. Submit the Spark streaming job.
    • Optionally launch the dashboard to view the results graphically.
  5. Submit the Spark job that ingests the data into HDFS.
  6. Launch the PySpark interactive shell to work with the ingested data.

Each step is described in the following sections.

Infrastructure deployment

Refer to the previous chapter for the general deployment, and to the following sections for details specific to this demo.

Naming assumptions

As seen in Hadoop deployment, the Hadoop "master" node is assumed to have nn1 as both container name and hostname. The components of this demo also assume that at least one Kafka broker has kafka1 as container name and hostname.

Cluster deployment

The demo has been tested on AWS using 9 t2.medium instances:

  • 1 Kafka broker
  • 1 Hadoop "master" node
  • 6 Hadoop "slave" nodes
  • 1 Spark node

Yarn is already configured with 3 queues: one (for the streaming job) using up to 50% of the resources, and the other two up to 25% each.

Local deployment

Given the resource requirements, a local deployment of the entire demo is not recommended on an average PC. If you try it anyway, refer to the Hadoop configuration section to switch to a single Yarn queue using all resources, and run just one of the jobs (streaming, ingestion, interactive shell) at a time. A single Kafka broker and 3 Hadoop nodes should suffice.

Creating Kafka topics

Two topics are needed: one for handling the incoming tweets, and another to handle the results of the sentiment analysis streaming process.

One node with one Kafka broker is sufficient to run the demo. It's suggested to split the topics into multiple partitions, as shown below, since this allows Spark workers to consume data in parallel (see the Kafka and Spark docs for details).

Naming

The demo components assume that at least one Kafka container is named kafka1.

Create the two topics:

kafka-topics.sh --bootstrap-server localhost:9093 --replication-factor 1 --partitions 6 --create --topic tweets
kafka-topics.sh --bootstrap-server localhost:9093 --replication-factor 1 --partitions 6 --create --topic sentiment_scores
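If the topics have to be recreated often (for instance after tearing down a test cluster), the two commands above can be generated from one place. The following is a minimal sketch in Python; the helper name topic_command is hypothetical, and it only reuses the flags and values shown above.

```python
import shlex

BOOTSTRAP_SERVER = "localhost:9093"  # same address as in the commands above
TOPICS = ["tweets", "sentiment_scores"]


def topic_command(topic, partitions=6, replication_factor=1):
    """Return the kafka-topics.sh argument list that creates `topic`."""
    return [
        "kafka-topics.sh",
        "--bootstrap-server", BOOTSTRAP_SERVER,
        "--replication-factor", str(replication_factor),
        "--partitions", str(partitions),
        "--create",
        "--topic", topic,
    ]


commands = [topic_command(t) for t in TOPICS]
for cmd in commands:
    # Print a copy-pastable line; nothing is executed here.
    print(shlex.join(cmd))
```

Each argument list can also be passed to subprocess.run() on a node where kafka-topics.sh is on the PATH.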

Activating Twitter source

This is the component that fetches tweets about a particular word and sends them to a Kafka topic. It requires developer API credentials from Twitter (registration is needed).

It's simply a Python 3 script with a couple of module dependencies. A Docker image can be found at carmineds/twitter_producer. Building the image yourself is not needed; if you want to do it anyway, follow the image build section.

It can be deployed on an ad-hoc node, or on an existing one (e.g. the same machine as Spark or a Hadoop "slave").

Run the container with (choose the appropriate network):

sudo docker run -it --net swarmnet carmineds/twitter_producer

Once the container is running, insert the credentials in the configuration file keys.ini (the nano editor can be used for this).

Choose a word that is reasonably popular on Twitter, e.g. Trump. To check that the Twitter setup works, try:

python3 twitter_printer.py trump

Tweets should be printed on the console. Exit with Ctrl-C.

Finally run:

python3 twitter_producer.py kafka1:9093 tweets trump

Tweets are sent to the tweets topic on the broker kafka1:9093.

To see if it works try (on a Kafka broker):

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic tweets
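The internals of twitter_producer.py are not shown in this chapter. As a rough illustration of what such a producer does, the sketch below encodes each tweet text as UTF-8 and sends it to a topic. It assumes a producer object exposing send(topic, value), as the KafkaProducer class of the kafka-python package does; forward_tweets and FakeProducer are hypothetical names, and the fake stands in for a real broker connection so that the example runs without Kafka.

```python
class FakeProducer:
    """Stand-in for a Kafka producer: records messages instead of sending them."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))


def forward_tweets(producer, topic, tweets):
    """Send each non-empty tweet text to `topic` as UTF-8 bytes; return the count."""
    count = 0
    for text in tweets:
        text = text.strip()
        if not text:
            continue  # skip empty payloads
        producer.send(topic, text.encode("utf-8"))
        count += 1
    return count


producer = FakeProducer()
n = forward_tweets(producer, "tweets", ["first tweet", "  ", "second tweet"])
print(n, producer.sent)
```

With kafka-python installed, the fake could be replaced by KafkaProducer(bootstrap_servers="kafka1:9093"); whether the actual script is written this way is an assumption.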

Running streaming job

This is the core part of the demo: a Spark (Structured Streaming) job that reads streaming data (tweets) from a Kafka topic, computes average sentiment scores over time windows, and publishes the results on another Kafka topic.

The frequency and length of the time windows can be changed in the script. Keep in mind that more demanding settings (more frequent and/or longer windows) and larger volumes of data increase the resources needed. For more see the memory configuration appendix, the Structured Streaming appendix and the Spark documentation.
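The script twitter_nltk_append.py is not reproduced here. To make concrete what "average score over time windows" means, the following plain-Python sketch computes the average of per-event scores over sliding windows. It is an illustration only, not the demo code: the function name and the sample data are made up, and window starts are assumed to be aligned to multiples of the slide interval, which is how Spark aligns windows by default.

```python
from collections import defaultdict


def windowed_averages(events, window_s, slide_s):
    """Average score per sliding window.

    events: iterable of (event_time_seconds, score).
    Windows are [start, start + window_s), with starts at multiples of slide_s.
    """
    sums = defaultdict(lambda: [0.0, 0])  # window start -> [sum, count]
    for ts, score in events:
        # Latest window start that is <= ts.
        start = (ts // slide_s) * slide_s
        # Walk back over every window that still contains ts.
        while start > ts - window_s:
            sums[start][0] += score
            sums[start][1] += 1
            start -= slide_s
    return {s: total / n for s, (total, n) in sorted(sums.items())}


# 10-second windows computed every 5 seconds: each event falls in 2 windows.
result = windowed_averages([(12, 1.0), (14, 0.0), (17, -1.0)], window_s=10, slide_s=5)
print(result)  # {5: 0.5, 10: 0.0, 15: -1.0}
```

Each event contributes to window_s / slide_s windows, which is one reason why longer or more frequent windows increase the amount of state to keep.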

(Cluster deployment only) Copy the files to the Spark node, e.g. with scp:

#"spark_node" IP configured in /etc/hosts
scp -r spark_scripts/ ubuntu@spark_node:

Copy the files from the host to the container:

sudo docker cp spark_scripts/ spark:SCRIPTS

Access the container and move to the newly created directory, then submit the streaming Spark job with:

spark-submit --master yarn --deploy-mode cluster --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 --py-files dep.zip twitter_nltk_append.py

The zip file contains the Python dependencies (NLTK) [1].

Textual results can be seen on the sentiment_scores Kafka topic. On a Kafka container run:

kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic sentiment_scores

Data visualization (dashboard)

This component shows the results in a graphical representation. A Docker image can be found at carmineds/flask_chart. Building the image yourself is not needed; if you want to do it anyway, follow the image build section.

It can be deployed on an ad-hoc node, or on an existing one (e.g. the same machine as Spark or a Hadoop "slave").

Run:

sudo docker run -it --name flask_chart -p 80:5000 --net swarmnet -d carmineds/flask_chart

The graph can be accessed from a browser on port 80. In a cluster deployment, be sure to open port 80/tcp to the client's public IP (for AWS see the port forwarding section).

Ingesting data to HDFS

The script for this is in the same folder as the script for the streaming job on the Spark instance.

Before submitting this job, the previous spark-submit can be stopped locally with Ctrl-C: in cluster deploy mode the streaming job keeps running on Yarn. As an alternative, open another bash instance.

Run:

spark-submit --master yarn --deploy-mode cluster --queue second --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 kafka_hdfs_sink.py

Note that this job uses a different queue from the one used by the streaming job.

The size of the ingested data can be checked at any moment by running this in the Hadoop "master" container nn1:

hdfs dfs -du -h -s /kafka_data.parquet

Interactive shell (batch job on ingested data)

Once some data has been ingested, another job can be submitted on a third Yarn queue to process it.

Open an interactive PySpark shell (on the Spark container) with:

pyspark --master yarn --queue third

Once it's initialized, to count the number of ingested tweets run:

df = spark.read.parquet('/kafka_data.parquet')
df.count()

Here is a slightly more advanced script to try:

# Top 20 words (including common ones) in ingested tweets
from pyspark.sql.functions import col, explode, split, udf
from pyspark.sql.types import StringType


def ascii_ignore(x):
    # Drop every non-ASCII character (emoji, accented letters, ...)
    return x.encode('ascii', 'ignore').decode('ascii')

ascii_udf = udf(ascii_ignore, StringType())

tweets = spark.read.parquet('/kafka_data.parquet')
# Cast the Kafka message value to a string
stweets = tweets.select(tweets.value.cast(StringType()))
# One row per whitespace-separated word
words = stweets.select(explode(split(stweets.value, r'\s+')).alias('word'))
filtered_words = words.select(ascii_udf(words.word).alias('word'))

filtered_words.groupBy('word').count().orderBy(col('count').desc()).limit(20).show()
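To check the logic of the script above without a cluster, the same steps (split on whitespace, strip non-ASCII characters, count, take the most frequent) can be reproduced in plain Python on a few strings. This is only a sketch for experimenting: top_words and the sample sentences are made up for the example.

```python
import re
from collections import Counter


def ascii_ignore(text):
    """Drop every non-ASCII character (same logic as the UDF above)."""
    return text.encode("ascii", "ignore").decode("ascii")


def top_words(tweets, n=20):
    """Split on whitespace, strip non-ASCII characters, return the n most common words."""
    counts = Counter()
    for tweet in tweets:
        for word in re.split(r"\s+", tweet):
            counts[ascii_ignore(word)] += 1
    return counts.most_common(n)


sample = ["the café is open", "the bar is closed"]
top = top_words(sample, n=2)
print(top)
```

Note that a word made only of non-ASCII characters (an emoji, for instance) becomes an empty string and is counted as such; filtering out empty strings before counting is a natural refinement.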

Appendix: Spark Structured Streaming

Structured Streaming is the current Spark standard for streaming data. Being based on the DataFrame/Dataset API (like most Spark layers as of version 3), it lets you handle the incoming stream of data as if it were a fixed-size DataFrame, so the difference between writing a batch job and a streaming job is reduced to a minimum.

Under the hood it works with micro-batches, with latencies as low as ~100 ms (the more recent, still experimental Continuous Processing mode allows latencies down to about 1 ms). It offers exactly-once semantics, with checkpointing (to resume processing in case of interruptions) and handling of late data (watermarking).

The processing state is managed automatically: when working with aggregations, only the data still needed to compute the results is retained over time.

There are 3 output modes to "sink" the data:

  • Complete mode: At each trigger/micro-batch all the output aggregations since the beginning of the job are written.
  • Append mode: Results for a particular window are written once, when it's finished (and the late data wait time has passed).
  • Update mode: At each trigger, only the time window aggregations for which new data has appeared are written to the output sink.
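The difference between the three modes is easier to see on a toy example. The sketch below simulates, in plain Python, which rows of a windowed count would be written at each trigger. It is a simplified model and not Spark code: run_triggers is a made-up name, windows are tumbling, the watermark is taken as the latest event time seen minus a fixed delay and is applied within the same trigger (Spark applies it from the next one), and dropping of late events is not modeled.

```python
def run_triggers(batches, window_s, delay_s, mode):
    """Simulate the rows written at each trigger for a windowed count.

    batches: list of micro-batches, each a list of event times (seconds).
    Returns one dict {window_start: count} per trigger.
    """
    counts = {}       # running state: window start -> count
    emitted = set()   # windows already written in append mode
    max_seen = 0
    outputs = []
    for batch in batches:
        touched = set()
        for ts in batch:
            start = (ts // window_s) * window_s
            counts[start] = counts.get(start, 0) + 1
            touched.add(start)
            max_seen = max(max_seen, ts)
        watermark = max_seen - delay_s
        if mode == "complete":
            # Every aggregate computed since the beginning of the job.
            out = dict(counts)
        elif mode == "update":
            # Only the windows changed by this micro-batch.
            out = {w: counts[w] for w in sorted(touched)}
        elif mode == "append":
            # Only windows that ended before the watermark, written once.
            final = [w for w in sorted(counts)
                     if w + window_s <= watermark and w not in emitted]
            out = {w: counts[w] for w in final}
            emitted.update(final)
        else:
            raise ValueError(f"unknown mode: {mode}")
        outputs.append(out)
    return outputs


batches = [[1, 3], [12], [27]]
for mode in ("complete", "update", "append"):
    print(mode, run_triggers(batches, window_s=10, delay_s=5, mode=mode))
```

In this run, complete mode rewrites all windows at every trigger, update mode writes one window per trigger, and append mode writes nothing until the third trigger, when the windows starting at 0 and 10 are final.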

For more refer to the official Spark documentation.

Appendix: Spark memory configuration

To increase the memory used by the Spark driver and executors (which run in Yarn containers), the --driver-memory and --executor-memory options of spark-submit can be used.

To dedicate all the resources of the assigned node to the driver, with a Yarn configuration in which the maximum memory for containers is 3072m, add the option in this way:

spark-submit ... --driver-memory 2688m ...

E.g. for the demo ingestion job:

spark-submit --master yarn --deploy-mode cluster --queue second --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 --driver-memory 2688m kafka_hdfs_sink.py

2688m is calculated as 3072m (maximum capacity configured for Yarn NodeManagers) minus 384m (Spark's memory overhead, which by default is 10% of the requested memory with a minimum of 384m). For more refer to the Spark and Hadoop docs.
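The same arithmetic can be written as a small helper for other container sizes. This is a sketch under stated assumptions: it uses Spark's default overhead rule (10% of the requested memory, at least 384 MB), ignores any rounding that Yarn applies to container requests, and the function name is made up.

```python
MIN_OVERHEAD_MB = 384      # Spark's minimum memory overhead
OVERHEAD_FACTOR_PCT = 10   # default overhead: 10% of the requested memory


def max_spark_memory_mb(container_max_mb):
    """Largest memory setting (MB) that, with its overhead, fits in a Yarn container."""
    candidate = container_max_mb - MIN_OVERHEAD_MB
    if candidate * OVERHEAD_FACTOR_PCT // 100 <= MIN_OVERHEAD_MB:
        # The 384 MB minimum dominates, as in the 3072m case above.
        return candidate
    # Otherwise the 10% rule dominates: m + m/10 <= container_max.
    return container_max_mb * 100 // (100 + OVERHEAD_FACTOR_PCT)


print(max_spark_memory_mb(3072))  # 2688
```

For larger containers the 10% rule takes over, so simply subtracting 384m would no longer be enough.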


  1. https://stackoverflow.com/a/39777410