
Commit f8d230f (2 parents: 7ee4944 + a5a2c12)

Merge pull request #81 from goccy/support-ttl-seconds-after-finished

Support ttlSecondsAfterFinished like Job
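The main step of a TestJob can now carry a ttlSecondsAfterFinished value, mirroring the field of the same name on a Kubernetes batch/v1 Job: once the Job finishes, the TTL-after-finished controller deletes it, together with its Pods, after the given number of seconds. A minimal sketch of the underlying Kubernetes mechanism (the helper and package names are illustrative, not kubetest code):

package kubetestexample

import batchv1 "k8s.io/api/batch/v1"

// buildJobSpec shows where a step-level TTL lands on a generated Job.
// The TTL controller deletes a finished Job ttl seconds after it
// completes; a nil pointer means the Job is never auto-deleted.
func buildJobSpec(ttl *int32) batchv1.JobSpec {
	return batchv1.JobSpec{
		TTLSecondsAfterFinished: ttl,
	}
}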

13 files changed (+221, -70 lines)


api/v1/job.go (39 additions, 34 deletions)
@@ -23,7 +23,7 @@ type PreInitCallback func(context.Context, JobExecutor) error
 type Job interface {
 	Spec() batchv1.JobSpec
 	PreInit(TestJobContainer, PreInitCallback)
-	RunWithExecutionHandler(context.Context, func([]JobExecutor) error, func(JobExecutor) error) error
+	RunWithExecutionHandler(context.Context, func(context.Context, []JobExecutor) error, func(context.Context, JobExecutor) error) error
 	Mount(func(ctx context.Context, exec JobExecutor, isInitContainer bool) error)
 }

@@ -36,7 +36,7 @@ type JobExecutor interface {
 	CopyTo(context.Context, string, string) error
 	Container() corev1.Container
 	Pod() *corev1.Pod
-	PrepareCommand([]string) ([]byte, error)
+	PrepareCommand(context.Context, []string) ([]byte, error)
 }

 type JobBuilder struct {
@@ -71,6 +71,13 @@ func (b *JobBuilder) BuildWithJob(jobSpec *batchv1.Job, containerNameToInstalled
 	if err != nil {
 		return nil, fmt.Errorf("kubetest: failed to create agent config: %w", err)
 	}
+	if sharedAgentSpec.Timeout != "" {
+		timeout, err := time.ParseDuration(sharedAgentSpec.Timeout)
+		if err != nil {
+			return nil, err
+		}
+		cfg.SetTimeout(timeout)
+	}
 	if sharedAgentSpec.AllocationStartPort != nil {
 		cfg.SetAllocationStartPort(*sharedAgentSpec.AllocationStartPort)
 	}
@@ -94,11 +101,10 @@ func (b *JobBuilder) BuildWithJob(jobSpec *batchv1.Job, containerNameToInstalled
 }

 type kubernetesJob struct {
-	preInitCallbackContext context.Context
-	job                    *kubejob.Job
-	finalizer              *corev1.Container
-	agentConfig            *kubejob.AgentConfig
-	mountCallback          func(context.Context, JobExecutor, bool) error
+	job           *kubejob.Job
+	finalizer     *corev1.Container
+	agentConfig   *kubejob.AgentConfig
+	mountCallback func(context.Context, JobExecutor, bool) error
 }

 var defaultMountCallback = func(context.Context, JobExecutor, bool) error { return nil }
@@ -117,37 +123,36 @@ func (j *kubernetesJob) Spec() batchv1.JobSpec {
 }

 func (j *kubernetesJob) PreInit(c TestJobContainer, cb PreInitCallback) {
-	j.job.PreInit(c.Container, func(exec *kubejob.JobExecutor) error {
-		return cb(j.preInitCallbackContext, &kubernetesJobExecutor{exec: exec})
+	j.job.PreInit(c.Container, func(ctx context.Context, exec *kubejob.JobExecutor) error {
+		return cb(ctx, &kubernetesJobExecutor{exec: exec})
 	})
 }

 func (j *kubernetesJob) Mount(cb func(context.Context, JobExecutor, bool) error) {
 	j.mountCallback = cb
 }

-func (j *kubernetesJob) RunWithExecutionHandler(ctx context.Context, handler func([]JobExecutor) error, finalizerHandler func(JobExecutor) error) error {
-	j.preInitCallbackContext = ctx
+func (j *kubernetesJob) RunWithExecutionHandler(ctx context.Context, handler func(context.Context, []JobExecutor) error, finalizerHandler func(context.Context, JobExecutor) error) error {
 	j.job.DisableInitContainerLog()
 	j.job.SetPendingPhaseTimeout(10 * time.Minute)
-	j.job.SetInitContainerExecutionHandler(func(exec *kubejob.JobExecutor) error {
+	j.job.SetInitContainerExecutionHandler(func(ctx context.Context, exec *kubejob.JobExecutor) error {
 		e := &kubernetesJobExecutor{exec: exec}
 		if err := j.mountCallback(ctx, e, true); err != nil {
 			return err
 		}
-		_, err := exec.ExecOnly()
+		_, err := exec.ExecOnly(ctx)
 		return err
 	})
 	var finalizer *kubejob.JobFinalizer
 	if j.finalizer != nil {
 		finalizer = &kubejob.JobFinalizer{
 			Container: *j.finalizer,
-			Handler: func(exec *kubejob.JobExecutor) error {
-				return finalizerHandler(&kubernetesJobExecutor{exec: exec})
+			Handler: func(ctx context.Context, exec *kubejob.JobExecutor) error {
+				return finalizerHandler(ctx, &kubernetesJobExecutor{exec: exec})
 			},
 		}
 	}
-	return j.job.RunWithExecutionHandler(ctx, func(execs []*kubejob.JobExecutor) error {
+	return j.job.RunWithExecutionHandler(ctx, func(ctx context.Context, execs []*kubejob.JobExecutor) error {
 		converted := make([]JobExecutor, 0, len(execs))
 		for _, exec := range execs {
 			e := &kubernetesJobExecutor{exec: exec}
@@ -156,27 +161,27 @@ func (j *kubernetesJob) RunWithExecutionHandler(ctx context.Context, handler fun
 			}
 			converted = append(converted, e)
 		}
-		return handler(converted)
+		return handler(ctx, converted)
 	}, finalizer)
 }

 type kubernetesJobExecutor struct {
 	exec *kubejob.JobExecutor
 }

-func (e *kubernetesJobExecutor) PrepareCommand(cmd []string) ([]byte, error) {
-	return e.exec.ExecPrepareCommand([]string{"sh", "-c", strings.Join(cmd, " ")})
+func (e *kubernetesJobExecutor) PrepareCommand(ctx context.Context, cmd []string) ([]byte, error) {
+	return e.exec.ExecPrepareCommand(ctx, []string{"sh", "-c", strings.Join(cmd, " ")})
 }

-func (e *kubernetesJobExecutor) Output(_ context.Context) ([]byte, error) {
-	return e.exec.ExecOnly()
+func (e *kubernetesJobExecutor) Output(ctx context.Context) ([]byte, error) {
+	return e.exec.ExecOnly(ctx)
 }

-func (e *kubernetesJobExecutor) ExecAsync(_ context.Context) {
-	e.exec.ExecAsync()
+func (e *kubernetesJobExecutor) ExecAsync(ctx context.Context) {
+	e.exec.ExecAsync(ctx)
 }

-func (e *kubernetesJobExecutor) TerminationLog(_ context.Context, log string) error {
+func (e *kubernetesJobExecutor) TerminationLog(ctx context.Context, log string) error {
 	return e.exec.TerminationLog(log)
 }

@@ -195,14 +200,14 @@ func (e *kubernetesJobExecutor) CopyFrom(ctx context.Context, src string, dst st
 	containerName := e.exec.Container.Name
 	addr := e.exec.Pod.Status.PodIP
 	LoggerFromContext(ctx).Debug("copy from %s on container(%s) in %s pod to %s on local by %s", src, containerName, addr, dst, e.execProtocol())
-	return e.exec.CopyFromPod(src, dst)
+	return e.exec.CopyFromPod(ctx, src, dst)
 }

 func (e *kubernetesJobExecutor) CopyTo(ctx context.Context, src string, dst string) error {
 	containerName := e.exec.Container.Name
 	addr := e.exec.Pod.Status.PodIP
 	LoggerFromContext(ctx).Debug("copy from %s on local to %s on container(%s) in %s pod by %s", src, dst, containerName, addr, e.execProtocol())
-	return e.exec.CopyToPod(src, dst)
+	return e.exec.CopyToPod(ctx, src, dst)
 }

 func (e *kubernetesJobExecutor) Container() corev1.Container {
@@ -244,7 +249,7 @@ func (j *localJob) Mount(cb func(context.Context, JobExecutor, bool) error) {
 	j.mountCallback = cb
 }

-func (j *localJob) RunWithExecutionHandler(ctx context.Context, handler func([]JobExecutor) error, finalizer func(JobExecutor) error) error {
+func (j *localJob) RunWithExecutionHandler(ctx context.Context, handler func(context.Context, []JobExecutor) error, finalizer func(context.Context, JobExecutor) error) error {
 	preInitNameToPath := map[string]string{}
 	if j.preInitCallback != nil {
 		j.preInitCallback(ctx, &localJobExecutor{
@@ -269,11 +274,11 @@ func (j *localJob) RunWithExecutionHandler(ctx context.Context, handler func([]J
 		}
 		execs = append(execs, e)
 	}
-	if err := handler(execs); err != nil {
+	if err := handler(ctx, execs); err != nil {
 		return err
 	}
 	if j.finalizer != nil {
-		if err := finalizer(&localJobExecutor{
+		if err := finalizer(ctx, &localJobExecutor{
 			rootDir:   j.rootDir,
 			container: *j.finalizer,
 		}); err != nil {
@@ -306,7 +311,7 @@ func (e *localJobExecutor) cmd(cmdarr []string) (*exec.Cmd, error) {
 	return cmd, nil
 }

-func (e *localJobExecutor) PrepareCommand(cmdarr []string) ([]byte, error) {
+func (e *localJobExecutor) PrepareCommand(ctx context.Context, cmdarr []string) ([]byte, error) {
 	filteredCmd := []string{}
 	for _, c := range cmdarr {
 		if strings.HasPrefix(c, "/") {
@@ -400,18 +405,18 @@ func (j *dryRunJob) Spec() batchv1.JobSpec {
 func (j *dryRunJob) PreInit(c TestJobContainer, cb PreInitCallback) {}
 func (j *dryRunJob) Mount(_ func(context.Context, JobExecutor, bool) error) {}

-func (j *dryRunJob) RunWithExecutionHandler(ctx context.Context, handler func([]JobExecutor) error, finalizer func(JobExecutor) error) error {
+func (j *dryRunJob) RunWithExecutionHandler(ctx context.Context, handler func(context.Context, []JobExecutor) error, finalizer func(context.Context, JobExecutor) error) error {
 	execs := make([]JobExecutor, 0, len(j.job.Spec.Template.Spec.Containers))
 	for _, container := range j.job.Spec.Template.Spec.Containers {
 		execs = append(execs, &dryRunJobExecutor{
 			container: container,
 		})
 	}
-	if err := handler(execs); err != nil {
+	if err := handler(ctx, execs); err != nil {
 		return err
 	}
 	if j.finalizer != nil {
-		if err := finalizer(&dryRunJobExecutor{
+		if err := finalizer(ctx, &dryRunJobExecutor{
 			container: *j.finalizer,
 		}); err != nil {
 			return err
@@ -424,7 +429,7 @@ type dryRunJobExecutor struct {
 	container corev1.Container
 }

-func (e *dryRunJobExecutor) PrepareCommand(cmd []string) ([]byte, error) {
+func (e *dryRunJobExecutor) PrepareCommand(ctx context.Context, cmd []string) ([]byte, error) {
 	return nil, nil
 }
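Beyond the agent timeout, the bulk of this file threads context.Context through every execution callback: RunWithExecutionHandler's two handlers, PrepareCommand, Output, ExecAsync, and the kubejob init-container and finalizer handlers all receive a ctx now, and the stashed preInitCallbackContext field disappears. A sketch of a caller under the new signatures, using only the Job and JobExecutor interfaces declared above (the runAll function itself is hypothetical):

// runAll drives a kubetest Job with the new context-aware handlers.
// Cancelling ctx, as the "cancel" test below does, now reaches each
// executor call instead of stopping at the handler boundary.
func runAll(ctx context.Context, job Job) error {
	return job.RunWithExecutionHandler(ctx,
		func(ctx context.Context, execs []JobExecutor) error {
			for _, exec := range execs {
				if _, err := exec.Output(ctx); err != nil {
					return err
				}
			}
			return nil
		},
		func(ctx context.Context, finalizer JobExecutor) error {
			_, err := finalizer.Output(ctx)
			return err
		},
	)
}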

api/v1/runner_test.go (84 additions, 1 deletion)
@@ -7,7 +7,9 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strings"
 	"testing"
+	"time"

 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -143,14 +145,16 @@ func TestRunner(t *testing.T) {
 		t.Run(runMode.String(), func(t *testing.T) {
 			runner := NewRunner(getConfig(), runMode)
 			runner.SetLogger(NewLogger(os.Stdout, LogLevelDebug))
+			ttl := int32(1)
 			if _, err := runner.Run(context.Background(), TestJob{
 				ObjectMeta: testjobObjectMeta(),
 				Spec: TestJobSpec{
 					Repos: testRepos(),
 					MainStep: MainStep{
+						TTLSecondsAfterFinished: &ttl,
 						Template: TestJobTemplateSpec{
 							ObjectMeta: metav1.ObjectMeta{
-								GenerateName: "test",
+								GenerateName: "finalizer-test-",
 							},
 							Spec: TestJobPodSpec{
 								Containers: []TestJobContainer{
@@ -181,9 +185,87 @@ func TestRunner(t *testing.T) {
 			}); err != nil {
 				t.Fatal(err)
 			}
+			if runMode == RunModeKubernetes {
+				// Make sure ttl is working properly.
+				time.Sleep(5 * time.Second)
+				clientset, err := kubernetes.NewForConfig(getConfig())
+				if err != nil {
+					t.Fatal(err)
+				}
+				list, err := clientset.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
+				if err != nil {
+					t.Fatal(err)
+				}
+				for _, job := range list.Items {
+					if strings.HasPrefix(job.GetName(), "finalizer-test-") {
+						t.Fatal("failed to delete job by ttl")
+					}
+				}
+			}
 		})
 	}
 })
+	t.Run("cancel", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		defer cancel()
+		runner := NewRunner(getConfig(), RunModeKubernetes)
+		runner.SetLogger(NewLogger(os.Stdout, LogLevelDebug))
+		ttl := int32(1)
+		if _, err := runner.Run(ctx, TestJob{
+			ObjectMeta: testjobObjectMeta(),
+			Spec: TestJobSpec{
+				Repos: testRepos(),
+				MainStep: MainStep{
+					TTLSecondsAfterFinished: &ttl,
+					Template: TestJobTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							GenerateName: "cancel-test-",
+						},
+						Spec: TestJobPodSpec{
+							Containers: []TestJobContainer{
+								{
+									Container: corev1.Container{
+										Name:         "test",
+										Image:        "alpine",
+										Command:      []string{"sleep"},
+										Args:         []string{"50"},
+										WorkingDir:   filepath.Join("/", "work"),
+										VolumeMounts: []corev1.VolumeMount{testRepoVolumeMount()},
+									},
+								},
+							},
+							FinalizerContainer: TestJobContainer{
+								Container: corev1.Container{
+									Name:    "finalizer",
+									Image:   "alpine",
+									Command: []string{"echo"},
+									Args:    []string{"finalizer"},
+								},
+							},
+							Volumes: []TestJobVolume{testRepoVolume()},
+						},
+					},
+				},
+			},
+		}); err != nil {
+			t.Fatal(err)
+		}
+		// Make sure ttl is working properly.
+		time.Sleep(5 * time.Second)
+		clientset, err := kubernetes.NewForConfig(getConfig())
+		if err != nil {
+			t.Fatal(err)
+		}
+		list, err := clientset.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{})
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, job := range list.Items {
+			if strings.HasPrefix(job.GetName(), "cancel-test-") {
+				t.Fatal("failed to delete job by ttl")
+			}
+		}
+	})

 	t.Run("use token", func(t *testing.T) {
 		if !inCluster {
@@ -1204,6 +1286,7 @@
 				{
 					Agent: &TestAgentSpec{
 						InstalledPath: filepath.Join("/", "bin", "kubetest-agent"),
+						Timeout:       "10m",
 					},
 					Container: corev1.Container{
 						Name: "test",
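The Agent Timeout value added here ("10m") is parsed by BuildWithJob with time.ParseDuration, per the job.go hunk above, so any Go duration string such as "90s" or "1h30m" is accepted and a malformed value fails job construction. In miniature (the helper name is hypothetical):

import "time"

// parseAgentTimeout mirrors the check in BuildWithJob: an empty string
// means no timeout; anything else must parse as a Go duration.
func parseAgentTimeout(s string) (time.Duration, error) {
	if s == "" {
		return 0, nil
	}
	return time.ParseDuration(s) // e.g. "10m", "90s", "1h30m"
}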

api/v1/scheduler.go (4 additions, 1 deletion)
@@ -218,7 +218,10 @@ func (s *TaskScheduler) getScheduleKeys(ctx context.Context, builder *TaskBuilde

 func (s *TaskScheduler) dynamicKeys(ctx context.Context, builder *TaskBuilder, source *StrategyDynamicKeySource) ([]string, error) {
 	LoggerFromContext(ctx).Info("start to get dynamic task keys for running distributed task")
-	keyTask, err := builder.Build(ctx, &MainStep{Template: source.Template})
+	keyTask, err := builder.Build(ctx, &MainStep{
+		TTLSecondsAfterFinished: source.TTLSecondsAfterFinished,
+		Template:                source.Template,
+	})
 	if err != nil {
 		return nil, err
 	}
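Propagating source.TTLSecondsAfterFinished here means the short-lived Job that kubetest creates just to discover dynamic task keys gets the same TTL as the main step, so the key-discovery Job is presumably cleaned up by the TTL controller rather than lingering after the key listing completes.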

api/v1/step.go (1 addition, 0 deletions)
@@ -14,5 +14,6 @@
 type Step interface {
 	GetName() string
 	GetType() StepType
+	GetTTLSecondsAfterFinished() *int32
 	GetTemplate() TestJobTemplateSpec
 }
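The new accessor lets the task builder read the TTL uniformly from any step. A plausible implementation on MainStep, which the tests above show carries a TTLSecondsAfterFinished field (a sketch only; the real method lives in one of the files not shown in this excerpt):

func (s *MainStep) GetTTLSecondsAfterFinished() *int32 {
	return s.TTLSecondsAfterFinished
}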

api/v1/subtask.go (1 addition, 0 deletions)
@@ -39,6 +39,7 @@ func (t *SubTask) outputError(logGroup Logger, baseErr error) {
 	cmdErr, ok := failedJob.Reason.(*kubejob.CommandError)
 	if !ok {
 		logGroup.Log(failedJob.Reason.Error())
+		return
 	}
 	if !cmdErr.IsExitError() {
 		logGroup.Log(cmdErr.Error())
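This is a straight bug fix: when the type assertion fails, cmdErr is nil, so falling through to the cmdErr.IsExitError() call below would panic with a nil pointer dereference. The added return exits after logging the raw failure reason instead.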

api/v1/task.go (5 additions, 4 deletions)
@@ -100,8 +100,9 @@ func (t *Task) runWithRetry(ctx context.Context) (*TaskResult, error) {
 }

 func (t *Task) run(ctx context.Context) (*TaskResult, error) {
+	logger := LoggerFromContext(ctx)
 	var result TaskResult
-	if err := t.job.RunWithExecutionHandler(ctx, func(executors []JobExecutor) error {
+	if err := t.job.RunWithExecutionHandler(ctx, func(ctx context.Context, executors []JobExecutor) error {
 		for _, sidecar := range t.sideCarExecutors(executors) {
 			sidecar.ExecAsync(ctx)
 		}
@@ -115,13 +116,13 @@ func (t *Task) run(ctx context.Context) (*TaskResult, error) {
 			result.add(subTaskGroup.Run(ctx))
 		}
 		return nil
-	}, func(finalizer JobExecutor) error {
+	}, func(ctx context.Context, finalizer JobExecutor) error {
 		out, err := finalizer.Output(ctx)
 		if err != nil {
-			LoggerFromContext(ctx).Error("failed to run finalizer: output %s: %s", string(out), err.Error())
+			logger.Error("failed to run finalizer: output %s: %s", string(out), err.Error())
 			return fmt.Errorf("failed to run finalizer: %s: %w", string(out), err)
 		}
-		LoggerFromContext(ctx).Debug("run finalizer: output %s", string(out))
+		logger.Debug("run finalizer: output %s", string(out))
 		return nil
 	}); err != nil {
 		var failedJob *kubejob.FailedJob
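Note that the logger is now resolved from the outer ctx once, up front: under the new signature the finalizer handler receives its own context from kubejob, which may not carry kubetest's logger, so capturing it early keeps the error and debug output going through the logger configured on the runner; at least, that is the likely motivation for this part of the change.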
