Datetimes with Python

December 12, 2018

I decided to take the course 100DaysofCode with Python to improve my coding skills in this language. The first lesson was on datetime and it has been quite useful already. It helped me improve some code I worked on a few months ago. Since one the rules of the course is to share the progress I decided to make this post.

I work modeling spatiotemporal data, but as everybody who uses real data knows, some preprocessing is needed before we can plug it into a modeling pipeline. One of the common things I have to go around is matching the time frequency in which the data is reported and the time frequency in which the analysis is required. For example, sometimes I receive daily data, but I have to model it weekly or monthly. Monthly could be natural months or 4-week periods. Subtle differences that require attention and more time than the one I would like to spend on them. So I created a class called Timeframe to define the temporal frame in which my models work. The objective of this class is to allocate the data recieved into the time knots (periods) needed in my analysis. Maybe there is already some Pandas functionality or something else that handles this issue automatically, but I am not aware. I coded this my way.

from datetime import timedelta, datetime
import calendar
import pandas as pd
import numpy as np
import warnings

class Timeframe:

    def __init__(self, start, length, by='day', step=1, end=None, directive='%Y-%m-%d'):
        '''
        Define a timeframe for a model.

        :param start: Initial date of the timeframe.
                      Date encoded as: integer, float, string or datetime.
                      Can be a None value if end is specified.
        :param length: Length of the timeframe.
                       Positive integer.
        :param by: Units of time in which the frame is measured.
                   String, one of 'day', 'month', 'year'.
        :param step: Number of units per time point.
                     Positive integer.
        :param end: Last date of the timeframe (optional and only used if start is None).
                    Date encoded as: integer, float, string or datetime.
        :param directive: Date format representation (default '%Y-%m-%d').
                          String.
        '''
        assert isinstance(length, int) and length > 0, 'length must be a positive integer.'
        assert isinstance(step, int) and step > 0, 'step must be a positive integer.'
        assert isinstance(by, str), 'by has to be a str object.'
        if start is not None:
            start = datetime.strptime(start, directive)
            if by == 'day':
                diff_ = length * step
                init_date = [start + timedelta(days=i) for i in range(0, diff_, step)]
                end = init_date[-1] + timedelta(days=step - 1)
            elif by == 'month':
                diff_ = (1 + length) * step
                year_ = np.array([start.year + int((start.month + i)/12) for i in range(0, diff_, step)])
                month_ = np.array([(start.month + i) % 12 for i in range(0, diff_, step)])
                month_[month_ == 0] = 12
                day_ = np.array([min(start.day, calendar.monthrange(i, j)[1]) for i,j in zip(year_, month_)])
                init_date = [start.replace(year=i, month=j, day=k) for i,j,k in zip(year_, month_, day_)]
                end = init_date[-1] - timedelta(days=1)
                init_date = init_date[:-1]
            elif by == 'year':
                diff_ = (1 + length) * step
                init_date = [start.replace(year=start.year + i) for i in range(0, diff_, step)]
                end = init_date[-1] - timedelta(days=1)
                init_date = init_date[:-1]
            else:
                raise ValueError('by parameter was not understood.')
        else:
            assert end is not None, 'end date has to be specified if start date is None'
            end = datetime.strptime(end, directive)
            if by == 'day':
                diff_ = length * step
                e_ = end - timedelta(days=step - 1)
                init_date = [e_ - timedelta(days=i) for i in range(0, diff_, step)][::-1]
            elif by == 'month':
                diff_ = (1 + length) * step
                e_ = end + timedelta(days=1)
                year_ = np.array([e_.year - int((e_.month + i)/12) for i in range(0, diff_, step)[::-1]])
                month_ = np.array([(e_.month - i) % 12 for i in range(0, diff_, step)[::-1]])
                month_[month_ == 0] = 12
                day_ = np.array([min(e_.day, calendar.monthrange(i, j)[1]) for i,j in zip(year_, month_)])
                init_date = [e_.replace(year=i, month=j, day=k) for i,j,k in zip(year_, month_, day_)]
                init_date = init_date[:-1]
            elif by == 'year':
                diff_ = (1 + length) * step
                e_ = end + timedelta(days=1)
                init_date = [e_.replace(year=e_.year - i) for i in range(0, diff_, step)[::-1]]
                init_date = init_date[:-1]
            else:
                raise ValueError('by parameter was not understood.')
        # Store values
        self.start = init_date[0]
        self.end = end
        self.by = by
        self.length = len(init_date)
        self.knots_info = pd.DataFrame({'knot': np.arange(self.length),
                                        'init_date': init_date,
                                        'tag': [datetime.strftime(i, directive) for i in init_date]})


    def which_knot(self, x):
        '''
        Identify the knot (point within the timeframe) to which a date x belongs.

        :param x: Array of dates.
                  Numpy array, shape [n, ]. Elements may be of type: integer, float, string, datetime.
        :return: Array with knots per date (-1 for dates outside the timeframe).
                 Numpy array of integers.
        '''
        _x = pd.to_datetime(x)
        knots = np.repeat(None, _x.size)
        for a,b,k in zip(self.knots_info.init_date[:-1], self.knots_info.init_date[1:],
                         self.knots_info.knot[:-1]):
            knots[np.logical_and(a <= _x, _x < b)] = int(k)
        knots[np.logical_and(b <= _x, _x <= self.end)] = int(k + 1)

        return np.array([-1 if ki is None else ki for ki in knots])

This is how it works. Suppose we have daily reports of the number of traffic tickets in some area. Let's make some toy data.

>>> dform = '%Y-%m-%d'
>>> start = datetime.strptime('2018-01-01', dform)
>>> n_days = 150
>>> tickets_data = pd.DataFrame({'date': np.array([datetime.strftime(start + timedelta(days=i), dform) for i in range(n_days)]),
                             'num_tickets': np.random.randint(0, 20, n_days)})
>>> tickets_data.head()
        date num_tickets
0 2018-01-01           7
1 2018-01-02          11
2 2018-01-03           7
3 2018-01-04          17
4 2018-01-05           1

Now, imagine that because of "reasons" we want to aggregate the data into periods of 3 weeks starting on '2018-01-02' and we only want 5 periods. The data that does not fit into this time window can be ignored. This is how we define the timeframe:


>>> tf = Timeframe(start='2018-01-02', length=5, by='day', step=21, directive='%Y-%m-%d') #Timeframe measured in steps of 21 days
>>> tf.knots.info
   init_date knot        tag
0 2018-01-02    0 2018-01-02
1 2018-01-23    1 2018-01-23
2 2018-02-13    2 2018-02-13
3 2018-03-06    3 2018-03-06
4 2018-03-27    4 2018-03-27

If you are wondering what the difference is between init_date and tag, here it is:

>>> type(tf.knots_info['init_date'][0])
pandas._libs.tslib.Timestamp
>>> type(tf.knots_info['tag'][0])
str

The object tf also tell us the start and end dates of timeframe.

>>> tf.start, tf.end
(datetime.datetime(2018, 1, 2, 0, 0), datetime.datetime(2018, 4, 16, 0, 0))

It also identifies in which knot each of the data records fall.

>>> ix = tf.which_knot(np.array(tickets_data['date']))
>>> ix
array([-1,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0,  0, 0,  0,
        0,  0,  0,  0,  0,  1,  1,  1,  1,  1,  1,  1,  1,  1,  1, 1,  1,
        1,  1,  1,  1,  1,  1,  1,  1,  1,  2,  2,  2,  2,  2,  2,  2,  2,
        2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  2,  3,  3,  3,  3,
        3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,  3,
        4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,  4,
        4,  4,  4,  4, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1,
       -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1])

The value -1 corresponds to dates that are outside the window of interest. Using this information per record we can easily aggregate them or compute other statistics for each time step. Here I will aggregate them.

> tickets_data = tickets_data.loc[ix>=0] #Remove data outside the period of inerest
> tickets_data['tag'] = np.array(tf.knots_info['tag'])[ix[ix>=0]] #Add tag to the dataframe
> tickets_data[['tag', 'num_tickets']].groupby('tag').sum() #Aggregate data, grouping by tag
           num_tickets
tag
2018-01-02         200
2018-01-23         241
2018-02-13         183
2018-03-06         177
2018-03-27         168

Well that's it. I'm sure there are many ways to improve this code, but so far it has been useful enough.