Kubeflow
Vue d'ensemble
Kubeflow est une plateforme MLOps open-source construite sur Kubernetes pour déployer, orchestrer et gérer des workflows de machine learning scalables et portables.
Philosophie
"Machine learning for Kubernetes - Rendre le ML accessible, portable et scalable sur Kubernetes."
Composants principaux
Kubeflow Pipelines
- Workflow orchestration : Pipelines ML complexes
- Reproductibilité : Versioning et artifacts
- Scheduling : Exécution conditionnelle
- Monitoring : Suivi des runs
Jupyter Notebooks
- Environnement managé : Jupyter sur Kubernetes
- Resource allocation : CPU/GPU à la demande
- Multi-user : Isolation et partage
- Persistent storage : Volumes Kubernetes
Model Serving
- KServe : Serving de modèles ML
- Auto-scaling : Basé sur la charge
- Multi-framework : PyTorch, TensorFlow, scikit-learn
- A/B Testing : Déploiement canary
Installation sur Kubernetes
Déploiement avec Kustomize
# Installation Kubeflow
export KUBEFLOW_VERSION=1.7.0
export KUBEFLOW_DOMAIN=kubeflow.example.com
# Clone du repository
git clone https://github.com/kubeflow/manifests.git
cd manifests
git checkout v${KUBEFLOW_VERSION}
# Installation complète
kubectl apply -k example
# Vérification du déploiement
kubectl get pods -n kubeflow
kubectl get svc -n kubeflow-gateway
Configuration avec Helm
# values.yaml pour Kubeflow
kubeflow:
domain: kubeflow.example.com
jupyter:
enabled: true
resources:
limits:
cpu: "2"
memory: "4Gi"
nvidia.com/gpu: "1"
requests:
cpu: "500m"
memory: "1Gi"
pipelines:
enabled: true
persistence:
storageClass: "fast-ssd"
size: "50Gi"
katib:
enabled: true # Hyperparameter tuning
kserve:
enabled: true
domain: models.example.com
Kubeflow Pipelines
Pipeline simple
import kfp
from kfp import dsl
from kfp.components import create_component_from_func
from typing import NamedTuple
# Composants de base
@create_component_from_func
def load_data(data_path: str) -> NamedTuple('Outputs', [('dataset', str)]):
"""Chargement des données"""
import pandas as pd
# Simulation chargement
data = pd.read_csv(data_path)
processed_path = '/tmp/processed_data.csv'
data.to_csv(processed_path, index=False)
return (processed_path,)
@create_component_from_func
def train_model(
dataset: str,
learning_rate: float = 0.01,
epochs: int = 10
) -> NamedTuple('Outputs', [('model', str), ('accuracy', float)]):
"""Entraînement du modèle"""
import joblib
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import pandas as pd
# Chargement des données
data = pd.read_csv(dataset)
X = data.drop('target', axis=1)
y = data['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# Entraînement
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)
# Évaluation
predictions = model.predict(X_test)
accuracy = accuracy_score(y_test, predictions)
# Sauvegarde
model_path = '/tmp/model.joblib'
joblib.dump(model, model_path)
return (model_path, accuracy)
@create_component_from_func
def evaluate_model(model: str, test_data: str) -> NamedTuple('Outputs', [('metrics', str)]):
"""Évaluation du modèle"""
import joblib
import json
import pandas as pd
from sklearn.metrics import classification_report
# Chargement
model = joblib.load(model)
data = pd.read_csv(test_data)
X_test = data.drop('target', axis=1)
y_test = data['target']
# Prédictions
predictions = model.predict(X_test)
# Métriques
report = classification_report(y_test, predictions, output_dict=True)
metrics_path = '/tmp/metrics.json'
with open(metrics_path, 'w') as f:
json.dump(report, f)
return (metrics_path,)
# Définition du pipeline
@dsl.pipeline(
name='ML Training Pipeline',
description='Pipeline d\'entraînement ML avec Kubeflow'
)
def ml_training_pipeline(
data_path: str,
learning_rate: float = 0.01,
epochs: int = 10
):
# Étape 1: Chargement des données
load_task = load_data(data_path)
# Étape 2: Entraînement
train_task = train_model(
dataset=load_task.outputs['dataset'],
learning_rate=learning_rate,
epochs=epochs
)
# Étape 3: Évaluation
eval_task = evaluate_model(
model=train_task.outputs['model'],
test_data=load_task.outputs['dataset']
)
# Conditions d'exécution
with dsl.Condition(train_task.outputs['accuracy'] > 0.8):
# Déploiement seulement si accuracy > 80%
deploy_task = deploy_model(train_task.outputs['model'])
# Compilation et soumission
if __name__ == '__main__':
kfp.compiler.Compiler().compile(ml_training_pipeline, 'ml_pipeline.yaml')
client = kfp.Client(host='http://kubeflow-pipelines.kubeflow:8888')
client.create_run_from_pipeline_func(
ml_training_pipeline,
arguments={
'data_path': '/data/training_data.csv',
'learning_rate': 0.001,
'epochs': 50
}
)
Pipeline avancé avec PyTorch
import kfp
from kfp import dsl
from kfp.components import create_component_from_func, InputPath, OutputPath
@create_component_from_func
def pytorch_training(
data_path: InputPath(),
model_path: OutputPath(),
metrics_path: OutputPath(),
learning_rate: float = 0.001,
batch_size: int = 32,
epochs: int = 10,
gpu_limit: str = "1"
):
"""Entraînement PyTorch avec GPU"""
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import json
import numpy as np
# Configuration GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")
# Chargement des données
data = pd.read_csv(data_path)
X = torch.FloatTensor(data.drop('target', axis=1).values)
y = torch.LongTensor(data['target'].values)
# Dataset et DataLoader
dataset = TensorDataset(X, y)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# Modèle
class SimpleNet(nn.Module):
def __init__(self, input_size, num_classes):
super().__init__()
self.fc1 = nn.Linear(input_size, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, num_classes)
self.relu = nn.ReLU()
self.dropout = nn.Dropout(0.2)
def forward(self, x):
x = self.relu(self.fc1(x))
x = self.dropout(x)
x = self.relu(self.fc2(x))
x = self.dropout(x)
x = self.fc3(x)
return x
model = SimpleNet(X.shape[1], len(torch.unique(y))).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Entraînement
model.train()
losses = []
for epoch in range(epochs):
epoch_loss = 0
for batch_X, batch_y in dataloader:
batch_X, batch_y = batch_X.to(device), batch_y.to(device)
optimizer.zero_grad()
outputs = model(batch_X)
loss = criterion(outputs, batch_y)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
avg_loss = epoch_loss / len(dataloader)
losses.append(avg_loss)
print(f'Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.4f}')
# Sauvegarde du modèle
torch.save(model.state_dict(), model_path)
# Métriques
final_metrics = {
'final_loss': losses[-1],
'training_losses': losses,
'epochs_completed': epochs,
'device_used': str(device)
}
with open(metrics_path, 'w') as f:
json.dump(final_metrics, f)
# Pipeline avec GPU
@dsl.pipeline(name='PyTorch GPU Pipeline')
def pytorch_gpu_pipeline(data_path: str):
train_op = pytorch_training(
data_path=data_path,
learning_rate=0.001,
epochs=100
)
# Configuration GPU
train_op.set_gpu_limit('1')
train_op.set_memory_limit('8Gi')
train_op.set_cpu_limit('4')
Intégration avec MLflow
# Composant MLflow pour Kubeflow
@create_component_from_func
def mlflow_logger(
model_path: InputPath(),
metrics_path: InputPath(),
experiment_name: str,
run_name: str
) -> str:
"""Log des résultats dans MLflow"""
import mlflow
import mlflow.pytorch
import json
import torch
# Configuration MLflow
mlflow.set_tracking_uri("http://mlflow-server.kubeflow:5000")
mlflow.set_experiment(experiment_name)
with mlflow.start_run(run_name=run_name):
# Log des métriques
with open(metrics_path, 'r') as f:
metrics = json.load(f)
mlflow.log_metrics(metrics)
# Log du modèle
mlflow.log_artifact(model_path, "model")
# Enregistrement dans le registry
model_uri = f"runs:/{mlflow.active_run().info.run_id}/model"
mlflow.register_model(model_uri, "kubeflow-model")
return mlflow.active_run().info.run_id
@dsl.pipeline(name='MLflow Integration Pipeline')
def mlflow_pipeline(data_path: str):
# Entraînement
train_task = pytorch_training(data_path=data_path)
# Log dans MLflow
mlflow_task = mlflow_logger(
model_path=train_task.outputs['model_path'],
metrics_path=train_task.outputs['metrics_path'],
experiment_name="kubeflow-experiments",
run_name=f"run-{dsl.RUN_ID_PLACEHOLDER}"
)
Model Serving avec KServe
Déploiement de modèles
# pytorch-model.yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: pytorch-classifier
namespace: kubeflow-user-example-com
spec:
predictor:
pytorch:
storageUri: gs://my-bucket/models/pytorch-model
resources:
limits:
cpu: "1"
memory: "2Gi"
nvidia.com/gpu: "1"
requests:
cpu: "100m"
memory: "200Mi"
env:
- name: STORAGE_URI
value: "gs://my-bucket/models/pytorch-model"
Serving personnalisé
# custom_predictor.py
import torch
import json
import base64
from typing import Dict, Any
import kserve
from kserve import Model
class PyTorchPredictor(Model):
def __init__(self, name: str):
super().__init__(name)
self.name = name
self.model = None
self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def load(self):
"""Chargement du modèle"""
model_path = "/mnt/models/model.pth"
self.model = torch.load(model_path, map_location=self.device)
self.model.eval()
print(f"Model loaded on {self.device}")
def predict(self, payload: Dict[str, Any]) -> Dict[str, Any]:
"""Prédiction"""
try:
# Extraction des données
inputs = payload["instances"]
# Conversion en tensor
input_tensor = torch.FloatTensor(inputs).to(self.device)
# Prédiction
with torch.no_grad():
outputs = self.model(input_tensor)
probabilities = torch.softmax(outputs, dim=1)
predictions = torch.argmax(probabilities, dim=1)
return {
"predictions": predictions.cpu().tolist(),
"probabilities": probabilities.cpu().tolist()
}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
model = PyTorchPredictor("pytorch-classifier")
model.load()
kserve.ModelServer().start([model])
Hyperparameter Tuning avec Katib
# katib-experiment.yaml
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
name: pytorch-optimization
namespace: kubeflow-user-example-com
spec:
algorithm:
algorithmName: random
parallelTrialCount: 4
maxTrialCount: 20
maxFailedTrialCount: 3
objective:
type: maximize
objectiveMetricName: accuracy
parameters:
- name: learning_rate
parameterType: double
feasibleSpace:
min: "0.001"
max: "0.1"
- name: batch_size
parameterType: int
feasibleSpace:
min: "16"
max: "128"
- name: num_layers
parameterType: int
feasibleSpace:
min: "2"
max: "5"
trialTemplate:
primaryContainerName: training-container
trialSpec:
apiVersion: batch/v1
kind: Job
spec:
template:
spec:
containers:
- name: training-container
image: pytorch-training:latest
command:
- python
- train.py
- --learning-rate={{.HyperParameters.learning_rate}}
- --batch-size={{.HyperParameters.batch_size}}
- --num-layers={{.HyperParameters.num_layers}}
resources:
limits:
nvidia.com/gpu: "1"
restartPolicy: Never
Monitoring avec Prometheus
# prometheus-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: prometheus-kubeflow-config
data:
prometheus.yml: |
global:
scrape_interval: 15s
scrape_configs:
- job_name: 'kubeflow-pipelines'
kubernetes_sd_configs:
- role: pod
namespaces:
names:
- kubeflow
relabel_configs:
- source_labels: [__meta_kubernetes_pod_label_app]
action: keep
regex: ml-pipeline.*
- job_name: 'kserve-models'
kubernetes_sd_configs:
- role: service
namespaces:
names:
- kubeflow-user-example-com
relabel_configs:
- source_labels: [__meta_kubernetes_service_label_serving_kserve_io_inferenceservice]
action: keep
regex: .+
- job_name: 'jupyter-notebooks'
static_configs:
- targets: ['jupyter-web-app.kubeflow:80']
Workflows complexes
# pipeline-complexe.py
@dsl.pipeline(name='Production ML Pipeline')
def production_pipeline(
data_source: str,
model_name: str,
deploy_threshold: float = 0.9
):
# Étape 1: Validation des données
data_validation = validate_data_quality(data_source)
# Étape 2: Feature engineering
feature_task = feature_engineering(
data_validation.outputs['validated_data']
)
# Étape 3: Entraînement parallèle de plusieurs modèles
with dsl.ParallelFor(
["random_forest", "gradient_boosting", "neural_network"]
) as model_type:
model_training = train_model_parallel(
features=feature_task.outputs['features'],
model_type=model_type
)
# Étape 4: Sélection du meilleur modèle
best_model = select_best_model(model_training.outputs['models'])
# Étape 5: Tests A/B
ab_test = setup_ab_test(
best_model.outputs['model'],
traffic_split=20 # 20% sur le nouveau modèle
)
# Étape 6: Déploiement conditionnel
with dsl.Condition(best_model.outputs['accuracy'] > deploy_threshold):
production_deploy = deploy_to_production(
model=best_model.outputs['model'],
model_name=model_name
)
# Notification
notify_deployment = send_notification(
message=f"Model {model_name} deployed with accuracy {best_model.outputs['accuracy']}"
)
Ressources
- Documentation : kubeflow.org
- GitHub : github.com/kubeflow/kubeflow
- Examples : github.com/kubeflow/examples