今天来介绍一个socket连接复用的包
https://github.com/xtaci/smux
如图所示,多个channel输入通过smux合并在一个连接中,后端服务将连接中的channel分离出来进行处理
场景分析
假设一个简单的使用场景,一个apiservice网关服务对外提供HTTP接口,后面还有一个rand随机数服务,对内提供随机数TCP接口。
客户端访问apiservice接口,apiservice连接randservice服务获取数据并返回。如果不做多路复用的话,apiservice和randservice之间的连接数就是客户端请求数,这样apiservice和randservice之间连接过多会导致性能问题。
n link n link+-----------+ +-------------+ +---------------+| <----------> <-----------> || client <----------> apiservice <-----------> randservice || <----------> <-----------> |+-----------+ +-------------+ +---------------+
经过多路复用后,apiservice和randservice之间只有一个连接,这样无论多少个客户端请求都不会导致连接过多问题。
n link 1 link+-----------+ +-------------+ +---------------+| <----------> | | || client <----------> apiservice <-----------> randservice || <----------> | | |+-----------+ +-------------+ +---------------+
(当然这只是个示例场景而已,生产中apiservice和randservice之间使用RPC框架即可,不用我们手动写socket通信)
代码示例
1.随机数服务 randservice.go
package mainimport ( "bytes" "encoding/binary" "fmt" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/xtaci/smux" "math/rand" "net" "runtime" "time")func init() { rand.Seed(time.Now().UnixNano())}/**一个生成随机数的tcp服务客户端发送'R', 'A', 'N', 'D',服务返回一个随机数*/func main() { listener, err := net.Listen("tcp", ":9000") if err != nil { panic(err) } log.Info().Msg("随机数服务启动,监听9000端口") defer listener.Close() for { conn, err := listener.Accept() if err != nil { fmt.Println(err.Error()) continue } go SessionHandler(conn) }}/**处理会话每个tcp连接生成一个会话session*/func SessionHandler(conn net.Conn) { session, err := smux.Server(conn, nil) if err != nil { panic(err) } log.Info().Msgf("收到客户端连接,创建新会话,对端地址:%s", session.RemoteAddr().String()) for !session.IsClosed() { stream, err := session.AcceptStream() if err != nil { fmt.Println(err.Error()) break } go StreamHandler(stream) } log.Info().Msgf("客户端连接断开,销毁会话,对端地址:%s", session.RemoteAddr().String())}/**流数据处理*/func StreamHandler(stream *smux.Stream) { buffer := make([]byte, 1024) n, err := stream.Read(buffer) if err != nil { log.Error().Msgf("流id:%d,异常信息:%s", stream.ID(), err.Error()) stream.Close() return } cmd := buffer[:n] if bytes.Equal(cmd, []byte{'R', 'A', 'N', 'D'}) { rand := rand.Uint64() response := make([]byte, 8) binary.BigEndian.PutUint64(response, rand) stream.Write(response) log.Debug().Msgf("收到客户端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response) } else { log.Warn().Msgf("收到未知请求命令,流id:%d,请求命令:%v", stream.ID(), cmd) }}
2.api接口服务 apiservice.go
package mainimport ( "encoding/binary" "fmt" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/xtaci/smux" "net" "net/http" "runtime")/**随机数服务客户端连接*/var randClient *smux.Sessionfunc init() { //连接后端随机数服务 conn, err := net.Dial("tcp", ":9000") if err != nil { log.Warn().Msg("随机数服务未启动") panic(err) } session, err := smux.Client(conn, nil) if err != nil { log.Error().Msg("打开会话失败") panic(err) } randClient = session}/**一个api网关,对外提供api接口调用随机数服务来获取随机数*/func main() { defer randClient.Close() http.HandleFunc("/rand", RandHandler) http.ListenAndServe(":8080", nil)}/**随机数接口*/func RandHandler(w http.ResponseWriter, r *http.Request) { stream, err := randClient.OpenStream() if err != nil { w.WriteHeader(500) fmt.Fprint(w, err.Error()) } else { log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID()) defer stream.Close() stream.Write([]byte{'R', 'A', 'N', 'D'}) buffer := make([]byte, 1024) n, err := stream.Read(buffer) if err != nil { w.WriteHeader(500) fmt.Fprint(w, err.Error()) } else { response := buffer[:n] var rand = binary.BigEndian.Uint64(response) log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response) fmt.Fprintf(w, "%d", rand) } }}
原理分析
smux将socket连接封装成session,每次请求响应封装成一个stream,通过自定义协议发送数据
VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH) VALUES FOR LATEST VERSION:VERSION: 1/2 CMD: cmdSYN(0) cmdFIN(1) cmdPSH(2) cmdNOP(3) cmdUPD(4) // only supported on version 2 STREAMID: client use odd numbers starts from 1 server use even numbers starts from 0 cmdUPD: | CONSUMED(4B) | WINDOW(4B) |
比如我们发送的RAND命令封装成以下数据包发送给服务端,假设请求的STREAMID为11223344
VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | RAND
VERSION(1B) | CMD(1B) | LENGTH(2B) | 11223344 | 0102030405060708
扩展优化
但是这样又导致了另一个问题,由于apiservice和randservice之间只有一个连接,而这一个连接只能由一个goroutine处理,这样就导致性能低下
所以进一步扩展apiservice和randservice之间建立固定数量的连接,如10个连接,用来处理所有的请求,就是通过连接池的方式来性能最大化
改造后的示意图如下:
n link 10 link+-----------+ +-------------+ +---------------+| <----------> <-----------> || client <----------> apiservice <-----------> randservice || <----------> <-----------> |+-----------+ +-------------+ +---------------+
连接池版代码 apiservicewithpool.go
package mainimport ( "context" "encoding/binary" "fmt" cpool "github.com/jolestar/go-commons-pool/v2" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/xtaci/smux" "net" "net/http" "runtime")var commonPool *cpool.ObjectPoolvar ctx = context.Background()func init() { factory := cpool.NewPooledObjectFactorySimple(NewSessionCpool) commonPool = cpool.NewObjectPoolWithDefaultConfig(ctx, factory) commonPool.Config.MaxTotal = 10}/**连接池生成新会话函数*/func NewSessionCpool(ctx context.Context) (interface{}, error) { log.Debug().Msg("连接池中生成一个连接") //连接后端随机数服务 conn, err := net.Dial("tcp", ":9000") if err != nil { log.Warn().Msg("随机数服务未启动") panic(err) } //随机数服务客户端连接 session, err := smux.Client(conn, nil) if err != nil { log.Error().Msg("打开会话失败") panic(err) } return session, err}/**一个api网关,对外提供api接口调用随机数服务来获取随机数通过sync.Pool实现“连接池” !!! 不推荐这种方式,sync.Pool的种种特性不适合作为连接池*/func main() { http.HandleFunc("/rand", CommonPoolRandHandler) http.ListenAndServe(":8080", nil)}/**随机数接口*/func CommonPoolRandHandler(w http.ResponseWriter, r *http.Request) { obj, err := commonPool.BorrowObject(ctx) if err != nil { w.WriteHeader(500) fmt.Fprint(w, err.Error()) return } client := obj.(*smux.Session) stream, err := client.OpenStream() if err != nil { w.WriteHeader(500) fmt.Fprint(w, err.Error()) } else { log.Debug().Msgf("收到请求,打开流成功,流id:%d", stream.ID()) defer stream.Close() stream.Write([]byte{'R', 'A', 'N', 'D'}) buffer := make([]byte, 1024) n, err := stream.Read(buffer) if err != nil { w.WriteHeader(500) fmt.Fprint(w, err.Error()) } else { response := buffer[:n] var rand = binary.BigEndian.Uint64(response) log.Debug().Msgf("收到服务端数据,流id:%d,随机数:%d, 响应数据:%v", stream.ID(), rand, response) fmt.Fprintf(w, "%d", rand) } } commonPool.ReturnObject(ctx, obj)}
经过连接池改造后的模型就像MySQL或Redis的使用场景,每次请求相当于一个stream,多个stream共用一个session,一个session背后有一个socket连接,程序和MySQL或Redis之间创建多个session放入连接池中,每次请求从连接池中拿出session进行读写操作
文章来源:智云一二三科技
文章标题:golang socket连接复用 – smux
文章地址:https://www.zhihuclub.com/6723.shtml