Embedded AI - Autonomous Drones

Embedded artificial intelligence for safety-critical autonomous drone systems.

Vision for Embedded AI

Objective

"Develop embedded AI systems for autonomous drones under real-time and safety-critical constraints."

Technical Challenges

  • Hardware constraints: limited compute, power budget, and weight
  • Real time: < 50 ms latency for critical decisions (see the watchdog sketch after this list)
  • Reliability: 99.99% in-flight availability
  • Explainability: understanding the AI's decisions
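
The 50 ms budget can be enforced in software as well as sized for in hardware. A minimal watchdog sketch (the deadline decorator and critical_decision_step below are hypothetical, not part of the pipeline shown later):

import functools
import logging
import time

logger = logging.getLogger(__name__)

def deadline(budget_ms: float):
    """Log a warning whenever the wrapped function exceeds its time budget."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed_ms = (time.perf_counter() - start) * 1000
            if elapsed_ms > budget_ms:
                logger.warning("%s exceeded its %.0f ms budget: %.2f ms",
                               func.__name__, budget_ms, elapsed_ms)
            return result
        return wrapper
    return decorator

@deadline(budget_ms=50.0)
def critical_decision_step(sensor_frame):
    ...  # one perception + planning cycle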

Embedded AI Architectures

Edge AI Computing

graph TB
    subgraph Drone["🚁 Drone Platform"]
        subgraph Sensors["📡 Sensors"]
            Camera["Camera 4K<br/>30fps"]
            LiDAR["LiDAR 360°<br/>10Hz"]
            IMU["IMU 9-axis<br/>1kHz"]
            GPS["RTK GPS<br/>5Hz"]
        end

        subgraph EdgeCompute["🧠 Edge Computing"]
            Jetson["NVIDIA Jetson Xavier NX<br/>21 TOPS AI Performance"]
            TPU["Google Coral TPU<br/>4 TOPS/Watt"]
            CPU["ARM Cortex-A78<br/>General Processing"]
        end

        subgraph AIModels["🤖 AI Models"]
            Vision["Computer Vision<br/>TensorFlow Lite"]
            Navigation["Path Planning<br/>PyTorch Mobile"]
            Decision["Decision Making<br/>ONNX Runtime"]
        end

        subgraph FlightControl["✈️ Flight Control"]
            Pixhawk["Pixhawk 6C<br/>Flight Controller"]
            Motors["Motors & ESCs<br/>Actuators"]
        end
    end

    %% Connections
    Sensors --> EdgeCompute
    EdgeCompute --> AIModels
    AIModels --> FlightControl
    Pixhawk --> Motors

    %% Feedback loops
    Motors -.->|Telemetry| IMU
    FlightControl -.->|Status| EdgeCompute

    classDef sensors fill:#3b82f6,stroke:#1e40af,color:#fff
    classDef compute fill:#10b981,stroke:#047857,color:#fff  
    classDef ai fill:#f59e0b,stroke:#d97706,color:#000
    classDef control fill:#ef4444,stroke:#dc2626,color:#fff

    class Camera,LiDAR,IMU,GPS sensors
    class Jetson,TPU,CPU compute
    class Vision,Navigation,Decision ai
    class Pixhawk,Motors control

Real-Time Pipeline

import asyncio
import logging
import numpy as np
import cv2
import tensorflow as tf

logger = logging.getLogger(__name__)

class RealTimeAIPipeline:
    def __init__(self):
        # Load the optimized models
        self.vision_model = tf.lite.Interpreter("vision_optimized.tflite")
        self.navigation_model = tf.lite.Interpreter("navigation_optimized.tflite")

        # Allocate tensors once
        self.vision_model.allocate_tensors()
        self.navigation_model.allocate_tensors()

        # Caches for optimization
        self.frame_cache = None
        self.decision_cache = None

    async def process_sensor_data(self,
                                  frame: np.ndarray,
                                  lidar_data: np.ndarray,
                                  imu_data: np.ndarray) -> dict:
        """Asynchronous pipeline tuned for real-time constraints"""

        start_time = asyncio.get_event_loop().time()

        # Process the sensors in parallel
        vision_task = asyncio.create_task(self.process_vision(frame))
        lidar_task = asyncio.create_task(self.process_lidar(lidar_data))
        imu_task = asyncio.create_task(self.process_imu(imu_data))

        # Wait for all results
        vision_result, lidar_result, imu_result = await asyncio.gather(
            vision_task, lidar_task, imu_task
        )

        # Sensor fusion
        fused_data = self.sensor_fusion(vision_result, lidar_result, imu_result)

        # Navigation decision
        navigation_command = await self.make_navigation_decision(fused_data)

        # Check the real-time constraint
        processing_time = (asyncio.get_event_loop().time() - start_time) * 1000
        if processing_time > 50:  # 50 ms max
            logger.warning(f"Real-time budget exceeded: {processing_time:.2f}ms")

        return {
            'navigation': navigation_command,
            'processing_time': processing_time,
            'confidence': fused_data.get('confidence', 0.0)
        }

    async def process_vision(self, frame: np.ndarray) -> dict:
        """Optimized computer-vision processing"""
        # Optimized preprocessing
        resized = cv2.resize(frame, (224, 224))
        normalized = resized.astype(np.float32) / 255.0

        # TensorFlow Lite inference
        input_details = self.vision_model.get_input_details()
        output_details = self.vision_model.get_output_details()

        self.vision_model.set_tensor(input_details[0]['index'],
                                     np.expand_dims(normalized, axis=0))
        self.vision_model.invoke()

        # Results
        obstacles = self.vision_model.get_tensor(output_details[0]['index'])
        depth_map = self.vision_model.get_tensor(output_details[1]['index'])

        return {
            'obstacles': obstacles[0],
            'depth_map': depth_map[0],
            'confidence': float(np.max(obstacles[0]))
        }
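
The sensor_fusion step referenced above is left undefined. A minimal sketch, assuming each per-sensor result carries a confidence score and that vision and LiDAR both produce obstacle arrays of the same shape (assumptions, not part of the original pipeline):

def sensor_fusion(self, vision: dict, lidar: dict, imu: dict) -> dict:
    """Hypothetical fusion: confidence-weighted blend of the vision and
    LiDAR obstacle estimates; IMU data is passed through for ego-motion."""
    w_vision = vision.get('confidence', 0.0)
    w_lidar = lidar.get('confidence', 0.0)
    total = (w_vision + w_lidar) or 1.0  # guard against division by zero

    fused_obstacles = (
        w_vision * np.asarray(vision['obstacles']) +
        w_lidar * np.asarray(lidar['obstacles'])
    ) / total

    return {
        'obstacles': fused_obstacles,
        'ego_motion': imu,
        'confidence': max(w_vision, w_lidar),
    }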

Embedded Computer Vision

Real-Time Obstacle Detection

class ObstacleDetector:
    def __init__(self, model_path: str):
        self.model = tf.lite.Interpreter(model_path)
        self.model.allocate_tensors()

        # Drone-specific configuration
        self.min_obstacle_size = 0.3  # meters
        self.danger_distance = 5.0    # meters
        self.confidence_threshold = 0.8

    def detect_and_classify(self, frame: np.ndarray) -> list:
        """Obstacle detection with danger classification"""

        # GPU-optimized preprocessing
        processed_frame = self.preprocess_gpu(frame)

        # Inference
        detections = self.run_inference(processed_frame)

        # Post-processing with drone-specific logic
        obstacles = []
        for detection in detections:
            if detection['confidence'] > self.confidence_threshold:
                obstacle = self.analyze_obstacle(detection, frame.shape)
                if obstacle['danger_level'] > 0:
                    obstacles.append(obstacle)

        return sorted(obstacles, key=lambda x: x['distance'])

    def analyze_obstacle(self, detection: dict, frame_shape: tuple) -> dict:
        """Analysis specific to flight constraints"""

        # Distance via depth estimation
        distance = self.estimate_distance(detection['bbox'], frame_shape)

        # Danger classification
        if distance < 2.0:
            danger_level = 3  # Critical
        elif distance < 5.0:
            danger_level = 2  # High
        elif distance < 10.0:
            danger_level = 1  # Moderate
        else:
            danger_level = 0  # Low

        return {
            'type': detection['class'],
            'confidence': detection['confidence'],
            'distance': distance,
            'danger_level': danger_level,
            'avoidance_vector': self.calculate_avoidance(detection['bbox'])
        }
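
The estimate_distance and calculate_avoidance helpers are referenced but not shown. Below is a minimal sketch of monocular distance estimation under a pinhole-camera assumption; the focal length and per-class heights are hypothetical calibration values, not taken from the original code:

# Hypothetical calibration values
KNOWN_HEIGHTS_M = {'person': 1.7, 'vehicle': 1.5, 'tree': 5.0}
FOCAL_LENGTH_PX = 1400.0

def estimate_distance(bbox: tuple, frame_shape: tuple,
                      obstacle_class: str = 'person') -> float:
    """Pinhole model: distance ≈ focal_length * real_height / pixel_height.
    frame_shape is kept to match the calling signature above."""
    x_min, y_min, x_max, y_max = bbox  # bounding box in pixels
    pixel_height = max(y_max - y_min, 1)
    real_height = KNOWN_HEIGHTS_M.get(obstacle_class, 1.0)
    return FOCAL_LENGTH_PX * real_height / pixel_height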

Autonomous Navigation

Trajectory Planning

import numpy as np
from typing import List
import torch

class AutonomousNavigator:
    def __init__(self):
        self.path_planner = self.load_path_planner()
        self.obstacle_memory = ObstacleMemory(capacity=1000)
        self.current_mission = None

    def plan_trajectory(self,
                        current_position: np.ndarray,
                        target_position: np.ndarray,
                        obstacles: List[dict],
                        constraints: dict) -> np.ndarray:
        """Optimal trajectory planning under constraints"""

        # Update the obstacle memory
        self.obstacle_memory.update(obstacles)

        # Generate candidate trajectories
        candidate_paths = self.generate_candidate_paths(
            current_position, target_position, num_candidates=5
        )

        # Score them with the neural network
        path_scores = []
        for path in candidate_paths:
            score = self.evaluate_path(path, obstacles, constraints)
            path_scores.append(score)

        # Select the best trajectory
        best_path_idx = np.argmax(path_scores)
        optimal_path = candidate_paths[best_path_idx]

        # Final smoothing and validation
        smooth_path = self.smooth_trajectory(optimal_path)
        validated_path = self.validate_safety(smooth_path, obstacles)

        return validated_path

    def evaluate_path(self, path: np.ndarray, obstacles: List[dict],
                      constraints: dict) -> float:
        """Multi-criteria evaluation of a trajectory"""

        # Evaluation criteria
        safety_score = self.compute_safety_score(path, obstacles)
        efficiency_score = self.compute_efficiency_score(path)
        energy_score = self.compute_energy_score(path)
        comfort_score = self.compute_comfort_score(path)

        # Adaptive weighting based on context
        weights = self.get_adaptive_weights(constraints)

        total_score = (
            weights['safety'] * safety_score +
            weights['efficiency'] * efficiency_score +
            weights['energy'] * energy_score +
            weights['comfort'] * comfort_score
        )

        return total_score
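
Of the four scoring helpers, compute_safety_score lends itself to a compact illustration. A minimal sketch, assuming each obstacle dict carries a 3D 'position' key (an assumption; the obstacle format is not specified in the original):

def compute_safety_score(self, path: np.ndarray, obstacles: List[dict],
                         min_clearance: float = 2.0) -> float:
    """Hypothetical scoring: 1.0 when every waypoint keeps at least
    min_clearance meters from all obstacles, falling linearly to 0.0."""
    if not obstacles:
        return 1.0
    positions = np.array([obs['position'] for obs in obstacles])
    # Pairwise distances between waypoints (N, 3) and obstacles (M, 3)
    dists = np.linalg.norm(path[:, None, :] - positions[None, :, :], axis=-1)
    return float(np.clip(dists.min() / min_clearance, 0.0, 1.0))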

Reinforcement Learning

Simulation Environment

import gym
from gym import spaces
import numpy as np
from typing import Tuple

class DroneNavigationEnv(gym.Env):
    """RL environment for training autonomous navigation"""

    def __init__(self, config: dict):
        super().__init__()

        # Action space (drone commands)
        self.action_space = spaces.Box(
            low=-1.0, high=1.0, shape=(4,), dtype=np.float32
        )  # [roll, pitch, yaw, throttle]

        # Observation space
        self.observation_space = spaces.Dict({
            'position': spaces.Box(low=-100, high=100, shape=(3,)),
            'velocity': spaces.Box(low=-10, high=10, shape=(3,)),
            'obstacles': spaces.Box(low=0, high=50, shape=(20, 3)),
            'target': spaces.Box(low=-100, high=100, shape=(3,)),
            'battery': spaces.Box(low=0, high=1, shape=(1,))
        })

        self.config = config
        self.reset()

    def step(self, action: np.ndarray) -> Tuple[dict, float, bool, dict]:
        """One simulation step"""

        # Apply the action
        self.apply_action(action)

        # Physics update
        self.update_physics()

        # Reward computation
        reward = self.calculate_reward()

        # Check termination conditions
        done = self.check_termination()

        # Next observation
        obs = self.get_observation()

        # Extra diagnostics
        info = {
            'distance_to_target': self.get_distance_to_target(),
            'collision': self.check_collision(),
            'battery_remaining': self.battery_level
        }

        return obs, reward, done, info

    def calculate_reward(self) -> float:
        """Shaped reward function"""

        reward = 0.0

        # Reward progress toward the target
        distance_to_target = self.get_distance_to_target()
        prev_distance = getattr(self, 'prev_distance_to_target', distance_to_target)
        progress_reward = (prev_distance - distance_to_target) * 10
        reward += progress_reward

        # Collision penalty
        if self.check_collision():
            reward -= 100

        # Out-of-bounds penalty
        if self.out_of_bounds():
            reward -= 50

        # Energy-efficiency reward
        energy_efficiency = 1.0 - (np.linalg.norm(self.last_action) / 4.0)
        reward += energy_efficiency * 2

        # Stability reward
        stability = 1.0 - np.linalg.norm(self.angular_velocity) / 10.0
        reward += stability * 1

        # Bonus for reaching the target
        if distance_to_target < 1.0:
            reward += 200

        self.prev_distance_to_target = distance_to_target
        return reward
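
A quick smoke test with a random policy, assuming the helper methods referenced above (reset, apply_action, update_physics, and so on) are implemented; the config keys are placeholders:

env = DroneNavigationEnv(config={'arena_size': 100, 'max_steps': 1000})

obs = env.reset()
done = False
while not done:
    action = env.action_space.sample()  # random actions, for smoke-testing only
    obs, reward, done, info = env.step(action)

print(f"Final distance to target: {info['distance_to_target']:.1f} m")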

Optimized PPO Agent

import torch
from stable_baselines3 import PPO

class DroneNavigationAgent:
    def __init__(self):
        self.model = None
        self.training_stats = []

    def create_model(self, env, config: dict):
        """Create a PPO model tuned for drones"""

        policy_kwargs = dict(
            activation_fn=torch.nn.ReLU,
            net_arch=[512, 512, 256],  # Network architecture
            log_std_init=-2,           # Initial exploration
        )

        self.model = PPO(
            "MultiInputPolicy",
            env,
            policy_kwargs=policy_kwargs,
            learning_rate=3e-4,
            n_steps=2048,
            batch_size=64,
            n_epochs=10,
            gamma=0.99,
            gae_lambda=0.95,
            clip_range=0.2,
            ent_coef=0.01,
            verbose=1,
            tensorboard_log="./logs/",
            device="cuda" if torch.cuda.is_available() else "cpu"
        )

    def train(self, total_timesteps: int, callback=None):
        """Training with monitoring"""

        self.model.learn(
            total_timesteps=total_timesteps,
            callback=callback,
            progress_bar=True
        )

    def evaluate_performance(self, env, n_episodes: int = 10) -> dict:
        """Evaluate performance over test episodes"""

        results = {
            'success_rate': 0,
            'avg_reward': 0,
            'avg_episode_length': 0,
            'collision_rate': 0
        }

        successes = 0
        total_reward = 0
        total_steps = 0
        collisions = 0

        for episode in range(n_episodes):
            obs = env.reset()
            episode_reward = 0
            episode_steps = 0
            done = False

            while not done:
                action, _ = self.model.predict(obs, deterministic=True)
                obs, reward, done, info = env.step(action)

                episode_reward += reward
                episode_steps += 1

                if info.get('collision', False):
                    collisions += 1

            if info.get('distance_to_target', float('inf')) < 1.0:
                successes += 1

            total_reward += episode_reward
            total_steps += episode_steps

        results['success_rate'] = successes / n_episodes
        results['avg_reward'] = total_reward / n_episodes
        results['avg_episode_length'] = total_steps / n_episodes
        results['collision_rate'] = collisions / n_episodes  # collisions per episode

        return results
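
An end-to-end sketch tying the environment and agent together; the timestep count and config values are illustrative, not tuned:

env = DroneNavigationEnv(config={'arena_size': 100, 'max_steps': 1000})

agent = DroneNavigationAgent()
agent.create_model(env, config={})
agent.train(total_timesteps=1_000_000)

metrics = agent.evaluate_performance(env, n_episodes=20)
print(f"Success rate: {metrics['success_rate']:.0%}, "
      f"average reward: {metrics['avg_reward']:.1f}")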

Embedded Optimization

Model Quantization

import os
import time
import tensorflow as tf
import numpy as np

class ModelOptimizer:
    """Model optimization for embedded constraints"""

    @staticmethod
    def quantize_model(model_path: str,
                      output_path: str,
                      optimization_type: str = "dynamic") -> dict:
        """Post-training quantization for Edge AI"""

        # Load the original model
        model = tf.keras.models.load_model(model_path)

        # Quantization configuration
        converter = tf.lite.TFLiteConverter.from_keras_model(model)

        if optimization_type == "dynamic":
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
        elif optimization_type == "int8":
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
            converter.target_spec.supported_types = [tf.int8]
            # Note: full INT8 quantization also requires a calibration
            # dataset via converter.representative_dataset
        elif optimization_type == "float16":
            converter.optimizations = [tf.lite.Optimize.DEFAULT]
            converter.target_spec.supported_types = [tf.float16]

        # Generate the optimized model
        tflite_model = converter.convert()

        # Save it
        with open(output_path, 'wb') as f:
            f.write(tflite_model)

        # Optimization metrics
        original_size = os.path.getsize(model_path)
        optimized_size = len(tflite_model)
        compression_ratio = original_size / optimized_size

        return {
            'original_size_mb': original_size / (1024 * 1024),
            'optimized_size_mb': optimized_size / (1024 * 1024),
            'compression_ratio': compression_ratio,
            'optimization_type': optimization_type
        }

    @staticmethod
    def benchmark_model(model_path: str, num_runs: int = 100) -> dict:
        """Benchmark an optimized model's performance"""

        # Load the model
        interpreter = tf.lite.Interpreter(model_path=model_path)
        interpreter.allocate_tensors()

        input_details = interpreter.get_input_details()
        output_details = interpreter.get_output_details()

        # Generate test data (the stored shape already includes the batch dim)
        input_shape = tuple(input_details[0]['shape'])
        test_input = np.random.random(input_shape).astype(np.float32)

        # Timing benchmark
        times = []
        for _ in range(num_runs):
            start_time = time.perf_counter()

            interpreter.set_tensor(input_details[0]['index'], test_input)
            interpreter.invoke()
            _ = interpreter.get_tensor(output_details[0]['index'])  # force output fetch

            end_time = time.perf_counter()
            times.append((end_time - start_time) * 1000)  # ms

        return {
            'avg_inference_time_ms': np.mean(times),
            'std_inference_time_ms': np.std(times),
            'min_inference_time_ms': np.min(times),
            'max_inference_time_ms': np.max(times),
            'throughput_fps': 1000 / np.mean(times)
        }
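
A usage sketch chaining quantization and benchmarking; the file names are placeholders, and as noted in the comments, the int8 path would additionally need a representative dataset:

stats = ModelOptimizer.quantize_model(
    model_path="vision_model.h5",
    output_path="vision_optimized.tflite",
    optimization_type="float16",
)
print(f"Compression: {stats['compression_ratio']:.1f}x "
      f"({stats['original_size_mb']:.1f} MB -> {stats['optimized_size_mb']:.1f} MB)")

perf = ModelOptimizer.benchmark_model("vision_optimized.tflite", num_runs=200)
print(f"Average latency: {perf['avg_inference_time_ms']:.2f} ms "
      f"({perf['throughput_fps']:.0f} FPS)")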

Practical Projects

Autonomous Surveillance Drone

  • Objective: automated patrol with intrusion detection
  • AI: optimized YOLOv8 + multi-object tracking
  • Navigation: modified A* + dynamic avoidance (see the sketch after this list)
  • Communication: 4G/5G for real-time alerts
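
The modifications to A* are not described here, but as a reference point, a minimal sketch of plain A* on a 2D occupancy grid:

import heapq

def astar(grid, start, goal):
    """Plain A* on a 2D occupancy grid (0 = free, 1 = blocked),
    4-connected moves, Manhattan-distance heuristic."""
    def h(cell):
        return abs(cell[0] - goal[0]) + abs(cell[1] - goal[1])

    open_heap = [(h(start), start)]  # entries are (f-score, cell)
    came_from = {start: None}
    g_cost = {start: 0}

    while open_heap:
        _, current = heapq.heappop(open_heap)
        if current == goal:
            # Walk back through came_from to reconstruct the path
            path = []
            while current is not None:
                path.append(current)
                current = came_from[current]
            return path[::-1]
        x, y = current
        for nb in ((x + 1, y), (x - 1, y), (x, y + 1), (x, y - 1)):
            if (0 <= nb[0] < len(grid) and 0 <= nb[1] < len(grid[0])
                    and grid[nb[0]][nb[1]] == 0):
                new_g = g_cost[current] + 1
                if new_g < g_cost.get(nb, float('inf')):
                    g_cost[nb] = new_g
                    came_from[nb] = current
                    heapq.heappush(open_heap, (new_g + h(nb), nb))
    return None  # no path exists

# Example: astar(grid, (0, 0), (9, 9)) -> list of (row, col) cells, or None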

Infrastructure Inspector

  • Objective: automated inspection of bridges and buildings
  • AI: structural defect detection via computer vision
  • Precision: RTK GPS + visual SLAM
  • Data: automatic 3D models + reports

Coordinated Swarm

  • Objective: multi-drone coordination for complex missions
  • AI: multi-agent reinforcement learning
  • Communication: mesh network + blockchain consensus
  • Applications: search and rescue, delivery, mapping