WebSocket+SLB(负载均衡)会话保持解决重连问题

写在最前面:由于现在游戏基本上采用全球大区的模式,全球玩家在同一个大区进行游戏,传统的单服模式已经不能够满足当前的服务需求,所以现在游戏服务器都在往微服务架构发展。当前我们游戏也是利用微服务架构来实现全球玩家同服游戏。
玩家每次断线(包括切换网络/超时断线)后应该会重新连接服务器,重连成功的话可以继续当前情景继续游戏,但是之前写的底层重连机制一直不能生效,导致每次玩家断线后重连都失败,要从账号登陆开始重新登陆,该文章写在已经定位了重连问题是由SLB引起后,提出的解决方案。

当前游戏架构:

  1. 客户端从一个Account服务器登陆并且拉取游戏服务器的所有SLB地址

  2. 客户端ping所有SLB地址,选择延迟最低的一个SLB进行连接

  3. 客户端连接SLB,SLB将连接转发到一个网关节点建立连接

问题:

每次重连后,客户端向SLB发送建立连接,SLB都会重新分配一个网关节点,导致客户端连接到其他网关,重连失败。

解决方法:

1. 开启SLB会话保持功能

会话保持的作用是什么?

将同一客户端的会话请求转发给指定的一个后端服务器处理。
负载均衡支持什么类型的会话保持?
-四层(TCP协议)服务,负载均衡系统是基于源IP的会话保持。四层会话保持的最长时间是3600秒。
-七层(HTTP/HTTPS协议)服务,负载均衡系统是基于cookie的会话保持。植入cookie的会话保持的最长时间是86400秒(24小时)。

开启SLB会话保持功能后,SLB会记录客户端的IP地址,在一定时间内,自动将同一个IP的连接转发到上次连接的网关。
在网络不稳定的情况下,游戏容易心跳或者发包超时,开启会话保持,能解决大部分情况下的重连问题。
但是在切换网络的时候,手机网络从Wifi切换成4G,自身IP会变,这时候连接必定和服务器断开,需要重新建立连接。由于IP已经变化,SLB不能识别到是同一个客户端发出的请求,会将连接转发到其他网关节点。所以使用TCP连接的情况下,SLB开启会话保持并不能解决所有的重连问题。
另外某些时刻,手机频繁开启和断开WI-FI,有时候可能不会断开网络,这并不是因为4G切换WI-FI时网络没断开,从4G切换到Wi-Fi网络,因为IP变了,服务器不能识别到新的IP,连接肯定是断开的。这时候网络没断开,主要是因为现在智能手机会对4G和Wi-Fi网络做个权重判断,当Wi-Fi网络频繁打开关闭时,手机会判断Wi-Fi网络不稳定,所有流量都走4G。所以网络没断开是因为一直使用4G连接,才没有断开。想要验证,只需要切换Wi-Fi时,把4G网络关闭,这样流量就必定走Wi-Fi。

2. 切换网络时的重连问题

上面说过,四层的TCP协议主要是基于IP来实现会话保持。但是切换网络的时候客户端的IP会变。所以要解决切换网络时的重连问题,只有两个方法:1. 当客户端成功连接网关节点后,记录下网关节点的IP,下次重连后不经过SLB,直接向网关节点发送连接请求。2.使用 SLB的七层(HTTP)转发服务。

2.1 客户端直接向网关发送请求连接

当客户端经过SLB将连接转发到网关时,二次握手验证成功后向客户端发送自己节点的IP,这样客户端下次连接的时候就能直接连接网关节点。但是这样会暴露网关的IP地址,为安全留下隐患。
如果不希望暴露网关的IP地址,就需要增加一层代理层,SLB将客户端请求转发到代理层,代理层再根据客户端带有的key,转发到正确的网关节点上。增加一层代理层,不仅会增加请求的响应时间,还会增加整体框架的复杂度。

2.2 利用SLB的七层(HTTP)转发服务

阿里云的七层SLB会话保持服务,主要是基于cookie的会话保持。客户端在往服务器发送HTTP请求后,服务器会返回客户端一个Response,SLB会在这时候,将经过的Response插入或者重写cookie。客户端获取到这个cookie,下次请求时会带上cookie,SLB判断Request的Headers里面有cookie,就将连接转发到之前的网关节点。
HTTP是短链接,我们游戏是长连接,所以用HTTP肯定不合适。但是可以考虑基于HTTP的WebSocket。

什么是WebSocket?

WebSocket (WS)是HTML5一种新的协议,它实现了浏览器与服务器全双工通信,能更好地节省服务器资源和带宽并达到实时通讯。WebSocket建立在TCP之上,同HTTP一样通过TCP来传输数据,但是它和HTTP最大不同是:
WebSocket是一种双向通信协议,在建立连接后,WebSocket服务器和Browser/Client Agent都能主动的向对方发送或接收数据,就像Socket一样;WebSocket需要类似TCP的客户端和服务器端通过握手连接,连接成功后才能相互通信。

WSS(Web Socket Secure)是WebSocket的加密版本。

WebSocket在建立连接的时候,会依赖HTTP协议进行一次握手,这时候客户端会给服务器发送Request,服务器也会给客户端返回一个Response,并且HTTP其实也是建立在TCP之上的通信协议。

SLB对WebSocket的支持

如何在阿里云负载均衡上启用WS/WSS支持?
无需配置,当选用HTTP监听时,默认支持无加密版本WebSocket协议(WS协议);当选择HTTPS监听时,默认支持加密版本的WebSocket协议(WSS协议)。
WSS/WS协议支持的约束如下:
负载均衡与ECS后端服务的连接采用HTTP/1.1,建议后端服务器采用支持HTTP/1.1的Web Server。
若负载均衡与后端服务超过60秒无消息交互,会主动断开连接,如需要维持连接一直不中断,需要主动实现保活机制,每60秒内进行一次报文交互。

查看阿里云SLB文档对WS的支持,说明SLB是支持WS协议的,并且SLB对于WS无需配置,只需要选用HTTP监听时,就能够转发WS协议。说明WS协议在SLB这边看来就是一个HTTP,这样WS走的也是七层的转发服务。只要SLB能够正常识别WS握手协议里Request的cookie和正常识别服务器返回的Response并且往里面插入cookie,就可以利用会话保持解决重连问题。

go语言实现Websocket服务器

Go语言实现WS服务器有两种方法,一种是利用golang.org/x/net下的websocket包,另外一种方法就是自己解读Websocket协议来实现,由于WS协议一样是基于TCP协议之上,完全可以通过监听TCP端口来实现。

1.握手

客户端发送Request消息

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Version: 13

服务器返回Response消息

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

其中服务器返回的Sec-WebSocket-Accept字段,主要是用于客户端需要验证服务器是否支持WS。RFC6455文档中规定,在WebSocket通信协议中服务端为了证实已经接收了握手,它需要把两部分的数据合并成一个响应。一部分信息来自客户端握手的Sec-WebSocket-Keyt头字段:Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==。对于这个字段,服务端必须得到这个值(头字段中经过base64编码的值减去前后的空格)并与GUID"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"组合成一个字符串,这个字符串对于不懂WebSocket协议的网络终端来说是不能使用的。这个组合经过SHA-1掩码,base64编码后在服务端的握手中返回。如果这个Sec-WebSocket-Accept计算错误浏览器会提示:Sec-WebSocket-Accept dismatch

如果返回成功,Websocket就会回调onopen事件

2.传输协议

游戏服务器的使用的TCP协议,是在协议的包头使用4Byte来声明本协议长度,然后将协议一次性发送。但是在WS协议是通过Frame形式发送的,会将一条消息分为几个frame,按照先后顺序传输出去。这样做会有几个好处:

  • a、大数据的传输可以分片传输,不用考虑到数据大小导致的长度标志位不足够的情况。
  • b、和http的chunk一样,可以边生成数据边传递消息,即提高传输效率。

websocket的协议格式:

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+

参数说明如下:

* FIN:1位,用来表明这是一个消息的最后的消息片断,当然第一个消息片断也可能是最后的一个消息片断;
* RSV1, RSV2, RSV3: 分别都是1位,如果双方之间没有约定自定义协议,那么这几位的值都必须为0,否则必须断掉WebSocket连接;
* Opcode: 4位操作码,定义有效负载数据,如果收到了一个未知的操作码,连接也必须断掉,以下是定义的操作码:
*  %x0 表示连续消息片断
*  %x1 表示文本消息片断
*  %x2 表未二进制消息片断
*  %x3-7 为将来的非控制消息片断保留的操作码
*  %x8 表示连接关闭
*  %x9 表示心跳检查的ping
*  %xA 表示心跳检查的pong
*  %xB-F 为将来的控制消息片断的保留操作码
* Mask: 1位,定义传输的数据是否有加掩码,如果设置为1,掩码键必须放在masking-key区域,客户端发送给服务端的所有消息,此位的值都是1;
* Payload length: 传输数据的长度,以字节的形式表示:7位、7+16位、或者7+64位。如果这个值以字节表示是0-125这个范围,那这个值就表示传输数据的长度;如果这个值是126,则随后的两个字节表示的是一个16进制无符号数,用来表示传输数据的长度;如果这个值是127,则随后的是8个字节表示的一个64位无符合数,这个数用来表示传输数据的长度。多字节长度的数量是以网络字节的顺序表示。负载数据的长度为扩展数据及应用数据之和,扩展数据的长度可能为0,因而此时负载数据的长度就为应用数据的长度。
* Masking-key: 0或4个字节,客户端发送给服务端的数据,都是通过内嵌的一个32位值作为掩码的;掩码键只有在掩码位设置为1的时候存在。
* Payload data: (x+y)位,负载数据为扩展数据及应用数据长度之和。
* Extension data: x位,如果客户端与服务端之间没有特殊约定,那么扩展数据的长度始终为0,任何的扩展都必须指定扩展数据的长度,或者长度的计算方式,以及在握手时如何确定正确的握手方式。如果存在扩展数据,则扩展数据就会包括在负载数据的长度之内。
* Application data: y位,任意的应用数据,放在扩展数据之后,应用数据的长度=负载数据的长度-扩展数据的长度。

3.SLB植入的Cookie处理

阿里云的SLB开启HTTP监听后,会检查过往的Request和Response请求,收到服务器返回的Response后,会往Response插入一个Cookie

植入cookie: 此种方法下,您只需要指定cookie的过期时间。客户端第一次访问时,负载均衡服务在返回请求中植入cookie(即在HTTP/HTTPS响应报文中插入SERVERID字串),下次客户端携带此cookie访问,负载均衡服务会将请求定向转发给之前记录到的ECS实例上。

客户端收到服务器的Response后,可以在Header中查到有个“Set-Cookie”字段,里面是SLB插入的Cookie值

Set-Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587613640|1587613640;Path=/

客户端断开连接后,下次发送请求需要往Headers插入Cookie字段

Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587613640|1587613640;Path=/

WS服务器与客户端Demo

1.WS客户端DEMO(JAVA实现)

package com.zhenyouqu.wsclient;

import org.java_websocket.drafts.Draft;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.java_websocket.client.WebSocketClient;

import org.java_websocket.drafts.Draft_6455;

import org.java_websocket.handshake.ServerHandshake;

import org.java_websocket.enums.ReadyState;

import java.net.URI;

import java.net.URISyntaxException;

import java.util.HashMap;

import java.util.concurrent.CountDownLatch;

public class wsclient {

    static String cookie = null;

    private static Logger logger = LoggerFactory.getLogger(WebSocketClient.class);

    public static WebSocketClient client;

    static CountDownLatch countDownLatch;

    public static void main(String[] args) throws InterruptedException {

        tryConnect();

        //因为WebSocketClient请求是异步返回调用,所以需要等待上一次返回设置Cookie后,再设置Cookie进行请求

        countDownLatch.await();

        tryConnect();

        countDownLatch.await();

        tryConnect();

    }

    public static void tryConnect() throws InterruptedException {

        try {

            countDownLatch = new CountDownLatch(1);

            Draft draft = new Draft_6455();

            HashMap<String, String> headers = new HashMap<>();

            if(cookie != null) {

                headers.put("Cookie", cookie);

            }

            client = new WebSocketClient(new URI("ws://101.133.195.232:8000"),draft, headers) {

                @Override

                public void onOpen(ServerHandshake serverHandshake) {

                    cookie = serverHandshake.getFieldValue("Set-Cookie");

                    System.out.println("收到Cookie:"+cookie);

                    countDownLatch.countDown();

                }

                @Override

                public void onMessage(String msg) {

                    System.out.println("收到消息==========\n"+msg);

                    if(msg.equals("over")){

                        client.close();

                    }

                }

                @Override

                public void onClose(int i, String s, boolean b) {

                    logger.info("链接已关闭");

                }

                @Override

                public void onError(Exception e){

                    e.printStackTrace();

                    logger.info("发生错误已关闭");

                }

            };

        } catch (URISyntaxException e) {

            e.printStackTrace();

        }

        client.connect();

        //logger.info(client.getDraft());

        while(!client.getReadyState().equals(ReadyState.OPEN)){

            logger.info("正在连接...");

        }

        //连接成功,发送信息

        client.send("哈喽,连接一下啊");

        //等待三秒 接受数据

        Thread.sleep(1000);

        client.close();

    }

}

2.WS服务器DEMO

package main

import (

    "crypto/sha1"

    "encoding/base64"

    "errors"

    "flag"

    "fmt"

    "io"

    "log"

    "net"

    "strings"

)

var serverId = flag.Int("serverId", 1, "input serverId")

func main() {

    flag.Parse()

    ln, err := net.Listen("tcp",":8000")

    if err != nil {

        log.Panic(err)

    }

    for {

        conn, err := ln.Accept()

        if err != nil {

            log.Println("Accept err:", err)

        }

        handleConnection(conn)

    }

}

func handleConnection(conn net.Conn) {

    defer conn.Close()

    content := make([]byte, 1024)

    _, err := conn.Read(content)

    log.Println(string(content))

    if err != nil {

        log.Println(err)

    }

    isHttp := false

    // 先暂时这么判断

    if string(content[0:3]) == "GET" {

        isHttp = true;

    }

    log.Println("isHttp:", isHttp)

    if isHttp {

        headers := parseHandshake(string(content))

        log.Println("headers", headers)

        secWebsocketKey := headers["Sec-WebSocket-Key"]

        // NOTE:这里省略其他的验证

        guid := "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

        // 计算Sec-WebSocket-Accept

        h := sha1.New()

        log.Println("accept raw:", secWebsocketKey + guid)

        io.WriteString(h, secWebsocketKey + guid)

        accept := make([]byte, 28)

        base64.StdEncoding.Encode(accept, h.Sum(nil))

        log.Println(string(accept))

        response := "HTTP/1.1 101 Switching Protocols\r\n"

        response = response + "Sec-WebSocket-Accept: " + string(accept) + "\r\n"

        response = response + "Connection: Upgrade\r\n"

        response = response + "Upgrade: websocket\r\n\r\n"

        log.Println("response:", response)

        if lenth, err := conn.Write([]byte(response)); err != nil {

            log.Println(err)

        }else {

            log.Println("send len:", lenth)

        }

        wssocket := NewWsSocket(conn)

        for {

            data, err := wssocket.ReadIframe()

            if err != nil {

                log.Println("readIframe err:" , err)

                break

            }

            log.Println("read data:", string(data))

            err = wssocket.SendIframe([]byte(fmt.Sprintf("serverId:%d",*serverId)))

            if err != nil {

                log.Println("sendIframe err:" , err)

                break

            }

            log.Println("send data")

        }

    }else {

        log.Println("receive tcp content")

        log.Println(string(content))

        // 直接读取

    }

}

type WsSocket struct {

    MaskingKey []byte

    Conn net.Conn

}

func NewWsSocket(conn net.Conn) *WsSocket {

    return &WsSocket{Conn: conn}

}

func (this *WsSocket)SendIframe(data []byte) error {

    // 这里只处理data长度<125的

    if len(data) >= 125 {

        return errors.New("send iframe data error")

    }

    lenth := len(data)

    maskedData := make([]byte, lenth)

    for i := 0; i < lenth; i++ {

        if this.MaskingKey != nil {

            maskedData[i] = data[i] ^ this.MaskingKey[i % 4]

        }else {

            maskedData[i] = data[I]

        }

    }

    this.Conn.Write([]byte{0x81})

    var payLenByte byte

    if this.MaskingKey != nil && len(this.MaskingKey) != 4 {

        payLenByte = byte(0x80) | byte(lenth)

        this.Conn.Write([]byte{payLenByte})

        this.Conn.Write(this.MaskingKey)

    }else {

        payLenByte = byte(0x00) | byte(lenth)

        this.Conn.Write([]byte{payLenByte})

    }

    this.Conn.Write(data)

    return nil

}

func (this *WsSocket)ReadIframe() (data []byte, err error){

    err = nil

    //第一个字节:FIN + RSV1-3 + OPCODE

    opcodeByte := make([]byte, 1)

    this.Conn.Read(opcodeByte)

    //断开连接

    if len(opcodeByte)==1 && opcodeByte[0] ==0 {

        return opcodeByte, errors.New("close connect error")

    }

    FIN := opcodeByte[0] >> 7

    RSV1 := opcodeByte[0] >> 6 & 1

    RSV2 := opcodeByte[0] >> 5 & 1

    RSV3 := opcodeByte[0] >> 4 & 1

    OPCODE := opcodeByte[0] & 15

    log.Println(RSV1,RSV2,RSV3,OPCODE)

    //OPCODE==8 连接关闭

    if OPCODE == 8 {

        return opcodeByte, errors.New("close connect normal")

    }

    //心跳ping

    if OPCODE == 9 {

        //TODO: 返回心跳pong

        return opcodeByte, nil

    }

    payloadLenByte := make([]byte, 1)

    this.Conn.Read(payloadLenByte)

    payloadLen := int(payloadLenByte[0] & 0x7F)

    mask := payloadLenByte[0] >> 7

    if payloadLen == 127 {

        extendedByte := make([]byte, 8)

        this.Conn.Read(extendedByte)

    }

    maskingByte := make([]byte, 4)

    if mask == 1 {

        this.Conn.Read(maskingByte)

        this.MaskingKey = maskingByte

    }

    payloadDataByte := make([]byte, payloadLen)

    this.Conn.Read(payloadDataByte)

    log.Println("data:", payloadDataByte)

    dataByte := make([]byte, payloadLen)

    //TODO: 需要优化

    for i := 0; i < payloadLen; i++ {

        if mask == 1 {

            dataByte[i] = payloadDataByte[i] ^ maskingByte[i % 4]

        }else {

            dataByte[i] = payloadDataByte[I]

        }

    }

    if FIN == 1 {

        data = dataByte

        return

    }

    nextData, err := this.ReadIframe()

    if err != nil {

        return

    }

    data = append(data, nextData...)

    return

}

func parseHandshake(content string) map[string]string {

    headers := make(map[string]string, 10)

    lines := strings.Split(content, "\r\n")

    for _,line := range lines {

        if len(line) >= 0 {

            words := strings.Split(line, ":")

            if len(words) == 2 {

                headers[strings.Trim(words[0]," ")] = strings.Trim(words[1], " ")

            }

        }

    }

    return headers

}

3.部署实验

分别在阿里云的两台ECS实例上部署WS服务器,打开8000端口,开启一个SLB服务,SLB服务选择HTTP方式监听,并且打开会话保持功能,Cookie处理方式选择植入Cookie。Demo服务器没有做HTTP健康监听的处理,健康检查这块可以先关掉。

在两台ECS上启动WS服务器,然后本地运行客户端,分别测试两台服务器是否能正常连接,测试完毕后,测试SLB能否正常工作。服务器和SLB都正常的情况下,运行客户端,客户端会得到以下结果

收到Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587619495|1587619495;Path=/
收到消息==========
serverId:1
收到Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587619496|1587619495;Path=/
收到消息==========
serverId:1
收到Cookie:SERVERID=1d94100b7e13c96fa2979b58edda2aa0|1587619497|1587619495;Path=/
收到消息==========
serverId:1

收到的三次Cookie都相同,说明Cookie是有正常植入工作的,并且三次都被SLB正确抓取了。
收到的三次serverId也都是同样的值,说明三次都是同一个ECS上的服务器响应。

至此,验证成功。

Websocket+SLB会话保持能够解决超时重连和切换网络时重连的问题。

参考:
阿里云会话保持
解答Wi-Fi与4G网络切换的困惑
WebSocket的实现原理
阿里云SLB对WebSocket的支持
HTTP Headers和Cookie


发表评论

您的电子邮箱地址不会被公开。