RLock (Reentrant Lock)
Introduction
Le RLock (Reentrant Lock) est une version spéciale du Lock qui peut être
acquis plusieurs fois par le même thread. Cette caractéristique résout un
problème commun avec les Lock standards : l'interblocage (deadlock) quand un
thread essaie d'acquérir un verrou qu'il possède déjà.
Le problème avec Lock normal
Avec un Lock standard, si un thread essaie d'acquérir un verrou qu'il possède
déjà, il se bloque indéfiniment. Cela peut arriver facilement quand une méthode
verrouillée appelle une autre méthode verrouillée.
Exemple problématique avec Lock
import threading
import time
class CompteurAvecLock:
    """Exemple montrant le problème avec un Lock normal"""
    
    def __init__(self):
        self._valeur = 0
        self._lock = threading.Lock()  # Lock normal
    
    def incrementer(self):
        """Incrémente la valeur"""
        with self._lock:
            self._valeur += 1
            # ❌ PROBLÈME: Appel d'une méthode qui veut aussi le lock
            self._log_operation("increment")  # DEADLOCK!
            return self._valeur
    
    def _log_operation(self, operation):
        """Méthode qui veut aussi acquérir le lock"""
        with self._lock:  # ❌ DEADLOCK: le thread a déjà le lock!
            print(f"Thread {threading.current_thread().name}: {operation} -> {self._valeur}")
    
    def get_valeur(self):
        with self._lock:
            return self._valeur
# ❌ Ce code va se bloquer indéfiniment
def test_lock_probleme():
    """Démontre le problème avec Lock normal"""
    compteur = CompteurAvecLock()
    
    def worker():
        try:
            print("Avant incrementer...")
            compteur.incrementer()  # Se bloque ici!
            print("Après incrementer...")
        except Exception as e:
            print(f"Erreur: {e}")
    
    thread = threading.Thread(target=worker)
    thread.start()
    thread.join(timeout=2)  # Timeout de 2 secondes
    
    if thread.is_alive():
        print("❌ DEADLOCK détecté! Le thread est bloqué.")
    else:
        print("✅ Opération terminée normalement.")
# Décommentez pour tester (attention au blocage!)
# test_lock_probleme()
La solution : RLock
Le RLock résout ce problème en permettant au même thread d'acquérir le verrou
plusieurs fois. Il maintient un compteur interne du nombre d'acquisitions et
doit être libéré le même nombre de fois.
Caractéristiques du RLock
- Réentrant : Un thread peut acquérir le même RLock plusieurs fois
- Compteur interne : Garde trace du nombre d'acquisitions
- Libération équilibrée : Doit être libéré autant de fois qu'il a été acquis
- Thread-safe : Seul le thread propriétaire peut le libérer
import threading
import time
class CompteurAvecRLock:
    """Compteur utilisant un RLock pour permettre la réentrance"""
    
    def __init__(self):
        self._valeur = 0
        self._rlock = threading.RLock()  # Reentrant Lock
    
    def incrementer(self):
        """Incrémente avec possibilité de réentrance"""
        with self._rlock:
            self._valeur += 1
            # ✅ OK avec RLock : on peut appeler d'autres méthodes
            self._log_operation("increment")
            return self._valeur
    
    def _log_operation(self, operation):
        """Méthode interne qui utilise aussi le RLock"""
        with self._rlock:  # ✅ OK avec RLock, ❌ serait bloquant avec Lock normal
            print(f"Thread {threading.current_thread().name}: {operation} -> {self._valeur}")
    
    def incrementer_multiple(self, n):
        """Incrémente n fois (démontre la réentrance)"""
        with self._rlock:
            for i in range(n):
                self.incrementer()  # Réentrance: même thread, même RLock
    
    def decrementer(self):
        """Décrémente la valeur"""
        with self._rlock:
            self._valeur -= 1
            self._log_operation("decrement")
            return self._valeur
    
    def operation_complexe(self):
        """Opération qui combine plusieurs méthodes verrouillées"""
        with self._rlock:
            print(f"Début opération complexe (valeur: {self._valeur})")
            self.incrementer()  # 1ère réentrance
            self.incrementer()  # 2ème réentrance
            self.decrementer()  # 3ème réentrance
            print(f"Fin opération complexe (valeur: {self._valeur})")
    
    def get_valeur(self):
        with self._rlock:
            return self._valeur
def test_rlock():
    """Test du RLock avec réentrance"""
    compteur = CompteurAvecRLock()
    
    def worker(nom, increments):
        for i in range(increments):
            if i % 3 == 0:
                compteur.incrementer()
            elif i % 3 == 1:
                compteur.incrementer_multiple(2)  # Réentrance
            else:
                compteur.operation_complexe()  # Réentrance multiple
    
    threads = []
    for i in range(2):
        t = threading.Thread(target=worker, args=(f"Worker-{i+1}", 3))
        threads.append(t)
        t.start()
    
    for t in threads:
        t.join()
    
    print(f"Valeur finale avec RLock: {compteur.get_valeur()}")
test_rlock()
Comparaison Lock vs RLock
| Caractéristique | Lock | RLock | 
|---|---|---|
| Réentrance | ❌ Non | ✅ Oui | 
| Performance | ✅ Plus rapide | ⚠️ Légèrement plus lent | 
| Mémoire | ✅ Moins d'overhead | ⚠️ Plus d'overhead | 
| Risque de deadlock | ⚠️ Élevé si mal utilisé | ✅ Réduit pour un même thread | 
Quand utiliser RLock ?
✅ Utilisez RLock quand :
- Des méthodes synchronisées appellent d'autres méthodes synchronisées
- Vous avez une hiérarchie complexe d'appels de méthodes
- Vous voulez éviter les deadlocks liés à la réentrance