# Copyright 2022 The SQLNet Company GmbH
#
# This file is licensed under the Elastic License 2.0 (ELv2).
# Refer to the LICENSE.txt file in the root of the repository
# for details.
#
"""Handler for the data stored in the getML engine."""
import json
import numbers
import os
import shutil
from collections import namedtuple
from typing import Any, Dict, List, Optional, Union
import numpy as np
import pandas as pd # type: ignore
import pyarrow as pa # type: ignore
import getml.communication as comm
from getml import constants, database
from getml.constants import COMPARISON_ONLY, TIME_STAMP
from getml.database.helpers import _retrieve_temp_dir
from getml.utilities.formatting import _DataFrameFormatter
from . import roles as roles_
from .columns import (
BooleanColumnView,
FloatColumn,
FloatColumnView,
StringColumn,
StringColumnView,
arange,
rowid,
)
from .columns.last_change import _last_change
from .helpers import (
_check_if_exists,
_empty_data_frame,
_exists_in_memory,
_get_column,
_handle_cols,
_is_non_empty_typed_list,
_is_numerical_type,
_is_typed_list,
_make_default_slice,
_modify_pandas_columns,
_retrieve_urls,
_send_numpy_array,
_sniff_arrow,
_sniff_csv,
_sniff_db,
_sniff_json,
_sniff_pandas,
_sniff_parquet,
_sniff_s3,
_to_arrow,
_to_parquet,
_to_pyspark,
_transform_timestamps,
_update_sniffed_roles,
_with_column,
_with_role,
_with_subroles,
_with_unit,
)
from .placeholder import Placeholder
from .roles_obj import Roles
from .view import View
# --------------------------------------------------------------------
class DataFrame:
"""Handler for the data stored in the getML engine.
The :class:`~getml.DataFrame` class represents a data frame
object in the getML engine but does not contain any actual data
itself. To create a data frame object in the engine, fill it
with data via the Python API, and retrieve a handler for it,
you can use one of the :func:`~getml.DataFrame.from_csv`,
:func:`~getml.DataFrame.from_db`,
:func:`~getml.DataFrame.from_json`, or
:func:`~getml.DataFrame.from_pandas` class methods. The
:ref:`importing_data` section in the user guide explains the
particularities of each flavor of the unified
import interface.
If the data frame object is already present in the engine -
either in memory as a temporary object or on disk when
:meth:`~getml.DataFrame.save` was called earlier - the
:func:`~getml.data.load_data_frame` function will create a new
handler without altering the underlying data. For more information
about the lifecycle of the data in the getML engine and its
synchronization with the Python API please see the
:ref:`corresponding user guide<the_getml_python_api_lifecycles>`.
Args:
name (str):
Unique identifier used to link the handler with
the underlying data frame object in the engine.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
Examples:
Creating a new data frame object in the getML engine and importing
data is done by one of the class methods
:func:`~getml.DataFrame.from_csv`,
:func:`~getml.DataFrame.from_db`,
:func:`~getml.DataFrame.from_json`, or
:func:`~getml.DataFrame.from_pandas`.
.. code-block:: python
random = numpy.random.RandomState(7263)
table = pandas.DataFrame()
table['column_01'] = random.randint(0, 10, 1000).astype(str)
table['join_key'] = numpy.arange(1000)
table['time_stamp'] = random.rand(1000)
table['target'] = random.rand(1000)
df_table = getml.DataFrame.from_pandas(table, name = 'table')
In addition to creating a new data frame object in the getML
engine and filling it with all the content of `table`, the
:func:`~getml.DataFrame.from_pandas` function also
returns a :class:`~getml.DataFrame` handler to the
underlying data.
You don't have to create the data frame objects anew for each
session. You can use their :meth:`~getml.DataFrame.save`
method to write them to disk, the
:func:`~getml.data.list_data_frames` function to list all
available objects in the engine, and
:func:`~getml.data.load_data_frame` to create a
:class:`~getml.DataFrame` handler for a data set already
present in the getML engine (see
:ref:`the_getml_python_api_lifecycles` for details).
.. code-block:: python
df_table.save()
getml.data.list_data_frames()
df_table_reloaded = getml.data.load_data_frame('table')
Note:
Although the Python API does not store the actual data itself,
you can use the :meth:`~getml.DataFrame.to_csv`,
:meth:`~getml.DataFrame.to_db`,
:meth:`~getml.DataFrame.to_json`, and
:meth:`~getml.DataFrame.to_pandas` methods to retrieve
them.
"""
_categorical_roles = roles_._categorical_roles
_numerical_roles = roles_._numerical_roles
_possible_keys = _categorical_roles + _numerical_roles
def __init__(self, name, roles=None):
# ------------------------------------------------------------
if not isinstance(name, str):
raise TypeError("'name' must be str.")
vars(self)["name"] = name
# ------------------------------------------------------------
roles = roles.to_dict() if isinstance(roles, Roles) else roles
roles = roles or dict()
if not isinstance(roles, dict):
raise TypeError("'roles' must be dict or a getml.data.Roles object")
for key, val in roles.items():
if key not in self._possible_keys:
msg = "'{}' is not a proper role and will be ignored\n"
msg += "Possible roles are: {}"
raise ValueError(msg.format(key, self._possible_keys))
if not _is_typed_list(val, str):
raise TypeError(
"'{}' must be None, an empty list, or a list of str.".format(key)
)
# ------------------------------------------------------------
join_keys = roles.get("join_key", [])
time_stamps = roles.get("time_stamp", [])
categorical = roles.get("categorical", [])
numerical = roles.get("numerical", [])
targets = roles.get("target", [])
text = roles.get("text", [])
unused_floats = roles.get("unused_float", [])
unused_strings = roles.get("unused_string", [])
# ------------------------------------------------------------
vars(self)["_categorical_columns"] = [
StringColumn(name=cname, role=roles_.categorical, df_name=self.name)
for cname in categorical
]
vars(self)["_join_key_columns"] = [
StringColumn(name=cname, role=roles_.join_key, df_name=self.name)
for cname in join_keys
]
vars(self)["_numerical_columns"] = [
FloatColumn(name=cname, role=roles_.numerical, df_name=self.name)
for cname in numerical
]
vars(self)["_target_columns"] = [
FloatColumn(name=cname, role=roles_.target, df_name=self.name)
for cname in targets
]
vars(self)["_text_columns"] = [
StringColumn(name=cname, role=roles_.text, df_name=self.name)
for cname in text
]
vars(self)["_time_stamp_columns"] = [
FloatColumn(name=cname, role=roles_.time_stamp, df_name=self.name)
for cname in time_stamps
]
vars(self)["_unused_float_columns"] = [
FloatColumn(name=cname, role=roles_.unused_float, df_name=self.name)
for cname in unused_floats
]
vars(self)["_unused_string_columns"] = [
StringColumn(name=cname, role=roles_.unused_string, df_name=self.name)
for cname in unused_strings
]
# ------------------------------------------------------------
self._check_duplicates()
# ----------------------------------------------------------------
@property
def _columns(self):
return (
vars(self)["_categorical_columns"]
+ vars(self)["_join_key_columns"]
+ vars(self)["_numerical_columns"]
+ vars(self)["_target_columns"]
+ vars(self)["_text_columns"]
+ vars(self)["_time_stamp_columns"]
+ vars(self)["_unused_float_columns"]
+ vars(self)["_unused_string_columns"]
)
# ----------------------------------------------------------------
def _delete(self, mem_only: bool = False):
"""Deletes the data frame from the getML engine.
If called with the `mem_only` option set to True, the data
frame corresponding to the handler represented by the current
instance can be reloaded using the
:meth:`~getml.DataFrame.load` method.
Args:
mem_only (bool, optional):
If True, the data frame will not be deleted
permanently, but just from memory (RAM).
"""
if not isinstance(mem_only, bool):
raise TypeError("'mem_only' must be of type bool")
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.delete"
cmd["name_"] = self.name
cmd["mem_only_"] = mem_only
comm.send(cmd)
# ------------------------------------------------------------
def __delitem__(self, colname: str):
self._drop(colname)
# ----------------------------------------------------------------
def __eq__(self, other):
if not isinstance(other, DataFrame):
raise TypeError(
"A DataFrame can only be compared to another getml.DataFrame"
)
# ------------------------------------------------------------
# Both handlers must have exactly the same set of attributes.
if len(self.__dict__) != len(other.__dict__):
return False
for kkey in self.__dict__:
if kkey not in other.__dict__:
return False
# Take special care when comparing numbers.
if isinstance(self.__dict__[kkey], numbers.Real):
if not np.isclose(self.__dict__[kkey], other.__dict__[kkey]):
return False
elif self.__dict__[kkey] != other.__dict__[kkey]:
return False
# ------------------------------------------------------------
return True
# ----------------------------------------------------------------
def __getattr__(self, name):
try:
return self[name]
except KeyError:
return super().__getattribute__(name)
# ------------------------------------------------------------
def __getitem__(self, name):
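# An integer, slice, or boolean/float column selects rows and is
# delegated to where(...); a list of names returns a View restricted
# to those columns; a single name returns the column itself.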
if isinstance(
name,
(numbers.Integral, slice, BooleanColumnView, FloatColumn, FloatColumnView),
):
return self.where(index=name)
if isinstance(name, list):
not_in_colnames = set(name) - set(self.colnames)
if not_in_colnames:
raise KeyError(f"{list(not_in_colnames)} not found.")
dropped = [col for col in self.colnames if col not in name]
return View(base=self, dropped=dropped)
col = _get_column(name, self._columns)
if col is not None:
return col
raise KeyError("Column named '" + name + "' not found.")
# ------------------------------------------------------------
def _getml_deserialize(self) -> Dict[str, Any]:
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame"
cmd["name_"] = self.name
return cmd
# ----------------------------------------------------------------
def __len__(self):
return self.nrows()
# ----------------------------------------------------------------
def __repr__(self):
if not _exists_in_memory(self.name):
return _empty_data_frame()
formatted = self._format()
footer = self._collect_footer_data()
return formatted._render_string(footer=footer)
# ----------------------------------------------------------------
def _repr_html_(self):
return self.to_html()
# ------------------------------------------------------------
def __setattr__(self, name, value):
if name in vars(self):
vars(self)[name] = value
else:
self.add(value, name)
# ------------------------------------------------------------
def __setitem__(self, name, col):
self.add(col, name)
# ------------------------------------------------------------
def _add_categorical_column(self, col, name, role, subroles, unit):
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.add_categorical_column"
cmd["name_"] = name
cmd["col_"] = col.cmd
cmd["df_name_"] = self.name
cmd["role_"] = role
cmd["subroles_"] = subroles
cmd["unit_"] = unit
with comm.send_and_get_socket(cmd) as sock:
comm.recv_issues(sock)
msg = comm.recv_string(sock)
if msg != "Success!":
comm.engine_exception_handler(msg)
self.refresh()
# ------------------------------------------------------------
def _add_column(self, col, name, role, subroles, unit):
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.add_column"
cmd["name_"] = name
cmd["col_"] = col.cmd
cmd["df_name_"] = self.name
cmd["role_"] = role
cmd["subroles_"] = subroles
cmd["unit_"] = unit
with comm.send_and_get_socket(cmd) as sock:
comm.recv_issues(sock)
msg = comm.recv_string(sock)
if msg != "Success!":
comm.engine_exception_handler(msg)
self.refresh()
# ------------------------------------------------------------
def _add_numpy_array(self, numpy_array, name, role, subroles, unit):
if len(numpy_array.shape) != 1:
raise TypeError(
"""numpy.ndarray needs to be one-dimensional!
Maybe you can call .ravel()."""
)
if role is None:
temp_df = pd.DataFrame()
temp_df["column"] = numpy_array
if _is_numerical_type(temp_df.dtypes.iloc[0]):
role = roles_.unused_float
else:
role = roles_.unused_string
col = (
FloatColumn(name=name, role=role, df_name=self.name)
if role in self._numerical_roles
else StringColumn(name=name, role=role, df_name=self.name)
)
_send_numpy_array(col, numpy_array)
if subroles:
self._set_subroles(name, subroles, append=False)
if unit:
self._set_unit(name, unit)
self.refresh()
# ------------------------------------------------------------
def _check_duplicates(self) -> None:
all_colnames: List[str] = []
all_colnames = _check_if_exists(self._categorical_names, all_colnames)
all_colnames = _check_if_exists(self._join_key_names, all_colnames)
all_colnames = _check_if_exists(self._numerical_names, all_colnames)
all_colnames = _check_if_exists(self._target_names, all_colnames)
all_colnames = _check_if_exists(self._text_names, all_colnames)
all_colnames = _check_if_exists(self._time_stamp_names, all_colnames)
all_colnames = _check_if_exists(self._unused_names, all_colnames)
# ------------------------------------------------------------
def _check_plausibility(self, data_frame):
self._check_duplicates()
all_names = (
self._categorical_names
+ self._join_key_names
+ self._numerical_names
+ self._target_names
+ self._text_names
+ self._time_stamp_names
+ self._unused_names
)
for col in all_names:
if col not in data_frame.columns:
raise ValueError("Column named '" + col + "' does not exist!")
# ------------------------------------------------------------
def _collect_footer_data(self):
footer = namedtuple(
"footer", ["n_rows", "n_cols", "memory_usage", "name", "type", "url"]
)
return footer(
n_rows=self.nrows(),
n_cols=self.ncols(),
memory_usage=self.memory_usage,
name=self.name,
type="getml.DataFrame",
url=self._monitor_url,
)
# ------------------------------------------------------------
def _close(self, sock):
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.close"
cmd["name_"] = self.name
comm.send_string(sock, json.dumps(cmd))
msg = comm.recv_string(sock)
if msg != "Success!":
comm.engine_exception_handler(msg)
# ------------------------------------------------------------
def _drop(self, colname: str):
if not isinstance(colname, str):
raise TypeError("'colname' must be either a string.")
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.remove_column"
cmd["name_"] = colname
cmd["df_name_"] = self.name
comm.send(cmd)
self.refresh()
# ------------------------------------------------------------
def _format(self):
formatted = _DataFrameFormatter(self)
return formatted
# ------------------------------------------------------------
def _set_role(self, name, role, time_formats):
if not isinstance(name, str):
raise TypeError("Parameter 'name' must be a string!")
col = self[name]
subroles = col.subroles
unit = TIME_STAMP + COMPARISON_ONLY if role == roles_.time_stamp else col.unit
self.add(
col,
name=name,
role=role,
subroles=subroles,
unit=unit,
time_formats=time_formats,
)
# ------------------------------------------------------------
def _set_subroles(self, name: str, subroles: List[str], append: bool):
if not isinstance(name, str):
raise TypeError("Parameter 'name' must be a string!")
col = self[name]
cmd: Dict[str, Any] = {}
cmd.update(col.cmd)
cmd["type_"] += ".set_subroles"
cmd["subroles_"] = list(set(col.subroles + subroles)) if append else subroles
comm.send(cmd)
# ------------------------------------------------------------
def _set_unit(self, name: str, unit: str):
if not isinstance(name, str):
raise TypeError("Parameter 'name' must be a string!")
col = self[name]
cmd: Dict[str, Any] = {}
cmd.update(col.cmd)
cmd["type_"] += ".set_unit"
cmd["unit_"] = unit
comm.send(cmd)
# ------------------------------------------------------------
def add(self, col, name, role=None, subroles=None, unit="", time_formats=None):
"""Adds a column to the current :class:`~getml.DataFrame`.
Args:
col (:mod:`~getml.column` or :class:`numpy.ndarray`):
The column or numpy.ndarray to be added.
name (str):
Name of the new column.
role (str, optional):
Role of the new column. Must be from :mod:`getml.data.roles`.
subroles (str, List[str] or None, optional):
Subroles of the new column. Must be from :mod:`getml.data.subroles`.
unit (str, optional):
Unit of the column.
time_formats (List[str], optional):
Formats to be used to parse the time stamps.
This is only necessary if an implicit conversion from
a :class:`~getml.data.columns.StringColumn` to a time
stamp is taking place.
The formats are allowed to contain the following
special characters:
* %w - abbreviated weekday (Mon, Tue, ...)
* %W - full weekday (Monday, Tuesday, ...)
* %b - abbreviated month (Jan, Feb, ...)
* %B - full month (January, February, ...)
* %d - zero-padded day of month (01 .. 31)
* %e - day of month (1 .. 31)
* %f - space-padded day of month ( 1 .. 31)
* %m - zero-padded month (01 .. 12)
* %n - month (1 .. 12)
* %o - space-padded month ( 1 .. 12)
* %y - year without century (70)
* %Y - year with century (1970)
* %H - hour (00 .. 23)
* %h - hour (00 .. 12)
* %a - am/pm
* %A - AM/PM
* %M - minute (00 .. 59)
* %S - second (00 .. 59)
* %s - seconds and microseconds (equivalent to %S.%F)
* %i - millisecond (000 .. 999)
* %c - centisecond (0 .. 9)
* %F - fractional seconds/microseconds (000000 - 999999)
* %z - time zone differential in ISO 8601 format (Z or +NN.NN)
* %Z - time zone differential in RFC format (GMT or +NNNN)
* %% - percent sign
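Example:
A minimal sketch (assuming a handler ``df`` for a data frame
that already exists in the engine; the column name and values
are made up):
.. code-block:: python
import numpy as np
# If no role is passed, an unused role is inferred from
# the array's dtype.
df.add(np.array([1.0, 2.0, 3.0]), name="new_col", role=getml.data.roles.numerical)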
"""
if isinstance(col, np.ndarray):
self._add_numpy_array(col, name, role, subroles, unit)
return
col, role, subroles = _with_column(
col, name, role, subroles, unit, time_formats
)
is_string = isinstance(col, (StringColumnView, StringColumn))
if is_string:
self._add_categorical_column(col, name, role, subroles, unit)
else:
self._add_column(col, name, role, subroles, unit)
# ------------------------------------------------------------
@property
def _categorical_names(self):
return [col.name for col in self._categorical_columns]
# ------------------------------------------------------------
@property
def colnames(self):
"""
List of the names of all columns.
Returns:
List[str]:
List of the names of all columns.
"""
return (
self._time_stamp_names
+ self._join_key_names
+ self._target_names
+ self._categorical_names
+ self._numerical_names
+ self._text_names
+ self._unused_names
)
# ------------------------------------------------------------
@property
def columns(self):
"""
Alias for :meth:`~getml.DataFrame.colnames`.
Returns:
List[str]:
List of the names of all columns.
"""
return self.colnames
# ------------------------------------------------------------
def copy(self, name: str) -> "DataFrame":
"""
Creates a deep copy of the data frame under a new name.
Args:
name (str):
The name of the new data frame.
Returns:
:class:`~getml.DataFrame`:
A handle to the deep copy.
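Example:
A sketch, assuming ``df`` is an existing handler:
.. code-block:: python
# Creates an identical data frame object named 'df_copy'
# in the engine and returns a handle to it.
df_copy = df.copy(name="df_copy")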
"""
if not isinstance(name, str):
raise TypeError("'name' must be a string.")
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.concat"
cmd["name_"] = name
cmd["data_frames_"] = [self._getml_deserialize()]
comm.send(cmd)
return DataFrame(name=name).refresh()
# ------------------------------------------------------------
def delete(self):
"""
Permanently deletes the data frame. :meth:`delete` first unloads the data frame
from memory and then deletes it from disk.
"""
# ------------------------------------------------------------
self._delete()
# ------------------------------------------------------------
def drop(self, cols):
"""Returns a new :class:`~getml.data.View` that has one or several columns removed.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
"""
names = _handle_cols(cols)
if not _is_typed_list(names, str):
raise TypeError("'cols' must be a string or a list of strings.")
return View(base=self, dropped=names)
# ------------------------------------------------------------
def freeze(self):
"""Freezes the data frame.
After you have frozen the data frame, the data frame is immutable
and in-place operations are no longer possible. However, you can
still create views. In other words, operations like
:meth:`~getml.DataFrame.set_role` are no longer possible,
but operations like :meth:`~getml.DataFrame.with_role` are.
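Example:
A sketch of the intended workflow, assuming an existing
handler ``df``:
.. code-block:: python
df.freeze()
# In-place mutations such as set_role(...) now raise an
# error in the engine, but lazy operations that return a
# View, such as with_role(...), still work.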
"""
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.freeze"
cmd["name_"] = self.name
comm.send(cmd)
# ------------------------------------------------------------
@classmethod
def from_arrow(cls, table, name, roles=None, ignore=False, dry=False):
"""Create a DataFrame from an Arrow Table.
This is one of the fastest ways to get data into the
getML engine.
Args:
table (:py:class:`pyarrow.Table`):
The table to be read.
name (str):
Name of the data frame to be created.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
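Example:
A minimal sketch (the table contents and the data frame
name are made up):
.. code-block:: python
import pyarrow as pa
table = pa.Table.from_pydict({"col1": [1.0, 2.0, 1.0], "col2": ["A", "B", "C"]})
df = getml.DataFrame.from_arrow(table, name="my_df")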
"""
# ------------------------------------------------------------
if not isinstance(table, pa.Table):
raise TypeError("'table' must be of type pyarrow.Table.")
if not isinstance(name, str):
raise TypeError("'name' must be str.")
# The content of roles is checked in the class constructor called below.
if roles is not None and not isinstance(roles, (dict, Roles)):
raise TypeError("'roles' must be a geml.data.Roles object, a dict or None.")
if not isinstance(ignore, bool):
raise TypeError("'ignore' must be bool.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
# ------------------------------------------------------------
roles = roles.to_dict() if isinstance(roles, Roles) else roles
if roles is None or not ignore:
sniffed_roles = _sniff_arrow(table)
if roles is None:
roles = sniffed_roles
else:
roles = _update_sniffed_roles(sniffed_roles, roles)
if dry:
return roles
data_frame = cls(name, roles)
return data_frame.read_arrow(table=table, append=False)
# --------------------------------------------------------------------
@classmethod
def from_csv(
cls,
fnames,
name,
num_lines_sniffed=1000,
num_lines_read=0,
quotechar='"',
sep=",",
skip=0,
colnames=None,
roles=None,
ignore=False,
dry=False,
verbose=True,
) -> "DataFrame":
"""Create a DataFrame from CSV files.
The getML engine will construct a data
frame object in the engine, fill it with the data read from
the CSV file(s), and return a corresponding
:class:`~getml.DataFrame` handle.
Args:
fnames (List[str]):
CSV file paths to be read.
name (str):
Name of the data frame to be created.
num_lines_sniffed (int, optional):
Number of lines analyzed by the sniffer.
num_lines_read (int, optional):
Number of lines read from each file.
Set to 0 to read in the entire file.
quotechar (str, optional):
The character used to wrap strings.
sep (str, optional):
The separator used for separating fields.
skip (int, optional):
Number of lines to skip at the beginning of each file.
colnames (List[str] or None, optional): The first line of a CSV file
usually contains the column names. When this is not the case,
you need to explicitly pass them.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
verbose (bool, optional):
If True, when fnames are urls, the filenames are
printed to stdout during the download.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
Note:
It is assumed that the first line of each CSV file
contains a header with the column names.
Examples:
Let's assume you have two CSV files - *file1.csv* and
*file2.csv* - in the current working directory. You can
import their data into the getML engine using.
>>> df_expd = data.DataFrame.from_csv(
... fnames=["file1.csv", "file2.csv"],
... name="MY DATA FRAME",
... sep=';',
... quotechar='"'
... )
However, the CSV format lacks type safety. If you want to
build a reliable pipeline, it is a good idea
to hard-code the roles:
>>> roles = {"categorical": ["col1", "col2"], "target": ["col3"]}
>>>
>>> df_expd = data.DataFrame.from_csv(
... fnames=["file1.csv", "file2.csv"],
... name="MY DATA FRAME",
... sep=';',
... quotechar='"',
... roles=roles
... )
If you think that typing out all of the roles by hand is too
cumbersome, you can use a dry run:
>>> roles = data.DataFrame.from_csv(
... fnames=["file1.csv", "file2.csv"],
... name="MY DATA FRAME",
... sep=';',
... quotechar='"',
... dry=True
... )
This will return the roles dictionary it would have used. You
can now hard-code this.
"""
if not isinstance(fnames, list):
fnames = [fnames]
if not _is_non_empty_typed_list(fnames, str):
raise TypeError("'fnames' must be either a str or a list of str.")
if not isinstance(name, str):
raise TypeError("'name' must be str.")
if not isinstance(num_lines_sniffed, numbers.Real):
raise TypeError("'num_lines_sniffed' must be a real number")
if not isinstance(num_lines_read, numbers.Real):
raise TypeError("'num_lines_read' must be a real number")
if not isinstance(quotechar, str):
raise TypeError("'quotechar' must be str.")
if not isinstance(sep, str):
raise TypeError("'sep' must be str.")
if not isinstance(skip, numbers.Real):
raise TypeError("'skip' must be a real number")
if roles is not None and not isinstance(roles, (dict, Roles)):
raise TypeError("'roles' must be a geml.data.Roles object, a dict or None.")
if not isinstance(ignore, bool):
raise TypeError("'ignore' must be bool.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
if colnames is not None and not _is_non_empty_typed_list(colnames, str):
raise TypeError(
"'colnames' must be either be None or a non-empty list of str."
)
fnames = _retrieve_urls(fnames, verbose=verbose)
roles = roles.to_dict() if isinstance(roles, Roles) else roles
if roles is None or not ignore:
sniffed_roles = _sniff_csv(
fnames=fnames,
num_lines_sniffed=int(num_lines_sniffed),
quotechar=quotechar,
sep=sep,
skip=int(skip),
colnames=colnames,
)
if roles is None:
roles = sniffed_roles
else:
roles = _update_sniffed_roles(sniffed_roles, roles)
if dry:
return roles
data_frame = cls(name, roles)
return data_frame.read_csv(
fnames=fnames,
append=False,
quotechar=quotechar,
sep=sep,
num_lines_read=num_lines_read,
skip=skip,
colnames=colnames,
)
# ------------------------------------------------------------
@classmethod
def from_db(
cls, table_name, name=None, roles=None, ignore=False, dry=False, conn=None
):
"""Create a DataFrame from a table in a database.
It will construct a data frame object in the engine, fill it
with the data read from table `table_name` in the connected
database (see :mod:`~getml.database`), and return a
corresponding :class:`~getml.DataFrame` handle.
Args:
table_name (str):
Name of the table to be read.
name (str):
Name of the data frame to be created. If not passed,
then the *table_name* will be used.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
conn (:class:`~getml.database.Connection`, optional):
The database connection to be used.
If you don't explicitly pass a connection, the engine
will use the default connection.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
Example:
.. code-block:: python
getml.database.connect_mysql(
host="relational.fit.cvut.cz",
port=3306,
dbname="financial",
user="guest",
password="relational"
)
loan = getml.DataFrame.from_db(
table_name='loan', name='data_frame_loan')
"""
# -------------------------------------------
name = name or table_name
# -------------------------------------------
if not isinstance(table_name, str):
raise TypeError("'table_name' must be str.")
if not isinstance(name, str):
raise TypeError("'name' must be str.")
# The content of roles is checked in the class constructor called below.
if roles is not None and not isinstance(roles, (dict, Roles)):
raise TypeError(
"'roles' must be a getml.data.Roles object, a dict or None."
)
if not isinstance(ignore, bool):
raise TypeError("'ignore' must be bool.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
# -------------------------------------------
conn = conn or database.Connection()
# ------------------------------------------------------------
roles = roles.to_dict() if isinstance(roles, Roles) else roles
if roles is None or not ignore:
sniffed_roles = _sniff_db(table_name, conn)
if roles is None:
roles = sniffed_roles
else:
roles = _update_sniffed_roles(sniffed_roles, roles)
if dry:
return roles
data_frame = cls(name, roles)
return data_frame.read_db(table_name=table_name, append=False, conn=conn)
# --------------------------------------------------------------------
@classmethod
def from_dict(
cls, data: Dict[str, List[Any]], name: str, roles=None, ignore=False, dry=False
):
"""Create a new DataFrame from a dict
Args:
data (dict):
The dict containing the data.
The data should be in the following format:
.. code-block:: python
data = {'col1': [1.0, 2.0, 1.0], 'col2': ['A', 'B', 'C']}
name (str):
Name of the data frame to be created.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
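Example:
A minimal sketch (the data frame name is made up):
.. code-block:: python
data = {'col1': [1.0, 2.0, 1.0], 'col2': ['A', 'B', 'C']}
df = getml.DataFrame.from_dict(data, name='my_df')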
"""
if not isinstance(data, dict):
raise TypeError("'data' must be dict.")
return cls.from_arrow(
table=pa.Table.from_pydict(data),
name=name,
roles=roles,
ignore=ignore,
dry=dry,
)
# --------------------------------------------------------------------
@classmethod
def from_json(cls, json_str, name, roles=None, ignore=False, dry=False):
"""Create a new DataFrame from a JSON string.
It will construct a data frame object in the engine, fill it
with the data read from the JSON string, and return a
corresponding :class:`~getml.DataFrame` handle.
Args:
json_str (str):
The JSON string containing the data.
The json_str should be in the following format:
.. code-block:: python
json_str = "{'col1': [1.0, 2.0, 1.0], 'col2': ['A', 'B', 'C']}"
name (str):
Name of the data frame to be created.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
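Example:
A minimal sketch (the data frame name is made up):
.. code-block:: python
json_str = '{"col1": [1.0, 2.0, 1.0], "col2": ["A", "B", "C"]}'
df = getml.DataFrame.from_json(json_str, name='my_df')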
"""
if not isinstance(json_str, str):
raise TypeError("'json_str' must be str.")
return cls.from_dict(
data=json.loads(json_str),
name=name,
roles=roles,
ignore=ignore,
dry=dry,
)
# --------------------------------------------------------------------
@classmethod
def from_pandas(cls, pandas_df, name, roles=None, ignore=False, dry=False):
"""Create a DataFrame from a :py:class:`pandas.DataFrame`.
It will construct a data frame object in the engine, fill it
with the data read from the :py:class:`pandas.DataFrame`, and
return a corresponding :class:`~getml.DataFrame` handle.
Args:
pandas_df (:py:class:`pandas.DataFrame`):
The table to be read.
name (str):
Name of the data frame to be created.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
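Example:
A minimal sketch (the table contents and the data frame name
are made up):
.. code-block:: python
import pandas as pd
table = pd.DataFrame({'col1': [1.0, 2.0, 1.0], 'col2': ['A', 'B', 'C']})
df = getml.DataFrame.from_pandas(table, name='my_df')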
"""
# ------------------------------------------------------------
if not isinstance(pandas_df, pd.DataFrame):
raise TypeError("'pandas_df' must be of type pandas.DataFrame.")
if not isinstance(name, str):
raise TypeError("'name' must be str.")
# The content of roles is checked in the class constructor called below.
if roles is not None and not isinstance(roles, (dict, Roles)):
raise TypeError("'roles' must be a geml.data.Roles object, a dict or None.")
if not isinstance(ignore, bool):
raise TypeError("'ignore' must be bool.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
# ------------------------------------------------------------
pandas_df_modified = _modify_pandas_columns(pandas_df)
# ------------------------------------------------------------
roles = roles.to_dict() if isinstance(roles, Roles) else roles
if roles is None or not ignore:
sniffed_roles = _sniff_pandas(pandas_df_modified)
if roles is None:
roles = sniffed_roles
else:
roles = _update_sniffed_roles(sniffed_roles, roles)
if dry:
return roles
data_frame = cls(name, roles)
return data_frame.read_pandas(pandas_df=pandas_df_modified, append=False)
# --------------------------------------------------------------------
@classmethod
def from_parquet(cls, fname, name, roles=None, ignore=False, dry=False):
"""Create a DataFrame from parquet files.
This is one of the fastest way to get data into the
getML engine.
Args:
fname (str):
The path of the parquet file to be read.
name (str):
Name of the data frame to be created.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
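Example:
A minimal sketch (the file and data frame names are made up):
.. code-block:: python
df = getml.DataFrame.from_parquet(fname="my_data.parquet", name="my_df")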
"""
# ------------------------------------------------------------
if not isinstance(name, str):
raise TypeError("'name' must be str.")
# The content of roles is checked in the class constructor called below.
if roles is not None and not isinstance(roles, (dict, Roles)):
raise TypeError("'roles' must be a geml.data.Roles object, a dict or None.")
if not isinstance(ignore, bool):
raise TypeError("'ignore' must be bool.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
# ------------------------------------------------------------
roles = roles.to_dict() if isinstance(roles, Roles) else roles
if roles is None or not ignore:
sniffed_roles = _sniff_parquet(fname)
if roles is None:
roles = sniffed_roles
else:
roles = _update_sniffed_roles(sniffed_roles, roles)
if dry:
return roles
data_frame = cls(name, roles)
return data_frame.read_parquet(fname=fname, append=False)
# --------------------------------------------------------------------
@classmethod
def from_pyspark(cls, spark_df, name, roles=None, ignore=False, dry=False):
"""Create a DataFrame from a :py:class:`pyspark.sql.DataFrame`.
It will construct a data frame object in the engine, fill it
with the data read from the :py:class:`pyspark.sql.DataFrame`, and
return a corresponding :class:`~getml.DataFrame` handle.
Args:
spark_df (:py:class:`pyspark.sql.DataFrame`):
The table to be read.
name (str):
Name of the data frame to be created.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
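Example:
A sketch, assuming an existing :py:class:`pyspark.sql.DataFrame`
named ``spark_df``:
.. code-block:: python
df = getml.DataFrame.from_pyspark(spark_df, name="my_df")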
"""
# ------------------------------------------------------------
if not isinstance(name, str):
raise TypeError("'name' must be str.")
# The content of roles is checked in the class constructor called below.
if roles is not None and not isinstance(roles, (dict, Roles)):
raise TypeError("'roles' must be a geml.data.Roles object, a dict or None.")
if not isinstance(ignore, bool):
raise TypeError("'ignore' must be bool.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
# ------------------------------------------------------------
roles = roles.to_dict() if isinstance(roles, Roles) else roles
if roles is None or not ignore:
head = spark_df.limit(2).toPandas()
sniffed_roles = _sniff_pandas(head)
if roles is None:
roles = sniffed_roles
else:
roles = _update_sniffed_roles(sniffed_roles, roles)
if dry:
return roles
data_frame = cls(name, roles)
return data_frame.read_pyspark(spark_df=spark_df, append=False)
# ------------------------------------------------------------
@classmethod
def from_s3(
cls,
bucket: str,
keys: List[str],
region: str,
name: str,
num_lines_sniffed=1000,
num_lines_read=0,
sep=",",
skip=0,
colnames=None,
roles=None,
ignore=False,
dry=False,
) -> "DataFrame":
"""Create a DataFrame from CSV files located in an S3 bucket.
This classmethod will construct a data
frame object in the engine, fill it with the data read from
the CSV file(s), and return a corresponding
:class:`~getml.DataFrame` handle.
Args:
bucket (str):
The bucket from which to read the files.
keys (List[str]):
The list of keys (files in the bucket) to be read.
region (str):
The region in which the bucket is located.
name (str):
Name of the data frame to be created.
num_lines_sniffed (int, optional):
Number of lines analyzed by the sniffer.
num_lines_read (int, optional):
Number of lines read from each file.
Set to 0 to read in the entire file.
sep (str, optional):
The separator used for separating fields.
skip (int, optional):
Number of lines to skip at the beginning of each file.
colnames (List[str] or None, optional):
The first line of a CSV file
usually contains the column names. When this is not the case,
you need to explicitly pass them.
roles (dict[str, List[str]] or :class:`~getml.data.Roles`, optional):
Maps the :mod:`~getml.data.roles` to the
column names (see :meth:`~getml.DataFrame.colnames`).
The `roles` dictionary is expected to have the following format:
.. code-block:: python
roles = {getml.data.roles.numerical: ["colname1", "colname2"],
getml.data.roles.target: ["colname3"]}
Otherwise, you can use the :class:`~getml.data.Roles` class.
ignore (bool, optional):
Only relevant when roles is not None.
Determines what you want to do with any colnames not
mentioned in roles. Do you want to ignore them (True)
or read them in as unused columns (False)?
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
Example:
Let's assume you have two CSV files - *file1.csv* and
*file2.csv* - in the bucket. You can
import their data into the getML engine using the following
commands:
>>> getml.engine.set_s3_access_key_id("YOUR-ACCESS-KEY-ID")
>>>
>>> getml.engine.set_s3_secret_access_key("YOUR-SECRET-ACCESS-KEY")
>>>
>>> data_frame_expd = data.DataFrame.from_s3(
... bucket="your-bucket-name",
... keys=["file1.csv", "file2.csv"],
... region="us-east-2",
... name="MY DATA FRAME",
... sep=';'
... )
You can also set the access credentials as environment variables
before you launch the getML engine.
Also refer to the documentation on :meth:`~getml.DataFrame.from_csv`
for further information on overriding the CSV sniffer for greater
type safety.
Note:
Not supported in the getML community edition.
"""
if isinstance(keys, str):
keys = [keys]
if not isinstance(bucket, str):
raise TypeError("'bucket' must be str.")
if not _is_non_empty_typed_list(keys, str):
raise TypeError("'keys' must be either a string or a list of str")
if not isinstance(region, str):
raise TypeError("'region' must be str.")
if not isinstance(name, str):
raise TypeError("'name' must be str.")
if not isinstance(num_lines_sniffed, numbers.Real):
raise TypeError("'num_lines_sniffed' must be a real number")
if not isinstance(num_lines_read, numbers.Real):
raise TypeError("'num_lines_read' must be a real number")
if not isinstance(sep, str):
raise TypeError("'sep' must be str.")
if not isinstance(skip, numbers.Real):
raise TypeError("'skip' must be a real number")
if roles is not None and not isinstance(roles, (dict, Roles)):
raise TypeError("'roles' must be a geml.data.Roles object, a dict or None.")
if not isinstance(ignore, bool):
raise TypeError("'ignore' must be bool.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
if colnames is not None and not _is_non_empty_typed_list(colnames, str):
raise TypeError(
"'colnames' must be either be None or a non-empty list of str."
)
roles = roles.to_dict() if isinstance(roles, Roles) else roles
if roles is None or not ignore:
sniffed_roles = _sniff_s3(
bucket=bucket,
keys=keys,
region=region,
num_lines_sniffed=int(num_lines_sniffed),
sep=sep,
skip=int(skip),
colnames=colnames,
)
if roles is None:
roles = sniffed_roles
else:
roles = _update_sniffed_roles(sniffed_roles, roles)
if dry:
return roles
data_frame = cls(name, roles)
return data_frame.read_s3(
bucket=bucket,
keys=keys,
region=region,
append=False,
sep=sep,
num_lines_read=int(num_lines_read),
skip=int(skip),
colnames=colnames,
)
# ------------------------------------------------------------
@classmethod
def from_view(
cls,
view,
name,
dry=False,
):
"""Create a DataFrame from a :class:`~getml.data.View`.
This classmethod will construct a data
frame object in the engine, fill it with the data read from
the :class:`~getml.data.View`, and return a corresponding
:class:`~getml.DataFrame` handle.
Args:
view (:class:`~getml.data.View`):
The view from which we want to read the data.
name (str):
Name of the data frame to be created.
dry (bool, optional):
If set to True, then the data
will not actually be read. Instead, the method will only
return the roles it would have used. This can be used
to hard-code roles when setting up a pipeline.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
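Example:
A sketch, assuming an existing :class:`~getml.data.View`
named ``view``:
.. code-block:: python
# Materializes the view into a new data frame object
# in the engine.
df = getml.DataFrame.from_view(view, name="my_df")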
"""
# ------------------------------------------------------------
if not isinstance(view, View):
raise TypeError("'view' must be getml.data.View.")
if not isinstance(name, str):
raise TypeError("'name' must be str.")
if not isinstance(dry, bool):
raise TypeError("'dry' must be bool.")
# ------------------------------------------------------------
if dry:
return view.roles
data_frame = cls(name)
# ------------------------------------------------------------
return data_frame.read_view(view=view, append=False)
# ------------------------------------------------------------
@property
def _join_key_names(self):
return [col.name for col in self._join_key_columns]
# ------------------------------------------------------------
@property
def last_change(self) -> str:
"""
A string describing the last time this data frame has been changed.
"""
return _last_change(self.name)
# ------------------------------------------------------------
def load(self) -> "DataFrame":
"""Loads saved data from disk.
The data frame object holding the same name as the current
:class:`~getml.DataFrame` instance will be loaded from
disk into the getML engine and updates the current handler
using :meth:`~getml.DataFrame.refresh`.
Examples:
First, we have to create and import data sets.
.. code-block:: python
d, _ = getml.datasets.make_numerical(population_name = 'test')
getml.data.list_data_frames()
In the output of :func:`~getml.data.list_data_frames` we
can find our underlying data frame object 'test' listed
under the 'in_memory' key (it was created and imported by
:func:`~getml.datasets.make_numerical`). This means the
getML engine only holds it in memory (RAM) so far, and we
still have to :meth:`~getml.DataFrame.save` it to
disk in order to :meth:`~getml.DataFrame.load` it
again or to prevent any loss of information between
different sessions.
.. code-block:: python
d.save()
getml.data.list_data_frames()
d2 = getml.DataFrame(name = 'test').load()
Returns:
:class:`~getml.DataFrame`:
Updated handle to the underlying data frame in the getML
engine.
Note:
When invoking :meth:`~getml.DataFrame.load` all
changes of the underlying data frame object that took
place after the last call to the
:meth:`~getml.DataFrame.save` method will be
lost. Thus, this method enables you to undo changes
applied to the :class:`~getml.DataFrame`.
.. code-block:: python
d, _ = getml.datasets.make_numerical()
d.save()
# Accidental change we want to undo
d.rm('column_01')
d.load()
If :meth:`~getml.DataFrame.save` hasn't been called
on the current instance yet or it wasn't stored to disk in
a previous session, :meth:`~getml.DataFrame.load`
will throw an exception
File or directory '../projects/X/data/Y/' not found!
Alternatively, :func:`~getml.data.load_data_frame`
offers an easier way of creating
:class:`~getml.DataFrame` handlers to data in the
getML engine.
"""
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.load"
cmd["name_"] = self.name
comm.send(cmd)
return self.refresh()
# ------------------------------------------------------------
@property
def memory_usage(self):
"""
Convenience wrapper that returns the memory usage in MB.
"""
return self.nbytes() / 1e06
# ------------------------------------------------------------
@property
def _monitor_url(self) -> Optional[str]:
"""
A link to the data frame in the getML monitor.
"""
url = comm._monitor_url()
return (
url + "getdataframe/" + comm._get_project_name() + "/" + self.name + "/"
if url
else None
)
# ------------------------------------------------------------
def nbytes(self):
"""Size of the data stored in the underlying data frame in the getML
engine.
Returns:
:py:class:`numpy.uint64`:
Size of the underlying object in bytes.
"""
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.nbytes"
cmd["name_"] = self.name
with comm.send_and_get_socket(cmd) as sock:
msg = comm.recv_string(sock)
if msg != "Found!":
sock.close()
comm.engine_exception_handler(msg)
nbytes = comm.recv_string(sock)
return np.uint64(nbytes)
# ------------------------------------------------------------
def ncols(self):
"""
Number of columns in the current instance.
Returns:
int:
Overall number of columns
"""
return len(self.colnames)
# ------------------------------------------------------------
def nrows(self):
"""
Number of rows in the current instance.
"""
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.nrows"
cmd["name_"] = self.name
with comm.send_and_get_socket(cmd) as sock:
msg = comm.recv_string(sock)
if msg != "Found!":
sock.close()
comm.engine_exception_handler(msg)
nrows = comm.recv_string(sock)
return int(nrows)
# ------------------------------------------------------------
@property
def _numerical_names(self):
return [col.name for col in self._numerical_columns]
# --------------------------------------------------------------------------
def read_arrow(self, table, append=False):
"""Uploads a :class:`pyarrow.Table`.
Replaces the actual content of the underlying data frame in
the getML engine with `table`.
Args:
table (:class:`pyarrow.Table`):
Data the underlying data frame object in the getML
engine should obtain.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content in
`table` be appended or replace the existing data?
Returns:
:class:`~getml.DataFrame`:
Current instance.
Note:
For columns containing :class:`pandas.Timestamp` there can
be small inconsistencies on the order of microseconds
when sending the data to the getML engine. This is due to
the way the underlying information is stored.
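Example:
A sketch, assuming a handler ``df`` whose columns match the
schema of an existing :class:`pyarrow.Table` ``table``:
.. code-block:: python
# Appends the rows of the Arrow table to the existing data.
df.read_arrow(table=table, append=True)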
"""
# ------------------------------------------------------------
if not isinstance(table, pa.Table):
raise TypeError("'table' must be of type pyarrow.Table.")
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
# ------------------------------------------------------------
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than zero
columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_pandas(...)."""
)
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.from_arrow"
cmd["name_"] = self.name
cmd["append_"] = append
cmd["categorical_"] = self._categorical_names
cmd["join_keys_"] = self._join_key_names
cmd["numerical_"] = self._numerical_names
cmd["targets_"] = self._target_names
cmd["text_"] = self._text_names
cmd["time_stamps_"] = self._time_stamp_names
cmd["unused_floats_"] = self._unused_float_names
cmd["unused_strings_"] = self._unused_string_names
with comm.send_and_get_socket(cmd) as sock:
with sock.makefile(mode="wb") as sink:
batches = table.to_batches()
with pa.ipc.new_stream(sink, table.schema) as writer:
for batch in batches:
writer.write_batch(batch)
msg = comm.recv_string(sock)
if msg != "Success!":
comm.engine_exception_handler(msg)
return self.refresh()
# --------------------------------------------------------------------------
def read_csv(
self,
fnames,
append=False,
quotechar='"',
sep=",",
num_lines_read=0,
skip=0,
colnames=None,
time_formats=None,
verbose=True,
) -> "DataFrame":
"""Read CSV files.
It is assumed that the first line of each CSV file contains a
header with the column names.
Args:
fnames (List[str]):
CSV file paths to be read.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
the CSV files in `fnames` be appended or replace the
existing data?
quotechar (str, optional):
The character used to wrap strings.
sep (str, optional):
The separator used for separating fields.
num_lines_read (int, optional):
Number of lines read from each file.
Set to 0 to read in the entire file.
skip (int, optional):
Number of lines to skip at the beginning of each file.
colnames(List[str] or None, optional):
The first line of a CSV file
usually contains the column names.
When this is not the case, you need to explicitly pass them.
time_formats (List[str], optional):
The list of formats tried when parsing time stamps.
The formats are allowed to contain the following
special characters:
* %w - abbreviated weekday (Mon, Tue, ...)
* %W - full weekday (Monday, Tuesday, ...)
* %b - abbreviated month (Jan, Feb, ...)
* %B - full month (January, February, ...)
* %d - zero-padded day of month (01 .. 31)
* %e - day of month (1 .. 31)
* %f - space-padded day of month ( 1 .. 31)
* %m - zero-padded month (01 .. 12)
* %n - month (1 .. 12)
* %o - space-padded month ( 1 .. 12)
* %y - year without century (70)
* %Y - year with century (1970)
* %H - hour (00 .. 23)
* %h - hour (00 .. 12)
* %a - am/pm
* %A - AM/PM
* %M - minute (00 .. 59)
* %S - second (00 .. 59)
* %s - seconds and microseconds (equivalent to %S.%F)
* %i - millisecond (000 .. 999)
* %c - centisecond (0 .. 9)
* %F - fractional seconds/microseconds (000000 - 999999)
* %z - time zone differential in ISO 8601 format (Z or +NN.NN)
* %Z - time zone differential in RFC format (GMT or +NNNN)
* %% - percent sign
verbose (bool, optional):
If True, when fnames are urls, the filenames are printed to
stdout during the download.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
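Example:
A sketch, assuming ``roles`` is a dict that pre-defines the
columns (reading into a data frame without any columns raises
an exception) and *file1.csv* exists:
.. code-block:: python
df = getml.DataFrame(name="my_df", roles=roles)
df.read_csv(fnames=["file1.csv"], sep=";")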
"""
time_formats = time_formats or constants.TIME_FORMATS
if not isinstance(fnames, list):
fnames = [fnames]
if not _is_non_empty_typed_list(fnames, str):
raise TypeError("'fnames' must be either a string or a list of str")
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
if not isinstance(quotechar, str):
raise TypeError("'quotechar' must be str.")
if not isinstance(sep, str):
raise TypeError("'sep' must be str.")
if not isinstance(num_lines_read, numbers.Real):
raise TypeError("'num_lines_read' must be a real number")
if not isinstance(skip, numbers.Real):
raise TypeError("'skip' must be a real number")
if not _is_non_empty_typed_list(time_formats, str):
raise TypeError("'time_formats' must be a non-empty list of str")
if colnames is not None and not _is_non_empty_typed_list(colnames, str):
raise TypeError(
"'colnames' must be either be None or a non-empty list of str."
)
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than zero
columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_csv(...)."""
)
fnames_ = _retrieve_urls(fnames, verbose)
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.read_csv"
cmd["name_"] = self.name
cmd["fnames_"] = fnames_
cmd["append_"] = append
cmd["num_lines_read_"] = num_lines_read
cmd["quotechar_"] = quotechar
cmd["sep_"] = sep
cmd["skip_"] = skip
cmd["time_formats_"] = time_formats
if colnames is not None:
cmd["colnames_"] = colnames
cmd["categorical_"] = self._categorical_names
cmd["join_keys_"] = self._join_key_names
cmd["numerical_"] = self._numerical_names
cmd["targets_"] = self._target_names
cmd["text_"] = self._text_names
cmd["time_stamps_"] = self._time_stamp_names
cmd["unused_floats_"] = self._unused_float_names
cmd["unused_strings_"] = self._unused_string_names
comm.send(cmd)
return self
# --------------------------------------------------------------------------
[docs] def read_json(self, json_str, append=False, time_formats=None):
"""Fill from JSON
Fills the data frame with data from a JSON string.
Args:
json_str (str):
The JSON string containing the data.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
`json_str` be appended or replace the existing data?
time_formats (List[str], optional):
The list of formats tried when parsing time stamps.
The formats are allowed to contain the following
special characters:
* %w - abbreviated weekday (Mon, Tue, ...)
* %W - full weekday (Monday, Tuesday, ...)
* %b - abbreviated month (Jan, Feb, ...)
* %B - full month (January, February, ...)
* %d - zero-padded day of month (01 .. 31)
* %e - day of month (1 .. 31)
* %f - space-padded day of month ( 1 .. 31)
* %m - zero-padded month (01 .. 12)
* %n - month (1 .. 12)
* %o - space-padded month ( 1 .. 12)
* %y - year without century (70)
* %Y - year with century (1970)
* %H - hour (00 .. 23)
* %h - hour (00 .. 12)
* %a - am/pm
* %A - AM/PM
* %M - minute (00 .. 59)
* %S - second (00 .. 59)
* %s - seconds and microseconds (equivalent to %S.%F)
* %i - millisecond (000 .. 999)
* %c - centisecond (0 .. 9)
* %F - fractional seconds/microseconds (000000 - 999999)
* %z - time zone differential in ISO 8601 format (Z or +NN.NN)
* %Z - time zone differential in RFC format (GMT or +NNNN)
* %% - percent sign
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
Note:
This does not support NaN values. If you want support for NaN,
use :meth:`~getml.DataFrame.from_json` instead.
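Example:
A minimal sketch; the column-oriented JSON layout shown
here is an assumption and must match the predefined roles:
.. code-block:: python
df = getml.DataFrame(
name="animals",
roles={getml.data.roles.categorical: ["animal"],
getml.data.roles.numerical: ["weight"]})
json_str = '{"animal": ["hawk", "goose"], "weight": [12.1, 8.4]}'
df.read_json(json_str)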
"""
time_formats = time_formats or constants.TIME_FORMATS
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than zero
columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_json(...)."""
)
if not isinstance(json_str, str):
raise TypeError("'json_str' must be of type str")
if not isinstance(append, bool):
raise TypeError("'append' must be of type bool")
if not _is_non_empty_typed_list(time_formats, str):
raise TypeError(
"""'time_formats' must be a list of strings
containing at least one time format"""
)
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.from_json"
cmd["name_"] = self.name
cmd["categorical_"] = self._categorical_names
cmd["join_keys_"] = self._join_key_names
cmd["numerical_"] = self._numerical_names
cmd["targets_"] = self._target_names
cmd["text_"] = self._text_names
cmd["time_stamps_"] = self._time_stamp_names
cmd["unused_floats_"] = self._unused_float_names
cmd["unused_strings_"] = self._unused_string_names
cmd["append_"] = append
cmd["time_formats_"] = time_formats
with comm.send_and_get_socket(cmd) as sock:
comm.send_string(sock, json_str)
msg = comm.recv_string(sock)
if msg != "Success!":
comm.engine_exception_handler(msg)
return self
# --------------------------------------------------------------------------
[docs] def read_parquet(
self,
fname: str,
append: bool = False,
verbose: bool = True,
) -> "DataFrame":
"""Read a parquet file.
Args:
fname (str):
The filepath of the parquet file to be read.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
the parquet file in `fname` be appended or replace the
existing data?
verbose (bool, optional):
If True and `fname` is a URL, the filename is printed to
stdout during the download.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
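Example:
A minimal sketch, assuming a parquet file whose columns
match the predefined roles exists under the hypothetical
path below:
.. code-block:: python
df = getml.DataFrame(
name="transactions",
roles={getml.data.roles.numerical: ["amount"]})
df.read_parquet("transactions.parquet")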
"""
if not isinstance(fname, str):
raise TypeError("'fname' must be str.")
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than
zero columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_parquet(...)."""
)
fname_ = _retrieve_urls([fname], verbose)[0]
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.read_parquet"
cmd["name_"] = self.name
cmd["fname_"] = fname_
cmd["append_"] = append
cmd["categorical_"] = self._categorical_names
cmd["join_keys_"] = self._join_key_names
cmd["numerical_"] = self._numerical_names
cmd["targets_"] = self._target_names
cmd["text_"] = self._text_names
cmd["time_stamps_"] = self._time_stamp_names
cmd["unused_floats_"] = self._unused_float_names
cmd["unused_strings_"] = self._unused_string_names
comm.send(cmd)
return self
# --------------------------------------------------------------------------
[docs] def read_s3(
self,
bucket: str,
keys: List[str],
region: str,
append: bool = False,
sep: str = ",",
num_lines_read: int = 0,
skip: int = 0,
colnames: Optional[List[str]] = None,
time_formats: Optional[List[str]] = None,
) -> "DataFrame":
"""Read CSV files from an S3 bucket.
It is assumed that the first line of each CSV file contains a
header with the column names.
Args:
bucket (str):
The bucket from which to read the files.
keys (List[str]):
The list of keys (files in the bucket) to be read.
region (str):
The region in which the bucket is located.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
the files in `keys` be appended or replace the
existing data?
sep (str, optional):
The separator used for separating fields.
num_lines_read (int, optional):
Number of lines read from each file.
Set to 0 to read in the entire file.
skip (int, optional):
Number of lines to skip at the beginning of each file.
colnames (List[str] or None, optional):
The first line of a CSV file
usually contains the column names.
When this is not the case, you need to explicitly pass them.
time_formats (List[str], optional):
The list of formats tried when parsing time stamps.
The formats are allowed to contain the following
special characters:
* %w - abbreviated weekday (Mon, Tue, ...)
* %W - full weekday (Monday, Tuesday, ...)
* %b - abbreviated month (Jan, Feb, ...)
* %B - full month (January, February, ...)
* %d - zero-padded day of month (01 .. 31)
* %e - day of month (1 .. 31)
* %f - space-padded day of month ( 1 .. 31)
* %m - zero-padded month (01 .. 12)
* %n - month (1 .. 12)
* %o - space-padded month ( 1 .. 12)
* %y - year without century (70)
* %Y - year with century (1970)
* %H - hour (00 .. 23)
* %h - hour (00 .. 12)
* %a - am/pm
* %A - AM/PM
* %M - minute (00 .. 59)
* %S - second (00 .. 59)
* %s - seconds and microseconds (equivalent to %S.%F)
* %i - millisecond (000 .. 999)
* %c - centisecond (0 .. 9)
* %F - fractional seconds/microseconds (000000 - 999999)
* %z - time zone differential in ISO 8601 format (Z or +NN.NN)
* %Z - time zone differential in RFC format (GMT or +NNNN)
* %% - percent sign
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
Note:
Not supported in the getML community edition.
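Example:
A minimal sketch on a hypothetical bucket; the S3 access
credentials must have been set beforehand (compare
:meth:`~getml.DataFrame.to_s3`):
.. code-block:: python
df = getml.DataFrame(
name="my_df",
roles={getml.data.roles.numerical: ["col1"]})
df.read_s3(
bucket="your-bucket-name",
keys=["file1.csv"],
region="us-east-2")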
"""
time_formats = time_formats or constants.TIME_FORMATS
if not isinstance(keys, list):
keys = [keys]
if not isinstance(bucket, str):
raise TypeError("'bucket' must be str.")
if not _is_non_empty_typed_list(keys, str):
raise TypeError("'keys' must be either a string or a list of str")
if not isinstance(region, str):
raise TypeError("'region' must be str.")
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
if not isinstance(sep, str):
raise TypeError("'sep' must be str.")
if not isinstance(num_lines_read, numbers.Real):
raise TypeError("'num_lines_read' must be a real number")
if not isinstance(skip, numbers.Real):
raise TypeError("'skip' must be a real number")
if not _is_non_empty_typed_list(time_formats, str):
raise TypeError("'time_formats' must be a non-empty list of str")
if colnames is not None and not _is_non_empty_typed_list(colnames, str):
raise TypeError(
"'colnames' must be either be None or a non-empty list of str."
)
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than zero
columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_s3(...)."""
)
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.read_s3"
cmd["name_"] = self.name
cmd["append_"] = append
cmd["bucket_"] = bucket
cmd["keys_"] = keys
cmd["region_"] = region
cmd["sep_"] = sep
cmd["time_formats_"] = time_formats
cmd["num_lines_read_"] = num_lines_read
cmd["skip_"] = skip
if colnames is not None:
cmd["colnames_"] = colnames
cmd["categorical_"] = self._categorical_names
cmd["join_keys_"] = self._join_key_names
cmd["numerical_"] = self._numerical_names
cmd["targets_"] = self._target_names
cmd["text_"] = self._text_names
cmd["time_stamps_"] = self._time_stamp_names
cmd["unused_floats_"] = self._unused_float_names
cmd["unused_strings_"] = self._unused_string_names
comm.send(cmd)
return self
# ------------------------------------------------------------
[docs] def read_view(
self,
view: View,
append: bool = False,
) -> "DataFrame":
"""Read the data from a :class:`~getml.data.View`.
Args:
view (:class:`~getml.data.View`):
The view to read.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
`view` be appended or replace the existing data?
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
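Example:
A minimal sketch, assuming `other_df` is an existing
:class:`~getml.DataFrame` with a numerical column "amount":
.. code-block:: python
view = other_df.where(other_df["amount"] > 0.0)
df = getml.DataFrame(name="positive_amounts")
df.read_view(view)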
"""
if not isinstance(view, View):
raise TypeError("'view' must be getml.data.View.")
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
view.check()
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.from_view"
cmd["name_"] = self.name
cmd["view_"] = view._getml_deserialize()
cmd["append_"] = append
comm.send(cmd)
return self.refresh()
# --------------------------------------------------------------------------
[docs] def read_db(self, table_name: str, append: bool = False, conn=None) -> "DataFrame":
"""
Fill from Database.
The DataFrame will be filled from a table in the database.
Args:
table_name(str):
Table from which we want to retrieve the data.
append(bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
`table_name` be appended or replace the existing data?
conn (:class:`~getml.database.Connection`, optional):
The database connection to be used.
If you don't explicitly pass a connection,
the engine will use the default connection.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
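Example:
A minimal sketch, assuming the default connection contains
a table named "MY_TABLE" whose columns match the
predefined roles:
.. code-block:: python
df = getml.DataFrame(
name="my_df",
roles={getml.data.roles.numerical: ["col1", "col2"]})
df.read_db(table_name="MY_TABLE")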
"""
if not isinstance(table_name, str):
raise TypeError("'table_name' must be str.")
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than zero
columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_db(...)."""
)
conn = conn or database.Connection()
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.from_db"
cmd["name_"] = self.name
cmd["table_name_"] = table_name
cmd["categorical_"] = self._categorical_names
cmd["join_keys_"] = self._join_key_names
cmd["numerical_"] = self._numerical_names
cmd["targets_"] = self._target_names
cmd["text_"] = self._text_names
cmd["time_stamps_"] = self._time_stamp_names
cmd["unused_floats_"] = self._unused_float_names
cmd["unused_strings_"] = self._unused_string_names
cmd["append_"] = append
cmd["conn_id_"] = conn.conn_id
comm.send(cmd)
return self
# --------------------------------------------------------------------------
[docs] def read_pandas(self, pandas_df: pd.DataFrame, append: bool = False) -> "DataFrame":
"""Uploads a :class:`pandas.DataFrame`.
Replaces the actual content of the underlying data frame in
the getML engine with `pandas_df`.
Args:
pandas_df (:class:`pandas.DataFrame`):
Data the underlying data frame object in the getML
engine should obtain.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
`pandas_df` be appended or replace the existing data?
Note:
For columns containing :class:`pandas.Timestamp`, small
inconsistencies on the order of microseconds can occur when
sending the data to the getML engine. This is due to the way
the underlying information is stored.
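Example:
A minimal sketch; the column names of the
:class:`pandas.DataFrame` must match the predefined roles:
.. code-block:: python
pandas_df = pandas.DataFrame(
{"animal": ["hawk", "goose"], "weight": [12.1, 8.4]})
df = getml.DataFrame(
name="animals",
roles={getml.data.roles.categorical: ["animal"],
getml.data.roles.numerical: ["weight"]})
df.read_pandas(pandas_df)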
"""
if not isinstance(pandas_df, pd.DataFrame):
raise TypeError("'pandas_df' must be of type pandas.DataFrame.")
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than zero
columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_pandas(...)."""
)
table = pa.Table.from_pandas(_modify_pandas_columns(pandas_df))
return self.read_arrow(table, append=append)
# --------------------------------------------------------------------------
[docs] def read_pyspark(self, spark_df, append: bool = False) -> "DataFrame":
"""Uploads a :py:class:`pyspark.sql.DataFrame`.
Replaces the actual content of the underlying data frame in
the getML engine with `spark_df`.
Args:
spark_df (:py:class:`pyspark.sql.DataFrame`):
Data the underlying data frame object in the getML
engine should obtain.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the content of
`spark_df` be appended or replace the existing data?
"""
if not isinstance(append, bool):
raise TypeError("'append' must be bool.")
temp_dir = _retrieve_temp_dir()
os.makedirs(temp_dir, exist_ok=True)
path = os.path.join(temp_dir, self.name)
spark_df.write.mode("overwrite").parquet(path)
filepaths = [
os.path.join(path, filepath)
for filepath in os.listdir(path)
if filepath.endswith(".parquet")
]
for i, filepath in enumerate(filepaths):
self.read_parquet(filepath, append or i > 0)
shutil.rmtree(path)
return self
# --------------------------------------------------------------------------
[docs] def read_query(self, query: str, append: bool = False, conn=None) -> "DataFrame":
"""Fill from query
Fills the data frame with data from a table in the database.
Args:
query (str):
The query used to retrieve the data.
append (bool, optional):
If a data frame object holding the same ``name`` is
already present in the getML engine, should the result of
`query` be appended or replace the existing data?
conn (:class:`~getml.database.Connection`, optional):
The database connection to be used.
If you don't explicitly pass a connection,
the engine will use the default connection.
Returns:
:class:`~getml.DataFrame`:
Handler of the underlying data.
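Example:
A minimal sketch, assuming the default connection points to
a database containing a table named "MY_TABLE":
.. code-block:: python
df = getml.DataFrame(
name="my_df",
roles={getml.data.roles.numerical: ["col1"]})
df.read_query("SELECT col1 FROM MY_TABLE WHERE col1 > 0;")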
"""
if self.ncols() == 0:
raise Exception(
"""Reading data is only possible in a DataFrame with more than zero
columns. You can pre-define columns during
initialization of the DataFrame or use the classmethod
from_db(...)."""
)
if not isinstance(query, str):
raise TypeError("'query' must be of type str")
if not isinstance(append, bool):
raise TypeError("'append' must be of type bool")
conn = conn or database.Connection()
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.from_query"
cmd["name_"] = self.name
cmd["query_"] = query
cmd["categorical_"] = self._categorical_names
cmd["join_keys_"] = self._join_key_names
cmd["numerical_"] = self._numerical_names
cmd["targets_"] = self._target_names
cmd["text_"] = self._text_names
cmd["time_stamps_"] = self._time_stamp_names
cmd["unused_floats_"] = self._unused_float_names
cmd["unused_strings_"] = self._unused_string_names
cmd["append_"] = append
cmd["conn_id_"] = conn.conn_id
comm.send(cmd)
return self
# --------------------------------------------------------------------------
[docs] def refresh(self) -> "DataFrame":
"""Aligns meta-information of the current instance with the
corresponding data frame in the getML engine.
This method can be used to avoid encoding conflicts. Note that
:meth:`~getml.DataFrame.load` as well as several other
methods automatically call :meth:`~getml.DataFrame.refresh`.
Returns:
:class:`~getml.DataFrame`:
Updated handle to the underlying data frame in the getML
engine.
"""
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.refresh"
cmd["name_"] = self.name
with comm.send_and_get_socket(cmd) as sock:
msg = comm.recv_string(sock)
if msg[0] != "{":
comm.engine_exception_handler(msg)
roles = json.loads(msg)
self.__init__(name=self.name, roles=roles) # type: ignore
return self
# ------------------------------------------------------------
@property
def roles(self):
"""
The roles of the columns included
in this DataFrame.
"""
return Roles(
categorical=self._categorical_names,
join_key=self._join_key_names,
numerical=self._numerical_names,
target=self._target_names,
text=self._text_names,
time_stamp=self._time_stamp_names,
unused_float=self._unused_float_names,
unused_string=self._unused_string_names,
)
# ------------------------------------------------------------
[docs] def remove_subroles(self, cols):
"""Removes all :mod:`~getml.data.subroles` from one or more columns.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
"""
names = _handle_cols(cols)
for name in names:
self._set_subroles(name, subroles=[], append=False)
self.refresh()
# ------------------------------------------------------------
[docs] def remove_unit(self, cols):
"""Removes the unit from one or more columns.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
"""
names = _handle_cols(cols)
for name in names:
self._set_unit(name, "")
self.refresh()
# ------------------------------------------------------------
@property
def rowid(self):
"""
The rowids for this data frame.
"""
return rowid()[: self.nrows()]
# ------------------------------------------------------------
[docs] def save(self) -> "DataFrame":
"""Writes the underlying data in the getML engine to disk.
Returns:
:class:`~getml.DataFrame`:
The current instance.
"""
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.save"
cmd["name_"] = self.name
comm.send(cmd)
return self
# ------------------------------------------------------------
[docs] def set_role(self, cols, role, time_formats=None):
"""Assigns a new role to one or more columns.
When switching from a role based on type float to a role based on type
string or vice versa, an implicit type conversion will be conducted.
The :code:`time_formats` argument is used to interpret the :ref:`time
format strings <annotating_roles_time_stamp>`. For more information on
roles, please refer to the :ref:`user guide <annotating>`.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names of the columns.
role (str):
The role to be assigned.
time_formats (str or List[str], optional):
Formats to be used to parse the time stamps.
This is only necessary if an implicit conversion from a StringColumn to
a time stamp is taking place.
Example:
.. code-block:: python
data_df = dict(
animal=["hawk", "parrot", "goose"],
votes=[12341, 5127, 65311],
date=["04/06/2019", "01/03/2019", "24/12/2018"])
df = getml.DataFrame.from_dict(data_df, "animal_elections")
df.set_role(['animal'], getml.data.roles.categorical)
df.set_role(['votes'], getml.data.roles.numerical)
df.set_role(
['date'], getml.data.roles.time_stamp, time_formats=['%d/%m/%Y'])
df
.. code-block:: pycon
| date | animal | votes |
| time stamp | categorical | numerical |
---------------------------------------------------------
| 2019-06-04T00:00:00.000000Z | hawk | 12341 |
| 2019-03-01T00:00:00.000000Z | parrot | 5127 |
| 2018-12-24T00:00:00.000000Z | goose | 65311 |
"""
# ------------------------------------------------------------
time_formats = time_formats or constants.TIME_FORMATS
# ------------------------------------------------------------
names = _handle_cols(cols)
if not isinstance(role, str):
raise TypeError("'role' must be str.")
if not _is_non_empty_typed_list(time_formats, str):
raise TypeError("'time_formats' must be a non-empty list of str")
# ------------------------------------------------------------
for nname in names:
if nname not in self.colnames:
raise ValueError("No column called '" + nname + "' found.")
if role not in self._possible_keys:
raise ValueError(
"'role' must be one of the following values: "
+ str(self._possible_keys)
)
# ------------------------------------------------------------
for name in names:
if self[name].role != role:
self._set_role(name, role, time_formats)
# ------------------------------------------------------------
self.refresh()
# ------------------------------------------------------------
[docs] def set_subroles(self, cols, subroles, append=True):
"""Assigns one or several new :mod:`~getml.data.subroles` to one or more columns.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
subroles (str or List[str]):
The subroles to be assigned.
Must be from :mod:`~getml.data.subroles`.
append (bool, optional):
Whether you want to append the
new subroles to the existing subroles.
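Example:
A minimal sketch on a hypothetical column, assuming
:const:`getml.data.subroles.include.email` is the subrole
to be assigned:
.. code-block:: python
df.set_subroles(
"email_address", getml.data.subroles.include.email)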
"""
names = _handle_cols(cols)
if isinstance(subroles, str):
subroles = [subroles]
if not _is_non_empty_typed_list(subroles, str):
raise TypeError("'subroles' must be either a string or a list of strings.")
if not isinstance(append, bool):
raise TypeError("'append' must be a bool.")
for name in names:
self._set_subroles(name, subroles, append)
self.refresh()
# ------------------------------------------------------------
[docs] def set_unit(self, cols, unit, comparison_only=False):
"""Assigns a new unit to one or more columns.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
unit (str):
The unit to be assigned.
comparison_only (bool, optional):
Whether you want the column to
be used for comparison only. This means that the column can
only be used in comparison to other columns of the same unit.
An example might be a bank account number: The number in itself
is hardly interesting, but it might be useful to know how often
we have seen that same bank account number in another table.
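Example:
A minimal sketch on hypothetical column names; with
``comparison_only=True``, the account numbers can only be
compared to each other:
.. code-block:: python
df.set_unit(
["sender_account", "receiver_account"],
"account",
comparison_only=True)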
"""
names = _handle_cols(cols)
if not isinstance(unit, str):
raise TypeError("Parameter 'unit' must be a str.")
if comparison_only:
unit += COMPARISON_ONLY
for name in names:
self._set_unit(name, unit)
self.refresh()
# ------------------------------------------------------------
@property
def shape(self):
"""
A tuple containing the number of rows and columns of
the DataFrame.
"""
self.refresh()
return (self.nrows(), self.ncols())
# ------------------------------------------------------------
@property
def _target_names(self):
return [col.name for col in self._target_columns]
# ------------------------------------------------------------
@property
def _text_names(self):
return [col.name for col in self._text_columns]
# ------------------------------------------------------------
@property
def _time_stamp_names(self):
return [col.name for col in self._time_stamp_columns]
# ----------------------------------------------------------------
[docs] def to_arrow(self):
"""Creates a :py:class:`pyarrow.Table` from the current instance.
Loads the underlying data from the getML engine and constructs
a :class:`pyarrow.Table`.
Returns:
:class:`pyarrow.Table`:
Pyarrow equivalent of the current instance including
its underlying data.
"""
return _to_arrow(self)
# ------------------------------------------------------------
[docs] def to_csv(
self, fname: str, quotechar: str = '"', sep: str = ",", batch_size: int = 0
):
"""
Writes the underlying data into a newly created CSV file.
Args:
fname (str):
The name of the CSV file.
The ending ".csv" and an optional batch number will
be added automatically.
quotechar (str, optional):
The character used to wrap strings.
sep (str, optional):
The character used for separating fields.
batch_size (int, optional):
Maximum number of lines per file. Set to 0 to read
the entire data frame into a single file.
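Example:
A minimal sketch; the ending ".csv" is appended to the
hypothetical file name automatically:
.. code-block:: python
df.to_csv("transactions", sep=";")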
"""
self.refresh()
if not isinstance(fname, str):
raise TypeError("'fname' must be of type str")
if not isinstance(quotechar, str):
raise TypeError("'quotechar' must be of type str")
if not isinstance(sep, str):
raise TypeError("'sep' must be of type str")
if not isinstance(batch_size, numbers.Real):
raise TypeError("'batch_size' must be a real number")
fname_ = os.path.abspath(fname)
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.to_csv"
cmd["name_"] = self.name
cmd["fname_"] = fname_
cmd["quotechar_"] = quotechar
cmd["sep_"] = sep
cmd["batch_size_"] = batch_size
comm.send(cmd)
# ------------------------------------------------------------
[docs] def to_db(self, table_name, conn=None):
"""Writes the underlying data into a newly created table in the
database.
Args:
table_name (str):
Name of the table to be created.
If a table of that name already exists, it will be
replaced.
conn (:class:`~getml.database.Connection`, optional):
The database connection to be used.
If you don't explicitly pass a connection,
the engine will use the default connection.
"""
conn = conn or database.Connection()
self.refresh()
if not isinstance(table_name, str):
raise TypeError("'table_name' must be of type str")
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.to_db"
cmd["name_"] = self.name
cmd["table_name_"] = table_name
cmd["conn_id_"] = conn.conn_id
comm.send(cmd)
# ----------------------------------------------------------------
[docs] def to_html(self, max_rows=10):
"""
Represents the data frame in HTML format, optimized for an
iPython notebook.
Args:
max_rows (int):
The maximum number of rows to be displayed.
"""
if not _exists_in_memory(self.name):
return _empty_data_frame().replace("\n", "<br>")
formatted = self._format()
formatted.max_rows = max_rows
footer = self._collect_footer_data()
return formatted._render_html(footer=footer)
# ------------------------------------------------------------
[docs] def to_json(self):
"""Creates a JSON string from the current instance.
Loads the underlying data from the getML engine and constructs
a JSON string.
"""
return self.to_pandas().to_json()
# ----------------------------------------------------------------
[docs] def to_pandas(self):
"""Creates a :py:class:`pandas.DataFrame` from the current instance.
Loads the underlying data from the getML engine and constructs
a :class:`pandas.DataFrame`.
Returns:
:class:`pandas.DataFrame`:
Pandas equivalent of the current instance including
its underlying data.
"""
return _to_arrow(self).to_pandas()
# ------------------------------------------------------------
[docs] def to_parquet(self, fname, compression="snappy"):
"""
Writes the underlying data into a newly created parquet file.
Args:
fname (str):
The name of the parquet file.
The ending ".parquet" will be added automatically.
compression (str):
The compression format to use.
Supported values are "brotli", "gzip", "lz4", "snappy", "zstd"
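Example:
A minimal sketch; the ending ".parquet" is appended to the
hypothetical file name automatically:
.. code-block:: python
df.to_parquet("transactions", compression="zstd")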
"""
_to_parquet(self, fname, compression)
# ----------------------------------------------------------------
[docs] def to_placeholder(self, name=None):
"""Generates a :class:`~getml.data.Placeholder` from the
current :class:`~getml.DataFrame`.
Args:
name (str, optional):
The name of the placeholder. If no
name is passed, then the name of the placeholder will
be identical to the name of the current data frame.
Returns:
:class:`~getml.data.Placeholder`:
A placeholder with the same name as this data frame.
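Example:
A minimal sketch on two hypothetical data frames; the
placeholders define the data model without moving any data:
.. code-block:: python
population = population_df.to_placeholder()
peripheral = peripheral_df.to_placeholder()
population.join(peripheral, on="join_key")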
"""
self.refresh()
return Placeholder(name=name or self.name, roles=self.roles)
# ----------------------------------------------------------------
[docs] def to_pyspark(self, spark, name=None):
"""Creates a :py:class:`pyspark.sql.DataFrame` from the current instance.
Loads the underlying data from the getML engine and constructs
a :class:`pyspark.sql.DataFrame`.
Args:
spark (:py:class:`pyspark.sql.SparkSession`):
The pyspark session in which you want to
create the data frame.
name (str or None):
The name of the temporary view to be created on top
of the :py:class:`pyspark.sql.DataFrame`,
with which it can be referred to
in Spark SQL (refer to
:py:meth:`pyspark.sql.DataFrame.createOrReplaceTempView`).
If none is passed, then the name of this
:class:`getml.DataFrame` will be used.
Returns:
:py:class:`pyspark.sql.DataFrame`:
Pyspark equivalent of the current instance including
its underlying data.
"""
return _to_pyspark(self, name, spark)
# ------------------------------------------------------------
[docs] def to_s3(self, bucket: str, key: str, region: str, sep=",", batch_size=50000):
"""
Writes the underlying data into a newly created CSV file
located in an S3 bucket.
NOTE THAT S3 IS NOT SUPPORTED ON WINDOWS.
Args:
bucket (str):
The bucket from which to read the files.
key (str):
The key in the S3 bucket in which you want to
write the output. The ending ".csv" and an optional
batch number will be added automatically.
region (str):
The region in which the bucket is located.
sep (str, optional):
The character used for separating fields.
batch_size (int, optional):
Maximum number of lines per file. Set to 0 to read
the entire data frame into a single file.
Example:
>>> getml.engine.set_s3_access_key_id("YOUR-ACCESS-KEY-ID")
>>>
>>> getml.engine.set_s3_secret_access_key("YOUR-SECRET-ACCESS-KEY")
>>>
>>> your_df.to_s3(
... bucket="your-bucket-name",
... key="filename-on-s3",
... region="us-east-2",
... sep=';'
... )
"""
self.refresh()
if not isinstance(bucket, str):
raise TypeError("'bucket' must be of type str")
if not isinstance(key, str):
raise TypeError("'fname' must be of type str")
if not isinstance(region, str):
raise TypeError("'region' must be of type str")
if not isinstance(sep, str):
raise TypeError("'sep' must be of type str")
if not isinstance(batch_size, numbers.Real):
raise TypeError("'batch_size' must be a real number")
cmd: Dict[str, Any] = {}
cmd["type_"] = "DataFrame.to_s3"
cmd["name_"] = self.name
cmd["bucket_"] = bucket
cmd["key_"] = key
cmd["region_"] = region
cmd["sep_"] = sep
cmd["batch_size_"] = batch_size
comm.send(cmd)
# ------------------------------------------------------------
@property
def _unused_float_names(self):
return [col.name for col in self._unused_float_columns]
# ------------------------------------------------------------
@property
def _unused_names(self):
return self._unused_float_names + self._unused_string_names
# ------------------------------------------------------------
@property
def _unused_string_names(self):
return [col.name for col in self._unused_string_columns]
# ------------------------------------------------------------
[docs] def unload(self):
"""
Unloads the data frame from memory.
"""
# ------------------------------------------------------------
self._delete(mem_only=True)
# ------------------------------------------------------------
[docs] def where(
self,
index: Union[
numbers.Integral, slice, BooleanColumnView, FloatColumnView, FloatColumn
],
) -> View:
"""Extract a subset of rows.
Creates a new :class:`~getml.data.View` as a
subselection of the current instance.
Args:
index (int, slice, :class:`~getml.data.columns.BooleanColumnView` or :class:`~getml.data.columns.FloatColumnView` or :class:`~getml.data.columns.FloatColumn`):
Indicates the rows you want to select.
Example:
Generate example data:
.. code-block:: python
data = dict(
fruit=["banana", "apple", "cherry", "cherry", "melon", "pineapple"],
price=[2.4, 3.0, 1.2, 1.4, 3.4, 3.4],
join_key=["0", "1", "2", "2", "3", "3"])
fruits = getml.DataFrame.from_dict(data, name="fruits",
roles={"categorical": ["fruit"], "join_key": ["join_key"], "numerical": ["price"]})
fruits
.. code-block:: pycon
| join_key | fruit | price |
| join key | categorical | numerical |
--------------------------------------
| 0 | banana | 2.4 |
| 1 | apple | 3 |
| 2 | cherry | 1.2 |
| 2 | cherry | 1.4 |
| 3 | melon | 3.4 |
| 3 | pineapple | 3.4 |
Apply where condition. This creates a new DataFrame called "cherries":
.. code-block:: python
cherries = fruits.where(
fruits["fruit"] == "cherry")
cherries
.. code-block:: pycon
| join_key | fruit | price |
| join key | categorical | numerical |
--------------------------------------
| 2 | cherry | 1.2 |
| 2 | cherry | 1.4 |
"""
if isinstance(index, numbers.Integral):
index = index if int(index) >= 0 else len(self) + index
selector = arange(int(index), int(index) + 1)
return View(base=self, subselection=selector)
if isinstance(index, slice):
start, stop, _ = _make_default_slice(index, len(self))
selector = arange(start, stop, index.step)
return View(base=self, subselection=selector)
if isinstance(index, (BooleanColumnView, FloatColumn, FloatColumnView)):
return View(base=self, subselection=index)
raise TypeError("Unsupported type for a subselection: " + type(index).__name__)
# ------------------------------------------------------------
[docs] def with_column(
self, col, name, role=None, subroles=None, unit="", time_formats=None
):
"""Returns a new :class:`~getml.data.View` that contains an additional column.
Args:
col (:mod:`~getml.columns`):
The column to be added.
name (str):
Name of the new column.
role (str, optional):
Role of the new column. Must be from :mod:`getml.data.roles`.
subroles (str, List[str] or None, optional):
Subroles of the new column. Must be from :mod:`getml.data.subroles`.
unit (str, optional):
Unit of the column.
time_formats (str or List[str], optional):
Formats to be used to parse the time stamps.
This is only necessary if an implicit conversion from
a :class:`~getml.data.columns.StringColumn` to a time
stamp is taking place.
The formats are allowed to contain the following
special characters:
* %w - abbreviated weekday (Mon, Tue, ...)
* %W - full weekday (Monday, Tuesday, ...)
* %b - abbreviated month (Jan, Feb, ...)
* %B - full month (January, February, ...)
* %d - zero-padded day of month (01 .. 31)
* %e - day of month (1 .. 31)
* %f - space-padded day of month ( 1 .. 31)
* %m - zero-padded month (01 .. 12)
* %n - month (1 .. 12)
* %o - space-padded month ( 1 .. 12)
* %y - year without century (70)
* %Y - year with century (1970)
* %H - hour (00 .. 23)
* %h - hour (00 .. 12)
* %a - am/pm
* %A - AM/PM
* %M - minute (00 .. 59)
* %S - second (00 .. 59)
* %s - seconds and microseconds (equivalent to %S.%F)
* %i - millisecond (000 .. 999)
* %c - centisecond (0 .. 9)
* %F - fractional seconds/microseconds (000000 - 999999)
* %z - time zone differential in ISO 8601 format (Z or +NN.NN)
* %Z - time zone differential in RFC format (GMT or +NNNN)
* %% - percent sign
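Example:
A minimal sketch, deriving a new numerical column from two
hypothetical existing ones:
.. code-block:: python
view = df.with_column(
col=df["net"] + df["tax"],
name="gross",
role=getml.data.roles.numerical)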
"""
col, role, subroles = _with_column(
col, name, role, subroles, unit, time_formats
)
return View(
base=self,
added={
"col_": col,
"name_": name,
"role_": role,
"subroles_": subroles,
"unit_": unit,
},
)
# ------------------------------------------------------------
[docs] def with_name(self, name):
"""Returns a new :class:`~getml.data.View` with a new name.
Args:
name (str):
The name of the new view.
"""
return View(base=self, name=name)
# ------------------------------------------------------------
[docs] def with_role(self, cols, role, time_formats=None):
"""Returns a new :class:`~getml.data.View` with modified roles.
The difference between :meth:`~getml.DataFrame.with_role` and
:meth:`~getml.DataFrame.set_role` is that
:meth:`~getml.DataFrame.with_role` returns a view that is lazily
evaluated when needed whereas :meth:`~getml.DataFrame.set_role`
is an in-place operation. From a memory perspective, in-place operations
like :meth:`~getml.DataFrame.set_role` are preferable.
When switching from a role based on type float to a role based on type
string or vice versa, an implicit type conversion will be conducted.
The :code:`time_formats` argument is used to interpret the :ref:`time
format strings <annotating_roles_time_stamp>`. For more information on
roles, please refer to the :ref:`user guide <annotating>`.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
role (str):
The role to be assigned.
time_formats (str or List[str], optional):
Formats to be used to
parse the time stamps.
This is only necessary if an implicit conversion from a StringColumn to
a time stamp is taking place.
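Example:
A minimal sketch on a hypothetical column:
.. code-block:: python
view = df.with_role(["price"], getml.data.roles.numerical)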
"""
return _with_role(self, cols, role, time_formats)
# ------------------------------------------------------------
[docs] def with_subroles(self, cols, subroles, append=True):
"""Returns a new view with one or several new subroles on one or more columns.
The difference between :meth:`~getml.DataFrame.with_subroles` and
:meth:`~getml.DataFrame.set_subroles` is that
:meth:`~getml.DataFrame.with_subroles` returns a view that is lazily
evaluated when needed whereas :meth:`~getml.DataFrame.set_subroles`
is an in-place operation. From a memory perspective, in-place operations
like :meth:`~getml.DataFrame.set_subroles` are preferable.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
subroles (str or List[str]):
The subroles to be assigned.
append (bool, optional):
Whether you want to append the
new subroles to the existing subroles.
"""
return _with_subroles(self, cols, subroles, append)
# ------------------------------------------------------------
[docs] def with_unit(self, cols, unit, comparison_only=False):
"""Returns a view that contains a new unit on one or more columns.
The difference between :meth:`~getml.DataFrame.with_unit` and
:meth:`~getml.DataFrame.set_unit` is that
:meth:`~getml.DataFrame.with_unit` returns a view that is lazily
evaluated when needed whereas :meth:`~getml.DataFrame.set_unit`
is an in-place operation. From a memory perspective, in-place operations
like :meth:`~getml.DataFrame.set_unit` are preferable.
Args:
cols (str, FloatColumn, StringColumn, or List[str, FloatColumn, StringColumn]):
The columns or the names thereof.
unit (str):
The unit to be assigned.
comparison_only (bool, optional):
Whether you want the column to
be used for comparison only. This means that the column can
only be used in comparison to other columns of the same unit.
An example might be a bank account number: The number in itself
is hardly interesting, but it might be useful to know how often
we have seen that same bank account number in another table.
If True, this will also set the
:const:`~getml.data.subroles.only.compare` subrole. The feature
learning algorithms and the feature selectors will interpret this
accordingly.
"""
return _with_unit(self, cols, unit, comparison_only)
# --------------------------------------------------------------------
_all_attr = DataFrame("dummy").__dir__()
def _custom_dir(self):
return _all_attr + self.colnames
DataFrame.__dir__ = _custom_dir # type: ignore
# --------------------------------------------------------------------