gRPC 中的负载均衡包括服务端负载均衡和客户端负载均衡,本文将介绍客户端负载均衡,gRPC中的客户端负载均衡主要有2个部分:1) Name Resolver 2) Load Balancing Policy 接下来将依次介绍。

Name Resolver

gRPC 中的默认 name-system 是 DNS , 同时各种客户端还提供了插件以使用自定义 name-system。gRPC Name Resolver 会根据 name-system 进行对应的解析,将用户提供的名称转换为对应的地址。

Load Balancing Policy

gRPC 中内置了多种负载均衡策略,本文将介绍常见的几种负载均衡策略:1) pick_first 2) round_robin

pick_first

pick_first 是默认的负载均衡策略,该策略从 Name Resolver 获得到服务器的地址列表,按顺序依次对每个服务器地址进行连接,直到连接成功,如果某个地址连接成功则所有的RPC请求都会发送到这个服务器地址。

round_robin

round_robin 策略,该策略从 Name Resolver 获得到服务器的地址列表,依次将请求发送到每一个地址,例如第一个请求将发送到 backend1 ,第二个请求将发送到 backend2 。

接下来分别使用这两种策略进行测试。

例子

我们先创建服务端,循环创建了3个服务端,分别使用30051、30052、30053端口。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"sync"

	"google.golang.org/grpc"

	pb "github.com/overstarry/grpc-example/proto/echo"
)

var (
	addrs = []string{":30051", ":30052",":30053"}
)

type ecServer struct {
	pb.UnimplementedEchoServer
	addr string
}

func (s *ecServer) UnaryEcho(ctx context.Context, req *pb.EchoRequest) (*pb.EchoResponse, error) {
	return &pb.EchoResponse{Message: fmt.Sprintf("%s (from %s)", req.Message, s.addr)}, nil
}

func startServer(addr string) {
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterEchoServer(s, &ecServer{addr: addr})
	log.Printf("serving on %s\n", addr)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

func main() {
	var wg sync.WaitGroup
	for _, addr := range addrs {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			startServer(addr)
		}(addr)
	}
	wg.Wait()
}

接下来创建对应的客户端连接:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "github.com/overstarry/grpc-example/proto/echo"
	"google.golang.org/grpc/resolver"
)

const (
	exampleScheme      = "example"
	exampleServiceName = "lb.overstarry.grpc.io"
)

var addrs = []string{"localhost:30051", "localhost:30052", "localhost:30053"}

func callUnaryEcho(c pb.EchoClient, message string) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	fmt.Println(r.Message)
}

func makeRPCs(cc *grpc.ClientConn, n int) {
	hwc := pb.NewEchoClient(cc)
	for i := 0; i < n; i++ {
		callUnaryEcho(hwc, "this is examples/load_balancing")
	}
}

func main() {
	pickfirstConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer pickfirstConn.Close()

	makeRPCs(pickfirstConn, 10)

	fmt.Println()

	roundrobinConn, err := grpc.Dial(
		fmt.Sprintf("%s:///%s", exampleScheme, exampleServiceName),
		grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"round_robin":{}}]}`), // This sets the initial balancing policy.
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer roundrobinConn.Close()

	makeRPCs(roundrobinConn, 10)
}


type exampleResolverBuilder struct{}

func (*exampleResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &exampleResolver{
		target: target,
		cc:     cc,
		addrsStore: map[string][]string{
			exampleServiceName: addrs,
		},
	}
	r.start()
	return r, nil
}
func (*exampleResolverBuilder) Scheme() string { return exampleScheme }

type exampleResolver struct {
	target     resolver.Target
	cc         resolver.ClientConn
	addrsStore map[string][]string
}

func (r *exampleResolver) start() {
	addrStrs := r.addrsStore[r.target.Endpoint()]
	addrs := make([]resolver.Address, len(addrStrs))
	for i, s := range addrStrs {
		addrs[i] = resolver.Address{Addr: s}
	}
	r.cc.UpdateState(resolver.State{Addresses: addrs})
}
func (*exampleResolver) ResolveNow(o resolver.ResolveNowOptions) {}
func (*exampleResolver) Close()                                  {}

func init() {
	resolver.Register(&exampleResolverBuilder{})
}

可以看到通过 grpc.WithDefaultServiceConfig 可以指定使用的负载均衡策略,由于测试的需要,我们还创建了自定义的 Name Resolver用来测试使用。

运行代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2024/03/30 17:37:33 pick_first
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)

2024/03/30 17:37:33 round_robin
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30052)
this is examples/load_balancing (from :30053)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30052)
this is examples/load_balancing (from :30053)
this is examples/load_balancing (from :30051)
this is examples/load_balancing (from :30052)
this is examples/load_balancing (from :30053)

可以看到结果与相应的策略效果一致。

参考