Thrift 网络模型 (java)
从驱动形式上,Thrift 提供的网络服务模型分为:单线程、多线程、时间驱动。
从另一个角度分为:阻塞服务模型、非阻塞服务模型。
- 阻塞服务模型: TSimpleServer, TThreadPoolServer
- 非阻塞服务模型: TNonblockingServer, THsHaServer, TThreadedSelectorServer
下面以 Java 开发的角度分析各种网络服务模型的使用场景、性能情况。
TSimpleServer
TSimpleServer 的工作模式采用最简单的阻塞 IO。其实现方法简洁明了,便于理解,但是一次只能接收和处理一个 Socket 连接,效率比较低。 它主要用于演示 Thrift 的工作过程,在实际开发工程中很少用到。
package org.example.service;
import com.example.thrift.UserService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
import org.apache.thrift.transport.TServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.example.service.processor.UserServiceImpl;
public class SimpleService {
public static void main(String[] args) {
try (TServerSocket socket = new TServerSocket(19090); // 传输层
){
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory(); // 协议层
UserService.Processor<UserServiceImpl> processor = new UserService.Processor<>(new UserServiceImpl()); // 处理层
// 设置服务器参数
TServer.Args serverArgs = new TServer.Args(socket); // 传输层
serverArgs.protocolFactory(protocolFactory); // 协议层
serverArgs.processor(processor); // 处理层
// 启动服务器 (单线程,一般用于测试)
TSimpleServer server = new TSimpleServer(serverArgs);
server.serve();
} catch (TTransportException e) {
throw new RuntimeException(e);
}
}
}
package org.example.client;
import com.example.thrift.User;
import com.example.thrift.UserService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
public class SimpleClient {
public static void main(String[] args) {
String host = "localhost";
int port = 19090;
try (TTransport socket = new TSocket(host, port); // 传输层,阻塞io
) {
TProtocol protocol = new TBinaryProtocol(socket); // 协议层,二进制编码格式
UserService.Client client = new UserService.Client(protocol);
// Connect!
socket.open();
try {
User user = client.getById(1);
System.out.println("user = " + user);
} catch (TException e) {
throw new RuntimeException(e);
}
} catch (TTransportException e) {
throw new RuntimeException(e);
}
}
}
TThreadPoolServer
TThreadPoolServer 模式采用阻塞 Socket 方式工作,主线程负责阻塞式监听是否有新 Socket 到来,具体的业务处理交由一个线程池来处理。 (类似 nio)
优点
拆分了监听线程(Accept Thread)和处理客户端连接的工作线程(Worker Thread)。数据读取和业务处理都交给线程池处理。因此在存在并发时新连接也能够被及时接受。 (线程池模式比较适合用在服务器端能够预知客户端并发量的情况下,这时每个请求都能被业务线程池及时处理)
缺点
线程池模式的处理能力受限于线程池的工作能力,当并发请求数量大于线程池中的线程数量时,新请求也只能排队等待。 另外,默认线程池允许创建的最大线程数量为 Intger.MAX_VALUE
,这可能会创建出大量线程,导致 OOM(内存溢出) 风险。
TNonblockingServer
TNonblockingServer 模式也是单线程工作,但是采用 NIO 模式,利用 IO 多路复用模型处理 Socket 就绪事件,对于有数据来到的 Socket 进行数据读取操作,对于有数据发送的 Socket 则进行数据发送操作,对于监听 Socket 则产生一个新业务 Socket 并将其注册到 Selector 上。
注
TNonblockingServer 要求底层的传输通道必须使用 TFramedTransport。
package org.example.service;
import com.example.thrift.UserService;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
import org.example.service.processor.UserServiceImpl;
public class NonblockingService {
public static void main(String[] args) {
try (TNonblockingServerSocket socket = new TNonblockingServerSocket(19090)){
UserService.Processor<UserServiceImpl> processor = new UserService.Processor<>(new UserServiceImpl());
TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
TFramedTransport.Factory transportFactory = new TFramedTransport.Factory();
// server
TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(socket);
serverArgs.processor(processor);
serverArgs.protocolFactory(protocolFactory);
serverArgs.transportFactory(transportFactory);
TNonblockingServer server = new TNonblockingServer(serverArgs);
server.serve();
} catch (TTransportException e) {
throw new RuntimeException(e);
}
}
}
package org.example.client;
import com.example.thrift.User;
import com.example.thrift.UserService;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.layered.TFramedTransport;
public class NonblockingClient {
public static void main(String[] args) {
String host = "localhost";
int port = 19090;
try (TSocket socket = new TSocket(host, port)){
TFramedTransport transport = new TFramedTransport(socket);
TBinaryProtocol protocol = new TBinaryProtocol(transport);
UserService.Client client = new UserService.Client(protocol);
// Connect!
socket.open();
try {
User user = client.getById(1);
System.out.println("user = " + user);
} catch (TException e) {
throw new RuntimeException(e);
}
} catch (TTransportException e) {
throw new RuntimeException(e);
}
}
}
优点
相比 TSimpleServer 效率提升主要体现在 IO 多路复用上,TNonblockingServer 采用非阻塞 IO,对 accept/read/write 等 IO 事件进行监控和处理,同时监控多个 Socket 状态的变化。
缺点
TNonblockingServer 模式在业务处理上还是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候效率不高,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务器被阻塞。
THsHaServer
鉴于 TNonblockingServer 的缺点,THsHaServer 继承于 TNonblockingServer,引入了线程池提高了任务处理的并发能力。 (Reactor 模型)
注
THsHaServer 和 TNonblockingServer 一样,要求底层的传输通道必须使用 TFramedTransport。
其他都一样,就是读 Socket 时的处理后面加了线程池。
优点
THsHaServer 于 TNonblockingServer 模式相比,THsHaServer 在完成数据读取之后,将业务处理过程交给线程池处理,主线程直接返回进行下一个循环操作,效率大大提升。
缺点
主线程仍然需要完成所有 Socket 的监听接收、数据读取、数据写入操作。当并发请求数较大时,且发送数据量较多时,监听 Socket 上新连接请求不能被及时接受。
TThreadedSelectorServer
TThreadedSelectorServer 是对 THsHaServer 的一种补充。它将 Selector 中的读写 IO 事件(read/wirete)从主线程中分离出来。同时引入 Worker 工作线程池。
TThreadedSelectorServer 模式是目前 Thrift 提供的最高级的线程服务模型,它内部有如下几个部分构成:
- 一个 AcceptThread 专门用于处理监听 Socket 上新的连接。
- 若干个 SelectorThread 专门用于处理业务 Socket 的网络 I/O 读写操作,所有网络数据的读写均是由这些线程来完成。
- 一个负载均衡器 SelectorThreadLoadBalancer 对象,主要用于 AcceptThread 线程接收一个新的 Socket 连接请求时,决定将这个新连接请求分配给哪个 SelectorThread 线程处理。
多个 Selector 的处理就很像 Tomcat7 中的处理。但 Tomcat8 中认为多个 Selector 对新能提升不大,所以在以后版本中又改回由一个 Selector 处理全部 IO。