C# Channels — Publish / Subscribe Workflows Part I
สำหรับบทความนี้เป็นบทความแรก series: System.Threading.Channels
- Part I : C# Channels — Publish / Subscribe Workflows (this post) :)
- Part 2: C# Channels — Timeout and Cancellation (comming soon !!!)
- Part 3: C# Channels — Async Data Pipelines (comming soon !!!)
หลายวันก่อนได้อ่านบทความเรื่อง new data structure System.Threading.Channels namespace เพื่อนำไปใช้ทำ concurrent work flow แต่ก่อนอื่นเรามาพูดเรื่อง Concurrency กันก่อนเพื่อให้เห็นภาพตรงกัน
Concurrency
หลายคนมักสับสนความสัมพันธ์ ระหว่าง concurrency และ parallelism
การทำงานของ 2 procedures พร้อมกัน(Concurrency) ไม่ได้หมายถึงว่ามันทำงานแบบคู่ขนาน(parallel) แต่คำว่า Concurrency ตามคำกล่าวของ Martin Kleppmann คือ
For defining concurrency, the exact time doesn’t matter: we simply call two operations concurrent if they are both unaware of each other, regardless of the physical time at which they occurred.
– “Designing Data-Intensive Applications” by Martin Kleppmann
พูดง่ายๆก็คือ มันไม่ได้ขึ้นกลับเวลาเลยว่ามันจะทำงานพร้อมกัน สาระสำคัญคือพวกมันต้องทำงานพร้อมกันโดยไม่ต้องรู้จักกัน ต่างคนต่างทำงานของตัวเอง concurrency เลยทำให้เราสามารถทำงานแบบคู่ขนานได้
A concurrent programจัดการกับหลายๆสิ่งในเวลาเดียวกัน ในขณะที่ parallel program ทำงานหลายงานในเวลาเดียวกัน แต่ผมว่าเรามองแบบนี้ดีกว่า concurrency มันเกี่ยวกับการวาง structure และ parallelism คือการกระทำ
Channels
A concurrent program คือการวางโครงสร้างของส่วนทำงาน สองส่วนเพื่อทำงานร่วมกัน เพือจะทำให้มันทำงานได้ เราต้องการการสื่อสารระหว่างกัน ใน .NET เราสามารถทำได้หลายทางมาก แต่ในบทความนี้เราจะมาเรียนรู้การใช้งาน System.Threading.Channels ซึ่งเริ่มมีให้ใช้ตั้งแต่ .NET Core 3.1
A channel คือ data structure ที่อนุญาตให้ Thread A คุยกับ Thread B แต่ก่อนจะมี Channels ให้ใช้เราอาจจะทำการ implement โดนใช้การ share ตัวแปรที่ support การเข้าถึงแบบ concurrency ConcurrentQueue<T> แต่ channels นั้นต่างออกไปครับ โดยมันสามารถส่ง ข้อมูลโดยตรงจาก Thread A ไป Thread B ได้เลย และ message ที่ไปจะเป็นแบบ FIFO (first in first out) โดยข้างล่างจะเป็นตัวอย่างการสร้าง channel
Channel<T> ch = Channel.CreateUnbounded<T>();
Channel เป็น static class ที่มี factory method ให้เราใช้ในการสร้าง channel ส่วน T คือ Type ของ message
เช่นผมจะสร้าง channel ของ string แล้วทำการเขียน message ลงไป
Channel<string> ch = Channel.CreateUnbounded<string>();
await ch.Writer.WriteAsync("My first message");
await ch.Writer.WriteAsync("My second message");
ch.Writer.Complete();
ส่วน code ที่จะอ่าน message ออกจาก channel จะเป็นตามด้านล่างครับ
while (await ch.Reader.WaitToReadAsync())
Console.WriteLine(await ch.Reader.ReadAsync());
ตัว WaitToReadAsync() จะมีค่าเป็น true เมื่อมา message มาถึง หรือจะเป็น false เมื่อ channel ด้าน writter เรียก method Complete()
สำหรับ reader ยังมี return IAsyncEnumerable<T> ซึ่งเป็น streaming API ทำให้เราเขียน code ให้ clean ขึ้นดังนี้
Ref: C# — Iterating with Async Enumerables in C# 8 | Microsoft Docs
await foreach (var item in ch.Reader.ReadAllAsync())
Console.WriteLine(item);
Using Channels
ตัวอย่างแรก เราจะแยก producer thread และ consumer thread โดยคุยกันผ่าน channel
var ch = Channel.CreateUnbounded<string>();var consumer = Task.Run(async () =>
{
await foreach (var item in ch.Reader.ReadAllAsync())
Console.WriteLine(item);
});var producer = Task.Run(async () =>
{
var rnd = new Random();
for (int i = 0; i < 5; i++)
{
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
await ch.Writer.WriteAsync($"Message {i}");
}
ch.Writer.Complete();
});await Task.WhenAll(producer, consumer);Console.ReadKey();
Concurrency Patterns
Don’t communicate by sharing memory, share memory by communicating.
เรามาต่อกันในเรื่อง concurrent programming techniques ด้วย channel ซึ่งจะมีหลากหลายโดยแต่ล่ะตัวอย่างจะไม่เกี่ยวข้องกันเลย
The Generator
generator คือ method ที่ return ChannelReader<T>
โดยด้านหนึ่งจะทำการสร้าง channel ส่วน consumer ก็ทำการระบุจำนวน message ที่ต้องการโดยการทำงานแยก thread กัน
static ChannelReader<string> CreateMessenger(string msg, int count)
{
var ch = Channel.CreateUnbounded<string>();
var rnd = new Random();
Task.Run(async () =>
{
for (int i = 0; i < count; i++)
{
await ch.Writer.WriteAsync($"{msg} {i}");
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
ch.Writer.Complete();
});
return ch.Reader;
}
เรา return ChannelReader<string>
เพื่อให้แน่ใจว่า consumer จะไม่ write message ลง channel
var pom = CreateMessenger("Pom", 5);
await foreach (var item in pom.ReadAllAsync())
Console.WriteLine(item);
ที่นี้เรามาลองสร้าง หลายๆ channel กัน
var pom = CreateMessenger("Pom", 3);
var ja = CreateMessenger("Ja", 3);
while (await pom.WaitToReadAsync() || await ja.WaitToReadAsync())
{
Console.WriteLine(await pom.ReadAsync());
Console.WriteLine(await ja.ReadAsync());
}
แต่เราจะเห็นปัญหาในหลายแบบ สมมุติว่า ja มี message มากกว่า เช่น
var pom = CreateMessenger("Pom", 2);
var ja = CreateMessenger("Ja", 5);
กรี๊ดเราเจอ
ถ้าจะแก้เร็วอาจจะครอบด้วย Try Catch
try
{
Console.WriteLine(await pom.ReadAsync());
Console.WriteLine(await ja.ReadAsync());
}
catch (ChannelClosedException) { }
แต่มันไม่ใช่ป่าวแบบนี้ เพราะเราต้องให้ pom ทำงานเสร็จก่อน ja ค่อยทำ เราต้องทำได้ดีกว่านี้ซิ
Multiplexer
เราต้องหารอ่านข้อมูลจาก pom และ ja พร้อมกัน ทำไมเราไม่ทำการ merge message ไปอีก channel แล้ว read จาก channel ใหม่เลยล่ะ เริ่มจาก สร้าง Merge<T>
static ChannelReader<T> Merge<T>(
ChannelReader<T> first, ChannelReader<T> second)
{
var output = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
await foreach (var item in first.ReadAllAsync())
await output.Writer.WriteAsync(item);
});
Task.Run(async () =>
{
await foreach (var item in second.ReadAllAsync())
await output.Writer.WriteAsync(item);
});
return output;
}
ตัว merge รับ 2 channel แล้ว create 2 task เพื่อทำการ read จาก 2 channel และทำการ write ไปอีก channel เหมือนรูปด้านล่าง
เราก็จะใช้มันได้แบบนี้
var ch = Merge(CreateMessenger("Pom", 3), CreateMessenger("Ja", 5));
await foreach (var item in ch.ReadAllAsync())
Console.WriteLine(item);
แต่ code เรายังมี defect เพราะเรายังไม่ได้ Complete
channel และ Merge<T>
เรารับได้แค่ 2 channel เราสามารถปรับให้สมบูรณ์ได้ตามนี้
static ChannelReader<T> Merge<T>(params ChannelReader<T>[] inputs)
{
var output = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
async Task Redirect(ChannelReader<T> input) // local function
{
await foreach (var item in input.ReadAllAsync())
await output.Writer.WriteAsync(item);
}
await Task.WhenAll(inputs.Select(i => Redirect(i)).ToArray());
output.Writer.Complete(); // <== เรียก Complete
});
return output;
}
Demultiplexer
ในกรณีที่เรามีงานเยอะแล้วต้องการกระจายงานให้ หลายๆ consumers ช่วย เรามาทำการ Split<T> กัน
static IList<ChannelReader<T>> Split<T>(ChannelReader<T> ch, int n)
{
var outputs = new Channel<T>[n];
for (int i = 0; i < n; i++)
outputs[i] = Channel.CreateUnbounded<T>();
Task.Run(async () =>
{
var index = 0;
await foreach (var item in ch.ReadAllAsync())
{
await outputs[index].Writer.WriteAsync(item);
index = (index + 1) % n;
}
foreach (var ch in outputs)
ch.Writer.Complete();
});
return outputs.Select(ch => ch.Reader).ToArray();
}
Split<T> รับ channel มาแล้วสร้าง new reader ตาม parameter ที่ส่งมา
var pom = CreateMessenger("Pom", 10);
var readers = Split<string>(pom, 3);
var tasks = new List<Task>();
for (int i = 0; i < readers.Count; i++)
{
var reader = readers[i];
var index = i;
tasks.Add(Task.Run(async () =>
{
await foreach (var item in reader.ReadAllAsync())
Console.WriteLine($"Reader {index}: {item}");
}));
}
await Task.WhenAll(tasks);
Conclusion
บทความนี้เราได้ทำความรุ้จักว่า Channel คืออะไร และเรียนรู้ในการ implement publish/subscribe workflows และได้เรียนรู้การใช้งาน cpu เต็มที่ในการแยกการ Read/Write ออกจากกัน