在分析prometheus中的ruler代码时,看到一个很经典的Manager模式,用于周期性更新规则信息,代码简单实用,特地总结一下:
Manager模式
首先,定义一个manager,其中需要包含维护状态的字段,比如记录上次的配置,同时也需要互斥锁,用于并发访问;
然后,NewManager生成一个新的manager;
最后,调用mangaer.start manager.run manager.stop维护生命周期。
// NewManager returns an implementation of Manager, ready to be started
// by calling the Run method.
func NewManager(o *ManagerOptions) *Manager {
if o.Metrics == nil {
o.Metrics = NewGroupMetrics(o.Registerer)
}
if o.GroupLoader == nil {
o.GroupLoader = FileLoader{}
}
m := &Manager{
groups: map[string]*Group{},
opts: o,
block: make(chan struct{}),
done: make(chan struct{}),
logger: o.Logger,
}
return m
}
// Run starts processing of the rule manager. It is blocking.
func (m *Manager) Run() {
m.start()
<-m.done
}
func (m *Manager) start() {
close(m.block)
}
// Stop the rule manager's rule evaluation cycles.
func (m *Manager) Stop() {
m.mtx.Lock()
defer m.mtx.Unlock()
level.Info(m.logger).Log("msg", "Stopping rule manager...")
for _, eg := range m.groups {
eg.stop()
}
// Shut down the groups waiting multiple evaluation intervals to write
// staleness markers.
close(m.done)
level.Info(m.logger).Log("msg", "Rule manager stopped")
}
Load and Update
首先,加载所有新的配置,并为每个配置生成hash key,存入map中;
然后,与Manager维护的状态,进行比较
- 相同则跳过
- 没有或者不相同则更新
- 最后删除旧的数据
实现要点:
- 用sync.WaitGroup保证goroutine并发运行结束
- 用<-m.block信号作为同步运行信号
- 用delete(key)删除map中已经处理过的数据,最后直接删除map中剩余的多余数据
代码如下:
// Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string, ruleGroupPostProcessFunc RuleGroupPostProcessFunc) error {
m.mtx.Lock()
defer m.mtx.Unlock()
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, ruleGroupPostProcessFunc, files...)
if errs != nil {
for _, e := range errs {
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true
var wg sync.WaitGroup
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
gn := GroupKey(newg.file, newg.name)
oldg, ok := m.groups[gn]
delete(m.groups, gn)
if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}
wg.Add(1)
go func(newg *Group) {
if ok {
oldg.stop()
newg.CopyState(oldg)
}
wg.Done()
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
newg.run(m.opts.Context)
}(newg)
}
// Stop remaining old groups.
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues((n))
}
wg.Done()
}(n, oldg)
}
wg.Wait()
m.groups = groups
return nil
}