# Source code for getml.hyperopt.tuning

# Copyright 2020 The SQLNet Company GmbH

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

"""
Simplified tuning routines.
"""

import copy
import numbers
import time

import getml.communication as comm

from getml.data import (
    DataFrame
)

from getml.data.helpers import (
    _is_typed_list,
)

from getml import feature_learning

import getml.pipeline

from getml.pipeline.helpers import(
    _print_time_taken,
    _transform_peripheral
)

from getml.pipeline import scores

# -----------------------------------------------------------------------------


def _infer_score(pipeline):
    """
    Picks the score to optimize when the caller did not name one.

    Args:
        pipeline: Object exposing an ``is_classification`` attribute.

    Returns:
        ``getml.pipeline.scores.auc`` if ``pipeline.is_classification``
        is truthy, ``getml.pipeline.scores.rmse`` otherwise.
    """
    score_name = "auc" if pipeline.is_classification else "rmse"
    return getattr(getml.pipeline.scores, score_name)

# -----------------------------------------------------------------------------


def _make_final_pipeline(
        pipeline,
        tuned_feature_learners,
        tuned_predictors,
        population_table_training,
        population_table_validation,
        peripheral_tables):
    """
    Assembles, fits and scores the pipeline returned to the user.

    The base pipeline is deep-copied, so the object passed in by the
    caller is left untouched. The copy receives the tuned feature
    learners and predictors, is fitted on the training table and is
    then scored on the validation table.

    Args:
        pipeline: Base pipeline to copy.
        tuned_feature_learners: Assigned to the copy's
            ``feature_learners``.
        tuned_predictors: Assigned to the copy's ``predictors``.
        population_table_training: Population table passed to ``fit``.
        population_table_validation: Population table passed to
            ``score``.
        peripheral_tables: Passed unchanged to both ``fit`` and
            ``score``.

    Returns:
        The fitted and scored copy of ``pipeline``.
    """
    print("Building final pipeline...")
    print()

    result = copy.deepcopy(pipeline)
    result.feature_learners = tuned_feature_learners
    result.predictors = tuned_predictors

    # Both calls share the same peripheral tables.
    shared = dict(peripheral_tables=peripheral_tables)

    result.fit(population_table=population_table_training, **shared)
    result.score(population_table=population_table_validation, **shared)

    return result

# -----------------------------------------------------------------------------


def _tune(
        what,
        pipeline,
        population_table_training,
        population_table_validation,
        peripheral_tables=None,
        n_iter=111,
        score=scores.rmse,
        num_threads=0,
        horizon=0.0,
        memory=0.0,
        allow_lagged_targets=True,
        self_join_keys=None,
        ts_name="",
        delta_t=0.0
):
    """
    Internal base tuning function that is called by other tuning functions.

    Builds a "Hyperopt.tune" command, sends it to the engine, relays the
    engine's log until it reports back and then loads the pipeline whose
    name the engine returns.

    Args:
        what (str): Sent to the engine as "what_". The callers in this
            module pass names such as "Multirel", "RelboostTimeSeries",
            "XGBoost" or "Linear".
        pipeline: Pipeline that is checked against the training data and
            serialized into the command.
        population_table_training (:class:`~getml.data.DataFrame`):
            Training population table; only its name is sent.
        population_table_validation (:class:`~getml.data.DataFrame`):
            Validation population table; only its name is sent.
        peripheral_tables: Peripheral tables. ``None`` is treated as an
            empty list. They are normalized by ``_transform_peripheral``
            using ``pipeline.peripheral``.
        n_iter (numbers.Real): Sent to the engine as "n_iter_".
        score: Sent to the engine as "score_".
        num_threads (numbers.Real): Sent to the engine as "num_threads_".
        horizon, memory, allow_lagged_targets, self_join_keys, ts_name,
        delta_t: Time series settings. They are only added to the
            command when ``what`` names a time series feature learner.

    Returns:
        The pipeline loaded through ``getml.pipeline.load`` under the
        name received from the engine.

    Raises:
        TypeError: If one of the population tables is not a
            :class:`~getml.data.DataFrame`, the peripheral tables are not
            a list of data frames after normalization, or ``n_iter`` /
            ``num_threads`` is not a real number.
    """
    # -----------------------------------------------------------

    peripheral_tables = peripheral_tables or []

    # -----------------------------------------------------------

    pipeline.check(
        population_table=population_table_training,
        peripheral_tables=peripheral_tables
    )

    # -----------------------------------------------------------

    peripheral_tables = _transform_peripheral(
        peripheral_tables, pipeline.peripheral)

    # -----------------------------------------------------------

    if not isinstance(population_table_training, DataFrame):
        raise TypeError("""'population_table_training'
                            must be a getml.data.DataFrame""")

    if not isinstance(population_table_validation, DataFrame):
        raise TypeError("""'population_table_validation'
                            must be a getml.data.DataFrame""")

    if not _is_typed_list(peripheral_tables, DataFrame):
        raise TypeError("""'peripheral_tables' must be a
                          getml.data.DataFrame, a list or
                          dictionary""")

    if not isinstance(n_iter, numbers.Real):
        raise TypeError("""'n_iter' must be a real number""")

    if not isinstance(num_threads, numbers.Real):
        raise TypeError("""'num_threads' must be a real number""")

    # -----------------------------------------------------------

    cmd = dict()

    cmd["name_"] = ""
    cmd["type_"] = "Hyperopt.tune"

    cmd["n_iter_"] = n_iter
    cmd["num_threads_"] = num_threads
    cmd["pipeline_"] = pipeline._getml_deserialize()
    cmd["score_"] = score
    cmd["what_"] = what

    cmd["population_training_name_"] = population_table_training.name
    cmd["population_validation_name_"] = population_table_validation.name
    cmd["peripheral_names_"] = [elem.name for elem in peripheral_tables]

    # _tune_feature_learner forwards the time series settings for
    # "RelMTTimeSeries" as well, so it has to be listed here. Otherwise
    # those settings would be silently dropped from the command.
    time_series_learners = (
        "RelboostTimeSeries",
        "MultirelTimeSeries",
        "RelMTTimeSeries",
    )

    if what in time_series_learners:
        cmd["horizon_"] = horizon
        cmd["memory_"] = memory
        cmd["allow_lagged_targets_"] = allow_lagged_targets
        cmd["self_join_keys_"] = self_join_keys or []
        cmd["ts_name_"] = ts_name
        cmd["delta_t_"] = delta_t

    sock = comm.send_and_receive_socket(cmd)

    # ------------------------------------------------------------

    # The connection is released on every path, including the one on
    # which the engine reports an error.
    try:
        begin = time.time()

        msg = comm.log(sock)

        end = time.time()

        # --------------------------------------------------------

        if msg != "Success!":
            comm.engine_exception_handler(msg)

        print()
        _print_time_taken(begin, end, "Time taken: ")

        # --------------------------------------------------------

        pipeline_name = comm.recv_string(sock)
    finally:
        sock.close()

    return getml.pipeline.load(pipeline_name)


# -----------------------------------------------------------------------------


def _tune_feature_learner(
        feature_learner,
        pipeline,
        population_table_training,
        population_table_validation,
        peripheral_tables,
        n_iter,
        score,
        num_threads):
    """
    Dispatches the tuning of a single feature learner to ``_tune``.

    The learner's ``type`` selects the name handed to ``_tune``. For the
    time series learners the learner's ``horizon``, ``memory``,
    ``allow_lagged_targets``, ``self_join_keys``, ``ts_name`` and
    ``delta_t`` are passed along as well.

    Returns:
        Whatever ``_tune`` returns.

    Raises:
        ValueError: If ``feature_learner.type`` is not one of the
            supported types.
    """
    # learner type -> (name handed to _tune, is a time series learner)
    known_learners = {
        "MultirelModel": ("Multirel", False),
        "MultirelTimeSeries": ("MultirelTimeSeries", True),
        "RelboostModel": ("Relboost", False),
        "RelboostTimeSeries": ("RelboostTimeSeries", True),
        "RelMTModel": ("RelMT", False),
        "RelMTTimeSeries": ("RelMTTimeSeries", True),
    }

    learner_type = feature_learner.type

    if learner_type not in known_learners:
        raise ValueError(
            "Unknown feature learner: " + feature_learner.type + "!")

    what, is_time_series = known_learners[learner_type]

    args = [
        what,
        pipeline,
        population_table_training,
        population_table_validation,
        peripheral_tables,
        n_iter,
        score,
        num_threads,
    ]

    if is_time_series:
        args.extend([
            feature_learner.horizon,
            feature_learner.memory,
            feature_learner.allow_lagged_targets,
            feature_learner.self_join_keys,
            feature_learner.ts_name,
            feature_learner.delta_t,
        ])

    return _tune(*args)

# -----------------------------------------------------------------------------


def _tune_predictor(
        predictor,
        pipeline,
        population_table_training,
        population_table_validation,
        peripheral_tables,
        n_iter,
        score,
        num_threads):
    """
    Dispatches the tuning of a single predictor to ``_tune``.

    A predictor whose ``type`` contains "XGBoost" is tuned as "XGBoost".
    Otherwise, one whose ``type`` contains "Regression" is tuned as
    "Linear".

    Returns:
        Whatever ``_tune`` returns.

    Raises:
        ValueError: If ``predictor.type`` contains neither marker.
    """
    # Checked in order: the first marker found in the type wins.
    markers = (
        ("XGBoost", "XGBoost"),
        ("Regression", "Linear"),
    )

    for marker, what in markers:
        if marker in predictor.type:
            return _tune(
                what,
                pipeline,
                population_table_training,
                population_table_validation,
                peripheral_tables,
                n_iter,
                score,
                num_threads)

    raise ValueError("Unknown predictor: '" + predictor.type + "'!")

# -----------------------------------------------------------------------------


def tune_feature_learners(
        pipeline,
        population_table_training,
        population_table_validation,
        peripheral_tables=None,
        n_iter=0,
        score=None,
        num_threads=0):
    """
    Returns a pipeline containing tuned feature learners.

    Every feature learner of the base pipeline is tuned in its own run.
    The tuned learners are then combined with a copy of the original
    predictors in a new pipeline, which is fitted on the training set
    and scored on the validation set.

    Args:
        pipeline (:class:`~getml.pipeline.Pipeline`):
            Base pipeline used to derive all models fitted and scored
            during the hyperparameter optimization. It defines the data
            schema and any hyperparameters that are not optimized.

        population_table_training(:class:`~getml.data.DataFrame`):
            The population table that pipelines will be trained on.

        population_table_validation(:class:`~getml.data.DataFrame`):
            The population table that pipelines will be evaluated on.

        peripheral_tables(:class:`~getml.data.DataFrame`, list or dict):
            The peripheral tables used to provide additional
            information for the population tables.

        n_iter (int, optional):
            The number of iterations.

        score (str, optional):
            The score to optimize. Must be from
            :mod:`~getml.pipeline.scores`. If not set, auc is used for
            classification pipelines and rmse for all others.

        num_threads (int, optional):
            The number of parallel threads to use. If set to 0,
            the number of threads will be inferred.

    Example:
        We assume that you have already set up your
        :class:`~getml.pipeline.Pipeline`. Moreover, we assume
        that you have defined a training set and a validation
        set as well as the peripheral tables.

        .. code-block:: python

            tuned_pipeline = getml.hyperopt.tune_feature_learners(
                pipeline=base_pipeline,
                population_table_training=training_set,
                population_table_validation=validation_set,
                peripheral_tables=peripheral_tables)

    Returns:
        A :class:`~getml.pipeline.Pipeline` containing tuned versions
        of the feature learners.

    Raises:
        TypeError: If 'pipeline' is not a
            :class:`~getml.pipeline.Pipeline` or any other argument is
            of a wrong type.
        ValueError: If the pipeline contains a feature learner of an
            unknown type.
    """
    if not isinstance(pipeline, getml.pipeline.Pipeline):
        raise TypeError("'pipeline' must be a pipeline!")

    pipeline.validate()

    if not score:
        score = _infer_score(pipeline)

    tuned_feature_learners = []

    for feature_learner in pipeline.feature_learners:
        tuned_pipeline = _tune_feature_learner(
            feature_learner=feature_learner,
            pipeline=pipeline,
            population_table_training=population_table_training,
            population_table_validation=population_table_validation,
            peripheral_tables=peripheral_tables,
            n_iter=n_iter,
            score=score,
            num_threads=num_threads)

        # Each tuning run is expected to hand back a pipeline holding
        # only the feature learner that was tuned.
        assert len(
            tuned_pipeline.feature_learners) == 1, "Expected exactly one feature learner!"

        tuned_feature_learners.append(tuned_pipeline.feature_learners[0])

    # The predictors are copied so the final pipeline does not share
    # them with the base pipeline.
    return _make_final_pipeline(
        pipeline,
        tuned_feature_learners,
        copy.deepcopy(pipeline.predictors),
        population_table_training,
        population_table_validation,
        peripheral_tables)
# -----------------------------------------------------------------------------
def tune_predictors(
        pipeline,
        population_table_training,
        population_table_validation,
        peripheral_tables=None,
        n_iter=0,
        score=None,
        num_threads=0):
    """
    Returns a pipeline containing tuned predictors.

    Every predictor of the base pipeline is tuned in its own run. The
    tuned predictors are then combined with a copy of the original
    feature learners in a new pipeline, which is fitted on the training
    set and scored on the validation set.

    Args:
        pipeline (:class:`~getml.pipeline.Pipeline`):
            Base pipeline used to derive all models fitted and scored
            during the hyperparameter optimization. It defines the data
            schema and any hyperparameters that are not optimized.

        population_table_training(:class:`~getml.data.DataFrame`):
            The population table that pipelines will be trained on.

        population_table_validation(:class:`~getml.data.DataFrame`):
            The population table that pipelines will be evaluated on.

        peripheral_tables(:class:`~getml.data.DataFrame`, list or dict):
            The peripheral tables used to provide additional
            information for the population tables.

        n_iter (int, optional):
            The number of iterations.

        score (str, optional):
            The score to optimize. Must be from
            :mod:`~getml.pipeline.scores`. If not set, auc is used for
            classification pipelines and rmse for all others.

        num_threads (int, optional):
            The number of parallel threads to use. If set to 0,
            the number of threads will be inferred.

    Example:
        We assume that you have already set up your
        :class:`~getml.pipeline.Pipeline`. Moreover, we assume
        that you have defined a training set and a validation
        set as well as the peripheral tables.

        .. code-block:: python

            tuned_pipeline = getml.hyperopt.tune_predictors(
                pipeline=base_pipeline,
                population_table_training=training_set,
                population_table_validation=validation_set,
                peripheral_tables=peripheral_tables)

    Returns:
        A :class:`~getml.pipeline.Pipeline` containing tuned
        predictors.

    Raises:
        TypeError: If 'pipeline' is not a
            :class:`~getml.pipeline.Pipeline` or any other argument is
            of a wrong type.
        ValueError: If the pipeline contains a predictor of an unknown
            type.
    """
    if not isinstance(pipeline, getml.pipeline.Pipeline):
        raise TypeError("'pipeline' must be a pipeline!")

    pipeline.validate()

    if not score:
        score = _infer_score(pipeline)

    tuned_predictors = []

    for predictor in pipeline.predictors:
        tuned_pipeline = _tune_predictor(
            predictor=predictor,
            pipeline=pipeline,
            population_table_training=population_table_training,
            population_table_validation=population_table_validation,
            peripheral_tables=peripheral_tables,
            n_iter=n_iter,
            score=score,
            num_threads=num_threads)

        # Each tuning run is expected to hand back a pipeline holding
        # only the predictor that was tuned.
        assert len(
            tuned_pipeline.predictors) == 1, "Expected exactly one predictor!"

        tuned_predictors.append(tuned_pipeline.predictors[0])

    # The feature learners are copied so the final pipeline does not
    # share them with the base pipeline.
    return _make_final_pipeline(
        pipeline,
        copy.deepcopy(pipeline.feature_learners),
        tuned_predictors,
        population_table_training,
        population_table_validation,
        peripheral_tables)