Skip to content

Commit a2ba4bc

Browse files
authored
chore: small fixes for #16348 (#16358)
1 parent 5fe1fa7 commit a2ba4bc

File tree

2 files changed

+7
-12
lines changed

2 files changed

+7
-12
lines changed

‎pkg/dataobj/consumer/config.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,5 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
3030
cfg.BuilderConfig.RegisterFlagsWithPrefix(prefix, f)
3131
cfg.UploaderConfig.RegisterFlagsWithPrefix(prefix, f)
3232

33-
if cfg.IdleFlushTimeout <= 0 {
34-
cfg.IdleFlushTimeout = 60 * 60 * time.Second // default to 1 hour
35-
}
36-
37-
f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", cfg.IdleFlushTimeout, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
33+
f.DurationVar(&cfg.IdleFlushTimeout, prefix+"idle-flush-timeout", 60*60*time.Second, "The maximum amount of time to wait in seconds before flushing an object that is no longer receiving new writes")
3834
}

‎pkg/dataobj/consumer/partition_processor.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@ type partitionProcessor struct {
4040
bufPool *sync.Pool
4141

4242
// Idle stream handling
43-
idleFlushTimout time.Duration
44-
lastFlush time.Time
43+
idleFlushTimeout time.Duration
44+
lastFlush time.Time
4545

4646
// Metrics
4747
metrics *partitionOffsetMetrics
@@ -113,7 +113,7 @@ func newPartitionProcessor(
113113
uploader: uploader,
114114
metastoreManager: metastoreManager,
115115
bufPool: bufPool,
116-
idleFlushTimout: idleFlushTimeout,
116+
idleFlushTimeout: idleFlushTimeout,
117117
lastFlush: time.Now(),
118118
}
119119
}
@@ -136,7 +136,7 @@ func (p *partitionProcessor) start() {
136136
}
137137
p.processRecord(record)
138138

139-
case <-time.After(p.idleFlushTimout):
139+
case <-time.After(p.idleFlushTimeout):
140140
p.idleFlush()
141141
}
142142
}
@@ -294,8 +294,7 @@ func (p *partitionProcessor) idleFlush() {
294294
return
295295
}
296296

297-
now := time.Now()
298-
if now.Sub(p.lastFlush) < p.idleFlushTimout {
297+
if time.Since(p.lastFlush) < p.idleFlushTimeout {
299298
return // Avoid checking too frequently
300299
}
301300

@@ -310,6 +309,6 @@ func (p *partitionProcessor) idleFlush() {
310309
return
311310
}
312311

313-
p.lastFlush = now
312+
p.lastFlush = time.Now()
314313
}()
315314
}

0 commit comments

Comments
 (0)