Skip to content

MyJetWallet/MyJetWallet.Sdk.ServiceBus

Repository files navigation

using

Module:

public class ServiceBusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        var serviceBusClient = builder.RegisterMyServiceBusTcpClient(Program.ReloadedSettings(e => e.SpotServiceBusHostPort), ApplicationEnvironment.HostName, Program.LogFactory);

        var queryName = "Liquidity-Reports";


        // publisher (IServiceBusPublisher<PortfolioTrade>)
        builder.RegisterMyServiceBusPublisher<PortfolioTrade>(serviceBusClient, PortfolioTrade.TopicName, false);


        // batch subscriber (ISubscriber<IReadOnlyList<PortfolioTrade>>)
        builder.RegisterMyServiceBusSubscriberBatch<PortfolioTrade>(serviceBusClient, PortfolioTrade.TopicName, queryName, TopicQueueType.PermanentWithSingleConnection);


        // single subscriber (ISubscriber<PortfolioTrade>)
        builder.RegisterMyServiceBusSubscriberSingle<PortfolioTrade>(serviceBusClient, PortfolioPosition.TopicName, queryName, TopicQueueType.PermanentWithSingleConnection);
    }
}

DeserializeExceptionHandler

In case it you want to skip message with deserialize exception then you can use global handler to log and skip message. But be careful to use it, in this case you can miss important messages from topic in case if happe breking change in the message model and you forgot to update client service.

// call in sope plase static method to activete globally DeserializeExceptionHandler

MyServiceBusGlobalEventHandler.SetLogAndSkipDeserializeExceptionHandler(Program.LogFactory);

LifeTime:

public class ApplicationLifetimeManager : ApplicationLifetimeManagerBase
{
    private readonly ServiceBusLifeTime _myServiceBusLifeTime;

    public ApplicationLifetimeManager(IHostApplicationLifetime appLifetime, ServiceBusLifeTime myServiceBusLifeTime)
        : base(appLifetime)
    {
        _myServiceBusLifeTime = myServiceBusLifeTime;
    }

    protected override void OnStarted()
    {
        _myServiceBusLifeTime.Start();
    }

    protected override void OnStopping()
    {
        _myServiceBusLifeTime.Stop();
    }
}

Model:

[DataContract]
public class PortfolioTrade
{
    public const string TopicName = "spot-liquidity-engine-trade";

    [DataMember(Order = 1)] public string TradeId { get; set; }
    [DataMember(Order = 2)] public string Source { get; set; }
    [DataMember(Order = 3)] public bool IsInternal { get; set; }
}

Deduplication:

public class ServiceBusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        var serviceBusClient = builder.RegisterMyServiceBusTcpClient(() => Program.Settings.SpotServiceBusHostPort, ApplicationEnvironment.HostName, Program.LogFactory);

        var queryName = "Liquidity-Reports";

        var deduplicator = builder.RegisterMyServiceBusDeduplicator<PortfolioTrade>(
                    t => t.TraceId,                                             //Func to get unique id
                    Program.ReloadedSettings(t=>t.MyNoSqlWriterUrl),            
                    queryName,                                                  //NoSql table name
                    PortfolioTrade.TopicName,                                   //NoSql partition key
                    TimeSpan.FromHours(4),                                      //Expiration time
                    Program.LogFactory);


        // single subscriber (ISubscriber<PortfolioTrade>)
        // dedupication only available for single subscriber
        builder.RegisterMyServiceBusSubscriberSingle<PortfolioTrade>(serviceBusClient, PortfolioPosition.TopicName, queryName, TopicQueueType.Permanent, deduplicator);
    }
}

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Contributors 5

Languages