pwang-bot

by sebastianmetti

pwangbot/Data Ingestion.ipynb

Introducing pwangbot

I was chatting with @dff of @tidelift at the NumFocus summit last year, and he suggested classifying @pwang's tweets. For those who don't know, @pwang tweets a lot, and the content is quite good - but it spans a variety of subjects. If you want to hear @pwang tweet about tech, but not about politics, that can be a problem. So we're going to collect @pwang's tweets, do some topic modeling, and then make some bots!

Downloading Tweets

You won't be able to run this notebook as-is - you need your own Twitter developer API credentials, but if you have those, you can inject them below. We will use the Twitter API (via the tweepy library) to download tweets and then store them in S3.

Saturn manages S3 permissions and credentials for users. If you want to run this notebook, you should create your own S3 bucket and credentials. For writing to S3, we are using the s3fs library, which presents a very convenient filesystem-oriented Python interface.
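
To make "filesystem-oriented" concrete, here is a minimal sketch of the s3fs interface, assuming you have AWS credentials configured and a bucket you can write to. The bucket name below is a placeholder, not the one used in this notebook.

import json
import s3fs

# s3fs picks up AWS credentials from the environment (or from Saturn's managed credentials)
fs = s3fs.S3FileSystem()

# Placeholder bucket/prefix -- substitute one you own and can write to
demo_root = 'my-example-bucket/pwang-bot-demo'

# Writing to S3 looks just like writing to a local file
with fs.open(demo_root + '/example.json', 'w') as f:
    f.write(json.dumps({'hello': 'world'}))

# Listing and reading use the same file-like interface
print(fs.ls(demo_root))
with fs.open(demo_root + '/example.json', 'r') as f:
    print(json.load(f))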

import tweepy
from os.path import join
import s3fs
import json

# Load Twitter API credentials from a local JSON file (keyed by user)
with open("/home/jovyan/twitter-creds.json") as f:
    creds = json.load(f)['hhhuuugggooo']

# Authenticate against the Twitter API via tweepy
auth = tweepy.OAuthHandler(creds['consumer_key'], creds['consumer_secret'])
auth.set_access_token(creds['access_token'], creds['access_token_secret'])
api = tweepy.API(auth, wait_on_rate_limit=True, timeout=120)

twitter_username = 'pwang'

# s3fs handle and the S3 prefix where raw tweets will be stored
fs = s3fs.S3FileSystem()
root = 'saturn-cloud-data/hugo/pwang-bot-data'

We use the api.user_timeline method to retrieve tweets. The user_timeline method takes page and count as parameters. count is the number of status objects returned per call; we are setting count to 20. Page 1 returns the first 20 results, page 2 returns the next 20, and so on.

The call to api.user_timeline returns a list of Status objects. We iterate over them and store the raw JSON representation of each to S3, keyed on the id of the status update.

count = 20

def write2(page):
    """Download one page of the user's timeline and store each status to S3 as raw JSON."""
    data = api.user_timeline(twitter_username, page=page, count=count)
    for status in data:
        # Key each tweet by its status id so re-runs overwrite rather than duplicate
        with fs.open(join(root, str(status.id)), "w") as f:
            f.write(json.dumps(status._json))
    return len(data)

Dask

We're going to use a local Dask cluster to collect tweets in parallel. The Twitter API for querying a user's timeline is paginated, so each page can be fetched as an independent task.

from dask.distributed import Client, LocalCluster

# A small local cluster: the work is I/O bound, so a few processes with many threads is plenty
cluster = LocalCluster(n_workers=2, threads_per_worker=10)
c = Client(cluster)

Twitter's API only returns a user's most recent 3,200 tweets. Since we are retrieving 20 tweets per page, 3,200 / 20 = 160 pages would cover them all, so querying 161 pages is more than enough to get every tweet Twitter is storing.

# Submit one task per page, then gather the per-page tweet counts
fut = c.map(write2, range(161))
counts = c.gather(fut)
tweets = sum(counts)
print('we collected %s tweets' % tweets)
we collected 3215 tweets
Finally, we count the objects stored under the S3 prefix.

len(fs.ls(root))
3319
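
As a quick sanity check on what we just wrote, we can read one stored status back with the same s3fs interface, reusing the fs, root, and json objects defined above. This is a minimal sketch; it assumes the standard Twitter v1.1 status JSON fields id and text.

# Grab one stored tweet and inspect it (the keys under root are status ids)
keys = fs.ls(root)
with fs.open(keys[0], 'r') as f:
    status = json.load(f)

print(status['id'], status['text'][:80])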