Sentiment Analysis of Tweets using Apache Pulsar

July 18, 2019

Jerry Peng

Apache Pulsar is often thought of as only a messaging platform. However, it has far more capabilities than a traditional messaging platform and can be used for much more than publishing and consuming messages. In fact, whole data pipelines can be built using only Pulsar.

For an overview of Apache Pulsar, please view https://streaml.io/blog/intro-to-pulsar

Imagine a use case in which I want to know what people on social media are saying about a subject I am interested in, such as Apache Pulsar. I am interested not only in the volume of traffic on the subject of Pulsar but also in the overall sentiment on that topic. As a contributor to Apache Pulsar, knowing whether people are generally positive or negative about the subject is equally important to me, if not more so.

This use case, quantifying what people think about Apache Pulsar, can itself be implemented with Apache Pulsar. Using Pulsar, we can ingest tweets from Twitter, analyze them, and query the results at any time, all on top of Pulsar. The complete data pipeline for this use case is illustrated in Figure 1 below.

Figure 1. Sentiment analysis pipeline

In this blog, I will show you how to implement an end-to-end solution, based entirely on Apache Pulsar, that will:

  1. Ingest tweets from Twitter into a Pulsar cluster by using a Pulsar IO connector
  2. Classify tweets based on sentiment by using Pulsar Functions
  3. Query results by using Pulsar SQL

I hope this blog not only demonstrates some of the capabilities of Apache Pulsar but also inspires new ideas on how to use it.

Overview

The data pipeline I am going to present in this blog has several stages. In the first stage, I use a Pulsar IO source, Twitter FireHose, to ingest tweets and publish them to a topic.

In the next stage, I perform sentiment analysis on the tweets with a Pulsar Function that consumes tweets from the tweets topic, classifies each tweet as positive, neutral, or negative, and then routes it to a topic based on its sentiment. In this stage, I also take advantage of Pulsar Functions' support for stateful processing and distributed counters to keep running counts of how many negative, neutral, and positive tweets have been encountered so far.

In the final stage of the pipeline, I use Pulsar SQL to query the topics that contain negative, neutral, and positive tweets and see how many of those tweets mention Apache Pulsar. Table 1 summarizes the major stages of the pipeline and the Pulsar feature each stage uses.

Table 1. Major stages of the pipeline

Stage                          | Technology used
-------------------------------+-----------------
Ingest tweets from Twitter     | Pulsar IO
Sentiment analysis of tweets   | Pulsar Functions
Query results using SQL        | Pulsar SQL

Ingest Tweets

Data can be moved into or out of a Pulsar cluster by running connectors, or, as I like to call them, sources and sinks, built on the Pulsar IO framework. Many Pulsar IO sources and sinks are already implemented and ready to use, so users can start moving data into and out of a Pulsar cluster without writing a single line of code. For example, tweets published to Twitter can be ingested into a Pulsar cluster by running the Twitter FireHose source.

You can deploy the source to run in an existing Pulsar cluster (which can be a Pulsar standalone cluster) as follows:

$ ./bin/pulsar-admin source create \
  --name twitter \
  --source-type twitter \
  --destinationTopicName tweets \
  --source-config '{"consumerKey":"<CONSUMER_KEY>","consumerSecret":"<CONSUMER_SECRET>","token":"<TOKEN>","tokenSecret":"<TOKEN_SECRET>"}'

For more information on how to get started with a standalone cluster, please visit https://pulsar.apache.org/docs/en/standalone/

You can verify that tweets are arriving by using the pulsar-client CLI tool to read from the tweets topic:

$ ./bin/pulsar-client consume -n 0 -r 0 -s test tweets

----- got message -----
{"id":1138573864626270208,"text":"@apache_pulsar is awesome","truncated":false,"user":{"id":1185419150,"name":"Jerry Peng","verified":false},"favorited":false,"retweeted":false,"lang":"en"}
----- got message -----
{"id":1138573864638853121,"text":"RT @apache_pulsar is the best","truncated":false,"user":{"id":24314279,"name":"Pulsar","location":"Palo Alto","verified":false},"favorited":false,"retweeted":false,"lang":"en"}

What is special about Pulsar IO is that the framework takes care of running the connectors (sources or sinks) for you. Other connector frameworks require the developer to figure out not only how but also where to run each connector. This can be quite an obstacle and a headache for developers, especially once fault tolerance and scalability are taken into account. With Pulsar IO, connectors run in a fault-tolerant manner, and users can take advantage of capabilities such as scaling and stateful processing.

Sentiment Analysis on Tweets

A Pulsar Function can read from the topic that contains the tweets and classify each tweet based on its sentiment by using a pre-trained model. Because the Pulsar Functions API is modeled after a generic function with an input and an output, using external libraries is straightforward. For the sentiment analysis Pulsar Function, I use the Stanford CoreNLP library, which comes with pre-trained models to classify tweets as positive, neutral, or negative.
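As a point of comparison, Pulsar Functions can also be written against the standard java.util.function.Function interface, which the Pulsar documentation calls the Java native interface. Such a function receives no Context argument, so it cannot publish to several topics or update counters, which is presumably why the sentiment analysis function below implements Pulsar's own Function interface instead. The class here is a hypothetical illustration, not part of the demo repository:

```java
import java.util.Locale;
import java.util.function.Function;

// Hypothetical example of a Pulsar Function written against the plain JDK
// interface. Pulsar calls apply() once per input message and publishes the
// returned value to the function's output topic, if one is configured.
class NormalizeTweetFunction implements Function<String, String> {
    @Override
    public String apply(String tweetText) {
        // Trim, collapse runs of whitespace, and lowercase the text.
        return tweetText.trim().replaceAll("\\s+", " ").toLowerCase(Locale.ROOT);
    }
}
```

Because apply() is ordinary Java, it can be unit tested directly, for example new NormalizeTweetFunction().apply("  @apache_pulsar   IS Awesome ") yields "@apache_pulsar is awesome".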

Below is a snippet of the code for the sentiment analysis Pulsar Function.

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class SentimentAnalysisFunction implements Function<TweetData, Void>
{

  ...

  @Override
  public Void process(TweetData tweetData, Context context) throws Exception
  {
    if (isNotBlank(tweetData.getText())) {
      if (tweetData.getLang() != null && tweetData.getLang().equals("en")) {
        String cleanedTweet = cleanTweet(tweetData.getText());
        if (isNotBlank(cleanedTweet)) {
          int sentimentScore = findSentiment(cleanedTweet);

          // negative sentiment
          if (sentimentScore <= 0) {
            context.publish("negative_tweets", tweetData);
            context.incrCounter("negative_tweets", 1);
          } else if (sentimentScore >= 1 && sentimentScore <= 2) {
            // neutral
            context.publish("neutral_tweets", tweetData);
            context.incrCounter("neutral_tweets", 1);
          } else if (sentimentScore >= 3) {
            // positive
            context.publish("positive_tweets", tweetData);
            context.incrCounter("positive_tweets", 1);
          } else {
            log.error("Tweet: {} generated unexpected sentiment score: {}", tweetData.getText(), sentimentScore);
          }
        }
      }
    }
    return null;
  }

  ...

}

The “SentimentAnalysisFunction” class implements the “Function” interface, which specifies an input type and an output type. The input type for this function is “TweetData”, which contains all the fields of a tweet received from Twitter. The return type is “Void” because, instead of returning a single result, the function publishes each tweet to one of several topics itself through the context.
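The TweetData class itself is not shown in this post. Judging from the JSON the Twitter FireHose source emits (see the sample messages above), a trimmed-down version might look like the sketch below. The field set is an assumption based on that sample; the real class in the repository may carry more fields or differ in naming.

```java
// Hypothetical, trimmed-down TweetData POJO. Field names mirror the keys in
// the sample tweet JSON; a real schema would carry many more fields.
class TweetData {
    // Nested "user" object from the tweet JSON.
    static class User {
        private long id;
        private String name;
        private String location;
        private boolean verified;

        public User() {}

        public User(long id, String name) {
            this.id = id;
            this.name = name;
        }

        public long getId() { return id; }
        public String getName() { return name; }
        public String getLocation() { return location; }
        public boolean isVerified() { return verified; }
    }

    private long id;
    private String text;
    private boolean truncated;
    private User user;
    private boolean favorited;
    private boolean retweeted;
    private String lang;

    // No-arg constructor, which serialization frameworks typically require.
    public TweetData() {}

    public TweetData(long id, String text, User user, String lang) {
        this.id = id;
        this.text = text;
        this.user = user;
        this.lang = lang;
    }

    public long getId() { return id; }
    public String getText() { return text; }
    public boolean isTruncated() { return truncated; }
    public User getUser() { return user; }
    public boolean isFavorited() { return favorited; }
    public boolean isRetweeted() { return retweeted; }
    public String getLang() { return lang; }
}
```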

In the first few lines of the method, I check that the text of the tweet is not blank and that the tweet is written in English. I also perform some minor cleaning of the tweet to remove special characters.

  if (isNotBlank(tweetData.getText())) {
    if (tweetData.getLang() != null && tweetData.getLang().equals("en")) {
      String cleanedTweet = cleanTweet(tweetData.getText());
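The body of cleanTweet is elided in this post. One plausible implementation is sketched below, assuming "special characters" means URLs and anything other than letters, digits, whitespace, and basic punctuation. The class name TweetCleaner and the regular expressions are assumptions, not the demo's actual code.

```java
import java.util.regex.Pattern;

// Hypothetical stand-in for the elided cleanTweet helper.
final class TweetCleaner {
    // Links carry no sentiment signal, so they are dropped entirely.
    private static final Pattern URL = Pattern.compile("https?://\\S+");
    // Anything that is not a letter, digit, whitespace, or basic punctuation.
    private static final Pattern SPECIAL = Pattern.compile("[^\\p{L}\\p{N}\\s.,!?']");
    private static final Pattern WHITESPACE = Pattern.compile("\\s+");

    private TweetCleaner() {}

    static String cleanTweet(String text) {
        String noUrls = URL.matcher(text).replaceAll(" ");
        String noSpecial = SPECIAL.matcher(noUrls).replaceAll(" ");
        // Collapse the gaps left behind and trim the ends.
        return WHITESPACE.matcher(noSpecial).replaceAll(" ").trim();
    }
}
```

With this sketch, "RT @apache_pulsar is the best!! https://t.co/abc123 #streaming" becomes "RT apache pulsar is the best!! streaming", and a tweet made only of symbols and a link cleans down to an empty string, which the isNotBlank(cleanedTweet) check then filters out.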

Next, I calculate a sentiment score from the text of the tweet. At the core, I am using the Stanford CoreNLP library to calculate this score.

  int sentimentScore = findSentiment(cleanedTweet);

After I have the sentiment score, I can determine the sentiment of the tweet. If the sentiment score is zero or lower, I classify the tweet as negative and route the tweet to the negative tweet topic.

  context.publish("negative_tweets", tweetData);

Since Pulsar Functions have support for distributed counters, I also increment a counter to keep track of the number of negative tweets so that I can query for the running count later.

  context.incrCounter("negative_tweets", 1);

For more information on Pulsar Function state, please view https://pulsar.apache.org/docs/en/functions-state/
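To make the counter semantics concrete, here is a local, in-memory stand-in whose method names mirror Context.incrCounter(key, amount) and Context.getCounter(key). It is only an illustration: in Pulsar the counters live in the function's state store (backed by Apache BookKeeper), so they survive restarts and are shared by all instances of the function, which a map inside one JVM cannot do. The class name LocalCounters is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical in-memory stand-in for Pulsar Function counters.
// Nothing here is persisted or shared across processes.
final class LocalCounters {
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Atomically add `amount` to the counter for `key`, creating it at zero if needed.
    void incrCounter(String key, long amount) {
        counters.computeIfAbsent(key, k -> new LongAdder()).add(amount);
    }

    // Current value, or 0 if the key was never incremented.
    long getCounter(String key) {
        LongAdder adder = counters.get(key);
        return adder == null ? 0L : adder.sum();
    }
}
```

LongAdder is used because many threads may increment the same key; it makes contended add() calls cheaper at the cost of a slightly more expensive sum().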

I do the same for tweets classified as neutral (a sentiment score between one and two, inclusive) and tweets classified as positive (a sentiment score of three or greater).
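The threshold logic can be pulled out into a small pure method, which makes it easy to unit test without a running cluster. This sketch assumes findSentiment returns Stanford CoreNLP's five-class score, from 0 (very negative) to 4 (very positive); the class SentimentRouter and its method topicFor are hypothetical. Because the three ranges cover every integer, the error branch in the original code can never be reached, so the sketch omits it.

```java
// Hypothetical helper isolating the routing thresholds used above:
// score <= 0 is negative, 1..2 is neutral, >= 3 is positive.
final class SentimentRouter {
    static final String NEGATIVE = "negative_tweets";
    static final String NEUTRAL = "neutral_tweets";
    static final String POSITIVE = "positive_tweets";

    private SentimentRouter() {}

    // Returns the output topic, which the function also uses as the counter key.
    static String topicFor(int sentimentScore) {
        if (sentimentScore <= 0) {
            return NEGATIVE;
        } else if (sentimentScore <= 2) {
            return NEUTRAL;
        }
        return POSITIVE;
    }
}
```

Inside process(), the three branches would then collapse into String topic = SentimentRouter.topicFor(sentimentScore); followed by context.publish(topic, tweetData); and context.incrCounter(topic, 1);.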

You can deploy the function as follows:

$ ./bin/pulsar-admin functions create \
  --classname com.streamlio.sentiment.SentimentAnalysisFunction \
  --name sentiment-analysis-function \
  --inputs tweets \
  --jar sentiment-analysis/target/pulsar-functions-demos-sentiment-analysis-1.0-SNAPSHOT.jar

The sentiment analysis function increments a counter for every negative, neutral, and positive tweet it classifies. You can query these counters from the CLI:

$ ./bin/pulsar-admin functions querystate \
  --name sentiment-analysis-function \
  --key positive_tweets

{ "key": "positive_tweets", "numberValue": 5849, "version": 5848}

Querying Pulsar Function counters and state can also be done via REST.
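As a sketch of the REST route, the class below builds the state URL and extracts numberValue from a response like the one shown above. It assumes the v3 admin endpoint GET /admin/v3/functions/{tenant}/{namespace}/{function}/state/{key}, a standalone cluster's web service on localhost:8080, and the function running in public/default; check the REST API reference for your Pulsar version before relying on that path. The class and method names are hypothetical, and the regular-expression parsing stands in for a JSON library to keep the example dependency-free.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical client for reading a Pulsar Function counter over REST.
final class FunctionStateClient {
    private static final Pattern NUMBER_VALUE =
            Pattern.compile("\"numberValue\"\\s*:\\s*(-?\\d+)");

    private FunctionStateClient() {}

    // Assumed endpoint shape; verify against your Pulsar version's REST docs.
    static URI stateUri(String baseUrl, String tenant, String namespace,
                        String function, String key) {
        return URI.create(String.format("%s/admin/v3/functions/%s/%s/%s/state/%s",
                baseUrl, tenant, namespace, function, key));
    }

    // Pulls "numberValue" out of a body such as
    // { "key": "positive_tweets", "numberValue": 5849, "version": 5848}
    static long parseNumberValue(String json) {
        Matcher m = NUMBER_VALUE.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no numberValue in: " + json);
        }
        return Long.parseLong(m.group(1));
    }

    // Sends the GET request (Java 11+, needs a running cluster).
    static long fetchCounter(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        HttpResponse<String> response =
                client.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("HTTP " + response.statusCode() + ": " + response.body());
        }
        return parseNumberValue(response.body());
    }
}
```

For the function deployed above, the call would be FunctionStateClient.fetchCounter(FunctionStateClient.stateUri("http://localhost:8080", "public", "default", "sentiment-analysis-function", "positive_tweets")).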

If the backlog of the tweets topic is growing, i.e. there are more tweets than a single Pulsar Function instance can classify, you can scale out the function by increasing its parallelism, for example:

$ ./bin/pulsar-admin functions update \
  --name sentiment-analysis-function \
  --parallelism 2

For the complete code of the sentiment analysis Pulsar Function please visit: https://github.com/jerrypeng/pulsar-functions-demos/tree/master/sentiment-analysis.

As you can see, with only a few lines of code I can implement some very powerful features with Pulsar Functions. Pulsar Functions remove much of the complexity of stream processing by offering a simple, intuitive API that most programmers are already familiar with. Developers do not need to become stream processing experts or learn a DSL (Domain Specific Language) to implement a stream processing application.

Query Results using Pulsar SQL

Since Pulsar has a built-in schema registry, data within topics can be structured and is therefore queryable via SQL. Pulsar SQL was created to let users query data stored in topics efficiently. Users can query both newly arrived data and historical data. That is a major differentiator between Pulsar SQL and other streaming/messaging platforms that offer a SQL interface: those platforms offer “streaming SQL”, which lets users query recently arrived data but not historical data. Also, because Pulsar has a two-layer architecture that separates serving from storage, Pulsar SQL can read directly from the storage layer to maximize read throughput.

At its core, Pulsar SQL is a connector that connects Presto, a distributed SQL query engine, to a Pulsar cluster. Queries are executed via Presto and the data is retrieved from the Pulsar cluster.

For more information on how to get started with Pulsar SQL, please visit https://pulsar.apache.org/docs/en/sql-getting-started/

You can now query the topics containing negative, neutral, and positive tweets. For example, you can run some exploratory queries to see which tweets have been classified as positive:

presto> select * from pulsar."public/default".positive_tweets;

id                  | text                                  | user.id | user.name
--------------------+---------------------------------------+---------+---------------
1138589123491614720 | @apache_pulsar is awesome!            | 4324343 | Jerry Peng
1138589127673204736 | Sentiment analysis on @apache_pulsar  | 4324343 | Jerry Peng
1138589127702646784 | RT @apache_pulsar is next generation! | 2432433 | Apache Pulsar
...

We can do a simple count of all positive tweets.

presto> select count(*) from pulsar."public/default".positive_tweets;

 _col0
-------
  3836
(1 row)

For our main use case, let’s see how many tweets have mentioned Pulsar with positive, neutral or negative sentiment. You can execute the following query to get the results.

presto> SELECT
  (SELECT COUNT(*) FROM pulsar."public/default".positive_tweets WHERE text LIKE '%pulsar%') AS positive,
  (SELECT COUNT(*) FROM pulsar."public/default".neutral_tweets WHERE text LIKE '%pulsar%') AS neutral,
  (SELECT COUNT(*) FROM pulsar."public/default".negative_tweets WHERE text LIKE '%pulsar%') AS negative;

 positive | neutral | negative   
----------+---------+----------  
    3836  |   20240 |      53
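One detail to keep in mind when reading these counts: in Presto, LIKE is case-sensitive, so '%pulsar%' matches "@apache_pulsar" but not "Apache Pulsar". Writing lower(text) LIKE '%pulsar%' makes the match case-insensitive. The Java sketch below reproduces the two matching rules on sample strings purely for illustration (the class MentionMatcher is hypothetical, and this is not how Presto evaluates the query).

```java
import java.util.List;
import java.util.Locale;

// Illustrates the difference between `text LIKE '%pulsar%'` and
// `lower(text) LIKE '%pulsar%'` using plain substring checks.
final class MentionMatcher {
    private MentionMatcher() {}

    // Mirrors LIKE '%term%': exact, case-sensitive substring match.
    static long countCaseSensitive(List<String> texts, String term) {
        return texts.stream().filter(t -> t.contains(term)).count();
    }

    // Mirrors lower(text) LIKE '%term%'; `term` is assumed to be lowercase.
    static long countCaseInsensitive(List<String> texts, String term) {
        return texts.stream()
                .filter(t -> t.toLowerCase(Locale.ROOT).contains(term))
                .count();
    }
}
```

For the strings "@apache_pulsar is awesome!", "Apache Pulsar rocks", and "unrelated tweet", the case-sensitive count for "pulsar" is 1 and the case-insensitive count is 2.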

Conclusion

I hope this blog has been insightful and you have learned about some of the cool features of Apache Pulsar. If you want to learn more about Pulsar, you are encouraged to read the blogs we have published on the Streamlio website (https://streaml.io/blog) and visit the official Apache Pulsar website (https://pulsar.apache.org).

If you have any questions about Pulsar, please feel free to contact us at Streamlio.

Also feel free to ask questions on the community mailing list or community Slack channel: https://pulsar.apache.org/en/contact/