FastSocket这个东西上次我已经说过,它使用简单,功能强大,扩展灵活,目前在新浪的生产环境中已经被广泛使用,所以它的性能,安全等各方面我们绝对可以信赖,今天我们来说一个话题,和上一讲有关,这次我们制作一个基于FastSocket的传输协议,它的意义重大,当fastSocket提供的协议不能满足项目要求时,我们就必须硬着头皮去自己写了,还好,fastsocket为我们铺好了路,我们只要按着这条路走下去,就可以了。
自定义的协议格式如下
说干就干
首先,如果要想扩展一个自己的协议,要对 client和server端分别进行开发,下面我们来看一下client的开发
我们要添加的类有三个文件组成,分别是DSSBinaryProtocol,DSSBinaryResponse和一个使用这个协议的客户端入口DSSBinarySocketClient
DSSBinaryProtocol
/// <summary>
/// 异步二进制协议
/// 协议格式
/// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer]
/// 其中参数TableName和VersonNumber长度为40,不够自动在左侧补空格
/// </summary>
public sealed class DSSBinaryProtocol : IProtocol<DSSBinaryResponse>
{
#region IProtocol Members
/// <summary>
/// find response
/// </summary>
/// <param name="connection"></param>
/// <param name="buffer"></param>
/// <param name="readlength"></param>
/// <returns></returns>
/// <exception cref="BadProtocolException">bad async binary protocl</exception>
public DSSBinaryResponse FindResponse(IConnection connection, ArraySegment<byte> buffer, out int readlength)
{
if (buffer.Count < 4) { readlength = 0; return null; }
//获取message length
var messageLength = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset);
if (messageLength < 7) throw new BadProtocolException("bad async binary protocl");
readlength = messageLength + 4;
if (buffer.Count < readlength) { readlength = 0; return null; }
var seqID = NetworkBitConverter.ToInt32(buffer.Array, buffer.Offset + 4);
var projectID = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 8);
var flagLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 10);
var versonLength = NetworkBitConverter.ToInt16(buffer.Array, buffer.Offset + 12);
var strName = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14, flagLength);
var versonNumber = Encoding.UTF8.GetString(buffer.Array, buffer.Offset + 14 + flagLength, versonLength);
var dataLength = messageLength - 10 - flagLength - versonLength;
byte[] data = null;
if (dataLength > 0)
{
data = new byte[dataLength];
Buffer.BlockCopy(buffer.Array, buffer.Offset + 14 + flagLength + versonLength, data, 0, dataLength);
}
return new DSSBinaryResponse(seqID, projectID, strName, versonNumber, data);
}
#endregion
}
View Code
DSSBinaryResponse
/// <summary>
/// 数据同步系统DSS使用的Socket协议,我们称为DSSBinary协议
/// [Message Length(int32)][SeqID(int32)][ProjectID(int16)][Cmd Length(int16)][VersonNumber Length(int16)][Cmd + VersonNumber + Body Buffer]
/// </summary>
public class DSSBinaryResponse : IResponse
{
/// <summary>
/// 流水ID
/// </summary>
public int SeqID { get; private set; }
/// <summary>
/// 项目类型编号
/// </summary>
public short ProjectID { get; set; }
/// <summary>
/// 本次传输的版本号,所有客户端唯一[项目名称(4字节)+guid(36字节)]
/// </summary>
public string VersonNumber { get; private set; }
/// <summary>
/// 命令名称
/// </summary>
public string Flag { get; private set; }
/// <summary>
/// 要操作的表对象,以字节数组形式进行传输
/// </summary>
public readonly byte[] Buffer = null;
public DSSBinaryResponse(int seqID,
short projectID,
string flag,
string versonNumber,
byte[] buffer)
{
this.SeqID = seqID;
this.ProjectID = projectID;
this.VersonNumber = versonNumber;
this.Flag = flag;
this.Buffer = buffer;
}
}
View Code
DSSBinarySocketClient
/// <summary>
/// 异步socket客户端
/// </summary>
public class DSSBinarySocketClient : PooledSocketClient<DSSBinaryResponse>
{
#region Constructors
/// <summary>
/// new
/// </summary>
public DSSBinarySocketClient()
: base(new DSSBinaryProtocol())
{
}
/// <summary>
/// new
/// </summary>
/// <param name="socketBufferSize"></param>
/// <param name="messageBufferSize"></param>
public DSSBinarySocketClient(int socketBufferSize, int messageBufferSize)
: base(new DSSBinaryProtocol(), socketBufferSize, messageBufferSize, 3000, 3000)
{
}
/// <summary>
/// new
/// </summary>
/// <param name="socketBufferSize"></param>
/// <param name="messageBufferSize"></param>
/// <param name="millisecondsSendTimeout"></param>
/// <param name="millisecondsReceiveTimeout"></param>
public DSSBinarySocketClient(int socketBufferSize,
int messageBufferSize,
int millisecondsSendTimeout,
int millisecondsReceiveTimeout)
: base(new DSSBinaryProtocol(),
socketBufferSize,
messageBufferSize,
millisecondsSendTimeout,
millisecondsReceiveTimeout)
{
}
#endregion
#region Public Methods
public Task<TResult> Send<TResult>(string cmdName, short projectID, string versonNumber, byte[] payload,
Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null)
{
return this.Send(null, cmdName, projectID, versonNumber, payload, funcResultFactory, asyncState);
}
public Task<TResult> Send<TResult>(byte[] consistentKey, string cmdName, short projectID, string versonNumber, byte[] payload,
Func<DSSBinaryResponse, TResult> funcResultFactory, object asyncState = null)
{
if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName");
if (funcResultFactory == null) throw new ArgumentNullException("funcResultFactory");
var seqID = base.NextRequestSeqID();
var cmdLength = cmdName.Length;
var versonNumberLength = versonNumber.Length;
var messageLength = (payload == null ? 0 : payload.Length) + cmdLength + versonNumberLength + 10;
var sendBuffer = new byte[messageLength + 4];
//write message length
Buffer.BlockCopy(NetworkBitConverter.GetBytes(messageLength), 0, sendBuffer, 0, 4);
//write seqID.
Buffer.BlockCopy(NetworkBitConverter.GetBytes(seqID), 0, sendBuffer, 4, 4);
//write proejctID
Buffer.BlockCopy(NetworkBitConverter.GetBytes(projectID), 0, sendBuffer, 8, 2);
//write response flag length.
Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)cmdLength), 0, sendBuffer, 10, 2);
//write verson length
Buffer.BlockCopy(NetworkBitConverter.GetBytes((short)versonNumberLength), 0, sendBuffer, 12, 2);
//write response cmd
Buffer.BlockCopy(Encoding.ASCII.GetBytes(cmdName), 0, sendBuffer, 14, cmdLength);
//write response versonNumber
Buffer.BlockCopy(Encoding.ASCII.GetBytes(versonNumber), 0, sendBuffer, 14 + cmdLength, versonNumberLength);
//write body buffer
if (payload != null && payload.Length > 0)
Buffer.BlockCopy(payload, 0, sendBuffer, 14 + cmdLength + versonNumberLength, payload.Length);
var source = new TaskCompletionSource<TResult>(asyncState);
base.Send(new Request<DSSBinaryResponse>(consistentKey, seqID, cmdName, sendBuffer,
ex => source.TrySetException(ex),
response =>
{
TResult result;
try { result = funcResultFactory(response); }
catch (Exception ex) { source.TrySetException(ex); return; }
source.TrySetResult(result);
}));
return source.Task;
}
#endregion
}
View Code
然后,我们再来说一下server端的开发,它有两个文件组成,分别是DSSBinaryCommandInfo,DSSBinaryProtocol
DSSBinaryCommandInfo
/// <summary>
/// async binary command info.
/// </summary>
public class DSSBinaryCommandInfo : ICommandInfo
{
#region Constructors
/// <summary>
/// new
/// </summary>
/// <param name="cmdName"></param>
/// <param name="seqID"></param>
/// <param name="buffer"></param>
/// <exception cref="ArgumentNullException">cmdName is null or empty.</exception>
public DSSBinaryCommandInfo(int seqID, short projectID, string cmdName, string versonNumber, byte[] buffer)
{
if (string.IsNullOrEmpty(cmdName)) throw new ArgumentNullException("cmdName");
if (string.IsNullOrEmpty(versonNumber)) throw new ArgumentNullException("versonNumber");
this.VersonNumber = versonNumber;
this.CmdName = cmdName;
this.SeqID = seqID;
this.ProjectID = projectID;
this.Buffer = buffer;
}
#endregion
#region Public Properties
/// <summary>
/// 版本号
/// </summary>
public string VersonNumber
{
get;
private set;
}
public short ProjectID { get; private set; }
/// <summary>
/// get the current command name.
/// </summary>
public string CmdName
{
get;
private set;
}
/// <summary>
/// seq id.
/// </summary>
public int SeqID
{
get;
private set;
}
/// <summary>
/// 主体内容
/// </summary>
public byte[] Buffer
{
get;
private set;
}
#endregion
#region Public Methods
/// <summary>
/// reply
/// </summary>
/// <param name="connection"></param>
/// <param name="payload"></param>
public void Reply(IConnection connection, byte[] payload)
{
var packet = PacketBuilder.ToDSSBinary(this.SeqID, this.ProjectID, this.CmdName, this.VersonNumber, payload);
connection.BeginSend(packet);
}
#endregion
}
View Code
DSSBinaryProtocol
/// <summary>
/// 数据中心二进制协议
/// 协议格式
/// [Message Length(int32)][SeqID(int32)][Request|Response Flag Length(int16)][VersonNumber Length(int16)][Request|Response Flag + VersonNumber + Body Buffer]
/// </summary>
public sealed class DSSBinaryProtocol : IProtocol<DSSBinaryCommandInfo>
{
#region IProtocol Members
/// <summary>
/// find command
/// </summary>
/// <param name="connection"></param>
/// <param name="buffer"></param>
/// <param name="maxMessageSize"></param>
/// <param name="readlength"></param>
/// <returns></returns>
/// <exception cref="BadProtocolException">bad async binary protocl</exception>
public DSSBinaryCommandInfo FindCommandInfo(IConnection connection, ArraySegment<byte> buffer,
int maxMessageSize, out int readlength)
{
if (buffer.Count < 4) { readlength = 0; return null; }
var payload = buffer.Array;
//获取message length
var messageLength = NetworkBitConverter.ToInt32(payload, buffer.Offset);
if (messageLength < 7) throw new BadProtocolException("bad async binary protocl");
if (messageLength > maxMessageSize) throw new BadProtocolException("message is too long");
readlength = messageLength + 4;
if (buffer.Count < readlength)
{
readlength = 0; return null;
}
var seqID = NetworkBitConverter.ToInt32(payload, buffer.Offset + 4);
var projectID = NetworkBitConverter.ToInt16(payload, buffer.Offset + 8);
var cmdNameLength = NetworkBitConverter.ToInt16(payload, buffer.Offset + 10);
var versonNumberLength = NetworkBitConverter.ToInt16(payload, buffer.Offset + 12);
var strName = Encoding.UTF8.GetString(payload, buffer.Offset + 14, cmdNameLength);
var versonNumber = Encoding.UTF8.GetString(payload, buffer.Offset + 14 + cmdNameLength, versonNumberLength);
var dataLength = messageLength - 8 - cmdNameLength;
byte[] data = null;
if (dataLength > 0)
{
data = new byte[dataLength];
Buffer.BlockCopy(payload, buffer.Offset + 14 + cmdNameLength + versonNumberLength, data, 0, dataLength);
}
return new DSSBinaryCommandInfo(seqID, projectID, strName, versonNumber, data);
}
#endregion
}
View Code
除了上面两个文件外,我们还要修改服务端的管理类
/// <summary>
/// Socket server manager.
/// </summary>
public class SocketServerManager
{
#region Private Members
static private readonly List<SocketBase.IHost> _listHosts = new List<SocketBase.IHost>();
#endregion
#region Static Methods
/// <summary>
/// 初始化Socket Server
/// </summary>
static public void Init()
{
Init("socketServer");
}
/// <summary>
/// 初始化Socket Server
/// </summary>
/// <param name="sectionName"></param>
static public void Init(string sectionName)
{
if (string.IsNullOrEmpty(sectionName)) throw new ArgumentNullException("sectionName");
Init(ConfigurationManager.GetSection(sectionName) as Config.SocketServerConfig);
}
/// <summary>
/// 初始化Socket Server
/// </summary>
/// <param name="config"></param>
static public void Init(Config.SocketServerConfig config)
{
if (config == null) throw new ArgumentNullException("config");
if (config.Servers == null) return;
foreach (Config.Server serverConfig in config.Servers)
{
//inti protocol
var objProtocol = GetProtocol(serverConfig.Protocol);
if (objProtocol == null) throw new InvalidOperationException("protocol");
//init custom service
var tService = Type.GetType(serverConfig.ServiceType, false);
if (tService == null) throw new InvalidOperationException("serviceType");
var serviceFace = tService.GetInterface(typeof(ISocketService<>).Name);
if (serviceFace == null) throw new InvalidOperationException("serviceType");
var objService = Activator.CreateInstance(tService);
if (objService == null) throw new InvalidOperationException("serviceType");
//init host.
var host = Activator.CreateInstance(typeof(SocketServer<>).MakeGenericType(
serviceFace.GetGenericArguments()),
objService,
objProtocol,
serverConfig.SocketBufferSize,
serverConfig.MessageBufferSize,
serverConfig.MaxMessageSize,
serverConfig.MaxConnections) as BaseSocketServer;
host.AddListener(serverConfig.Name, new IPEndPoint(IPAddress.Any, serverConfig.Port));
_listHosts.Add(host);
}
}
/// <summary>
/// get protocol.
/// </summary>
/// <param name="protocol"></param>
/// <returns></returns>
static public object GetProtocol(string protocol)
{
switch (protocol)
{
case Protocol.ProtocolNames.AsyncBinary:
return new Protocol.AsyncBinaryProtocol();
case Protocol.ProtocolNames.Thrift:
return new Protocol.ThriftProtocol();
case Protocol.ProtocolNames.CommandLine:
return new Protocol.CommandLineProtocol();
case Protocol.ProtocolNames.DSSBinary:
return new Protocol.DSSBinaryProtocol();
}
return Activator.CreateInstance(Type.GetType(protocol, false));
}
/// <summary>
/// 启动服务
/// </summary>
static public void Start()
{
foreach (var server in _listHosts) server.Start();
}
/// <summary>
/// 停止服务
/// </summary>
static public void Stop()
{
foreach (var server in _listHosts) server.Stop();
}
#endregion
}
从上面的代码中,我们看到了自己新加的协议DSSBinary,我们可以在配置文件中对它进行配置,方法和之前说的一样,在这里就不再重复了。
感谢各位的阅读!
作者:仓储大叔,张占岭,
荣誉:微软MVP