Skip to content

Commit 1ec27be

Browse files
authored
Add trace model (#94)
1 parent 9d47a2c commit 1ec27be

19 files changed

+1433
-3
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,5 @@ packages/
2020
/test/plugin/workspace
2121
/test/jacoco/classes
2222
/test/jacoco/*.exec
23-
test/jacoco
23+
test/jacoco
24+
CLAUDE.md

CHANGES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ Release Notes.
1818
* Bump up parent Apache pom to v35.
1919
* Bump up maven to 3.6.3.
2020
* Add IDEA setup doc to support super large generated file(by protoc).
21+
* Add Trace model.
2122

2223
0.8.0
2324
------------------

src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRule;
4444
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.IndexRuleBinding;
4545
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Subject;
46+
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.Trace;
4647
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
4748
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty;
4849
import org.apache.skywalking.banyandb.property.v1.BanyandbProperty.Property;
@@ -53,6 +54,8 @@
5354
import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
5455
import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
5556
import org.apache.skywalking.banyandb.stream.v1.StreamServiceGrpc;
57+
import org.apache.skywalking.banyandb.trace.v1.BanyandbTrace;
58+
import org.apache.skywalking.banyandb.trace.v1.TraceServiceGrpc;
5659
import org.apache.skywalking.banyandb.v1.client.auth.AuthInterceptor;
5760
import org.apache.skywalking.banyandb.v1.client.grpc.HandleExceptionsWith;
5861
import org.apache.skywalking.banyandb.v1.client.grpc.channel.ChannelManager;
@@ -69,6 +72,7 @@
6972
import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
7073
import org.apache.skywalking.banyandb.v1.client.metadata.StreamMetadataRegistry;
7174
import org.apache.skywalking.banyandb.v1.client.metadata.TopNAggregationMetadataRegistry;
75+
import org.apache.skywalking.banyandb.v1.client.metadata.TraceMetadataRegistry;
7276

7377
import java.io.Closeable;
7478
import java.io.IOException;
@@ -124,6 +128,11 @@ public class BanyanDBClient implements Closeable {
124128
*/
125129
@Getter(value = AccessLevel.PACKAGE)
126130
private MeasureServiceGrpc.MeasureServiceStub measureServiceStub;
131+
/**
132+
* gRPC client stub
133+
*/
134+
@Getter(value = AccessLevel.PACKAGE)
135+
private TraceServiceGrpc.TraceServiceStub traceServiceStub;
127136
/**
128137
* gRPC future stub.
129138
*/
@@ -134,6 +143,11 @@ public class BanyanDBClient implements Closeable {
134143
*/
135144
@Getter(value = AccessLevel.PACKAGE)
136145
private MeasureServiceGrpc.MeasureServiceBlockingStub measureServiceBlockingStub;
146+
/**
147+
* gRPC future stub.
148+
*/
149+
@Getter(value = AccessLevel.PACKAGE)
150+
private TraceServiceGrpc.TraceServiceBlockingStub traceServiceBlockingStub;
137151
/**
138152
* The connection status.
139153
*/
@@ -211,8 +225,10 @@ public void connect() throws IOException {
211225
this.channel = interceptedChannel;
212226
streamServiceBlockingStub = StreamServiceGrpc.newBlockingStub(this.channel);
213227
measureServiceBlockingStub = MeasureServiceGrpc.newBlockingStub(this.channel);
228+
traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(this.channel);
214229
streamServiceStub = StreamServiceGrpc.newStub(this.channel);
215230
measureServiceStub = MeasureServiceGrpc.newStub(this.channel);
231+
traceServiceStub = TraceServiceGrpc.newStub(this.channel);
216232
isConnected = true;
217233
}
218234
} finally {
@@ -228,8 +244,10 @@ void connect(Channel channel) {
228244
this.channel = channel;
229245
streamServiceBlockingStub = StreamServiceGrpc.newBlockingStub(this.channel);
230246
measureServiceBlockingStub = MeasureServiceGrpc.newBlockingStub(this.channel);
247+
traceServiceBlockingStub = TraceServiceGrpc.newBlockingStub(this.channel);
231248
streamServiceStub = StreamServiceGrpc.newStub(this.channel);
232249
measureServiceStub = MeasureServiceGrpc.newStub(this.channel);
250+
traceServiceStub = TraceServiceGrpc.newStub(this.channel);
233251
isConnected = true;
234252
}
235253
} finally {
@@ -396,6 +414,49 @@ public StreamWrite createStreamWrite(String group, String name, final String ele
396414
return new StreamWrite(this.metadataCache.findStreamMetadata(group, name), elementId, timestamp);
397415
}
398416

417+
/**
418+
* Build a trace bulk write processor.
419+
*
420+
* @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array.
421+
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
422+
* automatically. Unit is second.
423+
* @param concurrency the number of concurrency would run for the flush max.
424+
* @param timeout network timeout threshold in seconds.
425+
* @return trace bulk write processor
426+
*/
427+
public TraceBulkWriteProcessor buildTraceWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) {
428+
checkState(this.traceServiceStub != null, "trace service is null");
429+
430+
return new TraceBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options);
431+
}
432+
433+
/**
434+
* Build a TraceWrite request.
435+
*
436+
* @param group the group of the trace
437+
* @param name the name of the trace
438+
* @param timestamp the timestamp of the trace
439+
* @return the request to be built
440+
*/
441+
public TraceWrite createTraceWrite(String group, String name, long timestamp) throws BanyanDBException {
442+
Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
443+
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
444+
return new TraceWrite(this.metadataCache.findTraceMetadata(group, name), timestamp);
445+
}
446+
447+
/**
448+
* Build a TraceWrite request without initial timestamp.
449+
*
450+
* @param group the group of the trace
451+
* @param name the name of the trace
452+
* @return the request to be built
453+
*/
454+
public TraceWrite createTraceWrite(String group, String name) throws BanyanDBException {
455+
Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
456+
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
457+
return new TraceWrite(this.metadataCache.findTraceMetadata(group, name));
458+
}
459+
399460
/**
400461
* Query streams according to given conditions
401462
*
@@ -456,6 +517,29 @@ public MeasureQueryResponse query(MeasureQuery measureQuery) throws BanyanDBExce
456517
throw new RuntimeException("No metadata found for the query");
457518
}
458519

520+
/**
521+
* Query traces according to given conditions
522+
*
523+
* @param traceQuery condition for query
524+
* @return trace query response.
525+
*/
526+
public TraceQueryResponse query(TraceQuery traceQuery) throws BanyanDBException {
527+
checkState(this.traceServiceStub != null, "trace service is null");
528+
529+
for (String group : traceQuery.groups) {
530+
MetadataCache.EntityMetadata em = this.metadataCache.findTraceMetadata(group, traceQuery.name);
531+
if (em != null) {
532+
final BanyandbTrace.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
533+
this.traceServiceBlockingStub
534+
.withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
535+
.query(traceQuery.build(em)));
536+
return new TraceQueryResponse(response);
537+
}
538+
539+
}
540+
throw new RuntimeException("No metadata found for the query");
541+
}
542+
459543
/**
460544
* Define a new group and attach to the current client.
461545
*
@@ -968,6 +1052,69 @@ public DeleteResponse deleteProperty(String group, String name, String id) throw
9681052
}
9691053
}
9701054

1055+
/**
1056+
* Define a new trace
1057+
*
1058+
* @param trace the trace to be stored in the BanyanDB
1059+
* @throws BanyanDBException if the trace is invalid
1060+
*/
1061+
public void define(Trace trace) throws BanyanDBException {
1062+
TraceMetadataRegistry registry = new TraceMetadataRegistry(checkNotNull(this.channel));
1063+
registry.create(trace);
1064+
}
1065+
1066+
/**
1067+
* Update the trace.
1068+
*
1069+
* @param trace the trace to be stored in the BanyanDB
1070+
* @throws BanyanDBException if the trace is invalid
1071+
*/
1072+
public void update(Trace trace) throws BanyanDBException {
1073+
TraceMetadataRegistry registry = new TraceMetadataRegistry(checkNotNull(this.channel));
1074+
registry.update(trace);
1075+
}
1076+
1077+
/**
1078+
* Find the trace with given group and name
1079+
*
1080+
* @param group group of the metadata
1081+
* @param name name of the metadata
1082+
* @return the trace found in BanyanDB. Otherwise, null is returned.
1083+
*/
1084+
public Trace findTrace(String group, String name) throws BanyanDBException {
1085+
try {
1086+
return new TraceMetadataRegistry(checkNotNull(this.channel)).get(group, name);
1087+
} catch (BanyanDBException ex) {
1088+
if (ex.getStatus().equals(Status.Code.NOT_FOUND)) {
1089+
return null;
1090+
}
1091+
throw ex;
1092+
}
1093+
}
1094+
1095+
/**
1096+
* Find the traces with given group
1097+
*
1098+
* @param group group of the metadata
1099+
* @return the traces found in BanyanDB
1100+
*/
1101+
public List<Trace> findTraces(String group) throws BanyanDBException {
1102+
TraceMetadataRegistry registry = new TraceMetadataRegistry(checkNotNull(this.channel));
1103+
return registry.list(group);
1104+
}
1105+
1106+
/**
1107+
* Delete the trace
1108+
*
1109+
* @param group group of the metadata
1110+
* @param name name of the metadata
1111+
* @return if this trace has been deleted
1112+
*/
1113+
public boolean deleteTrace(String group, String name) throws BanyanDBException {
1114+
TraceMetadataRegistry registry = new TraceMetadataRegistry(checkNotNull(this.channel));
1115+
return registry.delete(group, name);
1116+
}
1117+
9711118
/**
9721119
* Try to find the group defined
9731120
*
@@ -1169,6 +1316,20 @@ public ResourceExist existProperty(String group, String name) throws BanyanDBExc
11691316
return new PropertyMetadataRegistry(checkNotNull(this.channel)).exist(group, name);
11701317
}
11711318

1319+
/**
1320+
* Check whether the trace definition is existed in the server
1321+
*
1322+
* @param group group of the metadata
1323+
* @param name name of the metadata
1324+
* @return ResourceExist which indicates whether group and trace exist
1325+
*/
1326+
public ResourceExist existTrace(String group, String name) throws BanyanDBException {
1327+
Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
1328+
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
1329+
1330+
return new TraceMetadataRegistry(checkNotNull(this.channel)).exist(group, name);
1331+
}
1332+
11721333
/**
11731334
* Update the stream metadata cache from the server
11741335
* @param group the group of the stream
@@ -1193,6 +1354,18 @@ public MetadataCache.EntityMetadata updateMeasureMetadataCacheFromSever(String g
11931354
return this.metadataCache.updateMeasureFromSever(group, name);
11941355
}
11951356

1357+
/**
1358+
* Update the trace metadata cache from the server
1359+
* @param group the group of the trace
1360+
* @param name the name of the trace
1361+
* @return the updated trace metadata, or null if the trace does not exist
1362+
*/
1363+
public MetadataCache.EntityMetadata updateTraceMetadataCacheFromServer(String group, String name) throws BanyanDBException {
1364+
Preconditions.checkArgument(!Strings.isNullOrEmpty(group));
1365+
Preconditions.checkArgument(!Strings.isNullOrEmpty(name));
1366+
return this.metadataCache.updateTraceFromServer(group, name);
1367+
}
1368+
11961369
/**
11971370
* Get the API version of the server
11981371
*

src/main/java/org/apache/skywalking/banyandb/v1/client/TagAndValue.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.google.protobuf.ByteString;
2424
import com.google.protobuf.NullValue;
25+
import com.google.protobuf.Timestamp;
2526
import lombok.EqualsAndHashCode;
2627
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
2728

@@ -77,6 +78,8 @@ public static TagAndValue<?> fromProtobuf(BanyandbModel.Tag tag) {
7778
return new StringArrayTagPair(tag.getKey(), tag.getValue().getStrArray().getValueList());
7879
case BINARY_DATA:
7980
return new BinaryTagPair(tag.getKey(), tag.getValue().getBinaryData());
81+
case TIMESTAMP:
82+
return new TimestampTagPair(tag.getKey(), tag.getValue().getTimestamp());
8083
case NULL:
8184
return new NullTagPair(tag.getKey());
8285
default:
@@ -182,6 +185,32 @@ public static TagAndValue<ByteString> newBinaryTag(final String tagName, final b
182185
return new BinaryTagPair(tagName, ByteString.copyFrom(bytes));
183186
}
184187

188+
@EqualsAndHashCode(callSuper = true)
189+
public static class TimestampTagPair extends TagAndValue<Long> {
190+
TimestampTagPair(final String tagName, final long epochMilli) {
191+
super(tagName, epochMilli);
192+
}
193+
194+
TimestampTagPair(final String tagName, final Timestamp value) {
195+
super(tagName, value.getSeconds() * 1000 + value.getNanos() / 1000000);
196+
}
197+
198+
@Override
199+
protected BanyandbModel.TagValue buildTypedTagValue() {
200+
Timestamp timestamp = Timestamp.newBuilder()
201+
.setSeconds(value / 1000)
202+
.setNanos((int) ((value % 1000) * 1000000))
203+
.build();
204+
return BanyandbModel.TagValue.newBuilder()
205+
.setTimestamp(timestamp)
206+
.build();
207+
}
208+
}
209+
210+
public static TagAndValue<Long> newTimestampTag(final String tagName, final long epochMilli) {
211+
return new TimestampTagPair(tagName, epochMilli);
212+
}
213+
185214
@EqualsAndHashCode(callSuper = true)
186215
public static class NullTagPair extends TagAndValue<Void> {
187216
NullTagPair(final String tagName) {

0 commit comments

Comments
 (0)