Skip to content

Commit 6aecd89

Browse files
authored
[Improvement-17001] Once workflow is not exist, delete scheduler task (#17003)
1 parent c4e2c66 commit 6aecd89

3 files changed

Lines changed: 102 additions & 23 deletions

File tree

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository;
19+
20+
import org.apache.dolphinscheduler.dao.entity.Schedule;
21+
22+
public interface ScheduleDao extends IDao<Schedule> {
23+
24+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.dolphinscheduler.dao.repository.impl;
19+
20+
import org.apache.dolphinscheduler.dao.entity.Schedule;
21+
import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper;
22+
import org.apache.dolphinscheduler.dao.repository.BaseDao;
23+
import org.apache.dolphinscheduler.dao.repository.ScheduleDao;
24+
25+
import lombok.NonNull;
26+
27+
import org.springframework.stereotype.Repository;
28+
29+
@Repository
30+
public class ScheduleDaoImpl extends BaseDao<Schedule, ScheduleMapper> implements ScheduleDao {
31+
32+
public ScheduleDaoImpl(@NonNull ScheduleMapper scheduleMapper) {
33+
super(scheduleMapper);
34+
}
35+
36+
}

‎dolphinscheduler-scheduler-plugin/dolphinscheduler-scheduler-quartz/src/main/java/org/apache/dolphinscheduler/scheduler/quartz/ProcessScheduleTask.java‎

Lines changed: 42 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import org.apache.dolphinscheduler.common.enums.TaskDependType;
2323
import org.apache.dolphinscheduler.dao.entity.Schedule;
2424
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
25+
import org.apache.dolphinscheduler.dao.repository.ScheduleDao;
26+
import org.apache.dolphinscheduler.dao.repository.WorkflowDefinitionDao;
2527
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
2628
import org.apache.dolphinscheduler.extract.master.IWorkflowControlClient;
2729
import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowScheduleTriggerRequest;
28-
import org.apache.dolphinscheduler.service.process.ProcessService;
2930

3031
import java.util.Date;
32+
import java.util.Optional;
3133

3234
import lombok.extern.slf4j.Slf4j;
3335

@@ -44,7 +46,10 @@
4446
public class ProcessScheduleTask extends QuartzJobBean {
4547

4648
@Autowired
47-
private ProcessService processService;
49+
private ScheduleDao scheduleDao;
50+
51+
@Autowired
52+
private WorkflowDefinitionDao workflowDefinitionDao;
4853

4954
@Autowired
5055
private IWorkflowControlClient workflowInstanceController;
@@ -53,34 +58,48 @@ public class ProcessScheduleTask extends QuartzJobBean {
5358
@Timed(value = "ds.master.quartz.job.execution.time", percentiles = {0.5, 0.75, 0.95, 0.99}, histogram = true)
5459
@Override
5560
protected void executeInternal(JobExecutionContext context) {
56-
QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap());
57-
int projectId = quartzJobData.getProjectId();
58-
int scheduleId = quartzJobData.getScheduleId();
61+
final QuartzJobData quartzJobData = QuartzJobData.of(context.getJobDetail().getJobDataMap());
62+
final int projectId = quartzJobData.getProjectId();
63+
final int scheduleId = quartzJobData.getScheduleId();
5964

60-
Date scheduledFireTime = context.getScheduledFireTime();
65+
final Date scheduledFireTime = context.getScheduledFireTime();
66+
final Date fireTime = context.getFireTime();
6167

62-
Date fireTime = context.getFireTime();
68+
log.info("Scheduler: {} fired expect fire time is {}, actual fire time is {}",
69+
scheduleId,
70+
scheduledFireTime,
71+
fireTime);
6372

64-
log.info("scheduled fire time :{}, fire time :{}, scheduleId :{}", scheduledFireTime, fireTime, scheduleId);
65-
66-
// query schedule
67-
Schedule schedule = processService.querySchedule(scheduleId);
73+
// If the schedule does not exist or offline, then delete the corn job
74+
final Schedule schedule = scheduleDao.queryById(scheduleId);
6875
if (schedule == null || ReleaseState.OFFLINE == schedule.getReleaseState()) {
76+
log.warn("Scheduler: {} does not exist in db,will delete job in quartz", scheduleId);
77+
deleteJob(context, projectId, scheduleId);
78+
return;
79+
}
80+
81+
final Optional<WorkflowDefinition> workflowDefinitionOptional =
82+
workflowDefinitionDao.queryByCode(schedule.getWorkflowDefinitionCode());
83+
if (!workflowDefinitionOptional.isPresent()) {
6984
log.warn(
70-
"process schedule does not exist in db or process schedule offline,delete schedule job in quartz, projectId:{}, scheduleId:{}",
71-
projectId, scheduleId);
85+
"Scheduler: {} bind workflow: {} does not exist in db,will delete the schedule and delete schedule job in quartz",
86+
scheduleId,
87+
schedule.getWorkflowDefinitionCode());
88+
scheduleDao.deleteById(scheduleId);
7289
deleteJob(context, projectId, scheduleId);
7390
return;
7491
}
7592

76-
WorkflowDefinition workflowDefinition =
77-
processService.findWorkflowDefinitionByCode(schedule.getWorkflowDefinitionCode());
78-
// release state : online/offline
79-
ReleaseState releaseState = workflowDefinition.getReleaseState();
80-
if (releaseState == ReleaseState.OFFLINE) {
93+
final WorkflowDefinition workflowDefinition = workflowDefinitionOptional.get();
94+
if (workflowDefinition.getReleaseState() == ReleaseState.OFFLINE) {
8195
log.warn(
82-
"process definition does not exist in db or offline,need not to create command, projectId:{}, processDefinitionId:{}",
83-
projectId, workflowDefinition.getId());
96+
"Scheduler: {} bind workflow: {} state is OFFLINE,will update the schedule status to OFFLINE and delete schedule job in quartz",
97+
scheduleId,
98+
schedule.getWorkflowDefinitionCode());
99+
schedule.setReleaseState(ReleaseState.OFFLINE);
100+
schedule.setUpdateTime(new Date());
101+
scheduleDao.updateById(schedule);
102+
deleteJob(context, projectId, scheduleId);
84103
return;
85104
}
86105

@@ -106,14 +125,14 @@ protected void executeInternal(JobExecutionContext context) {
106125

107126
private void deleteJob(JobExecutionContext context, int projectId, int scheduleId) {
108127
final Scheduler scheduler = context.getScheduler();
109-
JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey();
128+
final JobKey jobKey = QuartzJobKey.of(projectId, scheduleId).toJobKey();
110129
try {
111130
if (scheduler.checkExists(jobKey)) {
112-
log.info("Try to delete job: {}, projectId: {}, schedulerId", projectId, scheduleId);
131+
log.info("Try to delete job: {}, projectId: {}, schedulerId: {}", jobKey, projectId, scheduleId);
113132
scheduler.deleteJob(jobKey);
114133
}
115134
} catch (Exception e) {
116-
log.error("Failed to delete job: {}", jobKey);
135+
log.error("Failed to delete job: {}", jobKey, e);
117136
}
118137
}
119138
}

0 commit comments

Comments
 (0)