Python 的 Coroutine 發展已經逐漸穩定成熟,已經成為了提升 Python 程式效能的優秀解決方案之一,在之前簡單介紹 Coroutine 和 await/async 時,我們在範例 code 中一直有用到一個 Python buildin 模組 asyncio,它提供了一套完整的工具和接口,用於建立非同步應用程式,其核心是 Event-Loop,會追蹤所有註冊的任務,並根據任務的狀態調度它們的執行。 故接著來了解 Event-Loop 和其工作的執行單位 Task 吧 !

還是使用之前的範例 :

import asyncio

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    await asyncio.sleep(delay)

async def do():
    await say_after(1, 'hello')
    print("####### finished do function #######")

asyncio.run(do())

之前一直都有提到 Coroutine 是無法直接運行的,故這邊使用了 asyncio.run(do()) 程式才開始運行,那 asyncio.run() 做了什麼呢?

啟動 asyncio.run(main())的作用是會把這個 main() 協程轉成 Task 放到 Event-Loop 內

接下來解釋上面提到的兩個關鍵字: 「Event-Loop」、「Task」

Event-Loop(事件迴圈)

Event-Loop 是個 Python 類別 BaseEventLoop ,關鍵運作部分是有一個無窮迴圈,可不斷地 loop 來進行排程或執行其他 Task ,而 Event-Loop 將任務分成 2 種狀態 : 「scheduled」 和 「ready」:

  • 只有處於 ready 狀態的任務才會被執行
  • scheduled 狀態的任務,會看是否到達到/超過預定時間,是的話才會轉成 ready 狀態

Python 3.7 之後 Event-Loop 做了蠻好的封裝,官網上也說原則上盡量都使用 asyncio.run() 來調用 Event-Loop 就可以了,盡量不要直接控制 Event-Loop 這個物件

Task

Task 是 Event-Loop 的調度基本單位,可進行更細粒度的控制如可以被取消、等待或加入任務群組。 可以將 Task 視為 Coroutine 的再包裝,這由 asyncio.create_task() 能感受到,因為他的輸入參數是一個 Coroutine ,然後經過此方法 Coroutine 會變成 Task 並會註冊到 Event-Loop 內。而讓 Coroutine 變成 Task 的方式有以下方法 :

  • asyncio.create_task()
  • asyncio.gather()

Event-Loop 一次只能執行 1 個 Task ,最常見的頂層運行中的 Task 就是 asyncio.run() 啟動的 Coroutine ,這基本上就是一個 「運行中的 Task」。

接下來如果目前「運行中的 Task」 進入「正在等待執行結果的狀態」時,也就是到 code 的 await 位置,由於 await 後面可以接 Coroutine 或 Awaitable obj ,故簡單分成兩種狀況 :

  • await 後面接 Coroutine :

    • 這時其實是繼續同步調用直到這個 Coroutine 結束或是遇到下一個 await 標記點,「運行中的 Task」並不會交出控制權給 Event-Loop ,這代表 Asynchronous 可能還不完善,可以看之後舉例了解
  • await 後面接 Awaitable obj :

    • 這時的情形會比較重要,「運行中的 Task」會把主控權交還給 Event-Loop 且暫停

注意一下,所有控制權的返回都是顯性的,也就是 Event-Loop 其實沒辦法強行從 Task 拿回控制權,都是 Task 主動把控制權交回去。控制權交出的方式如上述所說有:

  • 遇到 await
  • Task 運行結束

所以如果有一個 Task 裡面有死尋循環,Event-Loop 就可能會卡死了,要特別小心。


上面是一個簡單的 single task 範例,但注意 「 Event-Loop 的核心是可以有很多個 task ,然後 Event-Loop 它會決定哪個 task 要來運行 」。所以接下來是要討論在 async 模式下多個 task 有哪些運行情況 ?

單純多個 await 情況

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# Terminal:
# started at 21:53:03
# hello
# world
# finished at 21:53:06

首先差不多 1 秒後 print 了 hello ,再來 2 秒後 print 了 world ,總共花了 3 秒

使用 asyncio.create_task()

上面的 code 運行完後發現一個大問題 : 「 print hello 要 1 秒,再來 print world 要 2 秒,總共花了 3 秒 」,竟然沒有同時一起執行 ! ? 這樣 Coroutine 到底有什麼意義 ? 那這就是 await 不足的地方,為了解決這問題 asyncio 有提供了 create_task(),範例如下 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# Terminal:
# started at 00:00:15
# hello
# world
# finished at 00:00:17

上面這個範例的結果,發現總共只花了 2 秒,是我們期望的!

這範例最重要的是 asyncio.create_task(),代表有把兩個任務都先添加到 Event-Loop 裡面了,故 task1task2同時執行,因為它們是獨立的 Task,故代表 :

  • await task1 會等待 task1 完成,但不會影響 task2 的執行
  • await task2 會等待 task2 完成,確保 main() 不會過早結束

上面說明部分可以嘗試著調整 delay 時間和註解掉 await task2 來做一些測試,會發現一些特別的事情,例如 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(say_after(3, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    
    print(f"started at {time.strftime('%X')}")
    
    await task1
    #await task2
    
    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

# Terminal :
# started at 18:15:41
# world
# hello
# finished at 18:15:44

特別的地方是會先等 2 秒後顯示 world,在等 1 秒顯示 hello ! 雖然沒有 await task2 但他還是有被排程!

再例如 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))
    
    print(f"started at {time.strftime('%X')}")
    
    await task1
    #await task2
    
    print(f"finished at {time.strftime('%X')}")
    
asyncio.run(main())

# Terminal :
# started at 18:18:09
# hello
# finished at 18:18:10

承前例我們知道 task2 是有被排到 Event-Loop 內的,但因為沒有 await task2,所以等 1 秒後顯示 hello 後就直接結束了,沒有等 task2 完成。

Coroutine 的 return value

Coroutine 的返回值也是蠻重要的,這該怎麼做呢 ? 而 await 有一個重要功用是把 Coroutine 或 Task 的返回值拿出來,故首先我們修改一下原始 code :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    return f"{delay} - {what}"

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    result1 = await task1
    result2 = await task2

    print(result1)
    print(result2)

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

這裡展示一個簡單範例,該怎麼拿到 async def 函數的返回值。

使用 asyncio.gather()

承上 code 應該會思考,難道我有 10 個 Task 我就要寫 10 行 await 嗎 ? 在這裡 asyncio 提供了 gather() 來簡化這個問題。 但要注意 gather() 並不是一個 Coroutine Function,它會返回一個叫 Future 的東西,而 Future 也是需要 await 的。

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    return f"{delay} - {what}"

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    result = await asyncio.gather(task1, task2)

    print(result)

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

asyncio.gather() 的作用是可執行多個 Coroutine,並可收集每個的 return value 存於 list 中, 更進一步來說 gather() 輸入參數其實可以是多個 Coroutine 、Task 或者 Future,故可以不用特地把 Coroutine 轉成 Task 再放入 gather() 內也沒關係,因此可以再把 code 簡化成 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    return f"{delay} - {what}"

async def main():

    print(f"started at {time.strftime('%X')}")

    result = await asyncio.gather(say_after(1, 'hello'), say_after(2, 'world'))

    print(result)

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

參考資料