摘要:leaf服务器通过tcp连接并收取消息。
结构
1 | // tcp_server.go |
1 | // tcp_msg.go |
在路由器设置时如果config中配置了TCPAddr则会调用tcp_server.go的Start函数来启动TCP服务。
TCP连接
TCP启动通过gate模块调用TCPServer的Start方法来实现。
TCPserver的Start方法
1 | // tcp_server.go |
Start方法主要包含两部分,init方法和run方法。
init方法
init方法主要包含三个部分:监听ip地址的指定端口、检查设置合法性、设置TCPServer的一些参数。
监听地址1
2
3
4
5// tcp_server.go
ln, err := net.Listen("tcp", server.Addr)
if err != nil {
log.Fatal("%v", err)
}
检查最大连接数、‘写通道’容量及Agent
1 | // tcp_server.go |
对TCPServer进行一些参数初始化设置
1 | // tcp_server.go |
run方法
在Start方法中是在一个新的协程中调用的run方法。
run方法主要是通过for循环无限循环接收新的连接,并对每个连接启动新的协程进行处理。
for循环内部内容:
- 接收连接请求
1 | // tcp_server.go |
- 检查连接数是否超过最大连接数,并将连接存入TCPServer的conns字段。
1 | // tcp_server.go |
- 创建Agent
1 | // tcp_server.go |
- 协程中执行agent.Run()
1 | // tcp_server.go |
开始接收消息
agent的Run方法,其中agent结构为:
1 | // gate.go |
agent的Run()方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21func (a *agent) Run() {
for {
data, err := a.conn.ReadMsg()
if err != nil {
log.Debug("read message: %v", err)
break
}
if a.gate.Processor != nil {
msg, err := a.gate.Processor.Unmarshal(data)
if err != nil {
log.Debug("unmarshal messagefff error: %v", err)
break
}
err = a.gate.Processor.Route(msg, a)
if err != nil {
log.Debug("route messagefff error: %v", err)
break
}
}
}
}
- 循环接收消息
1 | data, err := a.conn.ReadMsg() |
ReadMsg方法,因为agent的conn是在上面创建Agent时通过newTCPConn
函数创建的,所以结构为*TCPConn
,所以调用的ReadMsg方法为TCPConn的ReadMsg方法。
1 | // tcp_conn.go |
MsgParser的Read方法`func (p MsgParser) Read(conn *TCPConn) ([]byte, error)`
获取消息长度的byte值1
2
3
4
5
6
7
8// tcp_msg.go
var b [4]byte
bufMsgLen := b[:p.lenMsgLen]
// read len
if _, err := io.ReadFull(conn, bufMsgLen); err != nil {
return nil, err
}
ReadFull从conn精确地读取len(bufMsgLen)字节数据填充进bufMsgLen。函数返回
写入的字节数和错误(如果没有读取足够的字节)。只有没有读取到字节时才可能
返回EOF;如果读取了有但不够的字节时遇到了EOF,函数会返回ErrUnexpectedEOF。
只有返回值err为nil时,返回值n才会等于len(bufMsgLen)。
根据大小端解析bufMsgLen为int值1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// tcp_msg.go
switch p.lenMsgLen {
case 1:
msgLen = uint32(bufMsgLen[0])
case 2:
if p.littleEndian {
msgLen = uint32(binary.LittleEndian.Uint16(bufMsgLen))
} else {
msgLen = uint32(binary.BigEndian.Uint16(bufMsgLen))
}
case 4:
if p.littleEndian {
msgLen = binary.LittleEndian.Uint32(bufMsgLen)
} else {
msgLen = binary.BigEndian.Uint32(bufMsgLen)
}
}
这里需要对littleEndian
这个字段做一些说明:
有几个结构中有littleEndian
这个字段。
type Gate struct
type TCPServer struct
type MsgParser struct
这几个结构中的littleEendian
,Gate赋值给TCPServer,TCPServer赋值给MsgParser,所以根本上来自于Gate(Gate读取的conf.go中的配置)。
另外在使用protobuf时,protobuf.go中的type Processor struct
结构中也存在littleEndian
,这个实在调用protobuf中的NewProcessor
函数时赋值的默认为false,但这个littleEndian
只是负责protobuf解析时是否使用小端序。所以如果在使用protobuf且使用小端序时需要设置
Processor中的littleEndian为true(Processor提供了SetByteOrder方法),且conf.go中需要设置LittleEndian为true。
判断消息长度合法性1
2
3
4
5
6
7// tcp_msg.go
// check len
if msgLen > p.maxMsgLen {
return nil, errors.New("message too long")
} else if msgLen < p.minMsgLen {
return nil, errors.New("message too short")
}
读取消息数据1
2
3
4
5
6// tcp_msg.go
// data
msgData := make([]byte, msgLen)
if _, err := io.ReadFull(conn, msgData); err != nil {
return nil, err
}
到此数据的得去就完成了,然后是数据的解析。