Aller au contenu principal

Conditions de course et solution avec Condition

L'objet Condition est un mécanisme de synchronisation avancé qui permet aux threads d'attendre qu'une condition spécifique soit remplie avant de continuer leur exécution. Il s'agit d'un concept fondamental dans la programmation concurrente, particulièrement utile pour implémenter des patterns complexes comme le producteur-consommateur.

Qu'est-ce qu'une Condition ?

Une Condition est essentiellement un verrou (lock) associé à une ou plusieurs conditions logiques. Elle permet de :

  • Attendre qu'une condition soit vraie avec wait()
  • Notifier un ou plusieurs threads en attente avec notify() ou notify_all()
  • Protéger l'accès aux ressources partagées comme un verrou classique

Avantages par rapport aux verrous simples

  1. Attente intelligente : Au lieu de faire du polling (vérification continue), les threads peuvent se mettre en attente passive
  2. Économie de ressources : Pas de consommation CPU inutile pendant l'attente
  3. Synchronisation fine : Permet de coordonner précisément les actions entre threads
  4. Évite les blocages : Réduit les risques de deadlock grâce à la notification explicite

Comprendre l'objet Condition en détail

Création et utilisation de base

Un objet Condition s'utilise toujours avec le mot-clé with pour garantir une gestion correcte du verrou :

import threading

# Création d'un objet Condition
condition = threading.Condition()

# Utilisation correcte avec 'with'
with condition:
# Zone critique protégée par le verrou
# Ici, on peut utiliser wait(), notify(), notify_all()
pass

Les trois opérations principales

  • wait() - Attendre passivement. La méthode wait() met le thread courant en attente jusqu'à ce qu'un autre thread le réveille.
  • notify() - Réveiller un thread. La méthode notify() réveille un seul thread en attente.
  • notify_all() - Réveiller tous les threads. La méthode notify_all() réveille tous les threads en attente.

Voici un exemple simple avec deux threads qui attendent qu'un compteur atteigne une certaine valeur :

import threading
import time

compteur = 0
condition = threading.Condition()

def attendre_compteur_atteint(valeur_cible):
"""Thread qui attend que le compteur atteigne une valeur"""
with condition:
while compteur < valeur_cible:
print(f"⏳ Attente... compteur={compteur}, cible={valeur_cible}")
condition.wait() # ← Le thread se met en PAUSE ici
print(f"✅ Objectif atteint ! compteur={compteur}")

def incrementer_compteur():
"""Thread qui incrémente le compteur"""
global compteur
for i in range(10):
time.sleep(0.5) # Simule du travail

with condition:
compteur += 1
print(f"📈 Compteur incrémenté: {compteur}")
condition.notify_all() # ← Réveille tous les threads en attente

# Test
t1 = threading.Thread(target=attendre_compteur_atteint, args=(5,))
t2 = threading.Thread(target=attendre_compteur_atteint, args=(8,))
t3 = threading.Thread(target=incrementer_compteur)

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

Différence crucial : notify() vs notify_all()

MéthodeEffetQuand l'utiliser
notify()Réveille 1 seul threadQuand une seule ressource devient disponible
notify_all()Réveille tous les threadsQuand la condition peut concerner plusieurs threads

Le pattern while + wait() : Pourquoi c'est obligatoire

INCORRECT - Ne jamais faire ça :

with condition:
if not condition_ok:
condition.wait() # Dangereux !
# faire quelque chose

CORRECT - Toujours utiliser while :

with condition:
while not condition_ok:
condition.wait() # Sûr !
# faire quelque chose

Pourquoi while ? À cause des "spurious wakeups" :

  • Un thread peut se réveiller sans qu'aucun notify() n'ait été appelé, car le système d'exploitation peut réveiller le thread pour d'autres raisons
  • La boucle while vérifie à nouveau la condition après le réveil

Pattern Producteur-Consommateur

Le pattern producteur-consommateur est un des plus utilisés en programmation concurrente. Il permet de découpler la production et la consommation de données grâce à un buffer intermédiaire.

import threading
import time
import random
from collections import deque

class ProducteurConsommateur:
"""Pattern Producteur-Consommateur avec Condition"""

def __init__(self, taille_max_buffer=5):
self.buffer = deque()
self.taille_max = taille_max_buffer
self.condition = threading.Condition()
self.production_terminee = False

def produire(self, producteur_id, nb_items):
"""Produit des items"""
for i in range(nb_items):
item = f"Item-{producteur_id}-{i+1}"

with self.condition:
# Attendre que le buffer ne soit pas plein
while len(self.buffer) >= self.taille_max:
print(f"🔴 Producteur-{producteur_id}: Buffer plein, attente...")
self.condition.wait()

# Ajouter l'item
self.buffer.append(item)
print(f"📦 Producteur-{producteur_id}: Produit {item} (buffer: {len(self.buffer)})")

# Notifier les consommateurs
self.condition.notify_all()

# Simuler le temps de production
time.sleep(random.uniform(0.1, 0.5))

print(f"✅ Producteur-{producteur_id}: Production terminée")

def consommer(self, consommateur_id, nb_items):
"""Consomme des items"""
items_consommes = 0

while items_consommes < nb_items:
with self.condition:
# Attendre qu'il y ait des items ou que la production soit terminée
while len(self.buffer) == 0 and not self.production_terminee:
print(f"🔵 Consommateur-{consommateur_id}: Buffer vide, attente...")
self.condition.wait()

# Si le buffer est vide et la production terminée, arrêter
if len(self.buffer) == 0 and self.production_terminee:
break

# Consommer un item
if self.buffer:
item = self.buffer.popleft()
items_consommes += 1
print(f"🍽️ Consommateur-{consommateur_id}: Consomme {item} (buffer: {len(self.buffer)})")

# Notifier les producteurs
self.condition.notify_all()

# Simuler le temps de consommation
time.sleep(random.uniform(0.2, 0.8))

print(f"✅ Consommateur-{consommateur_id}: Consommation terminée ({items_consommes} items)")

def terminer_production(self):
"""Indique que la production est terminée"""
with self.condition:
self.production_terminee = True
self.condition.notify_all()

def test_producteur_consommateur():
"""Test du pattern producteur-consommateur"""
print("=== Test Producteur-Consommateur ===")

pc = ProducteurConsommateur(taille_max_buffer=3)
threads = []

# Créer les producteurs
for i in range(2):
t = threading.Thread(
target=pc.produire,
args=(i+1, 5),
name=f"Producteur-{i+1}"
)
threads.append(t)
t.start()

# Créer les consommateurs
for i in range(2):
t = threading.Thread(
target=pc.consommer,
args=(i+1, 6),
name=f"Consommateur-{i+1}"
)
threads.append(t)
t.start()

# Attendre que tous les producteurs terminent
for t in threads[:2]: # Les 2 premiers threads sont les producteurs
t.join()

# Signaler la fin de production
pc.terminer_production()

# Attendre les consommateurs
for t in threads[2:]:
t.join()

print("Test terminé")

test_producteur_consommateur()

Analyse détaillée du code

Structure de la classe ProducteurConsommateur

La classe encapsule tous les éléments nécessaires pour une synchronisation sûre :

def __init__(self, taille_max_buffer=5):
self.buffer = deque() # Buffer thread-safe
self.taille_max = taille_max_buffer
self.condition = threading.Condition() # Objet de synchronisation
self.production_terminee = False # Flag d'arrêt
  • deque : Structure de données thread-safe pour le buffer
  • Condition : Objet central pour la synchronisation
  • production_terminee : Flag booléen pour signaler l'arrêt

Mécanisme d'attente et de notification

Dans le producteur :

with self.condition:
while len(self.buffer) >= self.taille_max:
self.condition.wait() # Attente passive

# Production
self.buffer.append(item)
self.condition.notify_all() # Réveil des consommateurs

Dans le consommateur :

with self.condition:
while len(self.buffer) == 0 and not self.production_terminee:
self.condition.wait() # Attente passive

# Consommation
item = self.buffer.popleft()
self.condition.notify_all() # Réveil des producteurs

Bonnes pratiques observées

  1. Utilisation de with : Garantit l'acquisition et la libération du verrou
  2. Boucle while avec wait() : Évite les réveils intempestifs (spurious wakeups)
  3. notify_all() : Assure que tous les threads concernés sont notifiés
  4. Condition d'arrêt claire : Le flag production_terminee permet un arrêt propre

Comparaison avec d'autres mécanismes

MécanismeAvantagesInconvénientsCas d'usage
LockSimple, rapidePas d'attente intelligenteProtection basique
SemaphoreCompteur de ressourcesPas de condition complexeLimitation de ressources
ConditionAttente intelligente, notificationsPlus complexeSynchronisation avancée

Conclusion

L'objet Condition est un outil puissant pour la synchronisation avancée entre threads. Il permet d'implémenter des patterns complexes tout en évitant les problèmes de performance liés au polling actif. Bien que plus complexe qu'un simple verrou, il offre une flexibilité incomparable pour coordonner l'exécution de threads selon des conditions métier spécifiques.