Aller au contenu principal

L'Event Loop

L'event loop est le coeur d'asyncio. Elle :

  • Planifie les coroutines et tasks prêtes
  • Gère les I/O non bloquantes via le système (epoll, kqueue, IOCP...)
  • Réveille les tasks quand leurs opérations sont terminées
  • Exécute des callbacks (timers, futures, signaux)

Cycle simplifié

  1. Sélectionne les tasks prêtes
  2. Exécute chacune jusqu'au prochain await
  3. Attend les nouveaux événements I/O / timers
  4. Retour à 1

Aucune task ne monopolise la loop si elle await régulièrement.

Observation basique

Cet exemple illustre la planification immédiate des coroutines via asyncio.create_task. Chaque tâche démarre et s'exécute jusqu'au premier point d'attente (await), ici asyncio.sleep(1), qui rend la main à la loop. Les deux sleep se chevauchent: on observe des "start" quasi simultanés puis des "end" environ 1 seconde plus tard. Les await t1 puis await t2 attendent la fin des tâches, sans empêcher leur exécution concurrente pendant leur sommeil asynchrone.

import asyncio, time

async def travail(n):
print(f"[{time.strftime('%X')}] start {n}")
await asyncio.sleep(1) # Rend la main
print(f"[{time.strftime('%X')}] end {n}")

async def main():
# create_task déclenche immédiatement la planification
t1 = asyncio.create_task(travail(1))
t2 = asyncio.create_task(travail(2))
await t1
await t2

asyncio.run(main())

create_task vs gather

create_task enregistre une coroutine auprès de la loop et retourne un objet Task. gather est une aide qui attend un groupe de awaitables et retourne leurs résultats.

async def valeur(x):
await asyncio.sleep(0.2)
return x

async def main():
# Deux styles équivalents fonctionnellement
t = asyncio.create_task(valeur(10))
r1 = await t

r2, r3 = await asyncio.gather(valeur(20), valeur(30))
print(r1, r2, r3)

asyncio.run(main())

gather et exceptions

Par défaut, si une coroutine échoue dans gather, les autres sont annulées.

async def ok():
await asyncio.sleep(0.1)
return "OK"

async def boom():
await asyncio.sleep(0.05)
raise ValueError("Erreur")

async def main():
try:
await asyncio.gather(ok(), boom(), ok())
except Exception as e:
print("Capture:", e)

asyncio.run(main())

Pour récupérer toutes les erreurs sans annuler : await asyncio.gather(..., return_exceptions=True).

Timers et callbacks

Il est possible de planifier des callbacks (fonctions classiques) à exécuter plus tard avec call_later ou call_soon.

import asyncio

async def main():
loop = asyncio.get_running_loop()

def callback():
print("Callback appelé")

loop.call_later(0.5, callback) # Exécuté après ~0.5s
print("Attente...")
await asyncio.sleep(1)

asyncio.run(main())

Avec call_soon, le callback est planifié dès que possible (après la task courante).

File d'attente asynchrone

Les structures de synchronisation ont des versions async compatibles avec la loop :

import asyncio

async def producteur(q):
for i in range(3):
await asyncio.sleep(0.1)
await q.put(i)
print("Prod ->", i)
await q.put(None) # Sentinelle fin

async def consommateur(q):
while True:
item = await q.get()
if item is None:
break
print("Cons <-", item)
await asyncio.sleep(0.2)

async def main():
q = asyncio.Queue()
await asyncio.gather(producteur(q), consommateur(q))

asyncio.run(main())
remarque

Pourquoi utiliser asyncio.Queue en single-thread ? Même sans threads, plusieurs tasks peuvent s'intercaler à chaque await. asyncio.Queue évite le polling (pas de boucles d'attente), prévient les "réveils perdus", permet de bloquer proprement sur await q.get(), offre une limite de taille (maxsize) pour la backpressure et gère bien l'annulation. C'est la manière simple et fiable de modéliser un schéma producteur/consommateur en asyncio.

Annulation de task

On peut demander l'annulation d'une tâche avec task.cancel(). L'annulation est coopérative: elle se matérialise au prochain point d'attente (await), où asyncio.CancelledError est injectée dans la coroutine. On peut intercepter cette exception pour nettoyer, puis éventuellement la relancer. Il est recommandé d'await la tâche annulée afin de consommer l'exception et s'assurer que l'annulation a bien été traitée (sinon un avertissement peut être émis en fin de programme).

import asyncio

async def long():
try:
for i in range(5):
await asyncio.sleep(0.3)
print("tick", i)
except asyncio.CancelledError:
print("Annulé proprement")
raise # Re-propager si nécessaire

async def main():
t = asyncio.create_task(long())
await asyncio.sleep(0.8)
t.cancel()
try:
await t
except asyncio.CancelledError:
print("Connu : task annulée")

asyncio.run(main())

Bonnes pratiques :

  • Vérifier task.cancelled() si besoin
  • Toujours await la task annulée pour consommer l'exception

Pièges fréquents

ProblèmeSymptômeSolution
time.sleep dans coroutineBlocage totalRemplacer par await asyncio.sleep
Oublier d'await une taskAvertissement "Task exception was never retrieved"Conserver référence et await
asyncio.run imbriquéRuntimeErrorAppeler une seule fois au top niveau
Appels CPU lourds inlineLatence élevéeto_thread ou ProcessPool