您的位置 首页 golang

WebSocket+SLB(负载均衡)会话保持解决重连问题

写在最前面:由于现在游戏基本上采用全球大区的模式,全球玩家在同一个大区进行游戏,传统的单服模式已经不能够满足当前的服务需求,所以现在游戏服务器都在往微服务架构发展。当前我们游戏也是利用微服务架构来实现全球玩家同服游戏。
玩家每次断线(包括切换网络/超时断线)后应该会重新连接服务器,重连成功的话可以继续当前情景继续游戏,但是之前写的底层重连机制一直不能生效,导致每次玩家断线后重连都失败,要从账号登陆开始重新登陆,该文章写在已经定位了重连问题是由SLB引起后,提出的解决方案。

当前游戏架构:

  1. 客户端从一个Account服务器登陆并且拉取游戏服务器的所有SLB地址
  2. 客户端ping所有SLB地址,选择延迟最低的一个SLB进行连接
  3. 客户端连接SLB,SLB将连接转发到一个网关节点建立连接

问题:

每次重连后,客户端向SLB发送建立连接,SLB都会重新分配一个网关节点,导致客户端连接到其他网关,重连失败。

解决方法:

1. 开启SLB会话保持功能

会话保持的作用是什么?

将同一客户端的会话请求转发给指定的一个后端服务器处理。
负载均衡支持什么类型的会话保持?
-四层(TCP协议)服务,负载均衡系统是基于源IP的会话保持。四层会话保持的最长时间是3600秒。
-七层(HTTP/HTTPS协议)服务,负载均衡系统是基于cookie的会话保持。植入cookie的会话保持的最长时间是86400秒(24小时)。

开启SLB会话保持功能后,SLB会记录客户端的IP地址,在一定时间内,自动将同一个IP的连接转发到上次连接的网关。
在网络不稳定的情况下,游戏容易心跳或者发包超时,开启会话保持,能解决大部分情况下的重连问题。
但是在切换网络的时候,手机网络从Wifi切换成4G,自身IP会变,这时候连接必定和服务器断开,需要重新建立连接。由于IP已经变化,SLB不能识别到是同一个客户端发出的请求,会将连接转发到其他网关节点。所以使用TCP连接的情况下,SLB开启会话保持并不能解决所有的重连问题。
另外某些时刻,手机频繁开启和断开WI-FI,有时候可能不会断开网络,这并不是因为4G切换WI-FI时网络没断开,从4G切换到Wi-Fi网络,因为IP变了,服务器不能识别到新的IP,连接肯定是断开的。这时候网络没断开,主要是因为现在智能手机会对4G和Wi-Fi网络做个权重判断,当Wi-Fi网络频繁打开关闭时,手机会判断Wi-Fi网络不稳定,所有流量都走4G。所以网络没断开是因为一直使用4G连接,才没有断开。想要验证,只需要切换Wi-Fi时,把4G网络关闭,这样流量就必定走Wi-Fi。

2. 切换网络时的重连问题

上面说过,四层的TCP协议主要是基于IP来实现会话保持。但是切换网络的时候客户端的IP会变。所以要解决切换网络时的重连问题,只有两个方法:1. 当客户端成功连接网关节点后,记录下网关节点的IP,下次重连后不经过SLB,直接向网关节点发送连接请求。2.使用 SLB的七层(HTTP)转发服务。

2.1 客户端直接向网关发送请求连接

当客户端经过SLB将连接转发到网关时,二次握手验证成功后向客户端发送自己节点的IP,这样客户端下次连接的时候就能直接连接网关节点。但是这样会暴露网关的IP地址,为安全留下隐患。
如果不希望暴露网关的IP地址,就需要增加一层代理层,SLB将客户端请求转发到代理层,代理层再根据客户端带有的key,转发到正确的网关节点上。增加一层代理层,不仅会增加请求的响应时间,还会增加整体框架的复杂度。

2.2 利用SLB的七层(HTTP)转发服务

阿里云的七层SLB会话保持服务,主要是基于cookie的会话保持。客户端在往服务器发送HTTP请求后,服务器会返回客户端一个Response,SLB会在这时候,将经过的Response插入或者重写cookie。客户端获取到这个cookie,下次请求时会带上cookie,SLB判断Request的Headers里面有cookie,就将连接转发到之前的网关节点。
HTTP是短链接,我们游戏是长连接,所以用HTTP肯定不合适。但是可以考虑基于HTTP的WebSocket。

什么是WebSocket?

WebSocket (WS)是HTML5一种新的协议,它实现了浏览器与服务器全双工通信,能更好地节省服务器资源和带宽并达到实时通讯。WebSocket建立在TCP之上,同HTTP一样通过TCP来传输数据,但是它和HTTP最大不同是:
WebSocket是一种双向通信协议,在建立连接后,WebSocket服务器和Browser/Client Agent都能主动的向对方发送或接收数据,就像Socket一样;WebSocket需要类似TCP的客户端和服务器端通过握手连接,连接成功后才能相互通信。

WSS(Web Socket Secure)是WebSocket的加密版本。

WebSocket在建立连接的时候,会依赖HTTP协议进行一次握手,这时候客户端会给服务器发送Request,服务器也会给客户端返回一个Response,并且HTTP其实也是建立在TCP之上的通信协议。

SLB对WebSocket的支持

如何在阿里云负载均衡上启用WS/WSS支持?
无需配置,当选用HTTP监听时,默认支持无加密版本WebSocket协议(WS协议);当选择HTTPS监听时,默认支持加密版本的WebSocket协议(WSS协议)。
WSS/WS协议支持的约束如下:
负载均衡与ECS后端服务的连接采用HTTP/1.1,建议后端服务器采用支持HTTP/1.1的Web Server。
若负载均衡与后端服务超过60秒无消息交互,会主动断开连接,如需要维持连接一直不中断,需要主动实现保活机制,每60秒内进行一次报文交互。

查看阿里云SLB文档对WS的支持,说明SLB是支持WS协议的,并且SLB对于WS无需配置,只需要选用HTTP监听时,就能够转发WS协议。说明WS协议在SLB这边看来就是一个HTTP,这样WS走的也是七层的转发服务。只要SLB能够正常识别WS握手协议里Request的cookie和正常识别服务器返回的Response并且往里面插入cookie,就可以利用会话保持解决重连问题。

go语言实现Websocket服务器

Go语言实现WS服务器有两种方法,一种是利用golang.org/x/net下的websocket包,另外一种方法就是自己解读Websocket协议来实现,由于WS协议一样是基于TCP协议之上,完全可以通过监听TCP端口来实现。

1.握手

客户端发送Request消息

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Version: 13

服务器返回Response消息

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

其中服务器返回的Sec-WebSocket-Accept字段,主要是用于客户端需要验证服务器是否支持WS。RFC6455文档中规定,在WebSocket通信协议中服务端为了证实已经接收了握手,它需要把两部分的数据合并成一个响应。一部分信息来自客户端握手的Sec-WebSocket-Keyt头字段:Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==。对于这个字段,服务端必须得到这个值(头字段中经过base64编码的值减去前后的空格)并与GUID”258EAFA5-E914-47DA-95CA-C5AB0DC85B11″组合成一个字符串,这个字符串对于不懂WebSocket协议的网络终端来说是不能使用的。这个组合经过SHA-1掩码,base64编码后在服务端的握手中返回。如果这个Sec-WebSocket-Accept计算错误浏览器会提示:Sec-WebSocket-Accept dismatch

如果返回成功,Websocket就会回调onopen事件

2.传输协议

游戏服务器的使用的TCP协议,是在协议的包头使用4Byte来声明本协议长度,然后将协议一次性发送。但是在WS协议是通过Frame形式发送的,会将一条消息分为几个frame,按照先后顺序传输出去。这样做会有几个好处:

  • a、大数据的传输可以分片传输,不用考虑到数据大小导致的长度标志位不足够的情况。
  • b、和http的chunk一样,可以边生成数据边传递消息,即提高传输效率。

websocket的协议格式:

0                   1                   2                   30 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1+-+-+-+-+-------+-+-------------+-------------------------------+|F|R|R|R| opcode|M| Payload len |    Extended payload length    ||I|S|S|S|  (4)  |A|     (7)     |             (16/64)           ||N|V|V|V|       |S|             |   (if payload len==126/127)   || |1|2|3|       |K|             |                               |+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +|     Extended payload length continued, if payload len == 127  |+ - - - - - - - - - - - - - - - +-------------------------------+|                               |Masking-key, if MASK set to 1  |+-------------------------------+-------------------------------+| Masking-key (continued)       |          Payload Data         |+-------------------------------- - - - - - - - - - - - - - - - +:                     Payload Data continued ...                :+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +|                     Payload Data continued ...                |+---------------------------------------------------------------+

参数说明如下:

* FIN:1位,用来表明这是一个消息的最后的消息片断,当然第一个消息片断也可能是最后的一个消息片断;* RSV1, RSV2, RSV3: 分别都是1位,如果双方之间没有约定自定义协议,那么这几位的值都必须为0,否则必须断掉WebSocket连接;* Opcode: 4位操作码,定义有效负载数据,如果收到了一个未知的操作码,连接也必须断掉,以下是定义的操作码:*  %x0 表示连续消息片断*  %x1 表示文本消息片断*  %x2 表未二进制消息片断*  %x3-7 为将来的非控制消息片断保留的操作码*  %x8 表示连接关闭*  %x9 表示心跳检查的ping*  %xA 表示心跳检查的pong*  %xB-F 为将来的控制消息片断的保留操作码* Mask: 1位,定义传输的数据是否有加掩码,如果设置为1,掩码键必须放在masking-key区域,客户端发送给服务端的所有消息,此位的值都是1;* Payload length: 传输数据的长度,以字节的形式表示:7位、7+16位、或者7+64位。如果这个值以字节表示是0-125这个范围,那这个值就表示传输数据的长度;如果这个值是126,则随后的两个字节表示的是一个16进制无符号数,用来表示传输数据的长度;如果这个值是127,则随后的是8个字节表示的一个64位无符合数,这个数用来表示传输数据的长度。多字节长度的数量是以网络字节的顺序表示。负载数据的长度为扩展数据及应用数据之和,扩展数据的长度可能为0,因而此时负载数据的长度就为应用数据的长度。* Masking-key: 0或4个字节,客户端发送给服务端的数据,都是通过内嵌的一个32位值作为掩码的;掩码键只有在掩码位设置为1的时候存在。* Payload data: (x+y)位,负载数据为扩展数据及应用数据长度之和。* Extension data: x位,如果客户端与服务端之间没有特殊约定,那么扩展数据的长度始终为0,任何的扩展都必须指定扩展数据的长度,或者长度的计算方式,以及在握手时如何确定正确的握手方式。如果存在扩展数据,则扩展数据就会包括在负载数据的长度之内。* Application data: y位,任意的应用数据,放在扩展数据之后,应用数据的长度=负载数据的长度-扩展数据的长度。

3.SLB植入的Cookie处理

阿里云的SLB开启HTTP监听后,会检查过往的Request和Response请求,收到服务器返回的Response后,会往Response插入一个Cookie

植入cookie: 此种方法下,您只需要指定cookie的过期时间。客户端第一次访问时,负载均衡服务在返回请求中植入cookie(即在HTTP/HTTPS响应报文中插入SERVERID字串),下次客户端携带此cookie访问,负载均衡服务会将请求定向转发给之前记录到的ECS实例上。

客户端收到服务器的Response后,可以在Header中查到有个“Set-Cookie”字段,里面是SLB插入的Cookie值

Set-Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587613640|1587613640;Path=/

客户端断开连接后,下次发送请求需要往Headers插入Cookie字段

Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587613640|1587613640;Path=/

WS服务器与客户端Demo

1.WS客户端DEMO(JAVA实现)

package com.zhenyouqu.wsclient;
import org.java_websocket.drafts.Draft;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.java_websocket.enums.ReadyState;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;

public class wsclient {
    static String cookie = null;
    private static Logger logger = LoggerFactory.getLogger(WebSocketClient.class);
    public static WebSocketClient client;
    static CountDownLatch countDownLatch;

    public static void main(String[] args) throws InterruptedException {
        tryConnect();
        //因为WebSocketClient请求是异步返回调用,所以需要等待上一次返回设置Cookie后,再设置Cookie进行请求
        countDownLatch.await();
        tryConnect();
        countDownLatch.await();
        tryConnect();
    }

    public static void tryConnect() throws InterruptedException {
        try {
            countDownLatch = new CountDownLatch(1);
            Draft draft = new Draft_6455();
            HashMap<String, String> headers = new HashMap<>();
            if (cookie != null) {
                headers.put("Cookie", cookie);
            }
            client = new WebSocketClient(new URI("ws://101.133.195.232:8000"), draft, headers) {
                @Override
                public void onOpen(ServerHandshake serverHandshake) {
                    cookie = serverHandshake.getFieldValue("Set-Cookie");
                    System.out.println("收到Cookie:" + cookie);
                    countDownLatch.countDown();
                }
                @Override
                public void onMessage(String msg) {
                    System.out.println("收到消息==========\n" + msg);
                    if (msg.equals("over")) {
                        client.close();
                    }
                }
                @Override
                public void onClose(int i, String s, boolean b) {
                    logger.info("链接已关闭");
                }

                @Override
                public void onError(Exception e) {
                    e.printStackTrace();
                    logger.info("发生错误已关闭");
                }
            };
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
        client.connect();
        logger.info(client.getDraft());
        while (!client.getReadyState().equals(ReadyState.OPEN)) {
            logger.info("正在连接...");
        } 
        //连接成功,发送信息
        client.send("哈喽,连接一下啊"); 
        //等待三秒 接受数据 
        Thread.sleep(1000);
        client.close();
    }
}

2.WS服务器DEMO

package main

import (
   "crypto/sha1"
   "encoding/base64"
   "errors"
   "flag"
   "fmt"
   "io"
   "log"
   "net"
   "strings"
)

var serverId = flag.Int("serverId", 1, "input serverId")

func main() {
   flag.Parse()
   ln, err := net.Listen("tcp", ":8000")
   if err != nil {
      log.Panic(err)
   }
   for {
      conn, err := ln.Accept()
      if err != nil {
         log.Println("Accept err:", err)
      }
      handleConnection(conn)
   }
}
func handleConnection(conn net.Conn) {
   defer conn.Close()
   content := make([]byte, 1024)
   _, err := conn.Read(content)
   log.Println(string(content))
   if err != nil {
      log.Println(err)
   }
   isHttp := false
   // 先暂时这么判断
   if string(content[0:3]) == "GET" {
      isHttp = true
   }
   log.Println("isHttp:", isHttp)
   if isHttp {
      headers := parseHandshake(string(content))
      log.Println("headers", headers)
      secWebsocketKey := headers["Sec-WebSocket-Key"]
      // NOTE:这里省略其他的验证 
      guid := "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
      // 计算Sec-WebSocket-Accept 
      h := sha1.New()
      log.Println("accept raw:", secWebsocketKey+guid)
      io.WriteString(h, secWebsocketKey+guid)
      accept := make([]byte, 28)
      base64.StdEncoding.Encode(accept, h.Sum(nil))
      log.Println(string(accept))
      response := "HTTP/1.1 101 Switching Protocols\r\n"
      response = response + "Sec-WebSocket-Accept: " + string(accept) + "\r\n"
      response = response + "Connection: Upgrade\r\n"
      response = response + "Upgrade: websocket\r\n\r\n"
      log.Println("response:", response)
      if lenth, err := conn.Write([]byte(response));
         err != nil {
         log.Println(err)
      } else {
         log.Println("send len:", lenth)
      }
      wssocket := NewWsSocket(conn)
      for {
         data, err := wssocket.ReadIframe()
         if err != nil {
            log.Println("readIframe err:", err)
            break
         }
         log.Println("read data:", string(data))
         err = wssocket.SendIframe([]byte(fmt.Sprintf("serverId:%d", *serverId)))
         if err != nil {
            log.Println("sendIframe err:", err)
            break
         }
         log.Println("send data")
      }
   } else {
      log.Println("receive tcp content")
      log.Println(string(content)) 
      // 直接读取
   }
}

type WsSocket struct{ 
   MaskingKey []byte
   Conn net.Conn
}

func NewWsSocket(conn net.Conn) *WsSocket { 
   return &WsSocket{Conn: conn}
}
func (this *WsSocket) SendIframe(data []byte) error {
   // 这里只处理data长度<125的
   if len(data) >= 125 {
      return errors.New("send iframe data error")
   }
   lenth := len(data)
   maskedData := make([]byte, lenth)
   for i := 0; i < lenth; i++ {
      if this.MaskingKey != nil {
         maskedData[i] = data[i] ^ this.MaskingKey[i%4]
      } else {
         maskedData[i] = data[i]
      }
   }
   this.Conn.Write([]byte{0x81})
   var payLenByte byte
   if this.MaskingKey != nil && len(this.MaskingKey) != 4 {
      payLenByte = byte(0x80) | byte(lenth)
      this.Conn.Write([]byte{payLenByte})
      this.Conn.Write(this.MaskingKey)
   } else {
      payLenByte = byte(0x00) | byte(lenth)
      this.Conn.Write([]byte{payLenByte})
   }
   this.Conn.Write(data)
   return nil
}
func (this *WsSocket) ReadIframe() (data []byte, err error) {
   err = nil
   //第一个字节:FIN + RSV1-3 + OPCODE 
   opcodeByte := make([]byte, 1)
   this.Conn.Read(opcodeByte)
   //断开连接 
   if len(opcodeByte) == 1 && opcodeByte[0] == 0 {
      return opcodeByte, errors.New("close connect error")
   }
   FIN := opcodeByte[0] >> 7
   RSV1 := opcodeByte[0] >> 6 & 1
   RSV2 := opcodeByte[0] >> 5 & 1
   RSV3 := opcodeByte[0] >> 4 & 1
   OPCODE := opcodeByte[0] & 15
   log.Println(RSV1, RSV2, RSV3, OPCODE)
   //OPCODE==8 连接关闭
   if OPCODE == 8 {
      return opcodeByte, errors.New("close connect normal")
   }
   //心跳ping
   if OPCODE == 9 {
      //TODO: 返回心跳pong 
      return opcodeByte, nil
   }
   payloadLenByte := make([]byte, 1)
   this.Conn.Read(payloadLenByte)
   payloadLen := int(payloadLenByte[0] & 0x7F)
   mask := payloadLenByte[0] >> 7
   if payloadLen == 127 {
      extendedByte := make([]byte, 8)
      this.Conn.Read(extendedByte)
   }
   maskingByte := make([]byte, 4)
   if mask == 1 {
      this.Conn.Read(maskingByte)
      this.MaskingKey = maskingByte
   }
   payloadDataByte := make([]byte, payloadLen)
   this.Conn.Read(payloadDataByte)
   log.Println("data:", payloadDataByte)
   dataByte := make([]byte, payloadLen)
   //TODO: 需要优化 
   for i := 0; i < payloadLen; i++ {
      if mask == 1 {
         dataByte[i] = payloadDataByte[i] ^ maskingByte[i%4]
      } else {
         dataByte[i] = payloadDataByte[i]
      }
   }
   if FIN == 1 {
      data = dataByte
      return
   }
   nextData, err := this.ReadIframe()
   if err != nil {
      return
   }
   data = append(data, nextData...)
   return
}
func parseHandshake(content string) map[string]string {
   headers := make(map[string]string, 10)
   lines := strings.Split(content, "\r\n")
   for _, line := range lines {
      if len(line) >= 0 {
         words := strings.Split(line, ":")
         if len(words) == 2 {
            headers[strings.Trim(words[0], " ")] = strings.Trim(words[1], " ")
         }
      }
   }
   return headers
}

3.部署实验

分别在阿里云的两台ECS实例上部署WS服务器,打开8000端口,开启一个SLB服务,SLB服务选择HTTP方式监听,并且打开会话保持功能,Cookie处理方式选择植入Cookie。Demo服务器没有做HTTP健康监听的处理,健康检查这块可以先关掉。

在两台ECS上启动WS服务器,然后本地运行客户端,分别测试两台服务器是否能正常连接,测试完毕后,测试SLB能否正常工作。服务器和SLB都正常的情况下,运行客户端,客户端会得到以下结果

收到Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587619495|1587619495;Path=/收到消息==========serverId:1收到Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587619496|1587619495;Path=/收到消息==========serverId:1收到Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587619497|1587619495;Path=/收到消息==========serverId:1

收到的三次Cookie都相同,说明Cookie是有正常植入工作的,并且三次都被SLB正确抓取了。
收到的三次serverId也都是同样的值,说明三次都是同一个ECS上的服务器响应。

至此,验证成功。

Websocket+SLB会话保持能够解决超时重连和切换网络时重连的问题。


文章来源:智云一二三科技

文章标题:WebSocket+SLB(负载均衡)会话保持解决重连问题

文章地址:https://www.zhihuclub.com/171.shtml

关于作者: 智云科技

热门文章

评论已关闭

6条评论

  1. 1 During menopause, estrogen levels decline, therefore the skin can become more sensitive to damage

  2. He knew that suffering what are the side effects of beta blockers was not enough, It can be a pile of meat dregs that will be teleported to the demon world aeruginosa was 100 resistant to amikacin, cefepime, ciprofloxacin and tetracycline

  3. After a stroke, your doctor will likely give you a cholesterol lowering medication called a statin

  4. Adam standing in the same pose he d first struck when he was modeling for the college art class I taught; Adam s face, when he first opened his eyes Side effects of Tetradox 100mg in details

网站地图