L'Event Loop
L'event loop est le coeur d'asyncio. Elle :
- Planifie les coroutines et tasks prêtes
- Gère les I/O non bloquantes via le système (epoll, kqueue, IOCP...)
- Réveille les tasks quand leurs opérations sont terminées
- Exécute des callbacks (timers, futures, signaux)
Cycle simplifié
- Sélectionne les tasks prêtes
- Exécute chacune jusqu'au prochain
await - Attend les nouveaux événements I/O / timers
- Retour à 1
Aucune task ne monopolise la loop si elle await régulièrement.
Observation basique
Cet exemple illustre la planification immédiate des coroutines via asyncio.create_task. Chaque tâche démarre et s'exécute jusqu'au premier point d'attente (await), ici asyncio.sleep(1), qui rend la main à la loop. Les deux sleep se chevauchent: on observe des "start" quasi simultanés puis des "end" environ 1 seconde plus tard. Les await t1 puis await t2 attendent la fin des tâches, sans empêcher leur exécution concurrente pendant leur sommeil asynchrone.
import asyncio, time
async def travail(n):
print(f"[{time.strftime('%X')}] start {n}")
await asyncio.sleep(1) # Rend la main
print(f"[{time.strftime('%X')}] end {n}")
async def main():
# create_task déclenche immédiatement la planification
t1 = asyncio.create_task(travail(1))
t2 = asyncio.create_task(travail(2))
await t1
await t2
asyncio.run(main())
create_task vs gather
create_task enregistre une coroutine auprès de la loop et retourne un objet
Task. gather est une aide qui attend un groupe de awaitables et retourne
leurs résultats.
async def valeur(x):
await asyncio.sleep(0.2)
return x
async def main():
# Deux styles équivalents fonctionnellement
t = asyncio.create_task(valeur(10))
r1 = await t
r2, r3 = await asyncio.gather(valeur(20), valeur(30))
print(r1, r2, r3)
asyncio.run(main())
gather et exceptions
Par défaut, si une coroutine échoue dans gather, les autres sont annulées.
async def ok():
await asyncio.sleep(0.1)
return "OK"
async def boom():
await asyncio.sleep(0.05)
raise ValueError("Erreur")
async def main():
try:
await asyncio.gather(ok(), boom(), ok())
except Exception as e:
print("Capture:", e)
asyncio.run(main())
Pour récupérer toutes les erreurs sans annuler : await asyncio.gather(..., return_exceptions=True).
Timers et callbacks
Il est possible de planifier des callbacks (fonctions classiques) à exécuter
plus tard avec call_later ou call_soon.
import asyncio
async def main():
loop = asyncio.get_running_loop()
def callback():
print("Callback appelé")
loop.call_later(0.5, callback) # Exécuté après ~0.5s
print("Attente...")
await asyncio.sleep(1)
asyncio.run(main())
Avec call_soon, le callback est planifié dès que possible (après la task courante).
File d'attente asynchrone
Les structures de synchronisation ont des versions async compatibles avec la loop :
import asyncio
async def producteur(q):
for i in range(3):
await asyncio.sleep(0.1)
await q.put(i)
print("Prod ->", i)
await q.put(None) # Sentinelle fin
async def consommateur(q):
while True:
item = await q.get()
if item is None:
break
print("Cons <-", item)
await asyncio.sleep(0.2)
async def main():
q = asyncio.Queue()
await asyncio.gather(producteur(q), consommateur(q))
asyncio.run(main())
Pourquoi utiliser asyncio.Queue en single-thread ? Même sans threads,
plusieurs tasks peuvent s'intercaler à chaque await. asyncio.Queue évite le
polling (pas de boucles d'attente), prévient les "réveils perdus", permet de
bloquer proprement sur await q.get(), offre une limite de taille (maxsize)
pour la backpressure et gère bien l'annulation. C'est la manière simple et
fiable de modéliser un schéma producteur/consommateur en asyncio.
Annulation de task
On peut demander l'annulation d'une tâche avec task.cancel(). L'annulation
est coopérative: elle se matérialise au prochain point d'attente (await), où
asyncio.CancelledError est injectée dans la coroutine. On peut intercepter
cette exception pour nettoyer, puis éventuellement la relancer. Il est
recommandé d'await la tâche annulée afin de consommer l'exception et
s'assurer que l'annulation a bien été traitée (sinon un avertissement peut être
émis en fin de programme).
import asyncio
async def long():
try:
for i in range(5):
await asyncio.sleep(0.3)
print("tick", i)
except asyncio.CancelledError:
print("Annulé proprement")
raise # Re-propager si nécessaire
async def main():
t = asyncio.create_task(long())
await asyncio.sleep(0.8)
t.cancel()
try:
await t
except asyncio.CancelledError:
print("Connu : task annulée")
asyncio.run(main())
Bonnes pratiques :
- Vérifier
task.cancelled()si besoin - Toujours
awaitla task annulée pour consommer l'exception
Pièges fréquents
| Problème | Symptôme | Solution |
|---|---|---|
time.sleep dans coroutine | Blocage total | Remplacer par await asyncio.sleep |
Oublier d'await une task | Avertissement "Task exception was never retrieved" | Conserver référence et await |
asyncio.run imbriqué | RuntimeError | Appeler une seule fois au top niveau |
| Appels CPU lourds inline | Latence élevée | to_thread ou ProcessPool |