Go猜想录
大道至简,悟者天成
手搓 RPC 框架 -- #1 基础架构

底层通信选型

设计微服务框架的起点是确定底层通信方式,因为服务治理等上层功能都建立在此基础之上。可选的通信方式包括:

  • 直接使用 gRPC
  • 直接使用 HTTP
  • 设计自定义的 RPC 协议

接下来我们将共同设计自定义的 RPC 协议。

rpc-1.png

RPC 要解决的核心问题

RPC(Remote Procedure Call,远程过程调用)的核心是让客户端像本地调用一样调用服务器上的方法。关键在于如何将客户端(左侧虚线框的服务 A)映射到服务器端(右侧实线框的服务 A),实现透明的远程调用。

rpc-2.png

调用信息

要完成这种映射,首先需要解决的第一个问题是:映射什么?举个例子,假如客户端调用的是 userService.GetById,传入的参数是一个 int 类型的值 123。那么,服务端如何得知客户端调用的是 userService.GetById,且参数是 int 类型的 123 呢?

rpc-3.png

答案很简单:我们将这些调用信息传递给服务端,统称为“调用信息”。那么,调用信息应该包含哪些内容?

  • 服务名:例如 userService
  • 方法名:例如 GetById
  • 参数值:例如 123

至于是否需要传递参数类型,这取决于是否支持方法重载。如果你在支持重载的语言中设计微服务框架,并且决定支持重载,那么就需要传递参数类型;否则,参数类型可以省略。

rpc-4.png

客户端捕捉本地调用

既然需要传递调用信息,那么问题就在于:RPC 客户端如何获取这些调用信息? 也就是说,当用户调用 userService.GetById(123) 时,底层框架如何得知 userServiceGetById 和参数 123 这些信息?如果你是 RPC 设计者,如何捕捉本地调用的信息?

image.png

代理模式

这就需要使用代理模式:定义一个结构体,并为其中的方法类型字段注入调用逻辑。需要注意的是,Go 语言无法直接修改方法实现,因此我们只能通过代理模式来实现这一功能。

// UserService 声明服务,反射会把 GetById 转成一个 RPC 调用
type UserService struct {
	// 用反射来赋值
	// 类型是函数的字段,它不是方法(它不是定义在 UserService 上的方法)
	// 本质上是一个字段
	GetById func(ctx context.Context, req *GetByIdReq) (*GetByIdResp, error)
}

type GetByIDReq struct {
	ID int
}
type GetByIDResp struct {
	Msg string
}
func (s UserService) Name() string {
	return "user-service"
}

我们做出以下约定:

  • 每个方法的第一个参数必须是 context.Context,第二个参数为请求结构体的指针,且只有这两个参数。
  • 返回值的第一个为响应指针,第二个为 error,且只能有这两个返回值。

这种限制的主要目的是简化微服务框架的代码。在实际生产环境中,你可以选择保留此限制,也可以根据需求灵活调整。

反射生成代理

客户端一侧将本地调用篡改成 RPC 调用进行发送。

func (c *Client) InitService(service Service) error {
	return setFuncField(service, c, c.serializer)
}

func setFuncField(service Service, proxy Proxy, s serialize.Serializer) error {
	if service == nil {
		return ErrServiceNil
	}

	val := reflect.ValueOf(service)
	typ := val.Type()
	// 只支持指向结构体的一级指针
	if typ.Kind() != reflect.Pointer || typ.Elem().Kind() != reflect.Struct {
		return ErrServiceWrongType
	}

	val = val.Elem()
	typ = typ.Elem()

	numField := typ.NumField()
	for i := 0; i < numField; i++ {
		fieldTyp := typ.Field(i)
		fieldVal := val.Field(i)
		if !fieldVal.CanSet() || fieldVal.Kind() != reflect.Func {
			continue
		}
		fn := func(args []reflect.Value) (results []reflect.Value) {
			...
		}
		fnVal := reflect.MakeFunc(fieldTyp.Type, fn)
		fieldVal.Set(fnVal)
	}

	return nil
}

编码并发送请求

到目前为止,我们已经获得了调用信息。接下来,需要将这些调用信息转换为字节流,并通过网络发送到服务端。这实际上涉及到 RPC 协议设计以及序列化协议的选择

rpc-6.png

编码并发送

上文中省略的 fn 部分就是将客户端一侧的调用信息封装好发送给服务端。内部涉及到了信息的编解码和网络的发送流程。

fn := func(args []reflect.Value) (results []reflect.Value) {
	ctx := args[0].Interface().(context.Context)
	retVal := reflect.New(fieldTyp.Type.Out(0).Elem())

	reqArg, err := s.Encode(args[1].Interface())
	if err != nil {
		return []reflect.Value{
			retVal,
			reflect.ValueOf(err),
		}
	}
	var meta map[string]string
	if isOneway(ctx) {
		meta = map[string]string{
			"one-way": "true",
		}
	}
	req := &message.Request{
		ServiceName: service.Name(),
		MethodName:  fieldTyp.Name,
		Data:        reqArg,
		Serializer:  s.Code(),
		Meta:        meta,
	}
	fmt.Println(req)

	req.CalculateHeaderLength()
	req.CalculateBodyLength()

	resp, err := proxy.Invoke(ctx, req)
	if err != nil {
		return []reflect.Value{
			retVal,
			reflect.ValueOf(err),
		}
	}
	fmt.Println(string(resp.Data))

	var retErr error
	if len(resp.Error) > 0 {
		// 服务端传来的 error
		retErr = errors.New(string(resp.Error))
	}

	if len(resp.Data) > 0 {
		err = s.Decode(resp.Data, retVal.Interface())
		if err != nil {
			// 反序列化的 error
			return []reflect.Value{
				retVal,
				reflect.ValueOf(err),
			}
		}
	}

	var retErrVal reflect.Value
	if retErr != nil {
		retErrVal = reflect.ValueOf(retErr)
	} else {
		retErrVal = reflect.Zero(reflect.TypeOf(new(error)).Elem())
	}

	return []reflect.Value{
		retVal,
		retErrVal,
	}
}

服务端接收

服务端监听在对应的端口,收到请求后将调用信息还原。

func (s *Server) Start(network, address string) error {
	listener, err := net.Listen(network, address)
	if err != nil {
		return err
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			return err
		}
		go func() {
			if herr := s.handleConn(conn); herr != nil {
				_ = conn.Close()
			}
		}()
	}
}

func (s *Server) handleConn(conn net.Conn) error {
	for {
		reqBs, err := ReadMsg(conn)
		if err != nil {
			return err
		}

		// 还原调用信息
		req := message.DecodeReq(reqBs)

		ctx := context.Background()
		oneway, ok := req.Meta["one-way"]
		if ok && oneway == "true" {
			ctx = CtxWithOneway(ctx)
		}

		resp, err := s.Invoke(ctx, req)
		if err != nil {
			// 处理业务 error
			resp.Error = []byte(err.Error())
		}

		resp.CalculateHeaderLength()
		resp.CalculateBodyLength()

		_, err = conn.Write(message.EncodeResp(resp))
		if err != nil {
			return err
		}
	}
}
func ReadMsg(conn net.Conn) ([]byte, error) {
	lenBs := make([]byte, numOfLengthBytes)
	_, err := conn.Read(lenBs)
	if err != nil {
		return nil, err
	}

	headerLength := binary.BigEndian.Uint32(lenBs[:4])
	bodyLength := binary.BigEndian.Uint32(lenBs[4:8])
	length := headerLength + bodyLength

	data := make([]byte, length)
	copy(data[:8], lenBs)
	_, err = conn.Read(data[8:])

	return data, err
}

服务端执行逻辑

根据客户端传过来的信息找到对应的服务,再使用反射执行对应的方法。

type Server struct {
	stubs       map[string]reflectionStub
	serializers map[uint8]serialize.Serializer
}

func (s *Server) Invoke(ctx context.Context, req *message.Request) (*message.Response, error) {
	// 发起业务调用
	stub, ok := s.stubs[req.ServiceName]

	resp := &message.Response{
		RequestID:  req.RequestID,
		Version:    req.Version,
		Compressor: req.Compressor,
		Serializer: req.Serializer,
	}

	if !ok {
		// 即使是 oneway 调用,也返回这个错误。
		return resp, errors.New("service not available")
	}

	if isOneway(ctx) {
		go func() {
			_, _ = stub.invoke(ctx, req)
		}()
		return nil, nil
	}

	respData, err := stub.invoke(ctx, req)
	resp.Data = respData
	if err != nil {
		return resp, err
	}

	return resp, nil
}

type reflectionStub struct {
	service     Service
	value       reflect.Value
	serializers map[uint8]serialize.Serializer
}

func (s *reflectionStub) invoke(ctx context.Context, req *message.Request) ([]byte, error) {
	// 反射找到方法,并且执行调用
	method := s.value.MethodByName(req.MethodName)

	inReq := reflect.New(method.Type().In(1).Elem())
	serializer, ok := s.serializers[req.Serializer]
	if !ok {
		return nil, errors.New("micro: not supported serializer " + strconv.FormatUint(uint64(req.Serializer), 10))
	}
	err := serializer.Decode(req.Data, inReq.Interface())
	if err != nil {
		return nil, err
	}

	in := []reflect.Value{
		reflect.ValueOf(context.Background()),
		inReq,
	}
	result := method.Call(in)

	if result[1].Interface() != nil {
		err = result[1].Interface().(error)
	}

	var res []byte
	if result[0].IsNil() {
		return nil, err
	} else {
		var er error
		res, er = serializer.Encode(result[0].Interface())
		if er != nil {
			return nil, er
		}
	}

	return res, err
}

基础架构

客户端:

  • 初始化代理
  • 代理会利用反射获得调用信息
  • 将调用信息编码成字节流,加上长度字段
  • 将数据发送到服务端
  • 等待并且解析响应

服务端:

  • 启动服务器监听端口
  • 接收连接,并且读取数据
  • 将数据还原回调用信息
  • 根据服务名查找该实例上注册的服务
  • 利用反射执行方法调用
  • 写回响应

总结

  • 什么是 RPC ?远程过程调用,类似的还有 RMl:远程方法调用。
  • 和使用 HTTP 接口比起来,使用 RPC 有什么优势?不必关心 HTTP 调用的细节,对于使用者来说就如同本地调用一般。
  • RPC 框架的要点是什么?客户端捕捉调用信息,编码成二进制,发送到服务端。服务端查找本地服务,执行调用,写回响应。任何一个 RPC 框架都类似。所以如果面试官问 gRPC 的大概步骤,也可以这么回答。
  • RPC 框架怎么捕捉本地调用信息(或者说 RPC 框架怎么知道你调用的是什么)?主要依赖于代理模式和代码生成技术。
  • 什么是代理模式?什么是动态代理模式?动态代理可以看做是动态生成的代理,一般是指运行时生成的代理。
  • 动态代理技术能用来做什么?四个字,为所欲为。在这里就是用来发起 RPC 调用,然后再返回响应。

知识共享许可协议

本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。