Skip to content

Commit 2945b30

Browse files
author
Aaron
committed
Refactor the Simple requester implementations.
Fix some message pack issues. More unit tests.
1 parent 6acece9 commit 2945b30

29 files changed

+578
-301
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ subprojects {
55
apply plugin: 'maven'
66

77
group 'org.iot-dsa'
8-
version '0.53.1a2'
8+
version '0.54.0'
99

1010
sourceCompatibility = 1.8
1111
targetCompatibility = 1.8

dslink-v2-poc/src/main/java/org/iot/dsa/dslink/poc/MainNode.java

Lines changed: 0 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -293,45 +293,6 @@ public enum MyEnum {
293293
// Inner Classes
294294
///////////////////////////////////////////////////////////////////////////
295295

296-
private class ListHandler implements OutboundListHandler {
297-
298-
OutboundStream stream;
299-
300-
@Override
301-
public void onClose() {
302-
System.out.println("list closed");
303-
}
304-
305-
@Override
306-
public void onError(ErrorType type, String msg) {
307-
System.out.println("list error " + type + ", " + msg);
308-
}
309-
310-
@Override
311-
public void onInit(String path, OutboundStream stream) {
312-
System.out.println("onInit");
313-
this.stream = stream;
314-
}
315-
316-
@Override
317-
public void onInitialized() {
318-
System.out.println("list initialized");
319-
stream.closeStream();
320-
}
321-
322-
@Override
323-
public void onRemove(String name) {
324-
System.out.println("list remove " + name);
325-
}
326-
327-
@Override
328-
public void onUpdate(String name, DSElement value) {
329-
System.out.print(name);
330-
System.out.print(": ");
331-
System.out.println(String.valueOf(value));
332-
}
333-
}
334-
335296
public static class TestNode extends DSNode {
336297

337298
public void declareDefaults() {

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSOutboundSubscriptions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.iot.dsa.io.DSIWriter;
1414
import org.iot.dsa.logging.DSLogger;
1515
import org.iot.dsa.node.DSElement;
16+
import org.iot.dsa.node.DSLong;
1617
import org.iot.dsa.node.DSNull;
1718
import org.iot.dsa.node.DSStatus;
1819
import org.iot.dsa.time.DSDateTime;
@@ -273,7 +274,7 @@ OutboundSubscribeHandler subscribe(String path, int qos, OutboundSubscribeHandle
273274
}
274275
}
275276
try {
276-
req.onInit(path, sub.getQos(), stub);
277+
req.onInit(path, DSLong.valueOf(sub.getQos()), stub);
277278
} catch (Exception x) {
278279
error(path, x);
279280
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/requester/DSRequester.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
import org.iot.dsa.dslink.requester.OutboundRequestHandler;
1515
import org.iot.dsa.dslink.requester.OutboundSubscribeHandler;
1616
import org.iot.dsa.node.DSIValue;
17+
import org.iot.dsa.node.DSInt;
18+
import org.iot.dsa.node.DSLong;
1719
import org.iot.dsa.node.DSMap;
1820
import org.iot.dsa.node.DSNode;
1921

@@ -69,7 +71,7 @@ public OutboundInvokeHandler invoke(String path, DSMap params, OutboundInvokeHan
6971
public OutboundListHandler list(String path, OutboundListHandler req) {
7072
DSOutboundListStub stub = makeList(path, req);
7173
requests.put(stub.getRequestId(), stub);
72-
req.onInit(path, stub);
74+
req.onInit(path, null, stub);
7375
session.enqueueOutgoingRequest(stub);
7476
return req;
7577
}

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/v2/requester/DS2OutboundListStub.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -33,36 +33,37 @@ public class DS2OutboundListStub extends DSOutboundListStub
3333
@Override
3434
public void handleResponse(DS2MessageReader response) {
3535
OutboundListHandler handler = getHandler();
36-
if (state == STS_INITIALIZING) {
37-
Byte status = (Byte) response.getHeader(MessageConstants.HDR_STATUS);
38-
if (status.byteValue() == STS_OK) {
39-
state = STS_OK;
40-
getHandler().onInitialized();
41-
}
42-
}
4336
try {
4437
MsgpackReader reader = response.getBodyReader();
4538
InputStream in = response.getBody();
4639
int bodyLen = response.getBodyLength();
4740
String name;
48-
DSElement value;
41+
DSElement value = null;
4942
while (bodyLen > 0) {
5043
int len = DSBytes.readShort(in, false);
44+
bodyLen -= len;
5145
name = reader.readUTF(len);
5246
len = DSBytes.readShort(in, false);
47+
bodyLen -= len;
48+
bodyLen -= 4; //the two lengths
5349
if (len == 0) {
5450
handler.onRemove(name);
5551
} else {
56-
bodyLen -= len;
5752
reader.reset();
5853
value = reader.getElement();
59-
bodyLen -= len;
6054
handler.onUpdate(DSPath.decodeName(name), value);
6155
}
6256
}
6357
} catch (IOException x) {
6458
DSException.throwRuntime(x);
6559
}
60+
if (state == STS_INITIALIZING) {
61+
Byte status = (Byte) response.getHeader(MessageConstants.HDR_STATUS);
62+
if (status.byteValue() == STS_OK) {
63+
state = STS_OK;
64+
getHandler().onInitialized();
65+
}
66+
}
6667
}
6768

6869
@Override

dslink-v2/src/main/java/org/iot/dsa/dslink/requester/AbstractInvokeHandler.java

Lines changed: 5 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -7,53 +7,17 @@
77
*
88
* @author Daniel Shapiro, Aaron Hansen
99
*/
10-
public abstract class AbstractInvokeHandler implements OutboundInvokeHandler {
11-
12-
///////////////////////////////////////////////////////////////////////////
13-
// Fields
14-
///////////////////////////////////////////////////////////////////////////
15-
16-
private DSMap params;
17-
private String path;
18-
private OutboundStream stream;
10+
public abstract class AbstractInvokeHandler
11+
extends AbstractRequestHandler
12+
implements OutboundInvokeHandler {
1913

2014
///////////////////////////////////////////////////////////////////////////
2115
// Methods
2216
///////////////////////////////////////////////////////////////////////////
2317

24-
/**
25-
* Returns the value passed to onInit.
26-
*/
27-
public DSMap getParams() {
28-
return params;
29-
}
30-
31-
/**
32-
* Returns the value passed to onInit.
33-
*/
34-
public String getPath() {
35-
return path;
36-
}
37-
38-
/**
39-
* Returns the value passed to onInit.
40-
*/
41-
public OutboundStream getStream() {
42-
return stream;
43-
}
44-
45-
/**
46-
* Sets the fields so they can be access via the corresponding getters.
47-
* <p>
48-
* <p>
49-
* <p>
50-
* {@inheritDoc}
51-
*/
5218
@Override
53-
public void onInit(String path, DSMap params, OutboundStream stream) {
54-
this.path = path;
55-
this.params = params;
56-
this.stream = stream;
19+
public DSMap getParams() {
20+
return (DSMap) super.getParams();
5721
}
5822

5923
}

dslink-v2/src/main/java/org/iot/dsa/dslink/requester/AbstractListHandler.java

Lines changed: 3 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -5,49 +5,12 @@
55
* <p>
66
* onUpdate will be called until the initial state is fully loaded. After which onInitialized will
77
* be called. After that, onUpdate and onRemove will be called for state changes.
8-
* <p>
9-
* Configs, or node metadata names start with $. Attributes start with @. Anything else represents
10-
* a child. Child maps will only contain configs/node metadata.
118
*
129
* @author Daniel Shapiro, Aaron Hansen
1310
*/
14-
public abstract class AbstractListHandler implements OutboundListHandler {
15-
16-
///////////////////////////////////////////////////////////////////////////
17-
// Fields
18-
///////////////////////////////////////////////////////////////////////////
19-
20-
private String path;
21-
private OutboundStream stream;
22-
23-
///////////////////////////////////////////////////////////////////////////
24-
// Methods
25-
///////////////////////////////////////////////////////////////////////////
26-
27-
/**
28-
* Returns the value passed to onInit.
29-
*/
30-
public String getPath() {
31-
return path;
32-
}
33-
34-
/**
35-
* Returns the value passed to onInit.
36-
*/
37-
public OutboundStream getStream() {
38-
return stream;
39-
}
40-
41-
/**
42-
* Sets the fields so they can be accessed with the corresponding getters.
43-
* <p>
44-
* {@inheritDoc}
45-
*/
46-
@Override
47-
public void onInit(String path, OutboundStream stream) {
48-
this.path = path;
49-
this.stream = stream;
50-
}
11+
public abstract class AbstractListHandler
12+
extends AbstractRequestHandler
13+
implements OutboundListHandler {
5114

5215
}
5316

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package org.iot.dsa.dslink.requester;
2+
3+
import org.iot.dsa.node.DSIValue;
4+
5+
/**
6+
* Convenience base type for all callbacks passed to requester method invocations.
7+
*
8+
* @author Daniel Shapiro, Aaron Hansen
9+
*/
10+
public abstract class AbstractRequestHandler implements OutboundRequestHandler {
11+
12+
///////////////////////////////////////////////////////////////////////////
13+
// Class Fields
14+
///////////////////////////////////////////////////////////////////////////
15+
16+
private boolean closed = false;
17+
private RuntimeException error;
18+
private DSIValue params;
19+
private String path;
20+
private OutboundStream stream;
21+
22+
///////////////////////////////////////////////////////////////////////////
23+
// Public Methods
24+
///////////////////////////////////////////////////////////////////////////
25+
26+
/**
27+
* Error msg passed to onError
28+
*/
29+
public RuntimeException getError() {
30+
return error;
31+
}
32+
33+
/**
34+
* Any parameters supplied to the requester method invocation and passed to onInit.
35+
*
36+
* @return Possibly null.
37+
*/
38+
public DSIValue getParams() {
39+
return params;
40+
}
41+
42+
/**
43+
* Path passed to onInit.
44+
*/
45+
public String getPath() {
46+
return path;
47+
}
48+
49+
/**
50+
* Stream passed to onInit.
51+
*/
52+
public OutboundStream getStream() {
53+
return stream;
54+
}
55+
56+
/**
57+
* True if onClose ws callsed.
58+
*/
59+
public boolean isClosed() {
60+
return closed;
61+
}
62+
63+
/**
64+
* True if onError was called.
65+
*/
66+
public boolean isError() {
67+
return error != null;
68+
}
69+
70+
@Override
71+
public synchronized void onClose() {
72+
closed = true;
73+
notifyAll();
74+
}
75+
76+
@Override
77+
public synchronized void onError(ErrorType type, String msg) {
78+
error = ErrorType.makeException(type, msg);
79+
notifyAll();
80+
}
81+
82+
@Override
83+
public void onInit(String path, DSIValue params, OutboundStream stream) {
84+
this.path = path;
85+
this.params = params;
86+
this.stream = stream;
87+
}
88+
89+
/**
90+
* Waits for any callback from the responder. Will return immediately if already closed.
91+
* Subclasses should synchronize responder callback methods and notifyAll() at the end
92+
* of the method.
93+
*
94+
* @param timeout Passed to Object.wait
95+
* @throws RuntimeException if there is an error with the invocation.
96+
* @throws IllegalStateException if there is a timeout, or if there are any errors.
97+
*/
98+
public void waitForCallback(long timeout) {
99+
synchronized (this) {
100+
if (!closed) {
101+
long end = System.currentTimeMillis() + timeout;
102+
try {
103+
wait(timeout);
104+
} catch (Exception x) {
105+
}
106+
if (isError()) {
107+
throw getError();
108+
}
109+
if (System.currentTimeMillis() > end) {
110+
throw new IllegalStateException("Timed out");
111+
}
112+
}
113+
}
114+
}
115+
116+
/**
117+
* Waits for the stream to close or the timeout to occur.
118+
*
119+
* @param timeout Passed to Object.wait
120+
* @throws IllegalStateException if there is a timeout, or if there are any errors.
121+
*/
122+
public void waitForClose(long timeout) {
123+
long end = System.currentTimeMillis() + timeout;
124+
synchronized (this) {
125+
while (!closed) {
126+
try {
127+
wait(timeout);
128+
} catch (Exception x) {
129+
}
130+
if (isError()) {
131+
throw getError();
132+
}
133+
if (!closed && (System.currentTimeMillis() > end)) {
134+
throw new IllegalStateException("Timed out");
135+
}
136+
}
137+
}
138+
}
139+
140+
141+
}

0 commit comments

Comments
 (0)