0
点赞
收藏
分享

微信扫一扫

分布式服务框架学习笔记7.3 Thrift 池

眼君 2022-06-28 阅读 74


分布式服务框架学习笔记7.3 Thrift 池

  • ​​一、服务端​​
  • ​​二、客户端​​

Thrift 连接池通过复用已建立的客户端连接,避免每次 RPC 调用都重新创建和销毁连接,从而优化 RPC 服务的资源占用。

一、服务端

import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransportException;

import com.whr.rpc.service.MessageForwardsService;

/**
 * Bootstraps the Thrift RPC server for {@link MessageForwardsService}.
 *
 * <p>The server is a {@link TNonblockingServer} using framed transport and the
 * compact protocol; clients must be configured with the same transport and
 * protocol.
 */
public class MessageForwardsServer {

    /** Port used by {@link #simple(TProcessor)} when no explicit port is given. */
    public static final int DEFAULT_PORT = 9090;

    /** Not assigned or read anywhere in this class; kept for existing callers. */
    public static MessageForwardsRpcInterface handler;

    /** Not assigned or read anywhere in this class; kept for existing callers. */
    @SuppressWarnings("rawtypes")
    public static MessageForwardsService.Processor processor;

    /**
     * Builds the service processor and starts the server on a new thread,
     * listening on {@link #DEFAULT_PORT}. Returns without waiting for the
     * server to come up.
     */
    public static void start() {
        try {
            System.out.println("async TNonblockingServer start ....");

            final TProcessor tprocessor = new MessageForwardsService.Processor<MessageForwardsService.Iface>(
                    new MessageForwardsRpcInterface());

            Runnable serveTask = new Runnable() {
                @Override
                public void run() {
                    simple(tprocessor);
                }
            };

            new Thread(serveTask).start();
        } catch (Exception e) {
            System.out.println("Server start error!!!");
            e.printStackTrace();
        }
    }

    /**
     * Runs the RPC server on {@link #DEFAULT_PORT}.
     *
     * @param tprocessor processor that dispatches incoming calls
     */
    public static void simple(TProcessor tprocessor) {
        simple(tprocessor, DEFAULT_PORT);
    }

    /**
     * Runs the RPC server on the given port. The call occupies the calling
     * thread for as long as the server is serving.
     *
     * @param tprocessor processor that dispatches incoming calls
     * @param port       TCP port to listen on
     */
    public static void simple(TProcessor tprocessor, int port) {
        TNonblockingServerSocket tnbSocketTransport = null;
        try {
            tnbSocketTransport = new TNonblockingServerSocket(port);

            TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(
                    tnbSocketTransport);
            tnbArgs.processor(tprocessor);
            tnbArgs.transportFactory(new TFramedTransport.Factory());
            tnbArgs.protocolFactory(new TCompactProtocol.Factory());

            // With non-blocking IO, both server and client must use
            // TFramedTransport as the data transport.
            TServer server = new TNonblockingServer(tnbArgs);
            server.serve();
        } catch (TTransportException e) {
            e.printStackTrace();
        } finally {
            // Release the listening socket if startup failed part-way or the
            // server has stopped serving.
            if (tnbSocketTransport != null) {
                tnbSocketTransport.close();
            }
        }
    }
}

二、客户端

pom.xml

<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<!-- Apache Commons Pool 2: supplies the org.apache.commons.pool2 classes
     (ObjectPool, GenericObjectPool, GenericObjectPoolConfig,
     BasePooledObjectFactory, PooledObject) imported by the client classes. -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.3</version>
</dependency>

ControlCommand.java

import java.util.NoSuchElementException;

import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.thrift.protocol.TProtocol;

import com.whr.rpc.service.MessageForwardsService;


/**
 * Demo client: borrows a pooled Thrift protocol, issues one
 * {@code getAllChannels()} call against the RPC server, and releases the
 * connection again.
 */
public class ControlCommand
{
    /** Port the RPC server is expected to listen on. */
    private static final int SERVER_PORT = 9090;

    public static void main( String[] args )
    {
        test("v2-1","192.10.200.203");
        //test("v2-2","59.110.22.202");
    }

    /**
     * Performs a single pooled RPC round trip against the given host.
     *
     * @param server label of the target server; not read by this method
     * @param ip     host name or IP address of the RPC server to connect to
     */
    public static void test(String server,String ip){
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(10);
        poolConfig.setMinIdle(1);
        poolConfig.setTestOnBorrow(true);

        // Connect to the host the caller asked for rather than a fixed address.
        ObjectPool<TProtocol> pool = new AutoClearGenericObjectPool<TProtocol>(
                new TProtocolFactory(ip, SERVER_PORT, true), poolConfig);

        // Non-null only while a borrowed object still has to be given back.
        TProtocol borrowed = null;
        try {
            borrowed = pool.borrowObject();
            MessageForwardsService.Client client = new MessageForwardsService.Client(borrowed);
            client.getAllChannels();
            pool.returnObject(borrowed);
            borrowed = null;
        } catch (NoSuchElementException e) {
            // The pool could not hand out a usable connection.
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (borrowed != null) {
                // The call failed with the connection checked out; its state is
                // unknown, so discard it instead of returning it to the pool.
                try {
                    pool.invalidateObject(borrowed);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            pool.close();
        }
    }
}

AutoClearGenericObjectPool.java

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

/**
 * A {@link GenericObjectPool} that clears its idle objects whenever, right
 * after an object is returned, the idle count is at least the active count.
 *
 * <p>Created by yangjunming on 6/7/16.
 */
public class AutoClearGenericObjectPool<T> extends GenericObjectPool<T> {

    public AutoClearGenericObjectPool(PooledObjectFactory<T> factory) {
        super(factory);
    }

    public AutoClearGenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig config) {
        super(factory, config);
    }

    /**
     * Returns {@code obj} to the pool, then clears the idle objects if there
     * are at least as many idle objects as active ones.
     *
     * @param obj the borrowed object being handed back
     */
    @Override
    public void returnObject(T obj) {
        super.returnObject(obj);

        final int idleCount = getNumIdle();
        final int activeCount = getNumActive();
        if (idleCount < activeCount) {
            // More objects in use than idle: keep the idle ones around.
            return;
        }
        clear();
    }

}

TProtocolFactory.java

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * Pool factory that creates {@link TProtocol} instances: a
 * {@link TCompactProtocol} over a {@link TFramedTransport} wrapping a
 * {@link TSocket} to a fixed host and port.
 */
public class TProtocolFactory
        extends BasePooledObjectFactory<TProtocol> {

    private final String host;
    private final int port;
    private final boolean keepAlive;

    /**
     * @param host      host name or IP address of the RPC server
     * @param port      TCP port of the RPC server
     * @param keepAlive when {@code true}, transports stay open while idle in
     *                  the pool; when {@code false}, they are closed on return
     *                  and reopened on borrow
     */
    public TProtocolFactory(String host, int port, boolean keepAlive) {
        this.host = host;
        this.port = port;
        this.keepAlive = keepAlive;
    }

    /**
     * Opens a new connection and wraps it in a compact protocol.
     *
     * @return a protocol whose transport is already open
     * @throws TTransportException if the transport cannot be opened
     */
    @Override
    public TProtocol create() throws TTransportException {
        TTransport transport = new TFramedTransport(new TSocket(host, port));
        transport.open();
        return new TCompactProtocol(transport);
    }

    @Override
    public PooledObject<TProtocol> wrap(TProtocol protocol) {
        return new DefaultPooledObject<TProtocol>(protocol);
    }

    /**
     * Passivation (active to idle; triggered by returnObject). When
     * keep-alive is off, the transport is flushed and closed.
     *
     * @param pooledObject the object being returned to the pool
     * @throws TTransportException if flushing the transport fails
     */
    @Override
    public void passivateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
        if (!keepAlive) {
            TTransport transport = pooledObject.getObject().getTransport();
            transport.flush();
            transport.close();
        }
    }

    /**
     * Activation (triggered by borrowObject). Reopens the transport if it is
     * not open.
     *
     * @param pooledObject the object about to be handed out
     * @throws TTransportException if the transport cannot be reopened
     */
    @Override
    public void activateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
        TTransport transport = pooledObject.getObject().getTransport();
        if (!transport.isOpen()) {
            transport.open();
        }
    }

    /**
     * Destruction (triggered e.g. by clear). Always closes the transport,
     * regardless of the keep-alive setting: a destroyed object is never handed
     * out again, so leaving its transport open would leak the connection.
     *
     * @param pooledObject the object being removed from the pool
     * @throws TTransportException declared for compatibility with existing
     *                             callers; not thrown by this implementation
     */
    @Override
    public void destroyObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
        TProtocol protocol = pooledObject.getObject();
        if (protocol != null) {
            TTransport transport = protocol.getTransport();
            if (transport != null && transport.isOpen()) {
                transport.close();
            }
        }
        // Retained from the original implementation.
        pooledObject.markAbandoned();
    }

    /**
     * Validates a pooled object. An open transport is valid; a closed one is
     * valid only if it can be reopened here.
     *
     * @param p the pooled object to check
     * @return {@code true} if the transport is open when this method returns
     */
    @Override
    public boolean validateObject(PooledObject<TProtocol> p) {
        TProtocol protocol = p.getObject();
        if (protocol == null) {
            return false;
        }
        TTransport transport = protocol.getTransport();
        if (transport.isOpen()) {
            return true;
        }
        try {
            transport.open();
            return true;
        } catch (TTransportException e) {
            e.printStackTrace();
            return false;
        }
    }
}


举报

相关推荐

0 条评论