Source code for getml.datasets.samples_generator

# Copyright 2022 The SQLNet Company GmbH
#
# This file is licensed under the Elastic License 2.0 (ELv2).
# Refer to the LICENSE.txt file in the root of the repository
# for details.
#

"""
Generate samples of artificial data sets.
"""

import datetime
from typing import Optional, Tuple

import numpy as np
import pandas as pd  # type: ignore

import getml.data as data
from getml.data.data_frame import DataFrame
from getml.feature_learning import aggregations

# -----------------------------------------------------------------------------


def _aggregate(
    table: pd.DataFrame, aggregation: aggregations.Aggregation, col: str, join_key: str
) -> pd.DataFrame:
    """
    Aggregate column `col` of `table` for every distinct value of `join_key`.

    Args:
        table: Data to aggregate. Must contain the columns `col` and
            `join_key`.

        aggregation: Aggregation to apply. Has to compare equal to one of
            the constants of `aggregations` handled below.

        col: Name of the column that is aggregated.

        join_key: Name of the column the rows are grouped by.

    Returns:
        A data frame with one row per distinct `join_key` value and the
        two columns `join_key` and `col`, the latter holding the
        aggregated values.

    Raises:
        ValueError: If `aggregation` is none of the supported
            aggregations.
    """

    # Aggregations that map one-to-one onto a pandas groupby method.
    simple_aggregations = (
        (aggregations.Avg, "mean"),
        (aggregations.Count, "count"),
        (aggregations.Max, "max"),
        (aggregations.Median, "median"),
        (aggregations.Min, "min"),
        (aggregations.Stddev, "std"),
        (aggregations.Sum, "sum"),
        (aggregations.Var, "var"),
    )

    for known_aggregation, method_name in simple_aggregations:
        if aggregation == known_aggregation:
            grouped = table[[col, join_key]].groupby([join_key], as_index=False)
            return getattr(grouped, method_name)()

    # The distinct counts are computed with the join key as index and turned
    # back into a regular column afterwards. Depending on the pandas version,
    # `groupby(..., as_index=False)[col].nunique()` returns either a Series
    # or a two-column DataFrame, so it cannot be assigned to a single column
    # reliably.
    if aggregation == aggregations.CountDistinct:
        return table.groupby(join_key)[col].nunique().reset_index()

    if aggregation == aggregations.CountMinusCountDistinct:
        per_key = table.groupby(join_key)[col]
        return (per_key.count() - per_key.nunique()).reset_index()

    # An f-string keeps the message intact even if `aggregation` is not a str.
    raise ValueError(f"Aggregation '{aggregation}' not known!")


# -----------------------------------------------------------------------------


def make_categorical(
    n_rows_population: int = 500,
    n_rows_peripheral: int = 125000,
    random_state: Optional[int] = None,
    population_name: str = "",
    peripheral_name: str = "",
    aggregation: aggregations.Aggregation = aggregations.Count,
) -> Tuple[DataFrame, DataFrame]:
    """
    Generate a random dataset with categorical variables

    The dataset consists of a population table and one peripheral table.

    The peripheral table has 3 columns:

    * `column_01`: random categorical variable between '0' and '9'
    * `join_key`: random integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1

    The population table has 4 columns:

    * `column_01`: random categorical variable between '0' and '9'
    * `join_key`: unique integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1
    * `targets`: target variable. The `aggregation` of `column_01` over the
      matching entries in the peripheral table for which
      ``time_stamp_peripheral <= time_stamp_population`` and the category
      in the peripheral table is not '1', '2' or '9'. With the default
      aggregation this is the number of such entries. Rows of the
      population table without any such entry get a target of 0.

    The SQL definition of the target variable reads like this

    .. code-block:: sql

        SELECT aggregation( column_01 )
        FROM POPULATION_TABLE t1
        LEFT JOIN PERIPHERAL_TABLE t2
        ON t1.join_key = t2.join_key
        WHERE (
           ( t2.column_01 != '1' AND t2.column_01 != '2' AND t2.column_01 != '9' )
        ) AND t2.time_stamp <= t1.time_stamp
        GROUP BY t1.join_key,
                 t1.time_stamp;

    Args:
        n_rows_population (int, optional):
            Number of rows in the population table.

        n_rows_peripheral (int, optional):
            Number of rows in the peripheral table.

        random_state (Optional[int], optional):
            Seed to initialize the random number generator used for
            the dataset creation. If set to None, the seed will be
            the 'microsecond' component of
            :py:func:`datetime.datetime.now()`.

        population_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the population table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `categorical_population_` and the seed of
            the random number generator.

        peripheral_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the peripheral table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `categorical_peripheral_` and the seed of
            the random number generator.

        aggregation (string, optional):
            Aggregation from :mod:`~getml.feature_learning.aggregations`
            used to generate the 'targets' column.

    Returns:
        tuple:
            tuple containing:

            * population (:class:`getml.DataFrame`):
              Population table

            * peripheral (:class:`getml.DataFrame`):
              Peripheral table
    """

    if random_state is None:
        random_state = datetime.datetime.now().microsecond

    rng = np.random.RandomState(random_state)  # pylint: disable=E1101

    # The dict entries are evaluated top to bottom, which fixes the order in
    # which values are drawn from the generator.
    population_table = pd.DataFrame(
        {
            "column_01": rng.randint(0, 10, n_rows_population).astype(str),
            "join_key": np.arange(n_rows_population),
            "time_stamp_population": rng.rand(n_rows_population),
        }
    )

    peripheral_table = pd.DataFrame(
        {
            "column_01": rng.randint(0, 10, n_rows_peripheral).astype(str),
            "join_key": rng.randint(0, n_rows_population, n_rows_peripheral),
            "time_stamp_peripheral": rng.rand(n_rows_peripheral),
        }
    )

    # Attach the population time stamp to every peripheral row.
    joined = peripheral_table.merge(
        population_table[["join_key", "time_stamp_population"]],
        how="left",
        on="join_key",
    )

    # Keep only the rows that contribute to the target.
    not_in_future = joined["time_stamp_peripheral"] <= joined["time_stamp_population"]
    allowed_category = ~joined["column_01"].isin(["1", "2", "9"])
    contributing = joined[not_in_future & allowed_category]

    # Aggregate per join key and attach the result as the target column.
    targets = _aggregate(contributing, aggregation, "column_01", "join_key").rename(
        columns={"column_01": "targets"}
    )
    population_table = population_table.merge(targets, how="left", on="join_key")

    population_table = population_table.rename(
        index=str, columns={"time_stamp_population": "time_stamp"}
    )
    peripheral_table = peripheral_table.rename(
        index=str, columns={"time_stamp_peripheral": "time_stamp"}
    )

    # Population rows without contributing peripheral rows come out of the
    # left merge as NaN - target values may never be NaN, so use 0 instead.
    merged_targets = population_table["targets"]
    population_table["targets"] = np.where(
        np.isnan(merged_targets), 0, merged_targets
    )

    # Fall back to seed-based default names if none were provided.
    population_name = population_name or "categorical_population_" + str(random_state)
    peripheral_name = peripheral_name or "categorical_peripheral_" + str(random_state)

    # Create the data.DataFrame counterparts.
    population_roles = {
        "join_key": ["join_key"],
        "categorical": ["column_01"],
        "time_stamp": ["time_stamp"],
        "target": ["targets"],
    }
    peripheral_roles = {
        "join_key": ["join_key"],
        "categorical": ["column_01"],
        "time_stamp": ["time_stamp"],
    }

    population_on_engine = data.DataFrame(
        name=population_name, roles=population_roles
    ).read_pandas(population_table)

    peripheral_on_engine = data.DataFrame(
        name=peripheral_name, roles=peripheral_roles
    ).read_pandas(peripheral_table)

    return population_on_engine, peripheral_on_engine
# -----------------------------------------------------------------------------
def make_discrete(
    n_rows_population: int = 500,
    n_rows_peripheral: int = 125000,
    random_state: Optional[int] = None,
    population_name: str = "",
    peripheral_name: str = "",
    aggregation: aggregations.Aggregation = aggregations.Count,
) -> Tuple[DataFrame, DataFrame]:
    """
    Generate a random dataset with discrete numerical variables

    The dataset consists of a population table and one peripheral table.

    The peripheral table has 3 columns:

    * `column_01`: random integer between -11 and 10
    * `join_key`: random integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1

    The population table has 4 columns:

    * `column_01`: random integer between 0 and 9
    * `join_key`: unique integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1
    * `targets`: target variable. The `aggregation` of `column_01` over the
      matching entries in the peripheral table for which
      ``time_stamp_peripheral <= time_stamp_population`` and `column_01`
      is greater than 0. With the default aggregation this is the number
      of such entries. Rows of the population table without any such
      entry get a target of 0.

    .. code-block:: sql

        SELECT aggregation( column_01 )
        FROM POPULATION t1
        LEFT JOIN PERIPHERAL t2
        ON t1.join_key = t2.join_key
        WHERE (
           ( t2.column_01 > 0 )
        ) AND t2.time_stamp <= t1.time_stamp
        GROUP BY t1.join_key,
                 t1.time_stamp;

    Args:
        n_rows_population (int, optional):
            Number of rows in the population table.

        n_rows_peripheral (int, optional):
            Number of rows in the peripheral table.

        random_state (Optional[int], optional):
            Seed to initialize the random number generator used for
            the dataset creation. If set to None, the seed will be
            the 'microsecond' component of
            :py:func:`datetime.datetime.now()`.

        population_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the population table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `discrete_population_` and the seed of the
            random number generator.

        peripheral_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the peripheral table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `discrete_peripheral_` and the seed of the
            random number generator.

        aggregation (string, optional):
            Aggregation from :mod:`~getml.feature_learning.aggregations`
            used to generate the 'targets' column.

    Returns:
        tuple:
            tuple containing:

            * population (:class:`getml.DataFrame`):
              Population table

            * peripheral (:class:`getml.DataFrame`):
              Peripheral table
    """

    if random_state is None:
        random_state = datetime.datetime.now().microsecond

    rng = np.random.RandomState(random_state)  # pylint: disable=E1101

    population_table = pd.DataFrame()
    # Kept as integers (no conversion to str): the column is registered
    # with the "numerical" role below.
    population_table["column_01"] = rng.randint(0, 10, n_rows_population)
    population_table["join_key"] = np.arange(n_rows_population)
    population_table["time_stamp_population"] = rng.rand(n_rows_population)

    peripheral_table = pd.DataFrame()
    peripheral_table["column_01"] = rng.randint(-11, 11, n_rows_peripheral)
    peripheral_table["join_key"] = rng.randint(
        0, n_rows_population, n_rows_peripheral
    )
    peripheral_table["time_stamp_peripheral"] = rng.rand(n_rows_peripheral)

    # Attach the population time stamp to every peripheral row.
    joined = peripheral_table.merge(
        population_table[["join_key", "time_stamp_population"]],
        how="left",
        on="join_key",
    )

    # Keep only the rows that contribute to the target.
    contributing = joined[
        (joined["time_stamp_peripheral"] <= joined["time_stamp_population"])
        & (joined["column_01"] > 0.0)
    ]

    # Aggregate per join key and attach the result as the target column.
    targets = _aggregate(contributing, aggregation, "column_01", "join_key").rename(
        columns={"column_01": "targets"}
    )
    population_table = population_table.merge(targets, how="left", on="join_key")

    population_table = population_table.rename(
        index=str, columns={"time_stamp_population": "time_stamp"}
    )
    peripheral_table = peripheral_table.rename(
        index=str, columns={"time_stamp_peripheral": "time_stamp"}
    )

    # Population rows without contributing peripheral rows come out of the
    # left merge as NaN - target values may never be NaN, so use 0 instead.
    population_table["targets"] = population_table["targets"].fillna(0)

    # Fall back to seed-based default names if none were provided.
    if not population_name:
        population_name = "discrete_population_" + str(random_state)

    if not peripheral_name:
        peripheral_name = "discrete_peripheral_" + str(random_state)

    # Create the data.DataFrame counterparts.
    population_on_engine = data.DataFrame(
        name=population_name,
        roles={
            "join_key": ["join_key"],
            "numerical": ["column_01"],
            "time_stamp": ["time_stamp"],
            "target": ["targets"],
        },
    ).read_pandas(population_table)

    peripheral_on_engine = data.DataFrame(
        name=peripheral_name,
        roles={
            "join_key": ["join_key"],
            "numerical": ["column_01"],
            "time_stamp": ["time_stamp"],
        },
    ).read_pandas(peripheral_table)

    return population_on_engine, peripheral_on_engine
# -----------------------------------------------------------------------------
def make_numerical(
    n_rows_population: int = 500,
    n_rows_peripheral: int = 125000,
    random_state: Optional[int] = None,
    population_name: str = "",
    peripheral_name: str = "",
    aggregation: aggregations.Aggregation = aggregations.Count,
) -> Tuple[DataFrame, DataFrame]:
    """
    Generate a random dataset with continuous numerical variables

    The dataset consists of a population table and one peripheral table.

    The peripheral table has 3 columns:

    * `column_01`: random number between -1 and 1
    * `join_key`: random integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1

    The population table has 4 columns:

    * `column_01`: random number between -1 and 1
    * `join_key`: unique integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1
    * `targets`: target variable. The `aggregation` of `column_01` over the
      matching entries in the peripheral table for which
      ``time_stamp_population - 0.5 <= time_stamp_peripheral <=
      time_stamp_population``. With the default aggregation this is the
      number of such entries. Rows of the population table without any
      such entry get a target of 0.

    .. code-block:: sql

        SELECT aggregation( column_01 )
        FROM POPULATION t1
        LEFT JOIN PERIPHERAL t2
        ON t1.join_key = t2.join_key
        WHERE (
           ( t1.time_stamp - t2.time_stamp <= 0.5 )
        ) AND t2.time_stamp <= t1.time_stamp
        GROUP BY t1.join_key,
                 t1.time_stamp;

    Args:
        n_rows_population (int, optional):
            Number of rows in the population table.

        n_rows_peripheral (int, optional):
            Number of rows in the peripheral table.

        random_state (Optional[int], optional):
            Seed to initialize the random number generator used for
            the dataset creation. If set to None, the seed will be
            the 'microsecond' component of
            :py:func:`datetime.datetime.now()`.

        population_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the population table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `numerical_population_` and the seed of the
            random number generator.

        peripheral_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the peripheral table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `numerical_peripheral_` and the seed of the
            random number generator.

        aggregation (string, optional):
            Aggregation from :mod:`~getml.feature_learning.aggregations`
            used to generate the 'targets' column.

    Returns:
        tuple:
            tuple containing:

            * population (:class:`getml.DataFrame`):
              Population table

            * peripheral (:class:`getml.DataFrame`):
              Peripheral table
    """

    if random_state is None:
        random_state = datetime.datetime.now().microsecond

    rng = np.random.RandomState(random_state)  # pylint: disable=E1101

    # The dict entries are evaluated top to bottom, which fixes the order in
    # which values are drawn from the generator.
    population_table = pd.DataFrame(
        {
            "column_01": rng.rand(n_rows_population) * 2.0 - 1.0,
            "join_key": np.arange(n_rows_population),
            "time_stamp_population": rng.rand(n_rows_population),
        }
    )

    peripheral_table = pd.DataFrame(
        {
            "column_01": rng.rand(n_rows_peripheral) * 2.0 - 1.0,
            "join_key": rng.randint(0, n_rows_population, n_rows_peripheral),
            "time_stamp_peripheral": rng.rand(n_rows_peripheral),
        }
    )

    # Attach the population time stamp to every peripheral row.
    joined = peripheral_table.merge(
        population_table[["join_key", "time_stamp_population"]],
        how="left",
        on="join_key",
    )

    # Keep the peripheral rows lying at most 0.5 before - and not after -
    # the population time stamp (both bounds inclusive).
    peripheral_stamps = joined["time_stamp_peripheral"]
    population_stamps = joined["time_stamp_population"]
    in_window = peripheral_stamps.between(population_stamps - 0.5, population_stamps)
    contributing = joined[in_window]

    # Aggregate per join key and attach the result as the target column.
    targets = _aggregate(contributing, aggregation, "column_01", "join_key").rename(
        columns={"column_01": "targets"}
    )
    population_table = population_table.merge(targets, how="left", on="join_key")

    population_table = population_table.rename(
        index=str, columns={"time_stamp_population": "time_stamp"}
    )
    peripheral_table = peripheral_table.rename(
        index=str, columns={"time_stamp_peripheral": "time_stamp"}
    )

    # Population rows without contributing peripheral rows come out of the
    # left merge as NaN - target values may never be NaN, so use 0 instead.
    merged_targets = population_table["targets"]
    population_table["targets"] = np.where(
        np.isnan(merged_targets), 0, merged_targets
    )

    # Fall back to seed-based default names if none were provided.
    population_name = population_name or "numerical_population_" + str(random_state)
    peripheral_name = peripheral_name or "numerical_peripheral_" + str(random_state)

    # Create the data.DataFrame counterparts.
    population_roles = {
        "join_key": ["join_key"],
        "numerical": ["column_01"],
        "time_stamp": ["time_stamp"],
        "target": ["targets"],
    }
    peripheral_roles = {
        "join_key": ["join_key"],
        "numerical": ["column_01"],
        "time_stamp": ["time_stamp"],
    }

    population_on_engine = data.DataFrame(
        name=population_name, roles=population_roles
    ).read_pandas(population_table)

    peripheral_on_engine = data.DataFrame(
        name=peripheral_name, roles=peripheral_roles
    ).read_pandas(peripheral_table)

    return population_on_engine, peripheral_on_engine
# -----------------------------------------------------------------------------
def make_same_units_categorical(
    n_rows_population: int = 500,
    n_rows_peripheral: int = 125000,
    random_state: Optional[int] = None,
    population_name: str = "",
    peripheral_name: str = "",
    aggregation: aggregations.Aggregation = aggregations.Count,
) -> Tuple[DataFrame, DataFrame]:
    """
    Generate a random dataset with categorical variables

    The dataset consists of a population table and one peripheral table.

    The peripheral table has 4 columns:

    * `column_01`: random categorical variable between '0' and '9'
    * `column_02`: random number between -1 and 1
    * `join_key`: random integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1

    The population table has 4 columns:

    * `column_01`: random categorical variable between '0' and '9'
    * `join_key`: unique integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1
    * `targets`: target variable. The `aggregation` of `column_02` over the
      matching entries in the peripheral table for which
      ``time_stamp_peripheral <= time_stamp_population`` and whose
      `column_01` equals the `column_01` of the population table. With
      the default aggregation this is the number of such entries. Rows
      of the population table without any such entry get a target of 0.

    .. code-block:: sql

        SELECT aggregation( column_02 )
        FROM POPULATION_TABLE t1
        LEFT JOIN PERIPHERAL_TABLE t2
        ON t1.join_key = t2.join_key
        WHERE (
           ( t1.column_01 == t2.column_01 )
        ) AND t2.time_stamp <= t1.time_stamp
        GROUP BY t1.join_key,
                 t1.time_stamp;

    Args:
        n_rows_population (int, optional):
            Number of rows in the population table.

        n_rows_peripheral (int, optional):
            Number of rows in the peripheral table.

        random_state (Optional[int], optional):
            Seed to initialize the random number generator used for
            the dataset creation. If set to None, the seed will be
            the 'microsecond' component of
            :py:func:`datetime.datetime.now()`.

        population_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the population table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `make_same_units_categorical_population__`
            and the seed of the random number generator.

        peripheral_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the peripheral table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `make_same_units_categorical_peripheral__`
            and the seed of the random number generator.

        aggregation (string, optional):
            Aggregation from :mod:`~getml.feature_learning.aggregations`
            used to generate the 'targets' column.

    Returns:
        tuple:
            tuple containing:

            * population (:class:`getml.DataFrame`):
              Population table

            * peripheral (:class:`getml.DataFrame`):
              Peripheral table
    """

    if random_state is None:
        random_state = datetime.datetime.now().microsecond

    rng = np.random.RandomState(random_state)  # pylint: disable=E1101

    population_table = pd.DataFrame()
    population_table["column_01_population"] = (
        (rng.rand(n_rows_population) * 10.0).astype(np.int32).astype(str)
    )
    population_table["join_key"] = range(n_rows_population)
    population_table["time_stamp_population"] = rng.rand(n_rows_population)

    peripheral_table = pd.DataFrame()
    peripheral_table["column_01_peripheral"] = (
        (rng.rand(n_rows_peripheral) * 10.0).astype(np.int32).astype(str)
    )
    peripheral_table["column_02"] = rng.rand(n_rows_peripheral) * 2.0 - 1.0
    # All join keys are drawn in a single vectorized call; truncating the
    # scaled uniform draws maps them onto 0 .. n_rows_population - 1.
    peripheral_table["join_key"] = (
        float(n_rows_population) * rng.rand(n_rows_peripheral)
    ).astype(np.int64)
    peripheral_table["time_stamp_peripheral"] = rng.rand(n_rows_peripheral)

    # Attach the population time stamp and category to every peripheral row.
    joined = peripheral_table.merge(
        population_table[["join_key", "time_stamp_population", "column_01_population"]],
        how="left",
        on="join_key",
    )

    # Keep only the rows that contribute to the target.
    contributing = joined[
        (joined["time_stamp_peripheral"] <= joined["time_stamp_population"])
        & (joined["column_01_peripheral"] == joined["column_01_population"])
    ]

    # Aggregate per join key and attach the result as the target column.
    targets = _aggregate(contributing, aggregation, "column_02", "join_key").rename(
        columns={"column_02": "targets"}
    )
    population_table = population_table.merge(targets, how="left", on="join_key")

    # Drop the table suffixes that were only needed to tell the columns
    # apart in the joined table.
    population_table = population_table.rename(
        index=str,
        columns={
            "column_01_population": "column_01",
            "time_stamp_population": "time_stamp",
        },
    )
    peripheral_table = peripheral_table.rename(
        index=str,
        columns={
            "column_01_peripheral": "column_01",
            "time_stamp_peripheral": "time_stamp",
        },
    )

    # Population rows without contributing peripheral rows come out of the
    # left merge as NaN - target values may never be NaN, so use 0.0 instead.
    population_table["targets"] = population_table["targets"].fillna(0.0)

    # Fall back to seed-based default names if none were provided.
    population_name = (
        population_name
        or "make_same_units_categorical_population__" + str(random_state)
    )

    peripheral_name = (
        peripheral_name
        or "make_same_units_categorical_peripheral__" + str(random_state)
    )

    # Create the data.DataFrame counterparts.
    population_on_engine = data.DataFrame(
        name=population_name,
        roles={
            "join_key": ["join_key"],
            "categorical": ["column_01"],
            "time_stamp": ["time_stamp"],
            "target": ["targets"],
        },
    ).read_pandas(population_table)

    peripheral_on_engine = data.DataFrame(
        name=peripheral_name,
        roles={
            "join_key": ["join_key"],
            "categorical": ["column_01"],
            "numerical": ["column_02"],
            "time_stamp": ["time_stamp"],
        },
    ).read_pandas(peripheral_table)

    return population_on_engine, peripheral_on_engine
# -----------------------------------------------------------------------------
def make_same_units_numerical(
    n_rows_population: int = 500,
    n_rows_peripheral: int = 125000,
    random_state: Optional[int] = None,
    population_name: str = "",
    peripheral_name: str = "",
    aggregation: aggregations.Aggregation = aggregations.Count,
) -> Tuple[DataFrame, DataFrame]:
    """
    Generate a random dataset with continuous numerical variables

    The dataset consists of a population table and one peripheral table.

    The peripheral table has 3 columns:

    * `column_01`: random number between -1 and 1
    * `join_key`: random integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1

    The population table has 4 columns:

    * `column_01`: random number between -1 and 1
    * `join_key`: unique integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1
    * `targets`: target variable. The `aggregation` of `column_01` over the
      matching entries in the peripheral table for which
      ``time_stamp_peripheral <= time_stamp_population`` and
      ``column_01_peripheral > column_01_population - 0.5``. With the
      default aggregation this is the number of such entries. Rows of
      the population table without any such entry get a target of 0.

    .. code-block:: sql

        SELECT aggregation( column_01 )
        FROM POPULATION t1
        LEFT JOIN PERIPHERAL t2
        ON t1.join_key = t2.join_key
        WHERE (
           ( t1.column_01 - t2.column_01 < 0.5 )
        ) AND t2.time_stamp <= t1.time_stamp
        GROUP BY t1.join_key,
                 t1.time_stamp;

    Args:
        n_rows_population (int, optional):
            Number of rows in the population table.

        n_rows_peripheral (int, optional):
            Number of rows in the peripheral table.

        random_state (Optional[int], optional):
            Seed to initialize the random number generator used for
            the dataset creation. If set to None, the seed will be
            the 'microsecond' component of
            :py:func:`datetime.datetime.now()`.

        population_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the population table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `same_unit_numerical_population_` and the
            seed of the random number generator.

        peripheral_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the peripheral table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, the name will be generated by
            concatenating `same_unit_numerical_peripheral_` and the
            seed of the random number generator.

        aggregation (string, optional):
            Aggregation from :mod:`~getml.feature_learning.aggregations`
            used to generate the 'targets' column.

    Returns:
        tuple:
            tuple containing:

            * population (:class:`getml.DataFrame`):
              Population table

            * peripheral (:class:`getml.DataFrame`):
              Peripheral table
    """

    if random_state is None:
        random_state = datetime.datetime.now().microsecond

    rng = np.random.RandomState(random_state)  # pylint: disable=E1101

    population_table = pd.DataFrame()
    population_table["column_01_population"] = rng.rand(n_rows_population) * 2.0 - 1.0
    population_table["join_key"] = range(n_rows_population)
    population_table["time_stamp_population"] = rng.rand(n_rows_population)

    peripheral_table = pd.DataFrame()
    peripheral_table["column_01_peripheral"] = rng.rand(n_rows_peripheral) * 2.0 - 1.0
    # All join keys are drawn in a single vectorized call; truncating the
    # scaled uniform draws maps them onto 0 .. n_rows_population - 1.
    peripheral_table["join_key"] = (
        float(n_rows_population) * rng.rand(n_rows_peripheral)
    ).astype(np.int64)
    peripheral_table["time_stamp_peripheral"] = rng.rand(n_rows_peripheral)

    # Attach the population time stamp and value to every peripheral row.
    joined = peripheral_table.merge(
        population_table[["join_key", "time_stamp_population", "column_01_population"]],
        how="left",
        on="join_key",
    )

    # Keep only the rows that contribute to the target.
    contributing = joined[
        (joined["time_stamp_peripheral"] <= joined["time_stamp_population"])
        & (joined["column_01_peripheral"] > joined["column_01_population"] - 0.5)
    ]

    # Aggregate per join key with the requested aggregation (instead of a
    # hard-coded count) and attach the result as the target column.
    targets = _aggregate(
        contributing, aggregation, "column_01_peripheral", "join_key"
    ).rename(columns={"column_01_peripheral": "targets"})
    population_table = population_table.merge(targets, how="left", on="join_key")

    # Drop the table suffixes that were only needed to tell the columns
    # apart in the joined table.
    population_table = population_table.rename(
        index=str,
        columns={
            "column_01_population": "column_01",
            "time_stamp_population": "time_stamp",
        },
    )
    peripheral_table = peripheral_table.rename(
        index=str,
        columns={
            "column_01_peripheral": "column_01",
            "time_stamp_peripheral": "time_stamp",
        },
    )

    # Population rows without contributing peripheral rows come out of the
    # left merge as NaN - target values may never be NaN, so use 0.0 instead.
    population_table["targets"] = population_table["targets"].fillna(0.0)

    # Fall back to seed-based default names if none were provided.
    if not population_name:
        population_name = "same_unit_numerical_population_" + str(random_state)

    if not peripheral_name:
        peripheral_name = "same_unit_numerical_peripheral_" + str(random_state)

    # Create the data.DataFrame counterparts.
    population_on_engine = data.DataFrame(
        name=population_name,
        roles={
            "join_key": ["join_key"],
            "numerical": ["column_01"],
            "time_stamp": ["time_stamp"],
            "target": ["targets"],
        },
    ).read_pandas(population_table)

    peripheral_on_engine = data.DataFrame(
        name=peripheral_name,
        roles={
            "join_key": ["join_key"],
            "numerical": ["column_01"],
            "time_stamp": ["time_stamp"],
        },
    ).read_pandas(peripheral_table)

    return population_on_engine, peripheral_on_engine
# -----------------------------------------------------------------------------
def make_snowflake(
    n_rows_population: int = 500,
    n_rows_peripheral1: int = 5000,
    n_rows_peripheral2: int = 125000,
    random_state: Optional[int] = None,
    population_name: str = "",
    peripheral_name1: str = "",
    peripheral_name2: str = "",
    aggregation1: aggregations.Aggregation = aggregations.Sum,
    aggregation2: aggregations.Aggregation = aggregations.Count,
) -> Tuple[DataFrame, DataFrame, DataFrame]:
    """
    Generate a random dataset with continuous numerical variables

    The dataset consists of a population table and two peripheral tables.

    The first peripheral table has 4 columns:

    * `column_01`: random number between -1 and 1
    * `join_key`: random integer in the range from 0 to ``n_rows_population``
    * `join_key2`: unique integer in the range from 0 to
      ``n_rows_peripheral1``
    * `time_stamp`: random number between 0 and 1

    The second peripheral table has 3 columns:

    * `column_01`: random number between -1 and 1
    * `join_key2`: random integer in the range from 0 to
      ``n_rows_peripheral1``
    * `time_stamp`: random number between 0 and 1

    The population table has 4 columns:

    * `column_01`: random number between -1 and 1
    * `join_key`: unique integer in the range from 0 to ``n_rows_population``
    * `time_stamp`: random number between 0 and 1
    * `targets`: target variable as defined by the SQL block below:

    .. code-block:: sql

        SELECT aggregation1( feature_1_1 )
        FROM POPULATION t1
        LEFT JOIN (
            SELECT aggregation2( t4.column_01 ) AS feature_1_1
            FROM PERIPHERAL t3
            LEFT JOIN PERIPHERAL2 t4
            ON t3.join_key2 = t4.join_key2
            WHERE (
               ( t3.time_stamp - t4.time_stamp <= 0.5 )
            ) AND t4.time_stamp <= t3.time_stamp
            GROUP BY t3.join_key,
                 t3.time_stamp
        ) t2
        ON t1.join_key = t2.join_key
        WHERE t2.time_stamp <= t1.time_stamp
        GROUP BY t1.join_key,
             t1.time_stamp;

    All upper bounds of the integer ranges above are exclusive. Rows for
    which an aggregation has no matching input receive the value 0.0.

    Args:
        n_rows_population (int, optional):
            Number of rows in the population table.

        n_rows_peripheral1 (int, optional):
            Number of rows in the first peripheral table.

        n_rows_peripheral2 (int, optional):
            Number of rows in the second peripheral table.

        random_state (Union[int, None], optional):
            Seed to initialize the random number generator used for
            the dataset creation. If set to None, the seed will be
            the 'microsecond' component of
            :py:func:`datetime.datetime.now()`.

        population_name (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the population table. If set to a name already
            existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, a unique name will be generated by
            concatenating `snowflake_population_` and the seed of the
            random number generator.

        peripheral_name1 (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the first peripheral table. If set to a name
            already existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, a unique name will be generated by
            concatenating `snowflake_peripheral_1_` and the seed of the
            random number generator.

        peripheral_name2 (string, optional):
            Name assigned to the created :class:`~getml.DataFrame`
            holding the second peripheral table. If set to a name
            already existing on the getML engine, the corresponding
            :class:`~getml.DataFrame` will be overwritten. If set to
            an empty string, a unique name will be generated by
            concatenating `snowflake_peripheral_2_` and the seed of the
            random number generator.

        aggregation1 (optional):
            Aggregation from :mod:`~getml.feature_learning.aggregations`
            applied to the intermediate per-row feature of the first
            peripheral table in order to build the `targets` column of
            the population table.

        aggregation2 (optional):
            Aggregation from :mod:`~getml.feature_learning.aggregations`
            applied to `column_01` of the second peripheral table in
            order to build the intermediate per-row feature of the
            first peripheral table.

    Returns:
        tuple:
            tuple containing:

            * population (:class:`getml.DataFrame`): Population table
            * peripheral (:class:`getml.DataFrame`): Peripheral table
            * peripheral_2 (:class:`getml.DataFrame`): Peripheral table
    """

    if random_state is None:
        random_state = datetime.datetime.now().microsecond

    random = np.random.RandomState(random_state)  # pylint: disable=E1101

    # NOTE: The order of the draws below determines the data generated for
    # a given seed - do not reorder them.

    population_table = pd.DataFrame()
    population_table["column_01"] = random.rand(n_rows_population) * 2.0 - 1.0
    population_table["join_key"] = range(n_rows_population)
    population_table["time_stamp_population"] = random.rand(n_rows_population)

    peripheral_table = pd.DataFrame()
    peripheral_table["column_01"] = random.rand(n_rows_peripheral1) * 2.0 - 1.0
    # One vectorized draw instead of one `rand(1)` call per row; the
    # generator yields the same sequence of uniforms either way.
    # Truncating towards zero maps [0, 1) onto 0 .. n_rows_population - 1.
    peripheral_table["join_key"] = (
        random.rand(n_rows_peripheral1) * float(n_rows_population)
    ).astype(np.int64)
    peripheral_table["join_key2"] = range(n_rows_peripheral1)
    peripheral_table["time_stamp_peripheral"] = random.rand(n_rows_peripheral1)

    peripheral_table2 = pd.DataFrame()
    peripheral_table2["column_01"] = random.rand(n_rows_peripheral2) * 2.0 - 1.0
    peripheral_table2["join_key2"] = (
        random.rand(n_rows_peripheral2) * float(n_rows_peripheral1)
    ).astype(np.int64)
    peripheral_table2["time_stamp_peripheral2"] = random.rand(n_rows_peripheral2)

    # ----------------
    # Inner aggregation: peripheral_table2 -> peripheral_table

    # Attach the time stamp of the matching peripheral row to every row of
    # peripheral_table2.
    temp = peripheral_table2.merge(
        peripheral_table[["join_key2", "time_stamp_peripheral"]],
        how="left",
        on="join_key2",
    )

    # Keep only rows of peripheral_table2 that lie at most 0.5 before (and
    # never after) the time stamp of the peripheral row they are joined to.
    temp = temp[
        (temp["time_stamp_peripheral2"] <= temp["time_stamp_peripheral"])
        & (temp["time_stamp_peripheral2"] >= temp["time_stamp_peripheral"] - 0.5)
    ]

    temp = _aggregate(temp, aggregation2, "column_01", "join_key2")

    # The index of `temp` is irrelevant here: merging on a column ignores
    # the indexes of both frames.
    temp = temp.rename(columns={"column_01": "temporary"})

    peripheral_table = peripheral_table.merge(temp, how="left", on="join_key2")

    del temp

    # Peripheral rows without any match in the time window get 0.0.
    peripheral_table["temporary"] = peripheral_table["temporary"].fillna(0.0)

    # ----------------
    # Outer aggregation: peripheral_table -> population_table

    temp2 = peripheral_table.merge(
        population_table[["join_key", "time_stamp_population"]],
        how="left",
        on="join_key",
    )

    # Only peripheral rows that are not newer than the population row count.
    temp2 = temp2[temp2["time_stamp_peripheral"] <= temp2["time_stamp_population"]]

    temp2 = _aggregate(temp2, aggregation1, "temporary", "join_key")

    temp2 = temp2.rename(columns={"temporary": "targets"})

    population_table = population_table.merge(temp2, how="left", on="join_key")

    del temp2

    # Replace NaN targets with 0.0 - target values may never be NaN!
    population_table["targets"] = population_table["targets"].fillna(0.0)

    # Remove temporary column.
    del peripheral_table["temporary"]

    # ----------------
    # Give the time stamp columns their final, common name.
    # `index=str` turns the row labels into strings; it is kept so that the
    # frames handed to `read_pandas` do not change.

    population_table = population_table.rename(
        index=str, columns={"time_stamp_population": "time_stamp"}
    )

    peripheral_table = peripheral_table.rename(
        index=str, columns={"time_stamp_peripheral": "time_stamp"}
    )

    peripheral_table2 = peripheral_table2.rename(
        index=str, columns={"time_stamp_peripheral2": "time_stamp"}
    )

    # ----------------
    # Set default names if none were provided.

    if not population_name:
        population_name = "snowflake_population_" + str(random_state)

    if not peripheral_name1:
        peripheral_name1 = "snowflake_peripheral_1_" + str(random_state)

    if not peripheral_name2:
        peripheral_name2 = "snowflake_peripheral_2_" + str(random_state)

    # ----------------
    # Create the data.DataFrame counterparts.

    population_on_engine = data.DataFrame(
        name=population_name,
        roles={
            "join_key": ["join_key"],
            "numerical": ["column_01"],
            "time_stamp": ["time_stamp"],
            "target": ["targets"],
        },
    ).read_pandas(population_table)

    peripheral_on_engine = data.DataFrame(
        name=peripheral_name1,
        roles={
            "join_key": ["join_key", "join_key2"],
            "numerical": ["column_01"],
            "time_stamp": ["time_stamp"],
        },
    ).read_pandas(peripheral_table)

    peripheral_on_engine2 = data.DataFrame(
        name=peripheral_name2,
        roles={
            "join_key": ["join_key2"],
            "numerical": ["column_01"],
            "time_stamp": ["time_stamp"],
        },
    ).read_pandas(peripheral_table2)

    # ----------------

    return population_on_engine, peripheral_on_engine, peripheral_on_engine2
# -----------------------------------------------------------------------------