Source code for getml.pipeline.sql_code

# Copyright 2022 The SQLNet Company GmbH

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.

"""
Custom class for handling the SQL code of the features.
"""

from __future__ import annotations

import re
import shutil
from pathlib import Path
from typing import Iterator, Sequence, Union

from getml.data.helpers import _is_typed_list

from .dialect import AllDialects, _drop_table, _table_pattern, sqlite3
from .helpers import _edit_windows_filename
from .sql_string import SQLString


[docs]class SQLCode: """ Custom class for handling the SQL code of the features generated by the pipeline. Example: .. code-block:: python sql_code = my_pipeline.features.to_sql() # You can access individual features # by index. feature_1_1 = sql_code[0] # You can also access them by name. feature_1_10 = sql_code["FEATURE_1_10"] # You can also type the name of # a table or column to find all # features related to that table # or column. features = sql_code.find("SOME_TABLE") # HINT: The generated SQL code always # escapes table and column names using # quotation marks. So if you want exact # matching, you can do this: features = sql_code.find('"SOME_TABLE"') """ def __init__( self, code: Sequence[Union[str, SQLString]], dialect: AllDialects = sqlite3, ) -> None: if not _is_typed_list(code, str): raise TypeError("'code' must be a list of str.") self.code = [SQLString(elem) for elem in code] self.dialect = dialect self.tables = re.findall(_table_pattern(self.dialect), "".join(code)) def __getitem__(self, key: Union[int, slice, str]) -> Union[SQLCode, SQLString]: if isinstance(key, int): return self.code[key] if isinstance(key, slice): return SQLCode(self.code[key], self.dialect) if isinstance(key, str): if key.upper() in self.tables: return self.find(_drop_table(self.dialect, key))[0] return SQLString("") raise TypeError( "Features can only be indexed by: int, slices, " f"or str, not {type(key).__name__}" ) def __iter__(self) -> Iterator[SQLString]: yield from self.code def __len__(self) -> int: return len(self.code) def __repr__(self) -> str: return "\n\n\n".join(self.code) def _repr_markdown_(self) -> str: return "```sql\n" + self.__repr__() + "\n```"
[docs] def find(self, keyword: str) -> SQLCode: """ Returns the SQLCode for all features containing the keyword. Args: keyword (str): The keyword to be found. """ if not isinstance(keyword, str): raise TypeError("'keyword' must be a str.") return SQLCode([elem for elem in self.code if keyword in elem], self.dialect)
[docs] def save(self, fname: str, split: bool = True) -> None: """ Saves the SQL code to a file. Args: fname (str): The name of the file or folder (if `split` is True) in which you want to save the features. split (bool): If True, the code will be split into multiple files, one for each feature and saved into a folder `fname`. """ if not split: with open(fname, "w", encoding="utf-8") as sqlfile: sqlfile.write(str(self)) return directory = Path(fname) if directory.exists(): shutil.rmtree(fname, ignore_errors=True) directory.mkdir() for index, code in enumerate(self.code, 1): match = re.search(_table_pattern(self.dialect), str(code)) name = match.group(1).lower() if match else "feature" name = _edit_windows_filename(name).replace(".", "_").replace("`", "") file_path = directory / f"{index:04d}_{name}.sql" with open(file_path, "w", encoding="utf-8") as sqlfile: sqlfile.write(str(code))
[docs] def to_str(self) -> str: """ Returns a raw string representation of the SQL code. """ return str(self)