ThreadPoolExecutor pour gérer les threads
Introduction
ThreadPoolExecutor est une abstraction de haut niveau qui simplifie grandement
la gestion des threads. Au lieu de créer et gérer manuellement chaque thread,
vous définissez un "pool" (bassin) de threads réutilisables qui exécutent vos
tâches. Cette approche est plus efficace, plus robuste et plus facile à utiliser
que la gestion manuelle des threads.
Avantages du ThreadPoolExecutor
- Réutilisation : Les threads sont créés une fois et réutilisés pour plusieurs tâches
- Gestion automatique : Plus besoin de gérer manuellement start()etjoin()
- Interface future : Récupération facile des résultats avec les objets Future
- Contrôle de la charge : Limitation automatique du nombre de threads simultanés
- Exception handling : Gestion simplifiée des erreurs dans les threads
Comparaison avec la gestion manuelle
Avant d'explorer ThreadPoolExecutor, voyons la différence avec la gestion
manuelle de threads :
import threading
import time
import concurrent.futures
def tache_simple(nom, duree):
    """Tâche qui simule du travail et retourne un résultat"""
    print(f"Début de {nom}")
    time.sleep(duree)
    resultat = f"Résultat de {nom}"
    print(f"Fin de {nom}")
    return resultat
# ❌ Méthode manuelle (complexe et verbeuse)
print("=== Gestion manuelle des threads ===")
threads = []
resultats = []
def wrapper(nom, duree, resultats_list):
    """Wrapper nécessaire pour capturer les résultats"""
    resultat = tache_simple(nom, duree)
    resultats_list.append(resultat)
# Créer et démarrer manuellement
for i in range(3):
    t = threading.Thread(target=wrapper, args=(f"Tâche-{i+1}", 1, resultats))
    threads.append(t)
    t.start()
# Attendre manuellement
for t in threads:
    t.join()
print(f"Résultats manuels: {resultats}")
# ✅ Méthode ThreadPoolExecutor (simple et élégante)
print("\n=== Avec ThreadPoolExecutor ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Soumettre toutes les tâches
    futures = [executor.submit(tache_simple, f"Tâche-{i+1}", 1) for i in range(3)]
    
    # Récupérer les résultats
    resultats_pool = [future.result() for future in futures]
print(f"Résultats pool: {resultats_pool}")
Méthodes principales
Le ThreadPoolExecutor offre plusieurs méthodes pour soumettre et gérer les tâches.
Submit() - Soumettre une tâche
La méthode submit() soumet une tâche au pool et retourne immédiatement un
objet Future qui représente l'exécution en cours.
import concurrent.futures
import time
import random
def calculer_carre(nombre):
    """Calcule le carré d'un nombre avec un délai simulé"""
    duree = random.uniform(0.5, 2.0)  # Délai aléatoire pour simuler des tâches variables
    print(f"Calcul de {nombre}² commencé (durée: {duree:.1f}s)")
    time.sleep(duree)
    resultat = nombre ** 2
    print(f"Calcul de {nombre}² terminé = {resultat}")
    return resultat
# Utilisation de submit()
print("=== Utilisation de submit() ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Soumettre plusieurs tâches et récupérer les Future
    future1 = executor.submit(calculer_carre, 3)
    future2 = executor.submit(calculer_carre, 5)
    future3 = executor.submit(calculer_carre, 7)
    
    print("Toutes les tâches soumises, en attente des résultats...")
    
    # Récupérer les résultats (bloque jusqu'à ce que la tâche soit terminée)
    print(f"3² = {future1.result()}")  # Bloque jusqu'au résultat
    print(f"5² = {future2.result()}")  # Bloque jusqu'au résultat  
    print(f"7² = {future3.result()}")  # Bloque jusqu'au résultat
print("Tous les calculs terminés")
Map() - Appliquer une fonction à une séquence
La méthode map() est similaire à la fonction map() intégrée de Python, mais
exécute la fonction en parallèle sur tous les éléments.
import concurrent.futures
import time
def traiter_element(element):
    """Traite un élément avec simulation d'une opération coûteuse
    
    Cela pourrait représenter :
    - Téléchargement d'une URL
    - Traitement d'un fichier
    - Requête vers une base de données
    """
    print(f"Traitement de {element}")
    time.sleep(1)  # Simule une opération qui prend du temps
    return element.upper()
# Données à traiter
elements = ["pomme", "banane", "orange", "kiwi", "mangue"]
# ❌ Traitement séquentiel
print("=== Traitement séquentiel ===")
start = time.time()
resultats_sequentiel = [traiter_element(elem) for elem in elements]
duree_sequentiel = time.time() - start
print(f"Résultats: {resultats_sequentiel}")
print(f"Durée: {duree_sequentiel:.2f}s")
# ✅ Traitement parallèle avec map()
print("\n=== Traitement parallèle avec map() ===")
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    resultats_parallele = list(executor.map(traiter_element, elements))
duree_parallele = time.time() - start
print(f"Résultats: {resultats_parallele}")
print(f"Durée: {duree_parallele:.2f}s")
print(f"Accélération: {duree_sequentiel/duree_parallele:.1f}x")
As_completed() - Traiter les résultats dès leur disponibilité
Parfois, vous voulez traiter les résultats dès qu'ils sont disponibles, sans
attendre que toutes les tâches se terminent. as_completed() est parfait pour
cela.
import concurrent.futures
import time
import random
def tache_duree_variable(nom):
    """Tâche avec une durée d'exécution variable"""
    duree = random.uniform(1, 4)  # Entre 1 et 4 secondes
    print(f"[{nom}] Démarrage (durée prévue: {duree:.1f}s)")
    time.sleep(duree)
    print(f"[{nom}] Terminé!")
    return f"Résultat de {nom}"
print("=== Traitement avec as_completed() ===")
print("Les résultats s'affichent dès qu'ils sont prêts:")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Soumettre plusieurs tâches avec des durées variables
    futures = {
        executor.submit(tache_duree_variable, f"Tâche-{i+1}"): f"Tâche-{i+1}" 
        for i in range(5)
    }
    
    # Traiter les résultats dans l'ordre de completion (pas de soumission)
    for future in concurrent.futures.as_completed(futures):
        nom_tache = futures[future]
        try:
            resultat = future.result()
            print(f"✅ {nom_tache} terminée: {resultat}")
        except Exception as e:
            print(f"❌ {nom_tache} a échoué: {e}")
print("Toutes les tâches traitées")
Gestion des erreurs et exceptions
Une grande force de ThreadPoolExecutor est sa gestion simplifiée des
exceptions qui peuvent survenir dans les threads. Contrairement aux threads
classiques où les exceptions sont "perdues", le ThreadPoolExecutor capture
automatiquement toutes les exceptions et les rend disponibles via l'objet
Future.
Mécanisme de propagation des exceptions
Le point clé à comprendre est que c'est l'appel à future.result() qui lève
l'exception, pas le thread lui-même. Voici ce qui se passe :
- Dans le thread : L'exception est levée et capturée automatiquement
- Stockage : L'exception est stockée dans l'objet Future
- Propagation : Quand result()est appelé, l'exception est re-levée dans le thread principal
- Gestion : Vous pouvez alors la capturer avec un try/exceptclassique
import concurrent.futures
import time
import random
def tache_potentiellement_faillible(numero):
    """Tâche qui peut échouer de manière aléatoire"""
    print(f"[Tâche {numero}] Démarrage dans le thread")
    time.sleep(0.5)  # Simule du travail
    
    if random.random() < 0.3:  # 30% de chance d'échouer
        print(f"[Tâche {numero}] ERREUR dans le thread!")
        raise ValueError(f"Erreur simulée dans la tâche {numero}")
    
    print(f"[Tâche {numero}] Succès dans le thread")
    return f"Succès tâche {numero}"
print("=== Mécanisme de gestion des erreurs ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Soumettre plusieurs tâches
    futures = [executor.submit(tache_potentiellement_faillible, i) for i in range(6)]
    
    print("Toutes les tâches soumises, récupération des résultats...")
    
    # Traiter les résultats avec gestion d'erreur
    for i, future in enumerate(futures):
        try:
            # ⚠️ C'EST ICI QUE L'EXCEPTION EST LEVÉE (pas dans le thread)
            # Si une exception s'est produite dans le thread, result() la re-lève
            resultat = future.result(timeout=5)  # Timeout de 5 secondes
            print(f"✅ Tâche {i}: {resultat}")
            
        except ValueError as e:
            # Exception spécifique levée par notre fonction
            print(f"❌ Tâche {i} a échoué: {e}")
            
        except concurrent.futures.TimeoutError:
            # Exception levée si result() dépasse le timeout
            print(f"⏱️ Tâche {i} timeout")
            
        except Exception as e:
            # Toute autre exception inattendue
            print(f"💥 Tâche {i} erreur inattendue: {e}")
print("Traitement terminé malgré les erreurs")
Vérification de l'état sans lever d'exception
Vous pouvez également vérifier l'état d'un Future sans risquer de lever une
exception :
import concurrent.futures
import time
def tache_avec_erreur():
    time.sleep(1)
    raise RuntimeError("Quelque chose a mal tourné")
def tache_normale():
    time.sleep(1)
    return "Tout va bien"
print("=== Vérification d'état des Future ===")
with concurrent.futures.ThreadPoolExecutor() as executor:
    future_erreur = executor.submit(tache_avec_erreur)
    future_normal = executor.submit(tache_normale)
    
    # Attendre que les tâches se terminent
    time.sleep(2)
    
    # Vérifier l'état sans lever d'exception
    print(f"Future normal terminé: {future_normal.done()}")
    print(f"Future erreur terminé: {future_erreur.done()}")
    
    # Pour le future normal
    if future_normal.done():
        try:
            resultat = future_normal.result()
            print(f"✅ Résultat normal: {resultat}")
        except Exception as e:
            print(f"❌ Erreur inattendue: {e}")
    
    # Pour le future avec erreur
    if future_erreur.done():
        try:
            resultat = future_erreur.result()  # Cette ligne lèvera l'exception
            print(f"✅ Résultat erreur: {resultat}")
        except RuntimeError as e:
            print(f"❌ Erreur attendue capturée: {e}")
Paramètres de configuration
max_workers - Contrôler le nombre de threads
Le paramètre max_workers détermine le nombre maximum de threads dans le pool.
Recommandations pour max_workers :
- Tâches I/O bound : max_workerspeut être plus élevé (par exemple, 2-4x le nombre de CPU)
- Tâches CPU bound : max_workersdevrait égaler le nombre de CPU (mais mieux vaut utiliserProcessPoolExecutor)
Bonnes pratiques
1. Utiliser le context manager (with)
Toujours utiliser with pour s'assurer que le pool est correctement fermé :
# ✅ Recommandé
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Votre code ici
    pass
# Le pool est automatiquement fermé ici
# ❌ À éviter
executor = concurrent.futures.ThreadPoolExecutor()
# Code...
executor.shutdown()  # Facile d'oublier!
2. Choisir le bon type de pool
- ThreadPoolExecutor : Pour les tâches I/O bound (réseau, fichiers, base de données)
- ProcessPoolExecutor : Pour les tâches CPU bound (calculs intensifs)
3. Gérer les timeouts
with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(tache_longue)
    try:
        resultat = future.result(timeout=30)  # Timeout de 30 secondes
    except concurrent.futures.TimeoutError:
        print("Tâche trop longue, annulation...")
        # Note: La tâche continue en arrière-plan mais le résultat est ignoré
4. Limiter la taille des données
Ne pas passer d'énormes structures de données aux threads. Préférez passer des identifiants ou des chemins et laisser chaque thread charger ses propres données.
Le ThreadPoolExecutor est un outil puissant qui simplifie considérablement la
programmation concurrente en Python. Il gère automatiquement la création, la
réutilisation et la destruction des threads, tout en offrant une interface
intuitive pour soumettre des tâches et récupérer leurs résultats.