最近接到一个需求,由于某个接口失败率太高,运营小姐姐很多时候需要在失败后手动重试,希望研发能自动重试 3 次。调研了下这个接口,主要是对豆瓣明细页的爬虫和解析,监控页面显示的 QPS 特别低,那种一天可能偶尔来那么十几个请求,每个请求耗时大概在 3-5 s,没在监控页面找到失败率,没什么意义,量级太小了
方案一
串行请求,失败再重试
优点:QPS 不会增大明显
缺点:失败重试,接口耗时类积
适合场景:失败率较低,QPS 较高
方案二(采用)
并行请求,某个请求先成功返回,服务端立刻返回
优点:接口耗时并不会增长
缺点:QPS * n,导致负载大
适合场景:失败率高,接口 QPS 较低
采用该方案,因为其实 QPS 特别低,即使冗余 2 次,QPS 变成原来 3 倍,也很低。但是确能让运营小姐姐体验更好,即更快返回
冗余请求代码编写-基础版
回顾下 channel 的注意点:
(1)channel 的垃圾回收,仅由是否存在引用决定,不由是否关闭决定
(2)channel 关闭,阻塞在读、或读都会立刻返回,阻塞在写、或写会产生 panic
(3)基于以上两点,channel 一般无需关闭。需手动关闭的场景:利用关闭 channel 来唤醒所有阻塞读的 G,手动关闭场景注意最好由写入方进行关闭,避免产生 panic
(4)channel 还有一个造成 goroutine 泄露的问题,即永远阻塞在 channel 读写中,需要注意
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
var ErrAllFail = errors.New("Redundant.Do execute fn() failed, all fail")
type RedundantOption func(*Redundant) *Redundant
type Redundant struct {
fn func() error
count int
nextTime int64 // 单位 ms
errCount int
respChan chan struct{}
errChan chan error
}
func NewRedundant(count int, fn func() error, ops ...RedundantOption) *Redundant {
if count == 0 {
count = 3
}
r := &Redundant{
count: count,
fn: fn,
// 注意:大小必须 >= count,这样写入才不会造成阻塞,才不可能产生协程泄露的现象(即阻塞不会回收)
respChan: make(chan struct{}, count),
errChan: make(chan error, count),
}
for _, op := range ops {
r = op(r)
}
return r
}
// Do 每一个 G 并行执行冗余请求,有一个执行成功 Do 即刻返回,都执行失败(即 fn 返回 error)或内部 panic 返回 error
func (r *Redundant) Do(ctx context.Context) error {
for i := 0; i < r.count; i++ {
if i != 0 && r.nextTime != 0 {
time.Sleep(time.Duration(r.nextTime) * time.Millisecond)
}
go func() {
defer func() {
if re := recover(); re != nil {
// 注意,这里必须加上,不然可能产生 Do 的主流程永远阻塞在 for{select} 那里,造成 goroutine 泄露
r.errChan <- errors.New("Redundant.Do execute fn() panic, but recover")
}
}()
if err := r.fn(); err != nil {
r.errChan <- err
return
}
r.respChan <- struct{}{}
}()
}
for {
select {
case <-ctx.Done():
return errors.New("redundant.Do canceled by external context, context canceled")
case _ = <-r.respChan:
return nil
case err := <-r.errChan:
log.Errorc(ctx, "redundant.Do execute fn() err:%s", err.Error())
r.errCount++
if r.errCount == r.count {
return ErrAllFail
}
}
}
}
func RedundantOptionNextTime(nextTime int64) RedundantOption {
return func(redundant *Redundant) *Redundant {
redundant.nextTime = nextTime
return redundant
}
}
|
使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
func HttpDo() (*Resp, error) {
...
return resp, nil
}
func main() {
var resp *Resp
var mutex sync.Mutex
if err := NewRedundant(3, func() (err error) {
if innerResp, err := HttpDo; err != nil {
return err
}
// 其实可以直接 resp = innerResp,并发覆盖没啥事其实
if resp == nil {
mutex.Lock()
if resp == nil {
resp = innerResp
}
mutex.UnLock()
}
return
}).Do(context.TODO); err != nil {
...
}
...
}
|
冗余请求代码编写-升级版(泛型)
上一种已经实现了整个冗余请求的功能,但是作为一个工具包,还不够完美。存在一个问题,resp 需要自己赋值,而且可能产生覆盖问题,虽然整个问题可以通过加锁来解决,甚至不用理会。但是还可以使用泛型来达到更完美的效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
type Redundant[Resp any] struct {
fn func() (Resp, error)
count int
errCount int
respChan chan Resp
errChan chan error
}
func NewRedundant[Resp any](count int, fn func() (Resp, error)) *Redundant[Resp] {
if count == 0 {
count = 3
}
return &Redundant[Resp]{
count: count,
fn: fn,
respChan: make(chan Resp, count),
errChan: make(chan error, count),
}
}
func (r *Redundant[Resp]) Do(ctx context.Context) (resp Resp, err error) {
for i := 0; i < r.count; i++ {
go func() {
defer func() {
if re := recover(); re != nil {
r.errChan <- errors.New("Redundant.Do execute fn() panic, but recover")
}
}()
if resp, err := r.fn(); err != nil {
r.errChan <- err
return
} else {
r.respChan <- resp
}
}()
}
for {
select {
case resp = <-r.respChan:
return resp, nil
case errPer := <-r.errChan:
log.Printf("DoFunc err:%s", errPer.Error())
r.errCount++
if r.errCount == r.count {
err = errors.New(fmt.Sprintf("Redundant.Do execute fn() %d in total", r.errCount))
return
}
}
}
}
|
使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
type Resp struct {
...
}
func HttpDo() (*Resp, error) {
...
return resp, nil
}
func main() {
var resp *Resp
resp, err := NewRedundant[*Resp](3, func() (*Resp, error) {
if innerResp, err := HttpDo(); err != nil {
return nil, err
} else {
return innerResp, nil
}
}).Do(context.TODO)
...
}
|
1
2
3
4
5
6
|
// 由于入参一致(无参数),加上返回值一样,其实可以直接传入即可
func main() {
var resp *Resp
resp, err := NewRedundant[*Resp](3, HttpDo).Do(context.TODO)
...
}
|
冗余请求代码编写-炫技版(泛型+反射)
上面的代码还是不够完美,还是得在 Do(n, func(){ ... })
里面写调用函数的逻辑,可不可已把整个函数托管给 Do 呢。你可以会想,其实在升级版的基础上再加个 Req 的泛型就好了嘛,改成 Do(n, func(Req) (Resp, err))
,但是函数或方法的入参很多都不规范,可能没有封装成一个 struct,而有很多个参数,这样就不能用泛型来实现。所以引出炫技版-反射调用!
特点:任意函数、方法、任意入参,限制返回参数格式 Resp, error
注意:
两个函数的使用者都必须对函数和方法有深入的了解,不然很容易用错!!!(不了解可拉到最下面有讲解方法和函数的区别)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
|
// receiver 必须为 struct 的实例,fun 必须为 interface.Method 或 struct.Method。params 为 fun 函数入参
func DoMethod[Resp any](count int, receiver, fun interface{}, params ...interface{}) (Resp, error) {
newParams := make([]interface{}, len(params)+1)
newParams[0] = receiver
copy(newParams[1:], params)
return DoFunc[Resp](count, fun, newParams...)
}
// fun 必须为 函数 或 receiver.Method
func DoFunc[Resp any](count int, fun interface{}, params ...interface{}) (resp Resp, err error) {
if count == 0 {
count = 3
}
var (
respChan = make(chan Resp, count)
errChan = make(chan error, count)
errCount int
)
for i := 0; i < count; i++ {
go func() {
defer func() {
if r := recover(); r != nil {
errChan <- errors.New("panic recover")
}
}()
resp, err := doFunc[Resp](fun, params...)
if err == nil {
respChan <- resp
} else {
errChan <- err
}
}()
}
for {
select {
case resp := <-respChan:
return resp, nil
case err := <-errChan:
log.Printf("DoFunc err:%s", err.Error())
errCount++
if errCount == count {
return resp, errors.New(fmt.Sprintf("DoFunc execute %d fun fail in total", errCount))
}
}
}
}
func doFunc[Resp any](fun interface{}, params ...interface{}) (resp Resp, err error) {
method := reflect.ValueOf(fun)
var args []reflect.Value
for _, param := range params {
args = append(args, reflect.ValueOf(param))
}
result := method.Call(args)
if len(result) != 2 {
panic("返回值出参格式不规范,不为 2")
}
r0, r1 := result[0].Interface(), result[1].Interface()
if r1 != nil {
var ok bool
if err, ok = r1.(error); !ok {
panic("编译错误,类型错误")
}
return
}
resp0, ok := r0.(Resp)
if !ok {
panic("编译错误,类型错误")
}
return resp0, nil
}
|
再谈冗余请求
拓:Go 的函数(Func)和方法(Method)
方法本质上就是函数,只不过在调用时,接收者会作为第一个参数传入
方法本质就是函数,所以跟函数一样,除了直接调用,还能赋值给变量,或作为参数传递,依照具体引用方式不同,可以分为 expression 和 value 两种:
-
method expression 表达式,struct.method
,返回最原始的函数 func(T)
,第一个参数需要传入接受者
-
method value 方法变量,structValue.method
,编译器生成包装函数,闭包捕获接受者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
type Stu struct {
age int
}
func (s *Stu) SetAge(age int) {
s.age = age
}
func main() {
stu := &Stu{}
// 2. method value
funcSetAge := stu.SetAge // 转换为闭包返回匿名函数
// 实际转换为
// func(s *Stu) func(int) {
// return func(age int) {
// // s 是自由变量,会被闭包捕获
// s.age = age
// }
// }
println(stu.age) // 0
funcSetAge(18)
println(stu.age) // 18
// 1. method expression
// 实际是返回 func(T, 参数)
// 注意,不能 Stu.SetAge,因为 SetAge 的方法集只实现了 *T,没有实现 T
funcSetAge2 := (*Stu).SetAge
funcSetAge2(stu, 100)
println(stu.age) // 100
}
|
End