Skip to content

Commit 06e7935

Browse files
authored
[Chore] Bump testcontainer to 1.21.4 to fix could not find a valid Docker environment at CI (#17978)
1 parent d4a6caf commit 06e7935

15 files changed

Lines changed: 98 additions & 38 deletions

File tree

‎dolphinscheduler-api-test/pom.xml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737

3838
<junit.version>5.7.2</junit.version>
3939
<selenium.version>4.21.0</selenium.version>
40-
<testcontainers.version>1.19.8</testcontainers.version>
40+
<testcontainers.version>1.21.4</testcontainers.version>
4141
<lombok.version>1.18.24</lombok.version>
4242
<assertj-core.version>3.23.1</assertj-core.version>
4343
<awaitility.version>4.1.0</awaitility.version>

‎dolphinscheduler-bom/pom.xml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@
119119
<qcloud-cos.version>5.6.231</qcloud-cos.version>
120120
<system-lambda.version>1.2.1</system-lambda.version>
121121
<zeppelin-client.version>0.10.1</zeppelin-client.version>
122-
<testcontainer.version>1.19.3</testcontainer.version>
122+
<testcontainer.version>1.21.4</testcontainer.version>
123123
<checker-qual.version>3.19.0</checker-qual.version>
124124
<zeppelin-client.version>0.10.1</zeppelin-client.version>
125125
<aliyun-voice.version>2.1.4</aliyun-voice.version>

‎dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/DateUtilsTest.java‎

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,15 @@
3030

3131
import org.junit.jupiter.api.AfterEach;
3232
import org.junit.jupiter.api.Assertions;
33-
import org.junit.jupiter.api.BeforeEach;
3433
import org.junit.jupiter.api.Test;
3534

3635
public class DateUtilsTest {
3736

38-
@BeforeEach
39-
public void before() {
40-
ThreadLocalContext.removeTimezone();
41-
}
37+
private final TimeZone defaultTimeZone = TimeZone.getDefault();
4238

4339
@AfterEach
44-
public void after() {
40+
public void rollbackTimeZone() {
41+
TimeZone.setDefault(defaultTimeZone);
4542
ThreadLocalContext.removeTimezone();
4643
}
4744

‎dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/JSONUtilsTest.java‎

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.TimeZone;
3131

32+
import org.junit.jupiter.api.AfterEach;
3233
import org.junit.jupiter.api.Assertions;
3334
import org.junit.jupiter.api.Test;
3435

@@ -41,6 +42,14 @@
4142

4243
public class JSONUtilsTest {
4344

45+
private final TimeZone defaultTimeZone = TimeZone.getDefault();
46+
47+
@AfterEach
48+
public void rollbackTimeZone() {
49+
TimeZone.setDefault(defaultTimeZone);
50+
JSONUtils.setTimeZone(defaultTimeZone);
51+
}
52+
4453
@Test
4554
public void createObjectNodeTest() {
4655
String jsonStr = "{\"a\":\"b\",\"b\":\"d\"}";

‎dolphinscheduler-e2e/pom.xml‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<log4j-slf4j-impl.version>2.17.2</log4j-slf4j-impl.version>
4545
<guava.version>31.0.1-jre</guava.version>
4646
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
47-
<testcontainers.version>1.19.8</testcontainers.version>
47+
<testcontainers.version>1.21.4</testcontainers.version>
4848
<junit-pioneer.version>2.2.0</junit-pioneer.version>
4949
</properties>
5050

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorContainerProvider.java‎

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,18 @@
1818
package org.apache.dolphinscheduler.server.master.engine.executor;
1919

2020
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
21-
import org.apache.dolphinscheduler.task.executor.container.ITaskExecutorContainer;
2221
import org.apache.dolphinscheduler.task.executor.container.ITaskExecutorContainerProvider;
2322
import org.apache.dolphinscheduler.task.executor.container.SharedThreadTaskExecutorContainer;
2423
import org.apache.dolphinscheduler.task.executor.container.TaskExecutorContainerConfig;
2524

2625
import org.springframework.stereotype.Component;
2726

2827
@Component
29-
public class LogicTaskExecutorContainerProvider implements ITaskExecutorContainerProvider {
28+
public class LogicTaskExecutorContainerProvider
29+
implements
30+
ITaskExecutorContainerProvider<SharedThreadTaskExecutorContainer> {
3031

31-
private final ITaskExecutorContainer taskExecutorContainer;
32+
private final SharedThreadTaskExecutorContainer taskExecutorContainer;
3233

3334
public LogicTaskExecutorContainerProvider(final MasterConfig masterConfig) {
3435
final TaskExecutorContainerConfig containerConfig = TaskExecutorContainerConfig.builder()
@@ -39,7 +40,7 @@ public LogicTaskExecutorContainerProvider(final MasterConfig masterConfig) {
3940
}
4041

4142
@Override
42-
public ITaskExecutorContainer getExecutorContainer() {
43+
public SharedThreadTaskExecutorContainer getExecutorContainer() {
4344
return taskExecutorContainer;
4445
}
4546
}

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcher.java‎

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
import lombok.extern.slf4j.Slf4j;
3636

37+
import com.google.common.annotations.VisibleForTesting;
38+
3739
/**
3840
* WorkerGroupTaskDispatcher is responsible for dispatching tasks from the task queue.
3941
* The main responsibilities include:
@@ -179,7 +181,13 @@ public synchronized void close() {
179181
}
180182
}
181183

182-
int queueSize() {
184+
@VisibleForTesting
185+
public int dispatchEventCount() {
183186
return this.workerGroupEventBus.size();
184187
}
188+
189+
@VisibleForTesting
190+
public int waitingDispatchTaskCount() {
191+
return this.waitingDispatchTaskIds.size();
192+
}
185193
}

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherCoordinator.java‎

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
2222
import org.apache.dolphinscheduler.server.master.engine.task.runnable.ITaskExecutionRunnable;
2323

24+
import java.util.Map;
2425
import java.util.concurrent.ConcurrentHashMap;
2526

2627
import lombok.extern.slf4j.Slf4j;
2728

2829
import org.springframework.beans.factory.annotation.Autowired;
2930
import org.springframework.stereotype.Component;
3031

32+
import com.google.common.annotations.VisibleForTesting;
33+
3134
/**
3235
* WorkerGroupTaskDispatcherManager is responsible for managing the task dispatching for worker groups.
3336
* It maintains a mapping of worker groups to their task dispatchers and priority delay queues,
@@ -40,12 +43,12 @@ public class WorkerGroupDispatcherCoordinator implements AutoCloseable {
4043
@Autowired
4144
private ITaskExecutorClient taskExecutorClient;
4245

43-
private final ConcurrentHashMap<String, WorkerGroupDispatcher> workerGroupDispatcherMap;
46+
private final Map<String, WorkerGroupDispatcher> workerGroupDispatchers;
4447

4548
private final MasterConfig masterConfig;
4649

4750
public WorkerGroupDispatcherCoordinator(final MasterConfig masterConfig) {
48-
workerGroupDispatcherMap = new ConcurrentHashMap<>();
51+
workerGroupDispatchers = new ConcurrentHashMap<>();
4952
this.masterConfig = masterConfig;
5053
}
5154

@@ -82,7 +85,12 @@ public boolean removeTask(ITaskExecutionRunnable taskExecutionRunnable) {
8285
}
8386

8487
public boolean existWorkerGroup(String workerGroup) {
85-
return workerGroupDispatcherMap.containsKey(workerGroup);
88+
return workerGroupDispatchers.containsKey(workerGroup);
89+
}
90+
91+
@VisibleForTesting
92+
public Map<String, WorkerGroupDispatcher> workerGroupDispatchers() {
93+
return workerGroupDispatchers;
8694
}
8795

8896
/**
@@ -91,7 +99,7 @@ public boolean existWorkerGroup(String workerGroup) {
9199
@Override
92100
public void close() throws Exception {
93101
log.info("WorkerGroupDispatcherCoordinator closing");
94-
for (WorkerGroupDispatcher workerGroupDispatcher : workerGroupDispatcherMap.values()) {
102+
for (WorkerGroupDispatcher workerGroupDispatcher : workerGroupDispatchers.values()) {
95103
try {
96104
workerGroupDispatcher.close();
97105
} catch (Exception e) {
@@ -102,7 +110,7 @@ public void close() throws Exception {
102110
}
103111

104112
private WorkerGroupDispatcher getOrCreateWorkerGroupDispatcher(String workerGroup) {
105-
return workerGroupDispatcherMap.computeIfAbsent(workerGroup, wg -> {
113+
return workerGroupDispatchers.computeIfAbsent(workerGroup, wg -> {
106114
WorkerGroupDispatcher workerGroupDispatcher =
107115
new WorkerGroupDispatcher(wg, taskExecutorClient, masterConfig.getTaskDispatchPolicy());
108116
workerGroupDispatcher.start();

‎dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/task/dispatcher/WorkerGroupDispatcherTest.java‎

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
3535
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
3636
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
37-
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3837
import org.apache.dolphinscheduler.server.master.config.TaskDispatchPolicy;
3938
import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus;
4039
import org.apache.dolphinscheduler.server.master.engine.task.client.ITaskExecutorClient;
@@ -49,6 +48,7 @@
4948
import java.util.concurrent.ThreadLocalRandom;
5049
import java.util.concurrent.TimeUnit;
5150

51+
import org.junit.jupiter.api.AfterEach;
5252
import org.junit.jupiter.api.BeforeEach;
5353
import org.junit.jupiter.api.Test;
5454
import org.mockito.InOrder;
@@ -61,9 +61,14 @@ class WorkerGroupDispatcherTest {
6161
@BeforeEach
6262
void setUp() {
6363
taskExecutorClient = mock(ITaskExecutorClient.class);
64-
final MasterConfig masterConfig = new MasterConfig();
6564
dispatcher =
66-
new WorkerGroupDispatcher("TestGroup", taskExecutorClient, masterConfig.getTaskDispatchPolicy());
65+
new WorkerGroupDispatcher("TestGroup", taskExecutorClient, new TaskDispatchPolicy());
66+
}
67+
68+
@AfterEach
69+
void tearDown() {
70+
dispatcher.close();
71+
dispatcher.interrupt();
6772
}
6873

6974
@Test

‎dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/MasterContainer.java‎

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import org.apache.dolphinscheduler.server.master.engine.executor.LogicTaskExecutorLifecycleEventReporter;
2828
import org.apache.dolphinscheduler.server.master.engine.executor.LogicTaskExecutorRepository;
2929
import org.apache.dolphinscheduler.server.master.engine.system.SystemEventBus;
30-
import org.apache.dolphinscheduler.task.executor.container.AbstractTaskExecutorContainer;
30+
import org.apache.dolphinscheduler.server.master.engine.task.dispatcher.WorkerGroupDispatcherCoordinator;
31+
import org.apache.dolphinscheduler.task.executor.container.SharedThreadTaskExecutorContainer;
3132
import org.apache.dolphinscheduler.task.executor.container.TaskExecutorAssignmentTable;
32-
import org.apache.dolphinscheduler.task.executor.worker.TaskExecutorWorkers;
3333

3434
import java.util.concurrent.TimeUnit;
3535

@@ -57,31 +57,59 @@ public class MasterContainer {
5757
@Autowired
5858
private LogicTaskExecutorLifecycleEventReporter logicTaskExecutorLifecycleEventReporter;
5959

60+
@Autowired
61+
private WorkerGroupDispatcherCoordinator workerGroupDispatcherCoordinator;
62+
6063
public void assertAllResourceReleased() {
6164
await()
6265
.atMost(10, TimeUnit.SECONDS)
6366
.untilAsserted(this::doAssertAllResourceReleased);
6467
}
6568

6669
private void doAssertAllResourceReleased() {
70+
assertWorkflowReleased();
71+
assertWorkflowEventBusReleased();
72+
73+
assertSystemEventBusReleased();
74+
75+
assertLogicTaskEngineReleased();
76+
77+
assertWorkerGroupDispatcherReleased();
78+
}
79+
80+
private void assertWorkflowReleased() {
6781
assertThat(workflowRepository.getAll()).isEmpty();
82+
assertThat(workflowEventBusFireWorkers.getWorkers())
83+
.allMatch(worker -> worker.getRegisteredWorkflowExecuteRunnableSize() == 0);
84+
}
6885

86+
private void assertWorkflowEventBusReleased() {
6987
assertThat(workflowEventBusFireWorkers.getWorkers())
70-
.allMatch(workflowEventBusFireWorker -> workflowEventBusFireWorker
71-
.getRegisteredWorkflowExecuteRunnableSize() == 0);
88+
.allMatch(worker -> worker.getRegisteredWorkflowExecuteRunnableSize() == 0);
89+
}
90+
91+
private void assertSystemEventBusReleased() {
7292
assertThat(systemEventBus).matches(AbstractDelayEventBus::isEmpty);
93+
}
7394

95+
private void assertLogicTaskEngineReleased() {
7496
assertThat(logicTaskExecutorRepository.getAll()).isEmpty();
7597

76-
final AbstractTaskExecutorContainer executorContainer =
77-
(AbstractTaskExecutorContainer) logicTaskExecutorContainerProvider.getExecutorContainer();
98+
final SharedThreadTaskExecutorContainer executorContainer =
99+
logicTaskExecutorContainerProvider.getExecutorContainer();
78100
assertThat(executorContainer.getTaskExecutorAssignmentTable()).matches(TaskExecutorAssignmentTable::isEmpty);
79101

80-
final TaskExecutorWorkers taskExecutorWorkers = executorContainer.getTaskExecutorWorkers();
81-
assertThat(taskExecutorWorkers.getWorkers())
102+
assertThat(executorContainer.getTaskExecutorWorkers().getWorkers())
82103
.allMatch(taskExecutorWorker -> taskExecutorWorker.getRegisteredTaskExecutorSize() == 0)
83104
.allMatch(taskExecutorWorker -> taskExecutorWorker.getFiredTaskExecutorSize() == 0);
84105

85106
assertThat(logicTaskExecutorLifecycleEventReporter.getEventChannels()).isEmpty();
86107
}
108+
109+
private void assertWorkerGroupDispatcherReleased() {
110+
assertThat(workerGroupDispatcherCoordinator.workerGroupDispatchers().values())
111+
.allMatch(dispatcher -> dispatcher.dispatchEventCount() == 0)
112+
.allMatch(dispatcher -> dispatcher.waitingDispatchTaskCount() == 0);
113+
}
114+
87115
}

0 commit comments

Comments
 (0)