MLflow

Overview

MLflow is an open-source platform, developed by Databricks, for managing the complete machine learning lifecycle, from experimentation to production deployment.

Philosophy

"The open source platform for the machine learning lifecycle - Gérez vos expériences, modèles et déploiements ML de manière reproductible."

Core components

MLflow Tracking

  • Experiments: organize training runs
  • Metrics: track performance (accuracy, loss, etc.)
  • Parameters: hyperparameters and configuration
  • Artifacts: models, datasets, visualizations

MLflow Models

  • Standard format: packaging for ML models
  • Multi-framework: PyTorch, TensorFlow, scikit-learn
  • Serving: automatic REST API deployment
  • Registry: version and lifecycle management

MLflow Projects

  • Reproducibility: environments and dependencies
  • Standard format: an MLproject file (YAML syntax)
  • Execution: local, Docker, Kubernetes
  • Parameters: externalized configuration (see the sketch below)
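
A minimal sketch of launching a project from Python, assuming a local directory my-project containing an MLproject file that defines a main entry point with an epochs parameter (both names are hypothetical):

import mlflow

# Run the project; MLflow resolves the environment declared in the
# (assumed) MLproject file and records the run in the tracking server.
submitted = mlflow.projects.run(
    uri="./my-project",
    entry_point="main",
    parameters={"epochs": 10},
)
print(submitted.run_id)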

Experiment tracking

Example with PyTorch

import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score

# MLflow configuration (set the tracking URI before selecting the experiment)
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("image-classification")

class ImageClassifier(nn.Module):
    def __init__(self, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(64 * 8 * 8, 512)  # assumes 32x32 inputs (two 2x2 poolings: 32 -> 16 -> 8)
        self.fc2 = nn.Linear(512, num_classes)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = x.view(-1, 64 * 8 * 8)
        x = self.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

def train_model():
    # Hyperparameters
    params = {
        "learning_rate": 0.001,
        "batch_size": 32,
        "epochs": 50,
        "dropout": 0.5,
        "optimizer": "adam"
    }

    with mlflow.start_run():
        # Log the hyperparameters
        mlflow.log_params(params)

        # Model and optimizer
        model = ImageClassifier(num_classes=10)
        optimizer = torch.optim.Adam(model.parameters(), lr=params["learning_rate"])
        criterion = nn.CrossEntropyLoss()

        # Training loop (train_loader and val_loader are assumed to be
        # DataLoaders defined elsewhere for the two dataset splits)
        for epoch in range(params["epochs"]):
            model.train()
            total_loss = 0

            for batch_idx, (data, target) in enumerate(train_loader):
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            # Validation
            model.eval()
            val_loss = 0
            predictions = []
            targets = []

            with torch.no_grad():
                for data, target in val_loader:
                    output = model(data)
                    val_loss += criterion(output, target).item()
                    pred = output.argmax(dim=1)
                    predictions.extend(pred.cpu().numpy())
                    targets.extend(target.cpu().numpy())

            # Metrics
            accuracy = accuracy_score(targets, predictions)
            avg_train_loss = total_loss / len(train_loader)
            avg_val_loss = val_loss / len(val_loader)

            # Log metrics for this epoch
            mlflow.log_metrics({
                "train_loss": avg_train_loss,
                "val_loss": avg_val_loss,
                "accuracy": accuracy
            }, step=epoch)

            print(f"Epoch {epoch}: Train Loss: {avg_train_loss:.4f}, "
                  f"Val Loss: {avg_val_loss:.4f}, Accuracy: {accuracy:.4f}")

        # Log and register the model
        mlflow.pytorch.log_model(
            model,
            "model",
            registered_model_name="image-classifier"
        )

        # Additional artifacts
        torch.save(model.state_dict(), "model_weights.pth")
        mlflow.log_artifact("model_weights.pth")

        return model

# Run the training
trained_model = train_model()
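
After a few runs, the tracked results can be pulled back for comparison. A short sketch, assuming MLflow 2.x, where mlflow.search_runs returns a pandas DataFrame:

import mlflow

# Fetch all runs of the experiment as a DataFrame and compare them
runs = mlflow.search_runs(experiment_names=["image-classification"])
print(runs[["run_id", "metrics.accuracy", "params.learning_rate"]].head())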

Integration with OpenCV

import mlflow
import cv2
import numpy as np
from sklearn.metrics import classification_report

class ImagePreprocessor:
    def __init__(self, target_size=(224, 224)):
        self.target_size = target_size

    def preprocess_batch(self, image_paths):
        processed_images = []

        for path in image_paths:
            # Read with OpenCV (BGR) and convert to RGB
            image = cv2.imread(path)
            if image is None:
                raise FileNotFoundError(f"Cannot read image: {path}")
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # Resize to the target size
            image = cv2.resize(image, self.target_size)

            # Normalize to [0, 1]
            image = image.astype(np.float32) / 255.0

            processed_images.append(image)

        return np.array(processed_images)

def experiment_with_preprocessing():
    with mlflow.start_run(run_name="preprocessing-experiment"):
        # Preprocessing parameters
        preprocessing_params = {
            "target_size": (224, 224),
            "normalization": "0-1",
            "augmentation": True,
            "blur_kernel": 3
        }

        mlflow.log_params(preprocessing_params)

        # Preprocessing with OpenCV
        preprocessor = ImagePreprocessor(
            target_size=preprocessing_params["target_size"]
        )

        # Process the images (train_image_paths and val_image_paths are assumed defined)
        train_images = preprocessor.preprocess_batch(train_image_paths)
        val_images = preprocessor.preprocess_batch(val_image_paths)

        # Train the model (train_with_processed_data is assumed to be defined elsewhere)
        model = train_with_processed_data(train_images, val_images)

        # Final metrics on the validation set (val_labels is assumed defined)
        predictions = model.predict(val_images)
        report = classification_report(val_labels, predictions, output_dict=True)

        mlflow.log_metrics({
            "final_accuracy": report["accuracy"],
            "precision": report["macro avg"]["precision"],
            "recall": report["macro avg"]["recall"],
            "f1_score": report["macro avg"]["f1-score"]
        })

        # Log artifacts (these files are assumed to have been written earlier in the run)
        mlflow.log_artifact("preprocessing_stats.json")
        mlflow.log_artifact("confusion_matrix.png")

Model Registry

Version management

from mlflow.tracking import MlflowClient

client = MlflowClient()

# Register a model from a run
def register_model(model_name, run_id):
    model_version = mlflow.register_model(
        f"runs:/{run_id}/model",
        model_name
    )
    return model_version

# Promote a version to Production
def promote_to_production(model_name, version):
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Production"
    )

# Load the current Production model
def load_production_model(model_name):
    model = mlflow.pytorch.load_model(
        f"models:/{model_name}/Production"
    )
    return model

# Usage example
with mlflow.start_run() as run:
    # train_model is assumed here to train within the current run
    model = train_model()

    # Log the model, then register it explicitly to get the version back
    mlflow.pytorch.log_model(model, "model")
    model_version = register_model("image-classifier-v2", run.info.run_id)

    # Promote if the metrics are good enough
    # (accuracy is assumed to come from the training loop)
    if accuracy > 0.95:
        promote_to_production("image-classifier-v2", model_version.version)
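
To check which version is actually serving, a minimal sketch using the registry search API (the model name matches the example above):

from mlflow.tracking import MlflowClient

client = MlflowClient()

# List all versions of the registered model and report the Production one
for mv in client.search_model_versions("name='image-classifier-v2'"):
    if mv.current_stage == "Production":
        print(f"Production version: {mv.version} (run {mv.run_id})")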

Deployment on Kubernetes

MLflow Model Server

# mlflow-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: mlflow-model-server
spec:
  replicas: 3
  selector:
    matchLabels:
      app: mlflow-model-server
  template:
    metadata:
      labels:
        app: mlflow-model-server
    spec:
      containers:
      - name: model-server
        image: mlflow-models:latest
        ports:
        - containerPort: 8080
        env:
        - name: MLFLOW_TRACKING_URI
          value: "http://mlflow-tracking:5000"
        - name: MODEL_URI
          value: "models:/image-classifier/Production"
        command:
        - mlflow
        - models
        - serve
        - --model-uri
        - $(MODEL_URI)
        - --host
        - "0.0.0.0"
        - --port
        - "8080"
        resources:
          requests:
            memory: "1Gi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "1"

---
apiVersion: v1
kind: Service
metadata:
  name: mlflow-model-service
spec:
  selector:
    app: mlflow-model-server
  ports:
  - port: 80
    targetPort: 8080
  type: LoadBalancer
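
Once the Service is up, clients reach the model through the standard MLflow scoring endpoint. A client sketch, assuming MLflow 2.x payload conventions (POST /invocations with an "inputs" or "dataframe_split" field) and the in-cluster service name from the manifest above:

import requests

# Call the scoring server behind the Kubernetes Service; the flat
# 3072-value vector (32x32x3) is an assumed input shape for illustration.
payload = {"inputs": [[0.0] * 3072]}
response = requests.post(
    "http://mlflow-model-service/invocations",
    json=payload,
    timeout=10,
)
print(response.json())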

Integration with Kubeflow

# kubeflow-pipeline.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: mlflow-training-pipeline
spec:
  entrypoint: training-pipeline
  templates:
  - name: training-pipeline
    dag:
      tasks:
      - name: data-preparation
        template: prepare-data
      - name: model-training
        template: train-model
        dependencies: [data-preparation]
      - name: model-evaluation
        template: evaluate-model
        dependencies: [model-training]
      - name: model-deployment
        template: deploy-model
        dependencies: [model-evaluation]

  # (prepare-data, evaluate-model and deploy-model templates are omitted here)
  - name: train-model
    container:
      image: mlflow-training:latest
      command: [python]
      args: [train.py]
      env:
      - name: MLFLOW_TRACKING_URI
        value: "http://mlflow.kubeflow:5000"
      - name: MLFLOW_EXPERIMENT_NAME
        value: "kubeflow-training"

CI/CD integration with GitLab CI

# .gitlab-ci.yml for MLOps
stages:
  - data-validation
  - training
  - evaluation
  - deployment

variables:
  MLFLOW_TRACKING_URI: "https://mlflow.example.com"
  MODEL_NAME: "fraud-detection"

data_validation:
  stage: data-validation
  image: python:3.9
  script:
    - pip install mlflow pandas great-expectations
    - python validate_data.py
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      changes:
        - "data/**/*"

model_training:
  stage: training
  image: pytorch/pytorch:latest
  script:
    - pip install mlflow torch torchvision
    - python train.py --experiment-name $CI_COMMIT_SHA
  artifacts:
    reports:
      junit: test-results.xml
    paths:
      - model_metrics.json
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

model_evaluation:
  stage: evaluation
  image: python:3.9
  dependencies:
    - model_training
  script:
    # MLFLOW_RUN_ID is assumed to be exported by the training job (see the dotenv sketch below)
    - python evaluate_model.py --run-id $MLFLOW_RUN_ID
    - python check_model_quality.py
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

deploy_staging:
  stage: deployment
  image: bitnami/kubectl:latest
  script:
    - mlflow models build-docker --model-uri "runs:/$MLFLOW_RUN_ID/model" --name "$MODEL_NAME:$CI_COMMIT_SHA"
    - kubectl set image deployment/model-server model-server=$MODEL_NAME:$CI_COMMIT_SHA
  environment:
    name: staging
    url: https://model-staging.example.com
  rules:
    - if: $CI_COMMIT_BRANCH == "main"

deploy_production:
  stage: deployment
  script:
    # MLflow has no CLI for registry stage transitions, so use the Python client
    - python -c "from mlflow.tracking import MlflowClient; MlflowClient().transition_model_version_stage(name='$MODEL_NAME', version='$MODEL_VERSION', stage='Production')"
  environment:
    name: production
    url: https://model.example.com
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      when: manual
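
The pipeline above assumes MLFLOW_RUN_ID is available to the evaluation and deployment jobs. One common wiring (not shown in the YAML) is to have train.py write the run id to a dotenv file exposed via GitLab's artifacts:reports:dotenv; a sketch:

import mlflow

# train.py (sketch): persist the run id so downstream jobs can read it
# as $MLFLOW_RUN_ID through a dotenv artifact.
with mlflow.start_run() as run:
    # ... training code ...
    with open("mlflow.env", "w") as f:
        f.write(f"MLFLOW_RUN_ID={run.info.run_id}\n")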

Monitoring with Prometheus

# mlflow_metrics_exporter.py
from prometheus_client import Gauge, Counter, start_http_server
import mlflow
from mlflow.tracking import MlflowClient
import time

# Prometheus metrics (model_predictions and model_latency would be updated by
# the serving layer; only model_accuracy is populated in this exporter)
model_accuracy = Gauge('mlflow_model_accuracy', 'Model accuracy', ['model_name', 'version'])
model_predictions = Counter('mlflow_model_predictions_total', 'Total predictions', ['model_name'])
model_latency = Gauge('mlflow_model_prediction_latency_seconds', 'Prediction latency')

class MLflowMonitor:
    def __init__(self, tracking_uri):
        mlflow.set_tracking_uri(tracking_uri)
        self.client = MlflowClient()

    def export_model_metrics(self):
        """Export des métriques des modèles en production"""
        models = self.client.search_registered_models()

        for model in models:
            # Find the Production version(s) of this model
            production_versions = [
                v for v in model.latest_versions 
                if v.current_stage == "Production"
            ]

            for version in production_versions:
                run = self.client.get_run(version.run_id)

                # Export metrics to Prometheus
                if 'accuracy' in run.data.metrics:
                    model_accuracy.labels(
                        model_name=model.name,
                        version=version.version
                    ).set(run.data.metrics['accuracy'])

    def start_monitoring(self):
        start_http_server(8000)
        while True:
            self.export_model_metrics()
            time.sleep(60)

if __name__ == "__main__":
    monitor = MLflowMonitor("http://mlflow:5000")
    monitor.start_monitoring()

APIs and integrations

REST API for predictions

import mlflow.pyfunc
from flask import Flask, request, jsonify
import numpy as np

app = Flask(__name__)

# Load the model once at startup
model = mlflow.pyfunc.load_model("models:/image-classifier/Production")

@app.route('/predict', methods=['POST'])
def predict():
    try:
        # Input data, flattened to a single feature vector; a CNN such as
        # the classifier above would instead expect a (1, 3, H, W) tensor
        data = request.json['data']
        input_data = np.array(data).reshape(1, -1)

        # Predict
        prediction = model.predict(input_data)

        # Note: this logs to an implicitly created MLflow run; for serving
        # traffic a metrics system such as Prometheus is usually a better fit
        mlflow.log_metric("predictions_made", 1)

        return jsonify({
            'prediction': prediction.tolist(),
            'model_version': model.metadata.get_model_info().model_uuid
        })

    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health():
    return jsonify({'status': 'healthy', 'model_loaded': model is not None})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)
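
A quick client-side check of the custom endpoint above; the 3072-value vector is a placeholder matching the flat reshape in predict():

import requests

# Send one flattened sample to the Flask service
response = requests.post(
    "http://localhost:8080/predict",
    json={"data": [0.0] * 3072},
)
print(response.json())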

Resources