@@ -24,7 +24,7 @@ const (
2424 phaseRunning = "running"
2525)
2626
27- type ConsumerFactory func (committer Committer ) (Consumer , error )
27+ type ConsumerFactory func (committer Committer , logger log. Logger ) (Consumer , error )
2828
2929type Consumer interface {
3030 Start (ctx context.Context , recordsChan <- chan []Record ) func ()
@@ -126,27 +126,29 @@ func (s *ReaderService) starting(ctx context.Context) error {
126126 s .metrics .reportOwnerOfPartition (s .reader .Partition ())
127127 s .metrics .reportStarting ()
128128
129+ logger := log .With (s .logger , "phase" , phaseStarting )
130+
129131 // Fetch the last committed offset to determine where to start reading
130132 lastCommittedOffset , err := s .reader .FetchLastCommittedOffset (ctx )
131133 if err != nil {
132134 return fmt .Errorf ("fetching last committed offset: %w" , err )
133135 }
134136
135137 if lastCommittedOffset == int64 (KafkaEndOffset ) {
136- level .Warn (s . logger ).Log ("msg" , fmt .Sprintf ("no committed offset found, starting from %d" , kafkaStartOffset ))
138+ level .Warn (logger ).Log ("msg" , fmt .Sprintf ("no committed offset found, starting from %d" , kafkaStartOffset ))
137139 } else {
138- level .Debug (s . logger ).Log ("msg" , "last committed offset" , "offset" , lastCommittedOffset )
140+ level .Debug (logger ).Log ("msg" , "last committed offset" , "offset" , lastCommittedOffset )
139141 }
140142
141143 consumeOffset := int64 (kafkaStartOffset )
142144 if lastCommittedOffset >= 0 {
143145 // Read from the next offset.
144146 consumeOffset = lastCommittedOffset + 1
145147 }
146- level .Debug (s . logger ).Log ("msg" , "consuming from offset" , "offset" , consumeOffset )
148+ level .Debug (logger ).Log ("msg" , "consuming from offset" , "offset" , consumeOffset )
147149 s .reader .SetOffsetForConsumption (consumeOffset )
148150
149- if err = s .processConsumerLagAtStartup (ctx ); err != nil {
151+ if err = s .processConsumerLagAtStartup (ctx , logger ); err != nil {
150152 return fmt .Errorf ("failed to process consumer lag at startup: %w" , err )
151153 }
152154
@@ -157,7 +159,7 @@ func (s *ReaderService) running(ctx context.Context) error {
157159 level .Info (s .logger ).Log ("msg" , "reader service running" )
158160 s .metrics .reportRunning ()
159161
160- consumer , err := s .consumerFactory (s .committer )
162+ consumer , err := s .consumerFactory (s .committer , log . With ( s . logger , "phase" , phaseRunning ) )
161163 if err != nil {
162164 return fmt .Errorf ("creating consumer: %w" , err )
163165 }
@@ -172,13 +174,13 @@ func (s *ReaderService) running(ctx context.Context) error {
172174 return nil
173175}
174176
175- func (s * ReaderService ) processConsumerLagAtStartup (ctx context.Context ) error {
177+ func (s * ReaderService ) processConsumerLagAtStartup (ctx context.Context , logger log. Logger ) error {
176178 if s .cfg .MaxConsumerLagAtStartup <= 0 {
177- level .Debug (s . logger ).Log ("msg" , "processing consumer lag at startup is disabled" )
179+ level .Debug (logger ).Log ("msg" , "processing consumer lag at startup is disabled" )
178180 return nil
179181 }
180182
181- consumer , err := s .consumerFactory (s .committer )
183+ consumer , err := s .consumerFactory (s .committer , logger )
182184 if err != nil {
183185 return fmt .Errorf ("failed to create consumer: %w" , err )
184186 }
@@ -192,13 +194,13 @@ func (s *ReaderService) processConsumerLagAtStartup(ctx context.Context) error {
192194 wait ()
193195 }()
194196
195- level .Debug (s . logger ).Log ("msg" , "processing consumer lag at startup" )
196- _ , err = s .fetchUntilLagSatisfied (ctx , s .cfg .MaxConsumerLagAtStartup , s . logger , recordsCh , time .Since )
197+ level .Debug (logger ).Log ("msg" , "processing consumer lag at startup" )
198+ _ , err = s .fetchUntilLagSatisfied (ctx , s .cfg .MaxConsumerLagAtStartup , logger , recordsCh , time .Since )
197199 if err != nil {
198- level .Error (s . logger ).Log ("msg" , "failed to catch up" , "err" , err )
200+ level .Error (logger ).Log ("msg" , "failed to catch up" , "err" , err )
199201 return err
200202 }
201- level .Debug (s . logger ).Log ("msg" , "processing consumer lag at startup finished" )
203+ level .Debug (logger ).Log ("msg" , "processing consumer lag at startup finished" )
202204
203205 return nil
204206}
0 commit comments