Rust 代码实现高性能服务器
网络模块
处理网络通信和连接管理。
// 网络模块
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::thread;
pub struct Server {
address: String,
}
impl Server {
pub fn new(address: &str) -> Self {
Server {
address: address.to_string(),
}
}
pub fn start(&self) {
let listener = TcpListener::bind(&self.address).expect("Failed to bind address");
println!("Server listening on {}", self.address);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
thread::spawn(move || {
handle_connection(stream);
});
}
Err(e) => {
eprintln!("Error accepting connection: {}", e);
}
}
}
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).expect("Failed to read data from stream");
let response = "HTTP/1.1 200 OK\r\n\r\nHello, world!";
stream.write(response.as_bytes()).expect("Failed to write response to stream");
stream.flush().expect("Failed to flush stream");
}
请求处理模块
处理客户端请求并生成响应。
// 请求处理模块
pub struct RequestHandler;
impl RequestHandler {
pub fn handle_request(request: &[u8]) -> String {
// 在这里处理请求并生成响应
// 这里只是一个简单的示例,实际实现会更复杂
"HTTP/1.1 200 OK\r\n\r\nHello, world!".to_string()
}
}
在这个模块中,我们定义了一个 RequestHandler 结构体,其中有一个 handle_request 方法,用于处理客户端请求并生成响应。在这个示例中,我们只是返回了一个简单的 HTTP 响应。
线程池模块
管理线程池,用于处理请求。
// 线程池模块
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc;
pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Job>,
}
type Job = Box<dyn FnOnce() + Send + 'static>;
impl ThreadPool {
pub fn new(size: usize) -> Self {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool { workers, sender }
}
pub fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.send(job).expect("Failed to send job to worker");
}
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Self {
let thread = thread::spawn(move || {
loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Worker {} got a job; executing.", id);
job();
}
});
Worker {
id,
thread: Some(thread),
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
在这个模块中,我们定义了一个 ThreadPool 结构体,用于管理线程池。我们还定义了一个 Worker 结构体,用于实际处理任务的工作线程。在 ThreadPool 的 execute 方法中,我们将任务发送到线程池中的一个工作线程来执行。
服务器模块
在服务器模块中,我们定义了一个 Server 结构体,负责创建服务器并监听客户端连接。在 run 方法中,我们使用线程池来处理每个连接。当有新的连接到来时,我们将其交给线程池中的一个工作线程来处理。处理连接的方法在 handle_connection 中实现,其中会调用请求处理模块来处理客户端请求并返回响应。
// 服务器模块
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
use std::sync::Arc;
use crate::thread_pool::ThreadPool;
mod request_handler;
mod thread_pool;
pub struct Server {
listener: TcpListener,
pool: ThreadPool,
}
impl Server {
pub fn new(addr: &str, pool_size: usize) -> Self {
let listener = TcpListener::bind(addr).expect("Failed to bind to address");
let pool = ThreadPool::new(pool_size);
Server { listener, pool }
}
pub fn run(&self) {
println!("Server running on {}", self.listener.local_addr().unwrap());
for stream in self.listener.incoming() {
let stream = stream.expect("Failed to establish connection");
let pool = Arc::clone(&self.pool);
pool.execute(move || {
Self::handle_connection(stream);
});
}
}
fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).expect("Failed to read from stream");
let response = request_handler::RequestHandler::handle_request(&buffer);
stream.write_all(response.as_bytes()).expect("Failed to write to stream");
stream.flush().expect("Failed to flush stream");
}
}
fn main() {
let server = Server::new("127.0.0.1:8080", 4);
server.run();
}