.NET Aspire Apache Kafka integration
In this article, you learn how to use the .NET Aspire Apache Kafka client message-broker. The Aspire.Confluent.Kafka
library registers an IProducer<TKey, TValue>
and an IConsumer<TKey, TValue>
in the dependency injection (DI) container for connecting to a Apache Kafka server. It enables corresponding health check, logging and telemetry.
Get started
To get started with the .NET Aspire Apache Kafka integration, install the Aspire.Confluent.Kafka NuGet package in the client-consuming project, i.e., the project for the application that uses the Apache Kafka client.
dotnet add package Aspire.Confluent.Kafka
For more information, see dotnet add package or Manage package dependencies in .NET applications.
Example usage
In the Program.cs file of your client-consuming project, call the AddKafkaProducer extension method to register an IProducer<TKey, TValue>
for use via the dependency injection container. The method takes two generic parameters corresponding to the type of the key and the type of the message to send to the broker. These generic parameters will be used to new an instance of ProducerBuilder<TKey, TValue>
. This method also take connection name parameter.
builder.AddKafkaProducer<string, string>("messaging");
You can then retrieve the IProducer<TKey, TValue>
instance using dependency injection. For example, to retrieve the producer from an IHostedService
:
internal sealed class MyWorker(IProducer<string, string> producer) : BackgroundService
{
// Use producer...
}
App host usage
To model the Kafka resource in the app host, install the Aspire.Hosting.Kafka NuGet package in the app host project.
dotnet add package Aspire.Hosting.Kafka
In your app host project, register a Kafka container and consume the connection using the following methods:
var builder = DistributedApplication.CreateBuilder(args);
var messaging = builder.AddKafka("messaging")
.WithKafkaUI();
var myService = builder.AddProject<Projects.MyService>()
.WithReference(messaging);
The WithKafkaUI()
extension method which provides a web-based interface to view the state of the Kafka container instance. The WithReference
method configures a connection in the MyService
project named messaging
. In the Program.cs file of MyService
, the Apache Kafka broker connection can be consumed using:
builder.AddKafkaProducer<string, string>("messaging");
or
builder.AddKafkaConsumer<string, string>("messaging");
Configuration
The .NET Aspire Apache Kafka integration provides multiple options to configure the connection based on the requirements and conventions of your project.
Use a connection string
When using a connection string from the ConnectionStrings
configuration section, you can provide the name of the connection string when calling builder.AddKafkaProducer()
or builder.AddKafkaProducer()
:
builder.AddKafkaProducer<string, string>("myConnection");
And then the connection string will be retrieved from the ConnectionStrings
configuration section:
{
"ConnectionStrings": {
"myConnection": "broker:9092"
}
}
The value provided as connection string will be set to the BootstrapServers
property of the produced IProducer<TKey, TValue>
or IConsumer<TKey, TValue>
instance. Refer to BootstrapServers for more information.
Use configuration providers
The .NET Aspire Apache Kafka integration supports Microsoft.Extensions.Configuration. It loads the KafkaProducerSettings
or KafkaConsumerSettings
from configuration by respectively using the Aspire:Confluent:Kafka:Producer
and Aspire.Confluent:Kafka:Consumer
keys. This example appsettings.json configures some of the options:
{
"Aspire": {
"Confluent": {
"Kafka": {
"Producer": {
"DisableHealthChecks": false,
"Config": {
"Acks": "All"
}
}
}
}
}
}
The Config
properties of both Aspire:Confluent:Kafka:Producer
and Aspire.Confluent:Kafka:Consumer
configuration sections respectively bind to instances of ProducerConfig
and ConsumerConfig
.
Confluent.Kafka.Consumer<TKey, TValue>
requires the ClientId
property to be set to let the broker track consumed message offsets.
Use inline delegates
Configuring KafkaProducerSettings
and KafkaConsumerSettings
You can pass the Action<KafkaProducerSettings> configureSettings
delegate to set up some or all the options inline, for example to disable health checks from code:
builder.AddKafkaProducer<string, string>("messaging", settings => settings.DisableHealthChecks = true);
You can configure inline a consumer from code:
builder.AddKafkaConsumer<string, string>("messaging", settings => settings.DisableHealthChecks = true);
Configuring ProducerBuilder<TKey, TValue>
and ConsumerBuilder<TKey, TValue>
To configure Confluent.Kafka
builders, pass an Action<ProducerBuilder<TKey, TValue>>
(or Action<ConsumerBuilder<TKey, TValue>>
):
builder.AddKafkaProducer<string, MyMessage>("messaging", producerBuilder => {
producerBuilder.SetValueSerializer(new MyMessageSerializer());
})
Refer to ProducerBuilder<TKey, TValue>
and ConsumerBuilder<TKey, TValue>
API documentation for more information.
Health checks
By default, .NET Aspire integrations enable health checks for all services. For more information, see .NET Aspire integrations overview.
The .NET Aspire Apache Kafka integration handles the following:
- Adds the
Aspire.Confluent.Kafka.Producer
health check when KafkaProducerSettings.DisableHealthChecks isfalse
. - Adds the
Aspire.Confluent.Kafka.Consumer
health check when KafkaConsumerSettings.DisableHealthChecks isfalse
. - Integrates with the
/health
HTTP endpoint, which specifies all registered health checks must pass for app to be considered ready to accept traffic.
Observability and telemetry
.NET Aspire integrations automatically set up Logging, Tracing, and Metrics configurations, which are sometimes known as the pillars of observability. For more information about integration observability and telemetry, see .NET Aspire integrations overview. Depending on the backing service, some integrations may only support some of these features. For example, some integrations support logging and tracing, but not metrics. Telemetry features can also be disabled using the techniques presented in the Configuration section.
Logging
The .NET Aspire Apache Kafka integration uses the following log categories:
Aspire.Confluent.Kafka
Tracing
The .NET Aspire Apache Kafka integration will emit the following tracing activities using OpenTelemetry:
Aspire.Confluent.Kafka
Metrics
The .NET Aspire Apache Kafka integration will emit the following metrics using OpenTelemetry:
messaging.kafka.network.tx
messaging.kafka.network.transmitted
messaging.kafka.network.rx
messaging.kafka.network.received
messaging.publish.messages
messaging.kafka.message.transmitted
messaging.receive.messages
messaging.kafka.message.received
See also
.NET Aspire