diff --git a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java index 0f44917..35c9163 100644 --- a/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java +++ b/src/main/java/io/openmessaging/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -23,6 +23,7 @@ import io.openmessaging.connect.runtime.common.ConnectKeyValue; import io.openmessaging.connect.runtime.common.LoggerName; import io.openmessaging.connect.runtime.common.QueuePartition; +import io.openmessaging.connector.api.common.QueueMetaData; import io.openmessaging.connector.api.data.Converter; import io.openmessaging.connector.api.data.SinkDataEntry; import io.openmessaging.connector.api.data.SourceDataEntry; @@ -105,38 +106,34 @@ public void run() { try { sinkTask.initialize(new SinkTaskContext() { @Override - public void resetOffset(String queueName, Long offset) { - //TODO oms-1.0.0-alpha支持获取, 并且queueName 是否需要换成QueuePartition - QueuePartition queuePartition = new QueuePartition(queueName, 0); + public void resetOffset(QueueMetaData queueMetaData, Long offset) { + QueuePartition queuePartition = new QueuePartition(queueMetaData.getQueueName(), 0); partitionOffsetMap.put(queuePartition, offset); } @Override - public void resetOffset(Map offsets) { - //TODO oms-1.0.0-alpha支持获取, 并且queueName 是否需要换成QueuePartition - for (Map.Entry entry : offsets.entrySet()) { - QueuePartition queuePartition = new QueuePartition(entry.getKey(), 0); + public void resetOffset(Map offsets) { + for (Map.Entry entry : offsets.entrySet()) { + QueuePartition queuePartition = new QueuePartition(entry.getKey().getQueueName(), 0); partitionOffsetMap.put(queuePartition, entry.getValue()); } } @Override - public void pause(List queueNames) { - //TODO queueName 是否需要换成QueuePartition - if (null != queueNames && queueNames.size() > 0) { - for (String queueName : queueNames) { - QueuePartition queuePartition = new QueuePartition(queueName, 0); + public void pause(List queueMetaDataList) { + if (null != queueMetaDataList && queueMetaDataList.size() > 0) { + for (QueueMetaData queueMetaData : queueMetaDataList) { + QueuePartition queuePartition = new QueuePartition(queueMetaData.getQueueName(), 0); partitionStatusMap.put(queuePartition, PartitionStatus.PAUSE); } } } @Override - public void resume(List queueNames) { - //TODO queueName 是否需要换成QueuePartition - if (null != queueNames && queueNames.size() > 0) { - for (String queueName : queueNames) { - QueuePartition queuePartition = new QueuePartition(queueName, 0); + public void resume(List queueMetaDataList) { + if (null != queueMetaDataList && queueMetaDataList.size() > 0) { + for (QueueMetaData queueMetaData : queueMetaDataList) { + QueuePartition queuePartition = new QueuePartition(queueMetaData.getQueueName(), 0); partitionStatusMap.remove(queuePartition); } } diff --git a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/testimpl/TestConnector.java b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/testimpl/TestConnector.java index e65609c..6bd2728 100644 --- a/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/testimpl/TestConnector.java +++ b/src/test/java/io/openmessaging/connect/runtime/connectorwrapper/testimpl/TestConnector.java @@ -23,7 +23,7 @@ import java.util.List; -public class TestConnector implements Connector { +public class TestConnector extends Connector { @Override public String verifyAndSetConfig(KeyValue config) { diff --git a/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java b/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java index 6c035b3..adea173 100644 --- a/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java +++ b/src/test/java/io/openmessaging/connect/runtime/service/ConfigManagementServiceImplTest.java @@ -227,7 +227,7 @@ public void testGetTaskConfigs() throws Exception { } -class TestConnector implements Connector { +class TestConnector extends Connector { private KeyValue config;