<?php
namespace Message\Controller;
use Think\Controller;
use Thrift\Exception\TException;
use Thrift\Protocol\TBinaryProtocol;
use Thrift\Transport\TBufferedTransport;
use Thrift\Transport\THttpClient;
use Thrift\Transport\TPhpStream;
use Thrift\TMultiplexedProcessor;
use Thrift\Protocol\TMultiplexedProtocol;
use Message\Services\MessageServie;
use Rpc\Msg\MessageClient;
use Rpc\Msg\MessageProcessor;
use Thrift\Factory\TBinaryProtocolFactory;
use Thrift\Factory\TTransportFactory;
use Thrift\Server\TServerSocket;
use Thrift\Server\TSimpleServer;
use Thrift\Server\TForkingServer;
use Thrift\Transport\TSocket;
server
public function message_rpc()
{
try {
// 初始化多个服务提供者handle
$messageprocessor = new \Rpc\Msg\MessageProcessor(new MessageServie());
// 创建多个服务Processor
$sendProcessor = new \Rpc\Msg\SendMsgProcessor(new \Message\Services\SendMsgServie());
// 将服务注册到TMultiplexedProcessor中
$tFactory = new TTransportFactory();
$pFactory = new TBinaryProtocolFactory(true, true);
$processor = new TMultiplexedProcessor();
// 将服务注册到TMultiplexedProcessor中
//队列消费者rpc请求
$processor->registerProcessor("MessageApiAction", $messageprocessor);
//消息发送rpc请求
$processor->registerProcessor("sendMsg", $sendProcessor);
// 初始化数据传输方式transport
// 利用该传输方式初始化数据传输格式protocol
// 监听开始
$transport = new TServerSocket('0.0.0.0', '9988');
// $processor->process($pFactory, $pFactory);
$server = new TForkingServer($processor, $transport, $tFactory, $tFactory, $pFactory, $pFactory);
$server->serve();
} catch (TException $tx) {
\Think\Log::write($tx->getMessage());
} catch(\Exception $e){
\Think\Log::write($e->getMessage());
}
}
client
public function local()
{
try {
ini_set('memory_limit', '1024M');
$socket = new TSocket('192.168.1.188', '9988');
$socket->setRecvTimeout(50000);
$socket->setDebug(true);
$transport = new TBufferedTransport($socket, 1024, 1024);
$protocol = new TBinaryProtocol($transport);
$client = new MessageClient(new TMultiplexedProtocol($protocol, "MessageApiAction"));
// $client = new MessageClient($protocol);
$transport->open();
$result = $client->MessageApiAction('message api');
print_r($result);
$transport->close();
} catch (TException $tx) {
print_r($tx->getMessage());
}
}
public function send_msg()
{
try {
ini_set('memory_limit', '1024M');
$socket = new TSocket('192.168.1.188', '9988');
$socket->setRecvTimeout(50000);
$socket->setDebug(true);
$transport = new TBufferedTransport($socket, 1024, 1024);
$protocol = new TBinaryProtocol($transport);
$client = new \Rpc\Msg\SendMsgClient(new TMultiplexedProtocol($protocol, "sendMsg"));
// $client = new \Rpc\Msg\SendMsgClient($protocol);
$transport->open();
$result = $client->sendMsg('send msg');
print_r($result);
$transport->close();
} catch (TException $tx) {
print_r($tx->getMessage());
}
}