# Copyright 2020 The SQLNet Company GmbH
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
"""
Collection of helper functions that are not intended to be used by
the end-user.
"""
import json
import os
import numpy as np
import pandas as pd
from getml import constants
from getml import database
import getml.communication as comm
# --------------------------------------------------------------------
def list_data_frames():
"""Lists all available data frames of the project.
Examples:
.. code-block:: python
d, _ = getml.datasets.make_numerical()
getml.data.list_data_frames()
d.save()
getml.data.list_data_frames()
Raises:
IOError:
If an error in the communication with the getML engine
occurred.
Returns:
dict:
Dict containing lists of strings representing the names of
the data frames objects
* 'in_memory'
held in memory (RAM).
* 'in_project_folder'
stored on disk.
"""
cmd = dict()
cmd["type_"] = "list_data_frames"
cmd["name_"] = ""
sock = comm.send_and_receive_socket(cmd)
msg = comm.recv_string(sock)
if msg != "Success!":
comm.engine_exception_handler(msg)
json_str = comm.recv_string(sock)
sock.close()
return json.loads(json_str)
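# Illustration of the structure returned by list_data_frames(); the data
# frame names shown here are purely hypothetical:
# >>> getml.data.list_data_frames()
# {'in_memory': ['population', 'peripheral'], 'in_project_folder': ['population']}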
# --------------------------------------------------------------------
def _check_if_exists(colnames, all_colnames):
for col in colnames:
if col in all_colnames:
raise ValueError("Duplicate column: '" + col + "'!")
all_colnames.append(col)
return all_colnames
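# Usage sketch for _check_if_exists (column names are illustrative):
# >>> all_colnames = _check_if_exists(["a", "b"], [])   # -> ["a", "b"]
# >>> _check_if_exists(["a"], all_colnames)             # raises ValueError: Duplicate column: 'a'!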
# --------------------------------------------------------------------
def _empty_data_frame():
return "Empty getml.data.DataFrame\nColumns: []\n"
# --------------------------------------------------------------------
def _exists_in_memory(name):
if not isinstance(name, str):
raise TypeError("'name' must be of type str")
all_df = list_data_frames()
return name in all_df["in_memory"]
# --------------------------------------------------------------------
def _extract_shape(cmd, name):
shape = cmd[name + "_shape_"]
shape = np.asarray(shape).astype(np.int32)
return shape.tolist()
# --------------------------------------------------------------------
def _get_column(name, columns):
for col in columns:
if col.name == name:
return col
return None
# --------------------------------------------------------------------
def _is_numerical_type(coltype):
return coltype in [
int,
float,
np.int_,
np.int8,
np.int16,
np.int32,
np.int64,
np.uint8,
np.uint16,
np.uint32,
np.uint64,
np.float_,
np.float16,
np.float32,
np.float64]
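# Note: the membership test above also covers numpy/pandas dtype objects,
# because numpy dtypes compare equal to the corresponding Python/numpy types:
# >>> _is_numerical_type(np.dtype("float64"))
# True
# >>> _is_numerical_type(np.dtype("object"))
# False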
# --------------------------------------------------------------------
def _is_subclass_list(some_list, parent):
is_subclass_list = isinstance(some_list, list)
is_subclass_list = is_subclass_list and all(
[issubclass(type(ll), parent) for ll in some_list])
return is_subclass_list
# --------------------------------------------------------------------
def _is_typed_list(some_list, types):
if not isinstance(types, list):
types = [types]
is_typed_list = isinstance(some_list, list)
is_typed_list = is_typed_list and all([
any([isinstance(ll, t) for t in types]) for ll in some_list
])
return is_typed_list
# --------------------------------------------------------------------
def _is_non_empty_typed_list(some_list, types):
return _is_typed_list(some_list, types) and len(some_list) > 0
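# Quick illustration of the type-checking helpers:
# >>> _is_typed_list(["a", "b"], str)
# True
# >>> _is_typed_list(["a", 1], str)
# False
# >>> _is_non_empty_typed_list([], str)
# False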
# --------------------------------------------------------------------
def _merge_join_keys(join_key, other_join_key):
begin = constants.MULTIPLE_JOIN_KEYS_BEGIN
end = constants.MULTIPLE_JOIN_KEYS_END
sep = constants.JOIN_KEY_SEP
len_jk = len_other_jk = 1
if not other_join_key:
other_join_key = join_key
if _is_typed_list(join_key, str):
len_jk = len(join_key)
if len_jk > 1:
join_key = begin + sep.join(join_key) + end
else:
join_key = join_key[0]
if _is_typed_list(other_join_key, str):
len_other_jk = len(other_join_key)
if len_other_jk > 1:
other_join_key = begin + sep.join(other_join_key) + end
else:
other_join_key = other_join_key[0]
if len_jk != len_other_jk:
raise ValueError(
"""The number of join keys passed to
'join_key' and 'other_join_key'
must match!""")
return join_key, other_join_key
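# Illustration: several join keys are merged into a single composite key
# string wrapped in MULTIPLE_JOIN_KEYS_BEGIN/END and separated by
# JOIN_KEY_SEP (the exact delimiter values are defined in getml.constants).
# If 'other_join_key' is empty, it defaults to 'join_key', so
# >>> jk, other_jk = _merge_join_keys(["key1", "key2"], None)
# returns the same composite string for both sides.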
# --------------------------------------------------------------------
def _modify_pandas_columns(pandas_df):
    # Work on a copy so the caller's DataFrame is not modified, and make
    # sure that all column names are strings.
    pandas_df_copy = pandas_df.copy()
    pandas_df_copy.columns = np.asarray(
        pandas_df.columns).astype(str).tolist()
    return pandas_df_copy
# --------------------------------------------------------------------
def _remove_trailing_underscores(some_dict):
new_dict = dict()
for kkey in some_dict:
new_key = kkey
if kkey[-1] == '_':
new_key = kkey[:-1]
if isinstance(some_dict[kkey], dict):
new_dict[new_key] = _remove_trailing_underscores(some_dict[kkey])
elif isinstance(some_dict[kkey], list):
new_dict[new_key] = [
_remove_trailing_underscores(elem) if isinstance(elem, dict)
else elem for elem in some_dict[kkey]
]
else:
new_dict[new_key] = some_dict[kkey]
return new_dict
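# Illustration of the recursive key rewriting (values are arbitrary):
# >>> _remove_trailing_underscores({"type_": "CSV", "nested_": {"name_": "t"}})
# {'type': 'CSV', 'nested': {'name': 't'}}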
# --------------------------------------------------------------------
def _update_sniffed_roles(sniffed_roles, roles):
# -------------------------------------------------------
if not isinstance(roles, dict):
raise TypeError("roles must be a dict!")
if not isinstance(sniffed_roles, dict):
raise TypeError("sniffed_roles must be a dict!")
for role in list(roles.keys()):
if not _is_typed_list(roles[role], str):
raise TypeError("Entries in roles must be lists of str!")
for role in list(sniffed_roles.keys()):
if not _is_typed_list(sniffed_roles[role], str):
raise TypeError("Entries in sniffed_roles must be lists of str!")
# -------------------------------------------------------
for new_role in list(roles.keys()):
for colname in roles[new_role]:
for old_role in list(sniffed_roles.keys()):
if colname in sniffed_roles[old_role]:
sniffed_roles[old_role].remove(colname)
break
if new_role in sniffed_roles:
sniffed_roles[new_role] += [colname]
else:
sniffed_roles[new_role] = [colname]
# -------------------------------------------------------
return sniffed_roles
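# Illustration: roles passed explicitly override what the sniffer found.
# Column and role names below are purely illustrative:
# >>> sniffed = {"unused_float": ["age"], "unused_string": ["name"]}
# >>> _update_sniffed_roles(sniffed, {"numerical": ["age"]})
# {'unused_float': [], 'unused_string': ['name'], 'numerical': ['age']}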
# ------------------------------------------------------------
def _send_numpy_array(col, numpy_array, sock=None):
# -------------------------------------------
    # Send the column's JSON command to the getML engine
if sock is None:
sock = comm.send_and_receive_socket(col.thisptr)
else:
cmd = json.dumps(col.thisptr)
comm.send_string(sock, cmd)
# -------------------------------------------
# Send data to getml engine
if col.thisptr["type_"] == "StringColumn":
if pd.api.types.is_datetime64_dtype(numpy_array):
str_array = np.datetime_as_string(numpy_array, unit="us")
else:
str_array = numpy_array.astype(str)
comm.send_categorical_matrix(sock, str_array)
elif col.thisptr["type_"] == "FloatColumn":
comm.send_matrix(sock, numpy_array)
# -------------------------------------------
# Make sure everything went well
msg = comm.recv_string(sock)
if msg != "Success!":
comm.engine_exception_handler(msg)
# --------------------------------------------------------------------
def _sniff_csv(
fnames,
num_lines_sniffed=1000,
quotechar='"',
sep=',',
skip=0,
colnames=None):
"""Sniffs a list of CSV files and returns the result as a dictionary of
roles.
Args:
fnames (List[str]): The list of CSV file names to be read.
num_lines_sniffed (int, optional):
Number of lines analysed by the sniffer.
quotechar (str, optional):
The character used to wrap strings.
sep (str, optional):
The character used for separating fields.
skip (int, optional):
Number of lines to skip at the beginning of each file.
colnames(List[str] or None, optional): The first line of a CSV file
usually contains the column names. When this is not the case, you need to
explicitly pass them.
Raises:
IOError:
If an error in the communication with the getML engine
occurred.
Returns:
dict: Keyword arguments (kwargs) that can be used to construct
a DataFrame.
"""
# ----------------------------------------------------------------
# Transform paths
fnames_ = [os.path.abspath(_) for _ in fnames]
# ----------------------------------------------------------------
# Prepare command.
cmd = dict()
cmd["name_"] = ""
cmd["type_"] = "Database.sniff_csv"
cmd["dialect_"] = "python"
cmd["fnames_"] = fnames_
cmd["num_lines_sniffed_"] = num_lines_sniffed
cmd["quotechar_"] = quotechar
cmd["sep_"] = sep
cmd["skip_"] = skip
cmd["conn_id_"] = "default"
if colnames is not None:
cmd["colnames_"] = colnames
# ----------------------------------------------------------------
# Send JSON command to engine.
sock = comm.send_and_receive_socket(cmd)
# ----------------------------------------------------------------
# Make sure that everything went well.
msg = comm.recv_string(sock)
if msg != "Success!":
sock.close()
raise IOError(msg)
# ----------------------------------------------------------------
roles = comm.recv_string(sock)
sock.close()
return json.loads(roles)
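# Hypothetical usage sketch (the file name is made up and a running
# getML engine is required):
# >>> roles = _sniff_csv(["data/population.csv"])
# 'roles' is then a dict mapping roles such as 'unused_float' and
# 'unused_string' to lists of column names.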
# --------------------------------------------------------------------
def _sniff_db(table_name, conn=None):
"""
Sniffs a table in the database and returns a dictionary of roles.
Args:
table_name (str): Name of the table to be sniffed.
conn (:class:`~getml.database.Connection`, optional): The database connection to be used.
If you don't explicitly pass a connection, the engine will use the default connection.
Returns:
dict: Keyword arguments (kwargs) that can be used to construct
a DataFrame.
"""
# -------------------------------------------
conn = conn or database.Connection()
# ----------------------------------------------------------------
# Prepare command.
cmd = dict()
cmd["name_"] = table_name
cmd["type_"] = "Database.sniff_table"
cmd["conn_id_"] = conn.conn_id
# ----------------------------------------------------------------
# Send JSON command to engine.
sock = comm.send_and_receive_socket(cmd)
# ----------------------------------------------------------------
# Make sure that everything went well.
msg = comm.recv_string(sock)
if msg != "Success!":
sock.close()
        raise IOError(msg)
# ----------------------------------------------------------------
roles = comm.recv_string(sock)
sock.close()
return json.loads(roles)
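# Hypothetical usage sketch (the table name is made up; requires a running
# getML engine and an existing database connection):
# >>> conn = database.Connection()
# >>> roles = _sniff_db("population", conn)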
# --------------------------------------------------------------------
def _sniff_json(json_str):
"""Sniffs a JSON str and returns the result as a dictionary of
roles.
Args:
json_str (str): The JSON string to be sniffed.
Returns:
dict: Roles that can be used to construct a DataFrame.
"""
json_dict = json.loads(json_str)
roles = dict()
roles["unused_float"] = []
roles["unused_string"] = []
for cname, col in json_dict.items():
if _is_numerical_type(np.array(col).dtype):
roles["unused_float"].append(cname)
else:
roles["unused_string"].append(cname)
return roles
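# Illustration, derived from the logic above (no engine required):
# >>> _sniff_json('{"age": [17, 43], "name": ["Ann", "Bob"]}')
# {'unused_float': ['age'], 'unused_string': ['name']}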
# --------------------------------------------------------------------
def _sniff_pandas(pandas_df):
"""Sniffs a pandas.DataFrame and returns the result as a dictionary of
roles.
Args:
pandas_df (pandas.DataFrame): The pandas.DataFrame to be sniffed.
Returns:
dict: Roles that can be used to construct a DataFrame.
"""
roles = dict()
roles["unused_float"] = []
roles["unused_string"] = []
colnames = pandas_df.columns
coltypes = pandas_df.dtypes
for cname, ctype in zip(colnames, coltypes):
if _is_numerical_type(ctype):
roles["unused_float"].append(cname)
else:
roles["unused_string"].append(cname)
return roles
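# Illustration (no engine required; column names are made up):
# >>> pandas_df = pd.DataFrame({"age": [17.0, 43.0], "name": ["Ann", "Bob"]})
# >>> _sniff_pandas(pandas_df)
# {'unused_float': ['age'], 'unused_string': ['name']}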
# --------------------------------------------------------------------
def _sniff_s3(
bucket,
keys,
region,
num_lines_sniffed=1000,
sep=',',
skip=0,
colnames=None):
"""Sniffs a list of CSV files located in an S3 bucket
and returns the result as a dictionary of roles.
Args:
bucket (str):
The bucket from which to read the files.
keys (List[str]): The list of keys (files in the bucket) to be read.
region (str):
The region in which the bucket is located.
num_lines_sniffed (int, optional):
Number of lines analysed by the sniffer.
sep (str, optional):
The character used for separating fields.
skip (int, optional):
Number of lines to skip at the beginning of each file.
colnames(List[str] or None, optional): The first line of a CSV file
usually contains the column names. When this is not the case, you need to
explicitly pass them.
Raises:
IOError:
If an error in the communication with the getML engine
occurred.
Returns:
dict: Keyword arguments (kwargs) that can be used to construct
a DataFrame.
"""
# ----------------------------------------------------------------
# Prepare command.
cmd = dict()
cmd["name_"] = ""
cmd["type_"] = "Database.sniff_s3"
cmd["bucket_"] = bucket
cmd["dialect_"] = "python"
cmd["keys_"] = keys
cmd["num_lines_sniffed_"] = num_lines_sniffed
cmd["region_"] = region
cmd["sep_"] = sep
cmd["skip_"] = skip
cmd["conn_id_"] = "default"
if colnames is not None:
cmd["colnames_"] = colnames
# ----------------------------------------------------------------
# Send JSON command to engine.
sock = comm.send_and_receive_socket(cmd)
# ----------------------------------------------------------------
# Make sure that everything went well.
msg = comm.recv_string(sock)
if msg != "Success!":
sock.close()
raise IOError(msg)
# ----------------------------------------------------------------
roles = comm.recv_string(sock)
sock.close()
return json.loads(roles)
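# Hypothetical usage sketch (bucket, keys and region are made up; requires a
# running getML engine with access to S3):
# >>> roles = _sniff_s3(
# ...     bucket="my-bucket",
# ...     keys=["data/population.csv"],
# ...     region="eu-central-1")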
# --------------------------------------------------------------------
def _transform_timestamps(time_stamps):
"""Transforming a time stamp using to_numeric
will result in the number of nanoseconds since
the beginning of UNIX time. We want the number of seconds since
UNIX time."""
if not isinstance(time_stamps, pd.DataFrame):
raise TypeError("'time_stamps' must be a pandas.DataFrame!")
transformed = pd.DataFrame()
for colname in time_stamps.columns:
if pd.api.types.is_numeric_dtype(time_stamps[colname]):
transformed[colname] = time_stamps[colname]
elif pd.api.types.is_datetime64_ns_dtype(time_stamps[colname]):
transformed[colname] = time_stamps[[colname]].apply(
pd.to_datetime,
errors="coerce"
).apply(
pd.to_numeric,
errors="coerce"
).apply(
lambda val: val / 1.0e+9
)[colname]
else:
raise TypeError("""
Column '""" + colname + """' has the wrong type!
If you want to send a numpy array or a column in a
pandas.DataFrame to the engine as a time_stamp, its
type must either be numerical or numpy.datetime64.
To fix this problem, you can do one of the following:
1) Read it in as an unused_string and then use
set_role(...) to make it a time stamp. (You might
have to explicitly set time_formats.)
2) Cast your column or array as a numerical value
or a numpy.datetime64.""")
return transformed.values
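# Illustration of the nanoseconds-to-seconds conversion (no engine required):
# >>> ts = pd.DataFrame({"t": pd.to_datetime(["1970-01-01 00:00:01"])})
# >>> _transform_timestamps(ts)
# array([[1.]])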
# --------------------------------------------------------------------