0
点赞
收藏
分享

微信扫一扫

物联网项目-服务端TCP Server

0、背景

使用 C# 编写一个简单的 TCP 服务端:为每个客户端连接创建独立的接收线程,通过定时器(每 180 秒)检查各连接的心跳状态,对超时的连接由服务端主动关闭。

1、代码

主函数

class Program
    {
        // Defaults used when the "serverport" / "serverip" app settings are missing or invalid.
        static int  defaultPort = 7738;
        static string defaultIp = "127.0.0.1";
       
        [Serializable]
        public class TextMessage
        {
            public string MyText { get; set; }
        }
        
        /// <summary>
        /// Entry point: reads the listen address from the application settings, starts the
        /// TCP server and keeps it running until the operator types "e" (or standard input ends).
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Parse into a temporary: int.TryParse writes 0 to its out argument on failure,
            // which would otherwise silently replace the default port with 0.
            int configuredPort;
            if (int.TryParse(System.Configuration.ConfigurationManager.AppSettings["serverport"], out configuredPort)
                && configuredPort > 0 && configuredPort <= 65535)
            {
                defaultPort = configuredPort;
            }

            string configuredIp = System.Configuration.ConfigurationManager.AppSettings["serverip"];
            if (false == string.IsNullOrEmpty(configuredIp))
            {
                defaultIp = configuredIp;
            }

            MyTcpServer server = new MyTcpServer();
            if (!server.OpenServer(defaultIp, defaultPort))
            {
                // OpenServer has already written the reason to the console.
                Console.WriteLine($"IEMSServer failed to start on IP:{defaultIp} Port:{defaultPort}");
                return;
            }
            Console.WriteLine($"IEMSServer Start ...... on IP:{defaultIp} Port:{defaultPort}");

            // Keep serving until "e" is entered. ReadLine returns null when stdin is closed or
            // redirected to an empty source; treat that as a request to stop as well.
            string input;
            while ((input = Console.ReadLine()) != null)
            {
                if (string.Equals(input.Trim(), "e", StringComparison.OrdinalIgnoreCase))
                {
                    break;
                }
            }

            server.CloseServer();
            Console.WriteLine($"IEMSServer Stop ...... on Port:{defaultPort}");
        }
        
    }

Tcp服务器类

/// <summary>
    /// 服务端
    /// </summary>
    public class MyTcpServer
    {
        private Socket ServerSocket = null;// listening (server) socket; created in OpenServer
        public Dictionary<string, MySession> dic_ClientSocket = new Dictionary<string, MySession>();// connected client sessions, keyed by the remote endpoint string
        private Dictionary<string, Thread> dic_ClientThread = new Dictionary<string, Thread>();// receive threads, keyed by the remote endpoint string; one thread is added per accepted connection
        private Dictionary<string, string> dic_ClientIdSocketEndPoint = new Dictionary<string, string>();// terminal map: terminal ID (Package.getSID) -> remote endpoint string
        private bool Flag_Listen = true;// while true, the accept loop keeps listening for client connections
        private string connStr = System.Configuration.ConfigurationManager.AppSettings["MQ"];// connection string passed to RabbitHutch.CreateBus
        private IBus bus = null;// RabbitMQ bus
        // Timer that drives the periodic connection cleanup (TimerUp).
        System.Timers.Timer timer;
        int interval = 3 * 60 * 1000;// 180 seconds, in milliseconds: cleanup period, also used as the per-connection timeout
        /// <summary>
        /// Starts the server: binds and listens on the given address, connects to RabbitMQ and
        /// subscribes to control messages, loads the terminals into the Redis cache, and only
        /// then starts the accept thread and the periodic cleanup timer.
        /// </summary>
        /// <param name="ipAddress">IP address to bind to, in a form accepted by IPAddress.Parse.</param>
        /// <param name="port">TCP port to listen on.</param>
        /// <returns>
        /// true when every start-up step succeeded; false otherwise. On failure the error is
        /// written to the console and everything acquired so far is released.
        /// </returns>
        public bool OpenServer(string ipAddress, int port)
        {
            try
            {
                Flag_Listen = true;
                // Listening socket (IPv4 / stream / TCP).
                ServerSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
                try
                {
                    ServerSocket.Bind(endPoint);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error Class MyTcpServer Line 46: OpenServer {ex.Message}");
                    AbandonStartup();
                    return false;
                }
                // Backlog of pending connections.
                ServerSocket.Listen(100);

                // The bus must exist before any receive thread can run, because ReceiveData
                // publishes every valid frame through it.
                bus = RabbitHutch.CreateBus(connStr);
                bus.Subscribe<byte[]>("iems_control_id", DealWithControl, x => x.WithTopic("control"));

                // Load the terminals into Redis <Terminal>.
                TerminalInit.LoadAllTerminalToRedisCache();

                // Start accepting only once every dependency is ready; connections arriving
                // earlier wait in the listen backlog.
                Thread Thread_ServerListen = new Thread(ListenConnecting);
                Thread_ServerListen.IsBackground = true;
                Thread_ServerListen.Start();

                // Periodic cleanup of timed-out connections. The handler is attached before
                // the timer is enabled.
                timer = new System.Timers.Timer(interval);
                timer.Elapsed += new System.Timers.ElapsedEventHandler(TimerUp);
                timer.AutoReset = true;
                timer.Enabled = true;
                return true;
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
                AbandonStartup();
                return false;
            }
        }

        /// <summary>
        /// Releases whatever a failed OpenServer call had already acquired, so that a failed
        /// start does not leave a bound socket, a running accept loop or an open bus behind.
        /// </summary>
        private void AbandonStartup()
        {
            Flag_Listen = false;
            if (timer != null)
            {
                timer.Stop();
                timer.Dispose();
                timer = null;
            }
            try
            {
                IDisposable disposableBus = bus as IDisposable;
                if (disposableBus != null)
                {
                    disposableBus.Dispose();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
            }
            bus = null;
            if (ServerSocket != null)
            {
                // Closing the listening socket also unblocks a pending Accept.
                ServerSocket.Close();
                ServerSocket = null;
            }
        }
        /// <summary>
        /// Stops the server: stops accepting and the cleanup timer, closes the listening
        /// socket, closes every client connection and clears all bookkeeping dictionaries.
        /// A failure while closing one client does not prevent the others from being closed.
        /// </summary>
        public void CloseServer()
        {
            // Stop producing new work first, so nothing is added while tearing down.
            Flag_Listen = false;
            if (timer != null)
            {
                timer.Stop();
            }
            // The listening socket is not connected, so it is closed rather than shut down;
            // closing it also unblocks the pending Accept in the listen thread.
            if (ServerSocket != null)
            {
                ServerSocket.Close();
            }

            // Take a snapshot under the lock and close outside it: receive threads remove
            // their own entries when their socket fails, which must not happen while this
            // method is enumerating the dictionary.
            List<MySession> sessions;
            lock (dic_ClientSocket)
            {
                sessions = new List<MySession>(dic_ClientSocket.Values);
                dic_ClientSocket.Clear();
            }
            foreach (MySession session in sessions)
            {
                // Cooperative stop: the receive loop checks this flag, and closing the socket
                // makes a blocked Receive return. Thread.Abort is not used because it is
                // unsupported on current .NET runtimes.
                session.Flag_Receive = false;
                try
                {
                    session.Close();
                }
                catch (Exception ex)
                {
                    // Shutting down an already-dead connection throws; keep closing the rest.
                    Console.WriteLine($"Error: {ex.Message}");
                }
                finally
                {
                    if (session.TcpSocket != null)
                    {
                        session.TcpSocket.Close(); // release the handle; harmless if already closed
                    }
                }
            }

            lock (dic_ClientThread)
            {
                dic_ClientThread.Clear();
            }
            lock (dic_ClientIdSocketEndPoint)
            {
                dic_ClientIdSocketEndPoint.Clear();
            }

            // Release the message-bus connection so the control subscription stops too.
            try
            {
                IDisposable disposableBus = bus as IDisposable;
                if (disposableBus != null)
                {
                    disposableBus.Dispose();
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
            }
        }
        /// <summary>
        /// Accept loop, run on the listen thread: accepts client connections until
        /// Flag_Listen is cleared, registers each one and starts its receive thread.
        /// </summary>
        private void ListenConnecting()
        {
            while (Flag_Listen)
            {
                Socket sokConnection = null;
                bool registered = false;
                try
                {
                    // Blocks until a client connects; returns the socket used to talk to it.
                    sokConnection = ServerSocket.Accept();
                    string str_EndPoint = sokConnection.RemoteEndPoint.ToString();
                    // LastRefreshTime holds the moment the connection times out.
                    MySession myTcpClient = new MySession() { TcpSocket = sokConnection, LastRefreshTime = DateTime.Now.AddSeconds(interval / 1000) };
                    Thread th_ReceiveData = new Thread(ReceiveData);
                    th_ReceiveData.IsBackground = true;

                    // Register BEFORE starting the thread, so a connection that fails at once
                    // finds (and removes) its own entries. The indexer is used instead of Add:
                    // a stale entry under a reused endpoint must not make registration throw.
                    lock (dic_ClientThread)
                    {
                        dic_ClientThread[str_EndPoint] = th_ReceiveData;
                    }
                    lock (dic_ClientSocket)
                    {
                        dic_ClientSocket[str_EndPoint] = myTcpClient;
                    }
                    registered = true;

                    th_ReceiveData.Start(myTcpClient);
                    Console.WriteLine($"{DateTime.Now.ToString()}有新连接来了:{str_EndPoint}");
                }
                catch (Exception ex)
                {
                    // Do not leak a socket that was accepted but never handed to a session.
                    if (sokConnection != null && !registered)
                    {
                        sokConnection.Close();
                    }
                    if (!Flag_Listen)
                    {
                        // Expected: the listening socket was closed to stop the server.
                        break;
                    }
                    Console.WriteLine($"Error: {ex.Message}");
                    // Back off only after an error, to avoid a tight failure loop; Accept
                    // itself blocks, so the normal path needs no delay.
                    Thread.Sleep(200);
                }
            }
        }
        /// <summary>
        /// Receive loop for one client connection; runs on that connection's own thread.
        /// Each received chunk is appended to the session buffer and then interpreted as one
        /// ASCII message: "OK" is a heartbeat that extends the connection's timeout; a message
        /// of 20+ characters must start with "2323" and end with a 2-character BCC/XOR check
        /// code, and is then published to RabbitMQ under "login", "upload" or "other".
        /// The loop ends when the flag is cleared, the peer disconnects, or an error occurs;
        /// in every case the connection is unregistered and its socket closed.
        /// </summary>
        /// <param name="sokConnectionparn">The MySession of the connection (passed via Thread.Start).</param>
        private void ReceiveData(object sokConnectionparn)
        {
            MySession tcpClient = sokConnectionparn as MySession;
            if (tcpClient == null || tcpClient.TcpSocket == null)
            {
                return;
            }

            Socket socketClient = tcpClient.TcpSocket;
            // Captured once: RemoteEndPoint cannot be read any more after the socket is closed,
            // and this string is the key used by the bookkeeping dictionaries.
            string clientKey = null;
            try
            {
                clientKey = socketClient.RemoteEndPoint.ToString();
                // One 2 MB receive buffer per connection, reused for every Receive call.
                byte[] arrMsgRec = new byte[1024 * 1024 * 2];

                while (tcpClient.Flag_Receive)
                {
                    int length;
                    try
                    {
                        length = socketClient.Receive(arrMsgRec);
                    }
                    catch (SocketException)
                    {
                        break; // connection broken
                    }
                    catch (ObjectDisposedException)
                    {
                        break; // socket closed by the server (timeout sweep / shutdown)
                    }
                    if (length <= 0)
                    {
                        // Receive returns 0 when the peer has closed the connection; without
                        // this check the loop would keep spinning on a dead socket.
                        break;
                    }

                    byte[] buf = new byte[length];
                    Array.Copy(arrMsgRec, buf, length);
                    // NOTE(review): nothing visible in this file ever consumes m_Buffer, so it
                    // grows for the lifetime of the connection — confirm a consumer exists.
                    lock (tcpClient.m_Buffer)
                    {
                        tcpClient.AddQueue(buf);
                    }

                    // NOTE(review): each Receive is treated as exactly one message; TCP may
                    // split or merge messages, which this parsing does not handle.
                    string recString = System.Text.Encoding.ASCII.GetString(buf);
                    if (recString.Length < 20)
                    {
                        if (recString == "OK")
                        {
                            Console.WriteLine($"接收心跳:{recString}");
                            // Heartbeat: push the timeout forward.
                            tcpClient.LastRefreshTime = DateTime.Now.AddSeconds(interval / 1000);
                        }
                        else
                        {
                            Console.WriteLine($"客户端字符长度低于20,请重新发送:{recString}");
                        }
                        continue;
                    }

                    // Log the CLIENT's endpoint (the local endpoint is the server's own address).
                    Console.WriteLine($"接收到客户端{clientKey} {recString}");

                    string head = recString.Substring(0, 4);
                    if (head != "2323")
                    {
                        Console.WriteLine("数据格式错误,开头不是## >" + head);
                        continue;
                    }

                    // The last two characters carry the sender's check code, computed over
                    // everything between the 4-character header and the check code itself.
                    string clientCheckFlag = recString.Substring(recString.Length - 2, 2);
                    string checkRecString = recString.Substring(4, recString.Length - 6);
                    string serverCheckFlag = StringTool.GetBCCXorCode(StringTool.strToToHexByte(checkRecString));
                    if (clientCheckFlag != serverCheckFlag)
                    {
                        continue; // check code mismatch: the frame is ignored
                    }

                    string frameType = recString.Substring(4, 2);
                    Package p;
                    if (frameType == "01") // 01: login
                    {
                        p = new LoginPackage(recString);
                        bus.Publish<byte[]>(buf, "login");
                    }
                    else if (frameType == "02") // 02: real-time data upload
                    {
                        p = new RealDataPackage(recString);
                        bus.Publish<byte[]>(buf, "upload");
                    }
                    else
                    {
                        p = new Package(recString);
                        bus.Publish<byte[]>(buf, "other");
                    }

                    // Remember (or refresh) which endpoint this terminal ID is connected on.
                    lock (dic_ClientIdSocketEndPoint)
                    {
                        dic_ClientIdSocketEndPoint[p.getSID] = clientKey;
                    }

                    Console.WriteLine(p.ToString());
                    Console.WriteLine("应答:");
                    Console.WriteLine(p.getAnswerPackage());
                    // NOTE(review): the answer is only printed; the actual send is disabled.
                    //SendData(socketClient.RemoteEndPoint.ToString(), StringTool.strToToHexByte(p.getAnswerPackage()));
                    Console.WriteLine("发送应答完成");
                }
            }
            catch (Exception ex)
            {
                System.Console.WriteLine($"Error: MyTcpServer Line 164 {ex.Message}");
            }
            finally
            {
                tcpClient.Flag_Receive = false;
                ReleaseClient(clientKey, tcpClient);
            }
        }

        /// <summary>
        /// Unregisters a finished connection and closes its socket. Entries are removed only
        /// when they still belong to this connection, so a newer connection that reuses the
        /// same endpoint string is left untouched.
        /// </summary>
        /// <param name="clientKey">Remote endpoint string of the connection; may be null if it could not be read.</param>
        /// <param name="session">The session being released.</param>
        private void ReleaseClient(string clientKey, MySession session)
        {
            if (clientKey != null)
            {
                lock (dic_ClientSocket)
                {
                    MySession registered;
                    if (dic_ClientSocket.TryGetValue(clientKey, out registered) && ReferenceEquals(registered, session))
                    {
                        dic_ClientSocket.Remove(clientKey);
                    }
                }
                lock (dic_ClientThread)
                {
                    // Just drop the entry: this IS the connection's thread and it is about to
                    // end on its own. Aborting it here would abort the current thread before
                    // the entry could be removed.
                    Thread registeredThread;
                    if (dic_ClientThread.TryGetValue(clientKey, out registeredThread) && ReferenceEquals(registeredThread, Thread.CurrentThread))
                    {
                        dic_ClientThread.Remove(clientKey);
                    }
                }
            }

            try
            {
                if (session.TcpSocket != null)
                {
                    session.TcpSocket.Close(); // release the handle; harmless if already closed
                }
            }
            catch (Exception ex)
            {
                System.Console.WriteLine($"Error: MyTcpServer Line 164 {ex.Message}");
            }
        }
        /// <summary>
        /// Sends data to the client registered under the given endpoint string.
        /// </summary>
        /// <param name="_endPoint">Remote endpoint string identifying the client (the key of dic_ClientSocket).</param>
        /// <param name="_buf">Bytes to send; a null array sends nothing.</param>
        /// <returns>
        /// true when the client was found and the send did not fail; false when the client is
        /// unknown or its socket is broken or already closed.
        /// </returns>
        public bool SendData(string _endPoint, byte[] _buf)
        {
            if (_endPoint == null)
            {
                return false; // Dictionary lookups throw on a null key
            }

            MySession myT;
            lock (dic_ClientSocket)
            {
                if (!dic_ClientSocket.TryGetValue(_endPoint, out myT))
                {
                    return false;
                }
            }

            try
            {
                // Sent outside the lock so a slow client cannot block other dictionary users.
                myT.Send(_buf);
                return true;
            }
            catch (SocketException ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
                return false;
            }
            catch (ObjectDisposedException ex)
            {
                Console.WriteLine($"Error: {ex.Message}");
                return false;
            }
        }

        /// <summary>
        /// Timer callback: lists every registered connection and closes those whose
        /// LastRefreshTime (their timeout moment) has passed. Each expired connection is
        /// handled on its own, so one failure does not stop the rest of the sweep.
        /// </summary>
        /// <param name="sender">The timer raising the event (unused).</param>
        /// <param name="e">Event data (unused).</param>
        private void TimerUp(object sender, System.Timers.ElapsedEventArgs e)
        {
            try
            {
                // Work on a snapshot: the accept and receive threads add and remove entries
                // concurrently, and enumerating the live dictionary would throw.
                List<KeyValuePair<string, MySession>> snapshot;
                lock (dic_ClientSocket)
                {
                    snapshot = new List<KeyValuePair<string, MySession>>(dic_ClientSocket);
                }

                int i = 0;
                List<KeyValuePair<string, MySession>> needCloseConnect = new List<KeyValuePair<string, MySession>>();
                foreach (var item in snapshot)
                {
                    Console.WriteLine($"{DateTime.Now.ToString()} 队列连接显示:{++i}-> {item.Key} 最后连接时间 {item.Value.LastRefreshTime}");
                    if (item.Value.LastRefreshTime < DateTime.Now)
                    {
                        needCloseConnect.Add(item);
                    }
                }

                i = 0;
                foreach (var conn in needCloseConnect)
                {
                    Console.WriteLine($"{DateTime.Now.ToString()} 队列连接删除:{++i} -> {conn.Key} 最后连接时间 {conn.Value.LastRefreshTime}");
                    try
                    {
                        ExpireConnection(conn.Key, conn.Value);
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"TimeUp Error:{ex.Message}");
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"TimeUp Error:{ex.Message}");
            }
        }

        /// <summary>
        /// Unregisters one timed-out connection and closes its socket.
        /// </summary>
        /// <param name="key">Remote endpoint string the connection is registered under.</param>
        /// <param name="session">The session that timed out.</param>
        private void ExpireConnection(string key, MySession session)
        {
            // Unregister first, so a failure while closing cannot leave a dead entry that
            // would be retried (and fail again) on every later sweep.
            lock (dic_ClientSocket)
            {
                MySession registered;
                if (dic_ClientSocket.TryGetValue(key, out registered) && ReferenceEquals(registered, session))
                {
                    dic_ClientSocket.Remove(key);
                }
            }
            lock (dic_ClientThread)
            {
                dic_ClientThread.Remove(key);
            }

            // Cooperative stop instead of Thread.Abort (unsupported on current .NET runtimes):
            // the receive loop checks this flag, and closing the socket makes a blocked
            // Receive return.
            session.Flag_Receive = false;
            try
            {
                session.Close();
            }
            finally
            {
                if (session.TcpSocket != null)
                {
                    session.TcpSocket.Close(); // release the handle; harmless if already closed
                }
            }
        }

        /// <summary>
        /// Handler for control messages received from RabbitMQ (subscribed in OpenServer):
        /// parses the message as a Package, logs it in green, and forwards the raw bytes to
        /// the client currently registered for the package's terminal ID, if any.
        /// </summary>
        /// <param name="buf">Raw control message; decoded as ASCII for parsing, forwarded unchanged.</param>
        public void DealWithControl(byte[] buf)
        {
            if (buf == null)
            {
                return;
            }

            string recString = System.Text.Encoding.ASCII.GetString(buf);
            Package p = new Package(recString);

            // Highlight only these two lines; restore the colour afterwards so the rest of
            // the console output does not stay green.
            ConsoleColor previousColor = Console.ForegroundColor;
            Console.ForegroundColor = ConsoleColor.Green;
            try
            {
                Console.WriteLine("发送命令给:" + p.getSID);
                Console.WriteLine(p.ToString());
            }
            finally
            {
                Console.ForegroundColor = previousColor;
            }

            string endPoint;
            bool isOnline;
            // Receive threads update this map concurrently.
            lock (dic_ClientIdSocketEndPoint)
            {
                isOnline = dic_ClientIdSocketEndPoint.TryGetValue(p.getSID, out endPoint);
            }
            if (isOnline)
            {
                SendData(endPoint, buf);
            }
        }
    }

会话 Session类

/// <summary>
    /// One client connection: its socket, a byte buffer of received data, the moment the
    /// connection times out, and the flag that keeps its receive loop running.
    /// </summary>
    public class MySession
    {
        public Socket TcpSocket;// socket connected to the client
        public List<byte> m_Buffer = new List<byte>();// received bytes; also used as the lock object guarding itself
        public DateTime LastRefreshTime;// callers in this file store the timeout moment here and compare it with DateTime.Now
        public volatile bool Flag_Receive = true;// cleared to ask the receive loop to stop
        public MySession()
        {

        }

        /// <summary>
        /// Sends data to the client.
        /// </summary>
        /// <param name="buf">Bytes to send; null is ignored.</param>
        public void Send(byte[] buf)
        {
            if (buf != null)
            {
                TcpSocket.Send(buf);
            }
        }
        /// <summary>
        /// Returns the client's IP address as text.
        /// </summary>
        /// <returns>The address part of the socket's remote endpoint.</returns>
        public string GetIp()
        {
            IPEndPoint clientipe = (IPEndPoint)TcpSocket.RemoteEndPoint;
            string _ip = clientipe.Address.ToString();
            return _ip;
        }
        /// <summary>
        /// Closes the connection: shuts the socket down when it is still connected, then
        /// always closes it so the handle is released. Safe to call on a socket that is
        /// unconnected, already broken or already closed, and safe to call repeatedly.
        /// </summary>
        public void Close()
        {
            Socket socket = TcpSocket;
            if (socket == null)
            {
                return;
            }
            try
            {
                if (socket.Connected)
                {
                    socket.Shutdown(SocketShutdown.Both);
                }
            }
            catch (SocketException)
            {
                // The peer is already gone; there is nothing left to shut down.
            }
            catch (ObjectDisposedException)
            {
                // Closed concurrently by another thread.
            }
            finally
            {
                socket.Close();
            }
        }
        /// <summary>
        /// Extracts one packet from the buffer: copies <paramref name="size"/> bytes starting
        /// at <paramref name="startIndex"/>, then removes everything from the beginning of
        /// the buffer up to the end of the copied range.
        /// </summary>
        /// <param name="startIndex">Index of the first byte of the packet.</param>
        /// <param name="size">Number of bytes in the packet.</param>
        /// <returns>The extracted bytes.</returns>
        public byte[] GetBuffer(int startIndex, int size)
        {
            byte[] buf = new byte[size];
            // Same lock object the receive loop uses around AddQueue.
            lock (m_Buffer)
            {
                m_Buffer.CopyTo(startIndex, buf, 0, size);
                m_Buffer.RemoveRange(0, startIndex + size);
            }
            return buf;
        }

        /// <summary>
        /// Appends received bytes to the buffer.
        /// </summary>
        /// <param name="buffer">Bytes to append; null is ignored.</param>
        public void AddQueue(byte[] buffer)
        {
            if (buffer == null)
            {
                return;
            }
            lock (m_Buffer)
            {
                m_Buffer.AddRange(buffer);
            }
        }
        /// <summary>
        /// Empties the buffer.
        /// </summary>
        public void ClearQueue()
        {
            lock (m_Buffer)
            {
                m_Buffer.Clear();
            }
        }
    }

加载终端到 Redis(供 Web 端显示终端是否在线)

public class TerminalInit
    {
        /// <summary>
        /// Reads every row of the iems_terminal table and stores each terminal in Redis as a
        /// binary-serialized byte array keyed by its terminal_sid, logging one line per terminal.
        /// </summary>
        public static void LoadAllTerminalToRedisCache()
        {
            var terminals = EntityRepository.instance.IEMSContext.Query<Terminal>("select * from iems_terminal");

            // NOTE(review): BinaryFormatter is obsolete and unsafe to deserialize with; it is
            // kept because whatever reads these Redis values is outside this file and
            // presumably expects this format — confirm before migrating.
            BinaryFormatter binaryFormatter = new BinaryFormatter();
            RedisHelper cache = new RedisHelper();

            foreach (var terminal in terminals)
            {
                byte[] payload;
                using (MemoryStream stream = new MemoryStream())
                {
                    binaryFormatter.Serialize(stream, terminal);
                    payload = stream.ToArray();
                }

                // NOTE(review): -1 presumably means "no expiry" — verify against RedisHelper.Set.
                cache.Set<byte[]>(terminal.terminal_sid, payload, -1);
                Console.WriteLine($"Terminal Init:{terminal.terminal_sid}->{terminal.location}->{terminal.terminal_config}");
            }
        }
    }

举报

相关推荐

0 条评论