前面的文章介绍了 gRPC 相关的功能,今天继续介绍 gRPC 的功能,本文将介绍 gRPC 的重试功能。

介绍

请求的重试是一个常见的功能,在我们日常的使用中,如果需要重试请求往往需要使用外部包进行实现,在gRPC 中内置了重试了功能,不需要我们自己实现。

通过查阅 gRPC 的文档可以看到,gRPC 会根据开发者设定的策略进行失败RPC的重试,有两种策略 1)重试策略:重试失败的RPC请求 2) hedging 策略:并行发生相同RPC请求。单个RPC请求可以选择两种重试策略中的一种,不能同时选择多种策略。

重试策略有以下参数可以使用:

  • maxAttempts: 必填 RPC 最大请求次数,包括原始请求
  • initialBackoff, maxBackoff, backoffMultiplier: 必填 决定下次重试前的延迟时间 random(0, min(initialBackoff*backoffMultiplier**(n-1), maxBackoff))
  • retryableStatusCodes: 必填 收到服务器非正常状态码时,根据 retryableStatusCodes 中的状态码列表决定是否重试请求

hedging 策略可以主动发送单个请求的多个副本,而无需等待响应。需要注意的是,此策略可能会导致后端多次执行,因此最好仅对可以多次执行不会有不利影响的请求开启此策略。有如下参数:

  • maxAttempts 必填
  • hedgingDelay 可选
  • nonFatalStatusCodes 可选

一个请求在没有收到成功响应时,经过 hedgingDelay没收到响应 将继续发送请求,直至达到 maxAttempts 最大次数或请求成功。当收到成功响应时,所有未完成的其它请求将停止。本质上hedging 策略可以看作在收到失败响应前重试请求。

使用

接下来讲解如何在 gRPC go语言版本中配置使用重试功能。

服务端

服务端创建一个服务,只有当请求次数达到第三次时,才返回成功响应。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"net"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	pb "github.com/overstarry/grpc-example/proto/echo"
)

var port = flag.Int("port", 9000, "port number")

type failingServer struct {
	pb.UnimplementedEchoServer
	mu sync.Mutex

	reqCounter uint
	reqModulo  uint
}


func (s *failingServer) maybeFailRequest() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.reqCounter++
	if (s.reqModulo > 0) && (s.reqCounter%s.reqModulo == 0) {
		return nil
	}

	return status.Errorf(codes.Unavailable, "maybeFailRequest: failing it")
}

func (s *failingServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	if err := s.maybeFailRequest(); err != nil {
		log.Println("request failed count:", s.reqCounter)
		return nil, err
	}

	log.Println("request succeeded count:", s.reqCounter)
	return &pb.EchoResponse{Message: req.Message}, nil
}

func main() {
	flag.Parse()

	address := fmt.Sprintf(":%v", *port)
	lis, err := net.Listen("tcp", address)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	fmt.Println("listen on address", address)

	s := grpc.NewServer()

	
	failingservice := &failingServer{
		reqCounter: 0,
		reqModulo:  3,
	}

	pb.RegisterEchoServer(s, failingservice)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

客户端

客户端通过 WithDefaultServiceConfig 设置配置好重试功能

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package main

import (
	"context"
	"flag"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "github.com/overstarry/grpc-example/proto/echo"
)

var (
	addr = flag.String("addr", "127.0.0.1:9000", "the address to connect to")
	// see https://github.com/grpc/grpc/blob/master/doc/service_config.md to know more about service config
	retryPolicy = `{
		"methodConfig": [{
		  "name": [{"service": "grpc.examples.echo.Echo"}],
		  "waitForReady": true,
		  "retryPolicy": {
			  "MaxAttempts": 4,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
		  }
		}]}`
)

func retryDial() (*grpc.ClientConn, error) {
	return grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(retryPolicy))
}

func main() {
	flag.Parse()

	// Set up a connection to the server.
	conn, err := retryDial()
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer func() {
		if e := conn.Close(); e != nil {
			log.Printf("failed to close connection: %s", e)
		}
	}()

	c := pb.NewEchoClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	reply, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "Try and Success"})
	if err != nil {
		log.Fatalf("UnaryEcho error: %v", err)
	}
	log.Printf("UnaryEcho reply: %v", reply)
}

接下来先启动服务端,再启动客户端,可以看到相应的日志:

1
2
3
2024/03/23 23:10:03 request failed count: 1
2024/03/23 23:10:03 request failed count: 2
2024/03/23 23:10:03 request succeeded count: 3

在客户端代码中我们可以看到相关的配置代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
retryPolicy = `{
		"methodConfig": [{
		  "name": [{"service": "grpc.examples.echo.Echo"}],
		  "waitForReady": true,
		  "retryPolicy": {
			  "MaxAttempts": 4,
			  "InitialBackoff": ".01s",
			  "MaxBackoff": ".01s",
			  "BackoffMultiplier": 1.0,
			  "RetryableStatusCodes": [ "UNAVAILABLE" ]
		  }
		}]}`

retryPolicy 就是上文讲述的重试策略配置,name 表示对哪些RPC请求开启重试。

小结

本文简单介绍了 gRPC 的重试功能及go语言如何实现重试功能,本文的相关代码可以在 https://github.com/overstarry/grpc-example 看到。

参考