# Copyright 2021 The SQLNet Company GmbH
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
"""
Special container for star schemata.
"""
import copy
from inspect import cleandoc
from textwrap import indent
from .columns import FloatColumn, StringColumn
from .container import Container
from .data_frame import DataFrame
from .data_model import DataModel
from .relationship import many_to_many
from .view import View
class StarSchema:
"""
    A StarSchema is a simplifying abstraction that can be used
    for machine learning problems that can be organized in a simple
    `star schema <https://en.wikipedia.org/wiki/Star_schema>`_.

    It unifies :class:`~getml.data.Container` and
    :class:`~getml.data.DataModel`, abstracting away the need to
    differentiate between the concrete data and the abstract data model.

    The class is designed using
    `composition <https://en.wikipedia.org/wiki/Composition_over_inheritance>`_:
    it *is* neither a :class:`~getml.data.Container` nor a
    :class:`~getml.data.DataModel`, but it *has* both of them.

    This means that you can always fall back to the more flexible methods
    of :class:`~getml.data.Container` and :class:`~getml.data.DataModel`
    by directly accessing the attributes ``container`` and ``data_model``.
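
    For example (a sketch; ``star_schema`` and ``extra_df`` are assumed
    to have been created beforehand):

    .. code-block:: python

        # Falls back to Container.add(...).
        star_schema.container.add(extra=extra_df)

        # Inspect the abstract data model.
        print(star_schema.data_model)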
Args:
population (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
The population table defines the
`statistical population <https://en.wikipedia.org/wiki/Statistical_population>`_
of the machine learning problem and contains the target variables.
        alias (str, optional):
            The alias to be used for the population table. This is
            required if *population* is a :class:`~getml.data.View`.
peripheral (dict, optional):
The peripheral tables are joined onto *population* or other
peripheral tables. Note that you can also pass them using
:meth:`~getml.data.StarSchema.join`.
split (:class:`~getml.data.columns.StringColumn` or :class:`~getml.data.columns.StringColumnView`, optional):
Contains information on how you want to split *population* into
different :class:`~getml.data.Subset` s.
Also refer to :mod:`~getml.data.split`.
        deep_copy (bool, optional):
            Whether you want to create deep copies of your tables.
train (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
The population table used in the *train*
:class:`~getml.data.Subset`.
You can either pass *population* and *split* or you can pass
the subsets separately using *train*, *validation*, *test*
and *kwargs*.
validation (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
The population table used in the *validation*
:class:`~getml.data.Subset`.
You can either pass *population* and *split* or you can pass
the subsets separately using *train*, *validation*, *test*
and *kwargs*.
test (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
The population table used in the *test*
:class:`~getml.data.Subset`.
You can either pass *population* and *split* or you can pass
the subsets separately using *train*, *validation*, *test*
and *kwargs*.
        kwargs (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional):
            The population tables used in :class:`~getml.data.Subset` s
            other than the predefined *train*, *validation* and *test* subsets.
            You can name these subsets anything you want and access them
            just like *train*, *validation* and *test*.

            You can either pass *population* and *split* or you can pass
            the subsets separately using *train*, *validation*, *test*
            and *kwargs*.
Example:
.. code-block:: python
# Pass the subset.
star_schema = getml.data.StarSchema(
my_subset=my_data_frame)
# You can access the subset just like train,
# validation or test
my_pipeline.fit(star_schema.my_subset)
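
            # If population is a getml.data.View, you must also
            # set an alias (a sketch; population_view is assumed
            # to exist):
            star_schema = getml.data.StarSchema(
                population=population_view,
                alias="population",
            )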
Examples:
Note that this example is taken from the
`loans notebook <https://nbviewer.getml.com/github/getml/getml-demo/blob/master/loans.ipynb>`_.
You might also want to refer to
:class:`~getml.DataFrame`, :class:`~getml.data.View`
and :class:`~getml.Pipeline`.
.. code-block:: python
# First, we insert our data.
# population_train and population_test are either
# DataFrames or Views. The population table
# defines the statistical population of your
# machine learning problem and contains the
# target variables.
star_schema = getml.data.StarSchema(
train=population_train,
test=population_test
)
# meta, order and trans are either
# DataFrames or Views.
# Because this is a star schema,
# all joins take place on the population
# table.
star_schema.join(
trans,
on="account_id",
time_stamps=("date_loan", "date")
)
star_schema.join(
order,
on="account_id",
)
star_schema.join(
meta,
on="account_id",
)
# Now you can insert your data model,
# your preprocessors, feature learners,
# feature selectors and predictors
# into the pipeline.
# Note that the pipeline only knows
# the abstract data model, but hasn't
# seen the actual data yet.
pipe = getml.Pipeline(
data_model=star_schema.data_model,
preprocessors=[mapping],
feature_learners=[fast_prop],
feature_selectors=[feature_selector],
predictors=predictor,
)
# Now, we pass the actual data.
# This passes 'population_train' and the
# peripheral tables (meta, order and trans)
# to the pipeline.
pipe.check(star_schema.train)
pipe.fit(star_schema.train)
pipe.score(star_schema.test)
# To generate predictions on new data,
# it is sufficient to use a Container.
# You don't have to recreate the entire
# StarSchema, because the abstract data model
# is stored in the pipeline.
container = getml.data.Container(
population=population_new)
container.add(
trans=trans_new,
order=order_new,
meta=meta_new)
predictions = pipe.predict(container.full)
If you don't already have a train and test set,
you can use a function from the
:mod:`~getml.data.split` module.
.. code-block:: python
split = getml.data.split.random(
train=0.8, test=0.2)
star_schema = getml.data.StarSchema(
population=population_all,
split=split,
)
# The remaining code is the same as in
# the example above. In particular,
# star_schema.train and star_schema.test
# work just like above.
"""
def __init__(
self,
population=None,
alias=None,
peripheral=None,
split=None,
deep_copy=False,
train=None,
validation=None,
test=None,
**kwargs,
):
if (population is None or isinstance(population, View)) and alias is None:
raise ValueError(
"If 'population' is None or a getml.data.View, you must set an alias."
)
self._alias = alias or population.name
self._container = Container(
population=population,
peripheral=peripheral,
split=split,
deep_copy=deep_copy,
train=train,
validation=validation,
test=test,
**kwargs,
)
        def get_placeholder():
            # Build the population placeholder from whichever
            # population table was provided.
            if population is not None:
                return population.to_placeholder(alias)
            if train is not None:
                return train.to_placeholder(alias)
            if validation is not None:
                return validation.to_placeholder(alias)
            if test is not None:
                return test.to_placeholder(alias)
            assert kwargs, "This should have been checked by Container.__init__."
            return next(iter(kwargs.values())).to_placeholder(alias)
self._data_model = DataModel(get_placeholder())
def __dir__(self):
attrs = dir(type(self)) + [key[1:] for key in list(vars(self))]
attrs += dir(self.container)
attrs += dir(self.data_model)
return list(set(attrs))
def __iter__(self):
yield from [self.population] + list(self.peripheral.values())
def __getattr__(self, key):
try:
return self[key]
except KeyError:
return super().__getattribute__(key)
def __getitem__(self, key):
attrs = vars(self)
if key in attrs:
return attrs[key]
if "_" + key in attrs:
return attrs["_" + key]
try:
return attrs["_container"][key]
except KeyError:
return attrs["_data_model"][key]
def __repr__(self):
template = cleandoc(
"""
data model
{data_model}
container
{container}
"""
)
return template.format(
data_model=indent(repr(self.data_model), " "),
container=indent(repr(self.container), " "),
)
def _repr_html_(self):
template = cleandoc(
"""
<h2>data model</h2>
{data_model}
<h2>container</h2>
{container}
"""
)
return template.format(
data_model=self.data_model._repr_html_(),
container=self.container._repr_html_(),
)
@property
def container(self):
"""
The underlying :class:`~getml.data.Container`.
"""
return self._container
@property
def data_model(self):
"""
The underlying :class:`~getml.data.DataModel`.
"""
return self._data_model
    def join(
self,
right_df,
alias=None,
on=None,
time_stamps=None,
relationship=many_to_many,
memory=None,
horizon=None,
lagged_targets=False,
upper_time_stamp=None,
):
"""
Joins a :class:`~getml.DataFrame` or :class:`~getml.data.View`
to the population table.
In a :class:`~getml.data.StarSchema` or :class:`~getml.data.TimeSeries`,
all joins take place on the population table. If you want to create more
complex data models, use :class:`~getml.data.DataModel` instead.
Examples:
            This example will construct a data model in which the
            'population_table' depends on the 'peripheral_table' via
            the 'join_key' column. In addition, only those rows in
            'peripheral_table' for which 'time_stamp' is less than or
            equal to the 'time_stamp' in 'population_table' are considered:
.. code-block:: python
star_schema = getml.data.StarSchema(
population=population_table, split=split)
star_schema.join(
peripheral_table,
on="join_key",
time_stamps="time_stamp"
)
            If the relationship between two tables is many-to-one or
            one-to-one, you should clearly say so:
.. code-block:: python
star_schema.join(
peripheral_table,
on="join_key",
time_stamps="time_stamp",
relationship=getml.data.relationship.many_to_one,
)
Please also refer to :mod:`~getml.data.relationship`.
If the join keys or time stamps are named differently in the two
different tables, use a tuple:
.. code-block:: python
star_schema.join(
peripheral_table,
on=("join_key", "other_join_key"),
time_stamps=("time_stamp", "other_time_stamp"),
)
You can join over more than one join key:
.. code-block:: python
star_schema.join(
peripheral_table,
on=["join_key1", "join_key2", ("join_key3", "other_join_key3")],
time_stamps="time_stamp",
)
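
            Alternatively, you can omit *on* altogether; every row is then
            matched with every row (a cross join):

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    time_stamps="time_stamp",
                )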
You can also limit the scope of your joins using *memory*. This
can significantly speed up training time. For instance, if you
only want to consider data from the last seven days, you could
do something like this:
.. code-block:: python
star_schema.join(
peripheral_table,
on="join_key",
time_stamps="time_stamp",
memory=getml.data.time.days(7),
)
In some use cases, particularly those involving time series, it
might be a good idea to use targets from the past. You can activate
this using *lagged_targets*. But if you do that, you must
also define a prediction *horizon*. For instance, if you want to
predict data for the next hour, using data from the last seven days,
you could do this:
.. code-block:: python
star_schema.join(
peripheral_table,
on="join_key",
time_stamps="time_stamp",
lagged_targets=True,
horizon=getml.data.time.hours(1),
memory=getml.data.time.days(7),
)
Please also refer to :mod:`~getml.data.time`.
If the join involves many matches, it might be a good idea to set the
relationship to :const:`~getml.data.relationship.propositionalization`.
This forces the pipeline to always use a propositionalization
algorithm for this join, which can significantly speed things up.
.. code-block:: python
star_schema.join(
peripheral_table,
on="join_key",
time_stamps="time_stamp",
relationship=getml.data.relationship.propositionalization,
)
Please also refer to :mod:`~getml.data.relationship`.
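
            You can also set an upper limit on the time window using
            *upper_time_stamp*. In this sketch, 'valid_until' is a
            hypothetical time stamp column in 'peripheral_table' beyond
            which rows are no longer matched:

            .. code-block:: python

                star_schema.join(
                    peripheral_table,
                    on="join_key",
                    time_stamps="time_stamp",
                    upper_time_stamp="valid_until",
                )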
Args:
right_df (:class:`~getml.DataFrame` or :class:`~getml.data.View`):
The data frame or view you would like to join.
            alias (str or None):
                The alias under which you want *right_df* to be referred to
                in the generated SQL code. Setting an alias is required if
                *right_df* is a :class:`~getml.data.View`.
            on (None, str, Tuple[str, str] or List[Union[str, Tuple[str, str]]]):
                The join keys to use. If None is passed, then everything
                will be joined to everything else.
            time_stamps (str or Tuple[str, str]):
                The time stamps used to limit the join.
relationship (str):
The relationship between the two tables. Must be from
:mod:`~getml.data.relationship`.
memory (float):
The difference between the time stamps until data is 'forgotten'.
Limiting your joins using memory can significantly speed up
training time. Also refer to :mod:`~getml.data.time`.
horizon (float):
The prediction horizon to apply to this join.
Also refer to :mod:`~getml.data.time`.
            lagged_targets (bool):
                Whether you want to allow lagged targets. If this is set
                to True, you must also pass a positive *horizon*.
upper_time_stamp (str):
Name of a time stamp in *right_df* that serves as an upper limit
on the join.
"""
        if not isinstance(right_df, (DataFrame, View)):
            raise TypeError(
                "'right_df' must be a getml.DataFrame or a getml.data.View, "
                f"got: {type(right_df).__name__}."
            )
        if isinstance(right_df, View) and alias is None:
            raise ValueError(
                "Setting an 'alias' is required if a getml.data.View is "
                "supplied as a peripheral table."
            )
        def modify_join_keys(on):
            # Normalize 'on' to one or more (left, right) tuples
            # of column names.
            if isinstance(on, list):
                return [modify_join_keys(jk) for jk in on]
            if isinstance(on, (str, StringColumn)):
                on = (on, on)
            if on:
                on = tuple(
                    jkey.name if isinstance(jkey, StringColumn) else jkey
                    for jkey in on
                )
            return on
        def modify_time_stamps(time_stamps):
            # Normalize 'time_stamps' to a (left, right) tuple of
            # column names.
            if isinstance(time_stamps, (str, FloatColumn)):
                time_stamps = (time_stamps, time_stamps)
            if time_stamps is not None:
                time_stamps = tuple(
                    time_stamp.name
                    if isinstance(time_stamp, FloatColumn)
                    else time_stamp
                    for time_stamp in time_stamps
                )
            return time_stamps
on = modify_join_keys(on)
time_stamps = modify_time_stamps(time_stamps)
upper_time_stamp = (
upper_time_stamp.name
if isinstance(upper_time_stamp, FloatColumn)
else upper_time_stamp
)
right = right_df.to_placeholder(alias)
self.data_model.population.join(
right=right,
on=on,
time_stamps=time_stamps,
relationship=relationship,
memory=memory,
horizon=horizon,
lagged_targets=lagged_targets,
upper_time_stamp=upper_time_stamp,
)
alias = alias or right_df.name
self.container.add(**{alias: right_df})
    def sync(self):
"""
        Synchronizes the last change with the data to avoid warnings
        that the data has been changed.

        This is only necessary when ``deep_copy=False``.
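
        Example:
            A sketch of when this is needed, assuming the star schema was
            built with ``deep_copy=False`` and ``population_table`` was
            modified in place afterwards:

            .. code-block:: python

                star_schema = getml.data.StarSchema(
                    population=population_table,
                    split=split,
                    deep_copy=False,
                )

                # population_table is changed in place, e.g. by
                # assigning a new role to one of its columns.
                population_table.set_role("age", getml.data.roles.numerical)

                # Synchronize to avoid warnings about changed data.
                star_schema.sync()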
"""
self.container.sync()