Skip to content

Commit 3c25b80

Browse files
authored
Merge branch 'master' into retrofit2-upgrade
2 parents a5ed6b0 + ebba3ba commit 3c25b80

File tree

4 files changed

+49
-5
lines changed

4 files changed

+49
-5
lines changed

clouddriver-core-tck/src/main/java/com/netflix/spinnaker/clouddriver/core/test/TaskRepositoryTck.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,29 @@ public void testListRunningTasks() {
116116
.contains(t2.getId());
117117
}
118118

119+
@Test
120+
public void testListByThisInstance() {
121+
Task t1 = subject.create("Test", "STARTED");
122+
Task t2 = subject.create("Test", "STARTED");
123+
Task t3 = subject.create("Test", "STARTED");
124+
Task t4 = subject.create("Test", "STARTED");
125+
Task t5 = subject.create("Test", "STARTED");
126+
String ownerId = ClouddriverHostname.ID;
127+
128+
t3.updateOwnerId("foo@not_this_clouddriver", "Test");
129+
t5.complete();
130+
131+
List<Task> runningTasks = subject.listByThisInstance();
132+
133+
assertThat(runningTasks.stream().allMatch(t -> t.getOwnerId().equals(ownerId))).isTrue();
134+
assertThat(runningTasks.stream().map(Task::getId).collect(Collectors.toList()))
135+
.contains(t1.getId(), t2.getId(), t4.getId());
136+
// Task 3 doesn't belong to this pod and task 5 is not running, so should not be included in the
137+
// result
138+
assertThat(runningTasks.stream().map(Task::getId).collect(Collectors.toList()))
139+
.doesNotContain(t3.getId(), t5.getId());
140+
}
141+
119142
@Test
120143
public void testResultObjectsPersistence() {
121144
Task t1 = subject.create("Test", "Test Status");

clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/InMemoryTaskRepository.groovy

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.netflix.spinnaker.clouddriver.data.task
1818

19+
import com.netflix.spinnaker.clouddriver.core.ClouddriverHostname
20+
1921
import java.util.concurrent.ConcurrentHashMap
2022

2123
class InMemoryTaskRepository implements TaskRepository {
@@ -60,7 +62,7 @@ class InMemoryTaskRepository implements TaskRepository {
6062

6163
@Override
6264
List<Task> listByThisInstance() {
63-
return list()
65+
return list().findAll { it.ownerId == ClouddriverHostname.ID }
6466
}
6567

6668
private String getNextId() {

clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepository.kt

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ import org.jooq.Condition
3131
import org.jooq.DSLContext
3232
import org.jooq.Record
3333
import org.jooq.Select
34-
import org.jooq.impl.DSL
3534
import org.jooq.impl.DSL.field
35+
import org.jooq.impl.DSL.max
3636
import org.jooq.impl.DSL.sql
3737
import org.slf4j.LoggerFactory
3838

@@ -360,13 +360,29 @@ class SqlTaskRepository(
360360
* Since task statuses are insert-only, we first need to find the most
361361
* recent status record for each task ID and the filter that result set
362362
* down to the ones that are running.
363+
*
364+
* Query used:
365+
* SELECT a.task_id
366+
* FROM task_states AS `a`
367+
* JOIN (
368+
* SELECT task_id, MAX(created_at) AS `created`
369+
* FROM task_states
370+
* GROUP BY task_id
371+
* ) AS `b`
372+
* ON (a.task_id = b.task_id AND a.created_at = b.created)
373+
* JOIN tasks AS `t`
374+
* ON (a.task_id = t.id)
375+
* WHERE (
376+
* t.owner_id = '<clouddriver host name>'
377+
* and a.state = 'STARTED'
378+
* )
363379
*/
364380
private fun runningTaskIds(ctx: DSLContext, thisInstance: Boolean): Array<String> {
365381
return withPool(poolName) {
366382
val baseQuery = ctx.select(field("a.task_id"))
367383
.from(taskStatesTable.`as`("a"))
368384
.innerJoin(
369-
ctx.select(field("task_id"), DSL.max(field("created_at")).`as`("created"))
385+
ctx.select(field("task_id"), max(field("created_at")).`as`("created"))
370386
.from(taskStatesTable)
371387
.groupBy(field("task_id"))
372388
.asTable("b")
@@ -377,10 +393,10 @@ class SqlTaskRepository(
377393
.innerJoin(tasksTable.`as`("t")).on(sql("a.task_id = t.id"))
378394
.where(
379395
field("t.owner_id").eq(ClouddriverHostname.ID)
380-
.and(field("a.state").eq(TaskState.STARTED.toString()))
396+
.and(field("a.state").eq(STARTED.toString()))
381397
)
382398
} else {
383-
baseQuery.where(field("a.state").eq(TaskState.STARTED.toString()))
399+
baseQuery.where(field("a.state").eq(STARTED.toString()))
384400
}
385401

386402
select

clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsController.groovy

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ class OperationsController {
213213
*/
214214
@PreDestroy
215215
void destroy() {
216+
log.info("Destroy has been triggered. Initiating graceful shutdown of tasks.")
216217
long start = System.currentTimeMillis()
217218
def tasks = taskRepository.listByThisInstance()
218219
while (tasks && !tasks.isEmpty() &&
@@ -225,6 +226,8 @@ class OperationsController {
225226
if (tasks && !tasks.isEmpty()) {
226227
log.error("Shutting down while tasks '{}' are still in progress!", tasks)
227228
}
229+
230+
log.info("Destruction procedure completed.")
228231
}
229232

230233
private StartOperationResult start(@Nullable String cloudProvider,

0 commit comments

Comments
 (0)