# Source code for getml.pipeline.pipeline

# Copyright 2022 The SQLNet Company GmbH
#
# This file is licensed under the Elastic License 2.0 (ELv2).
# Refer to the LICENSE.txt file in the root of the repository
# for details.
#


"""
This submodule contains the Pipeline, which is the main
class for feature learning and prediction.
"""

from __future__ import annotations

import copy
import json
import numbers
import socket
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Sequence, Union, Tuple

import numpy as np
from numpy.typing import NDArray

import getml.communication as comm
from getml import data
from getml.data import DataModel, _decode_data_model, _decode_placeholder
from getml.data.data_frame import DataFrame
from getml.data.helpers import (
    _is_subclass_list,
    _is_typed_list,
    _remove_trailing_underscores,
)
from getml.data import Placeholder, Roles, View
from getml.feature_learning import (
    _FeatureLearner,
    FastProp,
    Fastboost,
    Multirel,
    Relboost,
    RelMT,
)
from getml.feature_learning.loss_functions import _classification_loss
from getml.predictors import (
    _classification_types,
    _Predictor,
    LinearRegression,
    LogisticRegression,
    XGBoostClassifier,
    XGBoostRegressor,
    ScaleGBMClassifier,
    ScaleGBMRegressor,
)
from getml.preprocessors import (
    CategoryTrimmer,
    EmailDomain,
    Imputation,
    Mapping,
    Seasonal,
    Substring,
    TextFieldSplitter,
)
from getml.preprocessors.preprocessor import _Preprocessor
from getml.utilities.formatting import _SignatureFormatter

from .columns import Columns
from .tables import Tables
from .features import Features
from .helpers import (
    _check_df_types,
    _handle_loss_function,
    _infer_peripheral,
    _make_id,
    _parse_fe,
    _parse_metadata,
    _parse_pred,
    _parse_preprocessor,
    _print_time_taken,
    _replace_with_nan_maybe,
    _transform_peripheral,
)
from .issues import Issues
from .metadata import AllMetadata
from .metrics import (
    _all_metrics,
    _classification_metrics,
    accuracy,
    auc,
    cross_entropy,
    mae,
    rmse,
    rsquared,
)
from .plots import Plots
from .score import ClassificationScore, RegressionScore
from .scores_container import Scores
from .tags import Tags

# Initial value of `Pipeline._id`: `Pipeline.__init__` assigns it until a real
# ID is set (e.g. by `Pipeline._send` or `Pipeline._parse_cmd`).
NOT_FITTED = "NOT FITTED"


class Pipeline:
    """
    A Pipeline is the main class for feature learning and prediction.

    Args:
        data_model (:class:`~getml.data.DataModel`):
            Abstract representation of the data_model,
            which defines the abstract relationships between the tables.
            Required for the feature learners.

        peripheral (Union[:class:`~getml.data.Placeholder`, List[:class:`~getml.data.Placeholder`]], optional):
            Abstract representations of the additional tables used to
            augment the information provided in `population`. These
            have to be the same objects that were
            :meth:`~getml.data.Placeholder.join` ed onto the
            `population` :class:`~getml.data.Placeholder`.
            Their order determines the order of the
            peripheral :class:`~getml.DataFrame` passed to
            the 'peripheral_tables' argument in
            :meth:`~getml.Pipeline.check`,
            :meth:`~getml.Pipeline.fit`,
            :meth:`~getml.Pipeline.predict`,
            :meth:`~getml.Pipeline.score`, and
            :meth:`~getml.Pipeline.transform`, if you
            pass the data frames as a list.
            If you omit the peripheral placeholders, they will
            be inferred from the data model and ordered
            alphabetically.

        preprocessors (Union[:class:`~getml.feature_learning._Preprocessor`, List[:class:`~getml.feature_learning._Preprocessor`]], optional):
            The preprocessor(s) to be used.
            Must be from :mod:`~getml.preprocessors`.
            A single preprocessor does not have to be wrapped in a list.

        feature_learners (Union[:class:`~getml.feature_learning._FeatureLearner`, List[:class:`~getml.feature_learning._FeatureLearner`]], optional):
            The feature learner(s) to be used.
            Must be from :mod:`~getml.feature_learning`.
            A single feature learner does not have to be wrapped
            in a list.

        feature_selectors (Union[:class:`~getml.predictors._Predictor`, List[:class:`~getml.predictors._Predictor`]], optional):
            Predictor(s) used to select the best features.
            Must be from :mod:`~getml.predictors`.
            A single feature selector does not have to be wrapped
            in a list.
            Make sure to also set *share_selected_features*.

        predictors (Union[:class:`~getml.predictors._Predictor`, List[:class:`~getml.predictors._Predictor`]], optional):
            Predictor(s) used to generate the predictions.
            If more than one predictor is passed, the predictions
            generated will be averaged.
            Must be from :mod:`~getml.predictors`.
            A single predictor does not have to be wrapped in a list.

        loss_function (str or None):
            The loss function to use for the feature learners.

        tags (List[str], optional):
            Tags exist to help you organize your pipelines.
            You can add any tags that help you remember what you were
            trying to do.

        include_categorical (bool, optional):
            Whether you want to pass categorical columns
            in the population table to the predictor.

        share_selected_features (float, optional):
            The share of features you want the feature
            selection to keep. When set to 0.0, then all features will
            be kept.

    Examples:
        We assume that you have already set up your
        preprocessors (refer to :mod:`~getml.preprocessors`),
        your feature learners (refer to :mod:`~getml.feature_learning`)
        as well as your feature selectors and predictors
        (refer to :mod:`~getml.predictors`, which can be used
        for prediction and feature selection).

        You might also want to refer to
        :class:`~getml.DataFrame`, :class:`~getml.View`,
        :class:`~getml.data.DataModel`, :class:`~getml.data.Container`,
        :class:`~getml.data.Placeholder` and
        :class:`~getml.data.StarSchema`.

        If you want to create features for a time series problem,
        the easiest way to do so is to use the
        :class:`~getml.data.TimeSeries` abstraction.

        Note that this example is taken from the
        `robot notebook <https://nbviewer.getml.com/github/getml/getml-demo/blob/master/robot.ipynb>`_.

        .. code-block:: python

            # All rows before row 10500 will be used for training.
            split = getml.data.split.time(data_all, "rowid", test=10500)

            time_series = getml.data.TimeSeries(
                population=data_all,
                time_stamps="rowid",
                split=split,
                lagged_targets=False,
                memory=30,
            )

            pipe = getml.Pipeline(
                data_model=time_series.data_model,
                feature_learners=[...],
                predictors=...
            )

            pipe.check(time_series.train)

            pipe.fit(time_series.train)

            pipe.score(time_series.test)

            # To generate predictions on new data,
            # it is sufficient to use a Container.
            # You don't have to recreate the entire
            # TimeSeries, because the abstract data model
            # is stored in the pipeline.
            container = getml.data.Container(
                population=population_new,
            )

            # Add the data as a peripheral table, for the
            # self-join.
            container.add(population=population_new)

            predictions = pipe.predict(container.full)

        If your data can be organized in a simple star schema,
        you can use :class:`~getml.data.StarSchema`.
        :class:`~getml.data.StarSchema` unifies
        :class:`~getml.data.Container` and :class:`~getml.data.DataModel`:

        Note that this example is taken from the
        `loans notebook <https://nbviewer.getml.com/github/getml/getml-demo/blob/master/loans.ipynb>`_.

        .. code-block:: python

            # First, we insert our data into a StarSchema.
            # population_train and population_test are either
            # DataFrames or Views. The population table
            # defines the statistical population of your
            # machine learning problem and contains the
            # target variables.
            star_schema = getml.data.StarSchema(
                train=population_train,
                test=population_test
            )

            # meta, order and trans are either
            # DataFrames or Views.
            # Because this is a star schema,
            # all joins take place on the population
            # table.
            star_schema.join(
                trans,
                on="account_id",
                time_stamps=("date_loan", "date")
            )

            star_schema.join(
                order,
                on="account_id",
            )

            star_schema.join(
                meta,
                on="account_id",
            )

            # Now you can insert your data model,
            # your preprocessors, feature learners,
            # feature selectors and predictors
            # into the pipeline.
            # Note that the pipeline only knows
            # the abstract data model, but hasn't
            # seen the actual data yet.
            pipe = getml.Pipeline(
                data_model=star_schema.data_model,
                preprocessors=[mapping],
                feature_learners=[fast_prop],
                feature_selectors=[feature_selector],
                predictors=predictor,
            )

            # Now, we pass the actual data.
            # This passes 'population_train' and the
            # peripheral tables (meta, order and trans)
            # to the pipeline.
            pipe.check(star_schema.train)

            pipe.fit(star_schema.train)

            pipe.score(star_schema.test)

        :class:`~getml.data.StarSchema` is simpler,
        but cannot be used for more complex data models.
        The general approach is to use
        :class:`~getml.data.Container` and :class:`~getml.data.DataModel`:

        .. code-block:: python

            # First, we insert our data into a Container.
            # population_train and population_test are either
            # DataFrames or Views.
            container = getml.data.Container(
                train=population_train,
                test=population_test
            )

            # meta, order and trans are either
            # DataFrames or Views. They are given
            # aliases, so we can refer to them in the
            # DataModel.
            container.add(
                meta=meta,
                order=order,
                trans=trans
            )

            # Freezing makes the container immutable.
            # This is not required, but often a good idea.
            container.freeze()

            # The abstract data model is constructed
            # using the DataModel class. A data model
            # does not contain any actual data. It just
            # defines the abstract relational structure.
            dm = getml.data.DataModel(
                population_train.to_placeholder("population")
            )

            dm.add(getml.data.to_placeholder(
                meta=meta,
                order=order,
                trans=trans)
            )

            dm.population.join(
                dm.trans,
                on="account_id",
                time_stamps=("date_loan", "date")
            )

            dm.population.join(
                dm.order,
                on="account_id",
            )

            dm.population.join(
                dm.meta,
                on="account_id",
            )

            # Now you can insert your data model,
            # your preprocessors, feature learners,
            # feature selectors and predictors
            # into the pipeline.
            # Note that the pipeline only knows
            # the abstract data model, but hasn't
            # seen the actual data yet.
            pipe = getml.Pipeline(
                data_model=dm,
                preprocessors=[mapping],
                feature_learners=[fast_prop],
                feature_selectors=[feature_selector],
                predictors=predictor,
            )

            # This passes 'population_train' and the
            # peripheral tables (meta, order and trans)
            # to the pipeline.
            pipe.check(container.train)

            pipe.fit(container.train)

            pipe.score(container.test)

        Technically, you don't actually have to use a
        :class:`~getml.data.Container`. You might as well do this
        (in fact, a :class:`~getml.data.Container` is just
        syntactic sugar for this approach):

        .. code-block:: python

            pipe.check(
                population_train,
                {"meta": meta, "order": order, "trans": trans},
            )

            pipe.fit(
                population_train,
                {"meta": meta, "order": order, "trans": trans},
            )

            pipe.score(
                population_test,
                {"meta": meta, "order": order, "trans": trans},
            )

        Or you could even do this. The order of the peripheral tables
        can be inferred from the __repr__ method of the pipeline,
        and it is usually in alphabetical order.

        .. code-block:: python

            pipe.check(
                population_train,
                [meta, order, trans],
            )

            pipe.fit(
                population_train,
                [meta, order, trans],
            )

            pipe.score(
                population_test,
                [meta, order, trans],
            )
    """

    def __init__(
        self,
        data_model: Optional[DataModel] = None,
        peripheral: Optional[List[Placeholder]] = None,
        preprocessors: Optional[
            Union[
                CategoryTrimmer,
                EmailDomain,
                Imputation,
                Mapping,
                Seasonal,
                Substring,
                TextFieldSplitter,
                List[
                    Union[
                        CategoryTrimmer,
                        EmailDomain,
                        Imputation,
                        Mapping,
                        Seasonal,
                        Substring,
                        TextFieldSplitter,
                    ]
                ],
            ],
        ] = None,
        feature_learners: Optional[
            Union[
                Union[Fastboost, FastProp, Multirel, Relboost, RelMT],
                List[Union[Fastboost, FastProp, Multirel, Relboost, RelMT]],
            ]
        ] = None,
        feature_selectors: Optional[
            Union[
                Union[
                    LinearRegression,
                    LogisticRegression,
                    XGBoostClassifier,
                    XGBoostRegressor,
                    ScaleGBMClassifier,
                    ScaleGBMRegressor,
                ],
                List[
                    Union[
                        LinearRegression,
                        LogisticRegression,
                        XGBoostClassifier,
                        XGBoostRegressor,
                        ScaleGBMClassifier,
                        ScaleGBMRegressor,
                    ]
                ],
            ],
        ] = None,
        predictors: Optional[
            Union[
                LinearRegression,
                LogisticRegression,
                XGBoostClassifier,
                XGBoostRegressor,
                ScaleGBMClassifier,
                ScaleGBMRegressor,
                List[
                    Union[
                        LinearRegression,
                        LogisticRegression,
                        XGBoostClassifier,
                        XGBoostRegressor,
                        ScaleGBMClassifier,
                        ScaleGBMRegressor,
                    ]
                ],
            ]
        ] = None,
        loss_function: Optional[str] = None,
        tags: Optional[list[str]] = None,
        include_categorical: bool = False,
        share_selected_features: float = 0.5,
    ) -> None:
        data_model = data_model or DataModel("population")

        if not isinstance(data_model, DataModel):
            raise TypeError("'data_model' must be a getml.data.DataModel.")

        # Fall back to empty containers (or, for the peripheral placeholders,
        # to whatever can be inferred from the data model).
        peripheral = peripheral or _infer_peripheral(data_model.population)
        preprocessors = preprocessors or []
        feature_learners = feature_learners or []
        feature_selectors = feature_selectors or []
        predictors = predictors or []
        tags = tags or []

        # A single component may be passed without wrapping it in a list.
        if not isinstance(preprocessors, list):
            preprocessors = [preprocessors]

        if not isinstance(feature_learners, list):
            feature_learners = [feature_learners]

        if not isinstance(feature_selectors, list):
            feature_selectors = [feature_selectors]

        if not isinstance(predictors, list):
            predictors = [predictors]

        if not isinstance(peripheral, list):
            peripheral = [peripheral]

        if not isinstance(tags, list):
            tags = [tags]

        self._id: str = NOT_FITTED

        self.type = "Pipeline"

        # An explicit loss function wins; otherwise use the first one set on
        # any feature learner, and "SquareLoss" if none of them has one.
        loss_function = (
            loss_function
            or (
                [fl.loss_function for fl in feature_learners if fl.loss_function]
                or ["SquareLoss"]
            )[0]
        )

        feature_learners = [
            _handle_loss_function(fl, loss_function) for fl in feature_learners
        ]

        # NOTE: the assignment order below determines the key order of
        # `self.__dict__`, which `_getml_deserialize` and
        # `_supported_params` iterate over.
        self.data_model = data_model
        self.feature_learners = feature_learners
        self.feature_selectors = feature_selectors
        self.include_categorical = include_categorical
        self.loss_function = loss_function
        self.peripheral = peripheral
        self.predictors = predictors
        self.preprocessors = preprocessors
        self.share_selected_features = share_selected_features
        self.tags = Tags(tags)

        self._metadata: Optional[AllMetadata] = None

        self._scores: Dict[str, Any] = {}

        self._targets: List[str] = []

        # Stored on the class (not the instance): the names of all
        # attributes that exist at the end of construction.
        setattr(type(self), "_supported_params", list(self.__dict__.keys()))

        self._validate()

    # ----------------------------------------------------------------

    def __eq__(self, other: object) -> bool:
        """
        Compares the instance dictionaries of two pipelines.

        Real numbers are compared using `numpy.isclose`, everything
        else using `!=`.

        Raises:
            TypeError: If `other` is not a Pipeline.
        """
        if not isinstance(other, Pipeline):
            raise TypeError("A Pipeline can only be compared to another Pipeline")

        if len(set(self.__dict__.keys())) != len(set(other.__dict__.keys())):
            return False

        for kkey, own_value in self.__dict__.items():
            if kkey not in other.__dict__:
                return False

            other_value = other.__dict__[kkey]

            # Take special care when comparing numbers.
            if isinstance(own_value, numbers.Real):
                if not np.isclose(own_value, other_value):
                    return False

            elif own_value != other_value:
                return False

        return True

    # ----------------------------------------------------------------

    def __repr__(self) -> str:
        """
        Returns the formatted signature of the pipeline, followed by
        its monitor URL if the pipeline is fitted and a URL is available.
        """
        obj_dict = self._make_object_dict()

        sig = _SignatureFormatter(data=obj_dict)

        repr_str = sig._format()

        if self.fitted:
            url = self._make_url()
            if url:
                repr_str += "\n\nurl: " + url

        return repr_str

    # ----------------------------------------------------------------

    def _repr_html_(self) -> str:
        """
        HTML counterpart of `__repr__`: the formatted signature in a
        `<pre>` element, followed by a link to the monitor URL if the
        pipeline is fitted and a URL is available.
        """
        obj_dict = self._make_object_dict()

        sig = _SignatureFormatter(data=obj_dict)

        repr_str = sig._format()

        html = f"<pre>{repr_str}</pre>"

        if self.fitted:
            url = self._make_url()
            if url:
                link = "<a href='" + url + "' target=\"_blank\">" + url + "</a>"
                html += "<br><pre>" + "url: " + link + "</pre>"

        return html

    # ------------------------------------------------------------

    def _check_classification_or_regression(self) -> bool:
        """
        Checks whether there are inconsistencies in the algorithms used
        (mixing classification and regression algorithms).

        Returns:
            True if all feature learners, feature selectors and
            predictors are classification algorithms, False otherwise.

        Raises:
            ValueError: If the components are neither consistently
                classifiers nor consistently regressors.
        """
        all_classifiers = (
            all(
                elem.loss_function in _classification_loss
                for elem in self.feature_learners
            )
            and all(
                elem.type in _classification_types for elem in self.feature_selectors
            )
            and all(elem.type in _classification_types for elem in self.predictors)
        )

        all_regressors = (
            all(
                elem.loss_function not in _classification_loss
                for elem in self.feature_learners
            )
            and all(
                elem.type not in _classification_types
                for elem in self.feature_selectors
            )
            and all(elem.type not in _classification_types for elem in self.predictors)
        )

        if not all_classifiers and not all_regressors:
            raise ValueError(
                "You are mixing classification and regression algorithms. "
                "Please make sure that your feature learning algorithms "
                "consistently have classification loss functions "
                "(like CrossEntropyLoss) or consistently have regression "
                "loss functions (like SquareLoss). Also make sure that your "
                "feature selectors and predictors are consistently classifiers "
                "(like XGBoostClassifier or LogisticRegression) or consistently "
                "regressors (like XGBoostRegressor or LinearRegression). \n"
            )

        return all_classifiers

    # ------------------------------------------------------------

    def _check_whether_fitted(self) -> None:
        """
        Raises a ValueError if the pipeline has not been fitted.
        """
        if not self.fitted:
            raise ValueError("Pipeline has not been fitted!")

    # ------------------------------------------------------------

    def _close(self, sock: socket.socket) -> None:
        """
        Sends a "<type>.close" command for this pipeline over `sock`.

        Any reply other than "Success!" is handed to
        `comm.engine_exception_handler`.

        Raises:
            TypeError: If `sock` is not a socket.
        """
        if not isinstance(sock, socket.socket):
            raise TypeError("'sock' must be a socket.")

        cmd = {
            "type_": self.type + ".close",
            "name_": self.id,
        }

        comm.send_string(sock, json.dumps(cmd))

        msg = comm.recv_string(sock)

        if msg != "Success!":
            comm.engine_exception_handler(msg)

    # ------------------------------------------------------------

    def _get_latest_score(self, score: str) -> List[float]:
        """
        Returns the stored values of metric `score`, one per target.

        A list of NaN (one per target) is returned when the pipeline has
        not been scored or when the metric does not apply to the kind of
        pipeline (classification metric on a regression pipeline or
        vice versa).

        Raises:
            AttributeError: If `score` is not a known metric name.
        """
        nan_ = [np.nan] * len(self.targets)

        if score not in _all_metrics:
            raise AttributeError(f"Not a valid score name: {score}")

        if not self.scored:
            return nan_

        if self.is_classification and score not in _classification_metrics:
            return nan_

        if self.is_regression and score in _classification_metrics:
            return nan_

        return self._scores[score]

    # ------------------------------------------------------------

    def _getml_deserialize(self) -> Dict[str, Any]:
        """
        Expresses the pipeline in a form the engine can understand.

        Every instance attribute is stored under its name plus a
        trailing underscore; the private bookkeeping attributes
        (`_id`, `_metadata`, `_scores`, `_targets`) are left out and
        the ID is passed as "name_". The values are the attribute
        objects themselves, not copies.
        """
        cmd: Dict[str, Any] = {"name_": self.id}

        for key, value in self.__dict__.items():
            cmd[key + "_"] = value

        for private_key in ("_id_", "_metadata_", "_scores_", "_targets_"):
            del cmd[private_key]

        return cmd

    # ----------------------------------------------------------------

    def _make_object_dict(self) -> Dict[str, Any]:
        """
        Builds the dictionary used for the textual representation: a deep
        copy of the instance dictionary in which the data model, the
        peripheral placeholders and all components are replaced by their
        names / types.
        """
        obj_dict = copy.deepcopy(self.__dict__)

        obj_dict["data_model"] = self.data_model.population.name

        obj_dict["peripheral"] = [elem.name for elem in self.peripheral]

        obj_dict["preprocessors"] = [elem.type for elem in self.preprocessors]

        obj_dict["feature_learners"] = [elem.type for elem in self.feature_learners]

        obj_dict["feature_selectors"] = [elem.type for elem in self.feature_selectors]

        obj_dict["predictors"] = [elem.type for elem in self.predictors]

        return obj_dict

    # ----------------------------------------------------------------

    def _make_score_history(self) -> List[Union[ClassificationScore, RegressionScore]]:
        """
        Turns the entries of `self._scores["history"]` into score objects,
        one per history entry and target.

        Returns:
            A list of ClassificationScore if the pipeline is a
            classification pipeline, a list of RegressionScore otherwise.
        """
        scores: List[Dict[str, Any]] = self._scores["history"]
        scores = [_replace_with_nan_maybe(score) for score in scores]

        # Fallback for a metric that is missing from a history entry. It needs
        # one NaN per target, because it is indexed with the target number
        # below (a single-element list would raise an IndexError for every
        # target but the first).
        nan_ = [np.nan] * len(self.targets)

        if self.is_classification:
            return [
                ClassificationScore(
                    date_time=datetime.strptime(
                        score.get("date_time", ""), "%Y-%m-%d %H:%M:%S"
                    ),
                    set_used=score.get("set_used", ""),
                    target=target,
                    accuracy=score.get(accuracy, nan_)[target_num],
                    auc=score.get(auc, nan_)[target_num],
                    cross_entropy=score.get(cross_entropy, nan_)[target_num],
                )
                for score in scores
                for target_num, target in enumerate(self.targets)
            ]

        return [
            RegressionScore(
                date_time=datetime.strptime(
                    score.get("date_time", ""), "%Y-%m-%d %H:%M:%S"
                ),
                set_used=score.get("set_used", ""),
                target=target,
                mae=score.get(mae, nan_)[target_num],
                rmse=score.get(rmse, nan_)[target_num],
                rsquared=score.get(rsquared, nan_)[target_num],
            )
            for score in scores
            for target_num, target in enumerate(self.targets)
        ]

    # ----------------------------------------------------------------

    def _make_url(self) -> Optional[str]:
        """
        Builds the monitor URL of this pipeline from the monitor base URL,
        the project name and the pipeline ID.

        Returns:
            The URL, or None if `comm._monitor_url()` returns nothing.
        """
        url = comm._monitor_url()

        if not url:
            return None

        return url + "getpipeline/" + comm._get_project_name() + "/" + self.id + "/0/"

    # ----------------------------------------------------------------

    def _parse_cmd(self, json_obj: Dict[str, Any]) -> "Pipeline":
        """
        Re-initializes this pipeline from a command dictionary.

        The known entries are removed from `json_obj` (which is modified
        in place) and parsed; the remaining entries are passed on to
        `__init__` as keyword arguments after their trailing underscores
        have been removed.

        Raises:
            ValueError: If the "type_" entry is not "Pipeline".
        """
        ptype = json_obj.pop("type_")

        if ptype != "Pipeline":
            raise ValueError("Expected type 'Pipeline', got '" + ptype + "'.")

        preprocessors = [
            _parse_preprocessor(elem) for elem in json_obj.pop("preprocessors_")
        ]

        feature_learners = [_parse_fe(elem) for elem in json_obj.pop("feature_learners_")]

        feature_selectors = [
            _parse_pred(elem) for elem in json_obj.pop("feature_selectors_")
        ]

        predictors = [_parse_pred(elem) for elem in json_obj.pop("predictors_")]

        data_model = _decode_data_model(json_obj.pop("data_model_"))

        peripheral = [_decode_placeholder(elem) for elem in json_obj.pop("peripheral_")]

        id_ = json_obj.pop("name_")

        kwargs = _remove_trailing_underscores(json_obj)

        self.__init__(  # type: ignore
            data_model=data_model,
            peripheral=peripheral,
            preprocessors=preprocessors,
            feature_learners=feature_learners,
            feature_selectors=feature_selectors,
            predictors=predictors,
            **kwargs,
        )

        # __init__ resets the ID, so it has to be restored afterwards.
        self._id = id_

        return self

    # ----------------------------------------------------------------

    def _parse_json_obj(self, all_json_objs: Dict[str, Any]) -> "Pipeline":
        """
        Restores the pipeline from a dictionary holding the pipeline
        command ("obj"), its scores, its targets and the metadata of the
        population and peripheral tables.
        """
        obj = all_json_objs["obj"]

        scores = all_json_objs["scores"]

        targets = all_json_objs["targets"]

        self._parse_cmd(obj)

        scores = _remove_trailing_underscores(scores)
        scores = _replace_with_nan_maybe(scores)

        self._scores = scores

        self._targets = targets

        peripheral_metadata = [
            _parse_metadata(m) for m in all_json_objs["peripheral_metadata"]
        ]

        population_metadata = _parse_metadata(all_json_objs["population_metadata"])

        self._metadata = AllMetadata(
            peripheral=peripheral_metadata,
            population=population_metadata,
        )

        return self
---------------------------------------------------------------- def _save(self) -> None: """ Saves the pipeline as a JSON file. """ cmd = dict() cmd["type_"] = self.type + ".save" cmd["name_"] = self.id comm.send(cmd) # ------------------------------------------------------------ def _send(self, additional_tags: Optional[List[str]] = None) -> "Pipeline": self._validate() self._id = _make_id() cmd = self._getml_deserialize() if additional_tags is not None: cmd["tags_"] += additional_tags comm.send(cmd) return self # ------------------------------------------------------------ def _transform( self, peripheral_data_frames: Sequence[Union[DataFrame, View]], population_data_frame: Union[DataFrame, View], sock: socket.socket, score: bool = False, predict: bool = False, df_name: str = "", table_name: str = "", ) -> Union[NDArray[np.float_], None]: _check_df_types(population_data_frame, peripheral_data_frames) if not isinstance(sock, socket.socket): raise TypeError("'sock' must be a socket.") if not isinstance(score, bool): raise TypeError("'score' must be of type bool") if not isinstance(predict, bool): raise TypeError("'predict' must be of type bool") if not isinstance(table_name, str): raise TypeError("'table_name' must be of type str") if not isinstance(df_name, str): raise TypeError("'df_name' must be of type str") cmd: Dict[str, Any] = {} cmd["type_"] = self.type + ".transform" cmd["name_"] = self.id cmd["score_"] = score cmd["predict_"] = predict cmd["peripheral_dfs_"] = [ df._getml_deserialize() for df in peripheral_data_frames ] cmd["population_df_"] = population_data_frame._getml_deserialize() cmd["df_name_"] = df_name cmd["table_name_"] = table_name comm.send_string(sock, json.dumps(cmd)) msg = comm.log(sock) if msg == "Success!": if table_name == "" and df_name == "" and not score: yhat = comm.recv_float_matrix(sock) else: yhat = None else: comm.engine_exception_handler(msg) print() return yhat # ---------------------------------------------------------------- 
@property
def accuracy(self) -> Union[float, List[float]]:
    """
    Accuracy of the latest scoring run, i.e. of the last time
    `.score()` was called on the pipeline.

    Convenience shortcut for ``self.scores.accuracy``. For
    programmatic access use `~getml.pipeline.metrics`.
    """
    latest_scores = self.scores
    return latest_scores.accuracy


# ----------------------------------------------------------------

@property
def auc(self) -> Union[float, List[float]]:
    """
    AUC of the latest scoring run, i.e. of the last time
    `.score()` was called on the pipeline.

    Convenience shortcut for ``self.scores.auc``. For
    programmatic access use `~getml.pipeline.metrics`.
    """
    latest_scores = self.scores
    return latest_scores.auc


# ----------------------------------------------------------------
def check(
    self,
    population_table: Union[DataFrame, View, data.Subset],
    peripheral_tables: Optional[
        Union[
            Dict[str, Union[DataFrame, View]],
            Sequence[Union[DataFrame, View]],
        ]
    ] = None,
) -> Optional[Issues]:
    """
    Checks the validity of the data model.

    Args:
        population_table (:class:`~getml.DataFrame`, :class:`~getml.View` or :class:`~getml.data.Subset`):
            Main table containing the target variable(s) and
            corresponding to the ``population``
            :class:`~getml.data.Placeholder` instance
            variable.

        peripheral_tables (List[:class:`~getml.DataFrame` or :class:`~getml.View`], dict, :class:`~getml.DataFrame` or :class:`~getml.View`, optional):
            Additional tables corresponding to the ``peripheral``
            :class:`~getml.data.Placeholder` instance
            variable. If passed as a list, the order needs to
            match the order of the corresponding placeholders passed
            to ``peripheral``.

            If you pass a :class:`~getml.data.Subset` to `population_table`,
            the peripheral tables from that subset will be used. If you use
            a :class:`~getml.data.Container`, :class:`~getml.data.StarSchema`
            or :class:`~getml.data.TimeSeries`, that means you are passing
            a :class:`~getml.data.Subset`.

    Returns:
        The issues that were found, or None if there are none.
    """
    if isinstance(population_table, data.Subset):
        peripheral_tables = population_table.peripheral
        population_table = population_table.population

    peripheral_tables = _transform_peripheral(peripheral_tables, self.peripheral)

    _check_df_types(population_table, peripheral_tables)

    # The check runs against a throwaway copy that is sent under its
    # own, newly generated ID.
    temp = copy.deepcopy(self)

    temp._send()

    # From here on the temporary pipeline has been sent, so it must be
    # deleted again even when the check itself fails. The guard starts
    # only after _send() succeeded: before that, the copy still carries
    # the ID of the original pipeline.
    try:
        cmd: Dict[str, Any] = {}

        cmd["type_"] = temp.type + ".check"
        cmd["name_"] = temp.id

        cmd["peripheral_dfs_"] = [
            df._getml_deserialize() for df in peripheral_tables
        ]

        cmd["population_df_"] = population_table._getml_deserialize()

        with comm.send_and_get_socket(cmd) as sock:
            msg = comm.recv_string(sock)
            if msg != "Found!":
                comm.engine_exception_handler(msg)

            print("Checking data model...")

            msg = comm.log(sock)
            if msg != "Success!":
                comm.engine_exception_handler(msg)

            print()

            issues = Issues(comm.recv_issues(sock))

            if len(issues) == 0:
                print("OK.")
            else:
                print(
                    f"The pipeline check generated {len(issues.info)} "
                    + f"issues labeled INFO and {len(issues.warnings)} "
                    + "issues labeled WARNING."
                )
    finally:
        temp.delete()

    return None if len(issues) == 0 else issues
# ------------------------------------------------------------

@property
def columns(self) -> Columns:
    """
    :class:`~getml.pipeline.Columns` object that can be used to
    handle information about the original columns utilized by
    the feature learners.

    Requires a fitted pipeline.
    """
    self._check_whether_fitted()
    pipeline_id, target_names, peripheral = self.id, self.targets, self.peripheral
    return Columns(pipeline_id, target_names, peripheral)


# ----------------------------------------------------------------

@property
def cross_entropy(self) -> Union[float, List[float]]:
    """
    Cross entropy of the latest scoring run, i.e. of the last time
    `.score()` was called on the pipeline.

    Convenience shortcut for ``self.scores.cross_entropy``. For
    programmatic access use `~getml.pipeline.metrics`.
    """
    latest_scores = self.scores
    return latest_scores.cross_entropy


# ----------------------------------------------------------------
def delete(self) -> None:
    """
    Deletes the pipeline from the engine.

    Afterwards the local object is marked as not fitted.

    Note:
        Caution: You can not undo this action!
    """
    self._check_whether_fitted()

    cmd: Dict[str, Any] = {
        "type_": self.type + ".delete",
        "name_": self.id,
        "mem_only_": False,
    }

    comm.send(cmd)

    # Reset the ID so that the object no longer counts as fitted.
    self._id = NOT_FITTED
# ------------------------------------------------------------
def deploy(self, deploy: bool) -> None:
    """Allows a fitted pipeline to be addressable via an HTTP request.
    See :ref:`deployment` for details.

    Args:
        deploy (bool): If :code:`True`, the deployment of the pipeline
            will be triggered.

    Raises:
        TypeError: If `deploy` is not a bool.
    """
    self._check_whether_fitted()

    deploy_is_bool = isinstance(deploy, bool)
    if not deploy_is_bool:
        raise TypeError("'deploy' must be of type bool")

    self._validate()

    cmd: Dict[str, Any] = {
        "type_": self.type + ".deploy",
        "name_": self.id,
        "deploy_": deploy,
    }

    comm.send(cmd)

    # Persist the pipeline once the deploy command has been sent.
    self._save()
# ------------------------------------------------------------

@property
def features(self) -> Features:
    """
    :class:`~getml.pipeline.Features` object that can be used to
    handle the features generated by the feature learners.

    Requires a fitted pipeline.
    """
    self._check_whether_fitted()
    pipeline_id, target_names = self.id, self.targets
    return Features(pipeline_id, target_names)


# ------------------------------------------------------------
def fit(
    self,
    population_table: Union[DataFrame, View, data.Subset],
    peripheral_tables: Optional[
        Union[
            Sequence[Union[DataFrame, View]],
            Dict[str, Union[DataFrame, View]],
        ]
    ] = None,
    validation_table: Optional[Union[DataFrame, View, data.Subset]] = None,
    check: bool = True,
) -> "Pipeline":
    """Trains the feature learning algorithms, feature selectors
    and predictors.

    Args:
        population_table (:class:`~getml.DataFrame`, :class:`~getml.View` or :class:`~getml.data.Subset`):
            Main table containing the target variable(s) and
            corresponding to the ``population``
            :class:`~getml.data.Placeholder` instance
            variable.

        peripheral_tables (List[:class:`~getml.DataFrame` or :class:`~getml.View`], dict, :class:`~getml.DataFrame` or :class:`~getml.View`, optional):
            Additional tables corresponding to the ``peripheral``
            :class:`~getml.data.Placeholder` instance
            variable. If passed as a list, the order needs to
            match the order of the corresponding placeholders passed
            to ``peripheral``.

            If you pass a :class:`~getml.data.Subset` to `population_table`,
            the peripheral tables from that subset will be used. If you use
            a :class:`~getml.data.Container`, :class:`~getml.data.StarSchema`
            or :class:`~getml.data.TimeSeries`, that means you are passing
            a :class:`~getml.data.Subset`.

        validation_table (:class:`~getml.DataFrame`, :class:`~getml.View` or :class:`~getml.data.Subset`):
            Main table containing the target variable(s) and
            corresponding to the ``population``
            :class:`~getml.data.Placeholder` instance
            variable. If you are passing a subset, that subset
            must be derived from the same container as *population_table*.

            Only used for early stopping in :class:`~getml.predictors.XGBoostClassifier`
            and :class:`~getml.predictors.XGBoostRegressor`.

        check (bool):
            Whether you want to check the data model before fitting. The checks are
            equivalent to the checks run by :meth:`~getml.Pipeline.check`.

    Returns:
        :class:`~getml.Pipeline`: Current instance, reloaded after fitting.

    Raises:
        ValueError: If the training and validation subsets come from
            different containers.
    """
    population_is_subset = isinstance(population_table, data.Subset)
    validation_is_subset = isinstance(validation_table, data.Subset)

    # Subsets are tagged with the ID of the container they came from.
    if population_is_subset:
        additional_tags = ["container-" + population_table.container_id]
    else:
        additional_tags = []

    if (
        population_is_subset
        and validation_is_subset
        and validation_table.container_id != population_table.container_id
    ):
        raise ValueError(
            "The subset used for validation must be from the same container "
            "as the subset used for training."
        )

    # Unwrap subsets into plain tables.
    if population_is_subset:
        peripheral_tables = population_table.peripheral
        population_table = population_table.population

    if validation_is_subset:
        validation_table = validation_table.population

    peripheral_tables = _transform_peripheral(peripheral_tables, self.peripheral)

    _check_df_types(population_table, peripheral_tables)

    if check:
        found_issues = self.check(population_table, peripheral_tables)
        if found_issues:
            print("To see the issues in full, run .check() on the pipeline.")
            print()

    self._send(additional_tags)

    cmd: Dict[str, Any] = {
        "type_": self.type + ".fit",
        "name_": self.id,
        "peripheral_dfs_": [df._getml_deserialize() for df in peripheral_tables],
        "population_df_": population_table._getml_deserialize(),
    }

    if validation_table is not None:
        cmd["validation_df_"] = validation_table._getml_deserialize()

    with comm.send_and_get_socket(cmd) as sock:
        reply = comm.recv_string(sock)

        if reply != "Found!":
            comm.engine_exception_handler(reply)

        # Time the whole logging phase of the fit.
        started = time.time()
        reply = comm.log(sock)
        finished = time.time()

        if "Trained" not in reply:
            comm.engine_exception_handler(reply)
        else:
            print()
            print(reply)
            _print_time_taken(started, finished, "Time taken: ")

    self._save()

    return self.refresh()
# ------------------------------------------------------------

@property
def fitted(self) -> bool:
    """
    Whether the pipeline has already been fitted.

    True as soon as the internal ID differs from the not-fitted marker.
    """
    current_id = self._id
    return current_id != NOT_FITTED


# ----------------------------------------------------------------

@property
def mae(self) -> Union[float, List[float]]:
    """
    MAE of the latest scoring run, i.e. of the last time
    `.score()` was called on the pipeline.

    Convenience shortcut for ``self.scores.mae``. For
    programmatic access use `~getml.pipeline.metrics`.
    """
    latest_scores = self.scores
    return latest_scores.mae


# ------------------------------------------------------------

@property
def plots(self) -> Plots:
    """
    :class:`~getml.pipeline.Plots` object that can be used to
    generate plots like an ROC curve or a lift curve.

    Requires a fitted pipeline.
    """
    self._check_whether_fitted()
    pipeline_id = self.id
    return Plots(pipeline_id)


# ------------------------------------------------------------

@property
def id(self) -> str:
    """
    ID of the pipeline. This is used to uniquely identify
    the pipeline on the engine.
    """
    pipeline_id = self._id
    return pipeline_id


# ------------------------------------------------------------

@property
def is_classification(self) -> bool:
    """
    Whether the pipeline can be used for classification problems.
    """
    verdict = self._check_classification_or_regression()
    return verdict


# ------------------------------------------------------------

@property
def is_regression(self) -> bool:
    """
    Whether the pipeline can be used for regression problems.

    This is the negation of ``is_classification``.
    """
    classification = self.is_classification
    return not classification


# ------------------------------------------------------------

@property
def metadata(self) -> Optional[AllMetadata]:
    """
    Contains information on the data frames
    that were passed to .fit(...). The roles
    contained therein can be directly passed
    to existing data frames to correctly reassign
    the roles of existing columns. If the pipeline
    has not been fitted, this is None.
    """
    stored_metadata = self._metadata
    return stored_metadata


# ------------------------------------------------------------

@property
def name(self) -> str:
    """
    Returns the ID of the pipeline. The name property is
    kept for backward compatibility.
    """
    pipeline_id = self._id
    return pipeline_id


# ------------------------------------------------------------
def predict(
    self,
    population_table: Union[DataFrame, View, data.Subset],
    peripheral_tables: Optional[
        Union[
            Sequence[Union[DataFrame, View]],
            Dict[str, Union[DataFrame, View]],
        ]
    ] = None,
    table_name: str = "",
) -> Union[NDArray[np.float_], None]:
    """Forecasts on new, unseen data using the trained ``predictor``.

    Returns the predictions generated by the pipeline based on
    `population_table` and `peripheral_tables` or writes them into
    a data base named `table_name`.

    Args:
        population_table (:class:`~getml.DataFrame`, :class:`~getml.View` or :class:`~getml.data.Subset`):
            Main table containing the target variable(s) and
            corresponding to the ``population``
            :class:`~getml.data.Placeholder` instance
            variable.

        peripheral_tables (List[:class:`~getml.DataFrame` or :class:`~getml.View`], dict, :class:`~getml.DataFrame` or :class:`~getml.View`, optional):
            Additional tables corresponding to the ``peripheral``
            :class:`~getml.data.Placeholder` instance
            variable. If passed as a list, the order needs to
            match the order of the corresponding placeholders passed
            to ``peripheral``.

            If you pass a :class:`~getml.data.Subset` to `population_table`,
            the peripheral tables from that subset will be used. If you use
            a :class:`~getml.data.Container`, :class:`~getml.data.StarSchema`
            or :class:`~getml.data.TimeSeries`, that means you are passing
            a :class:`~getml.data.Subset`.

        table_name (str, optional):
            If not an empty string, the resulting predictions will
            be written into a table in a :mod:`~getml.database`.
            Refer to :ref:`unified_import_interface` for further information.

    Return:
        :class:`numpy.ndarray`:
            Resulting predictions provided in an array of the
            (number of rows in `population_table`, number of
            targets in `population_table`). None if `table_name`
            is not empty.

    Raises:
        TypeError: If `table_name` is not a str.

    Note:
        Only fitted pipelines
        (:meth:`~getml.Pipeline.fit`) can be used for
        prediction.
    """
    self._check_whether_fitted()

    # A subset carries its own peripheral tables.
    if isinstance(population_table, data.Subset):
        peripheral_tables = population_table.peripheral
        population_table = population_table.population

    peripheral_tables = _transform_peripheral(peripheral_tables, self.peripheral)

    _check_df_types(population_table, peripheral_tables)

    if not isinstance(table_name, str):
        raise TypeError("'table_name' must be of type str")

    self._validate()

    # Opening command; the full request is sent by _transform.
    cmd: Dict[str, Any] = {
        "type_": self.type + ".transform",
        "name_": self.id,
        "http_request_": False,
    }

    with comm.send_and_get_socket(cmd) as sock:
        reply = comm.recv_string(sock)

        if reply != "Found!":
            comm.engine_exception_handler(reply)

        return self._transform(
            peripheral_tables,
            population_table,
            sock,
            predict=True,
            table_name=table_name,
        )
# ------------------------------------------------------------
def refresh(self) -> "Pipeline":
    """Reloads the pipeline from the engine.

    This discards all local changes you have made since the
    last time you called :meth:`~getml.Pipeline.fit`.

    Returns:
        :class:`~getml.Pipeline`:
            Current instance
    """
    cmd: Dict[str, Any] = {}
    cmd["type_"] = self.type + ".refresh"
    cmd["name_"] = self.id

    with comm.send_and_get_socket(cmd) as sock:
        msg = comm.recv_string(sock)

    # A reply that is not a JSON object is treated as an error message.
    # startswith() also covers an empty reply, which plain indexing
    # (msg[0]) would turn into an IndexError.
    if not msg.startswith("{"):
        comm.engine_exception_handler(msg)

    json_obj = json.loads(msg)

    self._parse_json_obj(json_obj)

    return self
# ----------------------------------------------------------------

@property
def rmse(self) -> Union[float, List[float]]:
    """
    RMSE of the latest scoring run, i.e. of the last time
    `.score()` was called on the pipeline.

    Convenience shortcut for ``self.scores.rmse``. For
    programmatic access use `~getml.pipeline.metrics`.
    """
    latest_scores = self.scores
    return latest_scores.rmse


# ----------------------------------------------------------------

@property
def rsquared(self) -> Union[float, List[float]]:
    """
    R-squared of the latest scoring run, i.e. of the last time
    `.score()` was called on the pipeline.

    Convenience shortcut for ``self.scores.rsquared``. For
    programmatic access use `~getml.pipeline.metrics`.
    """
    latest_scores = self.scores
    return latest_scores.rsquared


# ----------------------------------------------------------------
def score(
    self,
    population_table: Union[DataFrame, View, data.Subset],
    peripheral_tables: Optional[
        Union[
            Sequence[Union[DataFrame, View]],
            Dict[str, Union[DataFrame, View]],
        ]
    ] = None,
) -> Scores:
    """Calculates the performance of the ``predictor``.

    Returns different scores calculated on `population_table` and
    `peripheral_tables`.

    Args:
        population_table (:class:`~getml.DataFrame`, :class:`~getml.View` or :class:`~getml.data.Subset`):
            Main table containing the target variable(s) and
            corresponding to the ``population``
            :class:`~getml.data.Placeholder` instance
            variable.

        peripheral_tables (List[:class:`~getml.DataFrame` or :class:`~getml.View`], dict, :class:`~getml.DataFrame` or :class:`~getml.View`, optional):
            Additional tables corresponding to the ``peripheral``
            :class:`~getml.data.Placeholder` instance
            variable. If passed as a list, the order needs to
            match the order of the corresponding placeholders passed
            to ``peripheral``.

            If you pass a :class:`~getml.data.Subset` to `population_table`,
            the peripheral tables from that subset will be used. If you use
            a :class:`~getml.data.Container`, :class:`~getml.data.StarSchema`
            or :class:`~getml.data.TimeSeries`, that means you are passing
            a :class:`~getml.data.Subset`.

    Returns:
        :class:`~getml.pipeline.Scores`: The ``scores`` property of the
        pipeline, read after the pipeline has been refreshed and saved.

    Note:
        Only fitted pipelines
        (:meth:`~getml.Pipeline.fit`) can be
        scored.
    """
    self._check_whether_fitted()

    # A subset carries its own peripheral tables.
    if isinstance(population_table, data.Subset):
        peripheral_tables = population_table.peripheral
        population_table = population_table.population

    peripheral_tables = _transform_peripheral(peripheral_tables, self.peripheral)

    _check_df_types(population_table, peripheral_tables)

    # Opening command; the full request is sent by _transform.
    cmd: Dict[str, Any] = {
        "type_": self.type + ".transform",
        "name_": self.id,
        "http_request_": False,
    }

    with comm.send_and_get_socket(cmd) as sock:
        reply = comm.recv_string(sock)

        if reply != "Found!":
            comm.engine_exception_handler(reply)

        self._transform(
            peripheral_tables, population_table, sock, predict=True, score=True
        )

        reply = comm.recv_string(sock)

        if reply != "Success!":
            comm.engine_exception_handler(reply)

        # The scores payload is read and parsed here but not used;
        # the returned scores come from the refreshed pipeline below.
        raw_scores = comm.recv_string(sock)
        json.loads(raw_scores)

    self.refresh()

    self._save()

    return self.scores
# ----------------------------------------------------------------

@property
def scores(self) -> Scores:
    """
    Contains all scores generated by :meth:`~getml.Pipeline.score`

    Returns:
        :class:`~getml.pipeline.Scores`:
            A container that holds the scores for the pipeline.
    """
    self._check_whether_fitted()

    history = self._make_score_history()

    # One latest value for every known metric.
    latest = {}
    for metric in _all_metrics:
        latest[metric] = self._get_latest_score(metric)

    return Scores(history, latest)


# ----------------------------------------------------------------

@property
def scored(self) -> bool:
    """
    Whether the pipeline has been scored.

    False if no scores are stored; otherwise True only when the
    stored scores hold more than one entry.
    """
    stored = self._scores
    return stored is not None and len(stored) > 1


# ----------------------------------------------------------------

@property
def tables(self) -> Tables:
    """
    :class:`~getml.pipeline.Tables` object that can be used to
    handle information about the original tables utilized by
    the feature learners.

    Requires a fitted pipeline.
    """
    self._check_whether_fitted()
    target_names, column_info = self.targets, self.columns
    return Tables(target_names, column_info)


# ----------------------------------------------------------------

@property
def targets(self) -> List[str]:
    """
    Contains the names of the targets used for this pipeline.

    A deep copy is returned, so changing the result does not
    change the pipeline.
    """
    self._check_whether_fitted()
    target_names = copy.deepcopy(self._targets)
    return target_names


# ----------------------------------------------------------------
def transform(
    self,
    population_table: Union[DataFrame, View, data.Subset],
    peripheral_tables: Optional[
        Union[
            Sequence[Union[DataFrame, View]],
            Dict[str, Union[DataFrame, View]],
        ]
    ] = None,
    df_name: str = "",
    table_name: str = "",
) -> Union[DataFrame, NDArray[np.float_], None]:
    """Translates new data into the trained features.

    Transforms the data passed in `population_table` and
    `peripheral_tables` into features, which can be inserted into
    machine learning models.

    Examples:
        By default, transform returns a :class:`numpy.ndarray`:

        .. code-block:: python

            my_features_array = pipe.transform(population_table)

        You can also export your features as a :class:`~getml.DataFrame`
        by providing the `df_name` argument:

        .. code-block:: python

            my_features_df = pipe.transform(population_table, df_name="my_features")

        Or you can write the results directly into a database:

        .. code-block:: python

            getml.database.connect_odbc(...)
            pipe.transform(population_table, table_name="MY_FEATURES")

    Args:
        population_table (:class:`~getml.DataFrame`, :class:`~getml.View` or :class:`~getml.data.Subset`):
            Main table containing the target variable(s) and
            corresponding to the ``population``
            :class:`~getml.data.Placeholder` instance
            variable.

        peripheral_tables (List[:class:`~getml.DataFrame` or :class:`~getml.View`], dict, :class:`~getml.DataFrame` or :class:`~getml.View`, optional):
            Additional tables corresponding to the ``peripheral``
            :class:`~getml.data.Placeholder` instance
            variable. If passed as a list, the order needs to
            match the order of the corresponding placeholders passed
            to ``peripheral``.

            If you pass a :class:`~getml.data.Subset` to `population_table`,
            the peripheral tables from that subset will be used. If you use
            a :class:`~getml.data.Container`, :class:`~getml.data.StarSchema`
            or :class:`~getml.data.TimeSeries`, that means you are passing
            a :class:`~getml.data.Subset`.

        df_name (str, optional):
            If not an empty string, the resulting features will be
            written into a newly created DataFrame.

        table_name (str, optional):
            If not an empty string, the resulting features will
            be written into a table in a :mod:`~getml.database`.
            Refer to :ref:`unified_import_interface` for further information.

    Returns:
        A refreshed :class:`~getml.DataFrame` named `df_name` if
        `df_name` is not empty; otherwise whatever the internal
        transform call returned.

    Note:
        Only fitted pipelines
        (:meth:`~getml.Pipeline.fit`) can transform
        data into features.
    """
    self._check_whether_fitted()

    # A subset carries its own peripheral tables.
    if isinstance(population_table, data.Subset):
        peripheral_tables = population_table.peripheral
        population_table = population_table.population

    peripheral_tables = _transform_peripheral(peripheral_tables, self.peripheral)

    _check_df_types(population_table, peripheral_tables)

    self._validate()

    # Opening command; the full request is sent by _transform.
    cmd: Dict[str, Any] = {
        "type_": self.type + ".transform",
        "name_": self.id,
        "http_request_": False,
    }

    with comm.send_and_get_socket(cmd) as sock:
        reply = comm.recv_string(sock)

        if reply != "Found!":
            comm.engine_exception_handler(reply)

        features = self._transform(
            peripheral_tables,
            population_table,
            sock,
            df_name=df_name,
            table_name=table_name,
        )

    if df_name == "":
        return features

    # The features were written to a data frame; hand back a handle to it.
    return data.DataFrame(name=df_name).refresh()
# ----------------------------------------------------------------

def _validate(self) -> None:
    """
    Checks the types and values of the pipeline's attributes.

    Afterwards calls ``validate()`` on every feature learner, feature
    selector and predictor and finally runs the
    classification-or-regression check.

    Raises:
        TypeError: If an attribute has the wrong type.
        ValueError: If ``type`` is not ``"Pipeline"``.
        KeyError: If the instance carries an attribute that is not
            listed in ``Pipeline._supported_params``.
    """
    if not isinstance(self.id, str):
        raise TypeError("'name' must be of type str")

    if not isinstance(self.data_model, DataModel):
        raise TypeError("'data_model' must be a getml.data.DataModel.")

    peripheral_ok = _is_typed_list(self.peripheral, data.Placeholder)
    if not peripheral_ok:
        raise TypeError(
            "'peripheral' must be either a getml.data.Placeholder or a list thereof"
        )

    preprocessors_ok = _is_subclass_list(self.preprocessors, _Preprocessor)
    if not preprocessors_ok:
        raise TypeError("'preprocessor' must be a list of _Preprocessor.")

    feature_learners_ok = _is_subclass_list(self.feature_learners, _FeatureLearner)
    if not feature_learners_ok:
        raise TypeError("'feature_learners' must be a list of _FeatureLearners.")

    feature_selectors_ok = _is_subclass_list(self.feature_selectors, _Predictor)
    if not feature_selectors_ok:
        raise TypeError(
            "'feature_selectors' must be a list of getml.predictors._Predictors."
        )

    predictors_ok = _is_subclass_list(self.predictors, _Predictor)
    if not predictors_ok:
        raise TypeError(
            "'predictors' must be a list of getml.predictors._Predictors."
        )

    if not isinstance(self.include_categorical, bool):
        raise TypeError("'include_categorical' must be a bool!")

    if not isinstance(self.share_selected_features, numbers.Real):
        raise TypeError("'share_selected_features' must be number!")

    if not _is_typed_list(self.tags, str):
        raise TypeError("'tags' must be a list of str.")

    if self.type != "Pipeline":
        raise ValueError("'type' must be 'Pipeline'")

    # Reject any instance variable that is not explicitly supported.
    for attr_name in self.__dict__:
        if attr_name not in Pipeline._supported_params:  # pylint: disable=E1101
            raise KeyError(
                f"Instance variable [{attr_name}] is not supported in Pipeline."
            )

    # Let every component validate itself, in the original order.
    for component in self.feature_learners:
        component.validate()

    for component in self.feature_selectors:
        component.validate()

    for component in self.predictors:
        component.validate()

    self._check_classification_or_regression()