Commit 08db465

[Fix-18131] Workflow instance stuck in RUNNING state forever when using CONTINUE failure strategy with a failed upstream task (#18146)
1 parent 373d9d6 commit 08db465

5 files changed

Lines changed: 164 additions & 2 deletions

‎docs/docs/en/guide/project/workflow-definition.md‎

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ After selecting multiple workflows, you can perform batch operations at the bott

 Description of workflow operating parameters:

-* Failure strategy: When a task node fails to execute, other parallel task nodes need to execute this strategy. "Continue" means: after a certain task fails, other task nodes execute normally; "End" means: terminate all tasks execution, and terminate the entire process.
+* Failure strategy: When a task node fails to execute, other parallel task nodes need to execute this strategy. "Continue" means: after a certain task fails, end the workflow after waiting for other task nodes at the same level to execute normally.; "End" means: terminate all tasks execution, and terminate the entire workflow.
 * Notification strategy: When the process is over, send the process execution result notification email according to the process status, options including no send, send if success, send of failure, send whatever result.
 * Process priority: The priority of process operation, divide into five levels: highest (HIGHEST), high (HIGH), medium (MEDIUM), low (LOW), and lowest (LOWEST). When the number of master threads is insufficient, high priority processes will execute first in the execution queue, and processes with the same priority will execute in the order of first in, first out.
 * Worker group: The process can only be executed in the specified worker machine group. The default is `Default`, which can execute on any worker.
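The two failure strategies described in this hunk differ only in what happens to tasks that are still running when a sibling fails. The sketch below illustrates that difference in isolation. The `Strategy` enum and `tasksToKillOnFailure` are hypothetical names chosen for this example and are not taken from the DolphinScheduler code base.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class FailureStrategySketch {

    // Hypothetical enum mirroring the two documented options.
    public enum Strategy { CONTINUE, END }

    // Returns the still-running parallel tasks that should be terminated
    // when some task fails under the given strategy.
    public static List<String> tasksToKillOnFailure(Strategy strategy, List<String> runningTasks) {
        if (strategy == Strategy.END) {
            // "End": terminate everything that is still running.
            return new ArrayList<>(runningTasks);
        }
        // "Continue": let tasks at the same level run to completion.
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        List<String> running = Arrays.asList("B");
        System.out.println(tasksToKillOnFailure(Strategy.CONTINUE, running)); // prints []
        System.out.println(tasksToKillOnFailure(Strategy.END, running));      // prints [B]
    }
}
```

Under either strategy the workflow still ends in a failed state. Under "Continue" it does so only after the remaining same-level tasks have finished.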

‎docs/docs/zh/guide/project/workflow-definition.md‎

Lines changed: 1 addition & 1 deletion
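@@ -89,7 +89,7 @@

 Description of workflow run parameters:

-* Failure strategy: the strategy that other parallel task nodes follow when a task node fails to execute. "Continue" means: after a task fails, the other task nodes execute normally; "End" means: terminate all running tasks and terminate the entire process
+* Failure strategy: the strategy that other parallel task nodes follow when a task node fails to execute. "Continue" means: after a task fails, wait for the other task nodes at the same level to finish executing normally, then end the workflow; "End" means: terminate all running tasks and terminate the entire workflow
 * Notification strategy: when the process ends, send a notification email with the process execution information according to the process state; the options are never send, send on success, send on failure, and send on success or failure.
 * Process priority: the priority at which the process runs, in five levels: highest (HIGHEST), high (HIGH), medium (MEDIUM), low (LOW), lowest (LOWEST). When master threads are insufficient, higher-priority processes are executed first in the execution queue, and processes with the same priority are executed in first-in, first-out order.
 * Worker group: the process can only be executed in the specified worker machine group. The default is Default, which can execute on any worker.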

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java‎

Lines changed: 8 additions & 0 deletions
@@ -89,6 +89,14 @@ protected void triggerTasks(final IWorkflowExecutionRunnable workflowExecutionRu
                 .sorted(Comparator.comparing(ITaskExecutionRunnable::getName))
                 .collect(Collectors.toList());
         if (CollectionUtils.isEmpty(readyToTriggerTasks)) {
+            final boolean isAllCandidateTaskPredecessorsInActive = triggerCandidateTasks.stream()
+                    .flatMap(taskExecutionRunnable -> workflowExecutionGraph
+                            .getPredecessors(taskExecutionRunnable.getName())
+                            .stream())
+                    .allMatch(workflowExecutionGraph::isTaskExecutionRunnableInActive);
+            if (isAllCandidateTaskPredecessorsInActive) {
+                emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
+            }
             return;
         }
         final WorkflowEventBus workflowEventBus = workflowExecutionRunnable.getWorkflowEventBus();
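The added check in this hunk can be read as: when no candidate task is ready to trigger, look at every predecessor of every candidate, and if none of them is still active, let the workflow finish. Below is a minimal, self-contained sketch of that check. The class `PredecessorCheckSketch` and its `predecessors` and `inactive` collections are hypothetical stand-ins for `workflowExecutionGraph`, whose real interface is not shown in this diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the workflow execution graph; not the DolphinScheduler API.
public class PredecessorCheckSketch {

    // Task name -> names of its direct predecessors.
    private final Map<String, List<String>> predecessors = new HashMap<>();
    // Names of tasks that are no longer active (succeeded, failed, ...).
    private final Set<String> inactive = new HashSet<>();

    public void addTask(String name, String... preds) {
        predecessors.put(name, Arrays.asList(preds));
    }

    public void markInactive(String name) {
        inactive.add(name);
    }

    // Same flatMap/allMatch shape as the hunk: true when every predecessor
    // of every candidate task is inactive.
    public boolean allCandidatePredecessorsInactive(List<String> candidates) {
        return candidates.stream()
                .flatMap(c -> predecessors.getOrDefault(c, Collections.<String>emptyList()).stream())
                .allMatch(inactive::contains);
    }

    public static void main(String[] args) {
        PredecessorCheckSketch graph = new PredecessorCheckSketch();
        graph.addTask("A");
        graph.addTask("B");
        graph.addTask("C", "A", "B"); // C is downstream of both A and B

        graph.markInactive("A");      // A fails first
        System.out.println(graph.allCandidatePredecessorsInactive(Arrays.asList("C"))); // prints false

        graph.markInactive("B");      // B finishes later
        System.out.println(graph.allCandidatePredecessorsInactive(Arrays.asList("C"))); // prints true
    }
}
```

Note that `Stream.allMatch` returns `true` for an empty stream, so in this sketch a candidate list with no predecessors at all also passes the check.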

‎dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/integration/cases/WorkflowStartTestCase.java‎

Lines changed: 41 additions & 0 deletions
@@ -381,6 +381,7 @@ public void testStartWorkflow_with_oneSuccessSwitch_twoFakeTask() {
         final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
                 .workflowDefinition(parentWorkflow)
                 .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .failureStrategy(FailureStrategy.CONTINUE)
                 .build();
         final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

@@ -610,6 +611,7 @@ void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runFailed() {
         final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
                 .workflowDefinition(parentWorkflow)
                 .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .failureStrategy(FailureStrategy.CONTINUE)
                 .build();
         final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

@@ -640,6 +642,45 @@ void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runFailed() {
         masterContainer.assertAllResourceReleased();
     }

+    @Test
+    @DisplayName("Test start a workflow with shared downstream task when failed predecessor finishes first using continue failure strategy")
+    void testStartWorkflow_with_sharedDownstreamTask_whenFailedPredecessorFinishFirst_usingFailureStrategyContinue() {
+        final String yaml =
+                "/it/start/workflow_with_shared_downstream_task_when_failed_predecessor_finish_first.yaml";
+        final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
+        final WorkflowDefinition parentWorkflow = context.getOneWorkflow();
+
+        final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
+                .workflowDefinition(parentWorkflow)
+                .runWorkflowCommandParam(new RunWorkflowCommandParam())
+                .failureStrategy(FailureStrategy.CONTINUE)
+                .build();
+        final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);
+
+        await()
+                .atMost(Duration.ofMinutes(1))
+                .untilAsserted(() -> {
+                    Assertions
+                            .assertThat(repository.queryWorkflowInstance(workflowInstanceId))
+                            .matches(
+                                    workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE)
+                            .matches(workflowInstance -> workflowInstance.getEndTime() != null);
+
+                    Assertions
+                            .assertThat(repository.queryTaskInstance(workflowInstanceId))
+                            .hasSize(2)
+                            .anySatisfy(taskInstance -> {
+                                assertThat(taskInstance.getName()).isEqualTo("A");
+                                assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
+                            })
+                            .anySatisfy(taskInstance -> {
+                                assertThat(taskInstance.getName()).isEqualTo("B");
+                                assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
+                            });
+                });
+        masterContainer.assertAllResourceReleased();
+    }
+
     @Test
     @DisplayName("Test start a workflow with one sub workflow task(A) failed")
     public void testStartWorkflow_with_subWorkflowTask_failed() {
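The new test waits for the workflow to reach its final state with `await().atMost(...).untilAsserted(...)`, which re-runs a block of assertions until they pass or the time limit expires. The sketch below shows that polling pattern in plain Java, without the library, to make the control flow explicit. `PollUntilAssertedSketch` and its `untilAsserted` helper are hypothetical and only approximate the library's behavior.

```java
import java.time.Duration;

public class PollUntilAssertedSketch {

    // Re-runs the assertion until it stops throwing AssertionError
    // or the time budget is used up, then rethrows the last failure.
    public static void untilAsserted(Duration atMost, Duration pollInterval, Runnable assertion) {
        final long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.run();
                return; // the assertion passed
            } catch (AssertionError e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // give up and surface the last failure
                }
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while polling", ie);
            }
        }
    }

    public static void main(String[] args) {
        final int[] attempts = {0};
        // The assertion fails twice, then passes on the third attempt.
        untilAsserted(Duration.ofSeconds(2), Duration.ofMillis(10), () -> {
            if (++attempts[0] < 3) {
                throw new AssertionError("state not reached yet");
            }
        });
        System.out.println("attempts: " + attempts[0]); // prints attempts: 3
    }
}
```

Polling this way lets the test tolerate the two-second `sleep 2` in task B without a fixed wait, while the one-minute limit in the real test still bounds a workflow that never leaves the RUNNING state.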
@@ -0,0 +1,113 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# A(failed) -> C(not triggered)
+# B(success) -> C(not triggered)
+# A finishes before B.
+project:
+  name: MasterIntegrationTest
+  code: 1
+  description: This is a fake project
+  userId: 1
+  userName: admin
+  createTime: 2024-08-12 00:00:00
+  updateTime: 2021-08-12 00:00:00
+
+workflows:
+  - name: workflow_with_shared_downstream_task_when_failed_predecessor_finish_first
+    code: 1
+    version: 1
+    projectCode: 1
+    description: This is a fake workflow with a shared downstream task
+    releaseState: ONLINE
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    userId: 1
+    executionType: PARALLEL
+
+tasks:
+  - name: A
+    code: 1
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"xx"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: B
+    code: 2
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 2"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+  - name: C
+    code: 3
+    version: 1
+    projectCode: 1
+    userId: 1
+    taskType: LogicFakeTask
+    taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
+    workerGroup: default
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2021-08-12 00:00:00
+    taskExecuteType: BATCH
+
+taskRelations:
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 1
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 0
+    preTaskVersion: 0
+    postTaskCode: 2
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 1
+    preTaskVersion: 1
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
+  - projectCode: 1
+    workflowDefinitionCode: 1
+    workflowDefinitionVersion: 1
+    preTaskCode: 2
+    preTaskVersion: 1
+    postTaskCode: 3
+    postTaskVersion: 1
+    createTime: 2024-08-12 00:00:00
+    updateTime: 2024-08-12 00:00:00
