Further to my prior post on Channels, in this post I will go over implementation details and explore System.Threading.Channels
.
The static factory methods of the
Channel
class can be used to create Channels (see the code snippet below). The type parameter T
is used to define the type of
object that can be handled by the created Channel; and passed from a publisher to a subscriber.
public static class Channel
{
public static Channel<T> CreateUnbounded<T>();
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options);
public static Channel<T> CreateBounded<T>(int capacity);
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options);
}
You can create a channel with unlimited capacity using CreateUnbounded<T>
. As cool as this is, probably a little dangerous
if your producer outpaces your consumer. Without a capacity limit, the channel will keep accepting new items. As the consumer
fails to keep up, the number of queued items will continually increase. Memory consumption will spike and not be released until
items in the channel have been handled by the consumer.
CreateBounded<T>
creates a channel with an explicit capacity (maintained by the implementation). At the point at which the
channel fills up TryWrite
will return false
. ChannelWriter<T>
provides a WriteAsync
method designed to deal with the
instance described above, where the channel is full and writing needs to wait. Thi is a form of backpressure, WriteAsync
can be used with the producer (awaiting) the result of WriteAsync
and only being allowed to continue when room becomes
available.
CreateUnbounded<T>
and CreateBounded<T>
contain overloads which take
BoundedChannelOptions
that lets you configure the channel, this includes;
AllowSynchronousContinuations
:SingleReader
SingleWriter
Capacity
Creating a channel;
var capacity = 100;
var channel = Channel.CreateBounded<string>(capacity);
Writing to a channel;
var writer = channel.Writer;
await writer.WriteAsync("this is the first message");
Reading from a channel;
var reader = channel.Reader;
while (await reader.WaitToReadAsync())
{
if (reader.TryRead(out var msg))
{
Console.WriteLine(msg);
}
}
The reader
and writer
properties expose the following abstract classes;
public abstract class ChannelWriter<T>
{
public abstract bool TryWrite(T item);
public virtual ValueTask WriteAsync(T item, CancellationToken cancellationToken = default);
public abstract ValueTask<bool> WaitToWriteAsync(CancellationToken cancellationToken = default);
public void Complete(Exception error);
public virtual bool TryComplete(Exception error);
}
public abstract class ChannelReader<T>
{
public abstract bool TryRead(out T item);
public virtual ValueTask<T> ReadAsync(CancellationToken cancellationToken = default)
public abstract ValueTask<bool> WaitToReadAsync(CancellationToken cancellationToken = default);
public virtual Task Completion { get; }
}
Breakdown of the splits between producer-consumer;
Through my reading and research I also came across System.Threading.Tasks.Dataflow
which I am keen to learn more about.
As Stephen Toub mentions in his devblogs post, if all you need to do is hand-off data to producer(s) and consumer(s) Channels is a much simpler, leaner bet.
There are some cool developments in the GitHub repo, particularly the Issues section on new features and community discussions.
Thanks for taking the time to read this post!