go rpc 源码分析

https://segmentfault.com/a/1190000013532622

1. 概述

go 源码中带了rpc框架,以相对精简的当时方式实现了rpc功能,目前源码中的rpc官方已经宣布不再添加新功能,并推荐使用grpc.
作为go标准库中rpc框架,还是有很多地方值得借鉴及学习,这里将从源码角度分析go原生rpc框架,以及分享一些在使用过程中遇到的坑.

2. server端

server端主要分为两个步骤,首先进行方法注册,通过反射处理将方法取出,并存到map中.然后是网络调用,主要是监听端口,读取数据包,解码请求
调用反射处理后的方法,将返回值编码,返回给客户端.

2.1 方法注册

2.1.1 Register

  1. // Register publishes the receiver's methods in the DefaultServer.
  2. func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }
  3. // RegisterName is like Register but uses the provided name for the type
  4. // instead of the receiver's concrete type.
  5. func RegisterName(name string, rcvr interface{}) error {
  6. return DefaultServer.RegisterName(name, rcvr)
  7. }

如上,方法注册的入口函数有两个,分别为Register以及RegisterName,这里interface{}通常是带方法的对象.如果想要自定义方法的接收对象,则可以使用RegisterName.

2.1.2 反射处理过程

  1. type methodType struct {
  2. sync.Mutex // protects counters
  3. method reflect.Method //反射后的函数
  4. ArgType reflect.Type //请求参数的反射值
  5. ReplyType reflect.Type //返回参数的反射值
  6. numCalls uint //调用次数
  7. }
  8. type service struct {
  9. name string // 服务名,这里通常为register时的对象名或自定义对象名
  10. rcvr reflect.Value // 服务的接收者的反射值
  11. typ reflect.Type // 接收者的类型
  12. method map[string]*methodType // 对象的所有方法的反射结果.
  13. }

反射处理过程,其实就是将对象以及对象的方法,通过反射生成上面的结构,如注册Arith.Multiply(xx,xx) error 这样的对象时,生成的结构为 map["Arith"]service, service 中ethod为 map["Multiply"]methodType.

几个关键代码如下:

生成service对象

  1. func (server *Server) register(rcvr interface{}, name string, useName bool) error {
  2. //生成service
  3. s := new(service)
  4. s.typ = reflect.TypeOf(rcvr)
  5. s.rcvr = reflect.ValueOf(rcvr)
  6. sname := reflect.Indirect(s.rcvr).Type().Name()
  7. ....
  8. s.name = sname
  9. // 通过suitableMethods将对象的方法转换成map[string]*methodType结构
  10. s.method = suitableMethods(s.typ, true)
  11. ....
  12. //service存储为键值对
  13. if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
  14. return errors.New("rpc: service already defined: " + sname)
  15. }
  16. return nil
  17. }

生成 map[string] *methodType

  1. func suitableMethods(typ reflect.Type, reportErr bool) map[string]*methodType {
  2. methods := make(map[string]*methodType)
  3. //通过反射,遍历所有的方法
  4. for m := 0; m < typ.NumMethod(); m++ {
  5. method := typ.Method(m)
  6. mtype := method.Type
  7. mname := method.Name
  8. // Method must be exported.
  9. if method.PkgPath != "" {
  10. continue
  11. }
  12. // Method needs three ins: receiver, *args, *reply.
  13. if mtype.NumIn() != 3 {
  14. if reportErr {
  15. log.Println("method", mname, "has wrong number of ins:", mtype.NumIn())
  16. }
  17. continue
  18. }
  19. //取出请求参数类型
  20. argType := mtype.In(1)
  21. ...
  22. // 取出响应参数类型,响应参数必须为指针
  23. replyType := mtype.In(2)
  24. if replyType.Kind() != reflect.Ptr {
  25. if reportErr {
  26. log.Println("method", mname, "reply type not a pointer:", replyType)
  27. }
  28. continue
  29. }
  30. ...
  31. // 去除函数的返回值,函数的返回值必须为error.
  32. if returnType := mtype.Out(0); returnType != typeOfError {
  33. if reportErr {
  34. log.Println("method", mname, "returns", returnType.String(), "not error")
  35. }
  36. continue
  37. }
  38. //将方法存储成key-value
  39. methods[mname] = &methodType{method: method, ArgType: argType, ReplyType: replyType}
  40. }
  41. return methods
  42. }

2.2 网络调用

  1. // Request 每次rpc调用的请求的头部分
  2. type Request struct {
  3. ServiceMethod string // 格式为: "Service.Method"
  4. Seq uint64 // 客户端生成的序列号
  5. next *Request // server端保持的链表
  6. }
  7. // Response 每次rpc调用的响应的头部分
  8. type Response struct {
  9. ServiceMethod string // 对应请求部分的 ServiceMethod
  10. Seq uint64 // 对应请求部分的 Seq
  11. Error string // 错误
  12. next *Response // server端保持的链表
  13. }

如上,网络调用主要用到上面的两个结构体,分别是请求参数以及返回参数,通过编解码器(gob/json)实现二进制到结构体的相互转换.主要涉及到下面几个步骤:

关键代码如下:
取出请求,并得到相应函数的调用参数

  1. func (server *Server) readRequestHeader(codec ServerCodec) (svc *service, mtype *methodType, req *Request, keepReading bool, err error) {
  2. // Grab the request header.
  3. req = server.getRequest()
  4. //编码器读取生成请求
  5. err = codec.ReadRequestHeader(req)
  6. if err != nil {
  7. //错误处理
  8. ...
  9. return
  10. }
  11. keepReading = true
  12. //取出服务名以及方法名
  13. dot := strings.LastIndex(req.ServiceMethod, ".")
  14. if dot < 0 {
  15. err = errors.New("rpc: service/method request ill-formed: " + req.ServiceMethod)
  16. return
  17. }
  18. serviceName := req.ServiceMethod[:dot]
  19. methodName := req.ServiceMethod[dot+1:]
  20. //从注册时生成的map中查询出相应的方法的结构
  21. svci, ok := server.serviceMap.Load(serviceName)
  22. if !ok {
  23. err = errors.New("rpc: can't find service " + req.ServiceMethod)
  24. return
  25. }
  26. svc = svci.(*service)
  27. //获取出方法的类型
  28. mtype = svc.method[methodName]
  29. if mtype == nil {
  30. err = errors.New("rpc: can't find method " + req.ServiceMethod)
  31. }

循环处理,不断读取链接上的字节流,解密出请求,调用方法,编码响应,回写到客户端.

  1. func (server *Server) ServeCodec(codec ServerCodec) {
  2. sending := new(sync.Mutex)
  3. for {
  4. //读取请求
  5. service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
  6. if err != nil {
  7. ...
  8. }
  9. //调用
  10. go service.call(server, sending, mtype, req, argv, replyv, codec)
  11. }
  12. codec.Close()
  13. }

通过参数进行函数调用

  1. func (s *service) call(server *Server, sending *sync.Mutex, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
  2. mtype.Lock()
  3. mtype.numCalls++
  4. mtype.Unlock()
  5. function := mtype.method.Func
  6. // 通过反射进行函数调用
  7. returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv})
  8. // 返回值是不为空时,则取出错误的string
  9. errInter := returnValues[0].Interface()
  10. errmsg := ""
  11. if errInter != nil {
  12. errmsg = errInter.(error).Error()
  13. }
  14. //发送相应,并释放请求结构
  15. server.sendResponse(sending, req, replyv.Interface(), codec, errmsg)
  16. server.freeRequest(req)
  17. }

3. client端

  1. // 异步调用
  2. func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
  3. }
  4. // 同步调用
  5. func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
  6. }
  7. // Call represents an active RPC.
  8. type Call struct {
  9. ServiceMethod string // 服务名及方法名 格式:服务.方法
  10. Args interface{} // 函数的请求参数 (*struct).
  11. Reply interface{} // 函数的响应参数 (*struct).
  12. Error error // 方法完成后 error的状态.
  13. Done chan *Call // 方法调用结束后的channel.
  14. }

client端部分则相对要简单很多,主要提供Call以及Go两个方法,分别表示同步调用以及异步调用,但其实同步调用底层实现其实也是异步调用,调用时主要用到了Call结构,相关解释如上.

3.1 主要流程

3.2 关键代码
发送请求部分代码,每次send一次请求,均生成一个call对象,并使用seq作为key保存在map中,服务端返回时从map取出call,进行相应处理.

  1. func (client *Client) send(call *Call) {
  2. //请求级别的锁
  3. client.reqMutex.Lock()
  4. defer client.reqMutex.Unlock()
  5. // Register this call.
  6. client.mutex.Lock()
  7. if client.shutdown || client.closing {
  8. call.Error = ErrShutdown
  9. client.mutex.Unlock()
  10. call.done()
  11. return
  12. }
  13. //生成seq,每次调用均生成唯一的seq,在服务端相应后会通过该值进行匹配
  14. seq := client.seq
  15. client.seq++
  16. client.pending[seq] = call
  17. client.mutex.Unlock()
  18. // 请求并发送请求
  19. client.request.Seq = seq
  20. client.request.ServiceMethod = call.ServiceMethod
  21. err := client.codec.WriteRequest(&client.request, call.Args)
  22. if err != nil {
  23. //发送请求错误时,将map中call对象删除.
  24. client.mutex.Lock()
  25. call = client.pending[seq]
  26. delete(client.pending, seq)
  27. client.mutex.Unlock()
  28. if call != nil {
  29. call.Error = err
  30. call.done()
  31. }
  32. }
  33. }

接收响应部分的代码,这里是一个for循环,不断读取tcp上的流,并解码成Response对象以及方法的Reply对象.

  1. func (client *Client) input() {
  2. var err error
  3. var response Response
  4. for err == nil {
  5. response = Response{}
  6. err = client.codec.ReadResponseHeader(&response)
  7. if err != nil {
  8. break
  9. }
  10. //通过response中的 Seq获取call对象
  11. seq := response.Seq
  12. client.mutex.Lock()
  13. call := client.pending[seq]
  14. delete(client.pending, seq)
  15. client.mutex.Unlock()
  16. switch {
  17. case call == nil:
  18. err = client.codec.ReadResponseBody(nil)
  19. if err != nil {
  20. err = errors.New("reading error body: " + err.Error())
  21. }
  22. case response.Error != "":
  23. //服务端返回错误,直接将错误返回
  24. call.Error = ServerError(response.Error)
  25. err = client.codec.ReadResponseBody(nil)
  26. if err != nil {
  27. err = errors.New("reading error body: " + err.Error())
  28. }
  29. call.done()
  30. default:
  31. //通过编码器,将Resonse的body部分解码成reply对象.
  32. err = client.codec.ReadResponseBody(call.Reply)
  33. if err != nil {
  34. call.Error = errors.New("reading body " + err.Error())
  35. }
  36. call.done()
  37. }
  38. }
  39. // 客户端退出处理
  40. client.reqMutex.Lock()
  41. client.mutex.Lock()
  42. client.shutdown = true
  43. closing := client.closing
  44. if err == io.EOF {
  45. if closing {
  46. err = ErrShutdown
  47. } else {
  48. err = io.ErrUnexpectedEOF
  49. }
  50. }
  51. for _, call := range client.pending {
  52. call.Error = err
  53. call.done()
  54. }
  55. client.mutex.Unlock()
  56. client.reqMutex.Unlock()
  57. if debugLog && err != io.EOF && !closing {
  58. log.Println("rpc: client protocol error:", err)
  59. }
  60. }

4. 一些坑

同步调用无法超时
由于原生rpc只提供两个方法,同步的Call以及异步的Go,同步的Call服务端不返回则会一直阻塞,这里如果存在大量的不返回,会导致协程一直无法释放.

异步调用超时后会内存泄漏
基于异步调用加channel实现超时功能也会存在泄漏问题,原因是client的请求会存在map结构中,Go函数退出并不会清理map的内容,因此如果server端不返回的话,map中的请求会一直存储,从而导致内存泄漏.

5. 总结

总的来说,go原生rpc算是个基础版本的rpc,代码精简,可扩展性高,但是只是实现了rpc最基本的网络通讯,像超时熔断,链接管理(保活与重连),服务注册发现,还是欠缺的,因此还是达不到生产环境开箱即用,不过git就有一个基于rpc的功能增强版本,叫rpcx,支持了大部分主流rpc的特性.

6. 参考

  1. rpc https://golang.org/pkg/net/rpc/

3月5日发布
新浪微博微信TwitterFacebook
你可能感兴趣的文章
gRPC 初探 2 收藏,1.4k 浏览
Golang gRPC实践 连载一 gRPC介绍与安装 44 收藏,8.4k 浏览
golang 依赖管理 251 浏览

本作品采用 署名-非商业性使用-禁止演绎 4.0 国际许可协议 进行许可 。
2 条评论

李浩然 · 3月7日
这个坑就没有解决办法吗?

赞 回复

沐风 作者 · 3月7日
如果server端一直不返回的话,是没办法解决的,所以说rpc不适合生产环境.另外因为rpc默认使用没有keepalive功能的,如果进行链接复用时可能会出现链接其实已经不可用,但是上层没有感知到,继续进行调用从而导致请求阻塞.
相比之下使用grpc要更方便的多,rpc的通用特性基本支持,另外还跨语言,可扩展性强.

ft_authoradmin  ft_create_time2018-03-20 14:02
 ft_update_time2018-03-20 14:07