Ingesting Twitter Data with Prefect
Jan. 25, 2022

This guide will demonstrate how to write a data flow that extracts and loads Twitter data into Snowflake.

James Sopkin, Solutions Engineer

Social media’s importance to business keeps growing. The wealth of information available on these platforms can be pivotal when building a business, so it’s no wonder that companies increasingly use social media to perform market research, engage with their audience, and build a brand image.

A powerful use case for Prefect is building automated data pipelines that pull social media analytics data. This guide will demonstrate how to write a data flow that extracts and loads Twitter data into Snowflake. All data will be extracted with Tweepy, a Python client for the Twitter API, using only the free tier of Twitter’s API; if necessary, an upgraded (premium or enterprise) account unlocks even greater functionality.

Why use Prefect for a job like this? Prefect is a data orchestration tool that automates tasks and neatly packages together features such as secret management, the ability to run flows on other machines, and a SaaS platform for viewing and managing workflows. It also ships with a built-in Twitter task.

First, we’ll import the modules the solution needs, including Prefect’s helpers for mapping tasks, handling secrets, and the built-in Twitter task.

Before any data can be accessed through Tweepy, either OAuth 1a or OAuth 2 authentication must be completed. OAuth 2 (application-only authentication) grants only read-only access to public information, whereas OAuth 1a (user authentication) grants access to additional fields.

Twitter uses a fairly simple OAuth 1a authorization process, so our first task handles authentication. To keep client secrets out of the code, they can be stored either locally or in Prefect Cloud and retrieved with Prefect Secrets.

The API object returned by this task will be used in downstream tasks to call various Twitter API endpoints through Tweepy.

After authentication, user metrics and tweet data can be pulled. The next task creates a list of a user’s tweets and builds a Pandas DataFrame from that tweet data.

Note: specifying nout allows a task to return more than one value. Although optional, it can be helpful in certain cases because it lets us use convenient Python unpacking syntax when we construct the flow. In this case, the task returns the DataFrame along with a series containing all of the tweet IDs, which the next task uses to retrieve Twitter replies. If replies are not needed, simply return the DataFrame.

Prefect has a built-in Twitter task, LoadTweetReplies, that pulls replies to a specific tweet. Because the task is already integrated with Prefect’s engine, there’s no need to write the retrieval logic yourself: just run the task and specify the Twitter USERNAME and TWEET_ID. If replies are needed for multiple tweets, one possible solution is to map the task over the tweet IDs.

Keep in mind that because the Twitter API has no dedicated endpoint for replies, replies are retrieved through the API’s search function, which can only look back seven days unless the Twitter developer account is upgraded. Replies posted outside that window will not be retrieved. (This seven-day restriction does NOT apply to the other metrics pulled from the user timeline in the get-all-tweets task.)

If it’s absolutely imperative that all replies are retrieved, Tweepy exposes premium search functions: a 30-day search, and a full-archive search that can look as far back as March 2006. For the sake of this guide, only free-to-use options will be demonstrated.

The data can now be loaded into its final destination. The next task loads a DataFrame into Snowflake: the data is staged in a temporary table before being inserted into the target table.

In the snowflake_load task, the table name is passed in as a variable declared in the flow context. For example, “TWEETS” is passed in as the name and “tweet_table” is passed as df. A temporary table with the same structure as the main table is created, “df.to_sql” overwrites that temporary table with “tweet_table,” and the temporary table’s rows are then inserted into the main table. A word of caution: if the SQL commands in the snowflake_load task are modified to take user input or query table data, they may become vulnerable to SQL injection attacks, so it is best practice to bind and/or filter any parameters.
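The binding half of that advice can be illustrated with SQLAlchemy’s `text()` construct: a value supplied by a user goes in as a bound parameter rather than being pasted into the SQL string. The query, the `tweet_by_id` helper, and the table are hypothetical, and an in-memory SQLite database stands in for Snowflake so the snippet runs anywhere SQLAlchemy is installed.

```python
from sqlalchemy import create_engine, text


def tweet_by_id(engine, tweet_id: str) -> list:
    """Look up one tweet by an ID that might come from user input."""
    # :tweet_id is a bound parameter: the driver sends it separately from the
    # SQL text, so quotes or SQL keywords inside it are treated as plain data.
    query = text("SELECT id, text FROM tweets WHERE id = :tweet_id")
    with engine.connect() as conn:
        return conn.execute(query, {"tweet_id": tweet_id}).fetchall()


# In-memory SQLite stands in for Snowflake so the example is self-contained.
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE tweets (id TEXT, text TEXT)"))
    conn.execute(text("INSERT INTO tweets VALUES ('1', 'hello'), ('2', 'world')"))

print(tweet_by_id(engine, "1"))             # one matching row
print(tweet_by_id(engine, "1' OR '1'='1"))  # no rows: the payload is just a string
```

Identifiers such as table names cannot be bound this way, which is why they need filtering (for example, a strict allow-list or name pattern) instead.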

The final step is to run the flow, which runs each of the tasks. The flow also includes a task called build_replies_table, which builds a table of replies to tweets. The auth task runs first and returns the authenticated API object; the downstream tasks then grab the tweets and their replies, and the snowflake_load tasks load the data.

After the data is loaded into the destination of your choice, you have the freedom to manipulate and use it, whether that means building dashboards in Tableau to gather social media insights or building NLP models to run trend analysis. Perhaps you wish to load your data into an AWS S3 bucket and use Lambda functions to sanitize it. Storage and schedules can also be configured so that, say, the flow runs from a Docker container once every four hours. There are many ways to customize Prefect flows to accomplish your goals.

Happy engineering!
