09.gRPC的NameResolver
gRPC NameResolver 根据 name-system 选择对应的解析器,用以解析调用时提供的服务器名,最后返回具体地址列表(IP+端口号)。
gRPC 中的默认 name-system 是 DNS,同时在客户端以插件形式提供了自定义 name-system 的机制。
gRPC 自定义 NameResolver
本节的示例,使用 etcd 存储服务元数据(其实就是地址和端口,如果需要更多信息,可以定义结构体)。代码基于 grpc-testing v0.0.1 实现。
使用 docker-compose 启动 etcd
服务端
在服务端启动时,向 etcd 注册自己,并定时续约。
// 服务的地址和端口
var ip = "127.0.0.1"
var port = flag.Int("p", 8080, "port")
// etcd endpoint
var etcd = []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}
// 租期
var lease int64 = 10
func main() {
log.Println("Go")
flag.Parse()
// 创建 etcd client
cfg := clientv3.Config{
Endpoints: etcd,
DialTimeout: time.Minute,
}
var etcdClient *clientv3.Client
var err error
if etcdClient, err = clientv3.New(cfg); err != nil {
log.Fatal("create etcd client failed")
}
defer etcdClient.Close()
//设置租约时间
grant, err := etcdClient.Grant(context.Background(), lease)
if err != nil {
log.Fatal("craete grant failed", err)
}
// put key
key := "/service/echo/" + uuid.New().String()
value := fmt.Sprintf("%s:%d", ip, *port)
_, err = etcdClient.Put(context.Background(), key, value, clientv3.WithLease(grant.ID))
if err != nil {
log.Fatal("put k-v failed", err)
}
// 定时续约
leaseRespChan, err := etcdClient.KeepAlive(context.Background(), grant.ID)
if err != nil {
log.Fatal("keep alive failed", err)
}
go func(c <-chan *clientv3.LeaseKeepAliveResponse) {
for v := range c {
fmt.Println("续约成功", v.ResponseHeader)
}
log.Println("停止续约")
}(leaseRespChan)
// 程序退出时,停止续约
defer func() {
etcdClient.Revoke(context.Background(), grant.ID)
log.Println("停止续约")
}()
listen, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
echo.RegisterEchoServer(s, &EchoServer{})
if err := s.Serve(listen); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
客户端
gRPC 提供了简单的 name resolver 方案,即实现需要的两个接口: resolver.Builder
, resolver.Resolver
定义在
resolver/resolver.go
文件中。 在 Dial 前,可以使用 resolver.Register
函数注册自定义的 Build
。 resolver.Builder
, resolver.Resolver
定义如下:
// Builder creates a resolver that will be used to watch name resolution updates.
type Builder interface {
// Build creates a new resolver for the given target.
//
// gRPC dial calls Build synchronously, and fails if the returned error is
// not nil.
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
// Scheme returns the scheme supported by this resolver.
// Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
Scheme() string
}
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}
客户端实现
// -----------------------------------------------------------------------------
// 实现 resolver.Builder
type EtcdBuilder struct {
EtcdEndpoints []string
}
func (b *EtcdBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
fmt.Printf("build Scheme : %s Authority : %s Endpoint : %s \n", target.Scheme, target.Authority, target.Endpoint)
// 创建 Etcd 客户端连接
cfg := clientv3.Config{
Endpoints: b.EtcdEndpoints,
DialTimeout: time.Minute,
}
var etcdClient *clientv3.Client
var err error
if etcdClient, err = clientv3.New(cfg); err != nil {
log.Println("create etcd client failed")
return nil, err
}
// 创建 Resolver
r := ServiceResolver{
target: target,
cc: cc,
EtcdClient: etcdClient,
}
r.ResolveNow(resolver.ResolveNowOptions{})
return &r, nil
}
func (b *EtcdBuilder) Scheme() string {
return "grpclb"
}
// -----------------------------------------------------------------------------
// 实现 resolver.Resolver 接口
type ServiceResolver struct {
target resolver.Target
cc resolver.ClientConn
EtcdClient *clientv3.Client
}
func (r *ServiceResolver) ResolveNow(opts resolver.ResolveNowOptions) {
// 获取节点列表
resp, err := r.EtcdClient.Get(context.Background(), fmt.Sprintf("/service/%s", r.target.Endpoint), clientv3.WithPrefix())
if err != nil {
log.Println("get service error", err)
}
var Address = make([]resolver.Address, 0, resp.Count)
for _, v := range resp.Kvs {
fmt.Println(string(v.Key), string(v.Value))
Address = append(Address, resolver.Address{Addr: string(v.Value)})
}
r.cc.UpdateState(resolver.State{
Addresses: Address,
})
}
func (r *ServiceResolver) Close() {
r.EtcdClient.Close()
}
// -----------------------------------------------------------------------------
func main() {
fmt.Println("go")
// 创建 resolver.Builder
b := &EtcdBuilder{
EtcdEndpoints: []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"},
}
// 注册 resolver.Builder
resolver.Register(b)
// 修改连接的地址,使用自定义的 name resolver
conn, err := grpc.Dial(fmt.Sprintf("%s:///%s", b.Scheme(), "echo"),
grpc.WithInsecure(),
// 配置 loadBalancing 策略
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
)
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
for i := 0; i < 100; i++ {
c := echo.NewEchoClient(conn)
ctx, cancle := context.WithCancel(context.Background())
defer cancle()
r, err := c.Echo(ctx, &echo.EchoRequest{Msg: "hello"})
if err != nil {
log.Fatalf("echo failed :%#v", err)
}
log.Print(r.GetMsg())
time.Sleep(time.Second)
}
}