暂时是说,熔断后,并不会一直不再调用下游服务,而是以一定的策略(如每分钟调用 10 次,若均返回成功,则增大调用量)试探调用下游服务,当下游服务恢复可用时,自动停止熔断。
type CircuitBreakerStatus uint8
const (
CircuitBreakerClosed CircuitBreakerStatus = iota // 正常状态
CircuitBreakerOpen // 熔断状态
CircuitBreakerHalfOpen // 半开路
)
type CircuitBreaker struct {
failedCount int64 // 失败计数
threshold int64 // 熔断器触发阈值
retryDur time.Duration
callerInfo fmt.Stringer
rwm sync.RWMutex
timer *time.Timer
s CircuitBreakerStatus
}
// 执行查询
func (cb *CircuitBreaker) Run(f func() (err error, skip bool)) error {
switch cb.Status() {
case CircuitBreakerOpen: // 熔断状态:直接返回
return fmt.Errorf("circuit breaker is open %s", cb.callerInfo)
case CircuitBreakerClosed, CircuitBreakerHalfOpen: // 正常/半开路状态:执行查询,并判断结果
err, skip := f()
switch {
case err == nil: // 无错:看是否恢复至正常状态
cb.setStatus(CircuitBreakerClosed)
case !skip:
cb.errHappened() // 出错:判断是否需要熔断
}
return err
}
return nil
}
// 出错
func (cb *CircuitBreaker) errHappened() {
switch cb.Status() {
case CircuitBreakerClosed: // 正常状态
if atomic.AddInt64(&cb.failedCount, 1) > cb.threshold { // 增加错误计数并和阈值比较
cb.setStatus(CircuitBreakerOpen) // 如果超过阈值,则进入熔断
}
case CircuitBreakerHalfOpen: // 半开路:增加错误计数,并立即熔断
atomic.AddInt64(&cb.failedCount, 1)
cb.setStatus(CircuitBreakerOpen)
case CircuitBreakerOpen: // 熔断:增加错误计数
atomic.AddInt64(&cb.failedCount, 1)
}
}
// 设置熔断器状态
func (cb *CircuitBreaker) setStatus(cbs CircuitBreakerStatus) CircuitBreakerStatus {
cb.rwm.Lock()
defer cb.rwm.Unlock()
if cb.s != cbs {
switch cbs {
case CircuitBreakerOpen:
slf.Errorf("circuit breaker is open %s", cb.callerInfo)
case CircuitBreakerHalfOpen:
slf.Infof("circuit breaker is half open %s", cb.callerInfo)
case CircuitBreakerClosed:
slf.Infof("circuit breaker is closed %s", cb.callerInfo)
}
}
cb.s = cbs
switch cb.s {
case CircuitBreakerOpen:
if cb.timer == nil { // 这里设置了一个定时器,在一定时间后将熔断器恢复至半开路状态
cb.timer = time.AfterFunc(cb.retryDur, func() {
cb.setStatus(CircuitBreakerHalfOpen)
})
} else {
cb.timer.Reset(cb.retryDur)
}
case CircuitBreakerClosed: // 半开路状态下无错,则立即将计数器清零,恢复为正常状态
atomic.StoreInt64(&cb.failedCount, 0)
}
return cb.s
}
func (cb *CircuitBreaker) Status() CircuitBreakerStatus {
cb.rwm.RLock()
defer cb.rwm.RUnlock()
return cb.s
}