
Converted frontend worker to a service. #2246

Merged
merged 3 commits on Mar 11, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
## master / unreleased

* [CHANGE] Utilize separate protos for rule state and storage. Experimental ruler API will not be functional until the rollout is complete. #2226
* [CHANGE] Frontend worker in querier now starts after all Querier module dependencies are started. This fixes an issue where the frontend worker started sending queries to the querier before it was ready to serve them (mostly visible when using the experimental blocks storage; see the ordering sketch below). #2246
* [FEATURE] Flusher target to flush the WAL.
* `-flusher.wal-dir` for the WAL directory to recover from.
* `-flusher.concurrent-flushes` for number of concurrent flushes.
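The [CHANGE] entry above is the core of this PR: because the worker is now returned as a `services.Service`, the module framework can delay its startup until every dependency is Running. A minimal sketch of that ordering guarantee, using only the `services` helpers that appear in this diff; `startInOrder` is a hypothetical illustration, not code from the PR:

```go
// Sketch (assumption, not from this PR): start dependencies first, then the worker.
package sketch

import (
	"context"

	"github.com/cortexproject/cortex/pkg/util/services"
)

// startInOrder illustrates the ordering guarantee this PR relies on: the worker
// only begins pulling queries once every dependency reports Running.
func startInOrder(ctx context.Context, worker services.Service, deps ...services.Service) error {
	for _, dep := range deps {
		// StartAndAwaitRunning blocks until the service is Running, or returns an error.
		if err := services.StartAndAwaitRunning(ctx, dep); err != nil {
			return err
		}
	}
	// NewWorker returns a nil service when no frontend address is configured.
	if worker == nil {
		return nil
	}
	return services.StartAndAwaitRunning(ctx, worker)
}
```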
1 change: 0 additions & 1 deletion pkg/cortex/cortex.go
@@ -182,7 +182,6 @@ type Cortex struct {
ingester *ingester.Ingester
flusher *flusher.Flusher
store chunk.Store
worker frontend.Worker
frontend *frontend.Frontend
tableManager *chunk.TableManager
cache cache.Cache
17 changes: 4 additions & 13 deletions pkg/cortex/modules.go
@@ -202,23 +202,14 @@ func (t *Cortex) initQuerier(cfg *Config) (serv services.Service, err error) {
subrouter.Path("/chunks").Handler(t.httpAuthMiddleware.Wrap(querier.ChunksHandler(queryable)))
subrouter.Path("/user_stats").Handler(middleware.AuthenticateUser.Wrap(http.HandlerFunc(t.distributor.UserStatsHandler)))

// Start the query frontend worker once the query engine and the store
// have been successfully initialized.
t.worker, err = frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
// The query frontend worker will only be started after all its dependencies are started, not here.
// The worker may also be nil if not configured, which is OK.
worker, err := frontend.NewWorker(cfg.Worker, httpgrpc_server.NewServer(t.server.HTTPServer.Handler), util.Logger)
if err != nil {
return
}

// TODO: If queryable returned from querier.New was a service, it could actually wait for storeQueryable
// (if it also implemented Service) to finish starting... and return error if it's not in Running state.
// This requires extra work, which is out of scope for this proof-of-concept...
// BUT this extra functionality is ONE OF THE REASONS to introduce entire "Services" concept into Cortex.
// For now, only return service that stops the worker, and Querier will be used even before storeQueryable has finished starting.

return services.NewIdleService(nil, func(_ error) error {
t.worker.Stop()
return nil
}), nil
return worker, nil
}

// Latest Prometheus requires r.RemoteAddr to be set to addr:port, otherwise it rejects the request.
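Note the simplification in initQuerier: previously the worker managed its own goroutines, so the module returned an idle service whose only job was to call Stop() on shutdown. Returning the worker itself hands its whole lifecycle (start, run, stop) to the module framework. The discarded wrapper pattern, as a sketch assuming the `services.NewIdleService` signature used above (`wrapStopOnly` is a hypothetical name):

```go
// Sketch of the stop-only wrapper pattern this PR removes.
package sketch

import "github.com/cortexproject/cortex/pkg/util/services"

// wrapStopOnly returns an "idle" service: it does nothing while running and
// only knows how to stop an externally managed resource on shutdown.
func wrapStopOnly(stop func()) services.Service {
	return services.NewIdleService(
		nil, // nothing to do on start; the resource already runs its own goroutines
		func(_ error) error { // stopping function, called on shutdown
			stop()
			return nil
		},
	)
}
```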
5 changes: 4 additions & 1 deletion pkg/querier/frontend/frontend_test.go
@@ -27,6 +27,7 @@ import (
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
)

const (
@@ -197,7 +198,9 @@ func testFrontend(t *testing.T, handler http.Handler, test func(addr string)) {

worker, err := NewWorker(workerConfig, httpgrpc_server.NewServer(handler), logger)
require.NoError(t, err)
defer worker.Stop()
require.NoError(t, services.StartAndAwaitRunning(context.Background(), worker))

test(httpListen.Addr().String())

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), worker))
}
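The test now drives the worker through the standard service lifecycle instead of calling Stop() directly. The same discipline applies to any `services.Service`; a small hypothetical test helper (not part of this PR) makes the pattern explicit:

```go
// Sketch: a reusable test helper for the start/stop discipline shown above.
// withRunningService is hypothetical, not part of this PR.
package frontend

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/cortexproject/cortex/pkg/util/services"
)

func withRunningService(t *testing.T, s services.Service, body func()) {
	// Start the service and block until it reports Running (or fails to start).
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), s))
	// Always stop it and wait for Terminated, even if body fails.
	defer func() {
		require.NoError(t, services.StopAndAwaitTerminated(context.Background(), s))
	}()
	body()
}
```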
70 changes: 30 additions & 40 deletions pkg/querier/frontend/worker.go
@@ -10,6 +10,7 @@ import (

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/httpgrpc/server"
"github.com/weaveworks/common/middleware"
@@ -18,6 +19,7 @@

"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/grpcclient"
"github.com/cortexproject/cortex/pkg/util/services"
)

var (
@@ -46,31 +48,21 @@ func (cfg *WorkerConfig) RegisterFlags(f *flag.FlagSet) {
}

// Worker is the counter-part to the frontend, actually processing requests.
type Worker interface {
Stop()
}

type worker struct {
cfg WorkerConfig
log log.Logger
server *server.Server

ctx context.Context
cancel context.CancelFunc
watcher naming.Watcher //nolint:staticcheck //Skipping for now. If you still see this more than likely issue https://github.com/cortexproject/cortex/issues/2015 has not yet been addressed.
wg sync.WaitGroup
}

type noopWorker struct {
}

func (noopWorker) Stop() {}

// NewWorker creates a new Worker.
func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker, error) {
// NewWorker creates a new worker and returns a service that wraps it.
// If no address is specified, it returns a nil service (and no error).
func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (services.Service, error) {
if cfg.Address == "" {
level.Info(log).Log("msg", "no address specified, not starting worker")
return noopWorker{}, nil
return nil, nil
}

resolver, err := naming.NewDNSResolverWithFreq(cfg.DNSLookupDuration)
@@ -83,52 +75,50 @@ func NewWorker(cfg WorkerConfig, server *server.Server, log log.Logger) (Worker,
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())

w := &worker{
cfg: cfg,
log: log,
server: server,

ctx: ctx,
cancel: cancel,
cfg: cfg,
log: log,
server: server,
watcher: watcher,
}
w.wg.Add(1)
go w.watchDNSLoop()
return w, nil
return services.NewBasicService(nil, w.watchDNSLoop, w.stopping), nil
}

// Stop the worker.
func (w *worker) Stop() {
w.watcher.Close()
w.cancel()
func (w *worker) stopping(_ error) error {
// Wait until all per-address workers are done. This is only called after watchDNSLoop exits.
w.wg.Wait()
return nil
}

// watchDNSLoop watches for changes in DNS and starts or stops workers.
func (w *worker) watchDNSLoop() {
defer w.wg.Done()
func (w *worker) watchDNSLoop(servCtx context.Context) error {
go func() {
// Close the watcher when this service is asked to stop.
// Closing the watcher makes watchDNSLoop exit, since it only iterates on watcher updates and has no other
// way to stop. We cannot close the watcher in the `stopping` method, because that method is only called *after*
// watchDNSLoop exits.
<-servCtx.Done()
w.watcher.Close()
}()

cancels := map[string]context.CancelFunc{}
defer func() {
for _, cancel := range cancels {
cancel()
}
}()

for {
updates, err := w.watcher.Next()
if err != nil {
level.Error(w.log).Log("msg", "error from DNS watcher", "err", err)
return
// watcher.Next returns an error when Close is called, but we call Close when our context is done.
// We don't want to report an error in that case.
if servCtx.Err() != nil {
return nil
}
return errors.Wrapf(err, "error from DNS watcher")
}

for _, update := range updates {
switch update.Op {
case naming.Add:
level.Debug(w.log).Log("msg", "adding connection", "addr", update.Addr)
ctx, cancel := context.WithCancel(w.ctx)
ctx, cancel := context.WithCancel(servCtx)
cancels[update.Addr] = cancel
w.runMany(ctx, update.Addr)

@@ -139,7 +129,7 @@ func (w *worker) watchDNSLoop() {
}

default:
panic("unknown op")
return fmt.Errorf("unknown op: %v", update.Op)
}
}
}
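The shutdown dance in watchDNSLoop is a general Go pattern: a blocking iterator with no context support is unblocked by closing it from a goroutine waiting on ctx.Done(), and the error that the close provokes is swallowed when the context caused it. A standalone sketch with a hypothetical line-oriented consumer (`consume` and its parameters are illustrative, not from the PR):

```go
// Sketch of the close-to-unblock pattern used by watchDNSLoop above.
package sketch

import (
	"bufio"
	"context"
	"net"
)

func consume(ctx context.Context, conn net.Conn, handle func(string)) error {
	go func() {
		// When the context is cancelled, close the connection to unblock
		// the Read inside scanner.Scan below.
		<-ctx.Done()
		conn.Close()
	}()

	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		handle(scanner.Text())
	}
	if err := scanner.Err(); err != nil && ctx.Err() == nil {
		return err // a real failure, not caused by our own Close
	}
	return nil // clean exit: EOF, or Close triggered by context cancellation
}
```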