Zero-allocation, extremely fast in-memory messaging library for .NET and Unity.
English | 日本語
Zero Messenger is a high-performance messaging library for .NET and Unity. It provides MessageBroker<T>
as an easy-to-use event system for subscribing and unsubscribing, as well as support for implementing the Pub/Sub pattern with IMessagePublisher<T>/IMessageSubscriber<T>
.
Zero Messenger is designed with performance as a priority, achieving faster Publish()
operations than libraries such as MessagePipe and VitalRouter. Moreover, there are no allocations during publishing.
Additionally, it minimizes allocations when constructing message pipelines compared to other libraries. Below is a benchmark result from executing Subscribe/Dispose
10,000 times.
Zero Messenger requires .NET Standard 2.1 or later. The package is available on NuGet.
dotnet add package ZeroMessenger
Install-Package ZeroMessenger
You can use Zero Messenger in Unity by utilizing NuGetForUnity. For more details, see the Unity section.
You can easily implement global Pub/Sub using MessageBroker<T>.Default
.
using System;
using ZeroMessenger;
// Subscribe to messages
var subscription = MessageBroker<Message>.Default.Subscribe(x =>
{
Console.WriteLine(x.Text);
});
// Publish a message
MessageBroker<Message>.Default.Publish(new Message("Hello!"));
// Unsubscribe
subscription.Dispose();
// Type used for the message
public record struct Message(string Text) { }
Additionally, an instance of MessageBroker<T>
can be used similarly to event
or Rx's Subject<T>
.
var broker = new MessageBroker<int>();
broker.Subscribe(x =>
{
Console.WriteLine(x);
});
broker.Publish(10);
broker.Dispose();
By adding Zero Messenger to a DI container, you can easily implement Pub/Sub between services.
Zero Messenger supports Pub/Sub on Microsoft.Extensions.DependencyInjection
, which requires the ZeroMessenger.DependencyInjection package.
dotnet add package ZeroMessenger.DependencyInjection
Install-Package ZeroMessenger.DependencyInjection
Adding services.AddZeroMessenger()
registers Zero Messenger in IServiceCollection
. The following example demonstrates Pub/Sub implementation on a Generic Host.
using ZeroMessenger;
using ZeroMessenger.DependencyInjection;
Host.CreateDefaultBuilder()
.ConfigureServices((context, services) =>
{
// Add Zero Messenger
services.AddZeroMessenger();
services.AddSingleton<ServiceA>();
services.AddSingleton<ServiceB>();
})
.Build()
.Run();
public record struct Message(string Text) { }
public class ServiceA
{
IMessagePublisher<Message> publisher;
public ServiceA(IMessagePublisher<Message> publisher)
{
this.publisher = publisher;
}
public Task SendAsync(CancellationToken cancellationToken = default)
{
publisher.Publish(new Message("Hello!"));
}
}
public class ServiceB : IDisposable
{
IDisposable subscription;
public ServiceB(IMessageSubscriber<Message> subscriber)
{
subscription = subscriber.Subscribe(x =>
{
Console.WriteLine(x);
});
}
public void Dispose()
{
subscription.Dispose();
}
}
The interfaces used for Pub/Sub are IMessagePublisher<T>
and IMessageSubscriber<T>
. The MessageBroker<T>
implements both of these interfaces.
public interface IMessagePublisher<T>
{
void Publish(T message, CancellationToken cancellationToken = default);
ValueTask PublishAsync(T message, AsyncPublishStrategy publishStrategy = AsyncPublishStrategy.Parallel, CancellationToken cancellationToken = default);
}
public interface IMessageSubscriber<T>
{
IDisposable Subscribe(MessageHandler<T> handler);
IDisposable SubscribeAwait(AsyncMessageHandler<T> handler, AsyncSubscribeStrategy subscribeStrategy = AsyncSubscribeStrategy.Sequential);
}
IMessagePublisher<T>
is an interface for publishing messages. You can publish messages using Publish()
, and with PublishAsync()
, you can wait for all processing to complete.
IMessagePublisher<Message> publisher;
// Publish a message (Fire-and-forget)
publisher.Publish(new Message("Foo!"));
// Publish a message and wait for all subscribers to finish processing
await publisher.PublishAsync(new Message("Bar!"), AsyncPublishStrategy.Parallel, cancellationToken);
You can specify AsyncPublishStrategy
to change how asynchronous message handlers are handled.
AsyncPublishStrategy |
- |
---|---|
AsyncPublishStrategy.Parallel |
All asynchronous message handlers are executed in parallel. |
AsyncPublishStrategy.Sequential |
Asynchronous message handlers are queued and executed one by one in order. |
IMessageSubscriber<T>
is an interface for subscribing to messages. It provides an extension method Subscribe()
that accepts an Action<T>
, allowing you to easily subscribe using lambda expressions. You can unsubscribe by calling Dispose()
on the returned IDisposable
.
IMessageSubscriber<Message> subscriber;
// Subscribe to messages
var subscription = subscriber.Subscribe(x =>
{
Console.WriteLine(x.Text);
});
// Unsubscribe
subscription.Dispose();
You can also perform asynchronous processing within the subscription using SubscribeAwait()
.
var subscription = subscriber.SubscribeAwait(async (x, ct) =>
{
await FooAsync(x, ct);
}, AsyncSubscribeStrategy.Sequential);
By specifying AsyncSubscribeStrategy
, you can change how messages are handled when received during processing.
AsyncSubscribeStrategy |
- |
---|---|
AsyncSubscribeStrategy.Sequential |
Messages are queued and executed in order. |
AsyncSubscribeStrategy.Parallel |
Messages are executed in parallel. |
AsyncSubscribeStrategy.Switch |
Cancels the ongoing processing and executes the new message. |
AsyncSubscribeStrategy.Drop |
Ignores new messages during ongoing processing. |
Filters allow you to add processing before and after message handling.
To create a new filter, define a class that implements IMessageFilter<T>
.
public class NopFilter<T> : IMessageFilter<T>
{
public async ValueTask InvokeAsync(T message, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> next)
{
try
{
// Call the next processing step
await next(message, cancellationToken);
}
catch
{
throw;
}
finally
{
}
}
}
The definition of IMessageFilter<T>
adopts the async decorator pattern, which is also used in ASP.NET Core middleware.
Here’s an example of a filter that adds logging before and after processing.
public class LoggingFilter<T> : IMessageFilter<T>
{
public async ValueTask InvokeAsync(T message, CancellationToken cancellationToken, Func<T, CancellationToken, ValueTask> next)
{
Console.WriteLine("Before");
await next(message, cancellationToken);
Console.WriteLine("After");
}
}
There are several ways to add a created filter.
If adding directly to MessageBroker<T>
, use AddFilter<T>()
. The order of filter application will follow the order of addition.
var broker = new MessageBroker<int>();
// Add a filter
broker.AddFilter<LoggingFilter<int>>();
To add a global filter to a publisher in the DI container, configure it within the AddZeroMessenger()
method.
Host.CreateDefaultBuilder()
.ConfigureServices((context, services) =>
{
services.AddZeroMessenger(messenger =>
{
// Specify the type to add
messenger.AddFilter<LoggingFilter<Message>>();
// Add with open generics
messenger.AddFilter(typeof(LoggingFilter<>));
});
})
.Build()
.Run();
To add individual filters when subscribing, you can use the WithFilter<T>() / WithFilters()
extension methods.
IMessageSubscriber<Message> subscriber;
subscriber
.WithFilter<LoggingFilter<Message>>()
.Subscribe(x =>
{
});
Zero Messenger provides PredicateFilter<T>
. When you pass a Predicate<T>
as an argument to AddFilter<T>()
or WithFilter<T>()
, a PredicateFilter<T>
created based on that predicate is automatically added.
public record struct FooMessage(int Value);
IMessageSubscriber<FooMessage> subscriber;
subscriber
.WithFilter(x => x.Value >= 0) // Exclude values less than 0
.Subscribe(x =>
{
});
Zero Messenger supports integration with Cysharp/R3. To enable this feature, add the ZeroMessenger.R3
package.
dotnet add package ZeroMessenger.R3
Install-Package ZeroMessenger.R3
By adding ZeroMessenger.R3, you gain access to operators for converting IMessageSubscriber<T>
to Observable<T>
and connecting Observable<T>
to IMessagePublisher<T>
.
// Convert IMessageSubscriber<T> to Observable<T>
subscriber.ToObservable()
.Subscribe(x => { });
// Subscribe to Observable<T> and convert it to IMessagePublisher<T>'s Publish()
observable.SubscribeToPublish(publisher);
// SubscribeAwait to Observable<T> and convert it to IMessagePublisher<T>'s PublishAsync()
observable.SubscribeAwaitToPublish(publisher, AwaitOperation.Sequential, AsyncPublishStrategy.Parallel);
You can use Zero Messenger in Unity by installing NuGet packages via NugetForUnity.
- Unity 2021.3 or later
-
Install NugetForUnity.
-
Open the NuGet window by selecting
NuGet > Manage NuGet Packages
, search for theZeroMessenger
package, and install it.
There is also an extension package available for handling Zero Messenger with VContainer's DI container.
To install ZeroMessenger.VContainer, open the Package Manager window by selecting Window > Package Manager
, then use [+] > Add package from git URL
and enter the following URL:
https://github.com/AnnulusGames/ZeroMessenger.git?path=src/ZeroMessenger.Unity/Assets/ZeroMessenger.VContainer
By introducing ZeroMessenger.VContainer, the IContainerBuilder
gains the AddZeroMessenger()
extension method. Calling this method adds Zero Messenger to the DI container, allowing IMessagePublisher<T>
and IMessageSubscriber<T>
to be injected.
using VContainer;
using VContainer.Unity;
using ZeroMessenger.VContainer;
public class ExampleLifetimeScope : LifetimeScope
{
protected override void Configure(IContainerBuilder builder)
{
// Add Zero Messenger
builder.AddZeroMessenger();
}
}
Note
AddZeroMessenger()
registers using Open Generics, which may not work with IL2CPP versions prior to Unity 2022.1.
This library is released under the MIT License.