Stream Processing
INFO
Remember that you can download the samples from the GitHub repository.
This sample makes use of Kafka for the streams.
If you use Docker, you can get up-and-running with the following docker-compose.yml file:
---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: shuttle-stream-zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:latest
    hostname: broker
    container_name: shuttle-stream-broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  kafka-ui:
    container_name: shuttle-stream-kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - zookeeper
      - broker
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
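With the above saved as docker-compose.yml, the stack can be started from the same folder using the standard Docker Compose workflow (a typical usage sketch; container names come from the file above):

```shell
# Start Zookeeper, the Kafka broker, and the Kafka UI in the background
docker compose up -d

# Confirm that all three containers are running
docker compose ps
```

The Kafka UI should then be reachable on http://localhost:8080, and the broker on localhost:9092, as configured above.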
Once you have opened the Shuttle.StreamProcessing.sln solution in Visual Studio, set the following projects as startup projects:
- Shuttle.StreamProcessing.Producer
- Shuttle.StreamProcessing.Consumer
Implementation
To get any processing done in Shuttle.Esb, a message needs to be produced and sent to a stream, usually represented by a topic.
In this guide we'll create the following projects:
- Shuttle.StreamProcessing.Producer (Console Application)
- Shuttle.StreamProcessing.Consumer (Console Application)
- Shuttle.StreamProcessing.Messages (Class Library)
Messages
Create a new class library called Shuttle.StreamProcessing.Messages with a solution called Shuttle.StreamProcessing.
Note: remember to change the Solution name.
TemperatureRead
Rename the default Class1 file to TemperatureRead and add a Name, Minute, and Celsius property.
namespace Shuttle.StreamProcessing.Messages
{
public class TemperatureRead
{
public string Name { get; set; }
public int Minute { get; set; }
public decimal Celsius { get; set; }
}
}
Producer
Add a new Console Application to the solution called Shuttle.StreamProcessing.Producer.
Install the Shuttle.Esb.Kafka NuGet package. This provides access to the Kafka IQueue implementation and also includes the required dependencies.
Install the Microsoft.Extensions.Configuration.Json NuGet package. This provides the ability to read the appsettings.json file.
Add a reference to the Shuttle.StreamProcessing.Messages project.
Program
Implement the producer code as follows:
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Shuttle.Esb;
using Shuttle.Esb.Kafka;
using Shuttle.StreamProcessing.Messages;
namespace Shuttle.StreamProcessing.Producer
{
internal class Program
{
private static async Task Main(string[] args)
{
var services = new ServiceCollection();
var configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
services.AddSingleton<IConfiguration>(configuration);
services.AddServiceBus(builder =>
{
configuration.GetSection(ServiceBusOptions.SectionName).Bind(builder.Options);
});
services.AddKafka(builder =>
{
builder.AddOptions("local", new KafkaOptions
{
BootstrapServers = "localhost:9092",
EnableAutoCommit = true,
EnableAutoOffsetStore = true,
NumPartitions = 1,
UseCancellationToken = false,
ConsumeTimeout = TimeSpan.FromMilliseconds(25)
});
});
Console.WriteLine("Type a name for the set of readings, then press [enter] to submit; an empty line submission stops execution:");
Console.WriteLine();
var random = new Random();
decimal temperature = random.Next(-5, 30);
using (var serviceBus = services.BuildServiceProvider().GetRequiredService<IServiceBus>().Start())
{
string name;
while (!string.IsNullOrEmpty(name = Console.ReadLine()))
{
for (var minute = 0; minute < 1440; minute++)
{
serviceBus.Send(new TemperatureRead
{
Name = name,
Minute = minute,
Celsius = temperature
});
if (temperature > -5 && (random.Next(0, 100) < 50 || temperature > 29))
{
temperature -= random.Next(0, 100) / 100m;
}
else
{
temperature += random.Next(0, 100) / 100m;
}
}
}
}
}
}
}
Producer configuration file
Add an appsettings.json file as follows:
{
"Shuttle": {
"ServiceBus": {
"MessageRoutes": [
{
"Uri": "kafka://local/stream-consumer-work",
"Specifications": [
{
"Name": "StartsWith",
"Value": "Shuttle.StreamProcessing.Messages"
}
]
}
]
}
}
}
This tells the service bus that all messages with a type name starting with Shuttle.StreamProcessing.Messages should be routed to the endpoint kafka://local/stream-consumer-work.
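Conceptually, the StartsWith specification matches on the full type name of each message being sent. A minimal sketch of that matching logic (a hypothetical illustration, not the actual Shuttle.Esb implementation) looks like this:

```csharp
using System;

namespace Shuttle.StreamProcessing.Messages
{
    // Stand-in for the message type defined earlier in the guide.
    public class TemperatureRead { }
}

public static class RouteMatchingDemo
{
    // A "StartsWith" route specification checks whether the message's
    // full type name begins with the configured value.
    public static bool Matches(Type messageType, string value)
    {
        return (messageType.FullName ?? string.Empty)
            .StartsWith(value, StringComparison.Ordinal);
    }

    public static void Main()
    {
        var matches = Matches(
            typeof(Shuttle.StreamProcessing.Messages.TemperatureRead),
            "Shuttle.StreamProcessing.Messages");

        // "Shuttle.StreamProcessing.Messages.TemperatureRead" starts with
        // "Shuttle.StreamProcessing.Messages", so this prints True.
        Console.WriteLine(matches);
    }
}
```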
Consumer
Add a new Console Application to the solution called Shuttle.StreamProcessing.Consumer.
Install the Shuttle.Esb.Kafka NuGet package. This provides access to the Kafka IQueue implementation and also includes the required dependencies.
Install the Microsoft.Extensions.Hosting NuGet package. This allows a console application to be hosted using the .NET generic host.
Install the Microsoft.Extensions.Configuration.Json NuGet package. This provides the ability to read the appsettings.json file.
Add a reference to the Shuttle.StreamProcessing.Messages project.
Program
Implement the Program class as follows:
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Shuttle.Esb;
using Shuttle.Esb.Kafka;
namespace Shuttle.StreamProcessing.Consumer
{
internal class Program
{
private static async Task Main()
{
await Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
var configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();
services.AddSingleton<IConfiguration>(configuration);
services.AddServiceBus(builder =>
{
configuration.GetSection(ServiceBusOptions.SectionName).Bind(builder.Options);
builder.Options.Asynchronous = true;
});
services.AddKafka(builder =>
{
builder.AddOptions("local", new KafkaOptions
{
BootstrapServers = "localhost:9092",
EnableAutoCommit = true,
EnableAutoOffsetStore = true,
NumPartitions = 1,
UseCancellationToken = false,
ConsumeTimeout = TimeSpan.FromMilliseconds(25)
});
});
})
.Build()
.RunAsync();
}
}
}
Consumer configuration file
Add an appsettings.json file as follows:
{
"Shuttle": {
"ServiceBus": {
"Inbox": {
"WorkQueueUri": "kafka://local/stream-consumer-work"
}
}
}
}
TemperatureReadHandler
Add a new class called TemperatureReadHandler that implements the IAsyncMessageHandler<TemperatureRead> interface as follows:
using Shuttle.Esb;
using Shuttle.StreamProcessing.Messages;
namespace Shuttle.StreamProcessing.Consumer;
public class TemperatureReadHandler : IAsyncMessageHandler<TemperatureRead>
{
public async Task ProcessMessageAsync(IHandlerContext<TemperatureRead> context)
{
Console.WriteLine($"[TEMPERATURE READ] : name = '{context.Message.Name}' / minute = {context.Message.Minute} / celsius = {context.Message.Celsius:F}");
await Task.CompletedTask;
}
}
Run
Set both the producer and consumer projects as startup projects.
Execute
To execute the solution successfully, you will need a locally accessible Kafka instance, such as the Docker setup described above.
Execute the application.
The producer application will wait for you to input a reading set name. For this example, enter today's date and press enter:
INFO
You will observe that the consumer application has processed the messages.
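With a reading set named, say, 2024-01-01, each processed message appears in the consumer console in the format produced by the handler above (the temperature values here are illustrative):

```
[TEMPERATURE READ] : name = '2024-01-01' / minute = 0 / celsius = 12.34
[TEMPERATURE READ] : name = '2024-01-01' / minute = 1 / celsius = 12.80
[TEMPERATURE READ] : name = '2024-01-01' / minute = 2 / celsius = 12.31
```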
You have now completed a full stream processing implementation.