Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Azure.Messaging.ServiceBus.Administration;
using McMaster.Extensions.CommandLineUtils;

static class Endpoint
static class MigrationTopologyEndpoint
{
public static async Task Create(ServiceBusAdministrationClient client, CommandArgument name, CommandOption topicName, CommandOption topicToPublishTo, CommandOption topicToSubscribeOn, CommandOption subscriptionName, CommandOption<int> size, CommandOption partitioning)
{
Expand All @@ -23,7 +23,7 @@ public static async Task Create(ServiceBusAdministrationClient client, CommandAr
{
try
{
await Topic.Create(client, topicName, size, partitioning);
await Topic.Create(client, topicName.Value(), size, partitioning);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
{
Expand All @@ -48,7 +48,7 @@ public static async Task Create(ServiceBusAdministrationClient client, CommandAr
{
try
{
await Topic.Create(client, topicToPublishTo, size, partitioning);
await Topic.Create(client, topicToPublishTo.Value(), size, partitioning);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
{
Expand All @@ -57,7 +57,7 @@ public static async Task Create(ServiceBusAdministrationClient client, CommandAr

try
{
await Topic.Create(client, topicToSubscribeOn, size, partitioning);
await Topic.Create(client, topicToSubscribeOn.Value(), size, partitioning);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.MessagingEntityAlreadyExists)
{
Expand Down
212 changes: 161 additions & 51 deletions src/CommandLine/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,44 @@ static int Main(string[] args)

app.HelpOption(inherited: true);

void SubscribeTopicPerEventType(CommandLineApplication subscribeCommand)
{
subscribeCommand.Description = "Subscribes an endpoint to an event using topic-per-event topology.";
var name = subscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var topicName = subscribeCommand.Argument("topic", "Topic name to subscribe. Unless configured otherwise on the publisher defaults to full name of the event (e.g. MyNamespace.MyMessage) (required)").IsRequired();

subscribeCommand.AddOption(connectionString);
subscribeCommand.AddOption(fullyQualifiedNamespace);
subscribeCommand.AddOption(size);
subscribeCommand.AddOption(partitioning);
var subscriptionName = subscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

subscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => TopicPerEventTopologyEndpoint.Subscribe(client, name, topicName, subscriptionName, size, partitioning));

Console.WriteLine($"Endpoint '{name.Value}' subscribed to '{topicName.Value}'.");
});
}

void UnsubscribeTopicPerEventType(CommandLineApplication unsubscribeCommand)
{
unsubscribeCommand.Description = "Unsubscribes an endpoint from an event using topic-per-event topology.";
var name = unsubscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var topicName = unsubscribeCommand.Argument("topic", "Topic name to subscribe. Unless configured otherwise on the publisher defaults to full name of the event (e.g. MyNamespace.MyMessage) (required)").IsRequired();

unsubscribeCommand.AddOption(connectionString);
unsubscribeCommand.AddOption(fullyQualifiedNamespace);
var subscriptionName = unsubscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

unsubscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => TopicPerEventTopologyEndpoint.Unsubscribe(client, name, topicName, subscriptionName));

Console.WriteLine($"Endpoint '{name.Value}' unsubscribed from '{topicName.Value}'.");
});
}

app.Command("endpoint", endpointCommand =>
{
endpointCommand.OnExecute(() =>
Expand All @@ -49,79 +87,151 @@ static int Main(string[] args)

endpointCommand.Command("create", createCommand =>
{
createCommand.Description = "Creates required infrastructure for an endpoint.";
createCommand.Description = "Creates required infrastructure for an endpoint that uses topic-per-event topology.";
var name = createCommand.Argument("name", "Name of the endpoint (required)").IsRequired();

createCommand.AddOption(connectionString);
createCommand.AddOption(fullyQualifiedNamespace);
createCommand.AddOption(size);
createCommand.AddOption(partitioning);
var topicName = createCommand.Option("-t|--topic", "Topic name (defaults to 'bundle-1')", CommandOptionType.SingleValue);
var topicToPublishTo = createCommand.Option("-tp|--topic-to-publish-to", "The topic name to publish to", CommandOptionType.SingleValue);
var topicToSubscribeOn = createCommand.Option("-ts|--topic-to-subscribe-on", "The topic name to subscribe on", CommandOptionType.SingleValue);

topicName.OnValidate(v => ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToPublishTo.OnValidate(v => ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToSubscribeOn.OnValidate(v => ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));

var subscriptionName = createCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

createCommand.OnExecuteAsync(async ct =>
{
// Unfortunately the default value cannot be set outside the execute because it would then
// trigger the validation. There seems to be no way in the command handling library to
// differentiate defaults from user inputs we set a default for the topicName here.
if (!topicName.HasValue() && !topicToPublishTo.HasValue() && !topicToSubscribeOn.HasValue())
{
topicName.DefaultValue = Topic.DefaultTopicName;
}

await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Create(client, name, topicName, topicToPublishTo, topicToSubscribeOn, subscriptionName, size, partitioning));
await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => TopicPerEventTopologyEndpoint.Create(client, name, size, partitioning));

Console.WriteLine($"Endpoint '{name.Value}' is ready.");
});
});

endpointCommand.Command("subscribe", subscribeCommand =>
endpointCommand.Command("subscribe", SubscribeTopicPerEventType);
endpointCommand.Command("unsubscribe", UnsubscribeTopicPerEventType);
});

app.Command("migration", migrationCommand =>
{
migrationCommand.Command("endpoint", endpointCommand =>
{
subscribeCommand.Description = "Subscribes an endpoint to an event.";
var name = subscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var eventType = subscribeCommand.Argument("event-type", "Full name of the event to subscribe to (e.g. MyNamespace.MyMessage) (required)").IsRequired();

subscribeCommand.AddOption(connectionString);
subscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = subscribeCommand.Option("-t|--topic", "Topic name to subscribe on (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = subscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = subscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

subscribeCommand.OnExecuteAsync(async ct =>
endpointCommand.OnExecute(() =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Subscribe(client, name, topicName, subscriptionName, eventType, shortenedRuleName));
Console.WriteLine("Specify a subcommand");
endpointCommand.ShowHelp();
return 1;
});

Console.WriteLine($"Endpoint '{name.Value}' subscribed to '{eventType.Value}'.");
endpointCommand.Command("create", createCommand =>
{
createCommand.Description =
"Creates required infrastructure for an endpoint that uses migration topology.";
var name = createCommand.Argument("name", "Name of the endpoint (required)")
.IsRequired();

createCommand.AddOption(connectionString);
createCommand.AddOption(fullyQualifiedNamespace);
createCommand.AddOption(size);
createCommand.AddOption(partitioning);

var topicName = createCommand.Option("-t|--topic",
"Topic name (defaults to 'bundle-1')", CommandOptionType.SingleValue);
var topicToPublishTo = createCommand.Option("-tp|--topic-to-publish-to",
"The topic name to publish to", CommandOptionType.SingleValue);
var topicToSubscribeOn = createCommand.Option("-ts|--topic-to-subscribe-on",
"The topic name to subscribe on", CommandOptionType.SingleValue);

topicName.OnValidate(v =>
ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToPublishTo.OnValidate(v =>
ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToSubscribeOn.OnValidate(v =>
ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));

var subscriptionName = createCommand.Option("-b|--subscription",
"Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

createCommand.OnExecuteAsync(async ct =>
{
// Unfortunately the default value cannot be set outside the execute because it would then
// trigger the validation. There seems to be no way in the command handling library to
// differentiate defaults from user inputs we set a default for the topicName here.
if (!topicName.HasValue() && !topicToPublishTo.HasValue() &&
!topicToSubscribeOn.HasValue())
{
topicName.DefaultValue = Topic.DefaultTopicName;
}

await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => MigrationTopologyEndpoint.Create(client, name, topicName,
topicToPublishTo, topicToSubscribeOn,
subscriptionName, size, partitioning));


Console.WriteLine($"Endpoint '{name.Value}' is ready.");
});
});
});

endpointCommand.Command("unsubscribe", unsubscribeCommand =>
{
unsubscribeCommand.Description = "Unsubscribes an endpoint from an event.";
var name = unsubscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var eventType = unsubscribeCommand.Argument("event-type", "Full name of the event to unsubscribe from (e.g. MyNamespace.MyMessage) (required)").IsRequired();

unsubscribeCommand.AddOption(connectionString);
unsubscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = unsubscribeCommand.Option("-t|--topic", "Topic name to unsubscribe from (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = unsubscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = unsubscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

unsubscribeCommand.OnExecuteAsync(async ct =>
endpointCommand.Command("subscribe", subscribeCommand =>
{
subscribeCommand.Description =
"Subscribes an endpoint to an event using legacy forwarding topology.";
var name = subscribeCommand.Argument("name", "Name of the endpoint (required)")
.IsRequired();
var eventType = subscribeCommand.Argument("event-type",
"Full name of the event to subscribe to (e.g. MyNamespace.MyMessage) (required)")
.IsRequired();

subscribeCommand.AddOption(connectionString);
subscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = subscribeCommand.Option("-t|--topic",
"Topic name to subscribe on (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = subscribeCommand.Option("-b|--subscription",
"Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = subscribeCommand.Option("-r|--rule-name",
"Rule name (defaults to event type) ", CommandOptionType.SingleValue);

subscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => MigrationTopologyEndpoint.Subscribe(client, name, topicName,
subscriptionName, eventType, shortenedRuleName));

Console.WriteLine($"Endpoint '{name.Value}' subscribed to '{eventType.Value}'.");
});
});

endpointCommand.Command("unsubscribe", unsubscribeCommand =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Unsubscribe(client, name, topicName, subscriptionName, eventType, shortenedRuleName));
unsubscribeCommand.Description =
"Unsubscribes an endpoint from an event using legacy forwarding topology.";
var name = unsubscribeCommand.Argument("name", "Name of the endpoint (required)")
.IsRequired();
var eventType = unsubscribeCommand.Argument("event-type",
"Full name of the event to unsubscribe from (e.g. MyNamespace.MyMessage) (required)")
.IsRequired();

unsubscribeCommand.AddOption(connectionString);
unsubscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = unsubscribeCommand.Option("-t|--topic",
"Topic name to unsubscribe from (defaults to 'bundle-1')",
CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = unsubscribeCommand.Option("-b|--subscription",
"Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = unsubscribeCommand.Option("-r|--rule-name",
"Rule name (defaults to event type) ", CommandOptionType.SingleValue);

unsubscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => MigrationTopologyEndpoint.Unsubscribe(client, name, topicName,
subscriptionName, eventType, shortenedRuleName));

Console.WriteLine($"Endpoint '{name.Value}' unsubscribed from '{eventType.Value}'.");
Console.WriteLine($"Endpoint '{name.Value}' unsubscribed from '{eventType.Value}'.");
});
});

endpointCommand.Command("subscribe-migrated", SubscribeTopicPerEventType);
endpointCommand.Command("unsubscribe-migrated", UnsubscribeTopicPerEventType);
});
});

Expand Down
Loading