# Copyright 2022 The SQLNet Company GmbH
#
# This file is licensed under the Elastic License 2.0 (ELv2).
# Refer to the LICENSE.txt file in the root of the repository
# for details.
#
"""
Sniffs a list of CSV files located in an S3 bucket.
"""
from typing import Any, Dict, List, Optional
import getml.communication as comm
from .connection import Connection
[docs]def sniff_s3(
name: str,
bucket: str,
keys: List[str],
region: str,
num_lines_sniffed: int = 1000,
sep: str = ",",
skip: int = 0,
colnames: Optional[List[str]] = None,
conn: Optional[Connection] = None,
):
"""
Sniffs a list of CSV files located in an S3 bucket.
Example:
Let's assume you have two CSV files - *file1.csv* and
*file2.csv* - in the bucket. You can
import their data into the getML engine using the following
commands:
>>> getml.engine.set_s3_access_key_id("YOUR-ACCESS-KEY-ID")
>>>
>>> getml.engine.set_s3_secret_access_key("YOUR-SECRET-ACCESS-KEY")
>>>
>>> stmt = data.database.sniff_s3(
... bucket="your-bucket-name",
... keys=["file1.csv", "file2.csv"],
... region="us-east-2",
... name="MY_TABLE",
... sep=';'
... )
You can also set the access credential as environment variables
before you launch the getML engine.
Args:
name (str):
Name of the table in which the data is to be inserted.
bucket (str):
The bucket from which to read the files.
keys (List[str]):
The list of keys (files in the bucket) to be read.
region (str):
The region in which the bucket is located.
num_lines_sniffed (int, optional):
Number of lines analyzed by the sniffer.
sep (str, optional):
The character used for separating fields.
skip (int, optional):
Number of lines to skip at the beginning of each file.
colnames(List[str] or None, optional):
The first line of a CSV file
usually contains the column names. When this is not the case, you need to
explicitly pass them.
conn (:class:`~getml.database.Connection`, optional):
The database connection to be used.
If you don't explicitly pass a connection,
the engine will use the default connection.
Returns:
str: Appropriate `CREATE TABLE` statement.
"""
conn = conn or Connection()
cmd: Dict[str, Any] = {}
cmd["name_"] = name
cmd["type_"] = "Database.sniff_s3"
cmd["bucket_"] = bucket
cmd["keys_"] = keys
cmd["num_lines_sniffed_"] = num_lines_sniffed
cmd["region_"] = region
cmd["sep_"] = sep
cmd["skip_"] = skip
cmd["conn_id_"] = conn.conn_id
if colnames is not None:
cmd["colnames_"] = colnames
with comm.send_and_get_socket(cmd) as sock:
msg = comm.recv_string(sock)
if msg != "Success!":
sock.close()
comm.engine_exception_handler(msg)
return comm.recv_string(sock)