张松然
京东商城 POP平台系统架构师。对构建高性能,高可用的大规模分布系统有丰富的开发经验,有多年NIO领域的设计、开发经验,对HTTP、TCP长连接技术有深入研究与领悟。

TCP 网关

本文将为大家介绍一个基于 Netty + Protobuf 构建的高性能 TCP 网关开源组件。该组件部署业务化运行2年以上,实现TCP 双向通道通信,维持高并发在线长连接,优化传输字节码等。

安装

GitHub 克隆这个工程,并将它作为一个依赖包添加到 Maven 项目中。

使用

1. 创建 TCP 服务

基于 Spring 配置文件:spring-tcp-server.xml 启动服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd"
default-autowire="byName">

<!-- tcp server config start. -->
<bean id="tcpServer" class="com.linkedkeeper.tcp.connector.tcp.server.TcpServer" init-method="init"
destroy-method="shutdown">
<!-- port is tcp server port -->
<property name="port" value="2000"/>
</bean>
<bean id="tcpSessionManager" class="com.linkedkeeper.tcp.connector.tcp.TcpSessionManager">
<property name="maxInactiveInterval" value="500"/>
<!-- you can add listener to listen session event, include session create, destroy and so on. -->
<property name="sessionListeners">
<list>
<ref bean="logSessionListener"/>
</list>
</property>
</bean>
<!-- logSessionListener is related tcpSessionManager, those listener should implements SessionListener -->
<bean id="logSessionListener" class="com.linkedkeeper.tcp.connector.api.listener.LogSessionListener"/>
<!-- tcp sender is a container that can send message to client from server -->
<bean id="tcpSender" class="com.linkedkeeper.tcp.remoting.TcpSender">
<constructor-arg ref="tcpConnector"/>
</bean>
<!-- server config is combine the config, don't modify -->
<bean id="serverConfig" class="com.linkedkeeper.tcp.connector.tcp.config.ServerTransportConfig">
<constructor-arg ref="tcpConnector"/>
<constructor-arg ref="proxy"/>
<constructor-arg ref="notify"/>
</bean>
<!-- tcp connector is container that manage the connection between server and client -->
<bean id="tcpConnector" class="com.linkedkeeper.tcp.connector.tcp.TcpConnector" init-method="init"
destroy-method="destroy"/>
<!-- notify proxy is proxy that implement send notify to client -->
<bean id="notify" class="com.linkedkeeper.tcp.notify.NotifyProxy">
<constructor-arg ref="tcpConnector"/>
</bean>
<!-- default tcp server config end. -->

<!-- this proxy is your proxy that can receive message from client -->
<bean id="proxy" class="com.linkedkeeper.tcp.server.TestSimpleProxy"/>
</beans>

说明:

  • TcpServer:提供 TCP 连接的服务

  • TcpSessionManager:你可以添加监听事件,用于监听 TCP 会话的创建、销毁等

  • LogSessionListener:一个日志监听器,它和 tcpSessionManager 关联,监听器必须事先 SessionListener

  • TcpSender:TCP 发送者,用于向客户端发送通知消息,实现下行逻辑

  • ServerConfig:TCP 的配置管理类

  • TcpConnector:TCP 容器,用于管理服务端和客户端的连接

  • NotifyProxy:发送通知的代理类

上面的配置都是默认的,你可以不更改,但是你可能需要换个 TCP 端口。

例子1 创建测试代理用于从客户端接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.linkedkeeper.tcp.connector.tcp.codec.MessageBuf;
import com.linkedkeeper.tcp.data.Login;
import com.linkedkeeper.tcp.data.Protocol;
import com.linkedkeeper.tcp.invoke.ApiProxy;
import com.linkedkeeper.tcp.message.MessageWrapper;
import com.linkedkeeper.tcp.message.SystemMessage;

public class TestSimpleProxy implements ApiProxy {

public MessageWrapper invoke(SystemMessage sMsg, MessageBuf.JMTransfer message) {
ByteString body = message.getBody();

if (message.getCmd() == 1000) {
try {
Login.MessageBufPro.MessageReq messageReq
= Login.MessageBufPro.MessageReq.parseFrom(body);
if (messageReq.getCmd().equals(Login.MessageBufPro.CMD.CONNECT)) {
return new MessageWrapper(MessageWrapper.MessageProtocol.CONNECT,
message.getToken(), null);
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} else if (message.getCmd() == 1002) {
try {
Login.MessageBufPro.MessageReq messageReq
= Login.MessageBufPro.MessageReq.parseFrom(body);
if (messageReq.getCmd().equals(Login.MessageBufPro.CMD.HEARTBEAT)) {
MessageBuf.JMTransfer.Builder resp = Protocol.generateHeartbeat();
return new MessageWrapper(MessageWrapper.MessageProtocol.HEART_BEAT,
message.getToken(), resp);
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
return null;
}
}

输入参数

SystemMessage:TcpServer 构建的参数,包括远端地址,本地地址等

MessageBuf.JMTransfer:这个很重要,这是由 Protobuf 创建的,它包括 header 和 body,header 是系统及参数,你可以从项目中看到具体参数,body 是 protobuf 序列化的字节码,也是有 protobuf 类生成

输出参数

MessageWrapper:这是一个消息响应的包装类,它包括 protocol,sessionId 和 body。body 是一个 bytes,它和 protobuf 对应。

例子2 发送通知到客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
private NotifyProxy notify;

final int timeout = 10 * 1000;
final int NOTIFY = 3;

public boolean send(long seq, String sessionId, int cmd, ByteString body) throws Exception {
boolean success = false;
MessageBuf.JMTransfer.Builder builder = generateNotify(sessionId, seq, cmd, body);
if (builder != null) {
MessageWrapper wrapper = new MessageWrapper(MessageWrapper.MessageProtocol.NOTIFY, sessionId, builder);
int ret = notify.notify(seq, wrapper, timeout);
if (ret == Constants.NOTIFY_SUCCESS) {
success = true;
} else if (ret == Constants.NOTIFY_NO_SESSION) {
/** no session on this machine **/
success = true;
}
} else {
/** no session in the cache **/
success = true;
}
return success;
}

/**
* session
*/
final String VERSION = "version";
final String DEVICE_ID = "deviceId";
final String PLATFORM = "platform";
final String PLATFORM_VERSION = "platformVersion";
final String TOKEN = "token";
final String APP_KEY = "appKey";
final String TIMESTAMP = "timestamp";
final String SIGN = "sign";

/**
* need session into redis, then when you notify you can get info from redis by session
*/
final Map<String, Map<String, Object>> testSessionMap = null;

protected MessageBuf.JMTransfer.Builder generateNotify(String sessionId, long seq, int cmd, ByteString body)
throws Exception {
Map<String, Object> map = testSessionMap.get(sessionId);

MessageBuf.JMTransfer.Builder builder = MessageBuf.JMTransfer.newBuilder();
builder.setVersion(String.valueOf(map.get(VERSION)));
builder.setDeviceId(String.valueOf(map.get(DEVICE_ID)));
builder.setCmd(cmd);
builder.setSeq(seq);
builder.setFormat(NOTIFY);
builder.setFlag(0);
builder.setPlatform(String.valueOf(map.get(PLATFORM)));
builder.setPlatformVersion(String.valueOf(map.get(PLATFORM_VERSION)));
builder.setToken(String.valueOf(map.get(TOKEN)));
builder.setAppKey(String.valueOf(map.get(APP_KEY)));
builder.setTimeStamp(String.valueOf(map.get(TIMESTAMP)));
builder.setSign(String.valueOf(map.get(SIGN)));
builder.setBody(body);

return builder;
}

2. 创建 TCP 客户端

支持 iOS,Android,C++ 等语言构建的客户端

3. 序列化 Protobuf

Java

1
/protobuf/protoc --proto_path=/protobuf/ --java_out=/protobuf/MessageBuf.proto

Object-C

1
protoc --plugin=/protobuf/protoc-gen-objc MessageBuf.proto --object_out="/protobuf/"

附件:

你可以点击 下载 protobuf 编译器

来源:http://www.linkedkeeper.com/detail/blog.action?bid=1026