# Copyright 2021 The SQLNet Company GmbH
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
"""
Helper functions that depend on Pipeline.
"""
import json
import getml.communication as comm
from getml.data import DataModel
from getml.data.helpers import _remove_trailing_underscores
from .pipeline import Pipeline
# --------------------------------------------------------------------
def _make_dummy(name):
    """Build a placeholder Pipeline whose ``_id`` is set to ``name``.

    The pipeline is constructed around a DataModel called "dummy"; only
    its ``_id`` attribute is overwritten afterwards.

    Args:
        name: Value assigned to the pipeline's ``_id``.

    Returns:
        The newly created Pipeline.
    """
    placeholder = Pipeline(data_model=DataModel("dummy"))
    placeholder._id = name
    return placeholder
# --------------------------------------------------------------------
def _from_json(json_obj):
    """Create a Pipeline and populate it from an already-decoded JSON object.

    A placeholder pipeline is created via ``_make_dummy("dummy")`` and then
    handed ``json_obj`` through its ``_parse_json_obj`` method.

    Args:
        json_obj: Decoded JSON describing one pipeline.

    Returns:
        The Pipeline after ``_parse_json_obj`` has been applied to it.
    """
    pipeline = _make_dummy("dummy")
    pipeline._parse_json_obj(json_obj)
    return pipeline
# --------------------------------------------------------------------
def _refresh_all():
    """Fetch all pipelines from the engine and rebuild them in Python.

    Sends a ``Pipeline.refresh_all`` command, checks the status message and
    decodes the JSON payload that follows it.

    Returns:
        A list with one Pipeline (built by ``_from_json``) for every entry
        under the ``"pipelines"`` key of the engine's reply.
    """
    cmd = {"type_": "Pipeline.refresh_all", "name_": ""}

    sock = comm.send_and_get_socket(cmd)
    try:
        msg = comm.recv_string(sock)
        if msg != "Success!":
            comm.engine_exception_handler(msg)
        json_str = comm.recv_string(sock)
    finally:
        # Release the socket even when the status check or a receive
        # raises; previously it was only closed on the success path.
        sock.close()

    json_obj = json.loads(json_str)
    return [_from_json(obj) for obj in json_obj["pipelines"]]
# --------------------------------------------------------------------
def list_pipelines():
    """Lists all pipelines present in the engine.

    Note that this function only lists pipelines which are part of the
    current project. See :func:`~getml.engine.set_project` for
    changing projects and :mod:`~getml.pipelines` for more details about
    the lifecycles of the pipelines.

    To subsequently load one of them, use
    :func:`~getml.pipeline.load`.

    Returns:
        list containing the names of all pipelines.
    """
    cmd = {"type_": "list_pipelines", "name_": ""}

    sock = comm.send_and_get_socket(cmd)
    try:
        msg = comm.recv_string(sock)
        if msg != "Success!":
            comm.engine_exception_handler(msg)
        json_str = comm.recv_string(sock)
    finally:
        # Release the socket even when the status check or a receive
        # raises; previously it was only closed on the success path.
        sock.close()

    return json.loads(json_str)["names"]
# --------------------------------------------------------------------
def load(name):
    """Loads a pipeline from the getML engine into Python.

    A placeholder pipeline carrying ``name`` as its id is created and then
    refreshed; the result of that ``refresh()`` call is returned.

    Args:
        name: The name of the pipeline to be loaded.

    Returns:
        A :class:`~getml.Pipeline` that is a handler
        for the pipeline signified by name.
    """
    return _make_dummy(name).refresh()
# --------------------------------------------------------------------
def exists(name):
    """Returns true if a pipeline named 'name' exists.

    Args:
        name (str):
            Name of the pipeline.

    Returns:
        bool: Whether ``name`` is among the names reported by
        :func:`list_pipelines`.

    Raises:
        TypeError: If ``name`` is not a str.
    """
    if not isinstance(name, str):
        raise TypeError("'name' must be of type str")

    return name in list_pipelines()
# --------------------------------------------------------------------
def delete(name):
    """If a pipeline named 'name' exists, it is deleted.

    Nothing happens when no pipeline of that name is reported by
    :func:`exists`.

    Args:
        name (str):
            Name of the pipeline.

    Raises:
        TypeError: If ``name`` is not a str.
    """
    if not isinstance(name, str):
        raise TypeError("'name' must be of type str")

    if exists(name):
        _make_dummy(name).delete()
# --------------------------------------------------------------------