Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Unify the config client between the alertmanager and the ruler.
Signed-off-by: Tom Wilkie <tom.wilkie@gmail.com>
  • Loading branch information
tomwilkie committed May 13, 2019
commit 8a019d3f6716a43ed82fd3ab292aff58de94eb53
48 changes: 23 additions & 25 deletions pkg/alertmanager/multitenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (
"github.com/go-kit/kit/log/level"
amconfig "github.com/prometheus/alertmanager/config"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/user"
"github.com/weaveworks/mesh"

"github.com/cortexproject/cortex/pkg/configs"
configs_client "github.com/cortexproject/cortex/pkg/configs/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/user"
"github.com/weaveworks/mesh"
)

var backoffConfig = util.BackoffConfig{
Expand Down Expand Up @@ -93,15 +92,9 @@ const (
var (
totalConfigs = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "configs",
Name: "alertmanager_configs_total",
Help: "How many configs the multitenant alertmanager knows about.",
})
configsRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "configs_request_duration_seconds",
Help: "Time spent requesting configs.",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))
totalPeers = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "mesh_peers",
Expand All @@ -112,7 +105,6 @@ var (
)

func init() {
configsRequestDuration.Register()
prometheus.MustRegister(totalConfigs)
prometheus.MustRegister(totalPeers)
statusTemplate = template.Must(template.New("statusPage").Funcs(map[string]interface{}{
Expand Down Expand Up @@ -226,7 +218,7 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) {
type MultitenantAlertmanager struct {
cfg *MultitenantAlertmanagerConfig

configsAPI configs_client.AlertManagerConfigsAPI
configsAPI configs_client.Client

// The fallback config is stored as a string and parsed every time it's needed
// because we mutate the parsed results and don't want those changes to take
Expand Down Expand Up @@ -256,15 +248,17 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig) (*Multitenan
return nil, fmt.Errorf("unable to create Alertmanager data directory %q: %s", cfg.DataDir, err)
}

mrouter := initMesh(cfg.MeshListenAddr, cfg.MeshHWAddr, cfg.MeshNickname, cfg.MeshPassword)
configsAPI, err := configs_client.New(configs_client.Config{
ConfigsAPIURL: cfg.ConfigsAPIURL,
ClientTimeout: cfg.ClientTimeout,
})
if err != nil {
return nil, err
}

mrouter := initMesh(cfg.MeshListenAddr, cfg.MeshHWAddr, cfg.MeshNickname, cfg.MeshPassword)
mrouter.Start()

configsAPI := configs_client.AlertManagerConfigsAPI{
URL: cfg.ConfigsAPIURL.URL,
Timeout: cfg.ClientTimeout,
}

var fallbackConfig []byte
if cfg.FallbackConfigFile != "" {
fallbackConfig, err = ioutil.ReadFile(cfg.FallbackConfigFile)
Expand Down Expand Up @@ -364,12 +358,7 @@ func (am *MultitenantAlertmanager) updateConfigs(now time.Time) error {
// poll the configuration server. Not re-entrant.
func (am *MultitenantAlertmanager) poll() (map[string]configs.View, error) {
configID := am.latestConfig
var cfgs *configs_client.ConfigsResponse
err := instrument.CollectedRequest(context.Background(), "Configs.GetOrgConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var err error
cfgs, err = am.configsAPI.GetConfigs(configID)
return err
})
cfgs, err := am.configsAPI.GetAlerts(configID)
if err != nil {
level.Warn(util.Logger).Log("msg", "MultitenantAlertmanager: configs server poll failed", "err", err)
return nil, err
Expand Down Expand Up @@ -471,7 +460,7 @@ func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Confi
return fmt.Errorf("unable to load fallback configuration for %v: %v", userID, err)
}
} else {
amConfig, err = configs_client.AlertmanagerConfigFromConfig(config)
amConfig, err = alertmanagerConfigFromConfig(config)
if err != nil && hasExisting {
// XXX: This means that if a user has a working configuration and
// they submit a broken one, we'll keep processing the last known
Expand Down Expand Up @@ -506,6 +495,15 @@ func (am *MultitenantAlertmanager) setConfig(userID string, config configs.Confi
return nil
}

// alertmanagerConfigFromConfig returns the Alertmanager config from the Cortex configuration.
func alertmanagerConfigFromConfig(c configs.Config) (*amconfig.Config, error) {
cfg, err := amconfig.Load(c.AlertmanagerConfig)
if err != nil {
return nil, fmt.Errorf("error parsing Alertmanager config: %s", err)
}
return cfg, nil
}

func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amconfig.Config) (*Alertmanager, error) {
newAM, err := New(&Config{
UserID: userID,
Expand Down
151 changes: 151 additions & 0 deletions pkg/configs/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package client

import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

"github.com/cortexproject/cortex/pkg/configs"
"github.com/cortexproject/cortex/pkg/configs/db"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
)

// Client is what the ruler and altermanger needs from a config store to process rules.
type Client interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This interface looks to be missing a GetTemplates method for alert template files, which were added in PR #1237.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @khaines - I don't understand - I don't see a GetTemplates method in that PR (or anywhere in the codebase). What am I missing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think we're good - GetAlerts returns a ConfigsResponse which contains the templates, like before.

// GetRules returns all Cortex configurations from a configs API server
// that have been updated after the given configs.ID was last updated.
GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error)

// GetAlerts fetches all the alerts that have changes since since.
GetAlerts(since configs.ID) (*ConfigsResponse, error)
}

// New creates a new ConfigClient.
func New(cfg Config) (Client, error) {
// All of this falderal is to allow for a smooth transition away from
// using the configs server and toward directly connecting to the database.
// See https://github.com/cortexproject/cortex/issues/619
if cfg.ConfigsAPIURL.URL != nil {
return instrumented{
next: configsClient{
URL: cfg.ConfigsAPIURL.URL,
Timeout: cfg.ClientTimeout,
},
}, nil
}

db, err := db.NewRulesDB(cfg.DBConfig)
if err != nil {
return nil, err
}
return instrumented{
next: dbStore{
db: db,
},
}, nil
}

// configsClient allows retrieving recording and alerting rules from the configs server.
type configsClient struct {
URL *url.URL
Timeout time.Duration
}

// GetRules implements ConfigClient.
func (c configsClient) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/rules%s", c.URL.String(), suffix)
response, err := doRequest(endpoint, c.Timeout, since)
if err != nil {
return nil, err
}
configs := map[string]configs.VersionedRulesConfig{}
for id, view := range response.Configs {
cfg := view.GetVersionedRulesConfig()
if cfg != nil {
configs[id] = *cfg
}
}
return configs, nil
}

// GetAlerts implements ConfigClient.
func (c configsClient) GetAlerts(since configs.ID) (*ConfigsResponse, error) {
suffix := ""
if since != 0 {
suffix = fmt.Sprintf("?since=%d", since)
}
endpoint := fmt.Sprintf("%s/private/api/prom/configs/alertmanager%s", c.URL.String(), suffix)
return doRequest(endpoint, c.Timeout, since)
}

func doRequest(endpoint string, timeout time.Duration, since configs.ID) (*ConfigsResponse, error) {
req, err := http.NewRequest("GET", endpoint, nil)
if err != nil {
return nil, err
}

client := &http.Client{Timeout: timeout}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Invalid response from configs server: %v", resp.StatusCode)
}

var config ConfigsResponse
if err := json.NewDecoder(resp.Body).Decode(&config); err != nil {
level.Error(util.Logger).Log("msg", "configs: couldn't decode JSON body", "err", err)
return nil, err
}

config.since = since
return &config, nil
}

type dbStore struct {
db db.RulesDB
}

// GetRules implements ConfigClient.
func (d dbStore) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
if since == 0 {
return d.db.GetAllRulesConfigs()
}
return d.db.GetRulesConfigs(since)
}

// GetAlerts implements ConfigClient.
func (d dbStore) GetAlerts(since configs.ID) (*ConfigsResponse, error) {
// TODO implement this!
return nil, nil
}

// ConfigsResponse is a response from server for GetConfigs.
type ConfigsResponse struct {
// The version since which these configs were changed
since configs.ID

// Configs maps user ID to their latest configs.View.
Configs map[string]configs.View `json:"configs"`
}

// GetLatestConfigID returns the last config ID from a set of configs.
func (c ConfigsResponse) GetLatestConfigID() configs.ID {
latest := c.since
for _, config := range c.Configs {
if config.ID > latest {
latest = config.ID
}
}
return latest
}
68 changes: 68 additions & 0 deletions pkg/configs/client/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package client

import (
"context"
"flag"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/instrument"

"github.com/cortexproject/cortex/pkg/configs"
"github.com/cortexproject/cortex/pkg/configs/db"
"github.com/cortexproject/cortex/pkg/util/flagext"
)

// Config says where we can find the ruler configs.
type Config struct {
DBConfig db.Config

// DEPRECATED
ConfigsAPIURL flagext.URLValue

// DEPRECATED. HTTP timeout duration for requests made to the Weave Cloud
// configs service.
ClientTimeout time.Duration
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.DBConfig.RegisterFlags(f)
f.Var(&cfg.ConfigsAPIURL, "ruler.configs.url", "DEPRECATED. URL of configs API server.")
f.DurationVar(&cfg.ClientTimeout, "ruler.client-timeout", 5*time.Second, "DEPRECATED. Timeout for requests to Weave Cloud configs service.")
}

var configsRequestDuration = instrument.NewHistogramCollector(prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "configs_request_duration_seconds",
Help: "Time spent requesting configs.",
Buckets: prometheus.DefBuckets,
}, []string{"operation", "status_code"}))

func init() {
configsRequestDuration.Register()
}

type instrumented struct {
next Client
}

func (i instrumented) GetRules(since configs.ID) (map[string]configs.VersionedRulesConfig, error) {
var cfgs map[string]configs.VersionedRulesConfig
err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var err error
cfgs, err = i.next.GetRules(since) // Warning: this will produce an incorrect result if the configID ever overflows
return err
})
return cfgs, err
}

func (i instrumented) GetAlerts(since configs.ID) (*ConfigsResponse, error) {
var cfgs *ConfigsResponse
err := instrument.CollectedRequest(context.Background(), "Configs.GetConfigs", configsRequestDuration, instrument.ErrorCode, func(_ context.Context) error {
var err error
cfgs, err = i.next.GetAlerts(since) // Warning: this will produce an incorrect result if the configID ever overflows
return err
})
return cfgs, err
}
Loading