Skip to content

Commit f45347a

Browse files
authored
0.59.0
- Changes to support invoke method on broker. - Fix bug in streaming logs. - InboundRequest.getParams() -> getParameters() - OutboundRequestHandler.getParams() -> getParameters() - InboundListRequest added()->update(), changed()->update(), removed()->remove()
2 parents 7375b70 + 679925d commit f45347a

20 files changed

+250
-147
lines changed

build.gradle

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ subprojects {
55
apply plugin: 'maven'
66

77
group 'org.iot-dsa'
8-
version '0.58.0'
8+
version '0.59.0'
99

1010
sourceCompatibility = 1.8
1111
targetCompatibility = 1.8
1212

1313
repositories {
1414
mavenLocal()
15-
mavenCentral()
15+
jcenter()
1616
}
1717

1818
task sourcesJar(group: 'build', type: Jar, dependsOn: classes) {
@@ -45,5 +45,5 @@ allprojects {
4545
}
4646

4747
wrapper {
48-
gradleVersion = '5.2'
48+
gradleVersion = '5.4'
4949
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundInvokeStub.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ public OutboundInvokeHandler getHandler() {
4040
@Override
4141
public void handleResponse(DSMap response) {
4242
try {
43+
if (handler instanceof RawHandler) {
44+
((RawHandler) handler).handleRaw(response);
45+
return;
46+
}
4347
DSList updates = response.getList("updates");
4448
DSMap meta = response.getMap("meta");
4549
if (meta != null) {
@@ -112,4 +116,10 @@ protected DSMap getParams() {
112116
return params;
113117
}
114118

119+
public interface RawHandler {
120+
121+
public void handleRaw(DSMap response);
122+
123+
}
124+
115125
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundListStub.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public boolean isStreamOpen() {
159159
}
160160

161161
@Override
162-
public DSIValue getParams() {
162+
public DSIValue getParameters() {
163163
return null;
164164
}
165165

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundInvoke.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class DSInboundInvoke extends DSInboundRequest
3939
private static final int STATE_UPDATES = 2;
4040
private static final int STATE_CLOSE_PENDING = 3;
4141
private static final int STATE_CLOSED = 4;
42+
private static final int STATE_RAW = 10;
4243

4344
///////////////////////////////////////////////////////////////////////////
4445
// Instance Fields
@@ -156,6 +157,9 @@ public void run() {
156157
DSIResponder responder = (DSIResponder) path.getTarget();
157158
setPath(path.getPath());
158159
result = responder.onInvoke(this);
160+
if (result instanceof RawActionResult) {
161+
state = STATE_RAW;
162+
}
159163
} else {
160164
DSInfo info = path.getTargetInfo();
161165
if (!info.isAction()) {
@@ -193,6 +197,10 @@ public void send(DSList row) {
193197
enqueueUpdate(new Update(row));
194198
}
195199

200+
public void sendRaw(DSMap raw) {
201+
enqueueUpdate(new Update(raw));
202+
}
203+
196204
/**
197205
* For v2 only, set to false to auto close the stream after sending the initial state.
198206
*/
@@ -214,6 +222,9 @@ public boolean write(DSSession session, MessageWriter writer) {
214222
}
215223
writeBegin(writer);
216224
switch (state) {
225+
case STATE_RAW:
226+
writeRaw(writer);
227+
break;
217228
case STATE_INIT:
218229
writeColumns(writer);
219230
writeInitialResults(writer);
@@ -429,6 +440,36 @@ private void writeInitialResults(MessageWriter writer) {
429440
}
430441
}
431442

443+
private void writeRaw(MessageWriter writer) {
444+
DSIWriter out = writer.getWriter();
445+
Update update = updateHead; //peak ahead
446+
if (update == null) {
447+
return;
448+
}
449+
DSMap map;
450+
DSResponder responder = getResponder();
451+
while (true) {
452+
update = dequeueUpdate();
453+
map = update.raw;
454+
map.remove("rid");
455+
for (DSMap.Entry e : map) {
456+
out.key(e.getKey());
457+
out.value(e.getValue());
458+
}
459+
if (updateHead == null) {
460+
break;
461+
}
462+
if (responder.shouldEndMessage()) {
463+
enqueueResponse();
464+
break;
465+
}
466+
}
467+
String stream = map.get("stream", "");
468+
if (stream.equals("closed")) {
469+
doClose();
470+
}
471+
}
472+
432473
private void writeUpdates(MessageWriter writer) {
433474
DSIWriter out = writer.getWriter();
434475
Update update = updateHead; //peak ahead
@@ -474,7 +515,8 @@ private void writeUpdates(MessageWriter writer) {
474515
protected enum UpdateType {
475516
INSERT("insert"),
476517
REFRESH("refresh"),
477-
REPLACE("replace");
518+
REPLACE("replace"),
519+
RAW("raw");
478520

479521
private String display;
480522

@@ -487,6 +529,13 @@ public String toString() {
487529
}
488530
}
489531

532+
/**
533+
* Used as a pass through on the broker.
534+
*/
535+
public interface RawActionResult extends ActionResult {
536+
537+
}
538+
490539
/**
491540
* Describes an update to be sent to the requester.
492541
*/
@@ -495,10 +544,16 @@ protected static class Update {
495544
int beginIndex = -1;
496545
int endIndex = -1;
497546
Update next;
547+
DSMap raw;
498548
DSList row;
499549
DSList[] rows;
500550
UpdateType type;
501551

552+
Update(DSMap raw) {
553+
this.raw = raw;
554+
this.type = UpdateType.RAW;
555+
}
556+
502557
Update(DSList row) {
503558
this.row = row;
504559
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/responder/DSInboundList.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public class DSInboundList extends DSInboundRequest
7777
///////////////////////////////////////////////////////////////////////////
7878

7979
@Override
80-
public void added(String name, ApiObject child) {
80+
public void update(String name, ApiObject child) {
8181
if (!isClosed()) {
8282
enqueue(new AddUpdate(name, child));
8383
}
@@ -89,7 +89,7 @@ public boolean canWrite(DSSession session) {
8989
}
9090

9191
@Override
92-
public void changed(String name, DSElement value) {
92+
public void update(String name, DSElement value) {
9393
if (!isClosed()) {
9494
enqueue(new ChangeUpdate(name, value));
9595
}
@@ -160,20 +160,20 @@ public void onClosed(DSISubscription subscription) {
160160
public void onEvent(DSEvent event, DSNode node, DSInfo child, DSIValue data) {
161161
switch (event.getEventId()) {
162162
case DSNode.CHILD_ADDED:
163-
added(child.getName(), child);
163+
update(child.getName(), child);
164164
break;
165165
case DSNode.CHILD_RENAMED:
166-
removed(data.toString());
167-
added(child.getName(), child);
166+
remove(data.toString());
167+
update(child.getName(), child);
168168
break;
169169
case DSNode.CHILD_REMOVED:
170-
removed(child.getName());
170+
remove(child.getName());
171171
break;
172172
}
173173
}
174174

175175
@Override
176-
public void removed(String name) {
176+
public void remove(String name) {
177177
if (!isClosed()) {
178178
enqueue(new RemoveUpdate(name));
179179
}

0 commit comments

Comments
 (0)