Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Azure.Messaging.ServiceBus.Administration;
using McMaster.Extensions.CommandLineUtils;

static class Endpoint
static class MigrationTopologyEndpoint
{
public static async Task Create(ServiceBusAdministrationClient client, CommandArgument name, CommandOption topicName, CommandOption topicToPublishTo, CommandOption topicToSubscribeOn, CommandOption subscriptionName, CommandOption<int> size, CommandOption partitioning)
{
Expand Down
212 changes: 161 additions & 51 deletions src/CommandLine/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,44 @@ static int Main(string[] args)

app.HelpOption(inherited: true);

void SubscribeTopicPerEventType(CommandLineApplication subscribeCommand)
{
subscribeCommand.Description = "Subscribes an endpoint to an event using topic-per-event topology.";
var name = subscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var topicName = subscribeCommand.Argument("topic", "Topic name to subscribe (required)").IsRequired();

subscribeCommand.AddOption(connectionString);
subscribeCommand.AddOption(fullyQualifiedNamespace);
subscribeCommand.AddOption(size);
subscribeCommand.AddOption(partitioning);
var subscriptionName = subscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

subscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => TopicPerEventTopologyEndpoint.Subscribe(client, name, topicName, subscriptionName, size, partitioning));

Console.WriteLine($"Endpoint '{name.Value}' subscribed to '{topicName.Value}'.");
});
}

void UnsubscribeTopicPerEventType(CommandLineApplication unsubscribeCommand)
{
unsubscribeCommand.Description = "Unsubscribes an endpoint from an event using topic-per-event topology.";
var name = unsubscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var topicName = unsubscribeCommand.Argument("topic", "Topic name to unsubscribe (required)").IsRequired();

unsubscribeCommand.AddOption(connectionString);
unsubscribeCommand.AddOption(fullyQualifiedNamespace);
var subscriptionName = unsubscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

unsubscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => TopicPerEventTopologyEndpoint.Unsubscribe(client, name, topicName, subscriptionName));

Console.WriteLine($"Endpoint '{name.Value}' unsubscribed from '{topicName.Value}'.");
});
}

app.Command("endpoint", endpointCommand =>
{
endpointCommand.OnExecute(() =>
Expand All @@ -49,79 +87,151 @@ static int Main(string[] args)

endpointCommand.Command("create", createCommand =>
{
createCommand.Description = "Creates required infrastructure for an endpoint.";
createCommand.Description = "Creates required infrastructure for an endpoint that uses topic-per-event topology.";
var name = createCommand.Argument("name", "Name of the endpoint (required)").IsRequired();

createCommand.AddOption(connectionString);
createCommand.AddOption(fullyQualifiedNamespace);
createCommand.AddOption(size);
createCommand.AddOption(partitioning);
var topicName = createCommand.Option("-t|--topic", "Topic name (defaults to 'bundle-1')", CommandOptionType.SingleValue);
var topicToPublishTo = createCommand.Option("-tp|--topic-to-publish-to", "The topic name to publish to", CommandOptionType.SingleValue);
var topicToSubscribeOn = createCommand.Option("-ts|--topic-to-subscribe-on", "The topic name to subscribe on", CommandOptionType.SingleValue);

topicName.OnValidate(v => ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToPublishTo.OnValidate(v => ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToSubscribeOn.OnValidate(v => ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));

var subscriptionName = createCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

createCommand.OnExecuteAsync(async ct =>
{
// Unfortunately the default value cannot be set outside the execute because it would then
// trigger the validation. There seems to be no way in the command handling library to
// differentiate defaults from user inputs we set a default for the topicName here.
if (!topicName.HasValue() && !topicToPublishTo.HasValue() && !topicToSubscribeOn.HasValue())
{
topicName.DefaultValue = Topic.DefaultTopicName;
}

await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Create(client, name, topicName, topicToPublishTo, topicToSubscribeOn, subscriptionName, size, partitioning));
await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => TopicPerEventTopologyEndpoint.Create(client, name, size, partitioning));

Console.WriteLine($"Endpoint '{name.Value}' is ready.");
});
});

endpointCommand.Command("subscribe", subscribeCommand =>
endpointCommand.Command("subscribe", SubscribeTopicPerEventType);
endpointCommand.Command("unsubscribe", UnsubscribeTopicPerEventType);
});

app.Command("migration", migrationCommand =>
{
migrationCommand.Command("endpoint", endpointCommand =>
{
subscribeCommand.Description = "Subscribes an endpoint to an event.";
var name = subscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var eventType = subscribeCommand.Argument("event-type", "Full name of the event to subscribe to (e.g. MyNamespace.MyMessage) (required)").IsRequired();

subscribeCommand.AddOption(connectionString);
subscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = subscribeCommand.Option("-t|--topic", "Topic name to subscribe on (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = subscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = subscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

subscribeCommand.OnExecuteAsync(async ct =>
endpointCommand.OnExecute(() =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Subscribe(client, name, topicName, subscriptionName, eventType, shortenedRuleName));
Console.WriteLine("Specify a subcommand");
endpointCommand.ShowHelp();
return 1;
});

Console.WriteLine($"Endpoint '{name.Value}' subscribed to '{eventType.Value}'.");
endpointCommand.Command("create", createCommand =>
{
createCommand.Description =
"Creates required infrastructure for an endpoint that uses migration topology.";
var name = createCommand.Argument("name", "Name of the endpoint (required)")
.IsRequired();

createCommand.AddOption(connectionString);
createCommand.AddOption(fullyQualifiedNamespace);
createCommand.AddOption(size);
createCommand.AddOption(partitioning);

var topicName = createCommand.Option("-t|--topic",
"Topic name (defaults to 'bundle-1')", CommandOptionType.SingleValue);
var topicToPublishTo = createCommand.Option("-tp|--topic-to-publish-to",
"The topic name to publish to", CommandOptionType.SingleValue);
var topicToSubscribeOn = createCommand.Option("-ts|--topic-to-subscribe-on",
"The topic name to subscribe on", CommandOptionType.SingleValue);

topicName.OnValidate(v =>
ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToPublishTo.OnValidate(v =>
ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));
topicToSubscribeOn.OnValidate(v =>
ValidateTopicInformationIsCorrect(topicName, topicToPublishTo, topicToSubscribeOn));

var subscriptionName = createCommand.Option("-b|--subscription",
"Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);

createCommand.OnExecuteAsync(async ct =>
{
// Unfortunately the default value cannot be set outside the execute because it would then
// trigger the validation. There seems to be no way in the command handling library to
// differentiate defaults from user inputs we set a default for the topicName here.
if (!topicName.HasValue() && !topicToPublishTo.HasValue() &&
!topicToSubscribeOn.HasValue())
{
topicName.DefaultValue = Topic.DefaultTopicName;
}

await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => MigrationTopologyEndpoint.Create(client, name, topicName,
topicToPublishTo, topicToSubscribeOn,
subscriptionName, size, partitioning));


Console.WriteLine($"Endpoint '{name.Value}' is ready.");
});
});
});

endpointCommand.Command("unsubscribe", unsubscribeCommand =>
{
unsubscribeCommand.Description = "Unsubscribes an endpoint from an event.";
var name = unsubscribeCommand.Argument("name", "Name of the endpoint (required)").IsRequired();
var eventType = unsubscribeCommand.Argument("event-type", "Full name of the event to unsubscribe from (e.g. MyNamespace.MyMessage) (required)").IsRequired();

unsubscribeCommand.AddOption(connectionString);
unsubscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = unsubscribeCommand.Option("-t|--topic", "Topic name to unsubscribe from (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = unsubscribeCommand.Option("-b|--subscription", "Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = unsubscribeCommand.Option("-r|--rule-name", "Rule name (defaults to event type) ", CommandOptionType.SingleValue);

unsubscribeCommand.OnExecuteAsync(async ct =>
endpointCommand.Command("subscribe", subscribeCommand =>
{
subscribeCommand.Description =
"Subscribes an endpoint to an event using single-topic approach (forwarding topology).";
var name = subscribeCommand.Argument("name", "Name of the endpoint (required)")
.IsRequired();
var eventType = subscribeCommand.Argument("event-type",
"Full name of the event to subscribe to (e.g. MyNamespace.MyMessage) (required)")
.IsRequired();

subscribeCommand.AddOption(connectionString);
subscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = subscribeCommand.Option("-t|--topic",
"Topic name to subscribe on (defaults to 'bundle-1')", CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = subscribeCommand.Option("-b|--subscription",
"Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = subscribeCommand.Option("-r|--rule-name",
"Rule name (defaults to event type) ", CommandOptionType.SingleValue);

subscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => MigrationTopologyEndpoint.Subscribe(client, name, topicName,
subscriptionName, eventType, shortenedRuleName));

Console.WriteLine($"Endpoint '{name.Value}' subscribed to '{eventType.Value}'.");
});
});

endpointCommand.Command("unsubscribe", unsubscribeCommand =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace, client => Endpoint.Unsubscribe(client, name, topicName, subscriptionName, eventType, shortenedRuleName));
unsubscribeCommand.Description =
"Unsubscribes an endpoint from an event using single-topic approach (forwarding topology).";
var name = unsubscribeCommand.Argument("name", "Name of the endpoint (required)")
.IsRequired();
var eventType = unsubscribeCommand.Argument("event-type",
"Full name of the event to unsubscribe from (e.g. MyNamespace.MyMessage) (required)")
.IsRequired();

unsubscribeCommand.AddOption(connectionString);
unsubscribeCommand.AddOption(fullyQualifiedNamespace);
var topicName = unsubscribeCommand.Option("-t|--topic",
"Topic name to unsubscribe from (defaults to 'bundle-1')",
CommandOptionType.SingleValue);
topicName.DefaultValue = Topic.DefaultTopicName;
var subscriptionName = unsubscribeCommand.Option("-b|--subscription",
"Subscription name (defaults to endpoint name) ", CommandOptionType.SingleValue);
var shortenedRuleName = unsubscribeCommand.Option("-r|--rule-name",
"Rule name (defaults to event type) ", CommandOptionType.SingleValue);

unsubscribeCommand.OnExecuteAsync(async ct =>
{
await CommandRunner.Run(connectionString, fullyQualifiedNamespace,
client => MigrationTopologyEndpoint.Unsubscribe(client, name, topicName,
subscriptionName, eventType, shortenedRuleName));

Console.WriteLine($"Endpoint '{name.Value}' unsubscribed from '{eventType.Value}'.");
Console.WriteLine($"Endpoint '{name.Value}' unsubscribed from '{eventType.Value}'.");
});
});

endpointCommand.Command("subscribe-migrated", SubscribeTopicPerEventType);
endpointCommand.Command("unsubscribe-migrated", UnsubscribeTopicPerEventType);
});
});

Expand Down
24 changes: 24 additions & 0 deletions src/CommandLine/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,23 @@ public static Task Create(ServiceBusAdministrationClient client, CommandArgument
return client.CreateSubscriptionAsync(options, new CreateRuleOptions("$default", new FalseRuleFilter()));
}

public static Task Create(ServiceBusAdministrationClient client, CommandArgument endpointName, CommandArgument topicName, CommandOption subscriptionName)
{
var subscriptionNameToUse = subscriptionName.HasValue() ? subscriptionName.Value() : endpointName.Value;

var options = new CreateSubscriptionOptions(topicName.Value, subscriptionNameToUse)
{
LockDuration = TimeSpan.FromMinutes(5),
ForwardTo = endpointName.Value,
EnableDeadLetteringOnFilterEvaluationExceptions = false,
MaxDeliveryCount = int.MaxValue,
EnableBatchedOperations = true,
UserMetadata = endpointName.Value
};

return client.CreateSubscriptionAsync(options, new CreateRuleOptions("$default", new FalseRuleFilter()));
}

public static Task CreateForwarding(ServiceBusAdministrationClient client, CommandOption topicToPublishTo, CommandOption topicToSubscribeTo, string subscriptionName)
{
var options = new CreateSubscriptionOptions(topicToPublishTo.Value(), subscriptionName)
Expand All @@ -39,5 +56,12 @@ public static Task CreateForwarding(ServiceBusAdministrationClient client, Comma

return client.CreateSubscriptionAsync(options, new CreateRuleOptions("$default", new TrueRuleFilter()));
}

public static Task Delete(ServiceBusAdministrationClient client, CommandArgument endpointName,
CommandArgument topicName, CommandOption subscriptionName)
{
var subscriptionNameToUse = subscriptionName.HasValue() ? subscriptionName.Value() : endpointName.Value;
return client.DeleteSubscriptionAsync(topicName.Value, subscriptionNameToUse);
}
}
}
Loading