Provide generic prediction endpoint via FastAPI
A common way to communicate with resources is via REST APIs. In Python, FastAPI is a well-known web framework for building web APIs.
The following example shows how easily the pipelines in a project can be made accessible via FastAPI endpoints.
It is assumed that you have some basic knowledge of FastAPI and the getML framework.
- Helpful resources to get started: the FastAPI documentation and the getML documentation.
This integration example requires at least v1.3.1 of the getml package and at least Python 3.8.
Example Data
As an example project, we first run the demo notebook “Loan default prediction”, which creates a project named “loans” in the getML engine.
Code Explained
First, import the necessary packages and create a FastAPI app `app`. If the engine isn't running yet (`getml.engine.is_alive()`), launch the getML engine (`getml.engine.launch()`). The `launch_browser=False` option prevents the browser from opening when the engine spins up. Further, direct the engine to load and set the previously created project “loans” (`getml.engine.set_project()`).
```python
from typing import Dict, List, Optional, Union

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from uvicorn import run

from getml import engine, pipeline, Pipeline, DataFrame

app: FastAPI = FastAPI()

if not engine.is_alive():
    engine.launch(launch_browser=False)
engine.set_project("loans")
```
Create the first GET endpoint, which returns a list of all pipelines present in the project (`getml.pipeline.list_pipelines()`). The list contains only the names of the pipelines, without additional metadata. For the sake of simplicity, pagination is left out of this tutorial.
```python
@app.get("/pipeline")
async def get_pipeline() -> List[str]:
    return pipeline.list_pipelines()
```
The following is required to start the app with uvicorn. Run your Python code and test the endpoint via localhost:8000/pipeline.
```python
if __name__ == "__main__":
    run(app, host="0.0.0.0", port=8000)
```
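For a quick check from Python, the endpoint can be queried with any HTTP client. A minimal sketch, assuming the third-party requests package is installed and the app is running locally:

```python
import requests  # third-party HTTP client, assumed to be installed

# List the names of all pipelines in the "loans" project.
response = requests.get("http://localhost:8000/pipeline")
response.raise_for_status()
print(response.json())  # e.g. ["a1b2c3"]
```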
To expand the functionality, add another informative GET endpoint for a single pipeline. The `pipeline_id` can be retrieved from the previously created GET endpoint. The existence of the pipeline can be checked using `getml.pipeline.exists()`. After this validation, direct the engine to load the pipeline identified by the provided `pipeline_id`. Information of interest could be the name of the population data frame and the peripheral data frames, the applied preprocessors, the used feature learners and selectors, and the target predictors. This information can be retrieved from the pipeline's `metadata` member (`pipeline_.metadata`) and from the pipeline itself. Again, this endpoint can be tested by running your code and invoking localhost:8000/pipeline/a1b2c3, assuming that the previously created pipeline has the id `a1b2c3`.
```python
@app.get("/pipeline/{pipeline_id}")
async def get_pipeline_pipeline_id(pipeline_id: str) -> Dict[str, Union[str, List[str]]]:
    if not pipeline.exists(pipeline_id):
        raise HTTPException(status_code=404, detail=f'Pipeline {pipeline_id} not found.')

    pipeline_ = pipeline.load(pipeline_id)

    if pipeline_.metadata is None:
        raise HTTPException(status_code=409,
                            detail='The data schema is missing or pipeline is incomplete')

    meta_data = pipeline_.metadata
    metadata: Dict[str, Union[str, List[str]]] = {}
    metadata["data_model"] = meta_data.population.name
    metadata["peripheral"] = [_.name for _ in meta_data.peripheral]
    metadata["preprocessors"] = [_.type for _ in pipeline_.preprocessors]
    metadata["feature_learners"] = [_.type for _ in pipeline_.feature_learners]
    metadata["feature_selectors"] = [_.type for _ in pipeline_.feature_selectors]
    metadata["predictors"] = [_.type for _ in pipeline_.predictors]

    return metadata
```
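Calling this endpoint from Python might look as follows; the commented response is illustrative for the “loans” demo and depends on how the pipeline was built:

```python
import requests  # third-party HTTP client, assumed to be installed

# Inspect a single pipeline; replace "a1b2c3" with an id returned by GET /pipeline.
response = requests.get("http://localhost:8000/pipeline/a1b2c3")
response.raise_for_status()
print(response.json())
# Illustrative output for the "loans" demo:
# {"data_model": "population", "peripheral": ["meta", "order", "trans"], ...}
```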
To create the prediction endpoint, the data schema for the request body needs to be defined first. For a prediction, the getML engine requires multiple data sets: the population data set `population` and any related peripheral data sets `peripheral`, based on the data model of the pipeline. The peripheral data sets can be either a list or a dictionary, where the order of the data sets in a list needs to match the order returned by `[_.name for _ in getml.pipeline.metadata.peripheral]`. This information can also be retrieved by calling the previously created GET endpoint.
```python
class PredictionBody(BaseModel):
    peripheral: Union[List[Dict[str, List]], Dict[str, Dict[str, List]]]
    population: Dict[str, List]
```
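As a quick sanity check, the model can be instantiated directly; pydantic raises a validation error for malformed input. A small sketch with made-up column names:

```python
# Hypothetical columns, for illustration only.
body = PredictionBody(
    peripheral=[{"column_1": [2.4, 3.0], "column_2": ["a", "b"]}],
    population={"column_1": [0.2], "column_2": ["a"]},
)
print(body.population)  # {'column_1': [0.2], 'column_2': ['a']}
```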
Next up, implement the POST endpoint which accepts data and tasks the engine with making a prediction. Validate that the pipeline exists, load the pipeline (`getml.pipeline.load()`), and validate that the pipeline has been finalized.
```python
@app.post("/pipeline/{pipeline_id}/predict")
async def post_project_predict(pipeline_id: str, body: PredictionBody) -> Optional[List]:
    if not pipeline.exists(pipeline_id):
        raise HTTPException(status_code=404,
                            detail=f'Pipeline {pipeline_id} not found.')

    pipeline_: Pipeline = pipeline.load(pipeline_id)

    if pipeline_.metadata is None:
        raise HTTPException(status_code=409,
                            detail='The data schema is missing or pipeline is incomplete')
```
The request body should contain both the population and the peripheral data. Check that the population in the request body is not empty. Then create a data frame from the dictionary (`getml.DataFrame.from_dict()`): the name of the data frame must not collide with an existing data frame in the pipeline, and the roles of the population can be obtained from the pipeline using `pipeline_.metadata.population.roles`.
```python
    if not body.population:
        raise HTTPException(status_code=400, detail='Missing population data.')

    population_data_frame = DataFrame.from_dict(name='future',
                                                roles=pipeline_.metadata.population.roles,
                                                data=body.population)
```
The peripheral data can be submitted in the request body as either a list or a dictionary. If the peripheral data sets are received as a dictionary, check that the names of all required peripheral data sets exist among the dictionary keys; if they are received as a list, check that the length of the list matches the number of peripheral data sets in the pipeline. After that, create a list of data frames from the peripheral data. Again, ensure that the names of the created data frames do not collide with existing data frames, and use the roles defined in the pipeline for the peripheral data sets (`pipeline_.metadata.peripheral[i].roles`).
```python
    peripheral_names = [_.name for _ in pipeline_.peripheral]

    if isinstance(body.peripheral, dict):
        if set(peripheral_names) - set(body.peripheral.keys()):
            raise HTTPException(
                status_code=400,
                detail=f'Missing peripheral data, expected {peripheral_names}')
        peripheral_raw_data = body.peripheral
    else:
        if len(peripheral_names) != len(body.peripheral):
            raise HTTPException(
                status_code=400,
                detail=f"Expected {len(pipeline_.peripheral)} peripheral data frames.")
        peripheral_raw_data = dict(zip(peripheral_names, body.peripheral))

    peripheral_data_frames = [
        DataFrame.from_dict(name=name + '_predict',
                            data=peripheral_raw_data[name],
                            roles=pipeline_.metadata.peripheral[i].roles)
        for i, name in enumerate(peripheral_names)
    ]
```
This leaves the actual call to the engine to make a prediction (`getml.Pipeline.predict()`) using the previously created population data frame and peripheral data frames. The prediction is returned by the engine as a NumPy array and is converted to a list for the response.
```python
    prediction = pipeline_.predict(
        population_table=population_data_frame,
        peripheral_tables=peripheral_data_frames
    )

    # `prediction` is a NumPy array; checking its truth value directly would
    # raise an error for arrays with more than one element.
    if prediction is not None:
        return prediction.tolist()

    raise HTTPException(status_code=500, detail="getML engine didn't return a result.")
```
This endpoint can be called at localhost:8000/pipeline/a1b2c3/predict, where the body takes the following form:
```json
{
    "peripheral": [{
        "column_1": [2.4, 3.0, 1.2, 1.4, 2.2],
        "column_2": ["a", "b", "a", "b", "b"]
    }],
    "population": {
        "column_1": [0.2, 0.1],
        "column_2": ["a", "b"],
        "time_stamp": ["2010-01-01 12:30:00", "2010-01-01 23:30:00"]
    }
}
```
Example JSON data can be extracted by appending the following code snippet at the end of the notebook used to create the example data.
```python
from typing import Any
from datetime import datetime
from json import dumps


def handle_timestamp(x: Any):
    # Serialize datetime objects; all other types are left to json.dumps.
    if isinstance(x, datetime):
        return x.strftime(r'%Y-%m-%d %H:%M:%S')


pd_population_test = population_test.to_pandas()
account_id = pd_population_test.iloc[0]["account_id"]
population_dict = pd_population_test[pd_population_test["account_id"] == account_id].to_dict()
population_json = dumps({k: list(v.values()) for k, v in population_dict.items()}, default=handle_timestamp)
pd_peripherals = {_.name: _.to_pandas() for _ in [order, trans, meta]}
peripheral_dict = {k: v[v["account_id"] == account_id].to_dict() for k, v in pd_peripherals.items()}
peripheral_json = dumps(
    {k: {vk: list(vv.values()) for vk, vv in v.items()} for k, v in peripheral_dict.items()},
    default=handle_timestamp)

# Inspect the JSON strings in the notebook output.
population_json
peripheral_json
```
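With those two JSON strings at hand, the prediction endpoint can be exercised end to end. A minimal sketch, assuming the app is running on port 8000, the third-party requests package is installed, and `a1b2c3` stands in for your pipeline id:

```python
from json import loads

import requests  # third-party HTTP client, assumed to be installed

# Post the extracted example data to the prediction endpoint.
response = requests.post(
    "http://localhost:8000/pipeline/a1b2c3/predict",
    json={
        "population": loads(population_json),
        "peripheral": loads(peripheral_json),
    },
)
response.raise_for_status()
print(response.json())  # list of predicted target values
```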
Conclusion
With only a few lines of code, it is possible to create a web API that makes project pipelines accessible and serves target predictions for provided population and peripheral data.