C# Channels — Publish / Subscribe Workflows Part I

Keattisak Chinburarat
5 min readOct 22, 2021

--

สำหรับบทความนี้เป็นบทความแรก series: System.Threading.Channels

หลายวันก่อนได้อ่านบทความเรื่อง new data structure System.Threading.Channels namespace เพื่อนำไปใช้ทำ concurrent work flow แต่ก่อนอื่นเรามาพูดเรื่อง Concurrency กันก่อนเพื่อให้เห็นภาพตรงกัน

Concurrency

หลายคนมักสับสนความสัมพันธ์ ระหว่าง concurrency และ parallelism

การทำงานของ 2 procedures พร้อมกัน(Concurrency) ไม่ได้หมายถึงว่ามันทำงานแบบคู่ขนาน(parallel) แต่คำว่า Concurrency ตามคำกล่าวของ Martin Kleppmann คือ

For defining concurrency, the exact time doesn’t matter: we simply call two operations concurrent if they are both unaware of each other, regardless of the physical time at which they occurred.

“Designing Data-Intensive Applications” by Martin Kleppmann

พูดง่ายๆก็คือ มันไม่ได้ขึ้นกลับเวลาเลยว่ามันจะทำงานพร้อมกัน สาระสำคัญคือพวกมันต้องทำงานพร้อมกันโดยไม่ต้องรู้จักกัน ต่างคนต่างทำงานของตัวเอง concurrency เลยทำให้เราสามารถทำงานแบบคู่ขนานได้

A concurrent programจัดการกับหลายๆสิ่งในเวลาเดียวกัน ในขณะที่ parallel program ทำงานหลายงานในเวลาเดียวกัน แต่ผมว่าเรามองแบบนี้ดีกว่า concurrency มันเกี่ยวกับการวาง structure และ parallelism คือการกระทำ

Channels

A concurrent program คือการวางโครงสร้างของส่วนทำงาน สองส่วนเพื่อทำงานร่วมกัน เพือจะทำให้มันทำงานได้ เราต้องการการสื่อสารระหว่างกัน ใน .NET เราสามารถทำได้หลายทางมาก แต่ในบทความนี้เราจะมาเรียนรู้การใช้งาน System.Threading.Channels ซึ่งเริ่มมีให้ใช้ตั้งแต่ .NET Core 3.1

A channel คือ data structure ที่อนุญาตให้ Thread A คุยกับ Thread B แต่ก่อนจะมี Channels ให้ใช้เราอาจจะทำการ implement โดนใช้การ share ตัวแปรที่ support การเข้าถึงแบบ concurrency ConcurrentQueue<T> แต่ channels นั้นต่างออกไปครับ โดยมันสามารถส่ง ข้อมูลโดยตรงจาก Thread A ไป Thread B ได้เลย และ message ที่ไปจะเป็นแบบ FIFO (first in first out) โดยข้างล่างจะเป็นตัวอย่างการสร้าง channel

Channel<T> ch = Channel.CreateUnbounded<T>();

Channel เป็น static class ที่มี factory method ให้เราใช้ในการสร้าง channel ส่วน T คือ Type ของ message

เช่นผมจะสร้าง channel ของ string แล้วทำการเขียน message ลงไป

Channel<string> ch = Channel.CreateUnbounded<string>();
await ch.Writer.WriteAsync("My first message");
await ch.Writer.WriteAsync("My second message");
ch.Writer.Complete();

ส่วน code ที่จะอ่าน message ออกจาก channel จะเป็นตามด้านล่างครับ

while (await ch.Reader.WaitToReadAsync()) 
Console.WriteLine(await ch.Reader.ReadAsync());

ตัว WaitToReadAsync() จะมีค่าเป็น true เมื่อมา message มาถึง หรือจะเป็น false เมื่อ channel ด้าน writter เรียก method Complete()

สำหรับ reader ยังมี return IAsyncEnumerable<T> ซึ่งเป็น streaming API ทำให้เราเขียน code ให้ clean ขึ้นดังนี้

Ref: C# — Iterating with Async Enumerables in C# 8 | Microsoft Docs

await foreach (var item in ch.Reader.ReadAllAsync())
Console.WriteLine(item);

Using Channels

ตัวอย่างแรก เราจะแยก producer thread และ consumer thread โดยคุยกันผ่าน channel

var ch = Channel.CreateUnbounded<string>();var consumer = Task.Run(async () =>
{
await foreach (var item in ch.Reader.ReadAllAsync())
Console.WriteLine(item);
});
var producer = Task.Run(async () =>
{
var rnd = new Random();
for (int i = 0; i < 5; i++)
{
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
await ch.Writer.WriteAsync($"Message {i}");
}
ch.Writer.Complete();
});
await Task.WhenAll(producer, consumer);Console.ReadKey();

Concurrency Patterns

Don’t communicate by sharing memory, share memory by communicating.

เรามาต่อกันในเรื่อง concurrent programming techniques ด้วย channel ซึ่งจะมีหลากหลายโดยแต่ล่ะตัวอย่างจะไม่เกี่ยวข้องกันเลย

The Generator

generator คือ method ที่ return ChannelReader<T> โดยด้านหนึ่งจะทำการสร้าง channel ส่วน consumer ก็ทำการระบุจำนวน message ที่ต้องการโดยการทำงานแยก thread กัน

static ChannelReader<string> CreateMessenger(string msg, int count)
{
var ch = Channel.CreateUnbounded<string>();
var rnd = new Random();

Task.Run(async () =>
{
for (int i = 0; i < count; i++)
{
await ch.Writer.WriteAsync($"{msg} {i}");
await Task.Delay(TimeSpan.FromSeconds(rnd.Next(3)));
}
ch.Writer.Complete();
});

return ch.Reader;
}

เรา return ChannelReader<string> เพื่อให้แน่ใจว่า consumer จะไม่ write message ลง channel

var pom = CreateMessenger("Pom", 5);
await foreach (var item in pom.ReadAllAsync())
Console.WriteLine(item);

ที่นี้เรามาลองสร้าง หลายๆ channel กัน

var pom = CreateMessenger("Pom", 3);
var ja = CreateMessenger("Ja", 3);

while (await pom.WaitToReadAsync() || await ja.WaitToReadAsync())
{
Console.WriteLine(await pom.ReadAsync());
Console.WriteLine(await ja.ReadAsync());
}

แต่เราจะเห็นปัญหาในหลายแบบ สมมุติว่า ja มี message มากกว่า เช่น

var pom = CreateMessenger("Pom", 2);
var ja = CreateMessenger("Ja", 5);

กรี๊ดเราเจอ

ถ้าจะแก้เร็วอาจจะครอบด้วย Try Catch

try
{
Console.WriteLine(await pom.ReadAsync());
Console.WriteLine(await ja.ReadAsync());
}
catch (ChannelClosedException) { }

แต่มันไม่ใช่ป่าวแบบนี้ เพราะเราต้องให้ pom ทำงานเสร็จก่อน ja ค่อยทำ เราต้องทำได้ดีกว่านี้ซิ

Multiplexer

เราต้องหารอ่านข้อมูลจาก pom และ ja พร้อมกัน ทำไมเราไม่ทำการ merge message ไปอีก channel แล้ว read จาก channel ใหม่เลยล่ะ เริ่มจาก สร้าง Merge<T>

static ChannelReader<T> Merge<T>(
ChannelReader<T> first, ChannelReader<T> second)
{
var output = Channel.CreateUnbounded<T>();

Task.Run(async () =>
{
await foreach (var item in first.ReadAllAsync())
await output.Writer.WriteAsync(item);
});
Task.Run(async () =>
{
await foreach (var item in second.ReadAllAsync())
await output.Writer.WriteAsync(item);
});

return output;
}

ตัว merge รับ 2 channel แล้ว create 2 task เพื่อทำการ read จาก 2 channel และทำการ write ไปอีก channel เหมือนรูปด้านล่าง

เราก็จะใช้มันได้แบบนี้

var ch = Merge(CreateMessenger("Pom", 3), CreateMessenger("Ja", 5));

await foreach (var item in ch.ReadAllAsync())
Console.WriteLine(item);

แต่ code เรายังมี defect เพราะเรายังไม่ได้ Complete channel และ Merge<T> เรารับได้แค่ 2 channel เราสามารถปรับให้สมบูรณ์ได้ตามนี้

static ChannelReader<T> Merge<T>(params ChannelReader<T>[] inputs)
{
var output = Channel.CreateUnbounded<T>();

Task.Run(async () =>
{
async Task Redirect(ChannelReader<T> input) // local function
{
await foreach (var item in input.ReadAllAsync())
await output.Writer.WriteAsync(item);
}

await Task.WhenAll(inputs.Select(i => Redirect(i)).ToArray());
output.Writer.Complete(); // <== เรียก Complete
});

return output;
}

Demultiplexer

ในกรณีที่เรามีงานเยอะแล้วต้องการกระจายงานให้ หลายๆ consumers ช่วย เรามาทำการ Split<T> กัน

static IList<ChannelReader<T>> Split<T>(ChannelReader<T> ch, int n)
{
var outputs = new Channel<T>[n];

for (int i = 0; i < n; i++)
outputs[i] = Channel.CreateUnbounded<T>();

Task.Run(async () =>
{
var index = 0;
await foreach (var item in ch.ReadAllAsync())
{
await outputs[index].Writer.WriteAsync(item);
index = (index + 1) % n;
}

foreach (var ch in outputs)
ch.Writer.Complete();
});

return outputs.Select(ch => ch.Reader).ToArray();
}

Split<T> รับ channel มาแล้วสร้าง new reader ตาม parameter ที่ส่งมา

var pom = CreateMessenger("Pom", 10);
var readers = Split<string>(pom, 3);
var tasks = new List<Task>();

for (int i = 0; i < readers.Count; i++)
{
var reader = readers[i];
var index = i;
tasks.Add(Task.Run(async () =>
{
await foreach (var item in reader.ReadAllAsync())
Console.WriteLine($"Reader {index}: {item}");
}));
}

await Task.WhenAll(tasks);

Conclusion

บทความนี้เราได้ทำความรุ้จักว่า Channel คืออะไร และเรียนรู้ในการ implement publish/subscribe workflows และได้เรียนรู้การใช้งาน cpu เต็มที่ในการแยกการ Read/Write ออกจากกัน

Source code

https://github.com/execution/ChannelsCodeExample

--

--

Keattisak Chinburarat
Keattisak Chinburarat

Written by Keattisak Chinburarat

Father, Husband, and Engineering Technical @Agoda

No responses yet