Conditions de course et solution avec Condition
L'objet Condition
est un mécanisme de synchronisation avancé qui permet aux
threads d'attendre qu'une condition spécifique soit remplie avant de continuer
leur exécution. Il s'agit d'un concept fondamental dans la programmation
concurrente, particulièrement utile pour implémenter des patterns complexes
comme le producteur-consommateur.
Qu'est-ce qu'une Condition ?
Une Condition
est essentiellement un verrou (lock) associé à une ou plusieurs
conditions logiques. Elle permet de :
- Attendre qu'une condition soit vraie avec
wait()
- Notifier un ou plusieurs threads en attente avec
notify()
ounotify_all()
- Protéger l'accès aux ressources partagées comme un verrou classique
Avantages par rapport aux verrous simples
- Attente intelligente : Au lieu de faire du polling (vérification continue), les threads peuvent se mettre en attente passive
- Économie de ressources : Pas de consommation CPU inutile pendant l'attente
- Synchronisation fine : Permet de coordonner précisément les actions entre threads
- Évite les blocages : Réduit les risques de deadlock grâce à la notification explicite
Comprendre l'objet Condition en détail
Création et utilisation de base
Un objet Condition
s'utilise toujours avec le mot-clé with
pour garantir une gestion correcte du verrou :
import threading
# Création d'un objet Condition
condition = threading.Condition()
# Utilisation correcte avec 'with'
with condition:
# Zone critique protégée par le verrou
# Ici, on peut utiliser wait(), notify(), notify_all()
pass
Les trois opérations principales
wait()
- Attendre passivement. La méthodewait()
met le thread courant en attente jusqu'à ce qu'un autre thread le réveille.notify()
- Réveiller un thread. La méthodenotify()
réveille un seul thread en attente.notify_all()
- Réveiller tous les threads. La méthodenotify_all()
réveille tous les threads en attente.
Voici un exemple simple avec deux threads qui attendent qu'un compteur atteigne une certaine valeur :
import threading
import time
compteur = 0
condition = threading.Condition()
def attendre_compteur_atteint(valeur_cible):
"""Thread qui attend que le compteur atteigne une valeur"""
with condition:
while compteur < valeur_cible:
print(f"⏳ Attente... compteur={compteur}, cible={valeur_cible}")
condition.wait() # ← Le thread se met en PAUSE ici
print(f"✅ Objectif atteint ! compteur={compteur}")
def incrementer_compteur():
"""Thread qui incrémente le compteur"""
global compteur
for i in range(10):
time.sleep(0.5) # Simule du travail
with condition:
compteur += 1
print(f"📈 Compteur incrémenté: {compteur}")
condition.notify_all() # ← Réveille tous les threads en attente
# Test
t1 = threading.Thread(target=attendre_compteur_atteint, args=(5,))
t2 = threading.Thread(target=attendre_compteur_atteint, args=(8,))
t3 = threading.Thread(target=incrementer_compteur)
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
Différence crucial : notify()
vs notify_all()
Méthode | Effet | Quand l'utiliser |
---|---|---|
notify() | Réveille 1 seul thread | Quand une seule ressource devient disponible |
notify_all() | Réveille tous les threads | Quand la condition peut concerner plusieurs threads |
Le pattern while
+ wait()
: Pourquoi c'est obligatoire
❌ INCORRECT - Ne jamais faire ça :
with condition:
if not condition_ok:
condition.wait() # Dangereux !
# faire quelque chose
✅ CORRECT - Toujours utiliser while
:
with condition:
while not condition_ok:
condition.wait() # Sûr !
# faire quelque chose
Pourquoi while
? À cause des "spurious wakeups" :
- Un thread peut se réveiller sans qu'aucun
notify()
n'ait été appelé, car le système d'exploitation peut réveiller le thread pour d'autres raisons - La boucle
while
vérifie à nouveau la condition après le réveil
Pattern Producteur-Consommateur
Le pattern producteur-consommateur est un des plus utilisés en programmation concurrente. Il permet de découpler la production et la consommation de données grâce à un buffer intermédiaire.
import threading
import time
import random
from collections import deque
class ProducteurConsommateur:
"""Pattern Producteur-Consommateur avec Condition"""
def __init__(self, taille_max_buffer=5):
self.buffer = deque()
self.taille_max = taille_max_buffer
self.condition = threading.Condition()
self.production_terminee = False
def produire(self, producteur_id, nb_items):
"""Produit des items"""
for i in range(nb_items):
item = f"Item-{producteur_id}-{i+1}"
with self.condition:
# Attendre que le buffer ne soit pas plein
while len(self.buffer) >= self.taille_max:
print(f"🔴 Producteur-{producteur_id}: Buffer plein, attente...")
self.condition.wait()
# Ajouter l'item
self.buffer.append(item)
print(f"📦 Producteur-{producteur_id}: Produit {item} (buffer: {len(self.buffer)})")
# Notifier les consommateurs
self.condition.notify_all()
# Simuler le temps de production
time.sleep(random.uniform(0.1, 0.5))
print(f"✅ Producteur-{producteur_id}: Production terminée")
def consommer(self, consommateur_id, nb_items):
"""Consomme des items"""
items_consommes = 0
while items_consommes < nb_items:
with self.condition:
# Attendre qu'il y ait des items ou que la production soit terminée
while len(self.buffer) == 0 and not self.production_terminee:
print(f"🔵 Consommateur-{consommateur_id}: Buffer vide, attente...")
self.condition.wait()
# Si le buffer est vide et la production terminée, arrêter
if len(self.buffer) == 0 and self.production_terminee:
break
# Consommer un item
if self.buffer:
item = self.buffer.popleft()
items_consommes += 1
print(f"🍽️ Consommateur-{consommateur_id}: Consomme {item} (buffer: {len(self.buffer)})")
# Notifier les producteurs
self.condition.notify_all()
# Simuler le temps de consommation
time.sleep(random.uniform(0.2, 0.8))
print(f"✅ Consommateur-{consommateur_id}: Consommation terminée ({items_consommes} items)")
def terminer_production(self):
"""Indique que la production est terminée"""
with self.condition:
self.production_terminee = True
self.condition.notify_all()
def test_producteur_consommateur():
"""Test du pattern producteur-consommateur"""
print("=== Test Producteur-Consommateur ===")
pc = ProducteurConsommateur(taille_max_buffer=3)
threads = []
# Créer les producteurs
for i in range(2):
t = threading.Thread(
target=pc.produire,
args=(i+1, 5),
name=f"Producteur-{i+1}"
)
threads.append(t)
t.start()
# Créer les consommateurs
for i in range(2):
t = threading.Thread(
target=pc.consommer,
args=(i+1, 6),
name=f"Consommateur-{i+1}"
)
threads.append(t)
t.start()
# Attendre que tous les producteurs terminent
for t in threads[:2]: # Les 2 premiers threads sont les producteurs
t.join()
# Signaler la fin de production
pc.terminer_production()
# Attendre les consommateurs
for t in threads[2:]:
t.join()
print("Test terminé")
test_producteur_consommateur()
Analyse détaillée du code
Structure de la classe ProducteurConsommateur
La classe encapsule tous les éléments nécessaires pour une synchronisation sûre :
def __init__(self, taille_max_buffer=5):
self.buffer = deque() # Buffer thread-safe
self.taille_max = taille_max_buffer
self.condition = threading.Condition() # Objet de synchronisation
self.production_terminee = False # Flag d'arrêt
deque
: Structure de données thread-safe pour le bufferCondition
: Objet central pour la synchronisationproduction_terminee
: Flag booléen pour signaler l'arrêt
Mécanisme d'attente et de notification
Dans le producteur :
with self.condition:
while len(self.buffer) >= self.taille_max:
self.condition.wait() # Attente passive
# Production
self.buffer.append(item)
self.condition.notify_all() # Réveil des consommateurs
Dans le consommateur :
with self.condition:
while len(self.buffer) == 0 and not self.production_terminee:
self.condition.wait() # Attente passive
# Consommation
item = self.buffer.popleft()
self.condition.notify_all() # Réveil des producteurs
Bonnes pratiques observées
- Utilisation de
with
: Garantit l'acquisition et la libération du verrou - Boucle
while
avecwait()
: Évite les réveils intempestifs (spurious wakeups) notify_all()
: Assure que tous les threads concernés sont notifiés- Condition d'arrêt claire : Le flag
production_terminee
permet un arrêt propre
Comparaison avec d'autres mécanismes
Mécanisme | Avantages | Inconvénients | Cas d'usage |
---|---|---|---|
Lock | Simple, rapide | Pas d'attente intelligente | Protection basique |
Semaphore | Compteur de ressources | Pas de condition complexe | Limitation de ressources |
Condition | Attente intelligente, notifications | Plus complexe | Synchronisation avancée |
Conclusion
L'objet Condition
est un outil puissant pour la synchronisation avancée entre
threads. Il permet d'implémenter des patterns complexes tout en évitant les
problèmes de performance liés au polling actif. Bien que plus complexe qu'un
simple verrou, il offre une flexibilité incomparable pour coordonner
l'exécution de threads selon des conditions métier spécifiques.