冗余请求优化接口

最近接到一个需求,由于某个接口失败率太高,运营小姐姐很多时候需要在失败后手动重试,希望研发能自动重试 3 次。调研了下这个接口,主要是对豆瓣明细页的爬虫和解析,监控页面显示的 QPS 特别低,那种一天可能偶尔来那么十几个请求,每个请求耗时大概在 3-5 s,没在监控页面找到失败率,没什么意义,量级太小了

方案一

串行请求,失败再重试

优点:QPS 不会增大明显

缺点:失败重试,接口耗时类积

适合场景:失败率较低,QPS 较高

方案二(采用)

并行请求,某个请求先成功返回,服务端立刻返回

优点:接口耗时并不会增长

缺点:QPS * n,导致负载大

适合场景:失败率高,接口 QPS 较低

采用该方案,因为其实 QPS 特别低,即使冗余 2 次,QPS 变成原来 3 倍,也很低。但是确能让运营小姐姐体验更好,即更快返回

冗余请求代码编写-基础版

回顾下 channel 的注意点:

(1)channel 的垃圾回收,仅由是否存在引用决定,不由是否关闭决定
(2)channel 关闭,阻塞在读、或读都会立刻返回,阻塞在写、或写会产生 panic
(3)基于以上两点,channel 一般无需关闭。需手动关闭的场景:利用关闭 channel 来唤醒所有阻塞读的 G,手动关闭场景注意最好由写入方进行关闭,避免产生 panic
(4)channel 还有一个造成 goroutine 泄露的问题,即永远阻塞在 channel 读写中,需要注意

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
var ErrAllFail = errors.New("Redundant.Do execute fn() failed, all fail")

type RedundantOption func(*Redundant) *Redundant

type Redundant struct {
	fn func() error

	count    int
	nextTime int64 // 单位 ms

	errCount int
	respChan chan struct{}
	errChan  chan error
}

func NewRedundant(count int, fn func() error, ops ...RedundantOption) *Redundant {
	if count == 0 {
		count = 3
	}
	r := &Redundant{
		count:    count,
		fn:       fn,
        // 注意:大小必须 >= count,这样写入才不会造成阻塞,才不可能产生协程泄露的现象(即阻塞不会回收)
		respChan: make(chan struct{}, count),
		errChan:  make(chan error, count),
	}
	for _, op := range ops {
		r = op(r)
	}

	return r
}

// Do 每一个 G 并行执行冗余请求,有一个执行成功 Do 即刻返回,都执行失败(即 fn 返回 error)或内部 panic 返回 error
func (r *Redundant) Do(ctx context.Context) error {
	for i := 0; i < r.count; i++ {
		if i != 0 && r.nextTime != 0 {
			time.Sleep(time.Duration(r.nextTime) * time.Millisecond)
		}
		go func() {
			defer func() {
				if re := recover(); re != nil {
                    // 注意,这里必须加上,不然可能产生 Do 的主流程永远阻塞在 for{select} 那里,造成 goroutine 泄露
					r.errChan <- errors.New("Redundant.Do execute fn() panic, but recover")
				}
			}()

			if err := r.fn(); err != nil {
				r.errChan <- err
				return
			}
			r.respChan <- struct{}{}
		}()
	}
	for {
		select {
		case <-ctx.Done():
			return errors.New("redundant.Do canceled by external context, context canceled")
		case _ = <-r.respChan:
			return nil
		case err := <-r.errChan:
			log.Errorc(ctx, "redundant.Do execute fn() err:%s", err.Error())
			r.errCount++
			if r.errCount == r.count {
				return ErrAllFail
			}
		}
	}
}

func RedundantOptionNextTime(nextTime int64) RedundantOption {
	return func(redundant *Redundant) *Redundant {
		redundant.nextTime = nextTime
		return redundant
	}
}

使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
func HttpDo() (*Resp, error) {
    ...
    return resp, nil
}

func main() {
    var resp *Resp
    var mutex sync.Mutex
    if err := NewRedundant(3, func() (err error) {
        if innerResp, err := HttpDo; err != nil {
            return err
        }
        // 其实可以直接 resp = innerResp,并发覆盖没啥事其实
        if resp == nil {
            mutex.Lock()
            if resp == nil {
                resp = innerResp
            }
            mutex.UnLock()
        }
        
        return
    }).Do(context.TODO); err != nil {
        ...
    }
    ...
}

冗余请求代码编写-升级版(泛型)

上一种已经实现了整个冗余请求的功能,但是作为一个工具包,还不够完美。存在一个问题,resp 需要自己赋值,而且可能产生覆盖问题,虽然整个问题可以通过加锁来解决,甚至不用理会。但是还可以使用泛型来达到更完美的效果

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
type Redundant[Resp any] struct {
    fn       func() (Resp, error)
	
    count    int
	
    errCount int
	respChan chan Resp
	errChan  chan error
}

func NewRedundant[Resp any](count int, fn func() (Resp, error)) *Redundant[Resp] {
	if count == 0 {
		count = 3
	}
	return &Redundant[Resp]{
		count:    count,
		fn:       fn,
		respChan: make(chan Resp, count),
		errChan:  make(chan error, count),
	}
}

func (r *Redundant[Resp]) Do(ctx context.Context) (resp Resp, err error) {
	for i := 0; i < r.count; i++ {
		go func() {
			defer func() {
				if re := recover(); re != nil {
					r.errChan <- errors.New("Redundant.Do execute fn() panic, but recover")
				}
			}()

			if resp, err := r.fn(); err != nil {
				r.errChan <- err
				return
			} else {
				r.respChan <- resp
			}
		}()
	}
	for {
		select {
		case resp = <-r.respChan:
			return resp, nil
		case errPer := <-r.errChan:
			log.Printf("DoFunc err:%s", errPer.Error())
			r.errCount++
			if r.errCount == r.count {
				err = errors.New(fmt.Sprintf("Redundant.Do execute fn() %d in total", r.errCount))
				return
			}
		}
	}
}

使用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
type Resp struct {
    ...
}


func HttpDo() (*Resp, error) {
    ...
    return resp, nil
}

func main() {
    var resp *Resp
    resp, err := NewRedundant[*Resp](3, func() (*Resp, error) {
        if innerResp, err := HttpDo(); err != nil {
            return nil, err
        } else {
            return innerResp, nil
        }
        
    }).Do(context.TODO)
    ...
}
1
2
3
4
5
6
// 由于入参一致(无参数),加上返回值一样,其实可以直接传入即可
func main() {
    var resp *Resp
    resp, err := NewRedundant[*Resp](3, HttpDo).Do(context.TODO)
    ...
}

冗余请求代码编写-炫技版(泛型+反射)

上面的代码还是不够完美,还是得在 Do(n, func(){ ... }) 里面写调用函数的逻辑,可不可已把整个函数托管给 Do 呢。你可以会想,其实在升级版的基础上再加个 Req 的泛型就好了嘛,改成 Do(n, func(Req) (Resp, err)),但是函数或方法的入参很多都不规范,可能没有封装成一个 struct,而有很多个参数,这样就不能用泛型来实现。所以引出炫技版-反射调用!

特点:任意函数、方法、任意入参,限制返回参数格式 Resp, error

注意:

两个函数的使用者都必须对函数和方法有深入的了解,不然很容易用错!!!(不了解可拉到最下面有讲解方法和函数的区别)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// receiver 必须为 struct 的实例,fun 必须为 interface.Method 或 struct.Method。params 为 fun 函数入参
func DoMethod[Resp any](count int, receiver, fun interface{}, params ...interface{}) (Resp, error) {
	newParams := make([]interface{}, len(params)+1)
	newParams[0] = receiver
	copy(newParams[1:], params)
	return DoFunc[Resp](count, fun, newParams...)
}

// fun 必须为 函数 或 receiver.Method  
func DoFunc[Resp any](count int, fun interface{}, params ...interface{}) (resp Resp, err error) {
	if count == 0 {
		count = 3
	}
	var (
		respChan = make(chan Resp, count)
		errChan  = make(chan error, count)
		errCount int
	)
	for i := 0; i < count; i++ {
		go func() {
			defer func() {
				if r := recover(); r != nil {
					errChan <- errors.New("panic recover")
				}
			}()

			resp, err := doFunc[Resp](fun, params...)
			if err == nil {
				respChan <- resp
			} else {
				errChan <- err
			}
		}()
	}
	for {
		select {
		case resp := <-respChan:
			return resp, nil
		case err := <-errChan:
			log.Printf("DoFunc err:%s", err.Error())
			errCount++
			if errCount == count {
				return resp, errors.New(fmt.Sprintf("DoFunc execute %d fun fail in total", errCount))
			}
		}
	}
}

func doFunc[Resp any](fun interface{}, params ...interface{}) (resp Resp, err error) {
	method := reflect.ValueOf(fun)
	var args []reflect.Value
	for _, param := range params {
		args = append(args, reflect.ValueOf(param))
	}
	result := method.Call(args)
	if len(result) != 2 {
		panic("返回值出参格式不规范,不为 2")
	}
	r0, r1 := result[0].Interface(), result[1].Interface()
	if r1 != nil {
		var ok bool
		if err, ok = r1.(error); !ok {
			panic("编译错误,类型错误")
		}
		return
	}
	resp0, ok := r0.(Resp)
	if !ok {
		panic("编译错误,类型错误")
	}
	return resp0, nil
}

再谈冗余请求

/images/冗余请求优化接口/冗余请求.png

拓:Go 的函数(Func)和方法(Method)

方法本质上就是函数,只不过在调用时,接收者会作为第一个参数传入

方法本质就是函数,所以跟函数一样,除了直接调用,还能赋值给变量,或作为参数传递,依照具体引用方式不同,可以分为 expression 和 value 两种:

  1. method expression 表达式,struct.method,返回最原始的函数 func(T),第一个参数需要传入接受者

  2. method value 方法变量,structValue.method,编译器生成包装函数,闭包捕获接受者

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
type Stu struct {
	age int
}

func (s *Stu) SetAge(age int) {
	s.age = age
}

func main() {
	stu := &Stu{}

	// 2. method value
	funcSetAge := stu.SetAge // 转换为闭包返回匿名函数
	// 实际转换为
	// func(s *Stu) func(int) {
	// 	return func(age int) {
	// 		// s 是自由变量,会被闭包捕获
	// 		s.age = age
	// 	}
	// }

	println(stu.age) // 0
	funcSetAge(18)
	println(stu.age) // 18

	// 1. method expression
	// 实际是返回 func(T, 参数)
	// 注意,不能 Stu.SetAge,因为 SetAge 的方法集只实现了 *T,没有实现 T
	funcSetAge2 := (*Stu).SetAge
	funcSetAge2(stu, 100)
	println(stu.age) // 100
}

End

0%