熔断

熔断的做法和限流有点像,都是统计一定时间内的计数。对限流来说是统计访问量,超过阈值则拒绝或放进队列;对熔断来说是统计错误量,超过阈值则拒绝,并在一定时间后恢复

但两个应用场景不一样,但限流主要是为了保护自己的服务,熔断主要是为了保护下游、别人的服务

熔断有两个关键词,一个是暂时,一个是停止。

停止是说,当前服务一旦对下游服务进行熔断,当请求到达时,当前服务不再对下游服务进行调用,而是使用设定好的策略(如构建默认值)直接返回。

暂时是说,熔断后,并不会一直不再调用下游服务,而是以一定的策略(如每分钟调用 10 次,若均返回成功,则增大调用量)试探调用下游服务,当下游服务恢复可用时,自动停止熔断。

这两个都做的自适应的处理

熔断状态有三个:

  1. 正常状态:正常执行查询

  2. 熔断状态:该状态下直接拒绝读写。同时设置一个定时器,在一段时间后,进入半开路状态

  3. 半开路状态:允许读写。如果查询正常,恢复至正常状态。如果报错,则回到熔断状态

type CircuitBreakerStatus uint8

const (
	CircuitBreakerClosed CircuitBreakerStatus = iota // 正常状态
	CircuitBreakerOpen                               // 熔断状态
	CircuitBreakerHalfOpen                           // 半开路
)

type CircuitBreaker struct {
	failedCount int64          // 失败计数
	threshold   int64          // 熔断器触发阈值
	retryDur    time.Duration
	callerInfo  fmt.Stringer

	rwm   sync.RWMutex
	timer *time.Timer
	s     CircuitBreakerStatus
}

// 执行查询
func (cb *CircuitBreaker) Run(f func() (err error, skip bool)) error {
	switch cb.Status() {
	case CircuitBreakerOpen: // 熔断状态:直接返回
		return fmt.Errorf("circuit breaker is open %s", cb.callerInfo)
	case CircuitBreakerClosed, CircuitBreakerHalfOpen: // 正常/半开路状态:执行查询,并判断结果
		err, skip := f()
		switch {
		case err == nil:     // 无错:看是否恢复至正常状态
			cb.setStatus(CircuitBreakerClosed)
		case !skip:
			cb.errHappened() // 出错:判断是否需要熔断
		}
		return err
	}
	return nil
}

// 出错
func (cb *CircuitBreaker) errHappened() {
	switch cb.Status() {
	case CircuitBreakerClosed:   // 正常状态
		if atomic.AddInt64(&cb.failedCount, 1) > cb.threshold { // 增加错误计数并和阈值比较
			cb.setStatus(CircuitBreakerOpen) // 如果超过阈值,则进入熔断
		}
	case CircuitBreakerHalfOpen: // 半开路:增加错误计数,并立即熔断
		atomic.AddInt64(&cb.failedCount, 1)
		cb.setStatus(CircuitBreakerOpen)
	case CircuitBreakerOpen:     // 熔断:增加错误计数
		atomic.AddInt64(&cb.failedCount, 1)
	}
}

// 设置熔断器状态
func (cb *CircuitBreaker) setStatus(cbs CircuitBreakerStatus) CircuitBreakerStatus {
	cb.rwm.Lock()
	defer cb.rwm.Unlock()
	if cb.s != cbs {
		switch cbs {
		case CircuitBreakerOpen:
			slf.Errorf("circuit breaker is open %s", cb.callerInfo)
		case CircuitBreakerHalfOpen:
			slf.Infof("circuit breaker is half open %s", cb.callerInfo)
		case CircuitBreakerClosed:
			slf.Infof("circuit breaker is closed %s", cb.callerInfo)
		}
	}
	cb.s = cbs
	switch cb.s {
	case CircuitBreakerOpen: 
		if cb.timer == nil {  // 这里设置了一个定时器,在一定时间后将熔断器恢复至半开路状态
			cb.timer = time.AfterFunc(cb.retryDur, func() {
				cb.setStatus(CircuitBreakerHalfOpen)
			})
		} else {
			cb.timer.Reset(cb.retryDur)
		}
	case CircuitBreakerClosed: // 半开路状态下无错,则立即将计数器清零,恢复为正常状态
		atomic.StoreInt64(&cb.failedCount, 0)
	}
	return cb.s
}

func (cb *CircuitBreaker) Status() CircuitBreakerStatus {
	cb.rwm.RLock()
	defer cb.rwm.RUnlock()
	return cb.s
}

参考链接

https://blog.csdn.net/chijiansong/article/details/123169633

Last updated