The Will Will Web

記載著 Will 在網路世界的學習心得與技術分享

使用 C# 8.0 Async Streams 實現非同步串流 ASP.NET Core Web API

C# 8.0 引入了一個 Async Streams (非同步串流) 的語言特性,這個特性讓我們可以做到許多以前做不到的事,若用在 ASP.NET Core Web API 上面,你就可以很輕鬆的實現 API 資料串流功能,讓你可以透過「非同步」的方式有效率的處理大量資料。由於這個概念太過前衛,所以我們先從基本的語法開始,然後再來看看如何應用在 ASP.NET Core Web API 上面。

the concept of asynchronous streaming in programming  The image features flowing lines and interconnected no

前情提要

在 C# 8.0 之前,原本就有支援迭代器方法 IEnumerable<T> 和非同步方法 Task<T>,但是這兩者不能同時使用,也就是說你不能同時使用迭代器方法非同步方法。這個限制對於需要處理大量資料需要即時處理的應用程式來說是一個很大的限制,因為你無法以非同步的方式迭代一系列的值,也意味著你無法有效率的使用執行緒 (Threads)。

舉個例子,以下是一個簡單的迭代器方法,因為我們實作的方法回傳了 IEnumerable<int> 型別,這種回傳型別可以讓你用 yield return 語句回傳一個序列中的其中一筆結果:

public static IEnumerable<int> GenerateNumbers()
{
    for (int i = 0; i < 10; i++)
    {
        Thread.Sleep(50); // 模擬同步操作
        yield return i;
    }
}

同時,你可以利用 foreach 或直接透過迭代器方法來迭代這個方法回傳的每一筆資料:

public static void Main(string[] args)
{
    foreach (var number in GenerateNumbers())
    {
        number.Dump("Thread ID = " + Thread.CurrentThread.ManagedThreadId);
    }
}

這裡的 GenerateNumbers() 因為使用 IEnumerable<int> 來回傳一系列的數字,所以會讓 foreach 在執行迴圈的時候,讓每次 yield return 回傳一筆資料時,就會讓 foreach 迴圈執行一次,而不是等 GenerateNumbers() 回傳所有資料時才開始迭代所有資料。

如果你在 GenerateNumbers() 這個方法中會需要呼叫一些外部 API 以取得資料,我以 Thread.Sleep(50); 為例,用以模擬一個外部 API 呼叫的操作,或是模擬一個資料庫呼叫之類的,那麼這個迭代器就沒辦法很有效率的執行。那是因為這個迭代器方法 GenerateNumbers() 是同步的,當你呼叫 GenerateNumbers() 這個方法時,它雖然會在每次 yield return 的時候立即回傳資料,讓迭代器可以逐筆運作,但是所有資料的處理過程中,都必須在同一個執行緒下執行,你沒辦法使用 await 方法等待外部 API 呼叫完成,所以肯定會造成一些 CPU 的浪費。

非同步串流

感恩 C# 8.0 的出現,我們現在可以透過 IAsyncEnumerable<T> 這個介面來實作非同步串流,這個介面可以讓你在迭代的過程中,能夠使用 await 來等待非同步操作完成,這樣就可以讓你的迭代器方法在處理大量資料時,更有效率的使用 CPU 資源。

public static async IAsyncEnumerable<int> GenerateNumbersAsync()
{
    for (int i = 0; i < 10; i++)
    {
        await Task.Delay(50); // 模擬非同步操作
        yield return i;
    }
}

public static async Task Main(string[] args)
{
    await foreach (var number in GenerateNumbersAsync())
    {
        number.Dump("Thread ID = " + Thread.CurrentThread.ManagedThreadId);
    }
}

你可以比較一下程式碼的差異,GenerateNumbersAsync() 方法使用了 IAsyncEnumerable<int> 介面,並且也套用了 async 關鍵字,這個語法可以讓你在這個方法中使用 await 來等待非同步操作完成。除此之外,你在使用迭代器取得資料時,也可以透過 await foreach 來做到非同步的迭代,非常神奇的寫法!

設計非同步串流 Web API

在 ASP.NET Core Web API 中,你可以透過 IAsyncEnumerable<T> 來實作非同步串流的 Web API,這樣可以讓你的 API 在處理大量資料時,或是需要長時間處理資料時,能夠將回應的結果進行「串流」處理,一次一筆資料的回應給用戶端,讓前端也能透過非同步的方式處理資料,減少時間上的浪費。

這個非同步串流 Web API 設計出來後,理論上是這樣運作的:

  1. 用戶端發送請求給 Web API
  2. Web API 開始處理請求,並且透過非同步的方式取得資料
  3. Web API 透過 IAsyncEnumerable<T> 介面回傳資料給用戶端,並且用 yield return 逐筆回傳資料,每回傳一筆資料就會送出一個分段資料(Chunked Data)給用戶端
  4. 用戶端先接收到 Respone Header 資料
  5. 用戶端開始接收到第一筆資料,並且得到一個分段,接著再逐筆接收資料,每收到一筆資料時,用戶端都會知道,直到接收完所有資料

所以我們理想上會得到以下效益:

  1. 因為後端處理資料的時間較長,回應資料的時間也較多、較久,但用戶端不用等後端送出所有資料才能開始處理資料
  2. 前端不清楚後端到底要送多少資料,但前端可以在接收到第一筆資料後就開始處理資料,這樣可以讓用戶端更快的顯示資料

因此,我們就可以做到,後端只要處理完一筆資料,送到用戶端,用戶端立刻就能開始顯示資料,大幅提升了用戶體驗。

其實這種需求在這一年多來蠻常見的,因為 ChatGPT 這個服務就是這樣運作的,當你在使用 ChatGPT 這個服務時,你會發現你的提問送出後,後端 API 是一個 Token 一個 Token 的回傳給你,前端網頁可以不用等所有訊息都回傳才顯示訊息,而是使用者在接收到第一個 Token 開始,立刻就能看到回應,網頁上很明顯是一個字一個字的顯示在畫面上,不用讓使用者等太久。

實作非同步串流 Web API

以下我就用一個簡單的例子來實作非同步串流 Web API,並以 .NET 擔任用戶端,使用 HttpClient 呼叫這個 API,藉此示範非同步串流的效果。

  1. 建立專案

    dotnet new webapi -n async-streams-api --use-controllers
    cd async-streams-api
    
  2. 調整 API 內容

    原本內容如下:

    [HttpGet(Name = "GetWeatherForecast")]
    public IEnumerable<WeatherForecast> Get()
    {
        return Enumerable.Range(1, 5).Select(index => new WeatherForecast
        {
            Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
            TemperatureC = Random.Shared.Next(-20, 55),
            Summary = Summaries[Random.Shared.Next(Summaries.Length)]
        })
        .ToArray();
    }
    

    調整後內容如下,改用 async IAsyncEnumerable<WeatherForecast> 來回應:

    [HttpGet(Name = "GetWeatherForecast")]
    public async IAsyncEnumerable<WeatherForecast> Get()
    {
        for (int index = 1; index <= 5; index++)
        {
            await Task.Delay(100); // 模擬非同步操作
            yield return new WeatherForecast
            {
                Date = DateOnly.FromDateTime(DateTime.Now.AddDays(index)),
                TemperatureC = Random.Shared.Next(-20, 55),
                Summary = Summaries[Random.Shared.Next(Summaries.Length)]
            };
        }
    }
    
  3. 啟動網站

    dotnet run
    
  4. 簡易測試串流效果

    curl.exe -s http://localhost:5118/WeatherForecast
    

    這個命令會回傳一個 JSON 陣列,裡面有五筆資料,但是你會發現每筆資料都會間隔一秒後顯示,也意味著現在我們的 API 已經不是一口氣輸出五筆資料,而是一筆一筆的回傳,每筆資料之間會有一段時間的延遲,這樣就可以看到非同步串流的效果。

    這個 API 會分 5 個段落 (chunks) 送出資料:

    [{"date":"2024-09-28","temperatureC":22,"temperatureF":71,"summary":"Balmy"}
    ,{"date":"2024-09-29","temperatureC":-4,"temperatureF":25,"summary":"Bracing"}
    ,{"date":"2024-09-30","temperatureC":-6,"temperatureF":22,"summary":"Hot"}
    ,{"date":"2024-10-01","temperatureC":15,"temperatureF":58,"summary":"Sweltering"}
    ,{"date":"2024-10-02","temperatureC":-6,"temperatureF":22,"summary":"Hot"}]
    

實作用戶端串流接收程式

為了要能做到串流接收的效果,其實 HttpClient 程式的寫法也有所差異,以下是一個簡單的 .NET 8 控制台應用程式,用來接收這個串流資料:

  1. 建立主控台專案

    dotnet new console -o async-streams-api-console
    cd async-streams-api-console
    
  2. 安裝 ObjectDumper.NET NuGet 套件 (用來顯示物件資料)

    dotnet add package ObjectDumper.NET
    

    GitHub: https://github.com/thomasgalliker/ObjectDumper

    NuGet: https://www.nuget.org/packages/ObjectDumper.NET/

  3. 編輯 Program.cs 檔案

    using System.Text.Json;
    
    var client = new HttpClient();
    
    client.DefaultRequestHeaders.UserAgent.TryParseAdd("Duotify/1.0");
    
    // 這意味著我們要在接收到 Response Header 後就開始接收 Steam 資料
    using var resp = await client.GetAsync("http://localhost:5118/WeatherForecast", HttpCompletionOption.ResponseHeadersRead);
    
    using Stream responseStream = await resp.Content.ReadAsStreamAsync();
    
    var options = new JsonSerializerOptions
    {
        PropertyNameCaseInsensitive = true,
        DefaultBufferSize = 128
    };
    
    // 此方法會回傳 IAsyncEnumerable<WeatherForecast?> 型別的資料,好讓他跟 await foreach 一起使用
    var weathers = JsonSerializer.DeserializeAsyncEnumerable<WeatherForecast>(responseStream, options);
    
    await foreach (var weather in weathers)
    {
        Console.WriteLine(weather.Dump(DumpStyle.Console));
    }
    
    public class WeatherForecast
    {
        public DateTime Date { get; set; }
        public int TemperatureC { get; set; }
        public int TemperatureF => 32 + (int)(TemperatureC / 0.5556);
        public string? Summary { get; set; }
    }
    
  4. 啟動程式

    dotnet run
    

    這個程式會開始接收串流資料,並且每接收到一筆資料就會顯示出來,這樣就可以看到非同步串流的效果。

    {WeatherForecast}
      Date: 2024/9/28 上午 12:00:00
      TemperatureC: -13
      TemperatureF: 9
      Summary: "Scorching"
    {WeatherForecast}
      Date: 2024/9/29 上午 12:00:00
      TemperatureC: -17
      TemperatureF: 2
      Summary: "Cool"
    {WeatherForecast}
      Date: 2024/9/30 上午 12:00:00
      TemperatureC: -12
      TemperatureF: 11
      Summary: "Scorching"
    {WeatherForecast}
      Date: 2024/10/1 上午 12:00:00
      TemperatureC: 38
      TemperatureF: 100
      Summary: "Freezing"
    {WeatherForecast}
      Date: 2024/10/2 上午 12:00:00
      TemperatureC: 14
      TemperatureF: 57
      Summary: "Freezing"
    

相關連結

留言評論