##########################################################################################################
##########################################################################################################
# Course: Using Digital Trace Data in the Social Sciences
# Instructor: Andreas Jungherr, U Konstanz
# July 18 - July 22, 2016, U Konstanz
# Session 6: Loading Twitter Data Into a Database
##########################################################################################################
##########################################################################################################

# For a detailed tutorial on how to collect and work with Twitter data in the social sciences see
# Jürgens & Jungherr (2016a), pp. 29-41.
# The script examples below illustrate the workings of the scripts provided in Jürgens & Jungherr (2016b).

### Further Reading:
# Pascal Jürgens and Andreas Jungherr. 2016a. A Tutorial for Using Twitter Data in the Social Sciences:
# Data Collection, Preparation, and Analysis. Social Science Research Network (SSRN).
# doi: 10.2139/ssrn.2710146

# Pascal Jürgens and Andreas Jungherr. 2016b. twitterresearch [Computer software].
# Available at https://github.com/trifle/twitterresearch

##########################################################################################################
##########################################################################################################

# In this session, we will learn how to load downloaded tweets into a database to allow for more
# flexible analyses later on.

# First, let's point the command line to your working directory for this project

cd "/Users/(...)/twitterresearch"

# Next, let's get some data.
# Start iPython and collect all available tweets of a user. In our example case, we collect the available
# messages posted by US law professor Lawrence Lessig.
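# Each tweet the commands below download arrives as parsed JSON, i.e. as a Python
# dictionary. The following is a heavily abbreviated, made-up example of the shape
# that the database functions later in this session rely on. The field names follow
# Twitter's v1.1 API; all values are invented for illustration, and real tweets
# carry many more fields:

```python
# Hypothetical, heavily trimmed tweet dictionary (values invented).
tweet = {
    "id": 1,
    "text": "Example tweet with a #hashtag and a mention of @someone",
    "created_at": "Wed Aug 27 13:08:45 +0000 2008",
    "user": {"id": 42, "screen_name": "example_user"},
    "entities": {
        "hashtags": [{"text": "hashtag"}],
        "urls": [],
        "user_mentions": [{"id": 7, "screen_name": "someone"}],
    },
    "in_reply_to_user_id": None,
    "in_reply_to_screen_name": None,
    "in_reply_to_status_id": None,
}

# These are exactly the keys that the create_* helpers in database.py read:
print(tweet["user"]["screen_name"])                        # example_user
print([h["text"] for h in tweet["entities"]["hashtags"]])  # ['hashtag']
```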
ipython

import examples
examples.save_user_archive_to_database()

# Now, you should find a new file in your working directory called "tweets.db". The file contains
# all available tweets posted by Lawrence Lessig saved in database-ready formatting.

# Of course, you are able to change the user or users you want to track by adapting the function
# save_user_archive_to_database() in the file "examples.py" according to your interests. There you
# can also change the name of the file to something to your liking. Doing so is advisable when
# you adapt our scripts.

# As a next step, we have to transform the file "tweets.db" into a SQLite database. We do so by
# running the script "database.py" included in our "twitterresearch" script set.

# Before we run the script, let's have a look at its workings in detail.

##########################################################################################################
# database.py

# We start by importing the necessary modules

import logging
import datetime
from dateutil import parser
from pytz import utc
import peewee
from playhouse.fields import ManyToManyField

# Next, we load the file "tweets.db" as a SQLite database into the object "db".
# We will interact with SQLite directly from Python through the tool peewee, which you
# downloaded with the other modules through our file "requirements.txt".

# For more information on peewee see:
# http://peewee.readthedocs.io/en/latest/index.html

# For more information on SQLite see:
# Grant Allen and Mike Owens. The Definitive Guide to SQLite. 2nd ed. New York, NY: Apress, 2010.
# or: https://www.sqlite.org/docs.html

db = peewee.SqliteDatabase("tweets.db", threadlocals=True)
db.connect()

# After connecting to the database, we have to define the fields in our database.
# Or, in other words, define database models.
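# Peewee is an object-relational mapper: behind the scenes, every model operation
# is translated into plain SQL statements against SQLite. A minimal sketch of that
# layer using only Python's standard library sqlite3 module -- the table layout
# here is deliberately simplified and is not the schema peewee creates:

```python
import sqlite3

# Open a SQLite database -- peewee's SqliteDatabase does the same underneath.
# ":memory:" is used here so the sketch leaves no file behind.
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()

# A much-simplified stand-in for the tweet table peewee will manage for us.
cursor.execute("CREATE TABLE tweet (id INTEGER PRIMARY KEY, text TEXT)")
cursor.execute("INSERT INTO tweet (id, text) VALUES (?, ?)", (1, "hello"))
connection.commit()

# Query it back, roughly what a peewee select() compiles to.
rows = cursor.execute("SELECT id, text FROM tweet").fetchall()
print(rows)  # [(1, 'hello')]
connection.close()
```

# Peewee spares us from writing this SQL by hand; the models below declare the
# tables once, and peewee generates the statements.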
# Our script will take care of most of the work for you, but let's have a close
# read through to understand better what happens here:

# We start by setting a base model for the database


class BaseModel(peewee.Model):

    """
    Base model for setting the database to use
    """
    class Meta:
        database = db

# Now we use a selection of the fields provided by Twitter through its API
# to create database fields that will allow us to directly query or summarize
# the values contained in these fields.

# Here we define the model "Hashtag":


class Hashtag(BaseModel):

    """
    Hashtag model.
    """
    tag = peewee.CharField(unique=True, primary_key=True)

# Here we define the model "URL":


class URL(BaseModel):

    """
    URL model.
    """
    url = peewee.CharField(unique=True, primary_key=True)

# Here we define the model "User":


class User(BaseModel):

    """
    Twitter user model.
    Stores the user's unique ID as a primary key along with the username.
    """
    id = peewee.BigIntegerField(unique=True, primary_key=True)
    username = peewee.CharField(null=True)

    def last_tweet(self):
        return Tweet.select().where(Tweet.user == self).order_by(Tweet.id.desc())[0]

    def first_tweet(self):
        return Tweet.select().where(Tweet.user == self).order_by(Tweet.id.asc())[0]

# Here we define the model "Tweet":


class Tweet(BaseModel):

    """
    Tweet model.
    Stores the tweet's unique ID as a primary key along with the user, text and date.
""" id = peewee.BigIntegerField(unique=True, primary_key=True) user = peewee.ForeignKeyField(User, related_name='tweets', index=True) text = peewee.TextField() date = peewee.DateTimeField(index=True) tags = ManyToManyField(Hashtag) urls = ManyToManyField(URL) mentions = ManyToManyField(User) reply_to_user = peewee.ForeignKeyField( User, null=True, index=True, related_name='replies') reply_to_tweet = peewee.BigIntegerField(null=True, index=True) retweet = peewee.ForeignKeyField( 'self', null=True, index=True, related_name='retweets') # For the detailed workings of these database models make sure to check out # SQLite's and peewee's documentaion: # http://peewee.readthedocs.io/en/latest/index.html # https://www.sqlite.org/docs.html # Now, we define a series of helper functions, that facilitate the loading # of the data to the database: def deduplicate_lowercase(l): """ Helper function that performs two things: - Converts everything in the list to lower case - Deduplicates the list by converting it into a set and back to a list """ lowercase = [e.lower() for e in l] deduplicated = list(set(lowercase)) return deduplicated def create_user_from_tweet(tweet): """ Function for creating a database entry for one user using the information contained within a tweet :param tweet: :type tweet: dictionary from a parsed tweet :returns: database user object """ user, created = User.get_or_create( id=tweet['user']['id'], defaults={'username': tweet['user']['screen_name']}, ) return user def create_hashtags_from_entities(entities): """ Attention: Casts tags into lower case! 
    Function for creating database entries for hashtags
    using the information contained within entities

    :param entities:
    :type entities: dictionary from a parsed tweet's "entities" key
    :returns: list of database hashtag objects
    """
    tags = [h["text"] for h in entities["hashtags"]]
    # Deduplicate tags since they may be used multiple times per tweet
    tags = deduplicate_lowercase(tags)
    db_tags = []
    for h in tags:
        tag, created = Hashtag.get_or_create(tag=h)
        db_tags.append(tag)
    return db_tags


def create_urls_from_entities(entities):
    """
    Attention: Casts urls into lower case!

    Function for creating database entries for urls
    using the information contained within entities

    :param entities:
    :type entities: dictionary from a parsed tweet's "entities" key
    :returns: list of database url objects
    """
    urls = [u["expanded_url"] for u in entities["urls"]]
    urls = deduplicate_lowercase(urls)
    db_urls = []
    for u in urls:
        url, created = URL.get_or_create(url=u)
        db_urls.append(url)
    return db_urls


def create_users_from_entities(entities):
    """
    Function for creating database entries for users
    using the information contained within entities

    :param entities:
    :type entities: dictionary from a parsed tweet's "entities" key
    :returns: list of database user objects
    """
    users = [(u["id"], u["screen_name"]) for u in entities["user_mentions"]]
    users = list(set(users))
    db_users = []
    for id, name in users:
        user, created = User.get_or_create(
            id=id,
            defaults={'username': name},
        )
        db_users.append(user)
    return db_users


def create_tweet_from_dict(tweet, user=None):
    """
    Function for creating a tweet and all related information as database entries
    from a dictionary (that's the result of parsed json)

    This does not do any deduplication, i.e. there is no check whether the tweet
    is already present in the database. If it is, there will be a UNIQUE CONSTRAINT
    exception.
    :param tweet:
    :type tweet: dictionary from a parsed tweet
    :returns: database tweet object, or False on failure
    """
    # If the user isn't stored in the database yet, we
    # need to create it now so that tweets can reference her/him
    try:
        if not user:
            user = create_user_from_tweet(tweet)
        tags = create_hashtags_from_entities(tweet["entities"])
        urls = create_urls_from_entities(tweet["entities"])
        mentions = create_users_from_entities(tweet["entities"])
        # Create new database entry for this tweet
        t = Tweet.create(
            id=tweet['id'],
            user=user,
            text=tweet['text'],
            # We are parsing Twitter's date format using a "magic" parser from the
            # python-dateutil package. The resulting datetime object has timezone
            # information attached. However, since SQLite cannot store timezones,
            # that information is stripped away.
            # If you use PostgreSQL instead, please refer to the DateTimeTZField
            # in peewee and remove the "strftime" call here
            date=parser.parse(tweet['created_at']).strftime(
                "%Y-%m-%d %H:%M:%S"),
        )
        if tags:
            t.tags = tags
        if urls:
            t.urls = urls
        if mentions:
            t.mentions = mentions
        if tweet["in_reply_to_user_id"]:
            # Create a mock user dict so we can re-use create_user_from_tweet
            reply_to_user_dict = {"user": {'id': tweet['in_reply_to_user_id'],
                                           'screen_name': tweet['in_reply_to_screen_name'],
                                           }}
            reply_to_user = create_user_from_tweet(reply_to_user_dict)
            t.reply_to_user = reply_to_user
            t.reply_to_tweet = tweet['in_reply_to_status_id']
        if 'retweeted_status' in tweet:
            retweet = create_tweet_from_dict(tweet['retweeted_status'])
            t.retweet = retweet
        t.save()
        return t
    except peewee.IntegrityError as exc:
        logging.error(exc)
        return False

# Next, these are a series of helper functions allowing us to create
# summary statistics for the information contained in our database fields.


def database_counts():
    """
    Generate counts for objects in the database.
    :returns: dictionary with counts
    """
    return {
        "tweets": Tweet.select().count(),
        "hashtags": Hashtag.select().count(),
        "urls": URL.select().count(),
        "users": User.select().count(),
    }


def mention_counts(start_date, stop_date):
    """
    Perform an SQL query that returns users sorted by mention count.
    Users are returned as database objects in decreasing order.
    The mention count is available as the ".count" attribute.
    """
    # First we get the table that sits between Tweets and Users
    mentions = Tweet.mentions.get_through_model()
    # The query - note the Count statement creating our count variable
    users = (User.select(User, peewee.fn.Count(mentions.id).alias('count'))
             # join in the intermediary table
             .join(mentions)
             # join in the tweets
             .join(Tweet, on=(mentions.tweet == Tweet.id))
             # filter by date
             .where(Tweet.date >= to_utc(start_date),
                    Tweet.date < to_utc(stop_date))
             # group by user to eliminate duplicates
             .group_by(User)
             # sort by mention count
             .order_by(peewee.fn.Count(mentions.tweet).desc())
             )
    return users


def url_counts(start_date, stop_date):
    """
    Perform an SQL query that returns URLs sorted by mention count.
    URLs are returned as database objects in decreasing order.
    The mention count is available as the ".count" attribute.
    """
    urlmentions = Tweet.urls.get_through_model()
    urls = (URL.select(URL, peewee.fn.Count(urlmentions.id).alias('count'))
            .join(urlmentions)
            .join(Tweet, on=(urlmentions.tweet == Tweet.id))
            .where(Tweet.date >= to_utc(start_date),
                   Tweet.date < to_utc(stop_date))
            .group_by(URL)
            .order_by(peewee.fn.Count(urlmentions.tweet).desc())
            )
    return urls


def hashtag_counts(start_date, stop_date):
    """
    Perform an SQL query that returns hashtags sorted by mention count.
    Hashtags are returned as database objects in decreasing order.
    The mention count is available as the ".count" attribute.
""" hashtagmentions = Tweet.tags.get_through_model() hashtags = (Hashtag.select(Hashtag, peewee.fn.Count(hashtagmentions.id).alias('count')) .join(hashtagmentions) .join(Tweet, on=(hashtagmentions.tweet == Tweet.id)) .where(Tweet.date >= to_utc(start_date), Tweet.date < to_utc(stop_date)) .group_by(Hashtag) .order_by(peewee.fn.Count(hashtagmentions.tweet).desc()) ) return hashtags def retweet_counts(start_date, stop_date, n=50): """ Find most retweeted users. Instead of performing a rather complex SQL query, we do this in more readable python code. It may take a few minutes to complete. It's possible to pass in a premade query that will be used as the baseline for retweet counts (for example in order to limit date ranges). """ from collections import Counter rt = Tweet.alias() rtu = User.alias() baseline = (Tweet.select(). where(Tweet.date >= to_utc(start_date), Tweet.date < to_utc(stop_date)) ) query = (baseline.select(Tweet.id, rt.id, rtu.id) .join(rt, on=(Tweet.retweet == rt.id)) .join(rtu, on=(rt.user == rtu.id)) ) c = Counter( (tweet.retweet.user.id for tweet in query) ) # We use an ordered dict for the results so that the top results # appear first from collections import OrderedDict results = OrderedDict() for k, v in c.most_common(n): results[User.get(id=k).username] = v return results # Finally, we have here a series of helper functions supporting us in querying the data # in our database: def tweetcount_per_user(): """ This function executes a query that: - joins the User and Tweet tables so we can reason about their Relationship - groups by User so we have one result per User - counts tweets per user and stores the result in the name "count" - orders users by descending tweet count The users objects are accessible when looping over the query, such as: for user in query: print("{0}: {1}".format(user.username, user.count)) Note that as always, this query can be augmented by appending further operations. 
""" tweet_ct = peewee.fn.Count(Tweet.id) query = (User .select(User, tweet_ct.alias('count')) .join(Tweet) .group_by(User) .order_by(tweet_ct.desc(), User.username)) return query def first_tweet(): """ Find the first Tweet by date """ return Tweet.select().order_by(Tweet.date.asc()).first() def last_tweet(): """ Find the last Tweet by date """ return Tweet.select().order_by(Tweet.date.desc()).first() def to_utc(dt): """ Helper function to return UTC-based datetime objects. If the input datetime object has any timezone information, it is converted to UTC. Otherwise, the datetime is taken as-is and only the timezone information UTC is added. """ if dt.tzinfo is None: logging.warning( "You supplied a naive date/time object. Timezone cannot be guessed and is assumed to be UTC. If your date/time is NOT UTC, you will get wrong results!") return utc.localize(dt) else: return utc.normalize(dt) def objects_by_interval(Obj, date_attr_name="date", interval="day", start_date=None, stop_date=None): """ General helper function that returns objects by date intervals, mainly useful for counting. WARNING: If used as-is with SQLite, all date/times in data and queries are UTC-based! If you want to use local time for queries, take note that it will be converted correctly ONLY if you supply the correct timezone information. In general, as long as you only use timezone-aware objects you should be safe. :param obj: :type obj: database model :param date_attr_name: :type date_attr_name: the name of the filed containing date/time information on the model obj, as strings :param interval: :type interval: day (default), hour, minute as string. :param start_date: :type start_date: date to start from as datetime object, defaults to first date found. :param stop_date: :type stop_date: date to stop on as datetime object, defaults to last date found. 
    :returns: generator yielding ((interval_start, interval_stop), query) tuples
    """
    # define intervals, then select the one given as a function argument
    interval = {
        "minute": datetime.timedelta(minutes=1),
        "hour": datetime.timedelta(hours=1),
        "day": datetime.timedelta(days=1),
    }.get(interval)
    date_field = getattr(Obj, date_attr_name)
    # Determine the first and last objects if no start_date or stop_date is given
    start_date = start_date or getattr(
        Obj.select().order_by(date_field.asc()).first(), date_attr_name)
    stop_date = stop_date or getattr(
        Obj.select().order_by(date_field.desc()).first(), date_attr_name)
    # Ensure UTC times
    start_date = to_utc(start_date)
    stop_date = to_utc(stop_date)
    # This iteration code could be shorter, but using two dedicated variables
    # makes it more intuitive.
    interval_start = start_date
    interval_stop = start_date + interval
    # Notice that this loop stops BEFORE the interval extends beyond stop_date.
    # This way, we may get intervals that do not reach stop_date, but on the
    # other hand we never get intervals that are not covered by the data.
    while interval_stop <= stop_date:
        query = Obj.select().where(
            date_field >= interval_start, date_field < interval_stop)
        # First yield the results, then step to the next interval
        yield ((interval_start, interval_stop), query)
        interval_start += interval
        interval_stop += interval

# After setting up these basic database models and helper functions, we now have
# to actually set up the database tables. This needs to run at least once before
# using the db.
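# Under the hood, create_tables issues one CREATE TABLE statement per model, plus
# one per many-to-many through table. A minimal sketch of that step using only the
# standard library's sqlite3 module -- the column layout here is deliberately
# simplified and is not peewee's exact schema:

```python
import sqlite3

connection = sqlite3.connect(":memory:")  # in-memory stand-in for tweets.db
cursor = connection.cursor()

# Simplified stand-ins for the tables peewee creates from our models.
cursor.execute("CREATE TABLE IF NOT EXISTS hashtag (tag TEXT PRIMARY KEY)")
cursor.execute("CREATE TABLE IF NOT EXISTS user (id INTEGER PRIMARY KEY, username TEXT)")
cursor.execute("""CREATE TABLE IF NOT EXISTS tweet (
    id INTEGER PRIMARY KEY,
    user_id INTEGER REFERENCES user (id),
    text TEXT,
    date TEXT)""")
# A through table linking tweets and hashtags (the many-to-many relation)
cursor.execute("""CREATE TABLE IF NOT EXISTS tweet_hashtag (
    tweet_id INTEGER REFERENCES tweet (id),
    hashtag_tag TEXT REFERENCES hashtag (tag))""")
connection.commit()

# List the tables that now exist in the database
tables = sorted(row[0] for row in cursor.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table'"))
print(tables)  # ['hashtag', 'tweet', 'tweet_hashtag', 'user']
connection.close()
```

# The "IF NOT EXISTS" clauses play the same role as the try/except around
# create_tables below: re-running the setup on an existing database is harmless.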
try:
    db.create_tables([Hashtag, URL, User, Tweet,
                      Tweet.tags.get_through_model(),
                      Tweet.urls.get_through_model(),
                      Tweet.mentions.get_through_model()])
except Exception as exc:
    logging.debug(
        "Database setup failed, probably already present: {0}".format(exc))

##########################################################################################################

# After examining the database.py script, you are ready to run it and load Lawrence Lessig's tweets into a
# database.

# You can either call the script from the command line directly by typing

python database.py

# If you would rather call it from iPython, open iPython and run the following command:

ipython

run database

# After loading the database through SQLite, let's see what we can do now.

# Let's examine the field "date" in the table "Tweet"

Tweet.date

# Let's count how many tweets there actually are in our database:

Tweet.select().count()

# Let's count how many users ended up in our database, i.e. Lessig plus all users he mentioned

User.select().count()

# What were the tweets in which Lessig mentioned Donald Trump?

for tweet in Tweet.select().where(Tweet.text.contains("Trump")):
    print(tweet.text)

# Or let's do the same more elegantly by defining a query as a variable:

query = Tweet.select().where(Tweet.text.contains("Trump"))

# This offers new and interesting ways to interact with the query:

first_tweet = query.get()
first_tweet.text

##########################################################################################################
##########################################################################################################

# For more examples and advice on how to prepare and analyze Twitter data please see the tutorial:

# Pascal Jürgens and Andreas Jungherr. 2016a. A Tutorial for Using Twitter Data in the Social Sciences:
# Data Collection, Preparation, and Analysis. Social Science Research Network (SSRN).
# doi: 10.2139/ssrn.2710146

# If you run into trouble with, or find any bugs in, the code provided in Jürgens & Jungherr (2016b),
# please report an issue in our GitHub repository: https://github.com/trifle/twitterresearch

##########################################################################################################
##########################################################################################################