Source code for getml.data.container

# Copyright 2022 The SQLNet Company GmbH
#
# This file is licensed under the Elastic License 2.0 (ELv2).
# Refer to the LICENSE.txt file in the root of the repository
# for details.
#


"""
For keeping and versioning data.
"""

from datetime import datetime
from inspect import cleandoc
from typing import Dict

import pandas as pd  # type: ignore

import getml.communication as comm
from getml.data.columns import StringColumn, StringColumnView, from_value
from getml.log import logger
from getml.utilities.formatting import _Formatter

from .data_frame import DataFrame
from .helpers import _is_typed_dict, _make_id
from .helpers2 import (
    _deep_copy,
    _get_last_change,
    _make_subsets_from_kwargs,
    _make_subsets_from_split,
)
from .subset import Subset
from .view import View


[docs]class Container: """ A container holds the actual data in the form of a :class:`~getml.DataFrame` or a :class:`~getml.data.View`. The purpose of a container is twofold: - Assigning concrete data to an abstract :class:`~getml.data.DataModel`. - Storing data and allowing you to reproduce previous results. Args: population (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional): The population table defines the `statistical population <https://en.wikipedia.org/wiki/Statistical_population>`_ of the machine learning problem and contains the target variables. peripheral (dict, optional): The peripheral tables are joined onto *population* or other peripheral tables. Note that you can also pass them using :meth:`~getml.data.Container.add`. split (:class:`~getml.data.columns.StringColumn` or :class:`~getml.data.columns.StringColumnView`, optional): Contains information on how you want to split *population* into different :class:`~getml.data.Subset` s. Also refer to :mod:`~getml.data.split`. deep_copy (bool, optional): Whether you want to create deep copies or your tables. train (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional): The population table used in the *train* :class:`~getml.data.Subset`. You can either pass *population* and *split* or you can pass the subsets separately using *train*, *validation*, *test* and *kwargs*. validation (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional): The population table used in the *validation* :class:`~getml.data.Subset`. You can either pass *population* and *split* or you can pass the subsets separately using *train*, *validation*, *test* and *kwargs*. test (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional): The population table used in the *test* :class:`~getml.data.Subset`. You can either pass *population* and *split* or you can pass the subsets separately using *train*, *validation*, *test* and *kwargs*. kwargs (:class:`~getml.DataFrame` or :class:`~getml.data.View`, optional): The population table used in :class:`~getml.data.Subset` s other than the predefined *train*, *validation* and *test* subsets. You can call these subsets anything you want to and can access them just like *train*, *validation* and *test*. You can either pass *population* and *split* or you can pass the subsets separately using *train*, *validation*, *test* and *kwargs*. Example: .. code-block:: python # Pass the subset. container = getml.data.Container(my_subset=my_data_frame) # You can access the subset just like train, # validation or test my_pipeline.fit(container.my_subset) Examples: A :class:`~getml.data.DataModel` only contains abstract data. When we fit a pipeline, we need to assign concrete data. Note that this example is taken from the `loans notebook <https://nbviewer.getml.com/github/getml/getml-demo/blob/master/loans.ipynb>`_. .. code-block:: python # The abstract data model is constructed # using the DataModel class. A data model # does not contain any actual data. It just # defines the abstract relational structure. dm = getml.data.DataModel( population_train.to_placeholder("population") ) dm.add(getml.data.to_placeholder( meta=meta, order=order, trans=trans) ) dm.population.join( dm.trans, on="account_id", time_stamps=("date_loan", "date") ) dm.population.join( dm.order, on="account_id", ) dm.population.join( dm.meta, on="account_id", ) # We now have abstract placeholders on something # called "population", "meta", "order" and "trans". # But how do we assign concrete data? By using # a container. container = getml.data.Container( train=population_train, test=population_test ) # meta, order and trans are either # DataFrames or Views. Their aliases need # to match the names of the placeholders in the # data model. container.add( meta=meta, order=order, trans=trans ) # Freezing makes the container immutable. # This is not required, but often a good idea. container.freeze() # When we call 'train', the container # will return the train set and the # peripheral tables. my_pipeline.fit(container.train) # Same for 'test' my_pipeline.score(container.test) If you don't already have a train and test set, you can use a function from the :mod:`~getml.data.split` module. .. code-block:: python split = getml.data.split.random( train=0.8, test=0.2) container = getml.data.Container( population=population_all, split=split, ) # The remaining code is the same as in # the example above. In particular, # container.train and container.test # work just like above. Containers can also be used for storage and reproducing your results. A recommended pattern is to assign 'baseline roles' to your data frames and then using a :class:`~getml.data.View` to tweak them: .. code-block:: python # Assign baseline roles data_frame.set_role(["jk"], getml.data.roles.join_key) data_frame.set_role(["col1", "col2"], getml.data.roles.categorical) data_frame.set_role(["col3", "col4"], getml.data.roles.numerical) data_frame.set_role(["col5"], getml.data.roles.target) # Make the data frame immutable, so in-place operations are # no longer possible. data_frame.freeze() # Save the data frame. data_frame.save() # I suspect that col1 leads to overfitting, so I will drop it. view = data_frame.drop(["col1"]) # Insert the view into a container. container = getml.data.Container(...) container.add(some_alias=view) container.save() The advantage of using such a pattern is that it enables you to always completely retrace your entire pipeline without creating deep copies of the data frames whenever you have made a small change like the one in our example. Note that the pipeline will record which container you have used. """ def __init__( self, population=None, peripheral=None, split=None, deep_copy=False, train=None, validation=None, test=None, **kwargs, ): if population is not None and not isinstance(population, (DataFrame, View)): raise TypeError( "'population' must be a getml.DataFrame or a getml.data.View, got " + type(population).__name__ + "." ) if peripheral is not None and not _is_typed_dict( peripheral, str, [DataFrame, View] ): raise TypeError( "'peripheral' must be a dict " + "of getml.DataFrames or getml.data.Views." ) if split is not None and not isinstance( split, (StringColumn, StringColumnView) ): raise TypeError( "'split' must be StringColumn or a StringColumnView, got " + type(split).__name__ + "." ) if not isinstance(deep_copy, bool): raise TypeError( "'deep_copy' must be a bool, got " + type(split).__name__ + "." ) exclusive = (population is not None) ^ ( len(_make_subsets_from_kwargs(train, validation, test, **kwargs)) != 0 ) if not exclusive: raise ValueError( "'population' and 'train', 'validation', 'test' as well as " + "other subsets signified by kwargs are mutually exclusive. " + "You have to pass " + "either 'population' or some subsets, but you cannot pass both." ) if population is None and split is not None: raise ValueError( "'split's are used for splitting population DataFrames." "Hence, if you supply 'split', you also have to supply " "a population." ) if population is not None and split is None: logger.warning( "You have passed a population table without passing 'split'. " "You can access the entire set to pass to your pipeline " "using the .full attribute." ) split = from_value("full") self._id = _make_id() self._population = population self._peripheral = peripheral or {} self._split = split self._deep_copy = deep_copy self._subsets = ( _make_subsets_from_split(population, split) if split is not None else _make_subsets_from_kwargs(train, validation, test, **kwargs) ) if split is None and not _is_typed_dict(self._subsets, str, [DataFrame, View]): raise TypeError( "'train', 'validation', 'test' and all other subsets must be either a " "getml.DataFrame or a getml.data.View." ) if deep_copy: self._population = _deep_copy(self._population, self._id) self._peripheral = { k: _deep_copy(v, self._id) for (k, v) in self._peripheral.items() } self._subsets = { k: _deep_copy(v, self._id) for (k, v) in self._subsets.items() } self._last_change = _get_last_change( self._population, self._peripheral, self._subsets ) self._frozen_time = None def __dir__(self): attrs = dir(type(self)) + self._ipython_key_completion() attrs = [x for x in attrs if x.isidentifier()] return attrs def __getattr__(self, key): try: return self[key] except KeyError: super().__getattribute__(key) def __getitem__(self, key): if "_" + key in self.__dict__: return self.__dict__["_" + key] if key in self.__dict__["_subsets"]: if self.__dict__["_deep_copy"] and self._frozen_time is None: raise ValueError( cleandoc( f""" If you set deep_copy=True, you must call {type(self).__name__}.freeze() before you can extract data. The idea of deep_copy is to ensure that you can always retrace and reproduce your results. That is why the container needs to be immutable before it can be used. """ ) ) last_change = _get_last_change( self.__dict__["_population"], self.__dict__["_peripheral"], self.__dict__["_subsets"], ) if self.__dict__["_last_change"] != last_change: logger.warning( cleandoc( f""" Some of the data frames added to the {type(self).__name__} have been modified after they were added. This might lead to unexpected results. To avoid these sort of problems, you can set deep_copy=True when creating the {type(self).__name__}. """ ) ) return Subset( container_id=self.__dict__["_id"], population=self.__dict__["_subsets"][key].with_name(key), peripheral=self.__dict__["_peripheral"], ) if key in self.__dict__["_peripheral"]: return self.__dict__["_peripheral"][key] raise KeyError(f"{type(self).__name__} holds no data with name {key!r}.") def __repr__(self): pop, perph = self._format() template = cleandoc( """ population {pop} peripheral {perph} """ ) return template.format(pop=pop._render_string(), perph=perph._render_string()) def _repr_html_(self): pop, perph = self._format() template = cleandoc( """ <div style='margin-top: 15px;'> <div style='float: left; margin-right: 50px;'> <div style='margin-bottom: 10px; font-size: 1rem;'>population</div> {pop} </div> <div style='float: left;'> <div style='margin-bottom: 10px; font-size: 1rem;'>peripheral</div> {perph} </div> </div> """ ) return template.format(pop=pop._render_html(), perph=perph._render_html()) def _format(self): headers_pop = [["subset", "name", "rows", "type"]] rows_pop = [ [key, subset.name, subset.nrows(), type(subset).__name__] for key, subset in self.subsets.items() # pytype: disable=attribute-error ] headers_perph = [["name", "rows", "type"]] rows_perph = [ [perph.name, perph.nrows(), type(perph).__name__] for perph in self.peripheral.values() # pytype: disable=attribute-error ] names = [ perph.name for perph in self.peripheral.values() # pytype: disable=attribute-error ] aliases = list(self.peripheral.keys()) # pytype: disable=attribute-error if any(alias not in names for alias in aliases): headers_perph[0].insert(0, "alias") for alias, row in zip(aliases, rows_perph): row.insert(0, alias) return _Formatter(headers=headers_pop, rows=rows_pop), _Formatter( headers=headers_perph, rows=rows_perph ) def _getml_deserialize(self): cmd = {k[1:] + "_": v for (k, v) in self.__dict__.items() if v is not None} if self._population is not None: cmd["population_"] = self._population._getml_deserialize() if self._split is not None: cmd[ "split_" ] = self._split._getml_deserialize() # pytype: disable=attribute-error cmd["peripheral_"] = { k: v._getml_deserialize() for (k, v) in self._peripheral.items() } cmd["subsets_"] = { k: v._getml_deserialize() for (k, v) in self._subsets.items() } return cmd def _ipython_key_completion(self): attrs = [v[1:] for v in list(vars(self))] attrs.extend(self._peripheral) if not self._deep_copy or self._frozen_time is not None: attrs.extend(self._subsets) return attrs def __setattr__(self, name, value): if not name or name[0] != "_": raise ValueError("Attempting a write operation on read-only data.") vars(self)[name] = value
[docs] def add(self, *args, **kwargs): """ Adds new peripheral data frames or views. """ wrong_type = [item for item in args if not isinstance(item, (DataFrame, View))] if wrong_type: raise TypeError( "All unnamed arguments must be getml.DataFrames or getml.data.Views." ) wrong_type = [ k for (k, v) in kwargs.items() if not isinstance(v, (DataFrame, View)) ] if wrong_type: raise TypeError( "You must pass getml.DataFrames or getml.data.Views, " f"but the following arguments were neither: {wrong_type!r}." ) kwargs = {**{item.name: item for item in args}, **kwargs} if self._frozen_time is not None: raise ValueError( f"You cannot add data frames after the {type(self).__name__} has been frozen." ) if self._deep_copy: kwargs = {k: _deep_copy(v, self._id) for (k, v) in kwargs.items()} self._peripheral = {**self._peripheral, **kwargs} self._last_change = _get_last_change( self._population, self._peripheral, self._subsets )
[docs] def freeze(self): """ Freezes the container, so that changes are no longer possible. This is required before you can extract data when deep_copy=True. The idea of deep_copy is to ensure that you can always retrace and reproduce your results. That is why the container needs to be immutable before it can be used. """ self.sync() self._frozen_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
[docs] def save(self): """ Saves the Container to disk. """ cmd = dict() cmd["type_"] = "DataContainer.save" cmd["name_"] = self._id cmd["container_"] = self._getml_deserialize() comm.send(cmd)
[docs] def sync(self): """ Synchronizes the last change with the data to avoid warnings that the data has been changed. This is only a problem when deep_copy=False. """ if self._frozen_time is not None: raise ValueError(f"{type(self).__name__} has already been frozen.") self._last_change = _get_last_change( self._population, self._peripheral, self._subsets )
[docs] def to_pandas(self) -> Dict[str, pd.DataFrame]: """ TODO """ subsets = ( {name: df.to_pandas() for name, df in self._subsets.items()} if self._subsets else {} ) peripherals = ( {name: df.to_pandas() for name, df in self.peripheral.items()} if self.peripheral else {} ) if subsets or peripherals: return {**subsets, **peripherals} raise ValueError("Container is empty.")