Skip to content

Kubeflow

Vue d'ensemble

Kubeflow est une plateforme MLOps open-source construite sur Kubernetes pour déployer, orchestrer et gérer des workflows de machine learning scalables et portables.

Philosophie

"Machine learning for Kubernetes - Rendre le ML accessible, portable et scalable sur Kubernetes."

Composants principaux

Kubeflow Pipelines

  • Workflow orchestration : Pipelines ML complexes
  • Reproductibilité : Versioning et artifacts
  • Scheduling : Exécution conditionnelle
  • Monitoring : Suivi des runs

Jupyter Notebooks

  • Environnement managé : Jupyter sur Kubernetes
  • Resource allocation : CPU/GPU à la demande
  • Multi-user : Isolation et partage
  • Persistent storage : Volumes Kubernetes

Model Serving

  • KServe : Serving de modèles ML
  • Auto-scaling : Basé sur la charge
  • Multi-framework : PyTorch, TensorFlow, scikit-learn
  • A/B Testing : Déploiement canary

Installation sur Kubernetes

Déploiement avec Kustomize

# Install Kubeflow from the official kustomize manifests.
# KUBEFLOW_VERSION pins the manifests tag checked out below.
export KUBEFLOW_VERSION=1.7.0
# NOTE(review): KUBEFLOW_DOMAIN is exported but not used by any command
# below — presumably consumed by a later ingress/TLS step; confirm.
export KUBEFLOW_DOMAIN=kubeflow.example.com

# Clone the manifests repository and pin it to the chosen release tag
git clone https://github.com/kubeflow/manifests.git
cd manifests
git checkout v${KUBEFLOW_VERSION}

# Full installation: applies every component under the "example" overlay
kubectl apply -k example

# Verify the deployment: pods in the kubeflow namespace, gateway service
kubectl get pods -n kubeflow
kubectl get svc -n kubeflow-gateway

Configuration avec Helm

# values.yaml for a Kubeflow Helm deployment
kubeflow:
  domain: kubeflow.example.com   # public hostname served by the Kubeflow gateway

# Managed Jupyter notebook servers
jupyter:
  enabled: true
  resources:
    limits:
      cpu: "2"
      memory: "4Gi"
      nvidia.com/gpu: "1"        # at most one GPU per notebook pod
    requests:
      cpu: "500m"
      memory: "1Gi"

# Kubeflow Pipelines backend with persistent artifact storage
pipelines:
  enabled: true
  persistence:
    storageClass: "fast-ssd"
    size: "50Gi"

katib:
  enabled: true  # Hyperparameter tuning

# KServe model serving, exposed under its own domain
kserve:
  enabled: true
  domain: models.example.com

Kubeflow Pipelines

Pipeline simple

import kfp
from kfp import dsl
from kfp.components import create_component_from_func
from typing import NamedTuple

# Composants de base
@create_component_from_func
def load_data(data_path: str) -> NamedTuple('Outputs', [('dataset', str)]):
    """Read the raw CSV at *data_path* and re-emit it as the pipeline dataset.

    Returns a one-element NamedTuple whose 'dataset' field is the path of
    the CSV written inside the component's container.
    """
    import pandas as pd

    # Round-trip through pandas: parses the input and rewrites it without
    # the index column, normalizing the file for downstream components.
    frame = pd.read_csv(data_path)
    processed_path = '/tmp/processed_data.csv'
    frame.to_csv(processed_path, index=False)

    return (processed_path,)

@create_component_from_func
def train_model(
    dataset: str,
    learning_rate: float = 0.01,
    epochs: int = 10
) -> NamedTuple('Outputs', [('model', str), ('accuracy', float)]):
    """Train a RandomForest classifier on *dataset* and report hold-out accuracy.

    Args:
        dataset: Path to a CSV file with feature columns plus a 'target'
            label column.
        learning_rate: Kept for pipeline-interface compatibility only —
            a RandomForest has no learning rate, so this value is unused.
        epochs: Kept for pipeline-interface compatibility only — unused
            by the RandomForest estimator.

    Returns:
        ('model', 'accuracy'): path of the serialized model and accuracy
        measured on a 20% hold-out split.
    """
    import joblib
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    # Load features/label from the upstream component's CSV artifact.
    data = pd.read_csv(dataset)
    X = data.drop('target', axis=1)
    y = data['target']

    # Fixed random_state keeps the split (and thus the reported accuracy)
    # reproducible across pipeline runs.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # random_state also pins the forest's bootstrap/feature sampling.
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Hold-out evaluation.
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)

    # Serialize the fitted model for downstream components.
    model_path = '/tmp/model.joblib'
    joblib.dump(model, model_path)

    return (model_path, accuracy)

@create_component_from_func
def evaluate_model(model: str, test_data: str) -> NamedTuple('Outputs', [('metrics', str)]):
    """Score the serialized model on *test_data* and dump a metrics report.

    Returns a one-element NamedTuple whose 'metrics' field is the path of
    a JSON classification report.
    """
    import joblib
    import json
    import pandas as pd
    from sklearn.metrics import classification_report

    # Deserialize the trained estimator and load the evaluation set.
    classifier = joblib.load(model)
    frame = pd.read_csv(test_data)

    features = frame.drop('target', axis=1)
    labels = frame['target']

    # Per-class precision/recall/f1 as a plain dict so it serializes to JSON.
    report = classification_report(labels, classifier.predict(features), output_dict=True)

    metrics_path = '/tmp/metrics.json'
    with open(metrics_path, 'w') as f:
        json.dump(report, f)

    return (metrics_path,)

# Pipeline definition: wires the three components above into a DAG.
@dsl.pipeline(
    name='ML Training Pipeline',
    description='Pipeline d\'entraînement ML avec Kubeflow'
)
def ml_training_pipeline(
    data_path: str,
    learning_rate: float = 0.01,
    epochs: int = 10
):
    """Load data, train, evaluate, and conditionally deploy a model."""
    # Step 1: ingest the raw CSV.
    load_task = load_data(data_path)

    # Step 2: training, fed by the dataset artifact of step 1.
    train_task = train_model(
        dataset=load_task.outputs['dataset'],
        learning_rate=learning_rate,
        epochs=epochs
    )

    # Step 3: evaluation.
    # NOTE(review): this evaluates on the same dataset used for training
    # (load_task's output), not a held-out set — confirm this is intended.
    eval_task = evaluate_model(
        model=train_task.outputs['model'],
        test_data=load_task.outputs['dataset']
    )

    # Conditional execution: deploy only when accuracy exceeds 80%.
    with dsl.Condition(train_task.outputs['accuracy'] > 0.8):
        # NOTE(review): deploy_model is not defined in this file —
        # presumably imported/defined elsewhere; verify before compiling.
        deploy_task = deploy_model(train_task.outputs['model'])

# Compile the pipeline to YAML, then submit a run to the in-cluster
# Pipelines API server.
if __name__ == '__main__':
    compiler = kfp.compiler.Compiler()
    compiler.compile(ml_training_pipeline, 'ml_pipeline.yaml')

    run_arguments = {
        'data_path': '/data/training_data.csv',
        'learning_rate': 0.001,
        'epochs': 50
    }
    client = kfp.Client(host='http://kubeflow-pipelines.kubeflow:8888')
    client.create_run_from_pipeline_func(ml_training_pipeline, arguments=run_arguments)

Pipeline avancé avec PyTorch

import kfp
from kfp import dsl
from kfp.components import create_component_from_func, InputPath, OutputPath

@create_component_from_func
def pytorch_training(
    data_path: InputPath(),
    model_path: OutputPath(),
    metrics_path: OutputPath(),
    learning_rate: float = 0.001,
    batch_size: int = 32,
    epochs: int = 10,
    gpu_limit: str = "1"
):
    """Train a small fully-connected PyTorch classifier on a CSV dataset.

    Args:
        data_path: Input CSV with feature columns plus a 'target' label column.
        model_path: Output location for the trained model's state_dict.
        metrics_path: Output location for a JSON training summary.
        learning_rate: Adam learning rate.
        batch_size: Mini-batch size for the DataLoader.
        epochs: Number of full passes over the dataset.
        gpu_limit: NOTE(review): accepted but never used inside this
            function — GPU scheduling is configured on the pipeline op
            (set_gpu_limit), not here; confirm this parameter is needed.
    """
    import torch
    import torch.nn as nn
    import torch.optim as optim
    from torch.utils.data import DataLoader, TensorDataset
    import pandas as pd
    import json
    import numpy as np

    # Prefer CUDA when the container was scheduled on a GPU node.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    # Load the dataset: every column except 'target' is treated as a float
    # feature; 'target' holds integer class labels.
    data = pd.read_csv(data_path)
    X = torch.FloatTensor(data.drop('target', axis=1).values)
    y = torch.LongTensor(data['target'].values)

    # Wrap the tensors for shuffled mini-batch iteration.
    dataset = TensorDataset(X, y)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Two hidden layers (128 -> 64) with ReLU and dropout regularization.
    class SimpleNet(nn.Module):
        def __init__(self, input_size, num_classes):
            super().__init__()
            self.fc1 = nn.Linear(input_size, 128)
            self.fc2 = nn.Linear(128, 64)
            self.fc3 = nn.Linear(64, num_classes)
            self.relu = nn.ReLU()
            self.dropout = nn.Dropout(0.2)

        def forward(self, x):
            x = self.relu(self.fc1(x))
            x = self.dropout(x)
            x = self.relu(self.fc2(x))
            x = self.dropout(x)
            x = self.fc3(x)  # raw logits; CrossEntropyLoss applies log-softmax
            return x

    # Output width is inferred from the number of distinct labels.
    model = SimpleNet(X.shape[1], len(torch.unique(y))).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Standard training loop; per-epoch mean loss is kept for the report.
    model.train()
    losses = []

    for epoch in range(epochs):
        epoch_loss = 0
        for batch_X, batch_y in dataloader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(dataloader)
        losses.append(avg_loss)
        print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}')

    # Persist only the learned weights (state_dict), not the module object.
    torch.save(model.state_dict(), model_path)

    # Training summary consumed by downstream components (e.g. MLflow logger).
    final_metrics = {
        'final_loss': losses[-1],
        'training_losses': losses,
        'epochs_completed': epochs,
        'device_used': str(device)
    }

    with open(metrics_path, 'w') as f:
        json.dump(final_metrics, f)

# GPU-enabled pipeline: a single training step with explicit resource caps.
@dsl.pipeline(name='PyTorch GPU Pipeline')
def pytorch_gpu_pipeline(data_path: str):
    """Run the PyTorch training component with GPU/CPU/memory limits."""
    training_step = pytorch_training(
        data_path=data_path,
        learning_rate=0.001,
        epochs=100
    )

    # Resource limits for the training pod: 4 CPUs, 8Gi RAM, one GPU.
    training_step.set_cpu_limit('4')
    training_step.set_memory_limit('8Gi')
    training_step.set_gpu_limit('1')

Intégration avec MLflow

# MLflow logging component for Kubeflow
@create_component_from_func
def mlflow_logger(
    model_path: InputPath(),
    metrics_path: InputPath(),
    experiment_name: str,
    run_name: str
) -> str:
    """Log a trained model and its metrics to MLflow and register the model.

    Args:
        model_path: Path of the serialized model artifact to upload.
        metrics_path: Path of a JSON file of training metrics. Numeric
            scalar entries are logged as MLflow metrics; everything else
            is logged as run params.
        experiment_name: MLflow experiment to log under.
        run_name: Display name for the created run.

    Returns:
        The MLflow run id of the created run.
    """
    import json
    import mlflow

    # Point the client at the in-cluster tracking server.
    mlflow.set_tracking_uri("http://mlflow-server.kubeflow:5000")
    mlflow.set_experiment(experiment_name)

    with mlflow.start_run(run_name=run_name) as run:
        with open(metrics_path, 'r') as f:
            metrics = json.load(f)

        # BUG FIX: mlflow.log_metrics only accepts numeric scalar values,
        # but the training summary also contains a list (training_losses)
        # and a string (device_used), which made the original call raise.
        # Split the payload: scalars become metrics, the rest become params.
        scalar_metrics = {
            key: float(value)
            for key, value in metrics.items()
            if isinstance(value, (int, float)) and not isinstance(value, bool)
        }
        if scalar_metrics:
            mlflow.log_metrics(scalar_metrics)
        for key, value in metrics.items():
            if key not in scalar_metrics:
                mlflow.log_param(key, value)

        # Upload the model file under the run's "model" artifact directory.
        mlflow.log_artifact(model_path, "model")

        # Register the artifact in the model registry.
        model_uri = f"runs:/{run.info.run_id}/model"
        mlflow.register_model(model_uri, "kubeflow-model")

        return run.info.run_id

@dsl.pipeline(name='MLflow Integration Pipeline')
def mlflow_pipeline(data_path: str):
    """Train with PyTorch, then log the resulting model/metrics to MLflow."""
    # Training step; model_path/metrics_path are OutputPath artifacts
    # created by the component and surfaced via train_task.outputs.
    train_task = pytorch_training(data_path=data_path)

    # Push the produced model and metrics to MLflow; RUN_ID_PLACEHOLDER is
    # substituted with the Kubeflow run id at execution time.
    mlflow_task = mlflow_logger(
        model_path=train_task.outputs['model_path'],
        metrics_path=train_task.outputs['metrics_path'],
        experiment_name="kubeflow-experiments",
        run_name=f"run-{dsl.RUN_ID_PLACEHOLDER}"
    )

Model Serving avec KServe

Déploiement de modèles

# pytorch-model.yaml
# KServe InferenceService: serves a PyTorch model with the built-in
# pytorch predictor runtime, pulling the model artifact from GCS.
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: pytorch-classifier
  namespace: kubeflow-user-example-com
spec:
  predictor:
    pytorch:
      storageUri: gs://my-bucket/models/pytorch-model
      resources:
        limits:
          cpu: "1"
          memory: "2Gi"
          nvidia.com/gpu: "1"
        requests:
          cpu: "100m"
          memory: "200Mi"
      env:
      # NOTE(review): STORAGE_URI duplicates storageUri above — confirm
      # the runtime actually reads this env var; otherwise it is redundant.
      - name: STORAGE_URI
        value: "gs://my-bucket/models/pytorch-model"

Serving personnalisé

# custom_predictor.py
import torch
import json
import base64
from typing import Dict, Any
import kserve
from kserve import Model

class PyTorchPredictor(Model):
    """Custom KServe predictor serving a serialized PyTorch classifier."""

    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.model = None
        # Prefer GPU when the serving pod has one scheduled.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def load(self):
        """Load the model from the KServe model mount and mark it ready."""
        model_path = "/mnt/models/model.pth"
        # NOTE(review): torch.load here expects a fully pickled module, not
        # a bare state_dict — confirm the stored artifact format matches.
        self.model = torch.load(model_path, map_location=self.device)
        self.model.eval()
        # BUG FIX: KServe's readiness check inspects this flag; without
        # setting it the server keeps reporting the model as not ready
        # even after a successful load.
        self.ready = True
        print(f"Model loaded on {self.device}")

    def predict(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Run inference on payload["instances"]; return classes and probabilities."""
        try:
            # Batch of feature rows from the request body.
            inputs = payload["instances"]

            # Convert to a float tensor on the serving device.
            input_tensor = torch.FloatTensor(inputs).to(self.device)

            # Inference only: no autograd bookkeeping needed.
            with torch.no_grad():
                outputs = self.model(input_tensor)
                probabilities = torch.softmax(outputs, dim=1)
                predictions = torch.argmax(probabilities, dim=1)

            return {
                "predictions": predictions.cpu().tolist(),
                "probabilities": probabilities.cpu().tolist()
            }

        except Exception as e:
            # Deliberate best-effort: surface the failure to the caller as
            # a JSON error payload instead of crashing the server process.
            return {"error": str(e)}

# Entry point: instantiate and load the model, then expose it over HTTP
# via KServe's model server.
if __name__ == "__main__":
    model = PyTorchPredictor("pytorch-classifier")
    model.load()
    kserve.ModelServer().start([model])

Hyperparameter Tuning avec Katib

# katib-experiment.yaml
# Katib hyperparameter-tuning experiment: random search over three
# hyperparameters, maximizing the "accuracy" metric reported by trials.
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: pytorch-optimization
  namespace: kubeflow-user-example-com
spec:
  algorithm:
    algorithmName: random     # random search (no surrogate model)
  parallelTrialCount: 4       # trials running concurrently
  maxTrialCount: 20           # total trials before the experiment stops
  maxFailedTrialCount: 3      # abort after this many failed trials

  objective:
    type: maximize
    objectiveMetricName: accuracy  # metric name the training job must emit

  # Search space; min/max bounds are strings per the Katib API.
  parameters:
  - name: learning_rate
    parameterType: double
    feasibleSpace:
      min: "0.001"
      max: "0.1"
  - name: batch_size
    parameterType: int
    feasibleSpace:
      min: "16"
      max: "128"
  - name: num_layers
    parameterType: int
    feasibleSpace:
      min: "2"
      max: "5"

  # Each trial runs this Job template; sampled values are substituted
  # into the container args via the HyperParameters template variables.
  trialTemplate:
    primaryContainerName: training-container
    trialSpec:
      apiVersion: batch/v1
      kind: Job
      spec:
        template:
          spec:
            containers:
            - name: training-container
              image: pytorch-training:latest
              command:
              - python
              - train.py
              - --learning-rate={{.HyperParameters.learning_rate}}
              - --batch-size={{.HyperParameters.batch_size}}
              - --num-layers={{.HyperParameters.num_layers}}
              resources:
                limits:
                  nvidia.com/gpu: "1"
            restartPolicy: Never

Monitoring avec Prometheus

# prometheus-config.yaml
# Prometheus scrape configuration for Kubeflow components.
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-kubeflow-config
data:
  prometheus.yml: |
    global:
      scrape_interval: 15s

    scrape_configs:
    # Pipelines backend pods, kept by the app=ml-pipeline* label.
    - job_name: 'kubeflow-pipelines'
      kubernetes_sd_configs:
      - role: pod
        namespaces:
          names:
          - kubeflow
      relabel_configs:
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: ml-pipeline.*

    # Any service carrying the KServe InferenceService label.
    - job_name: 'kserve-models'
      kubernetes_sd_configs:
      - role: service
        namespaces:
          names:
          - kubeflow-user-example-com
      relabel_configs:
      - source_labels: [__meta_kubernetes_service_label_serving_kserve_io_inferenceservice]
        action: keep
        regex: .+

    # Fixed target: the Jupyter web app service.
    - job_name: 'jupyter-notebooks'
      static_configs:
      - targets: ['jupyter-web-app.kubeflow:80']

Workflows complexes

# pipeline-complexe.py
@dsl.pipeline(name='Production ML Pipeline')
def production_pipeline(
    data_source: str,
    model_name: str,
    deploy_threshold: float = 0.9
):
    """Validate, engineer features, train model families in parallel, A/B
    test, and conditionally deploy to production.

    NOTE(review): every step here (validate_data_quality,
    feature_engineering, train_model_parallel, select_best_model,
    setup_ab_test, deploy_to_production, send_notification) is defined
    elsewhere — verify they are imported before compiling this pipeline.
    """
    # Step 1: data quality gate.
    data_validation = validate_data_quality(data_source)

    # Step 2: feature engineering on the validated data.
    feature_task = feature_engineering(
        data_validation.outputs['validated_data']
    )

    # Step 3: fan-out — train one model per family in parallel.
    with dsl.ParallelFor(
        ["random_forest", "gradient_boosting", "neural_network"]
    ) as model_type:
        model_training = train_model_parallel(
            features=feature_task.outputs['features'],
            model_type=model_type
        )

    # Step 4: pick the best of the parallel models.
    # NOTE(review): referencing model_training.outputs outside the
    # ParallelFor scope does not aggregate the fan-out in KFP — this
    # normally needs an explicit collection/aggregation step; confirm.
    best_model = select_best_model(model_training.outputs['models'])

    # Step 5: A/B test with a small canary share.
    ab_test = setup_ab_test(
        best_model.outputs['model'],
        traffic_split=20  # 20% routed to the new model
    )

    # Step 6: deploy only above the accuracy threshold.
    with dsl.Condition(best_model.outputs['accuracy'] > deploy_threshold):
        production_deploy = deploy_to_production(
            model=best_model.outputs['model'],
            model_name=model_name
        )

        # Alert the team about the deployment.
        notify_deployment = send_notification(
            message=f"Model {model_name} deployed with accuracy {best_model.outputs['accuracy']}"
        )

Ressources