[译] 使用 Go 和 ReactJS 构建聊天系统 (四)

https://juejin.im/post/5d43d4c0f265da03cd0a6060

本节完整代码:GitHub

本文是关于使用 ReactJS 和 Go 构建聊天应用程序的系列文章的第 4 部分。你可以在这里找到第 3 部分 - 前端实现

这节主要实现处理多个客户端消息的功能,并将收到的消息广播到每个连接的客户端。在本系列的这一部分结束时,我们将:

  • 实现了一个池机制,可以有效地跟踪 WebSocket 服务中的连接数。
  • 能够将任何收到的消息广播到连接池中的所有连接。
  • 当另一个客户端连接或断开连接时,能够通知现有的客户端。

在本课程的这一部分结束时,我们的应用程序看起来像这样:






拆分 Websocket 代码

现在已经完成了必要的基本工作,我们可以继续改进代码库。可以将一些应用程序拆分为子包以便于开发。

现在,理想情况下,你的 main.go 文件应该只是 Go 应用程序的入口,它应该相当小,并且可以调用项目中的其他包。

注意 - 我们将参考非官方标准的 Go 项目结构布局 - golang-standards/project-layout

让我们在后端项目目录中创建一个名为 pkg/ 的新目录。在此期间,我们将要创建另一个名为 websocket/ 的目录,该目录将包含 websocket.go 文件。

我们将把目前在 main.go 文件中使用的许多基于 WebSocket 的代码移动到这个新的 websocket.go 文件中。

注意 - 需要注意的一件事是,当复制函数时,需要将每个函数的第一个字母大写,我们希望这些函数对项目的其余部分可导出。

  1. package websocket
  2. import (
  3. "fmt"
  4. "io"
  5. "log"
  6. "net/http"
  7. "github.com/gorilla/websocket"
  8. )
  9. var upgrader = websocket.Upgrader{
  10. ReadBufferSize: 1024,
  11. WriteBufferSize: 1024,
  12. CheckOrigin: func(r *http.Request) bool { return true },
  13. }
  14. func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
  15. ws, err := upgrader.Upgrade(w, r, nil)
  16. if err != nil {
  17. log.Println(err)
  18. return ws, err
  19. }
  20. return ws, nil
  21. }
  22. func Reader(conn *websocket.Conn) {
  23. for {
  24. messageType, p, err := conn.ReadMessage()
  25. if err != nil {
  26. log.Println(err)
  27. return
  28. }
  29. fmt.Println(string(p))
  30. if err := conn.WriteMessage(messageType, p); err != nil {
  31. log.Println(err)
  32. return
  33. }
  34. }
  35. }
  36. func Writer(conn *websocket.Conn) {
  37. for {
  38. fmt.Println("Sending")
  39. messageType, r, err := conn.NextReader()
  40. if err != nil {
  41. fmt.Println(err)
  42. return
  43. }
  44. w, err := conn.NextWriter(messageType)
  45. if err != nil {
  46. fmt.Println(err)
  47. return
  48. }
  49. if _, err := io.Copy(w, r); err != nil {
  50. fmt.Println(err)
  51. return
  52. }
  53. if err := w.Close(); err != nil {
  54. fmt.Println(err)
  55. return
  56. }
  57. }
  58. }

现在已经创建了这个新的 websocket 包,然后我们想要更新 main.go 文件来调用这个包。首先必须在文件顶部的导入列表中添加一个新的导入,然后可以通过使用 websocket. 来调用该包中的函数。像这样:

  1. package main
  2. import (
  3. "fmt"
  4. "net/http"
  5. "realtime-chat-go-react/backend/pkg/websocket"
  6. )
  7. func serveWs(pool *websocket.Pool, w http.ResponseWriter, r *http.Request) {
  8. fmt.Println("WebSocket Endpoint Hit")
  9. conn, err := websocket.Upgrade(w, r)
  10. if err != nil {
  11. fmt.Fprintf(w, "%+v\n", err)
  12. }
  13. client := &websocket.Client{
  14. Conn: conn,
  15. Pool: pool,
  16. }
  17. pool.Register <- client
  18. client.Read()
  19. }
  20. func setupRoutes() {
  21. pool := websocket.NewPool()
  22. go pool.Start()
  23. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  24. serveWs(pool, w, r)
  25. })
  26. }
  27. func main() {
  28. fmt.Println("Distributed Chat App v0.01")
  29. setupRoutes()
  30. http.ListenAndServe(":8080", nil)
  31. }

经过这些修改,我们应该检查一下这些是否破坏了现有的功能。尝试再次运行后端和前端,确保仍然可以发送和接收消息:

  1. $ cd backend/
  2. $ go run main.go

如果成功,我们可以继续扩展代码库来处理多客户端。

到目前为止,目录结构应如下所示:

  1. - backend/
  2. - - pkg/
  3. - - - websocket/
  4. - - - - websocket.go
  5. - - main.go
  6. - - go.mod
  7. - - go.sum
  8. - frontend/
  9. - ...

处理多客户端

现在已经完成了基本的操作,我们可以继续改进后端并实现处理多个客户端的功能。

为此,我们需要考虑如何处理与 WebSocket 服务的连接。每当建立新连接时,我们都必须将它们添加到现有连接池中,并确保每次发送消息时,该池中的每个人都会收到该消息。

使用 Channels

我们需要开发一个具有大量并发连接的系统。在该连接的持续时间内都会启动新的 goroutine 去处理每一个连接。这意味着我们必须关心这些并发 goroutine 之间的通信,并确保线程安全。

当进一步实现 Pool 结构时,我们必须考虑使用 sync.Mutex 来阻塞其他 goroutine 同时访问/修改数据,或者我们也可以使用 channels

对于这个项目,我认为最好使用 channels 并且以安全的方式在多个并发的 goroutine 中进行通信。

注意 - 如果想进一步了解 Go 中的 channels,可以在这里查看我的其他文章:Go Channels Tutorial

client.go

我们先创建一个名为 client.go 新文件,它将存在于 pkg/websocket 目录中,在文件中将定义一个包含以下内容的 Client 结构体:

  • ID:特定连接的唯一可识别字符串
  • Conn:指向 websocket.Conn 的指针
  • Pool:指向 Pool 的指针

还需要定义一个 Read() 方法,该方法将一直监听此 Client 的 websocket 连接上发出的新消息。

如果收到新消息,它将把这些消息传递给池的 Broadcast channel,该 channel 随后将接收的消息广播到池中的每个客户端。

  1. package websocket
  2. import (
  3. "fmt"
  4. "log"
  5. "github.com/gorilla/websocket"
  6. )
  7. type Client struct {
  8. ID string
  9. Conn *websocket.Conn
  10. Pool *Pool
  11. }
  12. type Message struct {
  13. Type int `json:"type"`
  14. Body string `json:"body"`
  15. }
  16. func (c *Client) Read() {
  17. defer func() {
  18. c.Pool.Unregister <- c
  19. c.Conn.Close()
  20. }()
  21. for {
  22. messageType, p, err := c.Conn.ReadMessage()
  23. if err != nil {
  24. log.Println(err)
  25. return
  26. }
  27. message := Message{Type: messageType, Body: string(p)}
  28. c.Pool.Broadcast <- message
  29. fmt.Printf("Message Received: %+v\n", message)
  30. }
  31. }

太棒了,我们已经在代码中定义了客户端,继续实现池。

Pool 结构体

我们在 pkg/websocket 目录下创建一个新文件 pool.go

首先定义一个 Pool 结构体,它将包含我们进行并发通信所需的所有 channels,以及一个客户端 map

  1. package websocket
  2. import "fmt"
  3. type Pool struct {
  4. Register chan *Client
  5. Unregister chan *Client
  6. Clients map[*Client]bool
  7. Broadcast chan Message
  8. }
  9. func NewPool() *Pool {
  10. return &Pool{
  11. Register: make(chan *Client),
  12. Unregister: make(chan *Client),
  13. Clients: make(map[*Client]bool),
  14. Broadcast: make(chan Message),
  15. }
  16. }

我们需要确保应用程序中只有一个点能够写入 WebSocket 连接,否则将面临并发写入问题。所以,定义了 Start() 方法,该方法将一直监听传递给 Pool channels 的内容,然后,如果它收到发送给其中一个 channel 的内容,它将采取相应的行动。

  • Register - 当新客户端连接时,Register channel 将向此池中的所有客户端发送 New User Joined...
  • Unregister - 注销用户,在客户端断开连接时通知池
  • Clients - 客户端的布尔值映射。可以使用布尔值来判断客户端活动/非活动
  • Broadcast - 一个 channel,当它传递消息时,将遍历池中的所有客户端并通过套接字发送消息。

代码:

  1. func (pool *Pool) Start() {
  2. for {
  3. select {
  4. case client := <-pool.Register:
  5. pool.Clients[client] = true
  6. fmt.Println("Size of Connection Pool: ", len(pool.Clients))
  7. for client, _ := range pool.Clients {
  8. fmt.Println(client)
  9. client.Conn.WriteJSON(Message{Type: 1, Body: "New User Joined..."})
  10. }
  11. break
  12. case client := <-pool.Unregister:
  13. delete(pool.Clients, client)
  14. fmt.Println("Size of Connection Pool: ", len(pool.Clients))
  15. for client, _ := range pool.Clients {
  16. client.Conn.WriteJSON(Message{Type: 1, Body: "User Disconnected..."})
  17. }
  18. break
  19. case message := <-pool.Broadcast:
  20. fmt.Println("Sending message to all clients in Pool")
  21. for client, _ := range pool.Clients {
  22. if err := client.Conn.WriteJSON(message); err != nil {
  23. fmt.Println(err)
  24. return
  25. }
  26. }
  27. }
  28. }
  29. }

websocket.go

太棒了,我们再对 websocket.go 文件进行一些小修改,并删除一些不再需要的函数和方法:

  1. package websocket
  2. import (
  3. "log"
  4. "net/http"
  5. "github.com/gorilla/websocket"
  6. )
  7. var upgrader = websocket.Upgrader{
  8. ReadBufferSize: 1024,
  9. WriteBufferSize: 1024,
  10. CheckOrigin: func(r *http.Request) bool { return true },
  11. }
  12. func Upgrade(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) {
  13. conn, err := upgrader.Upgrade(w, r, nil)
  14. if err != nil {
  15. log.Println(err)
  16. return nil, err
  17. }
  18. return conn, nil
  19. }

更新 main.go

最后,我们需要更新 main.go 文件,在每个连接上创建一个新 Client,并使用 Pool 注册该客户端:

  1. package main
  2. import (
  3. "fmt"
  4. "net/http"
  5. "github.com/TutorialEdge/realtime-chat-go-react/pkg/websocket"
  6. )
  7. func serveWs(pool *websocket.Pool, w http.ResponseWriter, r *http.Request) {
  8. fmt.Println("WebSocket Endpoint Hit")
  9. conn, err := websocket.Upgrade(w, r)
  10. if err != nil {
  11. fmt.Fprintf(w, "%+v\n", err)
  12. }
  13. client := &websocket.Client{
  14. Conn: conn,
  15. Pool: pool,
  16. }
  17. pool.Register <- client
  18. client.Read()
  19. }
  20. func setupRoutes() {
  21. pool := websocket.NewPool()
  22. go pool.Start()
  23. http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
  24. serveWs(pool, w, r)
  25. })
  26. }
  27. func main() {
  28. fmt.Println("Distributed Chat App v0.01")
  29. setupRoutes()
  30. http.ListenAndServe(":8080", nil)
  31. }

测试

现在已经做了所有必要的修改,我们应该测试已经完成的工作并确保一切按预期工作。

启动你的后端应用程序:

  1. $ go run main.go
  2. Distributed Chat App v0.01

如果你在几个浏览器中打开 http://localhost:3000,可以看到到它们会自动连接到后端 WebSocket 服务,现在我们可以发送和接收来自同一池内的其他客户端的消息!






总结

在本节中,我们设法实现了一种处理多个客户端的方法,并向连接池中连接的每个人广播消息。

现在开始变得有趣了。我们可以在下一节中添加新功能,例如自定义消息。

下一节:Part 5 - 优化前端


原文:tutorialedge.net/projects/ch…

作者:Elliot Forbes 译者:咔叽咔叽 校对:polaris1119

本文由 GCTT 原创编译,Go 中文网 荣誉推出

ft_authoradmin  ft_create_time2019-08-03 16:23
 ft_update_time2019-08-03 16:24