Skip to content

Commit 8f7c18d

Browse files
authored
Merge pull request #61 from iot-dsa-v2/develop
0.34.0
2 parents cc0ebeb + 65b6c8c commit 8f7c18d

File tree

3 files changed

+28
-40
lines changed

3 files changed

+28
-40
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
[![](https://jitpack.io/v/iot-dsa-v2/sdk-dslink-java-v2.svg)](https://jitpack.io/#iot-dsa-v2/sdk-dslink-java-v2)
33

44
* [Developer Guide](https://iot-dsa-v2.github.io/sdk-dslink-java-v2/)
5-
* [Javadoc](https://iot-dsa-v2.github.io/sdk-dslink-java-v2/javadoc/)
5+
* [Javadoc](https://jitpack.io/com/github/iot-dsa-v2/sdk-dslink-java-v2/dslink-v2/master-SNAPSHOT/javadoc/)
66
* JDK 1.6+
77

88

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ subprojects {
44
apply plugin: 'maven'
55

66
group 'org.iot-dsa'
7-
version '0.33.0'
7+
version '0.34.0'
88

99
sourceCompatibility = 1.6
1010
targetCompatibility = 1.6

dslink-v2/src/main/java/com/acuity/iot/dsa/dslink/protocol/DSSession.java

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@
44
import com.acuity.iot.dsa.dslink.protocol.responder.DSResponder;
55
import com.acuity.iot.dsa.dslink.transport.DSTransport;
66
import java.io.IOException;
7-
import java.util.LinkedList;
8-
import java.util.List;
7+
import java.util.concurrent.ConcurrentLinkedQueue;
98
import org.iot.dsa.conn.DSConnection;
109
import org.iot.dsa.conn.DSIConnected;
1110
import org.iot.dsa.dslink.DSIRequester;
@@ -49,8 +48,8 @@ public abstract class DSSession extends DSNode implements DSIConnected {
4948
private int messageId = 0;
5049
private int nextMessage = 1;
5150
private final Object outgoingMutex = new Object();
52-
private List<OutboundMessage> outgoingRequests = new LinkedList<OutboundMessage>();
53-
private List<OutboundMessage> outgoingResponses = new LinkedList<OutboundMessage>();
51+
private ConcurrentLinkedQueue<OutboundMessage> outgoingRequests = new ConcurrentLinkedQueue<OutboundMessage>();
52+
private ConcurrentLinkedQueue<OutboundMessage> outgoingResponses = new ConcurrentLinkedQueue<OutboundMessage>();
5453
private DSInfo requesterAllowed = getInfo(REQUESTER_ALLOWED);
5554
private ReadThread readThread;
5655
private WriteThread writeThread;
@@ -78,10 +77,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
7877
if (!isRequesterAllowed()) {
7978
throw new IllegalStateException("Requester not allowed");
8079
}
81-
synchronized (outgoingMutex) {
82-
outgoingRequests.add(arg);
83-
outgoingMutex.notify();
84-
}
80+
outgoingRequests.add(arg);
81+
notifyOutgoing();
8582
}
8683
}
8784

@@ -90,10 +87,8 @@ public void enqueueOutgoingRequest(OutboundMessage arg) {
9087
*/
9188
public void enqueueOutgoingResponse(OutboundMessage arg) {
9289
if (connected) {
93-
synchronized (outgoingMutex) {
94-
outgoingResponses.add(arg);
95-
outgoingMutex.notify();
96-
}
90+
outgoingResponses.add(arg);
91+
notifyOutgoing();
9792
}
9893
}
9994

@@ -175,24 +170,14 @@ protected void declareDefaults() {
175170
* Can return null.
176171
*/
177172
protected OutboundMessage dequeueOutgoingRequest() {
178-
synchronized (outgoingMutex) {
179-
if (!outgoingRequests.isEmpty()) {
180-
return outgoingRequests.remove(0);
181-
}
182-
}
183-
return null;
173+
return outgoingRequests.poll();
184174
}
185175

186176
/**
187177
* Can return null.
188178
*/
189179
protected OutboundMessage dequeueOutgoingResponse() {
190-
synchronized (outgoingMutex) {
191-
if (!outgoingResponses.isEmpty()) {
192-
return outgoingResponses.remove(0);
193-
}
194-
}
195-
return null;
180+
return outgoingResponses.poll();
196181
}
197182

198183
/**
@@ -247,10 +232,18 @@ protected boolean hasSomethingToSend() {
247232
return false;
248233
}
249234
if (!outgoingResponses.isEmpty()) {
250-
return true;
235+
for (OutboundMessage msg : outgoingResponses) {
236+
if (msg.canWrite(this)) {
237+
return true;
238+
}
239+
}
251240
}
252241
if (!outgoingRequests.isEmpty()) {
253-
return true;
242+
for (OutboundMessage msg : outgoingRequests) {
243+
if (msg.canWrite(this)) {
244+
return true;
245+
}
246+
}
254247
}
255248
return false;
256249
}
@@ -289,11 +282,9 @@ protected void onConnected() {
289282
* Clear the outgoing queues and waits for the the read and write threads to exit.
290283
*/
291284
protected void onDisconnected() {
292-
synchronized (outgoingMutex) {
293-
outgoingRequests.clear();
294-
outgoingResponses.clear();
295-
outgoingMutex.notifyAll();
296-
}
285+
outgoingRequests.clear();
286+
outgoingResponses.clear();
287+
notifyOutgoing();
297288
try {
298289
readThread.join();
299290
} catch (Exception x) {
@@ -322,15 +313,11 @@ protected void onDisconnecting() {
322313
}
323314

324315
protected void requeueOutgoingRequest(OutboundMessage arg) {
325-
synchronized (outgoingMutex) {
326-
outgoingRequests.add(arg);
327-
}
316+
outgoingRequests.add(arg);
328317
}
329318

330319
protected void requeueOutgoingResponse(OutboundMessage arg) {
331-
synchronized (outgoingMutex) {
332-
outgoingResponses.add(arg);
333-
}
320+
outgoingResponses.add(arg);
334321
}
335322

336323
/**
@@ -396,7 +383,8 @@ private void waitForAcks(long timeout) {
396383
warn(getPath(), x);
397384
}
398385
if ((System.currentTimeMillis() - start) > timeout) {
399-
debug(debug() ? String.format("witForAcks timeout (%s / %s)",ackRcvd,messageId)
386+
debug(debug() ? String
387+
.format("waitForAcks timeout (%s / %s)", ackRcvd, messageId)
400388
: null);
401389
break;
402390
}

0 commit comments

Comments
 (0)