Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static java.lang.Double.valueOf;
import static java.lang.Integer.parseInt;
import static java.lang.Thread.sleep;
import static java.time.LocalDateTime.now;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newFixedThreadPool;

public class ExecutorServiceRunner {
Expand All @@ -29,6 +33,7 @@ public class ExecutorServiceRunner {
private int numberOfThreads;
private int rampUpPeriod;
private int loopCount;
private int abortAfterTimeLapsedInSeconds;

private Double delayBetweenTwoThreadsInMilliSecs;

Expand All @@ -37,6 +42,7 @@ public ExecutorServiceRunner(String loadPropertiesFile) {
numberOfThreads = parseInt(properties.getProperty("number.of.threads"));
rampUpPeriod = parseInt(properties.getProperty("ramp.up.period.in.seconds"));
loopCount = parseInt(properties.getProperty("loop.count"));
abortAfterTimeLapsedInSeconds = parseInt(properties.getProperty("abort.after.time.lapsed.in.seconds"));

calculateAndSetDelayBetweenTwoThreadsInSecs(rampUpPeriod);

Expand All @@ -47,6 +53,17 @@ public ExecutorServiceRunner(int numberOfThreads, int loopCount, int rampUpPerio
this.numberOfThreads = numberOfThreads;
this.loopCount = loopCount;
this.rampUpPeriod = rampUpPeriod;
this.abortAfterTimeLapsedInSeconds = Integer.MAX_VALUE;

calculateAndSetDelayBetweenTwoThreadsInSecs(this.rampUpPeriod);
logLoadingProperties();
}

public ExecutorServiceRunner(int numberOfThreads, int loopCount, int rampUpPeriod, int abortAfterTimeLapsedInSeconds) {
this.numberOfThreads = numberOfThreads;
this.loopCount = loopCount;
this.rampUpPeriod = rampUpPeriod;
this.abortAfterTimeLapsedInSeconds = abortAfterTimeLapsedInSeconds;

calculateAndSetDelayBetweenTwoThreadsInSecs(this.rampUpPeriod);
logLoadingProperties();
Expand All @@ -64,16 +81,60 @@ public ExecutorServiceRunner addCallable(Callable callable) {


public void runRunnables() {
executeWithAbortTimeout(() -> {
if (runnables == null || runnables.size() == 0) {
throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'");
}

if (runnables == null || runnables.size() == 0) {
throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'");
}
ExecutorService executorService = newFixedThreadPool(numberOfThreads);

ExecutorService executorService = newFixedThreadPool(numberOfThreads);
try {
for (int i = 0; i < loopCount; i++) {
runnables.stream().forEach(thisFunction -> {
for (int j = 0; j < numberOfThreads; j++) {
try {
LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " +
"waiting time in the transit now = " + delayBetweenTwoThreadsInMilliSecs);
sleep(delayBetweenTwoThreadsInMilliSecs.longValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

try {
for (int i = 0; i < loopCount; i++) {
runnables.stream().forEach(thisFunction -> {
LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now());

executorService.execute(thisFunction);

LOGGER.debug(Thread.currentThread().getName() + " Executor - *Finished Time = " + now());
}
});
}
} catch (Exception interruptEx) {
throw new RuntimeException(interruptEx);
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// --------------------------------------
// wait for all tasks to finish execution
// --------------------------------------
//LOGGER.info("Still waiting for all threads to complete execution...");
}
LOGGER.debug("**Finished executing all threads**");
}
});
}

public void runRunnablesMulti() {
executeWithAbortTimeout(() -> {
if (runnables == null || runnables.size() == 0) {
throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'");
}

ExecutorService executorService = newFixedThreadPool(numberOfThreads);

try {
final AtomicInteger functionIndex = new AtomicInteger();

for (int i = 0; i < loopCount; i++) {
for (int j = 0; j < numberOfThreads; j++) {
try {
LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " +
Expand All @@ -85,113 +146,72 @@ public void runRunnables() {

LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now());

executorService.execute(thisFunction);
executorService.execute(runnables.get(functionIndex.getAndIncrement()));

LOGGER.debug(Thread.currentThread().getName() + " Executor - *Finished Time = " + now());
}
});
}
} catch (Exception interruptEx) {
throw new RuntimeException(interruptEx);
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// --------------------------------------
// wait for all tasks to finish execution
// --------------------------------------
//LOGGER.info("Still waiting for all threads to complete execution...");
}
LOGGER.debug("**Finished executing all threads**");
}
}

public void runRunnablesMulti() {
if (runnables == null || runnables.size() == 0) {
throw new RuntimeException("No runnable(s) was found to run. You can add one or more runnables using 'addRunnable(Runnable runnable)'");
}

ExecutorService executorService = newFixedThreadPool(numberOfThreads);

try {
final AtomicInteger functionIndex = new AtomicInteger();

for (int i = 0; i < loopCount; i++) {
for (int j = 0; j < numberOfThreads; j++) {
try {
LOGGER.debug("Waiting for the next test flight to adjust the overall ramp up time, " +
"waiting time in the transit now = " + delayBetweenTwoThreadsInMilliSecs);
sleep(delayBetweenTwoThreadsInMilliSecs.longValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
if (functionIndex.get() == runnables.size()) {
functionIndex.set(0);
}
}

LOGGER.debug(Thread.currentThread().getName() + " Executor - *Start... Time = " + now());

executorService.execute(runnables.get(functionIndex.getAndIncrement()));

LOGGER.debug(Thread.currentThread().getName() + " Executor - *Finished Time = " + now());

if(functionIndex.get() == runnables.size()){
functionIndex.set(0);
}
}

}
} catch (Exception interruptEx) {
throw new RuntimeException(interruptEx);
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// --------------------------------------
// wait for all tasks to finish execution
// --------------------------------------
//LOGGER.info("Still waiting for all threads to complete execution...");
} catch (Exception interruptEx) {
throw new RuntimeException(interruptEx);
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// --------------------------------------
// wait for all tasks to finish execution
// --------------------------------------
//LOGGER.info("Still waiting for all threads to complete execution...");
}
LOGGER.warn("** Completed executing all virtual-user scenarios! **");
}
LOGGER.warn("** Completed executing all virtual-user scenarios! **");
}
});
}

public void runCallables() {
runCallableFutures();
}

public void runCallableFutures() {
executeWithAbortTimeout(() -> {

if (callables == null || callables.size() == 0) {
throw new RuntimeException("No callable(s) was found to run. You can add one or more callables using 'addCallable(Callable callable)'");
}
if (callables == null || callables.size() == 0) {
throw new RuntimeException("No callable(s) was found to run. You can add one or more callables using 'addCallable(Callable callable)'");
}

ExecutorService executorService = newFixedThreadPool(numberOfThreads);
ExecutorService executorService = newFixedThreadPool(numberOfThreads);

try {
executorService.invokeAll(callables).stream().forEach(future -> {
for (int j = 0; j < numberOfThreads; j++) {
try {
LOGGER.debug("Waiting in the transit for next test flight to adjust overall ramp up time, wait time now = " + delayBetweenTwoThreadsInMilliSecs);
sleep(delayBetweenTwoThreadsInMilliSecs.longValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
executorService.invokeAll(callables).stream().forEach(future -> {
for (int j = 0; j < numberOfThreads; j++) {
try {
LOGGER.debug("Waiting in the transit for next test flight to adjust overall ramp up time, wait time now = " + delayBetweenTwoThreadsInMilliSecs);
sleep(delayBetweenTwoThreadsInMilliSecs.longValue());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

LOGGER.debug(Thread.currentThread().getName() + " Future execution- Start.... Time = " + now());
LOGGER.debug(Thread.currentThread().getName() + " Future execution- Start.... Time = " + now());

execute(future);
execute(future);

LOGGER.debug(Thread.currentThread().getName() + " Future execution- *Finished Time = " + now());
LOGGER.debug(Thread.currentThread().getName() + " Future execution- *Finished Time = " + now());
}
});
} catch (InterruptedException interruptEx) {
throw new RuntimeException(interruptEx);
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// wait for all tasks to finish executing
// LOGGER.info("Still waiting for all threads to complete execution...");
}
});
} catch (InterruptedException interruptEx) {
throw new RuntimeException(interruptEx);
} finally {
executorService.shutdown();
while (!executorService.isTerminated()) {
// wait for all tasks to finish executing
// LOGGER.info("Still waiting for all threads to complete execution...");
LOGGER.warn("* Completed executing all virtual-user scenarios! *");
}
LOGGER.warn("* Completed executing all virtual-user scenarios! *");
}


});
}

public <T extends Object> Callable<Object> createCallableFuture(T objectToConsumer, Consumer<T> consumer) {
Expand All @@ -210,6 +230,24 @@ private Object execute(Future<Object> future) {
}
}

private void executeWithAbortTimeout(Runnable runnable) {
ExecutorService executorService = newSingleThreadExecutor();
Future<?> future = executorService.submit(runnable);
try {
future.get(abortAfterTimeLapsedInSeconds, TimeUnit.SECONDS);
} catch (TimeoutException timeoutEx) {
future.cancel(true);
throw new RuntimeException(timeoutEx);
} catch (InterruptedException interruptEx) {
Thread.currentThread().interrupt();
future.cancel(true);
throw new RuntimeException(interruptEx);
} catch (ExecutionException executionEx) {
throw new RuntimeException(executionEx);
} finally {
executorService.shutdownNow();
}
}

private void calculateAndSetDelayBetweenTwoThreadsInSecs(int rampUpPeriod) {
if (rampUpPeriod == 0) {
Expand Down Expand Up @@ -242,6 +280,7 @@ private void logLoadingProperties() {
"\n ### numberOfThreads : " + numberOfThreads +
"\n ### rampUpPeriodInSeconds : " + rampUpPeriod +
"\n ### loopCount : " + loopCount +
"\n ### abortAfterTimeLapsedInSeconds : " + abortAfterTimeLapsedInSeconds +
"\n-----------------------------------\n");

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import java.util.concurrent.atomic.AtomicInteger;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertThrows;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.CoreMatchers.equalTo;

public class LoadTest {

Expand Down Expand Up @@ -76,5 +78,37 @@ public void testLoad_Fail() {
assertThat(passedCounter.get(), is(0));
}

@Test
public void testLoad_Timeout() {
ExecutorServiceRunner executorServiceRunner = new ExecutorServiceRunner(3, 3, 6, 3);

final AtomicInteger passedCounter = new AtomicInteger();
final AtomicInteger failedCounter = new AtomicInteger();

Runnable taskSampleTest = () -> {
System.out.println(Thread.currentThread().getName() + " JunitTestSample test- Start. Time = " + LocalDateTime.now());

Result result = (new JUnitCore()).run(Request.method(JunitTestSample.class, "testFirstName"));

System.out.println(Thread.currentThread().getName() + " JunitTestSample test- *Finished Time, result = " + LocalDateTime.now() + " -" + result.wasSuccessful());

if(result.wasSuccessful()){
passedCounter.incrementAndGet();
} else {
failedCounter.incrementAndGet();
}
};

executorServiceRunner.addRunnable(taskSampleTest);

RuntimeException e = assertThrows(RuntimeException.class, executorServiceRunner::runRunnables);
assertThat(e.getMessage(), equalTo("java.util.concurrent.TimeoutException"));

System.out.println(">>> passed count:" + passedCounter.get());
System.out.println(">>> failed count:" + failedCounter.get());
System.out.println(">>> Total test count:" + (failedCounter.get() + passedCounter.get()));

assertThat(failedCounter.get(), is(0));
}
}