# Copyright 2021 The SQLNet Company GmbH
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
"""
Custom class for handling the SQL code of the features.
"""
import re
import shutil
from pathlib import Path
from getml.data.helpers import _is_typed_list
from .dialect import _drop_table, _table_pattern, sqlite3
from .helpers import _edit_windows_filename
from .sql_string import SQLString
[docs]class SQLCode:
"""
Custom class for handling the SQL code of the
features generated by the pipeline.
Example:
.. code-block:: python
sql_code = my_pipeline.features.to_sql()
# You can access individual features
# by index.
feature_1_1 = sql_code[0]
# You can also access them by name.
feature_1_10 = sql_code["FEATURE_1_10"]
# You can also type the name of
# a table or column to find all
# features related to that table
# or column.
features = sql_code.find("SOME_TABLE")
# HINT: The generated SQL code always
# escapes table and column names using
# quotation marks. So if you want exact
# matching, you can do this:
features = sql_code.find('"SOME_TABLE"')
"""
def __init__(self, code, dialect=sqlite3):
if not _is_typed_list(code, str):
raise TypeError("'code' must be a list of str.")
self.code = [SQLString(elem) for elem in code]
self.dialect = dialect
self.tables = re.findall(_table_pattern(self.dialect), "".join(code))
def __getitem__(self, key):
if isinstance(key, int):
return self.code[key]
if isinstance(key, slice):
return SQLCode(self.code[key], self.dialect)
if isinstance(key, str):
if key.upper() in self.tables:
return self.find(_drop_table(self.dialect, key))[0]
return SQLString("")
raise TypeError(
"Features can only be indexed by: int, slices, "
+ f"or str, not {type(key).__name__}"
)
def __len__(self):
return len(self.code)
def __repr__(self):
return "\n\n\n".join(self.code)
def _repr_markdown_(self):
return "```sql\n" + self.__repr__() + "\n```"
[docs] def find(self, keyword):
"""
Returns the SQLCode for all features
containing the keyword.
Args:
keyword (str): The keyword to be found.
"""
if not isinstance(keyword, str):
raise TypeError("'keyword' must be a str.")
return SQLCode([elem for elem in self.code if keyword in elem], self.dialect)
[docs] def save(self, fname, split=True):
"""
Saves the SQL code to a file.
Args:
fname (str):
The name of the file or folder (if `split` is True)
in which you want to save the features.
split (bool):
If True, the code will be split into multiple files, one for
each feature and saved into a folder `fname`.
"""
if not split:
with open(fname, "w") as file:
file.write(self.__repr__())
return
directory = Path(fname)
if directory.exists():
shutil.rmtree(fname, ignore_errors=True)
directory.mkdir()
for index, code in enumerate(self.code, 1):
match = re.search(_table_pattern(self.dialect), str(code))
name = match.group(1).lower() # pytype: disable=attribute-error
name = _edit_windows_filename(name)
file_path = directory / f"{index:04d}_{name}.sql"
with open(file_path, "w") as f:
f.write(str(code))
[docs] def to_str(self):
"""
Returns a raw string representation of the SQL code.
"""
return str(self)