Skip to content

Commit 8f621ff

Browse files
authored
[Optimization] Calculate global parameter and local parameter at master. (#10704)
* Global parameter and local parameter calculation external expansion. * k8s task ut fix. * TimePlaceholderUtils import DateUtils fix * follow the review comments to fix. * follow the review comments to fix. * e2e rerun
1 parent a45fc5b commit 8f621ff

39 files changed

Lines changed: 344 additions & 424 deletions

File tree

‎dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.apache.dolphinscheduler.api.utils.Result;
5252
import org.apache.dolphinscheduler.common.Constants;
5353
import org.apache.dolphinscheduler.common.enums.Flag;
54-
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
54+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
5555
import org.apache.dolphinscheduler.common.graph.DAG;
5656
import org.apache.dolphinscheduler.common.model.TaskNode;
5757
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -155,7 +155,7 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce
155155
private ScheduleMapper scheduleMapper;
156156

157157
@Autowired
158-
private CuringGlobalParamsService curingGlobalParamsService;
158+
private CuringParamsService curingGlobalParamsService;
159159

160160
/**
161161
* return top n SUCCESS process instance order by running time which started between startTime and endTime

‎dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java‎

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@
3636
import org.apache.dolphinscheduler.common.enums.CommandType;
3737
import org.apache.dolphinscheduler.common.enums.Flag;
3838
import org.apache.dolphinscheduler.common.enums.UserType;
39-
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
40-
import org.apache.dolphinscheduler.common.expand.DolphinSchedulerCuringGlobalParams;
39+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
4140
import org.apache.dolphinscheduler.common.graph.DAG;
4241
import org.apache.dolphinscheduler.common.model.TaskNode;
4342
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -72,7 +71,6 @@
7271
import org.mockito.Mock;
7372
import org.mockito.Mockito;
7473
import org.mockito.junit.MockitoJUnitRunner;
75-
import org.springframework.beans.factory.annotation.Autowired;
7674

7775
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_DELETE;
7876
import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.INSTANCE_UPDATE;
@@ -131,7 +129,7 @@ public class ProcessInstanceServiceTest {
131129
ScheduleMapper scheduleMapper;
132130

133131
@Mock
134-
CuringGlobalParamsService curingGlobalParamsService;
132+
CuringParamsService curingGlobalParamsService;
135133

136134

137135
private String shellJson = "[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"

‎dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java‎

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -831,4 +831,6 @@ private Constants() {
831831
public static final int USER_PASSWORD_MAX_LENGTH = 20;
832832

833833
public static final int USER_PASSWORD_MIN_LENGTH = 2;
834+
835+
public static final String FUNCTION_START_WITH = "$";
834836
}

‎dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/expand/DolphinSchedulerCuringGlobalParams.java‎

Lines changed: 0 additions & 96 deletions
This file was deleted.

‎dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/placeholder/TimePlaceholderUtils.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ public static Map.Entry<Date, String> calcMinutes(String expression, Date date)
512512

513513
if (Character.isDigit(expression.charAt(index + 1))) {
514514
String addMinuteExpr = expression.substring(index + 1);
515-
Date targetDate = org.apache.commons.lang.time.DateUtils
515+
Date targetDate = org.apache.commons.lang3.time.DateUtils
516516
.addMinutes(date, calcMinutes(addMinuteExpr));
517517
String dateFormat = expression.substring(0, index);
518518

‎dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/ParameterUtilsTest.java‎

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,16 @@
2020
import static org.apache.dolphinscheduler.common.utils.placeholder.TimePlaceholderUtils.replacePlaceholders;
2121

2222
import org.apache.dolphinscheduler.common.Constants;
23-
import org.apache.dolphinscheduler.common.enums.CommandType;
24-
import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandService;
25-
import org.apache.dolphinscheduler.common.expand.TimePlaceholderResolverExpandServiceImpl;
2623
import org.apache.dolphinscheduler.common.utils.placeholder.PlaceholderUtils;
27-
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
28-
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
29-
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
3024

3125
import java.text.ParseException;
32-
import java.util.ArrayList;
3326
import java.util.Date;
3427
import java.util.HashMap;
35-
import java.util.List;
3628
import java.util.Map;
3729

3830
import org.junit.Assert;
39-
import org.junit.Before;
4031
import org.junit.Test;
4132
import org.junit.runner.RunWith;
42-
import org.mockito.InjectMocks;
43-
import org.mockito.Mock;
4433
import org.mockito.junit.MockitoJUnitRunner;
4534
import org.slf4j.Logger;
4635
import org.slf4j.LoggerFactory;
@@ -49,12 +38,6 @@
4938
public class ParameterUtilsTest {
5039
public static final Logger logger = LoggerFactory.getLogger(ParameterUtilsTest.class);
5140

52-
@Mock
53-
private TimePlaceholderResolverExpandService timePlaceholderResolverExpandService;
54-
55-
@InjectMocks
56-
private TimePlaceholderResolverExpandServiceImpl timePlaceholderResolverExpandServiceImpl;
57-
5841
/**
5942
* Test convertParameterPlaceholders
6043
*/

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/builder/TaskExecutionContextBuilder.java‎

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,12 @@
2929
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
3030
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
3131
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
32+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
33+
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
3234
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
35+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
36+
37+
import java.util.Map;
3338

3439
/**
3540
* TaskExecutionContext builder
@@ -131,6 +136,28 @@ public TaskExecutionContextBuilder buildK8sTaskRelatedInfo(K8sTaskExecutionConte
131136
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
132137
return this;
133138
}
139+
140+
/**
141+
* build global and local params
142+
* @param propertyMap
143+
* @return
144+
*/
145+
public TaskExecutionContextBuilder buildParamInfo(Map<String, Property> propertyMap) {
146+
taskExecutionContext.setPrepareParamsMap(propertyMap);
147+
return this;
148+
}
149+
150+
/**
151+
* build business params
152+
* @param businessParamsMap
153+
* @return
154+
*/
155+
public TaskExecutionContextBuilder buildBusinessParamsMap(Map<String, Property> businessParamsMap) {
156+
taskExecutionContext.setParamsMap(businessParamsMap);
157+
return this;
158+
}
159+
160+
134161
/**
135162
* create
136163
*

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import org.apache.dolphinscheduler.common.Constants;
2121
import org.apache.dolphinscheduler.common.enums.SlotCheckState;
22-
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
22+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
2323
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
2424
import org.apache.dolphinscheduler.common.thread.Stopper;
2525
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -95,7 +95,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
9595
private StateWheelExecuteThread stateWheelExecuteThread;
9696

9797
@Autowired
98-
private CuringGlobalParamsService curingGlobalParamsService;
98+
private CuringParamsService curingGlobalParamsService;
9999

100100
private String masterAddress;
101101

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java‎

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
import org.apache.dolphinscheduler.common.enums.TaskDependType;
4141
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
4242
import org.apache.dolphinscheduler.common.enums.TimeoutFlag;
43-
import org.apache.dolphinscheduler.common.expand.CuringGlobalParamsService;
43+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
4444
import org.apache.dolphinscheduler.common.graph.DAG;
4545
import org.apache.dolphinscheduler.common.model.TaskNode;
4646
import org.apache.dolphinscheduler.common.model.TaskNodeRelation;
@@ -49,7 +49,6 @@
4949
import org.apache.dolphinscheduler.common.utils.JSONUtils;
5050
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
5151
import org.apache.dolphinscheduler.common.utils.NetUtils;
52-
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
5352
import org.apache.dolphinscheduler.dao.entity.Command;
5453
import org.apache.dolphinscheduler.dao.entity.Environment;
5554
import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
@@ -239,7 +238,7 @@ public class WorkflowExecuteRunnable implements Runnable {
239238
/**
240239
* curing global params service
241240
*/
242-
private final CuringGlobalParamsService curingGlobalParamsService;
241+
private final CuringParamsService curingParamsService;
243242

244243
/**
245244
* @param processInstance processInstance
@@ -255,14 +254,14 @@ public WorkflowExecuteRunnable(ProcessInstance processInstance
255254
, ProcessAlertManager processAlertManager
256255
, MasterConfig masterConfig
257256
, StateWheelExecuteThread stateWheelExecuteThread
258-
, CuringGlobalParamsService curingGlobalParamsService) {
257+
, CuringParamsService curingParamsService) {
259258
this.processService = processService;
260259
this.processInstance = processInstance;
261260
this.masterConfig = masterConfig;
262261
this.nettyExecutorManager = nettyExecutorManager;
263262
this.processAlertManager = processAlertManager;
264263
this.stateWheelExecuteThread = stateWheelExecuteThread;
265-
this.curingGlobalParamsService = curingGlobalParamsService;
264+
this.curingParamsService = curingParamsService;
266265
TaskMetrics.registerTaskPrepared(readyToSubmitTaskQueue::size);
267266
}
268267

@@ -1008,7 +1007,7 @@ private void initTaskQueue() {
10081007

10091008
if (!complementListDate.isEmpty() && Flag.NO == processInstance.getIsSubProcess()) {
10101009
processInstance.setScheduleTime(complementListDate.get(0));
1011-
String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
1010+
String globalParams = curingParamsService.curingGlobalParams(processInstance.getId(),
10121011
processDefinition.getGlobalParamMap(),
10131012
processDefinition.getGlobalParamList(),
10141013
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), cmdParam.get(Constants.SCHEDULE_TIMEZONE));

‎dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java‎

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ConnectorType;
6060
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.ExecuteSqlType;
6161
import org.apache.dolphinscheduler.plugin.task.api.model.JdbcInfo;
62+
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
6263
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
6364
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
6465
import org.apache.dolphinscheduler.plugin.task.api.parameters.ParametersNode;
@@ -73,6 +74,7 @@
7374
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
7475
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
7576
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
77+
import org.apache.dolphinscheduler.service.expand.CuringParamsService;
7678
import org.apache.dolphinscheduler.service.process.ProcessService;
7779
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
7880
import org.apache.dolphinscheduler.spi.enums.DbType;
@@ -123,13 +125,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
123125

124126
protected TaskPluginManager taskPluginManager;
125127

128+
protected CuringParamsService curingParamsService;
129+
126130
protected String threadLoggerInfoName;
127131

128132
@Override
129133
public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) {
130134
processService = SpringApplicationContext.getBean(ProcessService.class);
131135
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
132136
taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
137+
curingParamsService = SpringApplicationContext.getBean(CuringParamsService.class);
133138
this.taskInstance = taskInstance;
134139
this.processInstance = processInstance;
135140
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
@@ -301,6 +306,10 @@ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance
301306
setK8sTaskRelation(k8sTaskExecutionContext, taskInstance);
302307
}
303308

309+
Map<String, Property> businessParamsMap = curingParamsService.preBuildBusinessParams(processInstance);
310+
311+
AbstractParameters baseParam = taskPluginManager.getParameters(ParametersNode.builder().taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
312+
Map<String, Property> propertyMap = curingParamsService.paramParsingPreparation(taskInstance, baseParam, processInstance);
304313
return TaskExecutionContextBuilder.get()
305314
.buildTaskInstanceRelatedInfo(taskInstance)
306315
.buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
@@ -309,6 +318,8 @@ protected TaskExecutionContext getTaskExecutionContext(TaskInstance taskInstance
309318
.buildResourceParametersInfo(resources)
310319
.buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
311320
.buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
321+
.buildBusinessParamsMap(businessParamsMap)
322+
.buildParamInfo(propertyMap)
312323
.create();
313324
}
314325

0 commit comments

Comments
 (0)