0、背景
C#编写简单TCP服务端,定时检查连接状态,超时主动关闭客户端连接。
1、代码
主函数
class Program
{
static int defaultPort = 7738;
static string defaultIp = "127.0.0.1";
[Serializable]
public class TextMessage
{
public string MyText { get; set; }
}
static void Main(string[] args)
{
int.TryParse(System.Configuration.ConfigurationManager.AppSettings["serverport"], out defaultPort);
if (false == string.IsNullOrEmpty(System.Configuration.ConfigurationManager.AppSettings["serverip"]))
{
defaultIp = System.Configuration.ConfigurationManager.AppSettings["serverip"];
}
MyTcpServer server = new MyTcpServer();
server.OpenServer(defaultIp, defaultPort);
Console.WriteLine($"IEMSServer Start ...... on IP:{defaultIp} Port:{defaultPort}");
string input = Console.ReadLine();
if(input.ToLower() == "e")
{
server.CloseServer();
Console.WriteLine($"IEMSServer Stop ...... on Port:{defaultPort}");
}
}
}
Tcp服务器类
/// <summary>
/// 服务端
/// </summary>
public class MyTcpServer
{
private Socket ServerSocket = null;//服务端
public Dictionary<string, MySession> dic_ClientSocket = new Dictionary<string, MySession>();//tcp客户端字典
private Dictionary<string, Thread> dic_ClientThread = new Dictionary<string, Thread>();//线程字典,每新增一个连接就添加一条线程
private Dictionary<string, string> dic_ClientIdSocketEndPoint = new Dictionary<string, string>();//终端字典,终端ID 和 str_EndPoint
private bool Flag_Listen = true;//监听客户端连接的标志
private string connStr = System.Configuration.ConfigurationManager.AppSettings["MQ"];
private IBus bus = null;//RabbitMQ
//定义Timer类
System.Timers.Timer timer;
int interval = 3 * 60 * 1000;//180秒清理连接
/// <summary>
/// 启动服务
/// </summary>
/// <param name="port">端口号</param>
public bool OpenServer(string ipAddress, int port)
{
try
{
Flag_Listen = true;
// 创建负责监听的套接字,注意其中的参数;
ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// 创建包含ip和端口号的网络节点对象;
IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
try
{
// 将负责监听的套接字绑定到唯一的ip和端口上;
ServerSocket.Bind(endPoint);
}
catch (Exception ex)
{
Console.WriteLine($"Error Class MyTcpServer Line 46: OpenServer {ex.Message}");
return false;
}
// 设置监听队列的长度;
ServerSocket.Listen(100);
// 创建负责监听的线程;
Thread Thread_ServerListen = new Thread(ListenConnecting);
Thread_ServerListen.IsBackground = true;
Thread_ServerListen.Start();
bus = RabbitHutch.CreateBus(connStr);
bus.Subscribe<byte[]>("iems_control_id", DealWithControl, x => x.WithTopic("control"));
//启动定时清理线程
timer = new System.Timers.Timer(interval);
timer.AutoReset = true;
timer.Enabled = true;
timer.Elapsed += new System.Timers.ElapsedEventHandler(TimerUp);
//加载终端到Redis <Terminal>
TerminalInit.LoadAllTerminalToRedisCache();
//
return true;
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
return false;
}
}
/// <summary>
/// 关闭服务
/// </summary>
public void CloseServer()
{
lock (dic_ClientSocket)
{
foreach (var item in dic_ClientSocket)
{
item.Value.Close();//关闭每一个连接
}
dic_ClientSocket.Clear();//清除字典
}
lock (dic_ClientThread)
{
foreach (var item in dic_ClientThread)
{
item.Value.Abort();//停止线程
}
dic_ClientThread.Clear();
}
lock (dic_ClientIdSocketEndPoint)
{
dic_ClientIdSocketEndPoint.Clear();
}
Flag_Listen = false;
//ServerSocket.Shutdown(SocketShutdown.Both);//服务端不能主动关闭连接,需要把监听到的连接逐个关闭
if (ServerSocket != null)
ServerSocket.Close();
}
/// <summary>
/// 监听客户端请求的方法;
/// </summary>
private void ListenConnecting()
{
while (Flag_Listen) // 持续不断的监听客户端的连接请求;
{
try
{
Socket sokConnection = ServerSocket.Accept(); // 一旦监听到一个客户端的请求,就返回一个与该客户端通信的 套接字;
// 将与客户端连接的 套接字 对象添加到集合中;
string str_EndPoint = sokConnection.RemoteEndPoint.ToString();
MySession myTcpClient = new MySession() { TcpSocket = sokConnection, LastRefreshTime = DateTime.Now.AddSeconds(interval / 1000) };
//创建线程接收数据
Thread th_ReceiveData = new Thread(ReceiveData);
th_ReceiveData.IsBackground = true;
th_ReceiveData.Start(myTcpClient);
//把线程及客户连接加入字典
dic_ClientThread.Add(str_EndPoint, th_ReceiveData);
dic_ClientSocket.Add(str_EndPoint, myTcpClient);
Console.WriteLine($"{DateTime.Now.ToString()}有新连接来了:{str_EndPoint}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
Thread.Sleep(200);
}
}
/// <summary>
/// 接收数据
/// </summary>
/// <param name="sokConnectionparn"></param>
private void ReceiveData(object sokConnectionparn)
{
MySession tcpClient = sokConnectionparn as MySession;
Socket socketClient = tcpClient.TcpSocket;
//bool Flag_Receive = true;
while (tcpClient.Flag_Receive)
{
try
{
// 定义一个2M的缓存区;
byte[] arrMsgRec = new byte[1024 * 1024 * 2];
// 将接受到的数据存入到输入 arrMsgRec中;
int length = -1;
try
{
length = socketClient.Receive(arrMsgRec); // 接收数据,并返回数据的长度;
}
catch
{
tcpClient.Flag_Receive = false;
// 从通信线程集合中删除被中断连接的通信线程对象;
string keystr = socketClient.RemoteEndPoint.ToString();
if (dic_ClientSocket.ContainsKey(keystr))
{
dic_ClientSocket.Remove(keystr);//删除客户端字典中该socket
}
if (dic_ClientThread.ContainsKey(keystr))
{
if(dic_ClientThread[keystr].ThreadState != ThreadState.Aborted && dic_ClientThread[keystr].ThreadState != ThreadState.AbortRequested)//关闭线程
{
dic_ClientThread[keystr].Abort();
}
dic_ClientThread.Remove(keystr);//删除字典中该线程
}
tcpClient = null;
socketClient = null;
break;
}
byte[] buf = new byte[length];
Array.Copy(arrMsgRec, buf, length);
lock (tcpClient.m_Buffer)
{
tcpClient.AddQueue(buf);
}
if (length > 0)
{
string recString = System.Text.Encoding.ASCII.GetString(buf);
//string recString = StringTool.byteToHexStr(buf);
if (recString.Length < 20)
{
if (recString == "OK")
{
Console.WriteLine($"接收心跳:{recString}");
tcpClient.LastRefreshTime = DateTime.Now.AddSeconds(interval / 1000);//修改超时时间
}
else
{
Console.WriteLine($"客户端字符长度低于20,请重新发送:{recString}");
}
continue;
}
Console.WriteLine($"接收到客户端{socketClient.LocalEndPoint.ToString()} {recString}");
//解析字符
string head = recString.Substring(0, 4);
if (head != "2323")
{
Console.WriteLine("数据格式错误,开头不是## >" + head);
continue;
}
string clientCheckFlag = recString.Substring(recString.Length - 2, 2);
string checkRecString = recString.Substring(4, recString.Length - 6);
string serverCheckFlag = StringTool.GetBCCXorCode(StringTool.strToToHexByte(checkRecString));
if (clientCheckFlag == serverCheckFlag)
{
Package p = null;
if (recString.Substring(4, 2) == "01")//01 登录
{
p = new LoginPackage(recString);
bus.Publish<byte[]>(buf, "login");
}
else if (recString.Substring(4, 2) == "02")//02 实时数据上报
{
p = new RealDataPackage(recString);
bus.Publish<byte[]>(buf, "upload");
}
else
{
p = new Package(recString);
bus.Publish<byte[]>(buf, "other");
}
if (p != null)
{
if (dic_ClientIdSocketEndPoint.ContainsKey(p.getSID))
{
string endPoint = dic_ClientIdSocketEndPoint[p.getSID];
if(endPoint != socketClient.RemoteEndPoint.ToString())
{
dic_ClientIdSocketEndPoint.Remove(p.getSID);
dic_ClientIdSocketEndPoint.Add(p.getSID, socketClient.RemoteEndPoint.ToString());
}
}else
{
dic_ClientIdSocketEndPoint.Add(p.getSID, socketClient.RemoteEndPoint.ToString());
}
Console.WriteLine(p.ToString());
Console.WriteLine("应答:");
Console.WriteLine(p.getAnswerPackage());
//SendData(socketClient.RemoteEndPoint.ToString(), StringTool.strToToHexByte(p.getAnswerPackage()));
Console.WriteLine("发送应答完成");
}
}
}
}
catch (Exception ex)
{
System.Console.WriteLine($"Error: MyTcpServer Line 164 {ex.Message}");
break;
}
Thread.Sleep(100);
}
}
/// <summary>
/// 发送数据给指定的客户端
/// </summary>
/// <param name="_endPoint">客户端套接字</param>
/// <param name="_buf">发送的数组</param>
/// <returns></returns>
public bool SendData(string _endPoint, byte[] _buf)
{
MySession myT = new MySession();
if (dic_ClientSocket.TryGetValue(_endPoint, out myT))
{
myT.Send(_buf);
return true;
}
else
{
return false;
}
}
private void TimerUp(object sender, System.Timers.ElapsedEventArgs e)
{
try
{
int i = 0;
List<string> needCloseConnect = new List<string>();
foreach (var item in dic_ClientSocket)
{
Console.WriteLine($"{DateTime.Now.ToString()} 队列连接显示:{++i}-> {item.Key} 最后连接时间 {item.Value.LastRefreshTime}");
if (item.Value.LastRefreshTime < DateTime.Now)
{
needCloseConnect.Add(item.Key);
}
}
i = 0;
foreach (var conn in needCloseConnect)
{
Console.WriteLine($"{DateTime.Now.ToString()} 队列连接删除:{++i} -> {conn} 最后连接时间 {dic_ClientSocket[conn].LastRefreshTime}");
dic_ClientSocket[conn].Flag_Receive = false;
dic_ClientThread[conn].Abort();
dic_ClientSocket[conn].Close();
dic_ClientSocket.Remove(conn);
//dic_ClientThread[conn].Interrupt();
//dic_ClientThread[conn].Join();
dic_ClientThread.Remove(conn);
}
}catch(Exception ex)
{
Console.WriteLine($"TimeUp Error:{ex.Message}");
}
}
public void DealWithControl(byte[] buf)
{
string recString = System.Text.Encoding.ASCII.GetString(buf);
Package p = new Package(recString);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine("发送命令给:" + p.getSID);
Console.WriteLine(p.ToString());
if (dic_ClientIdSocketEndPoint.ContainsKey(p.getSID))
{
string endPoint = dic_ClientIdSocketEndPoint[p.getSID];
SendData(endPoint, buf);
}
}
}
会话 Session类
/// <summary>
/// 会话端
/// </summary>
public class MySession
{
public Socket TcpSocket;//socket对象
public List<byte> m_Buffer = new List<byte>();//数据缓存区
public DateTime LastRefreshTime;
public volatile bool Flag_Receive = true;
public MySession()
{
}
/// <summary>
/// 发送数据
/// </summary>
/// <param name="buf"></param>
public void Send(byte[] buf)
{
if (buf != null)
{
TcpSocket.Send(buf);
}
}
/// <summary>
/// 获取连接的ip
/// </summary>
/// <returns></returns>
public string GetIp()
{
IPEndPoint clientipe = (IPEndPoint)TcpSocket.RemoteEndPoint;
string _ip = clientipe.Address.ToString();
return _ip;
}
/// <summary>
/// 关闭连接
/// </summary>
public void Close()
{
TcpSocket.Shutdown(SocketShutdown.Both);
}
/// <summary>
/// 提取正确数据包
/// </summary>
public byte[] GetBuffer(int startIndex, int size)
{
byte[] buf = new byte[size];
m_Buffer.CopyTo(startIndex, buf, 0, size);
m_Buffer.RemoveRange(0, startIndex + size);
return buf;
}
/// <summary>
/// 添加队列数据
/// </summary>
/// <param name="buffer"></param>
public void AddQueue(byte[] buffer)
{
m_Buffer.AddRange(buffer);
}
/// <summary>
/// 清除缓存
/// </summary>
public void ClearQueue()
{
m_Buffer.Clear();
}
}
加载终端到Redis -(供Web端显示终端是否在线)
public class TerminalInit
{
public static void LoadAllTerminalToRedisCache()
{
var allTerminals = EntityRepository.instance.IEMSContext.Query<Terminal>("select * from iems_terminal");
BinaryFormatter formatter = new BinaryFormatter();
RedisHelper redisHelper = new RedisHelper();
foreach(var ter in allTerminals)
{
using (var ms = new MemoryStream())
{
formatter.Serialize(ms, ter);
redisHelper.Set<byte[]>(ter.terminal_sid, ms.ToArray(), -1);
}
Console.WriteLine($"Terminal Init:{ter.terminal_sid}->{ter.location}->{ter.terminal_config}");
}
}
}