Skip to content

Issue 462 - Unsubscribe topics from consumer group #571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kafbat.ui.controller;

import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.DELETE;
import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.DELETE_OFFSETS;
import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.RESET_OFFSETS;
import static io.kafbat.ui.model.rbac.permission.ConsumerGroupAction.VIEW;
import static java.util.stream.Collectors.toMap;
Expand Down Expand Up @@ -188,6 +189,24 @@ public Mono<ResponseEntity<Void>> resetConsumerGroupOffsets(String clusterName,
}).thenReturn(ResponseEntity.ok().build());
}

@Override
public Mono<ResponseEntity<Void>> deleteConsumerGroupOffsets(String clusterName,
String group,
String topic,
ServerWebExchange exchange) {
var context = AccessContext.builder()
.cluster(clusterName)
.topicActions(topic, TopicAction.VIEW)
.consumerGroupActions(group, DELETE_OFFSETS)
.operationName("resetConsumerGroupOffsets")
.build();

return validateAccess(context)
.then(consumerGroupService.deleteConsumerGroupOffsets(getCluster(clusterName), group, topic))
.doOnEach(sig -> audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

private ConsumerGroupsPageResponseDTO convertPage(ConsumerGroupService.ConsumerGroupsPage
consumerGroupConsumerGroupsPage) {
return new ConsumerGroupsPageResponseDTO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@ public enum ConsumerGroupAction implements PermissibleAction {

VIEW,
DELETE(VIEW),
RESET_OFFSETS(VIEW)
RESET_OFFSETS(VIEW),
DELETE_OFFSETS(VIEW);

;

public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS);
public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS, DELETE_OFFSETS);

private final ConsumerGroupAction[] dependantActions;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,11 @@ public EnhancedConsumer createConsumer(KafkaCluster cluster,
);
}

public Mono<Void> deleteConsumerGroupOffsets(KafkaCluster cluster,
String groupId,
String topic) {
return adminClientService
.get(cluster)
.flatMap(adminClient -> adminClient.deleteConsumerGroupOffsets(groupId, topic));
}
}
15 changes: 15 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,21 @@ public Mono<Void> alterConsumerGroupOffsets(String groupId, Map<TopicPartition,
.all());
}

public Mono<Void> deleteConsumerGroupOffsets(String groupId, String topic) {
var offsets = listConsumerGroupOffsets(List.of(groupId), null);
Mono<Set<TopicPartition>> partitions = offsets.map(tps ->
tps.row(groupId)
.keySet()
.stream()
.filter(tp -> tp.topic().equals(topic))
.collect(Collectors.toSet())
);

return partitions.flatMap(partitionSet ->
toMono(client.deleteConsumerGroupOffsets(groupId, partitionSet).all())
);
}

/**
* List offset for the topic's partitions and OffsetSpec.
*
Expand Down
28 changes: 28 additions & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,33 @@ paths:
200:
description: OK

/api/clusters/{clusterName}/consumer-groups/{id}/topics/{topic}:
delete:
tags:
- Consumer Groups
summary: deletes consumer group offsets
operationId: deleteConsumerGroupOffsets
parameters:
- name: clusterName
in: path
required: true
schema:
type: string
- name: id
in: path
required: true
schema:
type: string
- name: topic
in: path
required: true
schema:
type: string
responses:
200:
description: OK


/api/clusters/{clusterName}/schemas:
post:
tags:
Expand Down Expand Up @@ -3828,6 +3855,7 @@ components:
- CREATE
- DELETE
- RESET_OFFSETS
- DELETE_OFFSETS
- EXECUTE
- MODIFY_GLOBAL_COMPATIBILITY
- ANALYSIS_VIEW
Expand Down
27 changes: 24 additions & 3 deletions frontend/src/components/ConsumerGroups/Details/ListItem.tsx
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import React from 'react';
import { ConsumerGroupTopicPartition } from 'generated-sources';
import {Action, ConsumerGroupTopicPartition, ResourceType} from 'generated-sources';

Check warning on line 2 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Replace `Action,·ConsumerGroupTopicPartition,·ResourceType` with `⏎··Action,⏎··ConsumerGroupTopicPartition,⏎··ResourceType,⏎`
import { Link } from 'react-router-dom';
import { ClusterName } from 'lib/interfaces/cluster';
import { clusterTopicPath } from 'lib/paths';
import {ClusterGroupParam, clusterTopicPath} from 'lib/paths';

Check warning on line 5 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Replace `ClusterGroupParam,·clusterTopicPath` with `·ClusterGroupParam,·clusterTopicPath·`
import MessageToggleIcon from 'components/common/Icons/MessageToggleIcon';
import IconButtonWrapper from 'components/common/Icons/IconButtonWrapper';
import { TableKeyLink } from 'components/common/table/Table/TableKeyLink.styled';

import TopicContents from './TopicContents/TopicContents';

Check failure on line 10 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

`./TopicContents/TopicContents` import should occur after import of `../../../lib/hooks/api/consumers`
import { FlexWrapper } from './ListItem.styled';

Check failure on line 11 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

There should be at least one empty line between import groups

Check failure on line 11 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

`./ListItem.styled` import should occur after import of `../../../lib/hooks/api/consumers`
import {Dropdown} from "../../common/Dropdown";

Check warning on line 12 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Replace `Dropdown}·from·"../../common/Dropdown"` with `·Dropdown·}·from·'../../common/Dropdown'`

Check failure on line 12 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Relative imports from parent directories are not allowed. Please either pass what you're importing through at runtime (dependency injection), move `ListItem.tsx` to same directory as `../../common/Dropdown` or consider making `../../common/Dropdown` a package
import {ActionDropdownItem} from "../../common/ActionComponent";

Check warning on line 13 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Replace `ActionDropdownItem}·from·"../../common/ActionComponent"` with `·ActionDropdownItem·}·from·'../../common/ActionComponent'`

Check failure on line 13 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Relative imports from parent directories are not allowed. Please either pass what you're importing through at runtime (dependency injection), move `ListItem.tsx` to same directory as `../../common/ActionComponent` or consider making `../../common/ActionComponent` a package
import useAppParams from "../../../lib/hooks/useAppParams";

Check warning on line 14 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Replace `"../../../lib/hooks/useAppParams"` with `'../../../lib/hooks/useAppParams'`

Check failure on line 14 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Relative imports from parent directories are not allowed. Please either pass what you're importing through at runtime (dependency injection), move `ListItem.tsx` to same directory as `../../../lib/hooks/useAppParams` or consider making `../../../lib/hooks/useAppParams` a package
import {useDeleteConsumerGroupOffsetsMutation} from "../../../lib/hooks/api/consumers";

Check warning on line 15 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Replace `useDeleteConsumerGroupOffsetsMutation}·from·"../../../lib/hooks/api/consumers"` with `·useDeleteConsumerGroupOffsetsMutation·}·from·'../../../lib/hooks/api/consumers'`

Check failure on line 15 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Relative imports from parent directories are not allowed. Please either pass what you're importing through at runtime (dependency injection), move `ListItem.tsx` to same directory as `../../../lib/hooks/api/consumers` or consider making `../../../lib/hooks/api/consumers` a package

interface Props {
clusterName: ClusterName;
Expand All @@ -18,10 +22,12 @@

const ListItem: React.FC<Props> = ({ clusterName, name, consumers }) => {
const [isOpen, setIsOpen] = React.useState(false);
const consumer = useAppParams<ClusterGroupParam>()

Check warning on line 25 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Insert `;`
const deleteOffset = useDeleteConsumerGroupOffsetsMutation(consumer)

Check warning on line 26 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Insert `;`

const getTotalconsumerLag = () => {
let count = 0;
consumers.forEach((consumer) => {

Check failure on line 30 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

'consumer' is already declared in the upper scope on line 25 column 9
count += consumer?.consumerLag || 0;
});
return count;
Expand All @@ -40,7 +46,22 @@
</TableKeyLink>
</FlexWrapper>
</td>
<td>{getTotalconsumerLag()}</td>
<td>

Check warning on line 49 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Replace `⏎··········{getTotalconsumerLag()}⏎········` with `{getTotalconsumerLag()}`
{getTotalconsumerLag()}
</td>
<td>
<Dropdown>
<ActionDropdownItem
onClick={() => deleteOffset.mutateAsync(name)}
permission={{
resource: ResourceType.CONSUMER,
action: Action.DELETE_OFFSETS,
value: consumer.consumerGroupID

Check warning on line 59 in frontend/src/components/ConsumerGroups/Details/ListItem.tsx

View workflow job for this annotation

GitHub Actions / build / build-and-test

Insert `,`
}}>
Unsubscribe from topic
</ActionDropdownItem>
</Dropdown>
</td>
</tr>
{isOpen && <TopicContents consumers={consumers} />}
</>
Expand Down
27 changes: 27 additions & 0 deletions frontend/src/lib/hooks/api/consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,30 @@ export const useResetConsumerGroupOffsetsMutation = ({
}
);
};

export const useDeleteConsumerGroupOffsetsMutation = ({
clusterName,
consumerGroupID,
}: UseConsumerGroupDetailsProps) => {
const queryClient = useQueryClient();
return useMutation(
(topic: string) =>
api.deleteConsumerGroupOffsets({
clusterName,
id: consumerGroupID,
topic,
}),
{
onSuccess: () => {
showSuccessAlert({
message: `Consumer ${consumerGroupID} group offsets deleted.`,
});
queryClient.invalidateQueries([
'clusters',
clusterName,
'consumerGroups',
]);
},
}
);
};
Loading