Skip to content

Commit 6b71b77

Browse files
committed
Add cleanup of expired permits
Signed-off-by: Gerben Kroes <gerben@kroesctrl.nl>
1 parent 6047b1a commit 6b71b77

File tree

12 files changed

+273
-91
lines changed

12 files changed

+273
-91
lines changed

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/threads/RecoverKeyProcess.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ private boolean canConnectUsingNewKeys(final DlmsDevice device) {
198198
}
199199

200200
permit =
201-
this.throttlingService.openConnection(
201+
this.throttlingService.requestPermit(
202202
this.messageMetadata.getBaseTransceiverStationId(), this.messageMetadata.getCellId());
203203

204204
if (device.needsInvocationCounter()) {
@@ -233,7 +233,7 @@ private boolean canConnectUsingNewKeys(final DlmsDevice device) {
233233
}
234234
}
235235

236-
this.throttlingService.closeConnection(permit);
236+
this.throttlingService.releasePermit(permit);
237237

238238
if (dlmsMessageListener != null) {
239239
final int numberOfSentMessages = dlmsMessageListener.getNumberOfSentMessages();

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/DisabledThrottlingServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
public class DisabledThrottlingServiceImpl implements ThrottlingService {
1919

2020
@Override
21-
public Permit openConnection(final Integer baseTransceiverStationId, final Integer cellId) {
21+
public Permit requestPermit(final Integer baseTransceiverStationId, final Integer cellId) {
2222
log.debug("Throttling is disabled, do nothing on openConnection");
2323
return null;
2424
}
2525

2626
@Override
27-
public void closeConnection(final Permit permit) {
27+
public void releasePermit(final Permit permit) {
2828
log.debug("Throttling is disabled, do nothing on closeConnection");
2929
}
3030
}

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/LocalThrottlingServiceImpl.java

Lines changed: 94 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,19 @@
66

77
package org.opensmartgridplatform.adapter.protocol.dlms.application.throttling;
88

9+
import java.time.Duration;
10+
import java.time.Instant;
11+
import java.util.Map.Entry;
912
import java.util.Timer;
1013
import java.util.TimerTask;
14+
import java.util.concurrent.ConcurrentHashMap;
1115
import java.util.concurrent.Semaphore;
16+
import java.util.concurrent.atomic.AtomicInteger;
1217
import java.util.concurrent.locks.ReentrantLock;
1318
import javax.annotation.PostConstruct;
1419
import javax.annotation.PreDestroy;
1520
import org.opensmartgridplatform.adapter.protocol.dlms.application.config.annotation.LocalThrottlingServiceCondition;
21+
import org.opensmartgridplatform.throttling.ThrottlingPermitDeniedException;
1622
import org.opensmartgridplatform.throttling.api.Permit;
1723
import org.slf4j.Logger;
1824
import org.slf4j.LoggerFactory;
@@ -23,6 +29,7 @@
2329
@Component
2430
@Conditional(LocalThrottlingServiceCondition.class)
2531
public class LocalThrottlingServiceImpl implements ThrottlingService {
32+
private final AtomicInteger requestIdCounter = new AtomicInteger(0);
2633

2734
private static final Logger LOGGER = LoggerFactory.getLogger(LocalThrottlingServiceImpl.class);
2835

@@ -35,34 +42,54 @@ public class LocalThrottlingServiceImpl implements ThrottlingService {
3542
@Value("${throttling.reset.time}")
3643
private int resetTime;
3744

45+
@Value("${cleanup.permits.interval}")
46+
private int cleanupExpiredPermitsInterval;
47+
48+
@Value("#{T(java.time.Duration).parse('${cleanup.permits.time-to-live:PT1H}')}")
49+
private Duration timeToLive;
50+
3851
private Semaphore openConnectionsSemaphore;
3952
private Semaphore newConnectionRequestsSemaphore;
40-
private Timer resetTimer;
53+
private Timer resetNewConnectionRequestsTimer;
54+
private Timer cleanupExpiredPermitsTimer;
4155
private ReentrantLock resetTimerLock;
4256

57+
private ConcurrentHashMap<Integer, Permit> permitsByRequestId;
58+
4359
@PostConstruct
4460
public void postConstruct() {
45-
4661
this.openConnectionsSemaphore = new Semaphore(this.maxOpenConnections);
4762
this.newConnectionRequestsSemaphore = new Semaphore(this.maxNewConnectionRequests);
4863

4964
this.resetTimerLock = new ReentrantLock();
5065

51-
this.resetTimer = new Timer();
52-
this.resetTimer.scheduleAtFixedRate(new ResetTask(), this.resetTime, this.resetTime);
66+
this.resetNewConnectionRequestsTimer = new Timer();
67+
this.resetNewConnectionRequestsTimer.scheduleAtFixedRate(
68+
new ResetNewConnectionRequestsTask(), this.resetTime, this.resetTime);
69+
70+
this.cleanupExpiredPermitsTimer = new Timer();
71+
this.cleanupExpiredPermitsTimer.scheduleAtFixedRate(
72+
new CleanupExpiredPermitsTask(),
73+
this.cleanupExpiredPermitsInterval,
74+
this.cleanupExpiredPermitsInterval);
75+
76+
this.permitsByRequestId = new ConcurrentHashMap<>();
5377

5478
LOGGER.info("Initialized ThrottlingService. {}", this);
5579
}
5680

5781
@PreDestroy
5882
public void preDestroy() {
59-
if (this.resetTimer != null) {
60-
this.resetTimer.cancel();
83+
if (this.resetNewConnectionRequestsTimer != null) {
84+
this.resetNewConnectionRequestsTimer.cancel();
85+
}
86+
if (this.cleanupExpiredPermitsTimer != null) {
87+
this.cleanupExpiredPermitsTimer.cancel();
6188
}
6289
}
6390

6491
@Override
65-
public Permit openConnection(final Integer baseTransceiverStationId, final Integer cellId) {
92+
public Permit requestPermit(final Integer baseTransceiverStationId, final Integer cellId) {
6693

6794
this.newConnectionRequest();
6895

@@ -71,6 +98,9 @@ public Permit openConnection(final Integer baseTransceiverStationId, final Integ
7198
this.openConnectionsSemaphore.availablePermits());
7299

73100
try {
101+
if (this.openConnectionsSemaphore.availablePermits() == 0) {
102+
this.handlePermitDenied("Local: max open connections reached");
103+
}
74104
this.openConnectionsSemaphore.acquire();
75105
LOGGER.debug(
76106
"openConnection granted. available = {} ",
@@ -79,18 +109,25 @@ public Permit openConnection(final Integer baseTransceiverStationId, final Integ
79109
LOGGER.warn("Unable to acquire Open Connection", e);
80110
Thread.currentThread().interrupt();
81111
}
82-
return null;
112+
return this.createPermit();
83113
}
84114

85115
@Override
86-
public void closeConnection(final Permit permit) {
116+
public void releasePermit(final Permit permit) {
87117

88118
LOGGER.debug(
89-
"closeConnection(). available = {} ", this.openConnectionsSemaphore.availablePermits());
90-
this.openConnectionsSemaphore.release();
119+
"closeConnection(). available = {}", this.openConnectionsSemaphore.availablePermits());
120+
if (this.openConnectionsSemaphore.availablePermits() < this.maxOpenConnections) {
121+
this.openConnectionsSemaphore.release();
122+
}
123+
124+
this.permitsByRequestId.remove(permit.getRequestId());
91125
}
92126

93127
private void newConnectionRequest() {
128+
LOGGER.debug(
129+
"Await reset for newConnection. available = {} ",
130+
this.newConnectionRequestsSemaphore.availablePermits());
94131

95132
this.awaitReset();
96133

@@ -99,6 +136,9 @@ private void newConnectionRequest() {
99136
this.newConnectionRequestsSemaphore.availablePermits());
100137

101138
try {
139+
if (this.newConnectionRequestsSemaphore.availablePermits() == 0) {
140+
this.handlePermitDenied("Local: max new connection requests reached");
141+
}
102142
this.newConnectionRequestsSemaphore.acquire();
103143
LOGGER.debug(
104144
"Request newConnection granted. available = {} ",
@@ -112,6 +152,7 @@ private void newConnectionRequest() {
112152
private synchronized void awaitReset() {
113153
while (this.resetTimerLock.isLocked()) {
114154
try {
155+
LOGGER.info("Wait {}ms while reset timer is locked", this.resetTime);
115156
this.resetTimerLock.wait(this.resetTime);
116157
} catch (final InterruptedException e) {
117158
LOGGER.warn("Unable to acquire New Connection Request Lock", e);
@@ -120,7 +161,7 @@ private synchronized void awaitReset() {
120161
}
121162
}
122163

123-
private class ResetTask extends TimerTask {
164+
private class ResetNewConnectionRequestsTask extends TimerTask {
124165

125166
@Override
126167
public void run() {
@@ -147,10 +188,51 @@ public void run() {
147188
}
148189
}
149190

191+
private class CleanupExpiredPermitsTask extends TimerTask {
192+
193+
@Override
194+
public void run() {
195+
final Instant createdAtBefore =
196+
Instant.now().minus(LocalThrottlingServiceImpl.this.timeToLive);
197+
try {
198+
LocalThrottlingServiceImpl.this.resetTimerLock.lock();
199+
200+
for (final Entry<Integer, Permit> permitForRequestId :
201+
LocalThrottlingServiceImpl.this.permitsByRequestId.entrySet()) {
202+
final Permit permit = permitForRequestId.getValue();
203+
if (permit.getCreatedAt().isBefore(createdAtBefore)) {
204+
LOGGER.warn("releasing expired permit: {}", permit);
205+
LocalThrottlingServiceImpl.this.releasePermit(permit);
206+
}
207+
}
208+
209+
LOGGER.debug(
210+
"ThrottlingService - Timer Reset and Unlocking, openConnections available = {} ",
211+
LocalThrottlingServiceImpl.this.openConnectionsSemaphore.availablePermits());
212+
} finally {
213+
LocalThrottlingServiceImpl.this.resetTimerLock.unlock();
214+
}
215+
}
216+
}
217+
150218
@Override
151219
public String toString() {
152220
return String.format(
153221
"ThrottlingService. maxOpenConnections = %d, maxNewConnectionRequests=%d, resetTime=%d",
154222
this.maxOpenConnections, this.maxNewConnectionRequests, this.resetTime);
155223
}
224+
225+
private void handlePermitDenied(final String message) {
226+
227+
throw new ThrottlingPermitDeniedException(message);
228+
}
229+
230+
private Permit createPermit() {
231+
final int requestId = this.requestIdCounter.incrementAndGet();
232+
233+
final Permit permit = new Permit(requestId);
234+
this.permitsByRequestId.put(permit.getRequestId(), permit);
235+
236+
return permit;
237+
}
156238
}

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/SharedThrottlingServiceImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,14 @@ public SharedThrottlingServiceImpl(final ThrottlingClientConfig throttlingClient
2323
}
2424

2525
@Override
26-
public Permit openConnection(final Integer baseTransceiverStationId, final Integer cellId) {
26+
public Permit requestPermit(final Integer baseTransceiverStationId, final Integer cellId) {
2727
return this.throttlingClientConfig
2828
.throttlingClient()
2929
.requestPermitUsingNetworkSegmentIfIdsAreAvailable(baseTransceiverStationId, cellId);
3030
}
3131

3232
@Override
33-
public void closeConnection(final Permit permit) {
33+
public void releasePermit(final Permit permit) {
3434
this.throttlingClientConfig.throttlingClient().releasePermit(permit);
3535
}
3636
}

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/application/throttling/ThrottlingService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
public interface ThrottlingService {
1212

13-
public Permit openConnection(final Integer baseTransceiverStationId, final Integer cellId);
13+
Permit requestPermit(final Integer baseTransceiverStationId, final Integer cellId);
1414

15-
public void closeConnection(Permit permit);
15+
void releasePermit(Permit permit);
1616
}

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/java/org/opensmartgridplatform/adapter/protocol/dlms/infra/messaging/DlmsConnectionMessageProcessor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public void createAndHandleConnectionForDevice(
5757
throws OsgpException {
5858

5959
final Permit permit =
60-
this.throttlingService.openConnection(
60+
this.throttlingService.requestPermit(
6161
messageMetadata.getBaseTransceiverStationId(), messageMetadata.getCellId());
6262

6363
final DlmsMessageListener dlmsMessageListener =
@@ -84,7 +84,7 @@ public void createAndHandleConnectionForDevice(
8484
* DeviceRequestMessageProcessor.processMessageTasks(), where
8585
* this.doConnectionPostProcessing() is called in a finally block.
8686
*/
87-
this.throttlingService.closeConnection(permit);
87+
this.throttlingService.releasePermit(permit);
8888
throw e;
8989
}
9090
}
@@ -121,7 +121,7 @@ protected void doConnectionPostProcessing(
121121

122122
this.setClosingDlmsConnectionMessageListener(device, conn);
123123

124-
this.throttlingService.closeConnection(conn.getPermit());
124+
this.throttlingService.releasePermit(conn.getPermit());
125125

126126
if (device.needsInvocationCounter()) {
127127
this.updateInvocationCounterForDevice(device, conn);
@@ -159,6 +159,7 @@ void updateInvocationCounterForDevice(final DlmsDevice device, final DlmsConnect
159159
this.deviceRepository.updateInvocationCounter(
160160
device.getDeviceIdentification(), device.getInvocationCounter());
161161
}
162+
162163
/**
163164
* @param logger the logger from the calling subClass
164165
* @param exception the exception to be logged

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/main/resources/osgp-adapter-protocol-dlms.properties

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,13 @@ throttling.max.open.connections=1000
197197
throttling.max.new.connection.requests=30
198198
throttling.reset.time=1000
199199

200+
# Any permits that have not been released or discarded will be deleted after a certain period of
201+
# time, in order to prevent permits being kept in case a throttling client crashed.
202+
cleanup.permits.time-to-live=PT1H
203+
# The task to clean up permits that exceeded their time-to-live is executed by fixed rate.
204+
# cleanup.permits.interval is in ms
205+
cleanup.permits.interval=1800000
206+
200207
udp.channel=inboundChannel
201208
udp.port=9598
202209

osgp/protocol-adapter-dlms/osgp-protocol-adapter-dlms/src/test/java/org/opensmartgridplatform/adapter/protocol/dlms/application/threads/RecoverKeyProcessTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ void testWhenHasNoNewKeysToConnectWith() throws OsgpException, IOException {
9696
verify(this.secretManagementService, times(1))
9797
.hasNewSecret(this.messageMetadata, DEVICE_IDENTIFICATION);
9898
verify(this.domainHelperService).findDlmsDevice(DEVICE_IDENTIFICATION);
99-
verify(this.throttlingService, never()).openConnection(btsId, cellId);
99+
verify(this.throttlingService, never()).requestPermit(btsId, cellId);
100100
}
101101

102102
@Test
@@ -139,17 +139,17 @@ void testThrottlingServiceCalledAndKeysActivated() throws Exception {
139139

140140
when(this.messageMetadata.getBaseTransceiverStationId()).thenReturn(btsId);
141141
when(this.messageMetadata.getCellId()).thenReturn(cellId);
142-
when(this.throttlingService.openConnection(btsId, cellId)).thenReturn(permit);
142+
when(this.throttlingService.requestPermit(btsId, cellId)).thenReturn(permit);
143143

144144
this.recoverKeyProcess.run();
145145

146146
final InOrder inOrder = inOrder(this.throttlingService, this.hls5Connector);
147147

148-
inOrder.verify(this.throttlingService).openConnection(btsId, cellId);
148+
inOrder.verify(this.throttlingService).requestPermit(btsId, cellId);
149149
inOrder
150150
.verify(this.hls5Connector)
151151
.connectUnchecked(eq(this.messageMetadata), eq(this.dlmsDevice), any(), any());
152-
inOrder.verify(this.throttlingService).closeConnection(permit);
152+
inOrder.verify(this.throttlingService).releasePermit(permit);
153153

154154
verify(this.secretManagementService)
155155
.activateNewKeys(
@@ -173,7 +173,7 @@ void testWhenConnectionFailedThenConnectionClosedAtThrottlingService() throws Ex
173173

174174
when(this.messageMetadata.getBaseTransceiverStationId()).thenReturn(btsId);
175175
when(this.messageMetadata.getCellId()).thenReturn(cellId);
176-
when(this.throttlingService.openConnection(btsId, cellId)).thenReturn(permit);
176+
when(this.throttlingService.requestPermit(btsId, cellId)).thenReturn(permit);
177177

178178
when(this.secretManagementService.hasNewSecret(
179179
eq(this.messageMetadata), eq(DEVICE_IDENTIFICATION)))
@@ -183,9 +183,9 @@ void testWhenConnectionFailedThenConnectionClosedAtThrottlingService() throws Ex
183183

184184
final InOrder inOrder = inOrder(this.throttlingService, this.hls5Connector);
185185

186-
inOrder.verify(this.throttlingService).openConnection(btsId, cellId);
186+
inOrder.verify(this.throttlingService).requestPermit(btsId, cellId);
187187
inOrder.verify(this.hls5Connector).connectUnchecked(any(), any(), any(), any());
188-
inOrder.verify(this.throttlingService).closeConnection(permit);
188+
inOrder.verify(this.throttlingService).releasePermit(permit);
189189

190190
verify(this.secretManagementService, never()).activateNewKeys(any(), any(), any());
191191
}

0 commit comments

Comments
 (0)