Predicting bicycle traffic with a REST API using real-time weather data

forecasting
tomorrow.io
Flask
API
ML
Python
Author

Fabian Rosenthal

Published

December 10, 2024

This post extends the project on modelling bicycle traffic in Cologne. We will query the tomorrow.io API for real-time weather forecasts and build a REST API with Flask that serves our trained models. Together, this allows us to make predictions with our models in a real-time setting.

This post showcases my ability to:

- Think end-to-end about a machine learning project
- Use the tomorrow.io API to get real-time weather forecasts
- Use Flask to serve the models in a REST API

The tomorrow.io forecast API

In order to use our models in a real-time production setting, we need to think about which features are available at inference time. As the bicycle counter data is quantized in time, we cannot use it to predict, e.g., the bicycle traffic of the next hour. This is clearly a limitation of the original project idea and data set. Moreover, our prediction pipeline relies on weather data, so we need a weather forecast for the time we want to predict.

Here is an example of how we can get the weather data from the tomorrow.io API. You can get an API key with a free account. We will store the key in a .env file and use the python-dotenv package to load it. Don’t forget to add .env to your .gitignore file so that the key is never committed. Let’s look at one time point from the forecast API.

import os
from pprint import pprint
import dotenv
import requests
import json
from pathlib import Path
import polars as pl
dotenv.load_dotenv(Path(".env"))
tomorrow_api_key = os.getenv('TOMORROW_API_KEY')

location = '50.938361,6.959974'
forecast_api = "https://api.tomorrow.io/v4/weather/forecast"

params = {
    "location": location,
    "apikey": tomorrow_api_key
}

headers = {"accept": "application/json"}

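r = requests.get(forecast_api, params=params, headers=headers, timeout=30)
# fail early on HTTP errors (e.g., an invalid API key) instead of saving an error payload
r.raise_for_status()
r_data = r.json()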

# save API data
with open("data/forecast_data.json", "w") as f:
    json.dump(r_data, f)
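
To look at a single time point, we only need the first entry of the daily timeline. The following is a minimal sketch, assuming the response has the `timelines` → `daily` → `time`/`values` layout that the processing step below relies on. The helper name `first_daily_entry` and the sample payload are hypothetical, and the numbers are made up for illustration.

```python
from datetime import date, datetime

# Hypothetical, heavily shortened payload; the values are invented for illustration.
sample_response = {
    "timelines": {
        "daily": [
            {
                "time": "2024-12-10T05:00:00Z",
                "values": {
                    "temperatureMax": 6.1,
                    "temperatureMin": 2.3,
                    "cloudCoverAvg": 87.0,
                },
            }
        ]
    }
}


def first_daily_entry(response: dict) -> tuple[date, dict]:
    """Return the date and the weather values of the first daily forecast entry."""
    entry = response["timelines"]["daily"][0]
    # Replace a trailing "Z" so that parsing also works on Python versions before 3.11.
    timestamp = datetime.fromisoformat(entry["time"].replace("Z", "+00:00"))
    return timestamp.date(), entry["values"]


forecast_date, values = first_daily_entry(sample_response)
print(forecast_date)
print(sorted(values))
```

With the real response, `first_daily_entry(r_data)` would return the same kind of pair, which is handy for checking which fields are available before mapping them to our feature names.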

Processing the forecast data

Since the tomorrow.io API uses different feature names, we have to translate them to be compatible with our data. Note that the daily forecast values stand in for the monthly aggregates used in our features. In addition, a few columns will be missing. We impute them with the mean of the training data. In practice, this will lead to worse predictions, but we accept that for now in order to go through the whole process.

import json
from datetime import datetime

import polars as pl

with open("data/forecast_data.json") as f:
    r_data = json.load(f)

train_data = pl.read_csv("data/train_data.csv")
inference_data = (
    pl.DataFrame(r_data["timelines"]["daily"][0]["values"])
    .select(
        "temperatureMax",
        "temperatureMin",
        "temperatureAvg",
        "rainAccumulationMax",
        "windSpeedMax",
        "windSpeedAvg",
        "cloudCoverAvg"
    )
    .with_columns(
        pl.lit("Venloer Straße").alias("location"),
        pl.col("temperatureMax").alias("air_temp_daymax_month_max"),
        pl.col("temperatureMax").alias("air_temp_daymax_month_mean"),
        pl.col("temperatureMin").alias("air_temp_daymin_month_min"),
        pl.col("temperatureMin").alias("air_temp_daymin_month_mean"),
        pl.col("temperatureAvg").alias("air_temp_daymean_month_mean"),
        pl.col("rainAccumulationMax").alias("precipitation_daymax_month_max"),
        pl.col("windSpeedMax").alias("wind_speed_daymax_month_max"),
        pl.col("windSpeedAvg").alias("wind_speed_month_mean"),
        pl.col("cloudCoverAvg").alias("sky_cov"),
        date = pl.lit(datetime.fromisoformat(r_data["timelines"]["daily"][0]["time"])).cast(pl.Date),
        sunshine_duration = train_data["sunshine_duration"].mean(),
        precipitation_month_sum = train_data["precipitation_month_sum"].mean(),
    )
)

inference_data.write_json("data/inference_data.json")
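
The mean imputation itself is a small idea that can be illustrated without polars. The following is a sketch under stated assumptions: the helper `impute_with_train_means` and the miniature training set are hypothetical, and the numbers are invented. It only shows the principle of filling a missing feature with the training mean.

```python
from statistics import mean

# Hypothetical miniature training set; the numbers are invented for illustration.
train_rows = [
    {"sunshine_duration": 100.0, "precipitation_month_sum": 40.0},
    {"sunshine_duration": 200.0, "precipitation_month_sum": 60.0},
]


def impute_with_train_means(
    features: dict, train_rows: list[dict], columns: list[str]
) -> dict:
    """Fill columns that are missing from `features` with the training mean."""
    completed = dict(features)  # do not modify the caller's dictionary
    for column in columns:
        if completed.get(column) is None:
            completed[column] = mean(row[column] for row in train_rows)
    return completed


forecast_features = {"sky_cov": 87.0}
completed = impute_with_train_means(
    forecast_features, train_rows, ["sunshine_duration", "precipitation_month_sum"]
)
print(completed)
```

Features that the forecast does provide are left untouched, so only the gaps are filled. Every imputed request receives the same constant, which is one reason why the predictions get worse.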

Using the models in a REST API

Let’s copy the files to another directory, where we use the models in a REST API. Flask is a great choice for building a REST API around our trained models in a very simple way. We can follow the example provided in Muhammad Bilal Shinwari’s article on Medium (Code).

We will write two new files: app.py and client.py. The former runs the Flask app, the latter posts requests. Let’s assume that the features are part of the request. This way, clients can request predictions for time points of their choosing. Another possible solution would be to assume that clients always want to predict the next possible time frame. Then we could move the tomorrow.io query into the Flask app. Here is what we need inside of app.py:

# app.py
import joblib
from pathlib import Path
import pandas as pd
from flask import Flask, request, jsonify
# The custom model classes must be importable so that joblib can unpickle the pipelines.
from model_class import SimpleModel, MAPIEModel
from lookup import names_lookup

def process_json(json_):
    if isinstance(json_, list):
        json_ = json_[0]
    json_["location"] = names_lookup.get(json_["location"], json_["location"])
    df = pd.DataFrame(json_, index=[0])
    return df


def process_result(y_pred):
    return y_pred.round(2).astype(int).item()


def process_result_tuple(y_pis):
    return y_pis.round(2).astype(int).flatten().tolist()


model_path = Path("trained_models")
simple_path = model_path / "trained_ml_pipeline.pkl"
mapie_path = model_path / "trained_quantile_pipeline.pkl"
simple_model = joblib.load(simple_path)
mapie_model = joblib.load(mapie_path)

app = Flask(__name__)

@app.route("/predict", methods=["POST"])
def predict_simple():
    df = process_json(request.json)
    y_pred = simple_model.predict(df)
    y_pred = process_result(y_pred)
    return jsonify({"Prediction": y_pred})

@app.route("/predict-mapie", methods=["POST"])
def predict_mapie():
    df = process_json(request.json)
    y_pred, y_pis = mapie_model.predict_mapie(df)
    y_pred = process_result(y_pred)
    y_pis = process_result_tuple(y_pis)
    return jsonify({
        "Prediction": y_pred, 
        "PI": y_pis})

if __name__ == '__main__':
    app.run(debug=True)
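
Because we assume that the features are part of the request, an incomplete request would only fail somewhere inside the model pipeline. A small check before predicting could catch this earlier. The following is a sketch, not part of the original app: `REQUIRED_FEATURES` and `missing_features` are hypothetical names, and the feature list is an assumption taken from the columns built in the processing step. The trained pipelines may expect a different set.

```python
# Assumed feature set, taken from the columns built in the processing step;
# the trained pipelines may expect a different list.
REQUIRED_FEATURES = (
    "location",
    "date",
    "air_temp_daymax_month_max",
    "air_temp_daymax_month_mean",
    "air_temp_daymin_month_min",
    "air_temp_daymin_month_mean",
    "air_temp_daymean_month_mean",
    "precipitation_daymax_month_max",
    "wind_speed_daymax_month_max",
    "wind_speed_month_mean",
    "sky_cov",
    "sunshine_duration",
    "precipitation_month_sum",
)


def missing_features(payload, required=REQUIRED_FEATURES) -> list[str]:
    """Return the names of required features that are absent from the payload."""
    # Mirror process_json: a list payload is reduced to its first record.
    if isinstance(payload, list):
        payload = payload[0] if payload else {}
    return [name for name in required if name not in payload]


incomplete = {"location": "Venloer Straße", "date": "2024-12-10"}
print(missing_features(incomplete))
```

Inside a view such as predict_simple, a non-empty result could be turned into a 400 response that lists the missing names before the model is called.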

And then client.py can look like this:

# client.py
import json
import requests

def make_request(data, url):
    response = requests.post(url, json=data)
    if response.status_code == 200:
        prediction = response.json()
        print(prediction)
    else:
        print(f'API Request Failed with Status Code: {response.status_code}')
        print(f'Response Content: {response.text}')


if __name__ == '__main__':
    # Define the URL of the Flask API
    url_simple = 'http://127.0.0.1:5000/predict'
    url_mapie = 'http://127.0.0.1:5000/predict-mapie'

    # load features for inference
    with open("data/inference_data.json") as f:
        data = json.load(f)

    make_request(data, url_simple)
    make_request(data, url_mapie)

Running the Flask app and the client

Now, you can run the Flask app with python app.py and then run the client with python client.py. You should see the predictions in the console. The terminal will show something like this on the Flask side:

* Serving Flask app 'app'
 * Debug mode: on
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:5000
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug: * Restarting with stat
WARNING:werkzeug: * Debugger is active!
INFO:werkzeug: * Debugger PIN: 248-803-927
INFO:werkzeug:127.0.0.1 - - [10/Dec/2024 11:39:17] "POST /predict HTTP/1.1" 200 -
INFO:werkzeug:127.0.0.1 - - [10/Dec/2024 11:39:50] "POST /predict-mapie HTTP/1.1" 200 -

And the client will show the predictions:

{'Prediction': 91762}
{'PI': [75288, 171766], 'Prediction': 124490}
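
On the client side, the two response shapes can be handled by one small helper. This is a sketch assuming that the "PI" list is ordered as lower bound, then upper bound, which the output above suggests but which is not confirmed here. The function name `summarize_prediction` is hypothetical.

```python
def summarize_prediction(response: dict) -> str:
    """Format a response from /predict or /predict-mapie as one readable line."""
    prediction = response["Prediction"]
    interval = response.get("PI")
    if interval is None:
        return f"Prediction: {prediction}"
    # Assumption: the interval is ordered as [lower, upper].
    lower, upper = interval
    return f"Prediction: {prediction}, interval: {lower} to {upper}"


print(summarize_prediction({"Prediction": 91762}))
print(summarize_prediction({"PI": [75288, 171766], "Prediction": 124490}))
```

This keeps the client independent of which endpoint was called, since the interval is simply left out when the response does not contain one.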

So this is an extremely easy and convenient way to use our custom models in a real-time setting, where the client doesn’t have to know how the model is implemented.