IA Embarquée - Drones Autonomes
Intelligence artificielle embarquée pour systèmes de drones autonomes critiques.
Vision IA Embarquée
Objectif
"Développer des systèmes d'IA embarquée pour drones autonomes avec contraintes temps réel et sécurité critique."
Défis techniques
- Contraintes matérielles : Computing limité, consommation, poids
- Temps réel : Latence < 50ms pour décisions critiques
- Fiabilité : 99.99% de disponibilité en vol
- Explicabilité : Comprendre les décisions de l'IA
Architectures IA Embarquée
Edge AI Computing
graph TB
subgraph Drone["🚁 Drone Platform"]
subgraph Sensors["📡 Capteurs"]
Camera["Camera 4K<br/>30fps"]
LiDAR["LiDAR 360°<br/>10Hz"]
IMU["IMU 9-axis<br/>1kHz"]
GPS["RTK GPS<br/>5Hz"]
end
subgraph EdgeCompute["🧠 Edge Computing"]
Jetson["NVIDIA Jetson Xavier NX<br/>21 TOPS AI Performance"]
TPU["Google Coral TPU<br/>4 TOPS/Watt"]
CPU["ARM Cortex-A78<br/>General Processing"]
end
subgraph AIModels["🤖 AI Models"]
Vision["Computer Vision<br/>TensorFlow Lite"]
Navigation["Path Planning<br/>PyTorch Mobile"]
Decision["Decision Making<br/>ONNX Runtime"]
end
subgraph FlightControl["✈️ Flight Control"]
Pixhawk["Pixhawk 6C<br/>Flight Controller"]
Motors["Motors & ESCs<br/>Actuators"]
end
end
%% Connections
Sensors --> EdgeCompute
EdgeCompute --> AIModels
AIModels --> FlightControl
FlightControl --> Motors
%% Feedback loops
Motors -.->|Telemetry| IMU
FlightControl -.->|Status| EdgeCompute
classDef sensors fill:#3b82f6,stroke:#1e40af,color:#fff
classDef compute fill:#10b981,stroke:#047857,color:#fff
classDef ai fill:#f59e0b,stroke:#d97706,color:#000
classDef control fill:#ef4444,stroke:#dc2626,color:#fff
class Camera,LiDAR,IMU,GPS sensors
class Jetson,TPU,CPU compute
class Vision,Navigation,Decision ai
class Pixhawk,Motors control
Pipeline Temps Réel
import asyncio
import numpy as np
import cv2
import tensorflow as tf
from typing import Tuple, Optional
class RealTimeAIPipeline:
def __init__(self):
# Chargement des modèles optimisés
self.vision_model = tf.lite.Interpreter("vision_optimized.tflite")
self.navigation_model = tf.lite.Interpreter("navigation_optimized.tflite")
# Allocation tensors une seule fois
self.vision_model.allocate_tensors()
self.navigation_model.allocate_tensors()
# Cache pour optimisation
self.frame_cache = None
self.decision_cache = None
async def process_sensor_data(self,
frame: np.ndarray,
lidar_data: np.ndarray,
imu_data: np.ndarray) -> dict:
"""Pipeline asynchrone optimisé pour contraintes temps réel"""
start_time = asyncio.get_event_loop().time()
# Traitement parallèle des capteurs
vision_task = asyncio.create_task(self.process_vision(frame))
lidar_task = asyncio.create_task(self.process_lidar(lidar_data))
imu_task = asyncio.create_task(self.process_imu(imu_data))
# Attendre tous les résultats
vision_result, lidar_result, imu_result = await asyncio.gather(
vision_task, lidar_task, imu_task
)
# Fusion des données
fused_data = self.sensor_fusion(vision_result, lidar_result, imu_result)
# Décision de navigation
navigation_command = await self.make_navigation_decision(fused_data)
# Vérification contrainte temps réel
processing_time = (asyncio.get_event_loop().time() - start_time) * 1000
if processing_time > 50: # 50ms max
logger.warning(f"Dépassement contrainte temps: {processing_time:.2f}ms")
return {
'navigation': navigation_command,
'processing_time': processing_time,
'confidence': fused_data.get('confidence', 0.0)
}
async def process_vision(self, frame: np.ndarray) -> dict:
"""Traitement vision computer optimisé"""
# Préprocessing optimisé
resized = cv2.resize(frame, (224, 224))
normalized = resized.astype(np.float32) / 255.0
# Inférence TensorFlow Lite
input_details = self.vision_model.get_input_details()
output_details = self.vision_model.get_output_details()
self.vision_model.set_tensor(input_details[0]['index'],
np.expand_dims(normalized, axis=0))
self.vision_model.invoke()
# Résultats
obstacles = self.vision_model.get_tensor(output_details[0]['index'])
depth_map = self.vision_model.get_tensor(output_details[1]['index'])
return {
'obstacles': obstacles[0],
'depth_map': depth_map[0],
'confidence': float(np.max(obstacles[0]))
}
Computer Vision Embarquée
Détection d'Obstacles Temps Réel
class ObstacleDetector:
def __init__(self, model_path: str):
self.model = tf.lite.Interpreter(model_path)
self.model.allocate_tensors()
# Configuration spécifique drones
self.min_obstacle_size = 0.3 # mètres
self.danger_distance = 5.0 # mètres
self.confidence_threshold = 0.8
def detect_and_classify(self, frame: np.ndarray) -> list:
"""Détection obstacles avec classification de danger"""
# Préprocessing optimisé GPU
processed_frame = self.preprocess_gpu(frame)
# Inférence
detections = self.run_inference(processed_frame)
# Post-processing avec logique drone
obstacles = []
for detection in detections:
if detection['confidence'] > self.confidence_threshold:
obstacle = self.analyze_obstacle(detection, frame.shape)
if obstacle['danger_level'] > 0:
obstacles.append(obstacle)
return sorted(obstacles, key=lambda x: x['distance'])
def analyze_obstacle(self, detection: dict, frame_shape: tuple) -> dict:
"""Analyse spécifique aux contraintes de vol"""
# Calcul distance via depth estimation
distance = self.estimate_distance(detection['bbox'], frame_shape)
# Classification danger
if distance < 2.0:
danger_level = 3 # Critique
elif distance < 5.0:
danger_level = 2 # Élevé
elif distance < 10.0:
danger_level = 1 # Modéré
else:
danger_level = 0 # Faible
return {
'type': detection['class'],
'confidence': detection['confidence'],
'distance': distance,
'danger_level': danger_level,
'avoidance_vector': self.calculate_avoidance(detection['bbox'])
}
Navigation Autonome
import numpy as np
from typing import List, Tuple
import torch
import torch.nn as nn
class AutonomousNavigator:
def __init__(self):
self.path_planner = self.load_path_planner()
self.obstacle_memory = ObstacleMemory(capacity=1000)
self.current_mission = None
def plan_trajectory(self,
current_position: np.ndarray,
target_position: np.ndarray,
obstacles: List[dict],
constraints: dict) -> np.ndarray:
"""Planification de trajectoire optimale avec contraintes"""
# Mise à jour mémoire obstacles
self.obstacle_memory.update(obstacles)
# Génération de trajectoires candidates
candidate_paths = self.generate_candidate_paths(
current_position, target_position, num_candidates=5
)
# Évaluation avec réseau de neurones
path_scores = []
for path in candidate_paths:
score = self.evaluate_path(path, obstacles, constraints)
path_scores.append(score)
# Sélection trajectoire optimale
best_path_idx = np.argmax(path_scores)
optimal_path = candidate_paths[best_path_idx]
# Smoothing et validation finale
smooth_path = self.smooth_trajectory(optimal_path)
validated_path = self.validate_safety(smooth_path, obstacles)
return validated_path
def evaluate_path(self, path: np.ndarray, obstacles: List[dict],
constraints: dict) -> float:
"""Évaluation multi-critères d'une trajectoire"""
# Critères d'évaluation
safety_score = self.compute_safety_score(path, obstacles)
efficiency_score = self.compute_efficiency_score(path)
energy_score = self.compute_energy_score(path)
comfort_score = self.compute_comfort_score(path)
# Pondération adaptive selon contexte
weights = self.get_adaptive_weights(constraints)
total_score = (
weights['safety'] * safety_score +
weights['efficiency'] * efficiency_score +
weights['energy'] * energy_score +
weights['comfort'] * comfort_score
)
return total_score
Apprentissage par Renforcement
Environnement de Simulation
import gym
from gym import spaces
import numpy as np
class DroneNavigationEnv(gym.Env):
"""Environnement RL pour entraînement navigation autonome"""
def __init__(self, config: dict):
super().__init__()
# Espace d'actions (commandes drone)
self.action_space = spaces.Box(
low=-1.0, high=1.0, shape=(4,), dtype=np.float32
) # [roll, pitch, yaw, throttle]
# Espace d'observations
self.observation_space = spaces.Dict({
'position': spaces.Box(low=-100, high=100, shape=(3,)),
'velocity': spaces.Box(low=-10, high=10, shape=(3,)),
'obstacles': spaces.Box(low=0, high=50, shape=(20, 3)),
'target': spaces.Box(low=-100, high=100, shape=(3,)),
'battery': spaces.Box(low=0, high=1, shape=(1,))
})
self.config = config
self.reset()
def step(self, action: np.ndarray) -> Tuple[dict, float, bool, dict]:
"""Un pas de simulation"""
# Application de l'action
self.apply_action(action)
# Mise à jour physique
self.update_physics()
# Calcul récompense
reward = self.calculate_reward()
# Vérification conditions d'arrêt
done = self.check_termination()
# Observations suivantes
obs = self.get_observation()
# Informations additionnelles
info = {
'distance_to_target': self.get_distance_to_target(),
'collision': self.check_collision(),
'battery_remaining': self.battery_level
}
return obs, reward, done, info
def calculate_reward(self) -> float:
"""Fonction de récompense sophistiquée"""
reward = 0.0
# Récompense progression vers objectif
distance_to_target = self.get_distance_to_target()
prev_distance = getattr(self, 'prev_distance_to_target', distance_to_target)
progress_reward = (prev_distance - distance_to_target) * 10
reward += progress_reward
# Pénalité collision
if self.check_collision():
reward -= 100
# Pénalité sortie de zone
if self.out_of_bounds():
reward -= 50
# Récompense efficacité énergétique
energy_efficiency = 1.0 - (np.linalg.norm(self.last_action) / 4.0)
reward += energy_efficiency * 2
# Récompense stabilité
stability = 1.0 - np.linalg.norm(self.angular_velocity) / 10.0
reward += stability * 1
# Bonus objectif atteint
if distance_to_target < 1.0:
reward += 200
self.prev_distance_to_target = distance_to_target
return reward
Agent PPO Optimisé
import torch
import torch.nn as nn
from stable_baselines3 import PPO
from stable_baselines3.common.callbacks import BaseCallback
class DroneNavigationAgent:
def __init__(self):
self.model = None
self.training_stats = []
def create_model(self, env, config: dict):
"""Création modèle PPO optimisé pour drones"""
policy_kwargs = dict(
activation_fn=torch.nn.ReLU,
net_arch=[512, 512, 256], # Architecture réseau
log_std_init=-2, # Exploration initiale
)
self.model = PPO(
"MultiInputPolicy",
env,
policy_kwargs=policy_kwargs,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
ent_coef=0.01,
verbose=1,
tensorboard_log="./logs/",
device="cuda" if torch.cuda.is_available() else "cpu"
)
def train(self, total_timesteps: int, callback=None):
"""Entraînement avec monitoring"""
self.model.learn(
total_timesteps=total_timesteps,
callback=callback,
progress_bar=True
)
def evaluate_performance(self, env, n_episodes: int = 10) -> dict:
"""Évaluation performance sur épisodes de test"""
results = {
'success_rate': 0,
'avg_reward': 0,
'avg_episode_length': 0,
'collision_rate': 0
}
successes = 0
total_reward = 0
total_steps = 0
collisions = 0
for episode in range(n_episodes):
obs = env.reset()
episode_reward = 0
episode_steps = 0
done = False
while not done:
action, _ = self.model.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
episode_reward += reward
episode_steps += 1
if info.get('collision', False):
collisions += 1
if info.get('distance_to_target', float('inf')) < 1.0:
successes += 1
total_reward += episode_reward
total_steps += episode_steps
results['success_rate'] = successes / n_episodes
results['avg_reward'] = total_reward / n_episodes
results['avg_episode_length'] = total_steps / n_episodes
results['collision_rate'] = collisions / (total_steps)
return results
Optimisation Embarquée
Quantification de Modèles
import tensorflow as tf
import numpy as np
class ModelOptimizer:
"""Optimisation modèles pour contraintes embarquées"""
@staticmethod
def quantize_model(model_path: str,
output_path: str,
optimization_type: str = "dynamic") -> dict:
"""Quantification INT8 pour Edge AI"""
# Chargement modèle original
model = tf.keras.models.load_model(model_path)
# Configuration quantification
converter = tf.lite.TFLiteConverter.from_keras_model(model)
if optimization_type == "dynamic":
converter.optimizations = [tf.lite.Optimize.DEFAULT]
elif optimization_type == "int8":
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.int8]
elif optimization_type == "float16":
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
# Génération modèle optimisé
tflite_model = converter.convert()
# Sauvegarde
with open(output_path, 'wb') as f:
f.write(tflite_model)
# Métriques d'optimisation
original_size = os.path.getsize(model_path)
optimized_size = len(tflite_model)
compression_ratio = original_size / optimized_size
return {
'original_size_mb': original_size / (1024 * 1024),
'optimized_size_mb': optimized_size / (1024 * 1024),
'compression_ratio': compression_ratio,
'optimization_type': optimization_type
}
@staticmethod
def benchmark_model(model_path: str, num_runs: int = 100) -> dict:
"""Benchmark performance modèle optimisé"""
# Chargement modèle
interpreter = tf.lite.Interpreter(model_path=model_path)
interpreter.allocate_tensors()
input_details = interpreter.get_input_details()
output_details = interpreter.get_output_details()
# Génération données test
input_shape = input_details[0]['shape'][1:]
test_input = np.random.random((1,) + input_shape).astype(np.float32)
# Benchmark timing
times = []
for _ in range(num_runs):
start_time = time.time()
interpreter.set_tensor(input_details[0]['index'], test_input)
interpreter.invoke()
result = interpreter.get_tensor(output_details[0]['index'])
end_time = time.time()
times.append((end_time - start_time) * 1000) # ms
return {
'avg_inference_time_ms': np.mean(times),
'std_inference_time_ms': np.std(times),
'min_inference_time_ms': np.min(times),
'max_inference_time_ms': np.max(times),
'throughput_fps': 1000 / np.mean(times)
}
Projets Pratiques
Drone de Surveillance Autonome
- Objectif : Patrouille automatique avec détection d'intrusion
- IA : YOLOv8 optimisé + tracking multi-objets
- Navigation : A* modifié + évitement dynamique
- Communication : 4G/5G pour alertes temps réel
Inspecteur d'Infrastructure
- Objectif : Inspection automatisée ponts/bâtiments
- IA : Détection défauts structurels par computer vision
- Précision : RTK GPS + SLAM visuel
- Données : Modèles 3D automatiques + rapports
Essaim Coordonné
- Objectif : Coordination multi-drones pour missions complexes
- IA : Apprentissage par renforcement multi-agents
- Communication : Mesh network + consensus blockchain
- Applications : SAR, livraison, cartographie