I was chatting with @dff of @tidelift at the NumFOCUS summit last year, and he suggested classifying @pwang's tweets. For those who don't know, @pwang tweets a lot, and the content is quite good, but it spans a variety of subjects. If you want to hear @pwang tweet about tech but not about politics, that can be a problem. So we're going to collect @pwang's tweets, do some topic modeling, and then make some bots!
You won't be able to run this notebook as is: you need your own Twitter developer API credentials. Once you have them, you can inject them below. We will use the Twitter API (via the tweepy library) to download tweets and then store them in S3.
Saturn manages S3 permissions and credentials for users. If you want to run this notebook, you should create your own S3 bucket and credentials. For writing to S3, we are using the s3fs library, which presents a very convenient, file-system-oriented Python interface.
import json
from os.path import join

import s3fs
import tweepy

# Load Twitter API credentials from a local JSON file, keyed by profile name.
with open("/home/jovyan/twitter-creds.json") as f:
    creds = json.load(f)['hhhuuugggooo']

# Authenticate; wait_on_rate_limit makes the client sleep when rate limited.
auth = tweepy.OAuthHandler(creds['consumer_key'], creds['consumer_secret'])
auth.set_access_token(creds['access_token'], creds['access_token_secret'])
api = tweepy.API(auth, wait_on_rate_limit=True, timeout=120)
twitter_username = 'pwang'

# S3 file system handle and the prefix under which raw tweets are stored.
fs = s3fs.S3FileSystem()
root = 'saturn-cloud-data/hugo/pwang-bot-data'
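If you are setting up your own credentials, the loading code above implies a JSON file with one entry per profile, each holding four keys. Here is a minimal sketch of that layout, written to a temporary file so it runs anywhere. The profile name my-profile, the placeholder values, and the load_creds helper are all hypothetical; only the four key names come from the code above.

```python
import json
import tempfile
from os.path import join

# The four fields the authentication code reads from each profile.
REQUIRED_KEYS = {
    "consumer_key",
    "consumer_secret",
    "access_token",
    "access_token_secret",
}

# Placeholder values only; real ones come from a Twitter developer account.
example_file = {
    "my-profile": {  # hypothetical profile name
        "consumer_key": "YOUR_CONSUMER_KEY",
        "consumer_secret": "YOUR_CONSUMER_SECRET",
        "access_token": "YOUR_ACCESS_TOKEN",
        "access_token_secret": "YOUR_ACCESS_TOKEN_SECRET",
    }
}


def load_creds(path, profile):
    """Load one profile and fail early if a required key is missing."""
    with open(path) as f:
        creds = json.load(f)[profile]
    missing = REQUIRED_KEYS - set(creds)
    if missing:
        raise KeyError("missing credential fields: %s" % sorted(missing))
    return creds


with tempfile.TemporaryDirectory() as tmp:
    path = join(tmp, "twitter-creds.json")
    with open(path, "w") as f:
        json.dump(example_file, f)
    creds = load_creds(path, "my-profile")

print(sorted(creds))
```

Checking the keys up front gives a clearer error than a failure halfway through authentication.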
We use the api.user_timeline method to retrieve tweets. The user_timeline method takes page and count as parameters. count is the number of status objects returned per page; we are setting count to 20. Page 1 returns the first 20 results, page 2 returns results 21-40, and so on.
The call to api.user_timeline returns a list of Status objects. We iterate over them and store the raw JSON representation to S3, keyed off of the id of the status update.
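To make the paging arithmetic concrete, here is a minimal sketch of which result positions a given page covers. It assumes pages are numbered from 1 and that every page is full. The page_bounds helper is hypothetical and not part of tweepy.

```python
def page_bounds(page, count=20):
    """Return the 1-based (first, last) result positions covered by a page.

    Assumes pages are numbered from 1 and each page holds `count` results.
    """
    if page < 1 or count < 1:
        raise ValueError("page and count must be positive")
    first = (page - 1) * count + 1
    last = page * count
    return first, last


for page in (1, 2, 3):
    print(page, page_bounds(page))
```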
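count = 20

def write2(page):
    # Fetch one page of the user's timeline.
    data = api.user_timeline(twitter_username, page=page, count=count)
    # Store each status as raw JSON on S3, keyed by its id.
    for status in data:
        with fs.open(join(root, str(status.id)), "w") as f:
            f.write(json.dumps(status._json))
    # Report how many statuses this page contained.
    return len(data)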
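If you don't have S3 or Twitter credentials handy, the storage pattern can still be tried locally. The sketch below keeps the same one-file-per-status, keyed-by-id layout, but swaps in a temporary local directory for the S3 bucket and two made-up dictionaries for the raw status JSON. The helper names write_statuses and read_status are hypothetical.

```python
import json
import tempfile
from os.path import join

# Made-up stand-ins for the raw JSON of two Status objects.
fake_statuses = [
    {"id": 101, "text": "first example tweet"},
    {"id": 102, "text": "second example tweet"},
]


def write_statuses(statuses, root):
    """Write each status to its own file, named after the status id."""
    for status in statuses:
        with open(join(root, str(status["id"])), "w") as f:
            f.write(json.dumps(status))
    return len(statuses)


def read_status(status_id, root):
    """Load one stored status back by its id."""
    with open(join(root, str(status_id))) as f:
        return json.load(f)


# A temporary local directory stands in for the S3 prefix.
with tempfile.TemporaryDirectory() as root:
    written = write_statuses(fake_statuses, root)
    restored = read_status(102, root)

print(written, restored["text"])
```

Keying files by status id means that writing the same tweet twice simply overwrites the earlier copy, so re-running a page does not create duplicates.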
We're going to use a local Dask cluster to collect tweets in parallel. The Twitter API for querying a User
from dask.distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=2, threads_per_worker=10)
c = Client(cluster)
Twitter only returns a user's 3,200 most recent tweets through this endpoint. Since we are processing 20 tweets per page, querying 161 pages should be more than enough to get all the tweets that Twitter makes available.
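The page count follows from a simple division. As a quick check, here is a small sketch of the arithmetic; the pages_needed helper is hypothetical.

```python
import math


def pages_needed(max_tweets, per_page):
    """Smallest number of full-size pages that covers max_tweets results."""
    return math.ceil(max_tweets / per_page)


print(pages_needed(3200, 20))  # 160
```

So 160 pages cover the limit exactly, and requesting 161 leaves a little headroom.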
fut = c.map(write2, range(161))
counts = c.gather(fut)
tweets = sum(counts)
print('we collected %s tweets' % tweets)
we collected 3215 tweets
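The map-then-gather pattern can be tried without a cluster or credentials. The sketch below is not Dask: it swaps in ThreadPoolExecutor from the standard library, whose map plays the role of submitting the tasks and collecting their results in input order. The fake_write function and its total of 45 items are made up for illustration, and pages are assumed to be numbered from 1.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_write(page, count=20, total=45):
    """Pretend to fetch one page and return how many items it held.

    Assumes pages are numbered from 1 and `total` items exist overall.
    """
    start = (page - 1) * count
    remaining = max(total - start, 0)
    return min(count, remaining)


# Run the per-page function concurrently, then sum the per-page counts.
with ThreadPoolExecutor(max_workers=4) as pool:
    page_counts = list(pool.map(fake_write, range(1, 6)))

print(page_counts, sum(page_counts))
```

Pages past the end of the data return zero, which is why asking for a few more pages than strictly needed does not change the total.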