Provide generic prediction endpoint via FastAPI

A common way to communicate with resources is via REST-APIs. Under Python FastAPI is a well known web framework package to build web-APIs.

The following shows an example how easy pipelines in a project can be made accessible via endpoints in FastAPI.

It is assumed that you have some basic knowledge of FastAPI and the getML framework.

Helpfull resources to get started:

This integration example requires at least v1.3.1 of the getml package and at least Python 3.8.

Example Data

As an example project we first run the demo notebook “Loan default prediction” which creates a project named “loans” in the getML engine.

Code Explained

First, import the necessary packages, creaet a FastAPI-App app. If, the enigne isn’t running yet (getml.engine.is_alive()) launch the getML engine (getml.engine.launch()). The launch_browser=False option prevents the browser to be opened when the engine spins up. Further, direct the engine to load and set the previously created project “loans” (getml.engine.set_project()).

 1from typing import Dict, List, Optional, Union
 2
 3from fastapi import FastAPI, HTTPException
 4from pydantic import BaseModel
 5from uvicorn import run
 6
 7from getml import engine, pipeline, Pipeline, DataFrame
 8
 9app: FastAPI = FastAPI()
10
11if not engine.is_alive():
12    engine.launch(launch_browser=False)
13engine.set_project("loans")

Create the first GET endpoint which returns a list with all pipeline s present (getml.pipeline.list_pipelines()) in the project. The list itself will only contain the names of the pipelines and no additional metainformation. For sake of simplicity of the tutorial pagination is left out.

16@app.get("/pipeline")
17async def get_pipeline() -> List[str]:
18    return pipeline.list_pipelines()

The following is required to start the app with uvicorn. Run your Python code and test the endpoint via localhost:8080/pipeline.

104if __name__ == "__main__":
105    run(app, host="0.0.0.0", port=8000)

To expand the functionality, add another informative GET endpoint for a single pipeline. The pipeline_id can be retrieved from the previously created GET endpoint. The existence of the pipeline can be checked using getml.pipeline.exists(). After the existence validation the engine must be directed to laod the pipeline identified with the provided pipeline_id. Information of interest could be the name of the population data frame and peripheral data frames, the applied preprocessors, used feature learners and selectors and target predictors. Those information can be retrieved from the member variable metadata of the pipeline (pipeline_.metadata) and the pipeline itself. Again this endpoint can be tested by running your code and invoking the endpoint localhost:8080/pipeline/a1b2c3 assuming that the previously created pipeline has the id a1b2c3.

21@app.get("/pipeline/{pipeline_id}")
22async def get_pipeline_pipeline_id(pipeline_id: str) -> Dict[str, Union[str, List[str]]]:
23    if not pipeline.exists(pipeline_id):
24        raise HTTPException(status_code=404, detail=f'Pipeline {pipeline_id} not found.')
25
26    pipeline_ = pipeline.load(pipeline_id)
27
28    if pipeline_.metadata is None:
29        raise HTTPException(status_code=409,
30                            detail='The data schema is missing or pipeline is incomplete')
31
32    meta_data = pipeline_.metadata
33    metadata: Dict[str, Union[str, List[str]]] = {}
34    metadata["data_model"] = meta_data.population.name
35    metadata["peripheral"] = [_.name for _ in meta_data.peripheral]
36    metadata["preprocessors"] = [_.type for _ in pipeline_.preprocessors]
37    metadata["feature_learners"] = [_.type for _ in pipeline_.feature_learners]
38    metadata["feature_selectors"] = [_.type for _ in pipeline_.feature_selectors]
39    metadata["predictors"] = [_.type for _ in pipeline_.predictors]
40
41    return metadata

To create the prediction endpoint the data scheme for the request body needs to be created first. For a prediction the getML engine requires multiple data sets, the population data set population and any related peripheral data set peripheral based on the Data model of the pipeline. The peripheral data sets can be either a list or a dictionary where the order of the data sets in the list needs to match the order returned by [_.name for _ in getml.pipeline.metadata.peripheral]. This information can also be retriefed by calling the previously created GET endpoint.

44class PredictionBody(BaseModel):
45    peripheral: Union[List[Dict[str, List]], Dict[str, Dict[str, List]]]
46    population: Dict[str, List]

Next up, implement the POST endpoint which accepts data to task the engine to make a prediction. Validate that the pipeline exist, load the pipeline (getml.pipeline.load()), and validate that the pipeline has been finalized.

49@app.post("/pipeline/{pipeline_id}/predict")
50async def post_project_predict(pipeline_id: str, body: PredictionBody) -> Optional[List]:
51    if not pipeline.exists(pipeline_id):
52        raise HTTPException(status_code=404,
53                            detail=f'Pipeline {pipeline_id} not found.')
54
55    pipeline_: Pipeline = pipeline.load(pipeline_id)
56
57    if pipeline_.metadata is None:
58        raise HTTPException(status_code=409,
59                            detail='The data schema is missing or pipeline is incomplete')

The request body should contain both the population and peripheral data. Check that the population in the request body contains any content. Create a data frame from the dictionary (getml.DataFrame.from_dict()): the name of the data frame must not colide with an existing data frame in the pipeline, the roles of the population can be obtained from the pipeline, using pipeline_.metadata.population.roles.

62    if not body.population:
63        raise HTTPException(status_code=400, detail='Missing population data.')
64
65    population_data_frame = DataFrame.from_dict(name='future',
66                                                roles=pipeline_.metadata.population.roles,
67                                                data=body.population)

The peripheral can be submitted in the request body both as list and dictionary. Check that in case the peripheral data sets are received as dictionaries that the names of all required peripheral data sets exist in the dictonary keys, and in case the peripheral data sets are received as a list check that the length of the list matches the number of peripheral data sets in the pipeline. After, create a list of data frames of the peripheral data. Again, ensure that the names of the created data frames do not collide with existing data frames and use the roles defined in the pipeline for the peripheral data sets (pipeline_.metadata.peripheral[i].roles).

70    peripheral_names = [_.name for _ in pipeline_.peripheral]
71
72    if isinstance(body.peripheral, dict):
73        if set(peripheral_names) - set(body.peripheral.keys()):
74            raise HTTPException(
75                status_code=400,
76                detail=f'Missing peripheral data, expected {peripheral_names}')
77        periperal_raw_data = body.peripheral
78    else:
79        if len(peripheral_names) != len(body.peripheral):
80            raise HTTPException(
81                status_code=400,
82                detail=f"Expected {len(pipeline_.peripheral)} peripheral data frames.")
83        periperal_raw_data = dict(zip(peripheral_names, body.peripheral))
84
85    peripheral_data_frames = [
86        DataFrame.from_dict(name=name + '_predict',
87                            data=periperal_raw_data[name],
88                            roles=pipeline_.metadata.peripheral[i].roles)
89        for i, name in enumerate(peripheral_names)
90    ]

This leaves the actual call to the engine to make a prediction (getml.Pipeline.predict()) using the previously created population data frame and peripheral data frames. The predicted target value is a numpy array and returned transformed to a list as request response.

 93    prediction = pipeline_.predict(
 94        population_table=population_data_frame,
 95        peripheral_tables=peripheral_data_frames
 96    )
 97
 98    if prediction:
 99        return prediction.tolist()
100
101    raise HTTPException(status_code=500, detail='GetML-Engine didn\'t return a result.')

This endpoint can be called on localhost:8080/pipeline/a1b2c3/predict. where the body needs the form:

 1{
 2    "peripheral": [{
 3        "column_1": [2.4, 3.0, 1.2, 1.4, 2.2],
 4        "column_2": ["a", "b", "a", "b", "b"]
 5    }],
 6    "population": {
 7        "column_1": [0.2, 0.1],
 8        "column_2": ["a", "b"]
 9        "time_stamp": ["2010-01-01 12:30:00", "2010-01-01 23:30:00"]
10    }
11}

Example json data can be extraced from the notebook using the following code snippet at the end of the notebook used to create the Example Data.

 1from typing import Union, Any
 2from datetime import datetime
 3from json import dumps
 4
 5
 6def handle_timestamp(x: Union[Any, datetime]):
 7    if isinstance(x, datetime):
 8        return x.strftime(r'%Y-%m-%d %H:%M:%S')
 9
10
11pd_population_test = population_test.to_pandas()
12account_id = pd_population_test.iloc[0]["account_id"]
13populaton_dict = pd_population_test[pd_population_test["account_id"] == account_id].to_dict()
14populaton_json = dumps({k: list(v.values()) for k, v in populaton_dict.items()}, default=handle_timestamp)
15pd_peripherals = {_.name: _.to_pandas() for _ in [order, trans, meta]}
16peripheral_dict = {k: v[v["account_id"] == account_id].to_dict() for k, v in pd_peripherals.items()}
17peripheral_json = dumps(
18    {k: {vk: list(vv.values()) for vk, vv in v.items()} for k, v in peripheral_dict.items()},
19    default=handle_timestamp)
20populaton_json
21peripheral_json

Conclusion

With only a few lines it is possible to create a web API to make project pipelines accessible and request target predictions for provided population and peripheral data.