Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
665571b
avro basic sample
lailabougria Aug 6, 2025
bcc9323
remove unneeded references
lailabougria Aug 6, 2025
1ca06c9
Null check
andreasohlund Aug 6, 2025
e9be722
Formatting
andreasohlund Aug 6, 2025
de09014
Tweaks
andreasohlund Aug 7, 2025
d8f205f
Return empty array when message type is not known
andreasohlund Aug 7, 2025
c899b51
Thrown better exception when schema not found
andreasohlund Aug 7, 2025
7294ca7
Naming
andreasohlund Aug 11, 2025
99ad207
Use record
andreasohlund Aug 11, 2025
4e9dfd2
Basic sample instructions
andreasohlund Aug 12, 2025
315b358
Switch to json
andreasohlund Aug 12, 2025
77bff4d
Link to sample for from shape the future
andreasohlund Aug 12, 2025
edd2723
Add avro landing page
andreasohlund Aug 12, 2025
d4093b2
Throw when asked to infer message type from payload
andreasohlund Aug 13, 2025
dec22d3
Document interface messages not being supported
andreasohlund Aug 13, 2025
894d57f
Add note about not suitable for production use
andreasohlund Aug 13, 2025
7d9548e
Add comment to the body logger
andreasohlund Aug 13, 2025
2cca73a
Use unrecoverable exception
andreasohlund Aug 13, 2025
8bb3d48
Add note about what happens when schema is not found
andreasohlund Aug 13, 2025
147056b
Add code section
andreasohlund Aug 13, 2025
c2f95d0
Update sample.md
andreasohlund Aug 13, 2025
661cce7
Update sample.md
andreasohlund Aug 13, 2025
015181b
Update menu.yaml
lailabougria Aug 13, 2025
79ae94e
Update avro.md
lailabougria Aug 13, 2025
f0f6f08
Update sample.md
lailabougria Aug 13, 2025
2df8f65
Update sample.md
lailabougria Aug 13, 2025
dffc219
Update sample.md
lailabougria Aug 13, 2025
a10745c
throw a messagedeserializationexception is schema is not found
lailabougria Aug 13, 2025
f90baee
Update sample.md
lailabougria Aug 13, 2025
42bde9a
links
lailabougria Aug 13, 2025
386587b
fix link
lailabougria Aug 13, 2025
6a20afb
align titles
lailabougria Aug 13, 2025
1c3e082
menu title
lailabougria Aug 13, 2025
d994df1
link to issue
lailabougria Aug 13, 2025
4530ebb
fix note display
jpalac Aug 13, 2025
0189ef2
Add Avro logo and make call-to-action more inviting for use case feed…
jasontaylordev Aug 14, 2025
dc5ed1d
Enable implicit usings and clean-up imports
jasontaylordev Aug 14, 2025
6f7089c
Avoid line-wrapping within docs snippets
jasontaylordev Aug 14, 2025
cf085b4
Add intro sentences for configuration, sending message, and output ex…
jasontaylordev Aug 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions menu/menu.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1721,6 +1721,8 @@
Url: shape-the-future
- Title: NServiceBus and CloudEvents
Url: shape-the-future/cloudevents
- Title: NServiceBus and Apache Avro
Url: shape-the-future/avro
- Title: NServiceBus and .NET Aspire
Url: shape-the-future/aspire
- Title: NServiceBus and Redis
Expand Down
19 changes: 19 additions & 0 deletions samples/serializers/avro/Core_9/AvroSample.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29728.190
MinimumVisualStudioVersion = 15.0.26730.12
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample", "Sample\Sample.csproj", "{6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}.Debug|Any CPU.Build.0 = Debug|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
EndGlobal
54 changes: 54 additions & 0 deletions samples/serializers/avro/Core_9/Sample/AvroMessageSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using System.Collections.Generic;
using System.IO;
using Avro.IO;
using Avro.Reflect;
using NServiceBus;
using NServiceBus.Serialization;

#region serializer-implementation
public class AvroMessageSerializer(SchemaRegistry schemaRegistry, ClassCache classCache) : IMessageSerializer
{
public string ContentType => "avro/json";

public void Serialize(object message, Stream stream)
{
var messageType = message.GetType();
var schema = schemaRegistry.GetSchema(messageType);
var writer = new ReflectDefaultWriter(messageType, schema, classCache);

var encoder = new JsonEncoder(schema, stream);

writer.Write(message, encoder);

encoder.Flush();
}

public object[] Deserialize(ReadOnlyMemory<byte> body, IList<Type> messageTypes = null)
{
if (messageTypes == null)
{
throw new MessageDeserializationException("Avro is not able to infer message types from the body content only, the NServiceBus.EnclosedMessageTypes header must be present");
}

var messages = new List<object>();
foreach (var messageType in messageTypes)
{
try
{
var schema = schemaRegistry.GetSchema(messageType);
var reader = new ReflectDefaultReader(messageType, schema, schema, classCache);
using var stream = new ReadOnlyStream(body);
var message = reader.Read(null, schema, schema, new JsonDecoder(schema, stream));
messages.Add(message);
}
catch (KeyNotFoundException)
{
throw new MessageDeserializationException($"No schema found for message type {messageType.FullName}");
}
}

return messages.ToArray();
}
}
#endregion
44 changes: 44 additions & 0 deletions samples/serializers/avro/Core_9/Sample/AvroSerializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.IO;
using System.Linq;
using System.Reflection;
using Avro;
using Avro.Reflect;
using NServiceBus.MessageInterfaces;
using NServiceBus.Serialization;
using NServiceBus.Settings;
using NServiceBus.Unicast.Messages;

#region serializer-definition
public class AvroSerializer : SerializationDefinition
{
public override Func<IMessageMapper, IMessageSerializer> Configure(IReadOnlySettings settings)
{
var registry = settings.Get<MessageMetadataRegistry>();
var messageTypes = registry.GetAllMessages().Select(m => m.MessageType);
var schemaCache = new SchemaRegistry();
var assembly = Assembly.GetExecutingAssembly();

foreach (var messageType in messageTypes)
{
var manifestNamespace = "Sample.";
var schemaResourceName = manifestNamespace + messageType.Name + ".avsc";
using var stream = assembly.GetManifestResourceStream(schemaResourceName);

if (stream == null)
{
throw new InvalidOperationException($"Resource '{schemaResourceName}' not found in assembly '{assembly.FullName}'.");
}

// Load the schema from the embedded resource
using var reader = new StreamReader(stream);
var schemaJson = reader.ReadToEnd();

// Parse and cache the schema
schemaCache.Add(messageType, Schema.Parse(schemaJson));
}

return _ => new AvroMessageSerializer(schemaCache, new ClassCache());
}
}
#endregion
42 changes: 42 additions & 0 deletions samples/serializers/avro/Core_9/Sample/CreateOrder.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"type": "record",
"name": "CreateOrder",
"namespace": "net.particular.samples.serializers.avro",
"fields": [
{
"name": "OrderId",
"type": "int"
},
{
"name": "Date",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
},
{
"name": "CustomerId",
"type": "int"
},
{
"name": "OrderItems",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{
"name": "ItemId",
"type": "int"
},
{
"name": "Quantity",
"type": "int"
}
]
}
}
}
]
}
11 changes: 11 additions & 0 deletions samples/serializers/avro/Core_9/Sample/CreateOrder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using NServiceBus;

public record CreateOrder : IMessage
{
public int OrderId { get; set; }
public DateTime Date { get; set; }
public int CustomerId { get; set; }
public List<OrderItem> OrderItems { get; set; }
}
13 changes: 13 additions & 0 deletions samples/serializers/avro/Core_9/Sample/CreateOrderHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NServiceBus;

public class CreateOrderHandler(ILogger<CreateOrderHandler> logger) :
IHandleMessages<CreateOrder>
{
public Task Handle(CreateOrder message, IMessageHandlerContext context)
{
logger.LogInformation($"Order {message.OrderId} received");
return Task.CompletedTask;
}
}
46 changes: 46 additions & 0 deletions samples/serializers/avro/Core_9/Sample/InputLoopService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using NServiceBus;

public class InputLoopService(IMessageSession messageSession) : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
Console.WriteLine("Press any key, to send a message");
Console.ReadKey();

#region message

var message = new CreateOrder
{
OrderId = 9,
Date = DateTime.Now,
CustomerId = 12,
OrderItems =
[
new OrderItem
{
ItemId = 6,
Quantity = 2
},

new OrderItem
{
ItemId = 5,
Quantity = 4
}
]
};

await messageSession.SendLocal(message, cancellationToken: stoppingToken);

#endregion

Console.WriteLine("Message Sent");
}
}
}
20 changes: 20 additions & 0 deletions samples/serializers/avro/Core_9/Sample/MessageBodyLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using NServiceBus.Pipeline;

// The behavior is only needed to print the serialized avro message content to the console
public class MessageBodyLogger(ILogger<MessageBodyLogger> logger) : Behavior<IIncomingPhysicalMessageContext>
{
public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
var bodyAsString = Encoding.UTF8
.GetString(context.Message.Body.ToArray());

logger.LogInformation("Serialized Message Body:");
logger.LogInformation(bodyAsString);

return next();
}
}
5 changes: 5 additions & 0 deletions samples/serializers/avro/Core_9/Sample/OrderItem.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
public class OrderItem
{
public int ItemId { get; set; }
public int Quantity { get; set; }
}
25 changes: 25 additions & 0 deletions samples/serializers/avro/Core_9/Sample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using NServiceBus;
using NServiceBus.MessageMutator;

Console.Title = "Avro";

var builder = Host.CreateApplicationBuilder(args);
builder.Services.AddHostedService<InputLoopService>();

#region config

var endpointConfiguration = new EndpointConfiguration("Samples.Serialization.Avro");

endpointConfiguration.UseSerialization<AvroSerializer>();

#endregion

endpointConfiguration.UseTransport(new LearningTransport());

endpointConfiguration.Pipeline.Register(typeof(MessageBodyLogger), "Logs the message body received");

builder.UseNServiceBus(endpointConfiguration);
await builder.Build().RunAsync();
55 changes: 55 additions & 0 deletions samples/serializers/avro/Core_9/Sample/ReadOnlyStream.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.IO;

public class ReadOnlyStream(ReadOnlyMemory<byte> memory) : Stream
{
int position = 0;

public override void Flush() => throw new NotSupportedException();

public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

public override void SetLength(long value) => throw new NotSupportedException();

public override int Read(byte[] buffer, int offset, int count)
{
var bytesToCopy = Math.Min(count, memory.Length - position);

var destination = buffer.AsSpan().Slice(offset, bytesToCopy);
var source = memory.Span.Slice(position, bytesToCopy);

source.CopyTo(destination);

position += bytesToCopy;

return bytesToCopy;
}

public override int Read(Span<byte> buffer)
{
var bytesToCopy = Math.Min(memory.Length - position, buffer.Length);
if (bytesToCopy <= 0)
{
return 0;
}

var source = memory.Span.Slice(position, bytesToCopy);
source.CopyTo(buffer);

position += bytesToCopy;
return bytesToCopy;
}

public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

public override bool CanRead => true;
public override bool CanSeek => false;
public override bool CanWrite => false;
public override long Length => memory.Length;

public override long Position
{
get => position;
set => position = (int)value;
}
}
15 changes: 15 additions & 0 deletions samples/serializers/avro/Core_9/Sample/Sample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFrameworks>net9.0;net8.0</TargetFrameworks>
<OutputType>Exe</OutputType>
<LangVersion>12.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NServiceBus" Version="9.*" />
<PackageReference Include="NServiceBus.Extensions.Hosting" Version="3.*" />
<PackageReference Include="Apache.Avro" Version="1.12.*" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="CreateOrder.avsc" />
</ItemGroup>
</Project>
Loading
Loading