MLflow
Overview
MLflow is an open-source platform, originally developed by Databricks, for managing the complete machine learning lifecycle, from experimentation to production deployment.
Philosophy
"The open source platform for the machine learning lifecycle" - manage your ML experiments, models, and deployments in a reproducible way.
Core components
MLflow Tracking
- Experiments: organize training runs
- Metrics: track performance (accuracy, loss, etc.)
- Parameters: hyperparameters and configuration
- Artifacts: models, datasets, visualizations (see the minimal sketch below)
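A minimal tracking sketch; the experiment, parameter, and metric names here are illustrative rather than taken from a real project:

import mlflow

mlflow.set_experiment("demo-experiment")

with mlflow.start_run():
    # Parameters are logged once per run
    mlflow.log_param("learning_rate", 0.001)
    # Metrics can be logged repeatedly, with an optional step index
    for step in range(3):
        mlflow.log_metric("loss", 1.0 / (step + 1), step=step)
    # Any local file can be attached to the run as an artifact
    with open("notes.txt", "w") as f:
        f.write("baseline run")
    mlflow.log_artifact("notes.txt")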
MLflow Models
- Standard format: packaging of ML models
- Multi-framework: PyTorch, TensorFlow, scikit-learn
- Serving: automatic REST API deployment
- Registry: version and lifecycle management (see the loading sketch below)
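A sketch of loading a logged model through the framework-agnostic pyfunc flavor; "runs:/<run_id>/model" is a placeholder URI and the sample features are made up:

import pandas as pd
import mlflow.pyfunc

# Any model logged with mlflow.<flavor>.log_model(...) can be loaded this way
model = mlflow.pyfunc.load_model("runs:/<run_id>/model")

# pyfunc exposes a uniform predict() regardless of the training framework
sample = pd.DataFrame({"feature_1": [0.1], "feature_2": [0.7]})
print(model.predict(sample))

The same URI can also be exposed as a REST endpoint with `mlflow models serve -m "runs:/<run_id>/model" --port 8080`.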
MLflow Projects
- Reproducibility: environments and dependencies
- Standard format: an MLproject file (YAML syntax)
- Execution: local, Docker, Kubernetes
- Parameters: externalized configuration (see the launch sketch below)
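A sketch of launching a project from Python, assuming a project in the current directory whose MLproject file defines a `main` entry point with an `epochs` parameter (both names are assumptions):

import mlflow

# MLflow resolves the entry point, parameters, and environment from the MLproject file
submitted = mlflow.run(
    uri=".",
    entry_point="main",
    parameters={"epochs": 10},
    env_manager="local",  # use "conda" or "virtualenv" to rebuild the declared environment
)
print(submitted.run_id)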
Experiment tracking
Example with PyTorch
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score

# MLflow configuration (set the tracking URI before selecting the experiment)
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("image-classification")

class ImageClassifier(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 8 * 8, 512)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

def train_model():
    # Hyperparameters
    params = {
        "learning_rate": 0.001,
        "batch_size": 32,
        "epochs": 50,
        "dropout": 0.5,
        "optimizer": "adam"
    }

    with mlflow.start_run():
        # Log parameters
        mlflow.log_params(params)

        # Model and optimizer
        model = ImageClassifier(num_classes=10)
        optimizer = torch.optim.Adam(model.parameters(), lr=params["learning_rate"])
        criterion = nn.CrossEntropyLoss()

        # Training loop (train_loader and val_loader are assumed to be DataLoaders built elsewhere)
        for epoch in range(params["epochs"]):
            model.train()
            total_loss = 0
            for batch_idx, (data, target) in enumerate(train_loader):
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            # Validation
            model.eval()
            val_loss = 0
            predictions = []
            targets = []
            with torch.no_grad():
                for data, target in val_loader:
                    output = model(data)
                    val_loss += criterion(output, target).item()
                    pred = output.argmax(dim=1)
                    predictions.extend(pred.cpu().numpy())
                    targets.extend(target.cpu().numpy())

            # Metrics
            accuracy = accuracy_score(targets, predictions)
            avg_train_loss = total_loss / len(train_loader)
            avg_val_loss = val_loss / len(val_loader)

            # Log metrics for this epoch
            mlflow.log_metrics({
                "train_loss": avg_train_loss,
                "val_loss": avg_val_loss,
                "accuracy": accuracy
            }, step=epoch)

            print(f"Epoch {epoch}: Train Loss: {avg_train_loss:.4f}, "
                  f"Val Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.4f}")

        # Save and register the model
        mlflow.pytorch.log_model(
            model,
            "model",
            registered_model_name="image-classifier"
        )

        # Additional artifacts
        torch.save(model.state_dict(), "model_weights.pth")
        mlflow.log_artifact("model_weights.pth")

    return model

# Run the training
trained_model = train_model()
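Once several runs have been logged, they can be browsed in the tracking UI or queried programmatically. A small sketch, reusing the experiment name from the example above:

import mlflow

# Returns a pandas DataFrame with one row per run (params, metrics, tags, run info)
runs = mlflow.search_runs(
    experiment_names=["image-classification"],
    order_by=["metrics.accuracy DESC"],
    max_results=5,
)
print(runs[["run_id", "metrics.accuracy", "params.learning_rate"]])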
Integration with OpenCV
import mlflow
import cv2
import numpy as np
from sklearn.metrics import classification_report

class ImagePreprocessor:
    def __init__(self, target_size=(224, 224)):
        self.target_size = target_size

    def preprocess_batch(self, image_paths):
        processed_images = []
        for path in image_paths:
            # Read with OpenCV (BGR) and convert to RGB
            image = cv2.imread(path)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            # Resize
            image = cv2.resize(image, self.target_size)
            # Normalize to [0, 1]
            image = image.astype(np.float32) / 255.0
            processed_images.append(image)
        return np.array(processed_images)

def experiment_with_preprocessing():
    with mlflow.start_run(run_name="preprocessing-experiment"):
        # Preprocessing parameters
        preprocessing_params = {
            "target_size": (224, 224),
            "normalization": "0-1",
            "augmentation": True,
            "blur_kernel": 3
        }
        mlflow.log_params(preprocessing_params)

        # Preprocessing with OpenCV
        preprocessor = ImagePreprocessor(
            target_size=preprocessing_params["target_size"]
        )

        # Process images (train_image_paths / val_image_paths are lists of file paths built elsewhere)
        train_images = preprocessor.preprocess_batch(train_image_paths)
        val_images = preprocessor.preprocess_batch(val_image_paths)

        # Train the model (train_with_processed_data and val_labels are defined elsewhere)
        model = train_with_processed_data(train_images, val_images)

        # Final metrics
        predictions = model.predict(val_images)
        report = classification_report(val_labels, predictions, output_dict=True)

        mlflow.log_metrics({
            "final_accuracy": report["accuracy"],
            "precision": report["macro avg"]["precision"],
            "recall": report["macro avg"]["recall"],
            "f1_score": report["macro avg"]["f1-score"]
        })

        # Save artifacts (these files must exist on disk before logging)
        mlflow.log_artifact("preprocessing_stats.json")
        mlflow.log_artifact("confusion_matrix.png")
Model Registry
Version management
import mlflow
import mlflow.pytorch
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model from a run
def register_model(model_name, run_id):
    model_version = mlflow.register_model(
        f"runs:/{run_id}/model",
        model_name
    )
    return model_version

# Transition a version to production
def promote_to_production(model_name, version):
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production"
    )

# Load the production model
def load_production_model(model_name):
    model = mlflow.pytorch.load_model(
        f"models:/{model_name}/Production"
    )
    return model

# Example usage
with mlflow.start_run() as run:
    model = ImageClassifier(num_classes=10)
    # ... training loop and metric logging as above ...

    # Automatic registration
    mlflow.pytorch.log_model(
        model,
        "model",
        registered_model_name="image-classifier-v2"
    )

# If the run's metrics are good enough, promote the newly created version
accuracy = client.get_run(run.info.run_id).data.metrics.get("accuracy", 0.0)
if accuracy > 0.95:
    new_version = client.get_latest_versions("image-classifier-v2", stages=["None"])[0]
    promote_to_production("image-classifier-v2", new_version.version)
Deployment with Kubernetes
MLflow Model Server
# mlflow-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-model-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mlflow-model-server
  template:
    metadata:
      labels:
        app: mlflow-model-server
    spec:
      containers:
      - name: model-server
        image: mlflow-models:latest
        ports:
        - containerPort: 8080
        env:
        - name: MLFLOW_TRACKING_URI
          value: "http://mlflow-tracking:5000"
        - name: MODEL_URI
          value: "models:/image-classifier/Production"
        command:
        - mlflow
        - models
        - serve
        - --model-uri
        - $(MODEL_URI)
        - --host
        - "0.0.0.0"
        - --port
        - "8080"
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1"
---
apiVersion: v1
kind: Service
metadata:
  name: mlflow-model-service
spec:
  selector:
    app: mlflow-model-server
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer
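Once deployed, the MLflow scoring server accepts requests on its `/invocations` endpoint. A client sketch, assuming the service is reachable as `mlflow-model-service` and the model takes a flattened numeric input (hostname and input shape are assumptions):

import requests

# MLflow 2.x scoring protocol: inputs are wrapped in one of the documented keys
# ("inputs", "dataframe_split", "dataframe_records", ...)
payload = {"inputs": [[0.0] * (3 * 32 * 32)]}  # one flattened dummy image

response = requests.post(
    "http://mlflow-model-service/invocations",
    json=payload,
    timeout=10,
)
response.raise_for_status()
print(response.json())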
Integration with Kubeflow
# kubeflow-pipeline.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: mlflow-training-pipeline
spec:
  entrypoint: training-pipeline
  templates:
  - name: training-pipeline
    dag:
      tasks:
      - name: data-preparation
        template: prepare-data
      - name: model-training
        template: train-model
        dependencies: [data-preparation]
      - name: model-evaluation
        template: evaluate-model
        dependencies: [model-training]
      - name: model-deployment
        template: deploy-model
        dependencies: [model-evaluation]

  # Only the training template is shown here; prepare-data, evaluate-model,
  # and deploy-model would be defined in the same way.
  - name: train-model
    container:
      image: mlflow-training:latest
      command: [python]
      args: [train.py]
      env:
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow.kubeflow:5000"
      - name: MLFLOW_EXPERIMENT_NAME
        value: "kubeflow-training"
CI/CD integration with GitLab CI
# .gitlab-ci.yml for MLOps
stages:
  - data-validation
  - training
  - evaluation
  - deployment

variables:
  MLFLOW_TRACKING_URI: "https://mlflow.example.com"
  MODEL_NAME: "fraud-detection"

data_validation:
  stage: data-validation
  image: python:3.9
  script:
    - pip install mlflow pandas great-expectations
    - python validate_data.py
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      changes:
        - "data/**/*"

model_training:
  stage: training
  image: pytorch/pytorch:latest
  script:
    - pip install mlflow torch torchvision
    - python train.py --experiment-name $CI_COMMIT_SHA
  artifacts:
    reports:
      junit: test-results.xml
    paths:
      - model_metrics.json
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

model_evaluation:
  stage: evaluation
  image: python:3.9
  dependencies:
    - model_training
  script:
    # MLFLOW_RUN_ID is expected to be exported by the training job (e.g. via a dotenv artifact)
    - python evaluate_model.py --run-id $MLFLOW_RUN_ID
    - python check_model_quality.py
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

deploy_staging:
  stage: deployment
  # The runner image must provide mlflow, docker, and kubectl
  image: bitnami/kubectl:latest
  script:
    - mlflow models build-docker --model-uri "runs:/$MLFLOW_RUN_ID/model" --name "$MODEL_NAME:$CI_COMMIT_SHA"
    - kubectl set image deployment/model-server model-server=$MODEL_NAME:$CI_COMMIT_SHA
  environment:
    name: staging
    url: https://model-staging.example.com
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

deploy_production:
  stage: deployment
  image: python:3.9
  script:
    - pip install mlflow
    # Stage transitions go through the MLflow client API (there is no `mlflow models transition-stage` CLI command)
    - python -c "import os, mlflow; mlflow.MlflowClient().transition_model_version_stage(name=os.environ['MODEL_NAME'], version=os.environ['MODEL_VERSION'], stage='Production')"
  environment:
    name: production
    url: https://model.example.com
  when: manual
  only:
    - main
Monitoring with Prometheus
# mlflow_metrics_exporter.py
from prometheus_client import Gauge, Counter, start_http_server
import mlflow
from mlflow.tracking import MlflowClient
import time

# Prometheus metrics
model_accuracy = Gauge('mlflow_model_accuracy', 'Model accuracy', ['model_name', 'version'])
model_predictions = Counter('mlflow_model_predictions_total', 'Total predictions', ['model_name'])
model_latency = Gauge('mlflow_model_prediction_latency_seconds', 'Prediction latency')

class MLflowMonitor:
    def __init__(self, tracking_uri):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def export_model_metrics(self):
        """Export metrics of the models currently in production"""
        models = self.client.search_registered_models()

        for model in models:
            # Keep only the versions in the Production stage
            production_versions = [
                v for v in model.latest_versions
                if v.current_stage == "Production"
            ]

            for version in production_versions:
                run = self.client.get_run(version.run_id)

                # Export the run's metrics to Prometheus
                if 'accuracy' in run.data.metrics:
                    model_accuracy.labels(
                        model_name=model.name,
                        version=version.version
                    ).set(run.data.metrics['accuracy'])

    def start_monitoring(self):
        # Expose metrics on :8000/metrics and refresh them every minute
        start_http_server(8000)
        while True:
            self.export_model_metrics()
            time.sleep(60)

if __name__ == "__main__":
    monitor = MLflowMonitor("http://mlflow:5000")
    monitor.start_monitoring()
APIs and integrations
REST API for predictions
import mlflow.pyfunc
from flask import Flask, request, jsonify
import numpy as np

app = Flask(__name__)

# Load the production model once at startup
model = mlflow.pyfunc.load_model("models:/image-classifier/Production")

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # Input data (a flat list of features in the request body)
        data = request.json['data']
        input_data = np.array(data).reshape(1, -1)

        # Prediction
        prediction = model.predict(input_data)

        # Count predictions (illustrative; a metrics system such as Prometheus is better suited to this)
        mlflow.log_metric("predictions_made", 1)

        return jsonify({
            'prediction': prediction.tolist(),
            'model_version': model.metadata.model_uuid
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy', 'model_loaded': model is not None})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
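A quick smoke test of this service, assuming it runs locally on port 8080 (the feature vector is a dummy placeholder):

import requests

# Health check
print(requests.get("http://localhost:8080/health", timeout=5).json())

# Prediction request matching the /predict contract above
payload = {"data": [0.1, 0.2, 0.3, 0.4]}
response = requests.post("http://localhost:8080/predict", json=payload, timeout=5)
print(response.status_code, response.json())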
Resources
- Documentation: mlflow.org
- GitHub: github.com/mlflow/mlflow
- Examples: github.com/mlflow/mlflow/tree/master/examples