Staging Data Using PostgreSQL/Greenplum/Redshift (Linux and macOS only)

Please note that this is not supported on Windows.

Setting up PostgreSQL

For the complete script, please refer to example_01d_stage_data_using_postgres.py

getML features various ways to stage data. This section illustrates how to stage data using PostgreSQL. Because Greenplum and Redshift are based on the PostgreSQL syntax, the examples work on those two database systems as well.

The default database is sqlite3. In order to use PostgreSQL, you need to install PostgreSQL on your computer (ideally the same machine that also runs the getML engine). Then you need to create a database and a user, like this:

CREATE DATABASE mydb;
CREATE USER myuser WITH ENCRYPTED PASSWORD 'mypassword';
GRANT ALL PRIVILEGES ON DATABASE mydb TO myuser;

Inside psql, connect to the new database using \connect mydb

Obviously, you can replace mydb, myuser, and mypassword with names of your own choice.

Then you need to tell getML to connect to that database:

import os

import getml.data as data
import getml.database as database
import getml.engine as engine

database.connect_postgres(
    pg_hostaddr="127.0.0.1",
    pg_host="localhost",
    pg_port=5432,
    dbname="mydb",
    user="myuser",
    password="mypassword",
    time_formats=['%Y/%m/%d']
)

Note the time_formats argument. When parsing strings that represent time stamps, for instance when calling .from_db(…), getML will use these time_formats to try to parse them. It is often a good idea to set them explicitly. Check out https://pocoproject.org/docs/Poco.DateTimeFormatter.html#9946 for the available format specifiers.
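If your raw data mixes several time stamp formats, you can pass more than one candidate - a minimal sketch (that each candidate is tried in the order given is our assumption):

database.connect_postgres(
    pg_hostaddr="127.0.0.1",
    pg_host="localhost",
    pg_port=5432,
    dbname="mydb",
    user="myuser",
    password="mypassword",
    # Candidate formats, matching e.g. '2015/03/01' and '03/01/2015'.
    time_formats=['%Y/%m/%d', '%m/%d/%Y']
)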

If you are unsure which port PostgreSQL is running on, just type this into psql:

SELECT * FROM pg_settings WHERE name = 'port';

Loading the data

We need to tell Python where to find the raw data. This obviously depends on where you have decided to put the files.

# The folder that contains expd151.csv. Note the trailing slash -
# the file names below are appended to this string directly.
RAW_DATA_FOLDER = os.getenv("HOME") + "/Downloads/diary15/"

Then, we set the project:

engine.set_project("CE")

Now we “sniff” the data - meaning that we try to infer the data types from the CSV files.

csv_fnames = [
    RAW_DATA_FOLDER + "expd151.csv",
    RAW_DATA_FOLDER + "expd152.csv",
    RAW_DATA_FOLDER + "expd153.csv",
    RAW_DATA_FOLDER + "expd154.csv"
]

query = database.sniff_csv("EXPD_RAW", csv_fnames)

print(query)

getML provides a sophisticated CSV sniffer. However, no CSV sniffer is perfect. For instance, the UCC and NEWID columns look like integers, but they are actually strings. Reading them in as integers might lead to data loss - leading zeros, for example, would be dropped.

You would get similar problems with just about any CSV sniffer we know of - the underlying problem is that CSV sniffers don’t have a real understanding of what the columns mean and can only use simple heuristics.

If you want to build a stable pipeline, it is a very good idea to hardcode your table schemas - your CSV files might look different some day, and if your table schemas suddenly change, that might break everything.

So what you should do is simply copy and paste the result of print(query) into your code and make the changes you want:

DROP TABLE IF EXISTS "EXPD_RAW";

CREATE TABLE "EXPD_RAW"(
    "NEWID"    TEXT,
    "ALLOC"    INTEGER,
    "COST"     DOUBLE PRECISION,
    "GIFT"     INTEGER,
    "PUB_FLAG" INTEGER,
    "UCC"      TEXT,
    "EXPNSQDY" TEXT,
    "EXPN_QDY" TEXT,
    "EXPNWKDY" TEXT,
    "EXPN_KDY" TEXT,
    "EXPNMO"   TEXT,
    "EXPNMO_"  TEXT,
    "EXPNYR"   TEXT,
    "EXPNYR_"  TEXT);

This will get us an empty table. Let’s fill it with data:

database.read_csv("EXPD_RAW", csv_fnames)

After that, we can do the same for FMLD and MEMD.
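For instance, sniffing and loading FMLD might look like this - note that the fmld15*.csv file names are an assumption; adjust them to the files in your download:

# NOTE: file names assumed to follow the same pattern as the EXPD files.
fmld_fnames = [
    RAW_DATA_FOLDER + "fmld151.csv",
    RAW_DATA_FOLDER + "fmld152.csv",
    RAW_DATA_FOLDER + "fmld153.csv",
    RAW_DATA_FOLDER + "fmld154.csv"
]

query = database.sniff_csv("FMLD_RAW", fmld_fnames)

print(query)

# As before: hardcode the schema, execute it, then load the data.
database.read_csv("FMLD_RAW", fmld_fnames)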

Staging EXPD

We are doing a lot of column transformations at once.

database.execute("""
    DROP TABLE IF EXISTS "EXPD";

    CREATE TABLE "EXPD" AS
    SELECT CASE WHEN "GIFT"=2 THEN 0 ELSE 1 END AS "TARGET",
           "EXPNYR" || '/' || "EXPNMO" || '/' || '01' AS "TIME_STAMP",
           "NEWID",
           "EXPNYR",
           CAST("EXPNMO" AS INT) AS "EXPNMO",
           "COST",
           substr("UCC", 1, 1) AS "UCC1",
           substr("UCC", 1, 2) AS "UCC2",
           substr("UCC", 1, 3) AS "UCC3",
           substr("UCC", 1, 4) AS "UCC4",
           substr("UCC", 1, 5) AS "UCC5",
           substr("UCC", 1, 6) AS "UCC"
    FROM "EXPD_RAW"
    WHERE "EXPNMO" != '';
""")

Let’s go through them, one by one:

We want to predict whether the item was purchased as a gift. Strangely enough, the GIFT flag is set to 2 when the item was not a gift. We turn that into a proper binary target: 0 when GIFT is 2, and 1 otherwise.

We want to transform the EXPNYR and EXPNMO entries into proper time stamps. Since the data is monthly, the day is simply set to the first of the month - the resulting string matches the '%Y/%m/%d' format we passed to connect_postgres.

As we explained, the UCC is based on a hierarchical system of classification. We make use of that fact and create a bunch of substrings: UCC1 contains only the first digit, UCC2 the first two, and so on.

Sometimes the month entry contains missing data. This does us no good, so we drop those rows.
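To check the result of the staging step, you can query the staged table directly - a quick sketch, assuming your getML version offers database.get(…) for reading a query result into a pandas.DataFrame:

# Inspect a few staged rows.
# NOTE: assumes database.get is available in your getML version.
print(database.get('SELECT * FROM "EXPD" LIMIT 5'))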

Staging POPULATION

The POPULATION table is created by joining EXPD and FMLD. Since EXPD and FMLD are in a many-to-one relationship, there is no need to aggregate FMLD (and thus no need to use our feature learning algorithms for this join).

database.execute("""
    DROP TABLE IF EXISTS "POPULATION_ALL";

    CREATE TABLE "POPULATION_ALL" AS
    SELECT t1.*,
           t2."INC_RANK",
           t2."INC_RNK1",
           t2."INC_RNK2",
           t2."INC_RNK3",
           t2."INC_RNK4",
           t2."INC_RNK5",
           t2."INC_RNKM"
    FROM "EXPD" t1
    LEFT JOIN "FMLD_RAW" t2
    ON t1."NEWID" = t2."NEWID";
""")

Loading the data into the engine

Now, we load the data frames into the engine:

EXPD_CATEGORICAL = [
    "UCC",
    "UCC1",
    "UCC2",
    "UCC3",
    "UCC4",
    "UCC5"
]

EXPD_JOIN_KEYS = [
    "NEWID"
]

EXPD_NUMERICAL = [
    "COST",
    "EXPNYR",
    "EXPNMO"
]

EXPD_TARGETS = [
    "TARGET"
]

EXPD_TIME_STAMPS = [
    "TIME_STAMP"
]

expd_roles = {
    "join_key": EXPD_JOIN_KEYS,
    "time_stamp": EXPD_TIME_STAMPS,
    "categorical": EXPD_CATEGORICAL,
    "numerical": EXPD_NUMERICAL,
    "target": EXPD_TARGETS
}

df_expd = data.DataFrame.from_db(
    table_name="EXPD",
    name="EXPD",
    roles=expd_roles,
    ignore=True)  # ignore any columns not listed in the roles

# Columns that share a unit can be compared with each other during
# feature learning.
df_expd.set_unit("UCC1", "UCC1")
df_expd.set_unit("UCC2", "UCC2")
df_expd.set_unit("UCC3", "UCC3")
df_expd.set_unit("UCC4", "UCC4")
df_expd.set_unit("UCC5", "UCC5")
df_expd.set_unit("UCC", "UCC")
df_expd.set_unit("EXPNMO", "month")

df_expd.save()

The code for loading POPULATION and MEMD is similar.
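For illustration, loading POPULATION_ALL might look like this - a sketch in which the role assignments for the INC_* columns are our assumption:

# NOTE: treating the INC_* columns as numerical is an assumption.
population_roles = {
    "join_key": EXPD_JOIN_KEYS,
    "time_stamp": EXPD_TIME_STAMPS,
    "categorical": EXPD_CATEGORICAL,
    "numerical": EXPD_NUMERICAL + [
        "INC_RANK",
        "INC_RNK1",
        "INC_RNK2",
        "INC_RNK3",
        "INC_RNK4",
        "INC_RNK5",
        "INC_RNKM"
    ],
    "target": EXPD_TARGETS
}

df_population_all = data.DataFrame.from_db(
    table_name="POPULATION_ALL",
    name="POPULATION_ALL",
    roles=population_roles,
    ignore=True)

df_population_all.save()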

Separating the data

Finally, we need to separate df_population_all into a training, validation, and testing set.

# random contains a uniformly distributed value between 0 and 1 for every row.
random = df_population_all.random()

df_population_training = df_population_all.where("POPULATION_TRAINING", random <= 0.7)

df_population_training.save()

df_population_validation = df_population_all.where("POPULATION_VALIDATION", (random <= 0.85) & (random > 0.7))

df_population_validation.save()

df_population_testing = df_population_all.where("POPULATION_TESTING", random > 0.85)

df_population_testing.save()
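As a quick plausibility check, the three subsets should split the data roughly 70/15/15 - a sketch, assuming DataFrame.nrows() is available in your getML version:

# Roughly 70% / 15% / 15% of the rows, respectively.
# NOTE: assumes nrows() is available in your getML version.
print(df_population_training.nrows())
print(df_population_validation.nrows())
print(df_population_testing.nrows())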