Aller au contenu principal

ThreadPoolExecutor pour gérer les threads

Introduction

ThreadPoolExecutor est une abstraction de haut niveau qui simplifie grandement la gestion des threads. Au lieu de créer et gérer manuellement chaque thread, vous définissez un "pool" (bassin) de threads réutilisables qui exécutent vos tâches. Cette approche est plus efficace, plus robuste et plus facile à utiliser que la gestion manuelle des threads.

Avantages du ThreadPoolExecutor

  • Réutilisation : Les threads sont créés une fois et réutilisés pour plusieurs tâches
  • Gestion automatique : Plus besoin de gérer manuellement start() et join()
  • Interface future : Récupération facile des résultats avec les objets Future
  • Contrôle de la charge : Limitation automatique du nombre de threads simultanés
  • Exception handling : Gestion simplifiée des erreurs dans les threads

Comparaison avec la gestion manuelle

Avant d'explorer ThreadPoolExecutor, voyons la différence avec la gestion manuelle de threads :

import threading
import time
import concurrent.futures

def tache_simple(nom, duree):
"""Tâche qui simule du travail et retourne un résultat"""
print(f"Début de {nom}")
time.sleep(duree)
resultat = f"Résultat de {nom}"
print(f"Fin de {nom}")
return resultat

# ❌ Méthode manuelle (complexe et verbeuse)
print("=== Gestion manuelle des threads ===")
threads = []
resultats = []

def wrapper(nom, duree, resultats_list):
"""Wrapper nécessaire pour capturer les résultats"""
resultat = tache_simple(nom, duree)
resultats_list.append(resultat)

# Créer et démarrer manuellement
for i in range(3):
t = threading.Thread(target=wrapper, args=(f"Tâche-{i+1}", 1, resultats))
threads.append(t)
t.start()

# Attendre manuellement
for t in threads:
t.join()

print(f"Résultats manuels: {resultats}")

# ✅ Méthode ThreadPoolExecutor (simple et élégante)
print("\n=== Avec ThreadPoolExecutor ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Soumettre toutes les tâches
futures = [executor.submit(tache_simple, f"Tâche-{i+1}", 1) for i in range(3)]

# Récupérer les résultats
resultats_pool = [future.result() for future in futures]

print(f"Résultats pool: {resultats_pool}")

Méthodes principales

Le ThreadPoolExecutor offre plusieurs méthodes pour soumettre et gérer les tâches.

Submit() - Soumettre une tâche

La méthode submit() soumet une tâche au pool et retourne immédiatement un objet Future qui représente l'exécution en cours.

import concurrent.futures
import time
import random

def calculer_carre(nombre):
"""Calcule le carré d'un nombre avec un délai simulé"""
duree = random.uniform(0.5, 2.0) # Délai aléatoire pour simuler des tâches variables
print(f"Calcul de {nombre}² commencé (durée: {duree:.1f}s)")
time.sleep(duree)
resultat = nombre ** 2
print(f"Calcul de {nombre}² terminé = {resultat}")
return resultat

# Utilisation de submit()
print("=== Utilisation de submit() ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Soumettre plusieurs tâches et récupérer les Future
future1 = executor.submit(calculer_carre, 3)
future2 = executor.submit(calculer_carre, 5)
future3 = executor.submit(calculer_carre, 7)

print("Toutes les tâches soumises, en attente des résultats...")

# Récupérer les résultats (bloque jusqu'à ce que la tâche soit terminée)
print(f"3² = {future1.result()}") # Bloque jusqu'au résultat
print(f"5² = {future2.result()}") # Bloque jusqu'au résultat
print(f"7² = {future3.result()}") # Bloque jusqu'au résultat

print("Tous les calculs terminés")

Map() - Appliquer une fonction à une séquence

La méthode map() est similaire à la fonction map() intégrée de Python, mais exécute la fonction en parallèle sur tous les éléments.

import concurrent.futures
import time

def traiter_element(element):
"""Traite un élément avec simulation d'une opération coûteuse

Cela pourrait représenter :
- Téléchargement d'une URL
- Traitement d'un fichier
- Requête vers une base de données
"""
print(f"Traitement de {element}")
time.sleep(1) # Simule une opération qui prend du temps
return element.upper()

# Données à traiter
elements = ["pomme", "banane", "orange", "kiwi", "mangue"]

# ❌ Traitement séquentiel
print("=== Traitement séquentiel ===")
start = time.time()
resultats_sequentiel = [traiter_element(elem) for elem in elements]
duree_sequentiel = time.time() - start
print(f"Résultats: {resultats_sequentiel}")
print(f"Durée: {duree_sequentiel:.2f}s")

# ✅ Traitement parallèle avec map()
print("\n=== Traitement parallèle avec map() ===")
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
resultats_parallele = list(executor.map(traiter_element, elements))
duree_parallele = time.time() - start
print(f"Résultats: {resultats_parallele}")
print(f"Durée: {duree_parallele:.2f}s")
print(f"Accélération: {duree_sequentiel/duree_parallele:.1f}x")

As_completed() - Traiter les résultats dès leur disponibilité

Parfois, vous voulez traiter les résultats dès qu'ils sont disponibles, sans attendre que toutes les tâches se terminent. as_completed() est parfait pour cela.

import concurrent.futures
import time
import random

def tache_duree_variable(nom):
"""Tâche avec une durée d'exécution variable"""
duree = random.uniform(1, 4) # Entre 1 et 4 secondes
print(f"[{nom}] Démarrage (durée prévue: {duree:.1f}s)")
time.sleep(duree)
print(f"[{nom}] Terminé!")
return f"Résultat de {nom}"

print("=== Traitement avec as_completed() ===")
print("Les résultats s'affichent dès qu'ils sont prêts:")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Soumettre plusieurs tâches avec des durées variables
futures = {
executor.submit(tache_duree_variable, f"Tâche-{i+1}"): f"Tâche-{i+1}"
for i in range(5)
}

# Traiter les résultats dans l'ordre de completion (pas de soumission)
for future in concurrent.futures.as_completed(futures):
nom_tache = futures[future]
try:
resultat = future.result()
print(f"✅ {nom_tache} terminée: {resultat}")
except Exception as e:
print(f"❌ {nom_tache} a échoué: {e}")

print("Toutes les tâches traitées")

Gestion des erreurs et exceptions

Une grande force de ThreadPoolExecutor est sa gestion simplifiée des exceptions qui peuvent survenir dans les threads. Contrairement aux threads classiques où les exceptions sont "perdues", le ThreadPoolExecutor capture automatiquement toutes les exceptions et les rend disponibles via l'objet Future.

Mécanisme de propagation des exceptions

Le point clé à comprendre est que c'est l'appel à future.result() qui lève l'exception, pas le thread lui-même. Voici ce qui se passe :

  1. Dans le thread : L'exception est levée et capturée automatiquement
  2. Stockage : L'exception est stockée dans l'objet Future
  3. Propagation : Quand result() est appelé, l'exception est re-levée dans le thread principal
  4. Gestion : Vous pouvez alors la capturer avec un try/except classique
import concurrent.futures
import time
import random

def tache_potentiellement_faillible(numero):
"""Tâche qui peut échouer de manière aléatoire"""
print(f"[Tâche {numero}] Démarrage dans le thread")
time.sleep(0.5) # Simule du travail

if random.random() < 0.3: # 30% de chance d'échouer
print(f"[Tâche {numero}] ERREUR dans le thread!")
raise ValueError(f"Erreur simulée dans la tâche {numero}")

print(f"[Tâche {numero}] Succès dans le thread")
return f"Succès tâche {numero}"

print("=== Mécanisme de gestion des erreurs ===")

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Soumettre plusieurs tâches
futures = [executor.submit(tache_potentiellement_faillible, i) for i in range(6)]

print("Toutes les tâches soumises, récupération des résultats...")

# Traiter les résultats avec gestion d'erreur
for i, future in enumerate(futures):
try:
# ⚠️ C'EST ICI QUE L'EXCEPTION EST LEVÉE (pas dans le thread)
# Si une exception s'est produite dans le thread, result() la re-lève
resultat = future.result(timeout=5) # Timeout de 5 secondes
print(f"✅ Tâche {i}: {resultat}")

except ValueError as e:
# Exception spécifique levée par notre fonction
print(f"❌ Tâche {i} a échoué: {e}")

except concurrent.futures.TimeoutError:
# Exception levée si result() dépasse le timeout
print(f"⏱️ Tâche {i} timeout")

except Exception as e:
# Toute autre exception inattendue
print(f"💥 Tâche {i} erreur inattendue: {e}")

print("Traitement terminé malgré les erreurs")

Vérification de l'état sans lever d'exception

Vous pouvez également vérifier l'état d'un Future sans risquer de lever une exception :

import concurrent.futures
import time

def tache_avec_erreur():
time.sleep(1)
raise RuntimeError("Quelque chose a mal tourné")

def tache_normale():
time.sleep(1)
return "Tout va bien"

print("=== Vérification d'état des Future ===")

with concurrent.futures.ThreadPoolExecutor() as executor:
future_erreur = executor.submit(tache_avec_erreur)
future_normal = executor.submit(tache_normale)

# Attendre que les tâches se terminent
time.sleep(2)

# Vérifier l'état sans lever d'exception
print(f"Future normal terminé: {future_normal.done()}")
print(f"Future erreur terminé: {future_erreur.done()}")

# Pour le future normal
if future_normal.done():
try:
resultat = future_normal.result()
print(f"✅ Résultat normal: {resultat}")
except Exception as e:
print(f"❌ Erreur inattendue: {e}")

# Pour le future avec erreur
if future_erreur.done():
try:
resultat = future_erreur.result() # Cette ligne lèvera l'exception
print(f"✅ Résultat erreur: {resultat}")
except RuntimeError as e:
print(f"❌ Erreur attendue capturée: {e}")

Paramètres de configuration

max_workers - Contrôler le nombre de threads

Le paramètre max_workers détermine le nombre maximum de threads dans le pool.

Recommandations pour max_workers :

  • Tâches I/O bound : max_workers peut être plus élevé (par exemple, 2-4x le nombre de CPU)
  • Tâches CPU bound : max_workers devrait égaler le nombre de CPU (mais mieux vaut utiliser ProcessPoolExecutor)

Bonnes pratiques

1. Utiliser le context manager (with)

Toujours utiliser with pour s'assurer que le pool est correctement fermé :

# ✅ Recommandé
with concurrent.futures.ThreadPoolExecutor() as executor:
# Votre code ici
pass
# Le pool est automatiquement fermé ici

# ❌ À éviter
executor = concurrent.futures.ThreadPoolExecutor()
# Code...
executor.shutdown() # Facile d'oublier!

2. Choisir le bon type de pool

  • ThreadPoolExecutor : Pour les tâches I/O bound (réseau, fichiers, base de données)
  • ProcessPoolExecutor : Pour les tâches CPU bound (calculs intensifs)

3. Gérer les timeouts

with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(tache_longue)
try:
resultat = future.result(timeout=30) # Timeout de 30 secondes
except concurrent.futures.TimeoutError:
print("Tâche trop longue, annulation...")
# Note: La tâche continue en arrière-plan mais le résultat est ignoré

4. Limiter la taille des données

Ne pas passer d'énormes structures de données aux threads. Préférez passer des identifiants ou des chemins et laisser chaque thread charger ses propres données.

Le ThreadPoolExecutor est un outil puissant qui simplifie considérablement la programmation concurrente en Python. Il gère automatiquement la création, la réutilisation et la destruction des threads, tout en offrant une interface intuitive pour soumettre des tâches et récupérer leurs résultats.