Skip to content

Commit 0823c2f

Browse files
authored
[Fix-17613] [Master] Task group queue priority always remains 0 (#17614)
1 parent 4c264fd commit 0823c2f

7 files changed

Lines changed: 19 additions & 10 deletions

File tree

‎dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public class TaskDefinition {
206206
*/
207207
private int taskGroupId;
208208
/**
209-
* task group id
209+
* task group priority, todo: we should add this field to task instance when create task instance
210210
*/
211211
private int taskGroupPriority;
212212

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.dolphinscheduler.common.enums.Flag;
2121
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
22+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2223
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
2324
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2425

@@ -66,9 +67,10 @@ public interface ITaskGroupCoordinator extends AutoCloseable {
6667
* The TaskInstance shouldn't dispatch until there exist available slot, the taskGroupCoordinator notify it.
6768
*
6869
* @param taskInstance the task instance which want to acquire task group slot.
70+
* @param taskDefinition the task definition which contains the task group.
6971
* @throws IllegalArgumentException if the taskInstance is null or the used taskGroup doesn't exist.
7072
*/
71-
void acquireTaskGroupSlot(TaskInstance taskInstance);
73+
void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition taskDefinition);
7274

7375
/**
7476
* If the TaskInstance is using TaskGroup then it need to release TaskGroupSlot.

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
2424
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
2525
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
26+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2627
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
2728
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
2829
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -334,7 +335,7 @@ public boolean needAcquireTaskGroupSlot(final TaskInstance taskInstance) {
334335
}
335336

336337
@Override
337-
public void acquireTaskGroupSlot(TaskInstance taskInstance) {
338+
public void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition taskDefinition) {
338339
if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
339340
throw new IllegalArgumentException("The current TaskInstance does not use task group");
340341
}
@@ -353,7 +354,7 @@ public void acquireTaskGroupSlot(TaskInstance taskInstance) {
353354
.taskName(taskInstance.getName())
354355
.groupId(taskInstance.getTaskGroupId())
355356
.workflowInstanceId(taskInstance.getWorkflowInstanceId())
356-
.priority(taskInstance.getTaskGroupPriority())
357+
.priority(taskDefinition.getTaskGroupPriority())
357358
.inQueue(Flag.YES.getCode())
358359
.forceStart(Flag.NO.getCode())
359360
.status(TaskGroupQueueStatus.WAIT_QUEUE)

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public int compareTo(ITaskExecutionRunnable other) {
178178

179179
// larger number, higher priority
180180
int taskGroupPriorityCompareResult =
181-
taskInstance.getTaskGroupPriority() - other.getTaskInstance().getTaskGroupPriority();
181+
taskDefinition.getTaskGroupPriority() - other.getTaskDefinition().getTaskGroupPriority();
182182
if (taskGroupPriorityCompareResult != 0) {
183183
return -taskGroupPriorityCompareResult;
184184
}

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java‎

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus.DISPATCH;
2222

2323
import org.apache.dolphinscheduler.common.utils.JSONUtils;
24+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2425
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
2526
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2627
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
@@ -84,7 +85,8 @@ protected boolean isTaskNeedAcquireTaskGroupSlot(final ITaskExecutionRunnable ta
8485
*/
8586
protected void acquireTaskGroupSlot(final ITaskExecutionRunnable taskExecutionRunnable) {
8687
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
87-
taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
88+
final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition();
89+
taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition);
8890
}
8991

9092
/**

‎dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java‎

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.mockito.Mockito.when;
2727

2828
import org.apache.dolphinscheduler.common.enums.Flag;
29+
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
2930
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
3031
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
3132
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -110,26 +111,28 @@ void needAcquireTaskGroupSlot() {
110111
void acquireTaskGroupSlot() {
111112
// TaskInstance is NULL
112113
IllegalArgumentException illegalArgumentException =
113-
assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.acquireTaskGroupSlot(null));
114+
assertThrows(IllegalArgumentException.class,
115+
() -> taskGroupCoordinator.acquireTaskGroupSlot(null, null));
114116
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
115117

118+
TaskDefinition taskDefinition = new TaskDefinition();
116119
// TaskGroupId is NULL
117120
TaskInstance taskInstance = new TaskInstance();
118121
illegalArgumentException = assertThrows(IllegalArgumentException.class,
119-
() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
122+
() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
120123
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
121124

122125
// TaskGroup not exist
123126
taskInstance.setTaskGroupId(1);
124127
taskInstance.setId(1);
125128
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null);
126129
illegalArgumentException = assertThrows(IllegalArgumentException.class,
127-
() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
130+
() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
128131
assertEquals("The current TaskGroup: 1 does not exist", illegalArgumentException.getMessage());
129132

130133
// TaskGroup exist
131134
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(new TaskGroup());
132-
Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
135+
Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
133136

134137
}
135138

‎dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ tasks:
4646
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
4747
workerGroup: default
4848
taskGroupId: 1
49+
taskGroupPriority: 1
4950
createTime: 2024-08-12 00:00:00
5051
updateTime: 2021-08-12 00:00:00
5152
taskExecuteType: BATCH

0 commit comments

Comments
 (0)