From 3c93ead1e49ee328ae2df1cf844b027b1fe01dc0 Mon Sep 17 00:00:00 2001 From: Cole Smith Date: Wed, 2 Oct 2024 21:06:30 -0400 Subject: [PATCH 1/3] Add Delete Offsets --- api/pom.xml | 8 ++++- .../controller/ConsumerGroupsController.java | 19 ++++++++++++ .../rbac/permission/ConsumerGroupAction.java | 7 ++--- .../ui/service/ConsumerGroupService.java | 7 +++++ .../ui/service/ReactiveAdminClient.java | 15 ++++++++++ .../main/resources/swagger/kafbat-ui-api.yaml | 28 ++++++++++++++++++ .../ConsumerGroups/Details/ListItem.tsx | 29 +++++++++++++++++-- frontend/src/lib/hooks/api/consumers.ts | 27 +++++++++++++++++ 8 files changed, 132 insertions(+), 8 deletions(-) diff --git a/api/pom.xml b/api/pom.xml index 17bae6b85..a892eba7f 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -97,7 +97,13 @@ com.azure azure-identity - 1.13.0 + 1.13.3 + + + io.netty + netty-tcnative-boringssl-static + + diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 248c6f7d7..67fd26c9b 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -3,6 +3,7 @@ import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.DELETE; import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.RESET_OFFSETS; import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.VIEW; +import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.DELETE_OFFSETS; import static java.util.stream.Collectors.toMap; import io.kafbat.ui.api.ConsumerGroupsApi; @@ -188,6 +189,24 @@ public Mono> resetConsumerGroupOffsets(String clusterName, }).thenReturn(ResponseEntity.ok().build()); } + @Override + public Mono> deleteConsumerGroupOffsets(String clusterName, + String group, + String topic, + ServerWebExchange exchange) { + var context = AccessContext.builder() + .cluster(clusterName) + .topicActions(topic, TopicAction.VIEW) + .consumerGroupActions(group, DELETE_OFFSETS) + .operationName("resetConsumerGroupOffsets") + .build(); + + return validateAccess(context) + .then(consumerGroupService.deleteConsumerGroupOffsets(getCluster(clusterName), group, topic)) + .doOnEach(sig -> audit(context, sig)) + .thenReturn(ResponseEntity.ok().build()); + } + private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage consumerGroupConsumerGroupsPage) { return new ConsumerGroupsPageResponseDTO() diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConsumerGroupAction.java b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConsumerGroupAction.java index 6a3e3aae9..f4ecb8302 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConsumerGroupAction.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/permission/ConsumerGroupAction.java @@ -8,11 +8,10 @@ public enum ConsumerGroupAction implements PermissibleAction { VIEW, DELETE(VIEW), - RESET_OFFSETS(VIEW) + RESET_OFFSETS(VIEW), + DELETE_OFFSETS(VIEW); - ; - - public static final Set ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS); + public static final Set ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS, DELETE_OFFSETS); private final ConsumerGroupAction[] dependantActions; diff --git a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java index 452de4a59..c8e64f523 100644 --- a/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java +++ b/api/src/main/java/io/kafbat/ui/service/ConsumerGroupService.java @@ -270,4 +270,11 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster, ); } + public Mono deleteConsumerGroupOffsets(KafkaCluster cluster, + String groupId, + String topic) { + return adminClientService + .get(cluster) + .flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topic)); + } } diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index bb04f5527..c64016f8d 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -531,6 +531,21 @@ public Mono alterConsumerGroupOffsets(String groupId, Map deleteConsumerGroupOffsets(String groupId, String topic) { + var offsets = listConsumerGroupOffsets(List.of(groupId), null); + Mono> partitions = offsets.map(tps -> + tps.row(groupId) + .keySet() + .stream() + .filter(tp -> tp.topic().equals(topic)) + .collect(Collectors.toSet()) + ); + + return partitions.flatMap(partitionSet -> + toMono(client.deleteConsumerGroupOffsets(groupId, partitionSet).all()) + ); + } + /** * List offset for the topic's partitions and OffsetSpec. * diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 7ca62831f..837ab206a 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -1048,6 +1048,33 @@ paths: 200: description: OK + /api/clusters/{clusterName}/consumer-groups/{id}/topics/{topic}: + delete: + tags: + - Consumer Groups + summary: deletes consumer group offsets + operationId: deleteConsumerGroupOffsets + parameters: + - name: clusterName + in: path + required: true + schema: + type: string + - name: id + in: path + required: true + schema: + type: string + - name: topic + in: path + required: true + schema: + type: string + responses: + 200: + description: OK + + /api/clusters/{clusterName}/schemas: post: tags: @@ -3828,6 +3855,7 @@ components: - CREATE - DELETE - RESET_OFFSETS + - DELETE_OFFSETS - EXECUTE - MODIFY_GLOBAL_COMPATIBILITY - ANALYSIS_VIEW diff --git a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx index 57b7cb133..e9a698dcc 100644 --- a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx +++ b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx @@ -1,14 +1,18 @@ import React from 'react'; -import { ConsumerGroupTopicPartition } from 'generated-sources'; +import {Action, ConsumerGroupTopicPartition, ResourceType} from 'generated-sources'; import { Link } from 'react-router-dom'; import { ClusterName } from 'lib/interfaces/cluster'; -import { clusterTopicPath } from 'lib/paths'; +import {ClusterGroupParam, clusterTopicPath} from 'lib/paths'; import MessageToggleIcon from 'components/common/Icons/MessageToggleIcon'; import IconButtonWrapper from 'components/common/Icons/IconButtonWrapper'; import { TableKeyLink } from 'components/common/table/Table/TableKeyLink.styled'; import TopicContents from './TopicContents/TopicContents'; import { FlexWrapper } from './ListItem.styled'; +import {Dropdown} from "../../common/Dropdown"; +import {ActionDropdownItem} from "../../common/ActionComponent"; +import useAppParams from "../../../lib/hooks/useAppParams"; +import {useDeleteConsumerGroupOffsetsMutation} from "../../../lib/hooks/api/consumers"; interface Props { clusterName: ClusterName; @@ -18,6 +22,8 @@ interface Props { const ListItem: React.FC = ({ clusterName, name, consumers }) => { const [isOpen, setIsOpen] = React.useState(false); + const consumer = useAppParams() + const deleteOffset = useDeleteConsumerGroupOffsetsMutation(consumer) const getTotalconsumerLag = () => { let count = 0; @@ -40,7 +46,24 @@ const ListItem: React.FC = ({ clusterName, name, consumers }) => { - {getTotalconsumerLag()} + + {getTotalconsumerLag()} + + + + deleteOffset.mutateAsync(name)} + danger + confirm={"Are you sure you want to delete offsets for this topic?"} + permission={{ + resource: ResourceType.CONSUMER, + action: Action.DELETE_OFFSETS, + value: consumer.consumerGroupID + }}> + Unsubscribe from topic + + + {isOpen && } diff --git a/frontend/src/lib/hooks/api/consumers.ts b/frontend/src/lib/hooks/api/consumers.ts index 85ae048a9..5990a9be5 100644 --- a/frontend/src/lib/hooks/api/consumers.ts +++ b/frontend/src/lib/hooks/api/consumers.ts @@ -90,3 +90,30 @@ export const useResetConsumerGroupOffsetsMutation = ({ } ); }; + +export const useDeleteConsumerGroupOffsetsMutation = ({ + clusterName, + consumerGroupID, + }: UseConsumerGroupDetailsProps) => { + const queryClient = useQueryClient(); + return useMutation( + (topic: string) => + api.deleteConsumerGroupOffsets({ + clusterName, + id: consumerGroupID, + topic, + }), + { + onSuccess: () => { + showSuccessAlert({ + message: `Consumer ${consumerGroupID} group offsets deleted.`, + }); + queryClient.invalidateQueries([ + 'clusters', + clusterName, + 'consumerGroups', + ]); + }, + } + ); +}; From 37f39fef08e33d678907062ccb71b11b93a4299b Mon Sep 17 00:00:00 2001 From: Cole Smith Date: Thu, 3 Oct 2024 11:58:39 -0400 Subject: [PATCH 2/3] Frontend remove danger --- frontend/src/components/ConsumerGroups/Details/ListItem.tsx | 2 -- 1 file changed, 2 deletions(-) diff --git a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx index e9a698dcc..be3e65f12 100644 --- a/frontend/src/components/ConsumerGroups/Details/ListItem.tsx +++ b/frontend/src/components/ConsumerGroups/Details/ListItem.tsx @@ -53,8 +53,6 @@ const ListItem: React.FC = ({ clusterName, name, consumers }) => { deleteOffset.mutateAsync(name)} - danger - confirm={"Are you sure you want to delete offsets for this topic?"} permission={{ resource: ResourceType.CONSUMER, action: Action.DELETE_OFFSETS, From ebd94c38ee8a08365b00cbcf0de00c8a12cd995b Mon Sep 17 00:00:00 2001 From: Cole Smith Date: Thu, 3 Oct 2024 14:28:37 -0400 Subject: [PATCH 3/3] Fix style --- .../controller/ConsumerGroupsController.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java index 67fd26c9b..2adf8a306 100644 --- a/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java +++ b/api/src/main/java/io/kafbat/ui/controller/ConsumerGroupsController.java @@ -1,9 +1,9 @@ package io.kafbat.ui.controller; import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.DELETE; +import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.DELETE_OFFSETS; import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.RESET_OFFSETS; import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.VIEW; -import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.DELETE_OFFSETS; import static java.util.stream.Collectors.toMap; import io.kafbat.ui.api.ConsumerGroupsApi; @@ -194,17 +194,17 @@ public Mono> deleteConsumerGroupOffsets(String clusterName, String group, String topic, ServerWebExchange exchange) { - var context = AccessContext.builder() - .cluster(clusterName) - .topicActions(topic, TopicAction.VIEW) - .consumerGroupActions(group, DELETE_OFFSETS) - .operationName("resetConsumerGroupOffsets") - .build(); + var context = AccessContext.builder() + .cluster(clusterName) + .topicActions(topic, TopicAction.VIEW) + .consumerGroupActions(group, DELETE_OFFSETS) + .operationName("resetConsumerGroupOffsets") + .build(); - return validateAccess(context) - .then(consumerGroupService.deleteConsumerGroupOffsets(getCluster(clusterName), group, topic)) - .doOnEach(sig -> audit(context, sig)) - .thenReturn(ResponseEntity.ok().build()); + return validateAccess(context) + .then(consumerGroupService.deleteConsumerGroupOffsets(getCluster(clusterName), group, topic)) + .doOnEach(sig -> audit(context, sig)) + .thenReturn(ResponseEntity.ok().build()); } private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage