Skip to content

Commit 8488614

Browse files
committed
Restore changes
Signed-off-by: Gao Hongtao <hanahmily@gmail.com>
1 parent 560fc48 commit 8488614

File tree

3 files changed

+9
-33
lines changed

3 files changed

+9
-33
lines changed

src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -400,30 +400,15 @@ public StreamWrite createStreamWrite(String group, String name, final String ele
400400
}
401401

402402
/**
403-
* Build a StreamWrite request.
403+
* Build a trace bulk write processor.
404404
*
405-
* @param group the group of the stream
406-
* @param name the name of the stream
407-
* @param elementId the primary key of the stream
408-
* @param timestamp the timestamp of the stream
409-
* @return the request to be built
405+
* @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array.
406+
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
407+
* automatically. Unit is second.
408+
* @param concurrency the number of concurrency would run for the flush max.
409+
* @param timeout network timeout threshold in seconds.
410+
* @return trace bulk write processor
410411
*/
411-
public StreamWrite createStreamWrite(String group, String name, final String elementId, long timestamp) throws BanyanDBException {
412-
Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
413-
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
414-
return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId, timestamp);
415-
}
416-
417-
/**
418-
* Build a trace bulk write processor.
419-
*
420-
* @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array.
421-
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
422-
* automatically. Unit is second.
423-
* @param concurrency the number of concurrency would run for the flush max.
424-
* @param timeout network timeout threshold in seconds.
425-
* @return trace bulk write processor
426-
*/
427412
public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) {
428413
checkState(this.traceServiceStub != null, "trace service is null");
429414

src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,16 +50,6 @@ public class StreamWrite extends AbstractWrite<BanyandbStream.WriteRequest> {
5050
this.elementId = elementId;
5151
}
5252

53-
/**
54-
* Create a StreamWrite with initial timestamp.
55-
*
56-
* @param timestamp in milliseconds
57-
*/
58-
public StreamWrite(MetadataCache.EntityMetadata streamMetadata, String elementId, long timestamp) {
59-
this(streamMetadata, elementId);
60-
this.timestamp = Optional.of(timestamp);
61-
}
62-
6353
@Override
6454
public StreamWrite tag(String tagName, Serializable<BanyandbModel.TagValue> tagValue) throws BanyanDBException {
6555
return (StreamWrite) super.tag(tagName, tagValue);

src/test/java/org/apache/skywalking/banyandb/v1/client/ITBanyanDBStreamQueryTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void testStreamQuery_TraceID() throws BanyanDBException, ExecutionExcepti
9494
String dbType = "SQL";
9595
String dbInstance = "127.0.0.1:3306";
9696

97-
StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId, now.toEpochMilli())
97+
StreamWrite streamWrite = client.createStreamWrite("sw_record", "trace", segmentId)
9898
.tag("data_binary", Value.binaryTagValue(byteData))
9999
.tag("trace_id", Value.stringTagValue(traceId)) // 0
100100
.tag("state", Value.longTagValue(state)) // 1
@@ -109,6 +109,7 @@ public void testStreamQuery_TraceID() throws BanyanDBException, ExecutionExcepti
109109
.tag("mq.broker", Value.stringTagValue(broker)) // 10
110110
.tag("mq.topic", Value.stringTagValue(topic)) // 11
111111
.tag("mq.queue", Value.stringTagValue(queue)); // 12
112+
streamWrite.setTimestamp(now.toEpochMilli());
112113

113114
CompletableFuture<Void> f = processor.add(streamWrite);
114115
f.exceptionally(exp -> {

0 commit comments

Comments
 (0)