Golang net/rpc source code analysis

http://vinllen.com/golang-net-rpcyuan-ma-fen-xi/
08 Jan 2018

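Why do we need an RPC framework? An RPC call has to name the method to invoke, pass its arguments, and receive a return value. Without a framework, writing directly against raw TCP, how do we know where one message ends? The simplest option is a private protocol in TLV format: T (type) gives the type, L (length) gives the length, and V (value) carries the value. This brings its own problems, though. One is standardization: if every service defines its own protocol, things quickly become a mess. Another is efficiency: how should an array be transmitted, for example? RPC frameworks emerged to address these issues. The sender serializes the request or the response, and the receiver deserializes it, as shown in the figure below (the image is taken from a blog).
[Figure: pic1.gif]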
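To make the TLV idea concrete, here is a minimal sketch of length-prefixed framing. The layout (1-byte type, 4-byte big-endian length) and the names writeTLV, readTLV, framesRoundTrip, and typeString are assumptions made for illustration; they are not part of net/rpc.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

const typeString byte = 1 // hypothetical type tag for string payloads

// writeTLV writes one frame: 1-byte type, 4-byte big-endian length, value.
func writeTLV(w io.Writer, typ byte, value []byte) error {
	header := make([]byte, 5)
	header[0] = typ
	binary.BigEndian.PutUint32(header[1:], uint32(len(value)))
	if _, err := w.Write(header); err != nil {
		return err
	}
	_, err := w.Write(value)
	return err
}

// readTLV reads exactly one frame, so the reader knows where a message ends.
func readTLV(r io.Reader) (byte, []byte, error) {
	header := make([]byte, 5)
	if _, err := io.ReadFull(r, header); err != nil {
		return 0, nil, err
	}
	value := make([]byte, binary.BigEndian.Uint32(header[1:]))
	if _, err := io.ReadFull(r, value); err != nil {
		return 0, nil, err
	}
	return header[0], value, nil
}

// framesRoundTrip writes every value as its own frame into one byte stream
// and reads the frames back, showing that message boundaries survive.
func framesRoundTrip(values ...string) ([]string, error) {
	var stream bytes.Buffer // stands in for a TCP connection
	for _, v := range values {
		if err := writeTLV(&stream, typeString, []byte(v)); err != nil {
			return nil, err
		}
	}
	var out []string
	for stream.Len() > 0 {
		_, value, err := readTLV(&stream)
		if err != nil {
			return nil, err
		}
		out = append(out, string(value))
	}
	return out, nil
}

func main() {
	frames, err := framesRoundTrip("hello", "rpc")
	fmt.Println(frames, err)
}
```

Even this small protocol already has to fix a byte order and a length width, which is the standardization problem described above.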
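The service call flow is as follows:

  1. The client calls the client stub. This is a local procedure call.
  2. The client stub packs the arguments into a message and sends it. This packing (serialization) step is also called marshalling.
  3. The client's operating system sends the message to the server.
  4. The server's operating system passes the received packet to the server stub.
  5. The server stub unpacks it to obtain the arguments. Unpacking (deserialization) is also called unmarshalling.
  6. Finally, the server stub calls the service procedure. The result travels back to the client through the same steps in reverse.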
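The marshalling and unmarshalling in steps 2 and 5 can be sketched with encoding/gob, the format net/rpc uses by default. The helper names marshal and unmarshal are hypothetical; the real stubs also write a request header, which is left out here.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Args mirrors the argument struct used later in this article.
type Args struct {
	A, B int
}

// marshal plays the role of the client stub: it turns arguments into bytes.
func marshal(args Args) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(args); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// unmarshal plays the role of the server stub: it rebuilds the arguments.
func unmarshal(data []byte) (Args, error) {
	var args Args
	err := gob.NewDecoder(bytes.NewReader(data)).Decode(&args)
	return args, err
}

func main() {
	data, err := marshal(Args{A: 7, B: 8})
	if err != nil {
		panic(err)
	}
	args, err := unmarshal(data)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d bytes on the wire, decoded %+v\n", len(data), args)
}
```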

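net/rpc is the RPC framework that ships with Go, and it uses gob for serialization. There are many RPC frameworks today, such as gRPC and Thrift for cross-language calls, and service-governance frameworks such as Dubbo, RPCX, and go-micro. RPC differs from a RESTful API: RPC can run over HTTP, TCP, or UDP and focuses on methods, while REST runs over HTTP and focuses on operations on resources (create, read, update, delete). For more on RPC, see the blog post written by the author of rpcx. This article also draws on the Go standard library RPC development guide, which already outlines net/rpc fairly well but leaves some details unexplored; here I dig into those details and summarize them.
  Outline: first I give examples of the server and the client, then walk through the server code, then the client code, and finally summarize.

1. Example calls

1.1 Server side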

    package server

    import "errors"

    type Args struct {
        A, B int
    }

    type Quotient struct {
        Quo, Rem int
    }

    type Arith int

    func (t *Arith) Multiply(args *Args, reply *int) error {
        *reply = args.A * args.B
        return nil
    }

    func (t *Arith) Divide(args *Args, quo *Quotient) error {
        if args.B == 0 {
            return errors.New("divide by zero")
        }
        quo.Quo = args.A / args.B
        quo.Rem = args.A % args.B
        return nil
    }

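The example above is the server side. Arith provides two methods, Multiply and Divide. The required shape is: the first parameter carries the input, the second parameter receives the result, and the return value is an error. These are the constraints every RPC method must satisfy: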

  1. the method's type is exported.
  2. the method is exported.
  3. the method has two arguments, both exported (or builtin) types.
  4. the method's second argument is a pointer.
  5. the method has return type error.

In other words, a valid RPC method should look like this:

    func (t *T) MethodName(argType T1, replyType *T2) error
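A quick way to see these rules enforced is to register one type that follows them and one that does not. This is a minimal sketch; Good, Bad, and tryRegister are hypothetical names, and a private server from rpc.NewServer is used so that rpc.DefaultServer is left alone.

```go
package main

import (
	"fmt"
	"net/rpc"
)

type Args struct{ A, B int }

// Good has one method that follows the required shape.
type Good int

func (g *Good) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// Bad has an exported method, but the signature does not fit:
// there is no reply pointer.
type Bad int

func (b *Bad) Multiply(args *Args) error { return nil }

// tryRegister registers rcvr on a private server and returns the result.
func tryRegister(rcvr interface{}) error {
	return rpc.NewServer().Register(rcvr)
}

func main() {
	fmt.Println("Good:", tryRegister(new(Good)))
	fmt.Println("Bad: ", tryRegister(new(Bad)))
}
```

Registration of Good succeeds, while Bad is rejected with an error because none of its methods qualifies.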

The complete code for starting the server follows; the call flow is analyzed in the sections below:

    package main

    import (
        "errors"
        "log"
        "net"
        "net/http/httptest"
        "net/rpc"
        "sync"
    )

    type Args struct {
        A, B int
    }

    type Quotient struct {
        Quo, Rem int
    }

    type Arith int

    func (t *Arith) Multiply(args *Args, reply *int) error {
        *reply = args.A * args.B
        return nil
    }

    func (t *Arith) Divide(args *Args, quo *Quotient) error {
        if args.B == 0 {
            return errors.New("divide by zero")
        }
        quo.Quo = args.A / args.B
        quo.Rem = args.A % args.B
        return nil
    }

    func listenTCP() (net.Listener, string) {
        l, e := net.Listen("tcp", "127.0.0.1:10011")
        if e != nil {
            log.Fatalf("net.Listen tcp 127.0.0.1:10011: %v", e)
        }
        return l, l.Addr().String()
    }

    func startHttpServer() {
        // A nil handler makes the test server use http.DefaultServeMux,
        // which is where rpc.HandleHTTP registered the RPC handler.
        server := httptest.NewServer(nil)
        httpServerAddr := server.Listener.Addr().String()
        log.Println("Test HTTP RPC server listening on", httpServerAddr)
    }

    func main() {
        // Register the service.
        if err := rpc.Register(new(Arith)); err != nil {
            log.Fatal(err)
        }
        var l net.Listener
        l, serverAddr := listenTCP() // listen for TCP connections
        log.Println("Test RPC server listening on", serverAddr)
        go rpc.Accept(l)
        rpc.HandleHTTP() // register the RPC handlers on http.DefaultServeMux
        var httpOnce sync.Once
        httpOnce.Do(startHttpServer)
        select {}
    }

1.2 Client side

The client uses both endpoints: a synchronous call over HTTP and an asynchronous call over TCP.

    package main

    import (
        "fmt"
        "log"
        "net/rpc"
    )

    type Args struct {
        A, B int
    }

    type Quotient struct {
        Quo, Rem int
    }

    func main() {
        // 64120 is the port the server's HTTP listener reported at startup.
        client, err := rpc.DialHTTP("tcp", "127.0.0.1:64120")
        if err != nil {
            log.Fatal("dialing:", err)
        }

        // Synchronous call
        args := &Args{7, 8}
        var reply int
        err = client.Call("Arith.Multiply", args, &reply)
        if err != nil {
            log.Fatal("arith error:", err)
        }
        fmt.Printf("Arith: %d*%d=%d\n", args.A, args.B, reply)

        // Asynchronous call
        clientTCP, err := rpc.Dial("tcp", "127.0.0.1:10011")
        if err != nil {
            log.Fatal("dialing:", err)
        }
        quotient := new(Quotient)
        divCall := clientTCP.Go("Arith.Divide", args, quotient, nil)
        replyCall := <-divCall.Done // will be equal to divCall
        if replyCall.Error != nil {
            fmt.Println(replyCall.Error)
        } else {
            fmt.Printf("Arith: %d/%d=%d...%d\n", args.A, args.B, quotient.Quo, quotient.Rem)
        }
    }
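The server and client above need two processes and a hard-coded port. For experimenting, the same calls can be run inside one program. This is a sketch under a few assumptions: it listens on an ephemeral port (127.0.0.1:0), uses a private rpc.NewServer, and the helpers startServer, multiply, and divide are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"net"
	"net/rpc"
)

type Args struct{ A, B int }
type Quotient struct{ Quo, Rem int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

func (t *Arith) Divide(args *Args, quo *Quotient) error {
	if args.B == 0 {
		return errors.New("divide by zero")
	}
	quo.Quo, quo.Rem = args.A/args.B, args.A%args.B
	return nil
}

// startServer serves Arith on an ephemeral TCP port and returns the address.
func startServer() (string, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Arith)); err != nil {
		return "", err
	}
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	go server.Accept(l)
	return l.Addr().String(), nil
}

// multiply performs one synchronous call with Call.
func multiply(addr string, a, b int) (int, error) {
	client, err := rpc.Dial("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer client.Close()
	var reply int
	err = client.Call("Arith.Multiply", &Args{a, b}, &reply)
	return reply, err
}

// divide performs one asynchronous call with Go and waits on Done.
func divide(addr string, a, b int) (Quotient, error) {
	client, err := rpc.Dial("tcp", addr)
	if err != nil {
		return Quotient{}, err
	}
	defer client.Close()
	var quo Quotient
	call := <-client.Go("Arith.Divide", &Args{a, b}, &quo, nil).Done
	return quo, call.Error
}

func main() {
	addr, err := startServer()
	if err != nil {
		log.Fatal(err)
	}
	product, err := multiply(addr, 7, 8)
	fmt.Println(product, err)
	quo, err := divide(addr, 7, 8)
	fmt.Println(quo, err)
}
```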

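2. Server code analysis

  Let's first look at the flow of the server code: registration (rpc.Register(new(Arith))), starting the listener (listenTCP()), handling TCP connections in goroutines (rpc.Accept(l)), and handling HTTP connections (rpc.HandleHTTP()).

2.1 Registration (rpc.Register(new(Arith)))

  The server code first calls rpc.Register(new(Arith)). Here is the corresponding Register code:

    // Register publishes the receiver's methods in the DefaultServer.
    func Register(rcvr interface{}) error { return DefaultServer.Register(rcvr) }

Here DefaultServer is a package-level variable, var DefaultServer = NewServer(). So no matter how many times you register, everything hangs off the same Server (unless you create a new Server, for which there is an API).
  This in turn calls Register on the Server: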

    // Register publishes in the server the set of methods of the
    // receiver value that satisfy the following conditions:
    // - exported method of exported type
    // - two arguments, both of exported type
    // - the second argument is a pointer
    // - one return value, of type error
    // It returns an error if the receiver is not an exported type or has
    // no suitable methods. It also logs the error using package log.
    // The client accesses each method using a string of the form "Type.Method",
    // where Type is the receiver's concrete type.
    func (server *Server) Register(rcvr interface{}) error {
        return server.register(rcvr, "", false)
    }

Next is the actual register function:

    func (server *Server) register(rcvr interface{}, name string, useName bool) error {
        s := new(service)
        s.typ = reflect.TypeOf(rcvr)
        s.rcvr = reflect.ValueOf(rcvr)
        sname := reflect.Indirect(s.rcvr).Type().Name()
        if useName {
            sname = name
        }
        if sname == "" {
            s := "rpc.Register: no service name for type " + s.typ.String()
            log.Print(s)
            return errors.New(s)
        }
        if !isExported(sname) && !useName {
            s := "rpc.Register: type " + sname + " is not exported"
            log.Print(s)
            return errors.New(s)
        }
        s.name = sname

        // Install the methods
        s.method = suitableMethods(s.typ, true) // keep only the methods that satisfy the RPC rules

        if len(s.method) == 0 {
            str := ""

            // To help the user, see if a pointer receiver would work.
            method := suitableMethods(reflect.PtrTo(s.typ), false)
            if len(method) != 0 {
                str = "rpc.Register: type " + sname + " has no exported methods of suitable type (hint: pass a pointer to value of that type)"
            } else {
                str = "rpc.Register: type " + sname + " has no exported methods of suitable type"
            }
            log.Print(str)
            return errors.New(str)
        }

        if _, dup := server.serviceMap.LoadOrStore(sname, s); dup {
            return errors.New("rpc: service already defined: " + sname)
        }
        return nil
    }

register obtains the receiver's type and value through reflection, uses suitableMethods to collect the methods that satisfy the RPC rules, and finally calls server.serviceMap.LoadOrStore(sname, s) to store the service in a map for later lookup.
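The reflection check can be approximated in a few lines. The sketch below is a simplified stand-in for suitableMethods, not the library's implementation: rpcMethods is a hypothetical name, and the tests for exported argument and reply types are skipped.

```go
package main

import (
	"fmt"
	"reflect"
)

type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// Reset is exported but has the wrong shape, so it must be filtered out.
func (t *Arith) Reset() {}

var typeOfError = reflect.TypeOf((*error)(nil)).Elem()

// rpcMethods returns the names of the methods on rcvr that have the shape
// func (recv) Name(args T1, reply *T2) error.
func rpcMethods(rcvr interface{}) []string {
	typ := reflect.TypeOf(rcvr)
	var names []string
	for i := 0; i < typ.NumMethod(); i++ {
		m := typ.Method(i)
		mt := m.Type
		// In(0) is the receiver, so a valid method has three inputs,
		// and the last one (the reply) must be a pointer.
		if mt.NumIn() != 3 || mt.In(2).Kind() != reflect.Ptr {
			continue
		}
		// Exactly one result, and it must be error.
		if mt.NumOut() != 1 || mt.Out(0) != typeOfError {
			continue
		}
		names = append(names, m.Name)
	}
	return names
}

func main() {
	fmt.Println("pointer receiver:", rpcMethods(new(Arith)))
	fmt.Println("value receiver:  ", rpcMethods(Arith(0)))
}
```

Passing Arith(0) instead of new(Arith) yields no methods, because both methods are declared on *Arith. That is the situation the "hint: pass a pointer" message in register is written for.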

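2.2 Starting the listener (listenTCP())

This opens the listening port.

2.3 Handling TCP connections in goroutines (rpc.Accept(l))

Accept loops on the listener. For each new connection it starts a goroutine that calls server.ServeConn to handle it: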

    // Accept accepts connections on the listener and serves requests
    // for each incoming connection. Accept blocks until the listener
    // returns a non-nil error. The caller typically invokes Accept in a
    // go statement.
    func (server *Server) Accept(lis net.Listener) {
        for {
            conn, err := lis.Accept()
            if err != nil {
                log.Print("rpc.Serve: accept:", err.Error())
                return
            }
            go server.ServeConn(conn)
        }
    }

ServeConn then calls ServeCodec, which is where serialization and deserialization come in:

    // ServeConn runs the server on a single connection.
    // ServeConn blocks, serving the connection until the client hangs up.
    // The caller typically invokes ServeConn in a go statement.
    // ServeConn uses the gob wire format (see package gob) on the
    // connection. To use an alternate codec, use ServeCodec.
    func (server *Server) ServeConn(conn io.ReadWriteCloser) {
        buf := bufio.NewWriter(conn)
        srv := &gobServerCodec{
            rwc:    conn,
            dec:    gob.NewDecoder(conn),
            enc:    gob.NewEncoder(buf),
            encBuf: buf,
        }
        server.ServeCodec(srv)
    }

ServeCodec reads a message, deserializes the request, invokes the corresponding RPC method, and then sends back the serialized reply:

    // ServeCodec is like ServeConn but uses the specified codec to
    // decode requests and encode responses.
    func (server *Server) ServeCodec(codec ServerCodec) {
        sending := new(sync.Mutex)
        wg := new(sync.WaitGroup)
        for {
            // Read and deserialize one request. readRequest does two things:
            // 1. it calls readRequestHeader to read the header and check that
            //    the requested service exists;
            // 2. it decodes the arguments and prepares a reply value of the
            //    type the method expects.
            service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
            if err != nil {
                if debugLog && err != io.EOF {
                    log.Println("rpc:", err)
                }
                if !keepReading {
                    break
                }
                // send a response if we actually managed to read a header.
                if req != nil {
                    server.sendResponse(sending, req, invalidRequest, codec, err.Error())
                    server.freeRequest(req)
                }
                continue
            }
            wg.Add(1) // tracked by the WaitGroup; call() invokes Done() below
            // Invoke the service method, then send the serialized reply.
            go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
        }
        // We've seen that there are no more requests.
        // Wait for responses to be sent before closing codec.
        wg.Wait() // wait for every service.call to finish
        codec.Close()
    }
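Because ServeCodec only depends on the ServerCodec interface, the wire format can be swapped. The sketch below uses the JSON codec from net/rpc/jsonrpc over an in-memory net.Pipe connection. The helper name multiplyOverJSON is hypothetical, and net.Pipe is only a stand-in for a real TCP connection.

```go
package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// multiplyOverJSON serves one in-memory connection with the JSON codec
// and calls Arith.Multiply through it.
func multiplyOverJSON(a, b int) (int, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Arith)); err != nil {
		return 0, err
	}
	serverConn, clientConn := net.Pipe()

	// Same serving loop as ServeConn, but with JSON instead of gob.
	go server.ServeCodec(jsonrpc.NewServerCodec(serverConn))

	// The client must use the matching codec.
	client := jsonrpc.NewClient(clientConn)
	defer client.Close()

	var reply int
	err := client.Call("Arith.Multiply", &Args{a, b}, &reply)
	return reply, err
}

func main() {
	product, err := multiplyOverJSON(7, 8)
	fmt.Println(product, err)
}
```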

I won't expand readRequest here. Let's look at call instead:

    func (s *service) call(server *Server, sending *sync.Mutex, wg *sync.WaitGroup, mtype *methodType, req *Request, argv, replyv reflect.Value, codec ServerCodec) {
        if wg != nil {
            defer wg.Done() // tell the WaitGroup this call has finished
        }
        mtype.Lock()
        mtype.numCalls++ // count invocations
        mtype.Unlock()
        function := mtype.method.Func
        // Invoke the method, providing a new value for the reply.
        returnValues := function.Call([]reflect.Value{s.rcvr, argv, replyv}) // invoke the RPC method
        // The return value for the method is an error.
        errInter := returnValues[0].Interface()
        errmsg := ""
        if errInter != nil {
            errmsg = errInter.(error).Error()
        }
        server.sendResponse(sending, req, replyv.Interface(), codec, errmsg) // send the serialized reply
        server.freeRequest(req) // release the request; see the question below
    }

The rest of the flow is clear enough. freeRequest releases the Request; correspondingly, readRequest calls getRequest() to obtain the header. First, the Request struct:

    // Request is a header written before every RPC call. It is used internally
    // but documented here as an aid to debugging, such as when analyzing
    // network traffic.
    type Request struct {
        ServiceMethod string   // format: "Service.Method"
        Seq           uint64   // sequence number chosen by client
        next          *Request // for free list in Server
    }

It holds the RPC method name, the sequence number, and a link to the next node. So Requests appear to be kept in a linked list: when a request arrives, a Request node is taken from the list, and when the request has been handled, the node is released.

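    func (server *Server) getRequest() *Request {
        server.reqLock.Lock()
        req := server.freeReq
        if req == nil {
            req = new(Request)
        } else {
            server.freeReq = req.next
            *req = Request{}
        }
        server.reqLock.Unlock()
        return req
    }

    func (server *Server) freeRequest(req *Request) {
        server.reqLock.Lock()
        req.next = server.freeReq
        server.freeReq = req
        server.reqLock.Unlock()
    }

Looking at these two functions, though, a question comes up. If the list has no node available, a new one is allocated, and when the request finishes the node is pushed onto the head of the list. The length of the list therefore reflects the peak number of concurrent requests: if at most 100 requests were in flight at once, the list ends up holding 100 Requests. So what is the point? Why keep a list rather than simply calling new for each request, given that the nodes are only taken out to be reused anyway? I did not really understand this part. (Reuse may itself be the point: a free list like this saves one allocation per request and lowers garbage-collector pressure, at the cost of keeping the nodes alive.)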
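To see what the free list buys, here is a small model of the getRequest/freeRequest pair with an allocation counter added. freeList, get, put, and serve are hypothetical names, and the Request type is a cut-down stand-in for rpc.Request. One plausible reading of the design is that it trades a little retained memory for fewer allocations.

```go
package main

import (
	"fmt"
	"sync"
)

// Request is a cut-down stand-in for rpc.Request.
type Request struct {
	ServiceMethod string
	Seq           uint64
	next          *Request // link used only while the node is on the free list
}

// freeList mirrors the getRequest/freeRequest pair shown above.
type freeList struct {
	mu     sync.Mutex
	head   *Request
	allocs int // how many times new(Request) had to be called
}

func (f *freeList) get() *Request {
	f.mu.Lock()
	defer f.mu.Unlock()
	req := f.head
	if req == nil {
		f.allocs++
		return new(Request)
	}
	f.head = req.next
	*req = Request{} // clear stale fields before reuse
	return req
}

func (f *freeList) put(req *Request) {
	f.mu.Lock()
	req.next = f.head
	f.head = req
	f.mu.Unlock()
}

// serve handles n requests one after another and reports how many
// allocations were needed.
func serve(n int) int {
	var list freeList
	for i := 0; i < n; i++ {
		req := list.get()
		req.Seq = uint64(i)
		list.put(req)
	}
	return list.allocs
}

func main() {
	fmt.Println("allocations for 1000 sequential requests:", serve(1000))
}
```

With strictly sequential requests the program reports a single allocation; with k requests in flight at once it would need k. In newer code the same goal is more often reached with sync.Pool, which also lets the garbage collector reclaim idle nodes.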

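Response is handled the same way.

2.4 Handling HTTP connections (rpc.HandleHTTP())

The above covers RPC over TCP. Go's rpc package also offers RPC over HTTP. I won't go through the HandleHTTP call chain in detail; essentially it passes DefaultRPCPath to http.Handle, so that once an HTTP server is started, requests to that path are routed to the RPC server. Next is the ServeHTTP handler: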

    // ServeHTTP implements an http.Handler that answers RPC requests.
    func (server *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
        if req.Method != "CONNECT" {
            w.Header().Set("Content-Type", "text/plain; charset=utf-8")
            w.WriteHeader(http.StatusMethodNotAllowed)
            io.WriteString(w, "405 must CONNECT\n")
            return
        }
        conn, _, err := w.(http.Hijacker).Hijack()
        if err != nil {
            log.Print("rpc hijacking ", req.RemoteAddr, ": ", err.Error())
            return
        }
        io.WriteString(conn, "HTTP/1.0 "+connected+"\n\n")
        server.ServeConn(conn)
    }

The function hijacks the connection and then calls ServeConn to handle it. RPC over HTTP therefore ends up in the same handler as RPC over TCP, so the layers below are unaware of the difference.
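The client side of this handshake is small enough to do by hand, which makes the "HTTP is only used to get a raw connection" point visible. The sketch below sends the CONNECT request itself and then hands the connection to rpc.NewClient. dialByHand and multiplyOverHTTP are hypothetical names, and a private ServeMux plus httptest server are assumptions made to keep the example self-contained.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"net/http/httptest"
	"net/rpc"
)

type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// dialByHand performs the CONNECT handshake manually, then switches the
// same connection over to the gob-based RPC protocol.
func dialByHand(addr string) (*rpc.Client, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}
	io.WriteString(conn, "CONNECT "+rpc.DefaultRPCPath+" HTTP/1.0\n\n")
	resp, err := http.ReadResponse(bufio.NewReader(conn), &http.Request{Method: "CONNECT"})
	if err != nil {
		conn.Close()
		return nil, err
	}
	if resp.StatusCode != http.StatusOK {
		conn.Close()
		return nil, errors.New("unexpected HTTP response: " + resp.Status)
	}
	return rpc.NewClient(conn), nil
}

// multiplyOverHTTP starts an HTTP server that exposes Arith and calls it.
func multiplyOverHTTP(a, b int) (int, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Arith)); err != nil {
		return 0, err
	}
	mux := http.NewServeMux()
	mux.Handle(rpc.DefaultRPCPath, server) // *rpc.Server implements http.Handler
	ts := httptest.NewServer(mux)
	defer ts.Close()

	client, err := dialByHand(ts.Listener.Addr().String())
	if err != nil {
		return 0, err
	}
	defer client.Close()
	var reply int
	err = client.Call("Arith.Multiply", &Args{a, b}, &reply)
	return reply, err
}

func main() {
	product, err := multiplyOverHTTP(7, 8)
	fmt.Println(product, err)
}
```

In normal use rpc.DialHTTP performs this handshake for you.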

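2.5 Server summary

net/rpc creates a default server for you to use; you can of course also create your own.

3. Client code analysis

The client offers two styles: synchronous Call and asynchronous Go. Call is itself implemented on top of Go; it simply blocks on the channel once.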

    // Go invokes the function asynchronously. It returns the Call structure representing
    // the invocation. The done channel will signal when the call is complete by returning
    // the same Call object. If done is nil, Go will allocate a new channel.
    // If non-nil, done must be buffered or Go will deliberately crash.
    func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
        call := new(Call)
        call.ServiceMethod = serviceMethod
        call.Args = args
        call.Reply = reply
        if done == nil {
            done = make(chan *Call, 10) // buffered.
        } else {
            // If caller passes done != nil, it must arrange that
            // done has enough buffer for the number of simultaneous
            // RPCs that will be using that channel. If the channel
            // is totally unbuffered, it's best not to run at all.
            if cap(done) == 0 {
                log.Panic("rpc: done channel is unbuffered")
            }
        }
        call.Done = done
        client.send(call)
        return call
    }

    // Call invokes the named function, waits for it to complete, and returns its error status.
    func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
        call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
        return call.Error
    }
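Since Call is just Go plus a blocking receive, the receive can be replaced with a select to get a call with a deadline. This is a sketch of that idiom, not something net/rpc provides: callWithTimeout, errTimeout, newPipeClient, demo, and the deliberately slow method Slow are all hypothetical, and net.Pipe stands in for a real connection.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"net/rpc"
	"time"
)

type Args struct{ A, B int }
type Arith int

func (t *Arith) Multiply(args *Args, reply *int) error {
	*reply = args.A * args.B
	return nil
}

// Slow exists only to trigger the timeout in this example.
func (t *Arith) Slow(args *Args, reply *int) error {
	time.Sleep(200 * time.Millisecond)
	*reply = args.A * args.B
	return nil
}

var errTimeout = errors.New("rpc call timed out")

// callWithTimeout starts the call with Go, then waits for either the
// Done channel or the timer, whichever fires first.
func callWithTimeout(client *rpc.Client, method string, args, reply interface{}, d time.Duration) error {
	call := client.Go(method, args, reply, make(chan *rpc.Call, 1))
	select {
	case <-call.Done:
		return call.Error
	case <-time.After(d):
		// The call is not cancelled; a late reply is still decoded
		// into reply, so the caller should not reuse that variable.
		return errTimeout
	}
}

// newPipeClient wires a client to a private server over an in-memory pipe.
func newPipeClient() (*rpc.Client, error) {
	server := rpc.NewServer()
	if err := server.Register(new(Arith)); err != nil {
		return nil, err
	}
	serverConn, clientConn := net.Pipe()
	go server.ServeConn(serverConn)
	return rpc.NewClient(clientConn), nil
}

// demo runs one call that times out and one that completes in time.
func demo() (slowErr error, product int, fastErr error) {
	client, err := newPipeClient()
	if err != nil {
		return err, 0, err
	}
	defer client.Close()
	var ignored int
	slowErr = callWithTimeout(client, "Arith.Slow", &Args{7, 8}, &ignored, 20*time.Millisecond)
	fastErr = callWithTimeout(client, "Arith.Multiply", &Args{7, 8}, &product, 2*time.Second)
	return slowErr, product, fastErr
}

func main() {
	fmt.Println(demo())
}
```

The done channel passed to Go has capacity 1, matching what Call does, so the client's reader goroutine never blocks when it delivers a reply that nobody is waiting for any more.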

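Go takes a channel, which must be buffered; if none is passed, a new one is created. Why its buffer size is 10, I do not know. The send method does the sending. Since one client may call send many times concurrently, there is locking, but why are there two locks? Does the inner lock guard the sequence number while the outer lock guards the whole send? With the two nested like this, the inner lock looks redundant at first. It is not, though: input(), shown further below, also takes client.mutex to access pending, and it does so without holding reqMutex.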

    func (client *Client) send(call *Call) {
        client.reqMutex.Lock()
        defer client.reqMutex.Unlock()

        // Register this call.
        client.mutex.Lock()
        if client.shutdown || client.closing {
            call.Error = ErrShutdown
            client.mutex.Unlock()
            call.done()
            return
        }
        seq := client.seq
        client.seq++
        client.pending[seq] = call
        client.mutex.Unlock()

        // Encode and send the request.
        client.request.Seq = seq
        client.request.ServiceMethod = call.ServiceMethod
        err := client.codec.WriteRequest(&client.request, call.Args)
        if err != nil {
            client.mutex.Lock()
            call = client.pending[seq]
            delete(client.pending, seq)
            client.mutex.Unlock()
            if call != nil {
                call.Error = err
                call.done()
            }
        }
    }

That is the sending side. Receiving replies is set up in Dial:

    // Dial connects to an RPC server at the specified network address.
    func Dial(network, address string) (*Client, error) {
        conn, err := net.Dial(network, address)
        if err != nil {
            return nil, err
        }
        return NewClient(conn), nil
    }

Dial calls NewClient:

    // NewClient returns a new Client to handle requests to the
    // set of services at the other end of the connection.
    // It adds a buffer to the write side of the connection so
    // the header and payload are sent as a unit.
    func NewClient(conn io.ReadWriteCloser) *Client {
        encBuf := bufio.NewWriter(conn)
        client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
        return NewClientWithCodec(client)
    }

    // NewClientWithCodec is like NewClient but uses the specified
    // codec to encode requests and decode responses.
    func NewClientWithCodec(codec ClientCodec) *Client {
        client := &Client{
            codec:   codec,
            pending: make(map[uint64]*Call),
        }
        go client.input()
        return client
    }

Again the connection is first wrapped in a codec. NewClientWithCodec then starts input in a goroutine to process replies. On failure or on completion, input calls done on the call, so the goroutine reading from the channel does not block forever.

    func (client *Client) input() {
        var err error
        var response Response
        for err == nil {
            response = Response{}
            err = client.codec.ReadResponseHeader(&response)
            if err != nil {
                break
            }
            seq := response.Seq
            client.mutex.Lock()
            call := client.pending[seq]
            delete(client.pending, seq)
            client.mutex.Unlock()

            switch {
            case call == nil:
                // We've got no pending call. That usually means that
                // WriteRequest partially failed, and call was already
                // removed; response is a server telling us about an
                // error reading request body. We should still attempt
                // to read error body, but there's no one to give it to.
                err = client.codec.ReadResponseBody(nil)
                if err != nil {
                    err = errors.New("reading error body: " + err.Error())
                }
            case response.Error != "":
                // We've got an error response. Give this to the request;
                // any subsequent requests will get the ReadResponseBody
                // error if there is one.
                call.Error = ServerError(response.Error)
                err = client.codec.ReadResponseBody(nil)
                if err != nil {
                    err = errors.New("reading error body: " + err.Error())
                }
                call.done()
            default:
                err = client.codec.ReadResponseBody(call.Reply)
                if err != nil {
                    call.Error = errors.New("reading body " + err.Error())
                }
                call.done()
            }
        }
        // Terminate pending calls.
        client.reqMutex.Lock()
        client.mutex.Lock()
        client.shutdown = true
        closing := client.closing
        if err == io.EOF {
            if closing {
                err = ErrShutdown
            } else {
                err = io.ErrUnexpectedEOF
            }
        }
        for _, call := range client.pending {
            call.Error = err
            call.done()
        }
        client.mutex.Unlock()
        client.reqMutex.Unlock()
        if debugLog && err != io.EOF && !closing {
            log.Println("rpc: client protocol error:", err)
        }
    }

    func (call *Call) done() {
        select {
        case call.Done <- call:
            // ok
        default:
            // We don't want to block here. It is the caller's responsibility to make
            // sure the channel has enough buffer space. See comment in Go().
            if debugLog {
                log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")
            }
        }
    }
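The bookkeeping shared by send and input boils down to a map from sequence number to pending call. The following model strips away the network and the codec to show just that part. The types response, call, and mux and the functions register, dispatch, and demo are hypothetical stand-ins, not the library's types.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a stand-in for rpc.Response plus its decoded body.
type response struct {
	Seq  uint64
	Body string
}

// call is a stand-in for rpc.Call.
type call struct {
	Reply string
	Done  chan *call
}

// mux mirrors the seq/pending bookkeeping of rpc.Client.
type mux struct {
	mu      sync.Mutex
	seq     uint64
	pending map[uint64]*call
}

// register assigns the next sequence number and remembers the call,
// which is what send does before writing the request.
func (m *mux) register() (uint64, *call) {
	c := &call{Done: make(chan *call, 1)}
	m.mu.Lock()
	seq := m.seq
	m.seq++
	m.pending[seq] = c
	m.mu.Unlock()
	return seq, c
}

// dispatch routes one response to the call waiting for it,
// which is what input does after reading a response header.
func (m *mux) dispatch(r response) {
	m.mu.Lock()
	c := m.pending[r.Seq]
	delete(m.pending, r.Seq)
	m.mu.Unlock()
	if c == nil {
		return // nobody is waiting for this sequence number
	}
	c.Reply = r.Body
	c.Done <- c // buffered, so this never blocks
}

// demo registers two calls and delivers their responses in reverse order.
func demo() (first, second string) {
	m := &mux{pending: make(map[uint64]*call)}
	seqA, callA := m.register()
	seqB, callB := m.register()
	m.dispatch(response{Seq: seqB, Body: "reply for B"})
	m.dispatch(response{Seq: seqA, Body: "reply for A"})
	return (<-callA.Done).Reply, (<-callB.Done).Reply
}

func main() {
	first, second := demo()
	fmt.Println(first)
	fmt.Println(second)
}
```

Each caller still receives its own reply even though the responses arrive out of order. In the real client, register and dispatch run on different goroutines, so the map needs its own lock independent of the lock that serializes writes.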

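Summary

net/rpc uses gob for serialization by default; this can of course be swapped for protobuf, JSON, and so on. net/rpc is said to perform quite well.

Note:

When reposting, please cite the link: http://vinllen.com/golang-net-rpcyuan-ma-fen-xi/

References: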

http://colobu.com/2016/09/18/go-net-rpc-guide/
https://www.gitbook.com/book/smallnest/go-rpc-programming-guide/details

Author: admin
Created: 2018-03-20 14:15
Updated: 2018-03-20 14:27