Kafka
PM> Install-Package Shuttle.Esb.Kafka
Configuration
The URI structure is kafka://configuration-name/queue-name
.
c#
services.AddKafka(builder =>
{
var kafkaOptions = new KafkaOptions
{
BootstrapServers = "localhost:9092",
ReplicationFactor = 1,
NumPartitions = 1,
MessageSendMaxRetries = 3,
RetryBackoff = TimeSpan.FromSeconds(1),
EnableAutoCommit = false,
EnableAutoOffsetStore = false,
FlushEnqueue = false,
UseCancellationToken = true,
ConsumeTimeout = TimeSpan.FromSeconds(30),
OperationTimeout = TimeSpan.FromSeconds(30),
ConnectionsMaxIdle = TimeSpan.Zero,
Acks = Acks.All,
EnableIdempotence = true
};
kafkaOptions.ConfigureConsumer += (sender, args) =>
{
Console.WriteLine($"[event] : ConfigureConsumer / Uri = '{((IQueue)sender).Uri}'");
};
kafkaOptions.ConfigureProducer += (sender, args) =>
{
Console.WriteLine($"[event] : ConfigureProducer / Uri = '{((IQueue)sender).Uri}'");
};
builder.AddOptions("local", kafkaOptions);
});
The ConfigureConsumer
event args
arugment exposes the ConsumerConfig
directly for any specific options that need to be set. Similarly, the ConfigureProducer
event args
arugment exposes the ProducerConfig
.
The default JSON settings structure is as follows:
json
{
"Shuttle": {
"Kafka": {
"local": {
"BootstrapServers": "localhost:9092",
"ReplicationFactor": 1,
"NumPartitions": 1,
"MessageSendMaxRetries": 3,
"RetryBackoff": "00:00:01",
"EnableAutoCommit": false,
"EnableAutoOffsetStore": false,
"FlushEnqueue": false,
"UseCancellationToken": true,
"ConsumeTimeout": "00:00:30",
"OperationTimeout": "00:00:30",
"ConnectionsMaxIdle": "00:00:00",
"Acks": "All",
"EnableIdempotence": true,
}
}
}
}
Options
Option | Default | Description |
---|---|---|
BootstrapServers | Initial list of brokers as a CSV list of broker host or host:port. | |
ReplicationFactor | 1 | The replication factor for the new topic or -1 (the default) if a replica assignment is specified instead. |
NumPartitions | 1 | The number of partitions for the new topic or -1 (the default) if a replica assignment is specified. |
MessageSendMaxRetries | 3 | How many times to retry sending a failing Message. Note: retrying may cause reordering unless enable.idempotence is set to true. |
RetryBackoff | "00:00:01" | The backoff time before retrying a protocol request. |
EnableAutoCommit | false | Automatically and periodically commit offsets in the background. |
EnableAutoOffsetStore | false | Automatically store offset of last message provided to application. |
FlushEnqueue | false | If true will call Flush on the producer after a message has been enqueued. |
UseCancellationToken | true | Indicates whether a cancellation token is used for relevant methods. |
ConsumeTimeout | "00:00:30" | The duration to poll for messages before returning null , when the cancellation token is not used. |
OperationTimeout | "00:00:30" | The duration to wait for relevant async methods to complete before timing out. |
ConnectionsMaxIdle | "00:00:00" | Close broker connections after the specified time of inactivity. |
Acks | "All" | This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request. |
EnableIdempotence | true | When set to true , the producer will ensure that messages are successfully produced exactly once and in the original produce order. |