Simple local end-to-end deployment tutorial #
This is a quick demonstration of an end-to-end machine learning deployment pipeline on a local machine. The pipeline covers a few key steps: data generation and preprocessing, model training, model serialization, model serving, and a front-end UI. The end result is a front-end UI that lets the user enter feature values and get a real-time prediction from a REST API. This is a barebones MLOps deployment pipeline running entirely on one machine, but it captures the key steps involved in a real-world deployment.
Recommended file structure:
ml_e2e_project/
├── main.py
├── data/
│   ├── train.csv
│   └── test.csv
└── src/
    ├── generate_data.py
    ├── preprocess_data.py
    ├── model_training.py
    ├── model_serialization.py
    ├── serve_fastapi.py
    └── ui_streamlit.py
Required libraries: pandas, numpy, scikit-learn, torch, onnx, onnxruntime, fastapi, uvicorn, streamlit, requests
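Assuming a fresh Python 3 environment, everything can be installed with pip in one go (requests is listed explicitly because the Streamlit UI uses it to call the API):

pip install pandas numpy scikit-learn torch onnx onnxruntime fastapi uvicorn streamlit requests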
Generate Synthetic Data (generate_data.py) #
import pandas as pd
import numpy as np
import os


def generate_data(n=500):
    """
    Generate synthetic train/test CSVs under project_root/data.
    """
    base_dir = os.path.dirname(os.path.abspath(__file__))  # src/
    project_root = os.path.dirname(base_dir)  # project root
    data_dir = os.path.join(project_root, "data")
    os.makedirs(data_dir, exist_ok=True)

    # Create synthetic data: two numeric features, one categorical feature,
    # and a target that depends on all three plus Gaussian noise
    np.random.seed(42)
    X1 = np.random.randn(n)
    X2 = np.random.rand(n) * 10
    X3 = np.random.choice(["A", "B", "C"], size=n)
    y = 3 * X1 + 0.5 * X2 + (X3 == "B") * 2 + np.random.randn(n) * 0.3
    df = pd.DataFrame({"x1": X1, "x2": X2, "x3": X3, "target": y})

    # 80/20 train/test split
    train = df.sample(frac=0.8, random_state=1)
    test = df.drop(train.index)

    train_path = os.path.join(data_dir, "train.csv")
    test_path = os.path.join(data_dir, "test.csv")
    train.to_csv(train_path, index=False)
    test.to_csv(test_path, index=False)
    return train_path, test_path
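If you only want the CSVs without running the full pipeline, a small optional entry point can be added at the bottom of generate_data.py (main.py below calls generate_data() directly, so this is purely for convenience):

if __name__ == "__main__":
    train_path, test_path = generate_data()
    print("Wrote", train_path, "and", test_path)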
Data Preprocessing (preprocess_data.py) #
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def preprocess_data(train_path):
    """
    Reads the training CSV, fits a OneHotEncoder and a StandardScaler,
    and returns X, y plus the fitted enc and scaler.
    """
    df_train = pd.read_csv(train_path)
    cat = df_train[["x3"]]
    num = df_train[["x1", "x2"]]

    # Fit the preprocessing on the training data only; the fitted objects
    # are reused later for any test or inference data
    enc = OneHotEncoder(sparse_output=False).fit(cat)
    scaler = StandardScaler().fit(num)

    X_cat = enc.transform(cat)
    X_num = scaler.transform(num)
    X = np.hstack([X_num, X_cat])
    y = df_train["target"].values
    return X, y, enc, scaler
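The fitted enc and scaler must be reused (not refit) for any data seen after training. As a rough sketch, the held-out test.csv could be transformed like this, assuming enc and scaler are the objects returned above:

df_test = pd.read_csv("data/test.csv")
X_test = np.hstack([
    scaler.transform(df_test[["x1", "x2"]]),  # same standardization as training
    enc.transform(df_test[["x3"]]),           # same one-hot columns as training
])
y_test = df_test["target"].values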
Model Training (model_training.py) #
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader


class SimpleNN(nn.Module):
    def __init__(self, in_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(in_dim, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
        )

    def forward(self, x):
        return self.net(x)


def train_model(X, y, enc, scaler, epochs=20, batch_size=32, lr=0.01):
    """
    Train a simple feedforward NN on preprocessed data.
    Returns the trained model along with enc and scaler.
    """
    # Convert to tensors
    X_t = torch.tensor(X, dtype=torch.float32)
    y_t = torch.tensor(y, dtype=torch.float32).unsqueeze(1)
    ds = TensorDataset(X_t, y_t)
    dl = DataLoader(ds, batch_size=batch_size, shuffle=True)

    model = SimpleNN(in_dim=X.shape[1])
    loss_fn = nn.MSELoss()
    opt = torch.optim.Adam(model.parameters(), lr=lr)

    for epoch in range(epochs):
        for xb, yb in dl:
            pred = model(xb)
            loss = loss_fn(pred, yb)
            opt.zero_grad()
            loss.backward()
            opt.step()

    # Save model + preprocessing
    torch.save({"model": model.state_dict(), "enc": enc, "scaler": scaler}, "model.pt")
    return model, enc, scaler
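As an optional sanity check, the trained model can be evaluated on the held-out split, assuming X_test and y_test were built with the fitted enc and scaler as sketched above:

with torch.no_grad():
    preds = model(torch.tensor(X_test, dtype=torch.float32)).squeeze(1).numpy()
print("Test MSE:", ((preds - y_test) ** 2).mean())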
Model Serialization to ONNX (model_serialization.py) #
import torch
import onnx


def export_onnx(model, X_shape, path="model.onnx"):
    """
    Export the trained PyTorch model to ONNX and validate the resulting file.
    """
    model.eval()  # put the model in inference mode before tracing
    dummy_input = torch.randn(1, X_shape[1])
    torch.onnx.export(
        model,
        dummy_input,
        path,
        input_names=["input"],
        output_names=["output"],
        dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}},  # allow any batch size
    )
    onnx.checker.check_model(path)
    return path
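A quick, optional check that the exported graph reproduces the PyTorch outputs (a sketch run right after export; model and X come from the earlier steps):

import numpy as np
import onnxruntime as ort

sample = torch.randn(1, X.shape[1])
with torch.no_grad():
    torch_out = model(sample).numpy()
onnx_out = ort.InferenceSession("model.onnx").run(None, {"input": sample.numpy()})[0]
print("Max difference:", np.abs(torch_out - onnx_out).max())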
Model Serving with FastAPI (serve_fastapi.py) #
from fastapi import FastAPI
import onnxruntime as ort
import numpy as np
import torch

app = FastAPI()
sess = ort.InferenceSession("model.onnx")
# Reload the fitted scaler saved during training so the raw numeric inputs
# sent by the UI are standardized the same way as the training data.
scaler = torch.load("model.pt", weights_only=False)["scaler"]

@app.post("/predict")
def predict(data: dict):
    X = np.array(data["features"], dtype=np.float32)
    # Columns 0-1 are the raw numeric features x1, x2; the rest is the one-hot x3
    X[:, :2] = scaler.transform(X[:, :2])
    pred = sess.run(None, {"input": X})[0]
    return {"prediction": pred.tolist()}
Front-end UI (ui_streamlit.py) #
import streamlit as st
import requests
import numpy as np

st.title("E2E ML Demo")

x1 = st.number_input("x1", value=0.0)
x2 = st.number_input("x2", value=5.0)
x3 = st.selectbox("x3", ["A", "B", "C"])

# One-hot encode x3 in the same (alphabetical) category order that
# OneHotEncoder produced during preprocessing
cat_map = {"A": [1, 0, 0], "B": [0, 1, 0], "C": [0, 0, 1]}
features = np.array([[x1, x2] + cat_map[x3]])

if st.button("Predict"):
    res = requests.post("http://127.0.0.1:8000/predict", json={"features": features.tolist()})
    st.write("Prediction:", res.json()["prediction"])
Main Script (main.py) #
import logging
import sys
import os
import subprocess
import time
import signal
# ---------- Logging ----------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
# ---------- Add src/ to Python path ----------
PROJECT_ROOT = os.path.dirname(os.path.abspath(__file__)) # root/
SRC_DIR = os.path.join(PROJECT_ROOT, "src")
sys.path.append(SRC_DIR)
# ---------- Import modules ----------
from generate_data import generate_data
from preprocess_data import preprocess_data
from model_training import train_model
from model_serialization import export_onnx
# ---------- Pipeline ----------
logger.info("Step 1: Generate data")
train_path, test_path = generate_data()
logger.info("Data generated at %s", train_path)
logger.info("Step 2: Preprocess data")
X, y, enc, scaler = preprocess_data(train_path)
logger.info("Preprocessing done. Shapes: X=%s, y=%s", X.shape, y.shape)
logger.info("Step 3: Train model")
model, enc, scaler = train_model(X, y, enc, scaler)
logger.info("Model training done.")
logger.info("Step 4: Serialize Model Object to ONNX")
onnx_path = export_onnx(model, X.shape)
logger.info("Model exported to %s", onnx_path)
# ---------- Step 5: Start FastAPI ----------
logger.info("Step 5: Start FastAPI server")
# Launch FastAPI (uvicorn) in a separate process
fastapi_proc = subprocess.Popen(
    ["uvicorn", "src.serve_fastapi:app", "--reload", "--port", "8000"]
)
logger.info("Waiting a few seconds for FastAPI to start...")
time.sleep(5)  # give uvicorn time to load model.onnx and start listening
# ---------- Step 6: Launch Streamlit UI ----------
logger.info("Step 6: Launch Streamlit")
# Launch Streamlit in a separate process
streamlit_proc = subprocess.Popen(
    ["streamlit", "run", "src/ui_streamlit.py"]
)
# ---------- Keep main.py alive, terminate subprocesses on Ctrl+C ----------
try:
    logger.info("E2E pipeline running. Press Ctrl+C to stop both servers.")
    # Wait indefinitely
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    logger.info("Ctrl+C detected. Terminating servers...")
    fastapi_proc.send_signal(signal.SIGINT)
    streamlit_proc.send_signal(signal.SIGINT)
    fastapi_proc.wait()
    streamlit_proc.wait()
    logger.info("Both servers terminated. Exiting.")