底层通信选型
设计微服务框架的起点是确定底层通信方式,因为服务治理等上层功能都建立在此基础之上。可选的通信方式包括:
- 直接使用 gRPC
- 直接使用 HTTP
- 设计自定义的 RPC 协议
接下来我们将共同设计自定义的 RPC 协议。
RPC 要解决的核心问题
RPC(Remote Procedure Call,远程过程调用)的核心是让客户端像本地调用一样调用服务器上的方法。关键在于如何将客户端(左侧虚线框的服务 A)映射到服务器端(右侧实线框的服务 A),实现透明的远程调用。
调用信息
要完成这种映射,首先需要解决的第一个问题是:映射什么?举个例子,假如客户端调用的是 userService.GetById
,传入的参数是一个 int
类型的值 123
。那么,服务端如何得知客户端调用的是 userService.GetById
,且参数是 int
类型的 123
呢?
答案很简单:我们将这些调用信息传递给服务端,统称为“调用信息”。那么,调用信息应该包含哪些内容?
- 服务名:例如
userService
- 方法名:例如
GetById
- 参数值:例如
123
至于是否需要传递参数类型,这取决于是否支持方法重载。如果你在支持重载的语言中设计微服务框架,并且决定支持重载,那么就需要传递参数类型;否则,参数类型可以省略。
客户端捕捉本地调用
既然需要传递调用信息,那么问题就在于:RPC 客户端如何获取这些调用信息? 也就是说,当用户调用 userService.GetById(123)
时,底层框架如何得知 userService
、GetById
和参数 123
这些信息?如果你是 RPC 设计者,如何捕捉本地调用的信息?
代理模式
这就需要使用代理模式:定义一个结构体,并为其中的方法类型字段注入调用逻辑。需要注意的是,Go 语言无法直接修改方法实现,因此我们只能通过代理模式来实现这一功能。
// UserService 声明服务,反射会把 GetById 转成一个 RPC 调用
type UserService struct {
// 用反射来赋值
// 类型是函数的字段,它不是方法(它不是定义在 UserService 上的方法)
// 本质上是一个字段
GetById func(ctx context.Context, req *GetByIdReq) (*GetByIdResp, error)
}
type GetByIDReq struct {
ID int
}
type GetByIDResp struct {
Msg string
}
func (s UserService) Name() string {
return "user-service"
}
我们做出以下约定:
- 每个方法的第一个参数必须是
context.Context
,第二个参数为请求结构体的指针,且只有这两个参数。 - 返回值的第一个为响应指针,第二个为
error
,且只能有这两个返回值。
这种限制的主要目的是简化微服务框架的代码。在实际生产环境中,你可以选择保留此限制,也可以根据需求灵活调整。
反射生成代理
客户端一侧将本地调用篡改成 RPC 调用进行发送。
func (c *Client) InitService(service Service) error {
return setFuncField(service, c, c.serializer)
}
func setFuncField(service Service, proxy Proxy, s serialize.Serializer) error {
if service == nil {
return ErrServiceNil
}
val := reflect.ValueOf(service)
typ := val.Type()
// 只支持指向结构体的一级指针
if typ.Kind() != reflect.Pointer || typ.Elem().Kind() != reflect.Struct {
return ErrServiceWrongType
}
val = val.Elem()
typ = typ.Elem()
numField := typ.NumField()
for i := 0; i < numField; i++ {
fieldTyp := typ.Field(i)
fieldVal := val.Field(i)
if !fieldVal.CanSet() || fieldVal.Kind() != reflect.Func {
continue
}
fn := func(args []reflect.Value) (results []reflect.Value) {
...
}
fnVal := reflect.MakeFunc(fieldTyp.Type, fn)
fieldVal.Set(fnVal)
}
return nil
}
编码并发送请求
到目前为止,我们已经获得了调用信息。接下来,需要将这些调用信息转换为字节流,并通过网络发送到服务端。这实际上涉及到 RPC 协议设计以及序列化协议的选择。
编码并发送
上文中省略的 fn 部分就是将客户端一侧的调用信息封装好发送给服务端。内部涉及到了信息的编解码和网络的发送流程。
fn := func(args []reflect.Value) (results []reflect.Value) {
ctx := args[0].Interface().(context.Context)
retVal := reflect.New(fieldTyp.Type.Out(0).Elem())
reqArg, err := s.Encode(args[1].Interface())
if err != nil {
return []reflect.Value{
retVal,
reflect.ValueOf(err),
}
}
var meta map[string]string
if isOneway(ctx) {
meta = map[string]string{
"one-way": "true",
}
}
req := &message.Request{
ServiceName: service.Name(),
MethodName: fieldTyp.Name,
Data: reqArg,
Serializer: s.Code(),
Meta: meta,
}
fmt.Println(req)
req.CalculateHeaderLength()
req.CalculateBodyLength()
resp, err := proxy.Invoke(ctx, req)
if err != nil {
return []reflect.Value{
retVal,
reflect.ValueOf(err),
}
}
fmt.Println(string(resp.Data))
var retErr error
if len(resp.Error) > 0 {
// 服务端传来的 error
retErr = errors.New(string(resp.Error))
}
if len(resp.Data) > 0 {
err = s.Decode(resp.Data, retVal.Interface())
if err != nil {
// 反序列化的 error
return []reflect.Value{
retVal,
reflect.ValueOf(err),
}
}
}
var retErrVal reflect.Value
if retErr != nil {
retErrVal = reflect.ValueOf(retErr)
} else {
retErrVal = reflect.Zero(reflect.TypeOf(new(error)).Elem())
}
return []reflect.Value{
retVal,
retErrVal,
}
}
服务端接收
服务端监听在对应的端口,收到请求后将调用信息还原。
func (s *Server) Start(network, address string) error {
listener, err := net.Listen(network, address)
if err != nil {
return err
}
for {
conn, err := listener.Accept()
if err != nil {
return err
}
go func() {
if herr := s.handleConn(conn); herr != nil {
_ = conn.Close()
}
}()
}
}
func (s *Server) handleConn(conn net.Conn) error {
for {
reqBs, err := ReadMsg(conn)
if err != nil {
return err
}
// 还原调用信息
req := message.DecodeReq(reqBs)
ctx := context.Background()
oneway, ok := req.Meta["one-way"]
if ok && oneway == "true" {
ctx = CtxWithOneway(ctx)
}
resp, err := s.Invoke(ctx, req)
if err != nil {
// 处理业务 error
resp.Error = []byte(err.Error())
}
resp.CalculateHeaderLength()
resp.CalculateBodyLength()
_, err = conn.Write(message.EncodeResp(resp))
if err != nil {
return err
}
}
}
func ReadMsg(conn net.Conn) ([]byte, error) {
lenBs := make([]byte, numOfLengthBytes)
_, err := conn.Read(lenBs)
if err != nil {
return nil, err
}
headerLength := binary.BigEndian.Uint32(lenBs[:4])
bodyLength := binary.BigEndian.Uint32(lenBs[4:8])
length := headerLength + bodyLength
data := make([]byte, length)
copy(data[:8], lenBs)
_, err = conn.Read(data[8:])
return data, err
}
服务端执行逻辑
根据客户端传过来的信息找到对应的服务,再使用反射执行对应的方法。
type Server struct {
stubs map[string]reflectionStub
serializers map[uint8]serialize.Serializer
}
func (s *Server) Invoke(ctx context.Context, req *message.Request) (*message.Response, error) {
// 发起业务调用
stub, ok := s.stubs[req.ServiceName]
resp := &message.Response{
RequestID: req.RequestID,
Version: req.Version,
Compressor: req.Compressor,
Serializer: req.Serializer,
}
if !ok {
// 即使是 oneway 调用,也返回这个错误。
return resp, errors.New("service not available")
}
if isOneway(ctx) {
go func() {
_, _ = stub.invoke(ctx, req)
}()
return nil, nil
}
respData, err := stub.invoke(ctx, req)
resp.Data = respData
if err != nil {
return resp, err
}
return resp, nil
}
type reflectionStub struct {
service Service
value reflect.Value
serializers map[uint8]serialize.Serializer
}
func (s *reflectionStub) invoke(ctx context.Context, req *message.Request) ([]byte, error) {
// 反射找到方法,并且执行调用
method := s.value.MethodByName(req.MethodName)
inReq := reflect.New(method.Type().In(1).Elem())
serializer, ok := s.serializers[req.Serializer]
if !ok {
return nil, errors.New("micro: not supported serializer " + strconv.FormatUint(uint64(req.Serializer), 10))
}
err := serializer.Decode(req.Data, inReq.Interface())
if err != nil {
return nil, err
}
in := []reflect.Value{
reflect.ValueOf(context.Background()),
inReq,
}
result := method.Call(in)
if result[1].Interface() != nil {
err = result[1].Interface().(error)
}
var res []byte
if result[0].IsNil() {
return nil, err
} else {
var er error
res, er = serializer.Encode(result[0].Interface())
if er != nil {
return nil, er
}
}
return res, err
}
基础架构
客户端:
- 初始化代理
- 代理会利用反射获得调用信息
- 将调用信息编码成字节流,加上长度字段
- 将数据发送到服务端
- 等待并且解析响应
服务端:
- 启动服务器监听端口
- 接收连接,并且读取数据
- 将数据还原回调用信息
- 根据服务名查找该实例上注册的服务
- 利用反射执行方法调用
- 写回响应
总结
- 什么是 RPC ?远程过程调用,类似的还有 RMl:远程方法调用。
- 和使用 HTTP 接口比起来,使用 RPC 有什么优势?不必关心 HTTP 调用的细节,对于使用者来说就如同本地调用一般。
- RPC 框架的要点是什么?客户端捕捉调用信息,编码成二进制,发送到服务端。服务端查找本地服务,执行调用,写回响应。任何一个 RPC 框架都类似。所以如果面试官问 gRPC 的大概步骤,也可以这么回答。
- RPC 框架怎么捕捉本地调用信息(或者说 RPC 框架怎么知道你调用的是什么)?主要依赖于代理模式和代码生成技术。
- 什么是代理模式?什么是动态代理模式?动态代理可以看做是动态生成的代理,一般是指运行时生成的代理。
- 动态代理技术能用来做什么?四个字,为所欲为。在这里就是用来发起 RPC 调用,然后再返回响应。
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。