From de0cbf587351ca305d1b7d0f4d755f0babc987fd Mon Sep 17 00:00:00 2001 From: lqqyt2423 <974923609@qq.com> Date: Tue, 9 Feb 2021 22:06:58 +0800 Subject: [PATCH] fix websocket concurrent write --- addon/web/web.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/addon/web/web.go b/addon/web/web.go index d800827..bb6acf0 100644 --- a/addon/web/web.go +++ b/addon/web/web.go @@ -41,6 +41,11 @@ func (web *WebAddon) echo(w http.ResponseWriter, r *http.Request) { } } +type concurrentConn struct { + conn *websocket.Conn + mu sync.Mutex +} + type WebAddon struct { addon.Base addr string @@ -48,7 +53,7 @@ type WebAddon struct { serverMux *http.ServeMux server *http.Server - conns []*websocket.Conn + conns []*concurrentConn connsMu sync.RWMutex } @@ -79,7 +84,7 @@ func NewWebAddon() *WebAddon { web.server = &http.Server{Addr: web.addr, Handler: web.serverMux} log = log.WithField("in", "WebAddon") - web.conns = make([]*websocket.Conn, 0) + web.conns = make([]*concurrentConn, 0) go func() { log.Infof("server start listen at %v\n", web.addr) @@ -92,7 +97,7 @@ func NewWebAddon() *WebAddon { func (web *WebAddon) addConn(c *websocket.Conn) { web.connsMu.Lock() - web.conns = append(web.conns, c) + web.conns = append(web.conns, &concurrentConn{conn: c}) web.connsMu.Unlock() } @@ -102,7 +107,7 @@ func (web *WebAddon) removeConn(conn *websocket.Conn) { index := -1 for i, c := range web.conns { - if conn == c { + if conn == c.conn { index = i break } @@ -130,7 +135,9 @@ func (web *WebAddon) sendFlow(on string, f *flow.Flow) { return } for _, c := range conns { - c.WriteMessage(websocket.TextMessage, b) + c.mu.Lock() + c.conn.WriteMessage(websocket.TextMessage, b) + c.mu.Unlock() } }