4.14. Multiplexing

基于管道,我们可以很容易实现一个支持多路客户端的服务器程序。采用的技巧是将每个客户端私有的通信管道 作为消息的一部分发送给服务器,然后服务器通过这些管道和客户端独立通信。现实中的服务器实现都很复杂, 我们这里只给出一个服务器的简单实现来展现前面描述的技巧。首先定义一个"request"类型,里面包含一个 客户端的通信管道。

  09    type request struct {
  10        a, b    int
  11        replyc  chan int
  12    }

服务器对客户端发送过来的两个整数进行运算。下面是具体的函数,函数在运算完之后将结构通过结构中的 管道返回给客户端。

  14    type binOp func(a, b int) int

  16    func run(op binOp, req *request) {
  17        reply := op(req.a, req.b)
  18        req.replyc <- reply
  19    }

第14行现定义一个"binOp"函数类型,用于对两个整数进行运算。

服务器routine线程是一个无限循环,它接受客户端请求。然后为每个客户端启动一个独立的routine线程, 用于处理客户数据(不会被某个客户端阻塞)。

  21    func server(op binOp, service chan *request) {
  22        for {
  23            req := <-service
  24            go run(op, req)  // don't wait for it
  25        }
  26    }

启动服务器的方法也是一个类似的routine线程,然后返回服务器的请求管道。

  28    func startServer(op binOp) chan *request {
  29        req := make(chan *request)
  30        go server(op, req)
  31        return req
  32    }

这里是一个简单的测试。首先启动服务器,处理函数为计算两个整数的和。接着向服务器发送"N"个请求(无阻塞)。 当所有请求都发送完了之后,再进行验证返回结果。

  34    func main() {
  35        adder := startServer(func(a, b int) int { return a + b })
  36        const N = 100
  37        var reqs [N]request
  38        for i := 0; i < N; i++ {
  39            req := &reqs[i]
  40            req.a = i
  41            req.b = i + N
  42            req.replyc = make(chan int)
  43            adder <- req
  44        }
  45        for i := N-1; i >= 0; i-- {   // doesn't matter what order
  46            if <-reqs[i].replyc != N + 2*i {
  47                fmt.Println("fail at", i)
  48            }
  49        }
  50        fmt.Println("done")
  51    }

前面的服务器程序有个小问题:当main函数退出之后,服务器没有关闭,而且可能有一些客户端被阻塞在 管道通信中。为了处理这个问题,我们可给服务器增加一个控制管道,用于退出服务器。

  32    func startServer(op binOp) (service chan *request, quit chan bool) {
  33        service = make(chan *request)
  34        quit = make(chan bool)
  35        go server(op, service, quit)
  36        return service, quit
  37    }

首先给"server"函数增加一个控制管道参数,然后这样使用:

  21    func server(op binOp, service chan *request, quit chan bool) {
  22        for {
  23            select {
  24            case req := <-service:
  25                go run(op, req)  // don't wait for it
  26            case <-quit:
  27                return
  28            }
  29        }
  30    }

在服务器函数中,"select"操作服用于从多个通讯管道中选择一个就绪的管道。如果所有的管道都没有数据, 那么将等待知道有任意一个管道有数据。如果有多个管道就绪,则随即选择一个。服务器处理客户端请求,如果 有退出消息则退出。

最后是在main函数中保存"quit"管道,然后在退出的时候向服务线程发送停止命令。

  40        adder, quit := startServer(func(a, b int) int { return a + b })
  ...

  55        quit <- true

当然,Go语言及并行编程要讨论的问题很多。这个入门只是给出一些简单的例子。