-
Notifications
You must be signed in to change notification settings - Fork 299
Avro sample #7612
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Avro sample #7612
Changes from 34 commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
665571b
avro basic sample
lailabougria bcc9323
remove unneeded references
lailabougria 1ca06c9
Null check
andreasohlund e9be722
Formatting
andreasohlund de09014
Tweaks
andreasohlund d8f205f
Return empty array when message type is not known
andreasohlund c899b51
Thrown better exception when schema not found
andreasohlund 7294ca7
Naming
andreasohlund 99ad207
Use record
andreasohlund 4e9dfd2
Basic sample instructions
andreasohlund 315b358
Switch to json
andreasohlund 77bff4d
Link to sample for from shape the future
andreasohlund edd2723
Add avro landing page
andreasohlund d4093b2
Throw when asked to infer message type from payload
andreasohlund dec22d3
Document interface messages not being supported
andreasohlund 894d57f
Add note about not suitable for production use
andreasohlund 7d9548e
Add comment to the body logger
andreasohlund 2cca73a
Use unrecoverable exception
andreasohlund 8bb3d48
Add note about what happens when schema is not found
andreasohlund 147056b
Add code section
andreasohlund c2f95d0
Update sample.md
andreasohlund 661cce7
Update sample.md
andreasohlund 015181b
Update menu.yaml
lailabougria 79ae94e
Update avro.md
lailabougria f0f6f08
Update sample.md
lailabougria 2df8f65
Update sample.md
lailabougria dffc219
Update sample.md
lailabougria a10745c
throw a messagedeserializationexception is schema is not found
lailabougria f90baee
Update sample.md
lailabougria 42bde9a
links
lailabougria 386587b
fix link
lailabougria 6a20afb
align titles
lailabougria 1c3e082
menu title
lailabougria d994df1
link to issue
lailabougria 4530ebb
fix note display
jpalac 0189ef2
Add Avro logo and make call-to-action more inviting for use case feed…
jasontaylordev dc5ed1d
Enable implicit usings and clean-up imports
jasontaylordev 6f7089c
Avoid line-wrapping within docs snippets
jasontaylordev cf085b4
Add intro sentences for configuration, sending message, and output ex…
jasontaylordev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
|
|
||
| Microsoft Visual Studio Solution File, Format Version 12.00 | ||
| # Visual Studio Version 16 | ||
| VisualStudioVersion = 16.0.29728.190 | ||
| MinimumVisualStudioVersion = 15.0.26730.12 | ||
| Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Sample", "Sample\Sample.csproj", "{6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}" | ||
| EndProject | ||
| Global | ||
| GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||
| Debug|Any CPU = Debug|Any CPU | ||
| EndGlobalSection | ||
| GlobalSection(ProjectConfigurationPlatforms) = postSolution | ||
| {6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | ||
| {6E6C9C2A-8D9B-41AF-B5E5-1AF5310686E7}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||
| EndGlobalSection | ||
| GlobalSection(SolutionProperties) = preSolution | ||
| HideSolutionNode = FALSE | ||
| EndGlobalSection | ||
| EndGlobal |
54 changes: 54 additions & 0 deletions
54
samples/serializers/avro/Core_9/Sample/AvroMessageSerializer.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.IO; | ||
| using Avro.IO; | ||
| using Avro.Reflect; | ||
| using NServiceBus; | ||
| using NServiceBus.Serialization; | ||
|
|
||
| #region serializer-implementation | ||
| public class AvroMessageSerializer(SchemaRegistry schemaRegistry, ClassCache classCache) : IMessageSerializer | ||
| { | ||
| public string ContentType => "avro/json"; | ||
|
|
||
| public void Serialize(object message, Stream stream) | ||
| { | ||
| var messageType = message.GetType(); | ||
| var schema = schemaRegistry.GetSchema(messageType); | ||
| var writer = new ReflectDefaultWriter(messageType, schema, classCache); | ||
|
|
||
| var encoder = new JsonEncoder(schema, stream); | ||
|
|
||
| writer.Write(message, encoder); | ||
|
|
||
| encoder.Flush(); | ||
| } | ||
|
|
||
| public object[] Deserialize(ReadOnlyMemory<byte> body, IList<Type> messageTypes = null) | ||
| { | ||
| if (messageTypes == null) | ||
| { | ||
| throw new MessageDeserializationException("Avro is not able to infer message types from the body content only, the NServiceBus.EnclosedMessageTypes header must be present"); | ||
| } | ||
|
|
||
| var messages = new List<object>(); | ||
| foreach (var messageType in messageTypes) | ||
| { | ||
| try | ||
| { | ||
| var schema = schemaRegistry.GetSchema(messageType); | ||
| var reader = new ReflectDefaultReader(messageType, schema, schema, classCache); | ||
| using var stream = new ReadOnlyStream(body); | ||
| var message = reader.Read(null, schema, schema, new JsonDecoder(schema, stream)); | ||
| messages.Add(message); | ||
| } | ||
| catch (KeyNotFoundException) | ||
| { | ||
| throw new MessageDeserializationException($"No schema found for message type {messageType.FullName}"); | ||
| } | ||
| } | ||
|
|
||
| return messages.ToArray(); | ||
| } | ||
| } | ||
| #endregion |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| using System; | ||
| using System.IO; | ||
| using System.Linq; | ||
| using System.Reflection; | ||
| using Avro; | ||
| using Avro.Reflect; | ||
| using NServiceBus.MessageInterfaces; | ||
| using NServiceBus.Serialization; | ||
| using NServiceBus.Settings; | ||
| using NServiceBus.Unicast.Messages; | ||
|
|
||
| #region serializer-definition | ||
| public class AvroSerializer : SerializationDefinition | ||
| { | ||
| public override Func<IMessageMapper, IMessageSerializer> Configure(IReadOnlySettings settings) | ||
| { | ||
| var registry = settings.Get<MessageMetadataRegistry>(); | ||
| var messageTypes = registry.GetAllMessages().Select(m => m.MessageType); | ||
| var schemaCache = new SchemaRegistry(); | ||
| var assembly = Assembly.GetExecutingAssembly(); | ||
|
|
||
| foreach (var messageType in messageTypes) | ||
| { | ||
| var manifestNamespace = "Sample."; | ||
| var schemaResourceName = manifestNamespace + messageType.Name + ".avsc"; | ||
| using var stream = assembly.GetManifestResourceStream(schemaResourceName); | ||
|
|
||
| if (stream == null) | ||
| { | ||
| throw new InvalidOperationException($"Resource '{schemaResourceName}' not found in assembly '{assembly.FullName}'."); | ||
| } | ||
|
|
||
| // Load the schema from the embedded resource | ||
| using var reader = new StreamReader(stream); | ||
| var schemaJson = reader.ReadToEnd(); | ||
|
|
||
| // Parse and cache the schema | ||
| schemaCache.Add(messageType, Schema.Parse(schemaJson)); | ||
andreasohlund marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| return _ => new AvroMessageSerializer(schemaCache, new ClassCache()); | ||
| } | ||
| } | ||
| #endregion | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| { | ||
| "type": "record", | ||
| "name": "CreateOrder", | ||
| "namespace": "net.particular.samples.serializers.avro", | ||
| "fields": [ | ||
| { | ||
| "name": "OrderId", | ||
| "type": "int" | ||
| }, | ||
| { | ||
| "name": "Date", | ||
| "type": { | ||
| "type": "long", | ||
| "logicalType": "timestamp-millis" | ||
| } | ||
| }, | ||
| { | ||
| "name": "CustomerId", | ||
| "type": "int" | ||
| }, | ||
| { | ||
| "name": "OrderItems", | ||
| "type": { | ||
| "type": "array", | ||
| "items": { | ||
| "type": "record", | ||
| "name": "OrderItem", | ||
| "fields": [ | ||
| { | ||
| "name": "ItemId", | ||
| "type": "int" | ||
| }, | ||
| { | ||
| "name": "Quantity", | ||
| "type": "int" | ||
| } | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| ] | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,11 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using NServiceBus; | ||
|
|
||
| public record CreateOrder : IMessage | ||
| { | ||
| public int OrderId { get; set; } | ||
| public DateTime Date { get; set; } | ||
| public int CustomerId { get; set; } | ||
| public List<OrderItem> OrderItems { get; set; } | ||
| } |
13 changes: 13 additions & 0 deletions
13
samples/serializers/avro/Core_9/Sample/CreateOrderHandler.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| using System.Threading.Tasks; | ||
| using Microsoft.Extensions.Logging; | ||
| using NServiceBus; | ||
|
|
||
| public class CreateOrderHandler(ILogger<CreateOrderHandler> logger) : | ||
| IHandleMessages<CreateOrder> | ||
| { | ||
| public Task Handle(CreateOrder message, IMessageHandlerContext context) | ||
| { | ||
| logger.LogInformation($"Order {message.OrderId} received"); | ||
| return Task.CompletedTask; | ||
| } | ||
| } |
46 changes: 46 additions & 0 deletions
46
samples/serializers/avro/Core_9/Sample/InputLoopService.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| using System; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.Extensions.Hosting; | ||
| using NServiceBus; | ||
|
|
||
| public class InputLoopService(IMessageSession messageSession) : BackgroundService | ||
| { | ||
| protected override async Task ExecuteAsync(CancellationToken stoppingToken) | ||
| { | ||
| while (!stoppingToken.IsCancellationRequested) | ||
| { | ||
| Console.WriteLine("Press any key, to send a message"); | ||
| Console.ReadKey(); | ||
|
|
||
| #region message | ||
|
|
||
| var message = new CreateOrder | ||
| { | ||
| OrderId = 9, | ||
| Date = DateTime.Now, | ||
| CustomerId = 12, | ||
| OrderItems = | ||
| [ | ||
| new OrderItem | ||
| { | ||
| ItemId = 6, | ||
| Quantity = 2 | ||
| }, | ||
|
|
||
| new OrderItem | ||
| { | ||
| ItemId = 5, | ||
| Quantity = 4 | ||
| } | ||
| ] | ||
| }; | ||
|
|
||
| await messageSession.SendLocal(message, cancellationToken: stoppingToken); | ||
|
|
||
| #endregion | ||
|
|
||
| Console.WriteLine("Message Sent"); | ||
| } | ||
| } | ||
| } |
20 changes: 20 additions & 0 deletions
20
samples/serializers/avro/Core_9/Sample/MessageBodyLogger.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,20 @@ | ||
| using System; | ||
| using System.Text; | ||
| using System.Threading.Tasks; | ||
| using Microsoft.Extensions.Logging; | ||
| using NServiceBus.Pipeline; | ||
|
|
||
| // The behavior is only needed to print the serialized avro message content to the console | ||
| public class MessageBodyLogger(ILogger<MessageBodyLogger> logger) : Behavior<IIncomingPhysicalMessageContext> | ||
andreasohlund marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| { | ||
| public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next) | ||
| { | ||
| var bodyAsString = Encoding.UTF8 | ||
| .GetString(context.Message.Body.ToArray()); | ||
|
|
||
| logger.LogInformation("Serialized Message Body:"); | ||
| logger.LogInformation(bodyAsString); | ||
|
|
||
| return next(); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| public class OrderItem | ||
| { | ||
| public int ItemId { get; set; } | ||
| public int Quantity { get; set; } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| using System; | ||
| using Microsoft.Extensions.DependencyInjection; | ||
| using Microsoft.Extensions.Hosting; | ||
| using NServiceBus; | ||
| using NServiceBus.MessageMutator; | ||
|
|
||
| Console.Title = "Avro"; | ||
|
|
||
| var builder = Host.CreateApplicationBuilder(args); | ||
| builder.Services.AddHostedService<InputLoopService>(); | ||
|
|
||
| #region config | ||
|
|
||
| var endpointConfiguration = new EndpointConfiguration("Samples.Serialization.Avro"); | ||
|
|
||
| endpointConfiguration.UseSerialization<AvroSerializer>(); | ||
|
|
||
| #endregion | ||
|
|
||
| endpointConfiguration.UseTransport(new LearningTransport()); | ||
|
|
||
| endpointConfiguration.Pipeline.Register(typeof(MessageBodyLogger), "Logs the message body received"); | ||
|
|
||
| builder.UseNServiceBus(endpointConfiguration); | ||
| await builder.Build().RunAsync(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| using System; | ||
| using System.IO; | ||
|
|
||
| public class ReadOnlyStream(ReadOnlyMemory<byte> memory) : Stream | ||
| { | ||
| int position = 0; | ||
|
|
||
| public override void Flush() => throw new NotSupportedException(); | ||
|
|
||
| public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); | ||
|
|
||
| public override void SetLength(long value) => throw new NotSupportedException(); | ||
|
|
||
| public override int Read(byte[] buffer, int offset, int count) | ||
| { | ||
| var bytesToCopy = Math.Min(count, memory.Length - position); | ||
|
|
||
| var destination = buffer.AsSpan().Slice(offset, bytesToCopy); | ||
| var source = memory.Span.Slice(position, bytesToCopy); | ||
|
|
||
| source.CopyTo(destination); | ||
|
|
||
| position += bytesToCopy; | ||
|
|
||
| return bytesToCopy; | ||
| } | ||
|
|
||
| public override int Read(Span<byte> buffer) | ||
| { | ||
| var bytesToCopy = Math.Min(memory.Length - position, buffer.Length); | ||
| if (bytesToCopy <= 0) | ||
| { | ||
| return 0; | ||
| } | ||
|
|
||
| var source = memory.Span.Slice(position, bytesToCopy); | ||
| source.CopyTo(buffer); | ||
|
|
||
| position += bytesToCopy; | ||
| return bytesToCopy; | ||
| } | ||
|
|
||
| public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException(); | ||
|
|
||
| public override bool CanRead => true; | ||
| public override bool CanSeek => false; | ||
| public override bool CanWrite => false; | ||
| public override long Length => memory.Length; | ||
|
|
||
| public override long Position | ||
| { | ||
| get => position; | ||
| set => position = (int)value; | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| <Project Sdk="Microsoft.NET.Sdk"> | ||
| <PropertyGroup> | ||
| <TargetFrameworks>net9.0;net8.0</TargetFrameworks> | ||
| <OutputType>Exe</OutputType> | ||
| <LangVersion>12.0</LangVersion> | ||
| </PropertyGroup> | ||
| <ItemGroup> | ||
| <PackageReference Include="NServiceBus" Version="9.*" /> | ||
| <PackageReference Include="NServiceBus.Extensions.Hosting" Version="3.*" /> | ||
| <PackageReference Include="Apache.Avro" Version="1.12.*" /> | ||
| </ItemGroup> | ||
| <ItemGroup> | ||
| <EmbeddedResource Include="CreateOrder.avsc" /> | ||
| </ItemGroup> | ||
| </Project> |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.