Source code for getml.data.star_schema

# Copyright 2022 The SQLNet Company GmbH
#
# This file is licensed under the Elastic License 2.0 (ELv2).
# Refer to the LICENSE.txt file in the root of the repository
# for details.
#


"""
Special container for star schemata.
"""

import copy
from inspect import cleandoc
from textwrap import indent

from .columns import FloatColumn, StringColumn
from .container import Container
from .data_frame import DataFrame
from .data_model import DataModel
from .relationship import many_to_many
from .view import View


class StarSchema:
    """
    A StarSchema is a simplifying abstraction that can be used for machine
    learning problems that can be organized in a simple
    `star schema <https://en.wikipedia.org/wiki/Star_schema>`_.

    It unifies :class:`~getml.data.Container` and
    :class:`~getml.data.DataModel`, thus abstracting away the need to
    differentiate between the concrete data and the abstract data model.

    The class is designed using `composition
    <https://en.wikipedia.org/wiki/Composition_over_inheritance>`_ -
    it *is* neither :class:`~getml.data.Container` nor
    :class:`~getml.data.DataModel`, but *has* both of them.

    This means that you can always fall back to the more flexible methods
    using :class:`~getml.data.Container` and :class:`~getml.data.DataModel`
    by directly accessing the attributes `container` and `data_model`.

    Args:
        population (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
            The population table defines the
            `statistical population <https://en.wikipedia.org/wiki/Statistical_population>`_
            of the machine learning problem and contains the target
            variables.

        alias (str, optional):
            The alias to be used for the population table. This is
            required if *population* is a :class:`~getml.data.View`.

        peripheral (dict, optional):
            The peripheral tables are joined onto *population* or other
            peripheral tables. Note that you can also pass them using
            :meth:`~getml.data.StarSchema.join`.

        split (:class:`~getml.data.columns.StringColumn` or :class:`~getml.data.columns.StringColumnView`, optional):
            Contains information on how you want to split *population*
            into different :class:`~getml.data.Subset` s. Also refer to
            :mod:`~getml.data.split`.

        deep_copy (bool, optional):
            Whether you want to create deep copies of your tables.

        train (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
            The population table used in the *train*
            :class:`~getml.data.Subset`. You can either pass *population*
            and *split* or you can pass the subsets separately using
            *train*, *validation*, *test* and *kwargs*.

        validation (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
            The population table used in the *validation*
            :class:`~getml.data.Subset`. You can either pass *population*
            and *split* or you can pass the subsets separately using
            *train*, *validation*, *test* and *kwargs*.

        test (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
            The population table used in the *test*
            :class:`~getml.data.Subset`. You can either pass *population*
            and *split* or you can pass the subsets separately using
            *train*, *validation*, *test* and *kwargs*.

        kwargs (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
            The population table used in :class:`~getml.data.Subset` s
            other than the predefined *train*, *validation* and *test*
            subsets. You can call these subsets anything you want to, and
            you can access them just like *train*, *validation* and
            *test*. You can either pass *population* and *split* or you
            can pass the subsets separately using *train*, *validation*,
            *test* and *kwargs*.

            Example:
                .. code-block:: python

                    # Pass the subset.
                    star_schema = getml.data.StarSchema(
                        my_subset=my_data_frame)

                    # You can access the subset just like train,
                    # validation or test.
                    my_pipeline.fit(star_schema.my_subset)

    Examples:
        Note that this example is taken from the
        `loans notebook <https://nbviewer.getml.com/github/getml/getml-demo/blob/master/loans.ipynb>`_.
        You might also want to refer to :class:`~getml.DataFrame`,
        :class:`~getml.data.View` and :class:`~getml.Pipeline`.

        .. code-block:: python

            # First, we insert our data.
            # population_train and population_test are either
            # DataFrames or Views. The population table
            # defines the statistical population of your
            # machine learning problem and contains the
            # target variables.
            star_schema = getml.data.StarSchema(
                train=population_train,
                test=population_test
            )

            # meta, order and trans are either
            # DataFrames or Views.
            # Because this is a star schema,
            # all joins take place on the population
            # table.
            star_schema.join(
                trans,
                on="account_id",
                time_stamps=("date_loan", "date")
            )

            star_schema.join(
                order,
                on="account_id",
            )

            star_schema.join(
                meta,
                on="account_id",
            )

            # Now you can insert your data model,
            # your preprocessors, feature learners,
            # feature selectors and predictors
            # into the pipeline.
            # Note that the pipeline only knows
            # the abstract data model, but hasn't
            # seen the actual data yet.
            pipe = getml.Pipeline(
                data_model=star_schema.data_model,
                preprocessors=[mapping],
                feature_learners=[fast_prop],
                feature_selectors=[feature_selector],
                predictors=predictor,
            )

            # Now, we pass the actual data.
            # This passes 'population_train' and the
            # peripheral tables (meta, order and trans)
            # to the pipeline.
            pipe.check(star_schema.train)

            pipe.fit(star_schema.train)

            pipe.score(star_schema.test)

            # To generate predictions on new data,
            # it is sufficient to use a Container.
            # You don't have to recreate the entire
            # StarSchema, because the abstract data model
            # is stored in the pipeline.
            container = getml.data.Container(
                population=population_new)

            container.add(
                trans=trans_new,
                order=order_new,
                meta=meta_new)

            predictions = pipe.predict(container.full)

        If you don't already have a train and test set,
        you can use a function from the
        :mod:`~getml.data.split` module.

        .. code-block:: python

            split = getml.data.split.random(
                train=0.8, test=0.2)

            star_schema = getml.data.StarSchema(
                population=population_all,
                split=split,
            )

            # The remaining code is the same as in
            # the example above. In particular,
            # star_schema.train and star_schema.test
            # work just like above.
    """

    def __init__(
        self,
        population=None,
        alias=None,
        peripheral=None,
        split=None,
        deep_copy=False,
        train=None,
        validation=None,
        test=None,
        **kwargs,
    ):
        if (population is None or isinstance(population, View)) and alias is None:
            raise ValueError(
                "If 'population' is None or a getml.data.View, you must set an alias."
            )

        self._alias = alias or population.name

        self._container = Container(
            population=population,
            peripheral=peripheral,
            split=split,
            deep_copy=deep_copy,
            train=train,
            validation=validation,
            test=test,
            **kwargs,
        )

        def get_placeholder():
            # Derive the placeholder for the abstract data model from
            # whichever population table was passed first.
            if population is not None:
                return population.to_placeholder(alias)
            if train is not None:
                return train.to_placeholder(alias)
            if validation is not None:
                return validation.to_placeholder(alias)
            if test is not None:
                return test.to_placeholder(alias)
            assert (
                len(kwargs) > 0
            ), "This should have been checked by Container.__init__."
            return kwargs[list(kwargs.keys())[0]].to_placeholder(alias)

        self._data_model = DataModel(get_placeholder())

    def __dir__(self):
        attrs = dir(type(self)) + [key[1:] for key in list(vars(self))]
        attrs += dir(self.container)
        attrs += dir(self.data_model)
        return list(set(attrs))

    def __iter__(self):
        yield from [self.population] + list(self.peripheral.values())

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            return super().__getattribute__(key)

    def __getitem__(self, key):
        # Resolve subsets and peripheral tables dynamically: first on
        # this instance, then on the container, then on the data model.
        attrs = vars(self)
        if key in attrs:
            return attrs[key]
        if "_" + key in attrs:
            return attrs["_" + key]
        try:
            return attrs["_container"][key]
        except KeyError:
            return attrs["_data_model"][key]

    def __repr__(self):
        template = cleandoc(
            """
            data model

            {data_model}

            container

            {container}
            """
        )
        return template.format(
            data_model=indent(repr(self.data_model), "    "),
            container=indent(repr(self.container), "    "),
        )

    def _repr_html_(self):
        template = cleandoc(
            """
            <span style='font-size: 1.2rem; font-weight: 500;'>data model</span>

            {data_model}

            <span style='font-size: 1.2rem; font-weight: 500;'>container</span>

            {container}
            """
        )
        return template.format(
            data_model=self.data_model._repr_html_(),
            container=self.container._repr_html_(),
        )

    @property
    def container(self):
        """
        The underlying :class:`~getml.data.Container`.
        """
        return self._container

    @property
    def data_model(self):
        """
        The underlying :class:`~getml.data.DataModel`.
        """
        return self._data_model
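
    # Note on the composition design: anything this wrapper does not
    # cover is still available through the underlying objects. A
    # hypothetical sketch (names like `trans_new` are placeholders,
    # not part of this module):
    #
    #     star_schema.container.add(trans=trans_new)
    #     placeholder = star_schema.data_model.population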
    def join(
        self,
        right_df,
        alias=None,
        on=None,
        time_stamps=None,
        relationship=many_to_many,
        memory=None,
        horizon=None,
        lagged_targets=False,
        upper_time_stamp=None,
    ):
        """
        Joins a :class:`~getml.DataFrame` or :class:`~getml.data.View`
        to the population table.

        In a :class:`~getml.data.StarSchema` or
        :class:`~getml.data.TimeSeries`, all joins take place on the
        population table. If you want to create more complex data models,
        use :class:`~getml.data.DataModel` instead.

        Examples:
            This example will construct a data model in which the
            'population_table' depends on the 'peripheral_table' via the
            'join_key' column. In addition, only those rows in
            'peripheral_table' for which 'time_stamp' is smaller than or
            equal to the 'time_stamp' in 'population_table' are
            considered:

            .. code-block:: python

                star_schema = getml.data.StarSchema(
                    population=population_table, split=split)

                star_schema.join(
                    peripheral_table,
                    on="join_key",
                    time_stamps="time_stamp"
                )

            If the relationship between two tables is many-to-one or
            one-to-one, you should clearly say so:

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    on="join_key",
                    time_stamps="time_stamp",
                    relationship=getml.data.relationship.many_to_one,
                )

            Please also refer to :mod:`~getml.data.relationship`.

            If the join keys or time stamps are named differently in the
            two tables, use a tuple:

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    on=("join_key", "other_join_key"),
                    time_stamps=("time_stamp", "other_time_stamp"),
                )

            You can join over more than one join key:

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    on=["join_key1", "join_key2", ("join_key3", "other_join_key3")],
                    time_stamps="time_stamp",
                )

            You can also limit the scope of your joins using *memory*.
            This can significantly speed up training time. For instance,
            if you only want to consider data from the last seven days,
            you could do something like this:

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    on="join_key",
                    time_stamps="time_stamp",
                    memory=getml.data.time.days(7),
                )

            In some use cases, particularly those involving time series,
            it might be a good idea to use targets from the past. You can
            activate this using *lagged_targets*. But if you do that, you
            must also define a prediction *horizon*. For instance, if you
            want to predict data for the next hour, using data from the
            last seven days, you could do this:

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    on="join_key",
                    time_stamps="time_stamp",
                    lagged_targets=True,
                    horizon=getml.data.time.hours(1),
                    memory=getml.data.time.days(7),
                )

            Please also refer to :mod:`~getml.data.time`.

            If the join involves many matches, it might be a good idea to
            set the relationship to
            :const:`~getml.data.relationship.propositionalization`. This
            forces the pipeline to always use a propositionalization
            algorithm for this join, which can significantly speed things
            up:

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    on="join_key",
                    time_stamps="time_stamp",
                    relationship=getml.data.relationship.propositionalization,
                )

            Please also refer to :mod:`~getml.data.relationship`.

        Args:
            right_df (:class:`~getml.DataFrame` or :class:`~getml.data.View`):
                The data frame or view you would like to join.

            alias (str or None):
                The name by which you want *right_df* to be referred to
                in the generated SQL code.

            on (None, str, Tuple[str] or List[Union[str, Tuple[str]]]):
                The join keys to use. If none is passed, then everything
                will be joined to everything else.

            time_stamps (str or Tuple[str]):
                The time stamps used to limit the join.

            relationship (str):
                The relationship between the two tables. Must be from
                :mod:`~getml.data.relationship`.

            memory (float):
                The difference between the time stamps until data is
                'forgotten'. Limiting your joins using memory can
                significantly speed up training time. Also refer to
                :mod:`~getml.data.time`.

            horizon (float):
                The prediction horizon to apply to this join. Also refer
                to :mod:`~getml.data.time`.

            lagged_targets (bool):
                Whether you want to allow lagged targets. If this is set
                to True, you must also pass a positive, non-zero
                *horizon*.

            upper_time_stamp (str):
                Name of a time stamp in *right_df* that serves as an
                upper limit on the join.
        """
        if not isinstance(right_df, (DataFrame, View)):
            raise TypeError(
                "'right_df' must be a getml.DataFrame or a getml.data.View, "
                f"got: {type(right_df)}."
            )

        if isinstance(right_df, View):
            if alias is None:
                raise ValueError(
                    "Setting an 'alias' is required if a getml.data.View is "
                    "supplied as a peripheral table."
                )

        def modify_join_keys(on):
            # Normalize the join keys: a single key becomes a
            # (left, right) pair, and columns are replaced by their names.
            if isinstance(on, list):
                return [modify_join_keys(jk) for jk in on]
            if isinstance(on, (str, StringColumn)):
                on = (on, on)
            if on:
                on = tuple(
                    jkey.name if isinstance(jkey, StringColumn) else jkey
                    for jkey in on
                )
            return on

        def modify_time_stamps(time_stamps):
            # Normalize the time stamps in the same way as the join keys.
            if isinstance(time_stamps, (str, FloatColumn)):
                time_stamps = (time_stamps, time_stamps)
            if time_stamps is not None:
                time_stamps = tuple(
                    time_stamp.name
                    if isinstance(time_stamp, FloatColumn)
                    else time_stamp
                    for time_stamp in time_stamps
                )
            return time_stamps

        on = modify_join_keys(on)

        time_stamps = modify_time_stamps(time_stamps)

        upper_time_stamp = (
            upper_time_stamp.name
            if isinstance(upper_time_stamp, FloatColumn)
            else upper_time_stamp
        )

        right = right_df.to_placeholder(alias)

        self.data_model.population.join(
            right=right,
            on=on,
            time_stamps=time_stamps,
            relationship=relationship,
            memory=memory,
            horizon=horizon,
            lagged_targets=lagged_targets,
            upper_time_stamp=upper_time_stamp,
        )

        alias = alias or right_df.name

        self.container.add(**{alias: right_df})
    def sync(self):
        """
        Synchronizes the last change with the data to avoid warnings
        that the data has been changed.

        This is only a problem when ``deep_copy=False``.
        """
        self.container.sync()
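
# A minimal usage sketch for sync(), assuming a running getML Engine;
# `population_df`, `peripheral_df` and `split` are hypothetical stand-ins
# for objects created elsewhere, not part of this module:
#
#     star_schema = StarSchema(
#         population=population_df, alias="population", split=split)
#     star_schema.join(
#         peripheral_df, on="join_key", time_stamps="time_stamp")
#
#     # With deep_copy=False (the default), the StarSchema keeps
#     # references to the original tables. If a table is changed
#     # afterwards, calling sync() acknowledges the change and avoids
#     # the warning that the data has been modified:
#     star_schema.sync()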