ThreadPoolExecutor pour gérer les threads
Introduction
ThreadPoolExecutor
est une abstraction de haut niveau qui simplifie grandement
la gestion des threads. Au lieu de créer et gérer manuellement chaque thread,
vous définissez un "pool" (bassin) de threads réutilisables qui exécutent vos
tâches. Cette approche est plus efficace, plus robuste et plus facile à utiliser
que la gestion manuelle des threads.
Avantages du ThreadPoolExecutor
- Réutilisation : Les threads sont créés une fois et réutilisés pour plusieurs tâches
- Gestion automatique : Plus besoin de gérer manuellement
start()
etjoin()
- Interface future : Récupération facile des résultats avec les objets Future
- Contrôle de la charge : Limitation automatique du nombre de threads simultanés
- Exception handling : Gestion simplifiée des erreurs dans les threads
Comparaison avec la gestion manuelle
Avant d'explorer ThreadPoolExecutor
, voyons la différence avec la gestion
manuelle de threads :
import threading
import time
import concurrent.futures
def tache_simple(nom, duree):
"""Tâche qui simule du travail et retourne un résultat"""
print(f"Début de {nom}")
time.sleep(duree)
resultat = f"Résultat de {nom}"
print(f"Fin de {nom}")
return resultat
# ❌ Méthode manuelle (complexe et verbeuse)
print("=== Gestion manuelle des threads ===")
threads = []
resultats = []
def wrapper(nom, duree, resultats_list):
"""Wrapper nécessaire pour capturer les résultats"""
resultat = tache_simple(nom, duree)
resultats_list.append(resultat)
# Créer et démarrer manuellement
for i in range(3):
t = threading.Thread(target=wrapper, args=(f"Tâche-{i+1}", 1, resultats))
threads.append(t)
t.start()
# Attendre manuellement
for t in threads:
t.join()
print(f"Résultats manuels: {resultats}")
# ✅ Méthode ThreadPoolExecutor (simple et élégante)
print("\n=== Avec ThreadPoolExecutor ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Soumettre toutes les tâches
futures = [executor.submit(tache_simple, f"Tâche-{i+1}", 1) for i in range(3)]
# Récupérer les résultats
resultats_pool = [future.result() for future in futures]
print(f"Résultats pool: {resultats_pool}")
Méthodes principales
Le ThreadPoolExecutor
offre plusieurs méthodes pour soumettre et gérer les tâches.
Submit() - Soumettre une tâche
La méthode submit()
soumet une tâche au pool et retourne immédiatement un
objet Future
qui représente l'exécution en cours.
import concurrent.futures
import time
import random
def calculer_carre(nombre):
"""Calcule le carré d'un nombre avec un délai simulé"""
duree = random.uniform(0.5, 2.0) # Délai aléatoire pour simuler des tâches variables
print(f"Calcul de {nombre}² commencé (durée: {duree:.1f}s)")
time.sleep(duree)
resultat = nombre ** 2
print(f"Calcul de {nombre}² terminé = {resultat}")
return resultat
# Utilisation de submit()
print("=== Utilisation de submit() ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
# Soumettre plusieurs tâches et récupérer les Future
future1 = executor.submit(calculer_carre, 3)
future2 = executor.submit(calculer_carre, 5)
future3 = executor.submit(calculer_carre, 7)
print("Toutes les tâches soumises, en attente des résultats...")
# Récupérer les résultats (bloque jusqu'à ce que la tâche soit terminée)
print(f"3² = {future1.result()}") # Bloque jusqu'au résultat
print(f"5² = {future2.result()}") # Bloque jusqu'au résultat
print(f"7² = {future3.result()}") # Bloque jusqu'au résultat
print("Tous les calculs terminés")
Map() - Appliquer une fonction à une séquence
La méthode map()
est similaire à la fonction map()
intégrée de Python, mais
exécute la fonction en parallèle sur tous les éléments.
import concurrent.futures
import time
def traiter_element(element):
"""Traite un élément avec simulation d'une opération coûteuse
Cela pourrait représenter :
- Téléchargement d'une URL
- Traitement d'un fichier
- Requête vers une base de données
"""
print(f"Traitement de {element}")
time.sleep(1) # Simule une opération qui prend du temps
return element.upper()
# Données à traiter
elements = ["pomme", "banane", "orange", "kiwi", "mangue"]
# ❌ Traitement séquentiel
print("=== Traitement séquentiel ===")
start = time.time()
resultats_sequentiel = [traiter_element(elem) for elem in elements]
duree_sequentiel = time.time() - start
print(f"Résultats: {resultats_sequentiel}")
print(f"Durée: {duree_sequentiel:.2f}s")
# ✅ Traitement parallèle avec map()
print("\n=== Traitement parallèle avec map() ===")
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
resultats_parallele = list(executor.map(traiter_element, elements))
duree_parallele = time.time() - start
print(f"Résultats: {resultats_parallele}")
print(f"Durée: {duree_parallele:.2f}s")
print(f"Accélération: {duree_sequentiel/duree_parallele:.1f}x")
As_completed() - Traiter les résultats dès leur disponibilité
Parfois, vous voulez traiter les résultats dès qu'ils sont disponibles, sans
attendre que toutes les tâches se terminent. as_completed()
est parfait pour
cela.
import concurrent.futures
import time
import random
def tache_duree_variable(nom):
"""Tâche avec une durée d'exécution variable"""
duree = random.uniform(1, 4) # Entre 1 et 4 secondes
print(f"[{nom}] Démarrage (durée prévue: {duree:.1f}s)")
time.sleep(duree)
print(f"[{nom}] Terminé!")
return f"Résultat de {nom}"
print("=== Traitement avec as_completed() ===")
print("Les résultats s'affichent dès qu'ils sont prêts:")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Soumettre plusieurs tâches avec des durées variables
futures = {
executor.submit(tache_duree_variable, f"Tâche-{i+1}"): f"Tâche-{i+1}"
for i in range(5)
}
# Traiter les résultats dans l'ordre de completion (pas de soumission)
for future in concurrent.futures.as_completed(futures):
nom_tache = futures[future]
try:
resultat = future.result()
print(f"✅ {nom_tache} terminée: {resultat}")
except Exception as e:
print(f"❌ {nom_tache} a échoué: {e}")
print("Toutes les tâches traitées")
Gestion des erreurs et exceptions
Une grande force de ThreadPoolExecutor
est sa gestion simplifiée des
exceptions qui peuvent survenir dans les threads. Contrairement aux threads
classiques où les exceptions sont "perdues", le ThreadPoolExecutor
capture
automatiquement toutes les exceptions et les rend disponibles via l'objet
Future
.
Mécanisme de propagation des exceptions
Le point clé à comprendre est que c'est l'appel à future.result()
qui lève
l'exception, pas le thread lui-même. Voici ce qui se passe :
- Dans le thread : L'exception est levée et capturée automatiquement
- Stockage : L'exception est stockée dans l'objet
Future
- Propagation : Quand
result()
est appelé, l'exception est re-levée dans le thread principal - Gestion : Vous pouvez alors la capturer avec un
try/except
classique
import concurrent.futures
import time
import random
def tache_potentiellement_faillible(numero):
"""Tâche qui peut échouer de manière aléatoire"""
print(f"[Tâche {numero}] Démarrage dans le thread")
time.sleep(0.5) # Simule du travail
if random.random() < 0.3: # 30% de chance d'échouer
print(f"[Tâche {numero}] ERREUR dans le thread!")
raise ValueError(f"Erreur simulée dans la tâche {numero}")
print(f"[Tâche {numero}] Succès dans le thread")
return f"Succès tâche {numero}"
print("=== Mécanisme de gestion des erreurs ===")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Soumettre plusieurs tâches
futures = [executor.submit(tache_potentiellement_faillible, i) for i in range(6)]
print("Toutes les tâches soumises, récupération des résultats...")
# Traiter les résultats avec gestion d'erreur
for i, future in enumerate(futures):
try:
# ⚠️ C'EST ICI QUE L'EXCEPTION EST LEVÉE (pas dans le thread)
# Si une exception s'est produite dans le thread, result() la re-lève
resultat = future.result(timeout=5) # Timeout de 5 secondes
print(f"✅ Tâche {i}: {resultat}")
except ValueError as e:
# Exception spécifique levée par notre fonction
print(f"❌ Tâche {i} a échoué: {e}")
except concurrent.futures.TimeoutError:
# Exception levée si result() dépasse le timeout
print(f"⏱️ Tâche {i} timeout")
except Exception as e:
# Toute autre exception inattendue
print(f"💥 Tâche {i} erreur inattendue: {e}")
print("Traitement terminé malgré les erreurs")
Vérification de l'état sans lever d'exception
Vous pouvez également vérifier l'état d'un Future
sans risquer de lever une
exception :
import concurrent.futures
import time
def tache_avec_erreur():
time.sleep(1)
raise RuntimeError("Quelque chose a mal tourné")
def tache_normale():
time.sleep(1)
return "Tout va bien"
print("=== Vérification d'état des Future ===")
with concurrent.futures.ThreadPoolExecutor() as executor:
future_erreur = executor.submit(tache_avec_erreur)
future_normal = executor.submit(tache_normale)
# Attendre que les tâches se terminent
time.sleep(2)
# Vérifier l'état sans lever d'exception
print(f"Future normal terminé: {future_normal.done()}")
print(f"Future erreur terminé: {future_erreur.done()}")
# Pour le future normal
if future_normal.done():
try:
resultat = future_normal.result()
print(f"✅ Résultat normal: {resultat}")
except Exception as e:
print(f"❌ Erreur inattendue: {e}")
# Pour le future avec erreur
if future_erreur.done():
try:
resultat = future_erreur.result() # Cette ligne lèvera l'exception
print(f"✅ Résultat erreur: {resultat}")
except RuntimeError as e:
print(f"❌ Erreur attendue capturée: {e}")
Paramètres de configuration
max_workers - Contrôler le nombre de threads
Le paramètre max_workers
détermine le nombre maximum de threads dans le pool.
Recommandations pour max_workers
:
- Tâches I/O bound :
max_workers
peut être plus élevé (par exemple, 2-4x le nombre de CPU) - Tâches CPU bound :
max_workers
devrait égaler le nombre de CPU (mais mieux vaut utiliserProcessPoolExecutor
)
Bonnes pratiques
1. Utiliser le context manager (with)
Toujours utiliser with
pour s'assurer que le pool est correctement fermé :
# ✅ Recommandé
with concurrent.futures.ThreadPoolExecutor() as executor:
# Votre code ici
pass
# Le pool est automatiquement fermé ici
# ❌ À éviter
executor = concurrent.futures.ThreadPoolExecutor()
# Code...
executor.shutdown() # Facile d'oublier!
2. Choisir le bon type de pool
- ThreadPoolExecutor : Pour les tâches I/O bound (réseau, fichiers, base de données)
- ProcessPoolExecutor : Pour les tâches CPU bound (calculs intensifs)
3. Gérer les timeouts
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(tache_longue)
try:
resultat = future.result(timeout=30) # Timeout de 30 secondes
except concurrent.futures.TimeoutError:
print("Tâche trop longue, annulation...")
# Note: La tâche continue en arrière-plan mais le résultat est ignoré
4. Limiter la taille des données
Ne pas passer d'énormes structures de données aux threads. Préférez passer des identifiants ou des chemins et laisser chaque thread charger ses propres données.
Le ThreadPoolExecutor
est un outil puissant qui simplifie considérablement la
programmation concurrente en Python. Il gère automatiquement la création, la
réutilisation et la destruction des threads, tout en offrant une interface
intuitive pour soumettre des tâches et récupérer leurs résultats.