Skip to content

Fixed busy waiting in points 1,2,3,7 #3101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
f20a582
Fixed busy waiting in points 1,2,3,7
alhusseain Nov 30, 2024
58e5c62
Fixed busy waiting in points 1,2,3,7 (1)
alhusseain Nov 30, 2024
9415119
Checkstyle fix try 1
alhusseain Nov 30, 2024
42e4c32
Checkstyle fix try 2
alhusseain Nov 30, 2024
27325f2
Checkstyle fix try 3
alhusseain Nov 30, 2024
2109002
SonarQube fix try 1
alhusseain Nov 30, 2024
992d899
SonarQube fix try 2
alhusseain Nov 30, 2024
ec0d423
SonarQube fix try 3
alhusseain Nov 30, 2024
ffd4773
Merge branch 'master' into busy-looping---Alhusseain
alhusseain Dec 1, 2024
d84dc96
SonarQube fix try 4 (Reliability and coverage in twin)
alhusseain Dec 1, 2024
a8fb4b2
Merge remote-tracking branch 'origin/busy-looping---Alhusseain' into …
alhusseain Dec 1, 2024
fca409f
Add Expiration Task state test (should be waiting at the start)
alhusseain Dec 5, 2024
986130a
Add Expiration Task state test (should be waiting at the start)
alhusseain Dec 5, 2024
e31b12b
Add Expiration Task state test (should be sleeping after it is woken)
alhusseain Dec 7, 2024
7be96ca
Add Expiration Task state test (should be sleeping after it is woken)
alhusseain Dec 7, 2024
30d6eb3
Add BallThread resume and suspend state test
alhusseain Dec 7, 2024
d0e0d87
Add Service Executor Start and Wake State Test
alhusseain Dec 7, 2024
4751cb8
Add Service Executor Start and Wake State Test
alhusseain Dec 7, 2024
7afa9c8
LogoutHandlerTest correct
alhusseain Dec 7, 2024
b5073d6
LogoutHandlerTest correct
alhusseain Dec 25, 2024
18a4c02
Merge branch 'master' into busy-looping---Alhusseain
alhusseain Dec 25, 2024
1c04548
implemented review changes
alhusseain Feb 18, 2025
fa902ad
Merge remote-tracking branch 'origin/busy-looping---Alhusseain' into …
alhusseain Feb 18, 2025
dbf6a7b
implemented review changes
alhusseain Feb 18, 2025
9671120
Merge branch 'master' into busy-looping---Alhusseain
alhusseain Feb 18, 2025
8407245
implemented review changes
alhusseain Feb 18, 2025
e4d0f53
Merge remote-tracking branch 'origin/busy-looping---Alhusseain' into …
alhusseain Feb 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -46,6 +47,7 @@ public class LogAggregator {
private final ConcurrentLinkedQueue<LogEntry> buffer = new ConcurrentLinkedQueue<>();
private final LogLevel minLogLevel;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final Object bufferWait = new Object();
private final AtomicInteger logCount = new AtomicInteger(0);

/**
Expand Down Expand Up @@ -77,6 +79,7 @@ public void collectLog(LogEntry logEntry) {
}

buffer.offer(logEntry);
bufferWake();

if (logCount.incrementAndGet() >= BUFFER_THRESHOLD) {
flushBuffer();
Expand Down Expand Up @@ -106,15 +109,29 @@ private void flushBuffer() {
}

private void startBufferFlusher() {
executorService.execute(() -> {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(5000); // Flush every 5 seconds.
synchronized (bufferWait) {
if (buffer.isEmpty()) {
bufferWait.wait();
}
}
flushBuffer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}, 5, 5, TimeUnit.SECONDS);
}
}

/**
* Wakes up buffer.
*/
public void bufferWake() {
synchronized (bufferWait) {
bufferWait.notifyAll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
*/
package com.iluwatar.logaggregation;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.time.LocalDateTime;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -88,7 +89,8 @@ public static void main(String[] args) {

// Create e service which should process the submitted jobs.
final var srvRunnable = new ServiceExecutor(msgQueue);

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(srvRunnable, 1, 1, TimeUnit.SECONDS);
// Create a ThreadPool of 2 threads and
// submit all Runnable task for execution to executor
executor = Executors.newFixedThreadPool(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
public class MessageQueue {

private final BlockingQueue<Message> blkQueue;
public final Object serviceExecutorWait = new Object();

// Default constructor when called creates Blocking Queue object.
public MessageQueue() {
Expand All @@ -50,6 +51,9 @@ public void submitMsg(Message msg) {
try {
if (null != msg) {
blkQueue.add(msg);
synchronized (serviceExecutorWait) {
serviceExecutorWait.notifyAll();
}
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
*/
@Slf4j
public class ServiceExecutor implements Runnable {

private final MessageQueue msgQueue;

public ServiceExecutor(MessageQueue msgQueue) {
this.msgQueue = msgQueue;
}
Expand All @@ -51,9 +49,10 @@ public void run() {
LOGGER.info(msg + " is served.");
} else {
LOGGER.info("Service Executor: Waiting for Messages to serve .. ");
synchronized (msgQueue.serviceExecutorWait) {
msgQueue.serviceExecutorWait.wait();
}
}

Thread.sleep(1000);
}
} catch (Exception e) {
LOGGER.error(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,8 @@ public void run() {
try {
while (count > 0) {
var statusMsg = "Message-" + count + " submitted by " + Thread.currentThread().getName();
this.submit(new Message(statusMsg));

LOGGER.info(statusMsg);
this.submit(new Message(statusMsg));

// reduce the message count.
count--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,19 @@
*/
package com.iluwatar.queue.load.leveling;

import static java.util.concurrent.CompletableFuture.anyOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;

/**
* Test case for submitting Message to Blocking Queue by TaskGenerator and retrieve the message by
* ServiceExecutor.
*/
@Slf4j
class TaskGenSrvExeTest {

@Test
Expand All @@ -53,4 +58,37 @@ void taskGeneratorTest() {
assertNotNull(srvExeThr);
}

/**
* Tests that service executor waits at start since no message is sent to execute upon.
* @throws InterruptedException
*/
@Test
void serviceExecutorStartStateTest() throws InterruptedException {
var msgQueue = new MessageQueue();
var srvRunnable = new ServiceExecutor(msgQueue);
var srvExeThr = new Thread(srvRunnable);
srvExeThr.start();
Thread.sleep(200); // sleep a little until service executor thread waits
LOGGER.info("Current Service Executor State: " + srvExeThr.getState());
assertEquals(srvExeThr.getState(), Thread.State.WAITING);

}

@Test
void serviceExecutorWakeStateTest() throws InterruptedException {
var msgQueue = new MessageQueue();
var srvRunnable = new ServiceExecutor(msgQueue);
var srvExeThr = new Thread(srvRunnable);
srvExeThr.start();
Thread.sleep(200); // sleep a little until service executor thread waits
synchronized (msgQueue.serviceExecutorWait){
msgQueue.serviceExecutorWait.notifyAll();
}
var srvExeState = srvExeThr.getState();
LOGGER.info("Current Service Executor State: " + srvExeState);
// assert that state changes from waiting
assertTrue(srvExeState != Thread.State.WAITING);

}

}
39 changes: 34 additions & 5 deletions server-session/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,43 @@ The `main` application starts a server and assigns handlers to manage login and
```java
public class App {

private static Map<String, Integer> sessions = new HashMap<>();
private static Map<String, Instant> sessionCreationTimes = new HashMap<>();
private static final long SESSION_EXPIRATION_TIME = 10000;
// Map to store session data (simulated using a HashMap)

private static Map<String, Integer> sessions = new HashMap<String,Integer>();
private static Map<String, Instant> sessionCreationTimes = new HashMap<String,Instant>();
private static final long SESSION_EXPIRATION_TIME = 10000;
private static Object sessionExpirationWait=new Object(); // used to make expiration task wait or work based on event (login request sent or not)
public static void main(String[] args) throws IOException {
// Create HTTP server listening on port 8000
HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);

// Set up session management endpoints
server.createContext("/login", new LoginHandler(sessions, sessionCreationTimes));
server.createContext("/logout", new LogoutHandler(sessions, sessionCreationTimes));

// Start the server
server.start();

// Start background task to check for expired sessions
sessionExpirationTask();

LOGGER.info("Server started. Listening on port 8080...");
}

private static void sessionExpirationTask() {
new Thread(() -> {
new Thread(() -> {
while (true) {
try {
Thread.sleep(SESSION_EXPIRATION_TIME);
synchronized (sessions)
{
if(sessions.isEmpty())
synchronized (sessionExpirationWait)
{
sessionExpirationWait.wait(); // Make Session expiration Checker wait until at least a single login request is sent.
}
}
LOGGER.info("Session expiration checker started...");
Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time
Instant currentTime = Instant.now();
synchronized (sessions) {
synchronized (sessionCreationTimes) {
Expand All @@ -75,18 +92,30 @@ public class App {
while (iterator.hasNext()) {
Map.Entry<String, Instant> entry = iterator.next();
if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) {
LOGGER.info("User " + entry.getValue() + " removed");
sessions.remove(entry.getKey());
iterator.remove();
}
}
}
}
LOGGER.info("Session expiration checker finished!");
} catch (InterruptedException e) {
LOGGER.error("An error occurred: ", e);
Thread.currentThread().interrupt();
}
}
}).start();
}

public static void expirationTaskWake() //Wake up sleeping Expiration task thread
{
synchronized (sessionExpirationWait)
{
sessionExpirationWait.notify();
}
}

}
```

Expand Down
1 change: 0 additions & 1 deletion server-session/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,4 @@
<scope>test</scope>
</dependency>
</dependencies>

</project>
48 changes: 24 additions & 24 deletions server-session/src/main/java/com/iluwatar/sessionserver/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,12 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;


/**
* The server session pattern is a behavioral design pattern concerned with assigning the responsibility
* of storing session data on the server side. Within the context of stateless protocols like HTTP all
Expand All @@ -54,10 +58,12 @@
public class App {

// Map to store session data (simulated using a HashMap)

private static Map<String, Integer> sessions = new HashMap<>();
private static Map<String, Instant> sessionCreationTimes = new HashMap<>();
private static final long SESSION_EXPIRATION_TIME = 10000;


/**
* Main entry point.
* @param args arguments
Expand All @@ -75,37 +81,31 @@ public static void main(String[] args) throws IOException {
server.start();

// Start background task to check for expired sessions
sessionExpirationTask();

LOGGER.info("Server started. Listening on port 8080...");

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(App::sessionExpirationTask, SESSION_EXPIRATION_TIME, SESSION_EXPIRATION_TIME, TimeUnit.MILLISECONDS);
}

private static void sessionExpirationTask() {
new Thread(() -> {
while (true) {
try {
LOGGER.info("Session expiration checker started...");
Thread.sleep(SESSION_EXPIRATION_TIME); // Sleep for expiration time
Instant currentTime = Instant.now();
synchronized (sessions) {
synchronized (sessionCreationTimes) {
Iterator<Map.Entry<String, Instant>> iterator =
sessionCreationTimes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Instant> entry = iterator.next();
if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) {
sessions.remove(entry.getKey());
iterator.remove();
}
}
}
LOGGER.info("Session expiration checker started...");
Instant currentTime = Instant.now();
synchronized (sessions) {
synchronized (sessionCreationTimes) {
Iterator<Map.Entry<String, Instant>> iterator =
sessionCreationTimes.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Instant> entry = iterator.next();
if (entry.getValue().plusMillis(SESSION_EXPIRATION_TIME).isBefore(currentTime)) {
LOGGER.info("User " + entry.getValue() + " removed");
sessions.remove(entry.getKey());
iterator.remove();
}
LOGGER.info("Session expiration checker finished!");
} catch (InterruptedException e) {
LOGGER.error("An error occurred: ", e);
Thread.currentThread().interrupt();
}
}
}).start();
}
LOGGER.info("Session expiration checker finished!");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class LoginHandler implements HttpHandler {
private Map<String, Integer> sessions;
private Map<String, Instant> sessionCreationTimes;

/**
* Handles new login requests.
*/
public LoginHandler(Map<String, Integer> sessions, Map<String, Instant> sessionCreationTimes) {
this.sessions = sessions;
this.sessionCreationTimes = sessionCreationTimes;
Expand All @@ -60,7 +63,6 @@ public void handle(HttpExchange exchange) {

// Set session ID as cookie
exchange.getResponseHeaders().add("Set-Cookie", "sessionID=" + sessionId);

// Send response
String response = "Login successful!\n" + "Session ID: " + sessionId;
try {
Expand Down
Loading
Loading