系统socket编程,listen后在accept阻塞监听:[链接](https:
源码:[链接](https:
罗老师:[https:
源码目录介绍:[https:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <signal.h>
#include <pwd.h>
#include "st.h"
/*
 * File names opened with relative paths. In daemon mode the process
 * chdir()s into the logging directory first, so they end up there.
 * ACCESS_FILE is not referenced by the code visible in this file.
 */
#define PID_FILE "pid"
#define ERRORS_FILE "errors"
#define ACCESS_FILE "access"
/* Port used when a bind address carries no (or a zero) port */
#define SERV_PORT_DEFAULT 8000
/* Default listen() backlog; the -q option overrides it */
#define LISTENQ_SIZE_DEFAULT 256
/* Capacity of srv_socket[], i.e. the maximum number of -b options */
#define MAX_BIND_ADDRS 16
/* Default number of waiting threads per process when -t is not given */
#define MAX_WAIT_THREADS_DEFAULT 8
/* Divisor applied to the fd limit when deriving the default thread cap */
#define FD_PER_THREAD 2
/* Argument passed to st_sleep() between access log flushes (seconds, presumably) */
#define ACCLOG_FLUSH_INTERVAL 30
/* Request read timeout in seconds (converted with SEC2USEC before use) */
#define REQUEST_TIMEOUT 30
/*
 * Per-listening-socket state. One slot is filled for every -b option
 * (or a single default slot when none is given).
 */
struct socket_info {
/* ST descriptor wrapping the listening socket (set by create_listeners) */
st_netfd_t nfd;
/* Bind address text; a ":port" suffix is cut off in place by create_listeners */
char *addr;
/* Port the socket was bound to */
unsigned int port;
/* Number of threads currently waiting in accept on this socket */
int wait_threads;
/* Number of threads currently handling a client session */
int busy_threads;
/* Number of sessions that completed successfully */
int rqst_count;
} srv_socket[MAX_BIND_ADDRS];
/* Number of used entries in srv_socket[] */
static int sk_count = 0;
/* Number of server processes to run (-p, or the CPU count by default) */
static int vp_count = 0;
/* Pids of the forked server processes, indexed by process slot */
static pid_t *vp_pids;
/* Slot index and pid of the current server process; -1 until assigned */
static int my_index = -1;
static pid_t my_pid = -1;
/* Pipe used by the signal handler to hand signal numbers to an ST thread */
static st_netfd_t sig_pipe[2];
/* Command-line flags: -i, -S and -a respectively */
static int interactive_mode = 0;
static int serialize_accept = 0;
static int log_access = 0;
/* Logging directory (-l) and user name to switch to (-u); NULL if not given */
static char *logdir = NULL;
static char *username = NULL;
/* listen() backlog (-q) */
static int listenq_size = LISTENQ_SIZE_DEFAULT;
/* Descriptor error messages are written to; replaced once the error log is open */
static int errfd = STDERR_FILENO;
/*
 * Thread limits per listening socket. They are parsed as totals across
 * all processes and converted to per-process values by
 * set_thread_throttling().
 */
static int max_threads = 0;
static int max_wait_threads = 0;
/* A new thread is spawned when fewer than this many threads are waiting */
static int min_wait_threads = 2;
/* Fallback for platforms whose headers do not define INADDR_NONE */
#ifndef INADDR_NONE
#define INADDR_NONE 0xffffffff
#endif
/* Convert seconds to microseconds using long long arithmetic */
#define SEC2USEC(s) ((s)*1000000LL)
/* Accessors for the per-socket counters of srv_socket[i] */
#define WAIT_THREADS(i) (srv_socket[i].wait_threads)
#define BUSY_THREADS(i) (srv_socket[i].busy_threads)
#define TOTAL_THREADS(i) (WAIT_THREADS(i) + BUSY_THREADS(i))
#define RQST_COUNT(i) (srv_socket[i].rqst_count)
/* Forward declarations of the functions private to this file */
static void usage(const char *progname);
static void parse_arguments(int argc, char *argv[]);
static void start_daemon(void);
static void set_thread_throttling(void);
static void create_listeners(void);
static void change_user(void);
static void open_log_files(void);
static void start_processes(void);
static void wdog_sighandler(int signo);
static void child_sighandler(int signo);
static void install_sighandlers(void);
static void start_threads(void);
static void *process_signals(void *arg);
static void *flush_acclog_buffer(void *arg);
static void *handle_connections(void *arg);
static void dump_server_info(void);
static void Signal(int sig, void (*handler)(int));
static int cpu_count(void);
/*
 * Application hooks with external linkage. This file provides simple
 * definitions of them further down.
 */
extern void handle_session(long srv_socket_index, st_netfd_t cli_nfd);
extern void load_configs(void);
extern void logbuf_open(void);
extern void logbuf_flush(void);
extern void logbuf_close(void);
/*
 * Error reporting helpers; their definitions are not part of the code
 * visible here. Each takes the descriptor to write to and a printf-style
 * format.
 */
extern void err_sys_report(int fd, const char *fmt, ...);
extern void err_sys_quit(int fd, const char *fmt, ...);
extern void err_sys_dump(int fd, const char *fmt, ...);
extern void err_report(int fd, const char *fmt, ...);
extern void err_quit(int fd, const char *fmt, ...);
/*
 * Program entry point: runs the server start-up sequence in a fixed
 * order and then services signals on the calling thread.
 *
 * The returned value (1) is only reached if process_signals() returns.
 */
int main(int argc, char *argv[])
{
  /* Command line first: it decides the process count used just below */
  parse_arguments(argc, argv);

  /* One pid slot per server process */
  vp_pids = calloc(vp_count, sizeof(pid_t));
  if (vp_pids == NULL) {
    err_sys_quit(errfd, "ERROR: calloc failed");
  }

  /* Detach from the terminal unless running interactively */
  if (!interactive_mode) {
    start_daemon();
  }

  /* Bring up the State Threads library */
  if (st_init() < 0) {
    err_sys_quit(errfd, "ERROR: initialization failed: st_init");
  }

  /* Derive per-process thread limits */
  set_thread_throttling();

  /* Listening sockets are created before any user id change */
  create_listeners();

  if (username != NULL) {
    change_user();
  }

  /* Log files are opened before the server processes are forked */
  open_log_files();

  /* In daemon mode only the forked server processes return from here */
  start_processes();

  st_timecache_set(1);

  install_sighandlers();

  load_configs();

  /* Connection handling threads */
  start_threads();

  /* The calling thread becomes the signal processing thread */
  process_signals(NULL);

  return 1;
}
/*
 * Print the command-line synopsis to stderr and terminate the process
 * with exit status 1. Does not return.
 *
 * progname: program name shown in the "Usage:" line.
 */
static void usage(const char *progname)
{
  fprintf(stderr, "Usage: %s -l <log_directory> [<options>]\n\n", progname);
  fputs("Possible options:\n\n"
        "\t-b <host>:<port> Bind to specified address. Multiple addresses\n"
        "\t are permitted.\n"
        "\t-p <num_processes> Create specified number of processes.\n"
        "\t-t <min_thr>:<max_thr> Specify thread limits per listening socket\n"
        "\t across all processes.\n"
        "\t-u <user> Change server's user id to specified value.\n"
        "\t-q <backlog> Set max length of pending connections queue.\n"
        "\t-a Enable access logging.\n"
        "\t-i Run in interactive mode.\n"
        "\t-S Serialize all accept() calls.\n"
        "\t-h Print this message.\n",
        stderr);
  exit(1);
}
/*
 * Parse the command line into the file-scope configuration variables.
 *
 * Invalid option values terminate the program through err_quit(); -h,
 * an unknown option, or a missing log directory in non-interactive mode
 * end in usage(). After option processing, defaults are applied for the
 * process count (CPU count, at least 1) and the bind address ("0.0.0.0").
 */
static void parse_arguments(int argc, char *argv[])
{
  extern char *optarg;
  int ch;
  char *addr_copy;
  char *rest;

  for ( ; ; ) {
    ch = getopt(argc, argv, "b:p:l:t:u:q:aiSh");
    if (ch == EOF) {
      break;
    }

    switch (ch) {
    case 'b':
      /* Each -b occupies the next free srv_socket[] slot */
      if (sk_count >= MAX_BIND_ADDRS) {
        err_quit(errfd, "ERROR: max number of bind addresses (%d) exceeded",
                 MAX_BIND_ADDRS);
      }
      addr_copy = strdup(optarg);
      if (addr_copy == NULL) {
        err_sys_quit(errfd, "ERROR: strdup");
      }
      srv_socket[sk_count].addr = addr_copy;
      sk_count++;
      break;

    case 'p':
      vp_count = atoi(optarg);
      if (vp_count < 1) {
        err_quit(errfd, "ERROR: invalid number of processes: %s", optarg);
      }
      break;

    case 'l':
      logdir = optarg;
      break;

    case 't':
      /* "<waiting>[:<total>]"; the part after ':' is optional */
      max_wait_threads = (int) strtol(optarg, &rest, 10);
      if (*rest == ':') {
        max_threads = atoi(rest + 1);
      }
      if (max_wait_threads < 0 || max_threads < 0) {
        err_quit(errfd, "ERROR: invalid number of threads: %s", optarg);
      }
      break;

    case 'u':
      username = optarg;
      break;

    case 'q':
      listenq_size = atoi(optarg);
      if (listenq_size < 1) {
        err_quit(errfd, "ERROR: invalid listen queue size: %s", optarg);
      }
      break;

    case 'a':
      log_access = 1;
      break;

    case 'i':
      interactive_mode = 1;
      break;

    case 'S':
      serialize_accept = 1;
      break;

    case 'h':
    case '?':
      usage(argv[0]);
    }
  }

  /* A daemon needs a directory to run in and to log to */
  if (logdir == NULL && !interactive_mode) {
    err_report(errfd, "ERROR: logging directory is required\n");
    usage(argv[0]);
  }

  if (getuid() == 0 && username == NULL) {
    err_report(errfd, "WARNING: running as super-user!");
  }

  /* No -p given: one process per CPU, but never fewer than one */
  if (vp_count == 0) {
    vp_count = cpu_count();
    if (vp_count < 1) {
      vp_count = 1;
    }
  }

  /* No -b given: listen on all interfaces */
  if (sk_count == 0) {
    sk_count = 1;
    srv_socket[0].addr = "0.0.0.0";
  }
}
/*
 * Turn the process into a daemon: fork, start a new session, fork again,
 * set the umask to 022 and change the working directory to logdir.
 *
 * The parent of each fork exits with status 0, so only the second child
 * returns from this function. Failures are fatal (err_sys_quit).
 */
static void start_daemon(void)
{
  pid_t child;

  child = fork();
  if (child < 0) {
    err_sys_quit(errfd, "ERROR: fork");
  }
  if (child > 0) {
    exit(0);                    /* original process */
  }

  /* First child */
  setsid();

  child = fork();
  if (child < 0) {
    err_sys_quit(errfd, "ERROR: fork");
  }
  if (child > 0) {
    exit(0);                    /* first child */
  }

  /* Second child carries on as the daemon */
  umask(022);

  if (chdir(logdir) < 0) {
    err_sys_quit(errfd, "ERROR: can't change directory to %s: chdir", logdir);
  }
}
/*
 * Finalize the thread limits.
 *
 * max_wait_threads and max_threads come in as totals across all
 * processes (0 meaning "not specified") and leave as per-process values,
 * rounded up. min_wait_threads is lowered if it exceeds the resulting
 * max_wait_threads.
 */
static void set_thread_throttling(void)
{
  /* Fill in defaults for values the user did not supply */
  if (max_wait_threads == 0) {
    max_wait_threads = MAX_WAIT_THREADS_DEFAULT * vp_count;
  }
  if (max_threads == 0) {
    max_threads = (st_getfdlimit() * vp_count) / FD_PER_THREAD / sk_count;
  }

  /* Waiting threads can never outnumber all threads */
  if (max_wait_threads > max_threads) {
    max_wait_threads = max_threads;
  }

  /* Split the totals between the processes, rounding up */
  max_wait_threads = max_wait_threads / vp_count +
                     ((max_wait_threads % vp_count) ? 1 : 0);
  max_threads = max_threads / vp_count +
                ((max_threads % vp_count) ? 1 : 0);

  if (min_wait_threads > max_wait_threads) {
    min_wait_threads = max_wait_threads;
  }
}
static void create_listeners(void)
{
int i, n, sock;
char *c;
struct sockaddr_in serv_addr;
struct hostent *hp;
unsigned short port;
for (i = 0; i < sk_count; i++) {
port = 0;
if ((c = strchr(srv_socket[i].addr, ':')) != NULL) {
*c++ = '\0';
port = (unsigned short) atoi(c);
}
if (srv_socket[i].addr[0] == '\0')
srv_socket[i].addr = "0.0.0.0";
if (port == 0)
port = SERV_PORT_DEFAULT;
if ((sock = socket(PF_INET, SOCK_STREAM, 0)) < 0)
err_sys_quit(errfd, "ERROR: can't create socket: socket");
n = 1;
if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0)
err_sys_quit(errfd, "ERROR: can't set SO_REUSEADDR: setsockopt");
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
serv_addr.sin_addr.s_addr = inet_addr(srv_socket[i].addr);
if (serv_addr.sin_addr.s_addr == INADDR_NONE) {
if ((hp = gethostbyname(srv_socket[i].addr)) == NULL)
err_quit(errfd, "ERROR: can't resolve address: %s",
srv_socket[i].addr);
memcpy(&serv_addr.sin_addr, hp->h_addr, hp->h_length);
}
srv_socket[i].port = port;
if (bind(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
err_sys_quit(errfd, "ERROR: can't bind to address %s, port %hu",
srv_socket[i].addr, port);
if (listen(sock, listenq_size) < 0)
err_sys_quit(errfd, "ERROR: listen");
if ((srv_socket[i].nfd = st_netfd_open_socket(sock)) == NULL)
err_sys_quit(errfd, "ERROR: st_netfd_open_socket");
if (serialize_accept && st_netfd_serialize_accept(srv_socket[i].nfd) < 0)
err_sys_quit(errfd, "ERROR: st_netfd_serialize_accept");
}
}
/*
 * Switch the process to the group id and user id of the account named
 * by the global `username`, in that order. Only setgid() and setuid()
 * are called; supplementary groups are not touched here.
 *
 * An unknown user or a failing id change is fatal.
 */
static void change_user(void)
{
  struct passwd *entry = getpwnam(username);

  if (entry == NULL) {
    err_quit(errfd, "ERROR: can't find user '%s': getpwnam failed", username);
  }

  /* Group first, then user */
  if (setgid(entry->pw_gid) < 0) {
    err_sys_quit(errfd, "ERROR: can't change group id: setgid");
  }
  if (setuid(entry->pw_uid) < 0) {
    err_sys_quit(errfd, "ERROR: can't change user id: setuid");
  }

  err_report(errfd, "INFO: changed process user id to '%s'", username);
}
/*
 * Open the daemon's log files; does nothing in interactive mode.
 *
 * Opens the access log buffer when access logging is enabled, writes the
 * current pid followed by a newline to PID_FILE (truncating it), and
 * opens ERRORS_FILE in append mode, which becomes the new errfd.
 * Any failure is fatal.
 */
static void open_log_files(void)
{
  int fd;
  char pid_text[32];
  size_t pid_len;

  if (interactive_mode) {
    return;
  }

  if (log_access) {
    logbuf_open();
  }

  /* Pid file */
  fd = open(PID_FILE, O_CREAT | O_WRONLY | O_TRUNC, 0644);
  if (fd < 0) {
    err_sys_quit(errfd, "ERROR: can't open pid file: open");
  }
  sprintf(pid_text, "%d\n", (int)getpid());
  pid_len = strlen(pid_text);
  if ((size_t) write(fd, pid_text, pid_len) != pid_len) {
    err_sys_quit(errfd, "ERROR: can't write to pid file: write");
  }
  close(fd);

  /* Error log; from here on messages go to it instead of stderr */
  fd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644);
  if (fd < 0) {
    err_sys_quit(errfd, "ERROR: can't open error log file: open");
  }
  errfd = fd;

  err_report(errfd, "INFO: starting the server...");
}
/*
 * Fork the server processes and stay behind as their watchdog.
 *
 * In interactive mode nothing is forked: the calling process becomes
 * server process 0 and the function returns.
 *
 * Otherwise vp_count children are forked. Every child records its slot
 * index and pid in my_index/my_pid and returns to the caller. The parent
 * never returns from here: it installs wdog_sighandler for SIGTERM,
 * SIGHUP and SIGUSR1 and loops forever, waiting for children to
 * terminate, logging the reason and forking a replacement into the same
 * slot.
 */
static void start_processes(void)
{
int i, status;
pid_t pid;
sigset_t mask, omask;
if (interactive_mode) {
my_index = 0;
my_pid = getpid();
return;
}
for (i = 0; i < vp_count; i++) {
if ((pid = fork()) < 0) {
err_sys_report(errfd, "ERROR: can't create process: fork");
/* Not even one process could be started */
if (i == 0)
exit(1);
err_report(errfd, "WARN: started only %d processes out of %d", i,
vp_count);
/* Carry on with the processes that did start */
vp_count = i;
break;
}
if (pid == 0) {
/* Child: remember which slot this process occupies */
my_index = i;
my_pid = getpid();
return;
}
vp_pids[i] = pid;
}
/* Only the parent (watchdog) gets here */
Signal(SIGTERM, wdog_sighandler);
Signal(SIGHUP, wdog_sighandler);
Signal(SIGUSR1, wdog_sighandler);
for ( ; ; ) {
if ((pid = wait(&status)) < 0) {
/* wait() interrupted by a signal: just retry */
if (errno == EINTR)
continue;
err_sys_quit(errfd, "ERROR: watchdog: wait");
}
/* Find the slot of the process that terminated */
for (i = 0; i < vp_count; i++) {
if (vp_pids[i] == pid)
break;
}
/*
 * NOTE(review): if pid matches no slot, i == vp_count here and the
 * store to vp_pids[i] further down indexes slot vp_count -- confirm
 * that wait() can only ever report pids recorded in vp_pids.
 */
/* Block the watchdog's signals while logging and forking */
sigemptyset(&mask);
sigaddset(&mask, SIGTERM);
sigaddset(&mask, SIGHUP);
sigaddset(&mask, SIGUSR1);
sigprocmask(SIG_BLOCK, &mask, &omask);
if (WIFEXITED(status))
err_report(errfd, "WARN: watchdog: process %d (pid %d) exited"
" with status %d", i, pid, WEXITSTATUS(status));
else if (WIFSIGNALED(status))
err_report(errfd, "WARN: watchdog: process %d (pid %d) terminated"
" by signal %d", i, pid, WTERMSIG(status));
else if (WIFSTOPPED(status))
err_report(errfd, "WARN: watchdog: process %d (pid %d) stopped"
" by signal %d", i, pid, WSTOPSIG(status));
else
err_report(errfd, "WARN: watchdog: process %d (pid %d) terminated:"
" unknown termination reason", i, pid);
/* Fork a replacement into the same slot */
if ((pid = fork()) < 0) {
err_sys_report(errfd, "ERROR: watchdog: can't create process: fork");
} else if (pid == 0) {
/*
 * Replacement child: it returns with the three signals still blocked;
 * install_sighandlers() unblocks them.
 */
my_index = i;
my_pid = getpid();
return;
}
/* After a failed fork this stores -1, which wdog_sighandler skips */
vp_pids[i] = pid;
/* Restore the signal mask saved before the block above */
sigprocmask(SIG_SETMASK, &omask, NULL);
}
}
static void wdog_sighandler(int signo)
{
int i, err;
err = errno;
for (i = 0; i < vp_count; i++) {
if (vp_pids[i] > 0)
kill(vp_pids[i], signo);
}
switch (signo) {
case SIGHUP:
err_report(errfd, "INFO: watchdog: caught SIGHUP");
if (log_access) {
logbuf_close();
logbuf_open();
}
close(errfd);
if ((errfd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644)) < 0)
err_sys_quit(STDERR_FILENO, "ERROR: watchdog: open");
break;
case SIGTERM:
err_report(errfd, "INFO: watchdog: caught SIGTERM, terminating");
unlink(PID_FILE);
exit(0);
case SIGUSR1:
err_report(errfd, "INFO: watchdog: caught SIGUSR1");
break;
default:
err_report(errfd, "INFO: watchdog: caught signal %d", signo);
}
errno = err;
}
/*
 * Set up signal handling for a server process.
 *
 * Creates the signal pipe and wraps both ends in ST descriptors
 * (sig_pipe), installs child_sighandler for SIGTERM, SIGHUP and SIGUSR1,
 * and finally unblocks those three signals. Failure to create the pipe
 * or its ST descriptors is fatal.
 */
static void install_sighandlers(void)
{
  static const int handled[] = { SIGTERM, SIGHUP, SIGUSR1 };
  const int handled_count = (int) (sizeof(handled) / sizeof(handled[0]));
  sigset_t unblock;
  int fds[2];
  int k;

  /* Signal pipe: written by the handler, read by an ST thread */
  if (pipe(fds) < 0) {
    err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create"
                 " signal pipe: pipe", my_index, my_pid);
  }
  if ((sig_pipe[0] = st_netfd_open(fds[0])) == NULL ||
      (sig_pipe[1] = st_netfd_open(fds[1])) == NULL) {
    err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create"
                 " signal pipe: st_netfd_open", my_index, my_pid);
  }

  /* Install the handlers */
  for (k = 0; k < handled_count; k++) {
    Signal(handled[k], child_sighandler);
  }

  /* Unblock the handled signals */
  sigemptyset(&unblock);
  for (k = 0; k < handled_count; k++) {
    sigaddset(&unblock, handled[k]);
  }
  sigprocmask(SIG_UNBLOCK, &unblock, NULL);
}
static void child_sighandler(int signo)
{
int err, fd;
err = errno;
fd = st_netfd_fileno(sig_pipe[1]);
if (write(fd, &signo, sizeof(int)) != sizeof(int))
err_sys_quit(errfd, "ERROR: process %d (pid %d): child's signal"
" handler: write", my_index, my_pid);
errno = err;
}
/*
 * Signal processing loop of a server process.
 *
 * Reads signal numbers from the read end of the signal pipe and acts on
 * them:
 *   SIGHUP  - reopen the log files (skipped in interactive mode) and
 *             reload the configuration;
 *   SIGTERM - flush the access log if enabled and exit with status 0;
 *   SIGUSR1 - dump server information;
 *   other   - log the signal number.
 * A failed read from the pipe is fatal.
 *
 * arg is unused. The loop has no exit, so the final return is not reached.
 */
static void *process_signals(void *arg)
{
  int sig;

  for ( ; ; ) {
    /* Wait for the next signal number */
    if (st_read(sig_pipe[0], &sig, sizeof(int),
                ST_UTIME_NO_TIMEOUT) != sizeof(int)) {
      err_sys_quit(errfd, "ERROR: process %d (pid %d): signal processor:"
                   " st_read", my_index, my_pid);
    }

    switch (sig) {
    case SIGHUP:
      err_report(errfd, "INFO: process %d (pid %d): caught SIGHUP,"
                 " reloading configuration", my_index, my_pid);
      if (!interactive_mode) {
        /* Reopen log files (needed for log rotation) */
        if (log_access) {
          logbuf_flush();
          logbuf_close();
          logbuf_open();
        }
        close(errfd);
        errfd = open(ERRORS_FILE, O_CREAT | O_WRONLY | O_APPEND, 0644);
        if (errfd < 0) {
          err_sys_quit(STDERR_FILENO, "ERROR: process %d (pid %d): signal"
                       " processor: open", my_index, my_pid);
        }
      }
      load_configs();
      break;

    case SIGTERM:
      err_report(errfd, "INFO: process %d (pid %d): caught SIGTERM,"
                 " terminating", my_index, my_pid);
      if (log_access) {
        logbuf_flush();
      }
      exit(0);

    case SIGUSR1:
      err_report(errfd, "INFO: process %d (pid %d): caught SIGUSR1",
                 my_index, my_pid);
      dump_server_info();
      break;

    default:
      err_report(errfd, "INFO: process %d (pid %d): caught signal %d",
                 my_index, my_pid, sig);
    }
  }

  return NULL;
}
/*
 * Thread start function that flushes the access log buffer forever,
 * calling st_sleep(ACCLOG_FLUSH_INTERVAL) before each flush.
 *
 * arg is unused. The loop has no exit, so the final return is not reached.
 */
static void *flush_acclog_buffer(void *arg)
{
  while (1) {
    st_sleep(ACCLOG_FLUSH_INTERVAL);
    logbuf_flush();
  }

  return NULL;
}
/*
 * Create the ST threads of a server process.
 *
 * Starts the access log flushing thread when access logging is enabled
 * (failure is fatal), then for every listening socket resets its
 * counters and tries to create max_wait_threads connection handling
 * threads. Individual creation failures are only reported, but the
 * process exits with status 1 if a socket ends up without any thread.
 */
static void start_threads(void)
{
  long sk, attempt;

  /* Access log flushing thread */
  if (log_access && st_thread_create(flush_acclog_buffer, NULL, 0, 0) == NULL) {
    err_sys_quit(errfd, "ERROR: process %d (pid %d): can't create"
                 " log flushing thread", my_index, my_pid);
  }

  /* Connection handling threads, one pool per listening socket */
  for (sk = 0; sk < sk_count; sk++) {
    err_report(errfd, "INFO: process %d (pid %d): starting %d threads"
               " on %s:%u", my_index, my_pid, max_wait_threads,
               srv_socket[sk].addr, srv_socket[sk].port);

    WAIT_THREADS(sk) = 0;
    BUSY_THREADS(sk) = 0;
    RQST_COUNT(sk) = 0;

    for (attempt = 0; attempt < max_wait_threads; attempt++) {
      /* The socket index travels to the thread in its argument pointer */
      if (st_thread_create(handle_connections, (void *)sk, 0, 0) == NULL) {
        err_sys_report(errfd, "ERROR: process %d (pid %d): can't create"
                       " thread", my_index, my_pid);
        continue;
      }
      WAIT_THREADS(sk)++;
    }

    if (WAIT_THREADS(sk) == 0) {
      exit(1);
    }
  }
}
/*
 * Thread start function of a connection handling thread.
 *
 * arg carries the index of the listening socket (cast from long). The
 * thread loops accepting connections on that socket and running
 * handle_session() for each of them, keeping the per-socket
 * waiting/busy counters up to date. When it picks up a connection and
 * fewer than min_wait_threads threads remain waiting, it spawns another
 * handler, provided the total stays below max_threads. The thread ends
 * once the number of waiting threads exceeds max_wait_threads.
 *
 * Always returns NULL.
 */
static void *handle_connections(void *arg)
{
  st_netfd_t srv_nfd, cli_nfd;
  struct sockaddr_in from;
  int fromlen;
  long i = (long) arg;

  srv_nfd = srv_socket[i].nfd;

  while (WAIT_THREADS(i) <= max_wait_threads) {
    /*
     * The address length is a value-result argument: the previous call
     * may have changed it, so it is reset before every accept.
     */
    fromlen = sizeof(from);
    cli_nfd = st_accept(srv_nfd, (struct sockaddr *)&from, &fromlen,
                        ST_UTIME_NO_TIMEOUT);
    if (cli_nfd == NULL) {
      err_sys_report(errfd, "ERROR: can't accept connection: st_accept");
      continue;
    }

    /*
     * Make the peer address available to handle_session(). It points
     * into this thread's own `from`, which is not overwritten until the
     * next accept.
     */
    st_netfd_setspecific(cli_nfd, &from.sin_addr, NULL);

    /* This thread is busy from now on */
    WAIT_THREADS(i)--;
    BUSY_THREADS(i)++;

    /* Keep enough threads waiting for further connections */
    if (WAIT_THREADS(i) < min_wait_threads && TOTAL_THREADS(i) < max_threads) {
      if (st_thread_create(handle_connections, (void *)i, 0, 0) != NULL)
        WAIT_THREADS(i)++;
      else
        err_sys_report(errfd, "ERROR: process %d (pid %d): can't create"
                       " thread", my_index, my_pid);
    }

    handle_session(i, cli_nfd);

    st_netfd_close(cli_nfd);

    /* Back to waiting; the loop condition decides whether to stay */
    WAIT_THREADS(i)++;
    BUSY_THREADS(i)--;
  }

  /* This thread is surplus: take it out of the waiting count and end */
  WAIT_THREADS(i)--;
  return NULL;
}
/*
 * Write a summary of this process and its listening sockets (address,
 * thread limits, thread counters, requests served) to standard error.
 *
 * The report is formatted into a heap buffer sized from the actual
 * lengths of the bind addresses, and every write into it is bounded, so
 * a long -b host name cannot overflow it. Allocation and write failures
 * are reported and otherwise ignored.
 */
static void dump_server_info(void)
{
  char *buf;
  size_t cap, len, room;
  int i, n;

  /*
   * 128 bytes for the process header, plus 512 bytes of fixed text and
   * numbers per socket, plus the address string itself.
   */
  cap = 128;
  for (i = 0; i < sk_count; i++)
    cap += 512 + strlen(srv_socket[i].addr);

  if ((buf = malloc(cap)) == NULL) {
    err_sys_report(errfd, "ERROR: malloc failed");
    return;
  }

  len = 0;
  n = snprintf(buf, cap, "\n\nProcess #%d (pid %d):\n", my_index, (int)my_pid);
  if (n > 0)
    len = ((size_t) n < cap) ? (size_t) n : cap - 1;

  for (i = 0; i < sk_count && len < cap - 1; i++) {
    room = cap - len;
    n = snprintf(buf + len, room, "\nListening Socket #%d:\n"
                 "-------------------------\n"
                 "Address %s:%u\n"
                 "Thread limits (min/max) %d/%d\n"
                 "Waiting threads %d\n"
                 "Busy threads %d\n"
                 "Requests served %d\n",
                 i, srv_socket[i].addr, srv_socket[i].port,
                 max_wait_threads, max_threads,
                 WAIT_THREADS(i), BUSY_THREADS(i), RQST_COUNT(i));
    if (n < 0)
      break;
    if ((size_t) n >= room) {
      /* Output was truncated: keep what fits and stop */
      len = cap - 1;
      break;
    }
    len += (size_t) n;
  }

  if (write(STDERR_FILENO, buf, len) < 0)
    err_sys_report(errfd, "ERROR: can't write server info: write");
  free(buf);
}
/*
 * Serve one client connection: read (part of) the request with a
 * REQUEST_TIMEOUT second timeout and answer with a fixed HTTP response.
 *
 * srv_socket_index: index of the listening socket the client came in
 *                   on; its request counter is incremented on success.
 * cli_nfd:          ST descriptor of the client connection. Its
 *                   descriptor-specific data is read as the peer's
 *                   struct in_addr and used in warning messages.
 *
 * Read and write failures are reported as warnings and end the session
 * without counting the request. The connection is not closed here.
 */
void handle_session(long srv_socket_index, st_netfd_t cli_nfd)
{
  static char resp[] = "HTTP/1.0 200 OK\r\nContent-type: text/html\r\n"
                       "Connection: close\r\n\r\n<H2>It worked!</H2>\n";
  char request[512];
  int resp_len;
  struct in_addr *peer;

  resp_len = sizeof(resp) - 1;            /* without the trailing NUL */
  peer = st_netfd_getspecific(cli_nfd);

  /* The request content is ignored; only a read error matters */
  if (st_read(cli_nfd, request, sizeof(request),
              SEC2USEC(REQUEST_TIMEOUT)) < 0) {
    err_sys_report(errfd, "WARN: can't read request from %s: st_read",
                   inet_ntoa(*peer));
    return;
  }

  if (st_write(cli_nfd, resp, resp_len, ST_UTIME_NO_TIMEOUT) != resp_len) {
    err_sys_report(errfd, "WARN: can't write response to %s: st_write",
                   inet_ntoa(*peer));
    return;
  }

  RQST_COUNT(srv_socket_index)++;
}
/*
 * Configuration loading hook. This implementation loads nothing; it
 * only logs that the configuration was loaded for the current process.
 */
void load_configs(void)
{
  err_report(errfd,
             "INFO: process %d (pid %d): configuration loaded",
             my_index,
             my_pid);
}
/* Access log hook called when the log is to be opened; empty in this file. */
void logbuf_open(void)
{
}
/* Access log hook called when the log is to be flushed; empty in this file. */
void logbuf_flush(void)
{
}
/* Access log hook called when the log is to be closed; empty in this file. */
void logbuf_close(void)
{
}
static void Signal(int sig, void (*handler)(int))
{
struct sigaction sa;
sa.sa_handler = handler;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
sigaction(sig, &sa, NULL);
}
/*
 * Return the number of processors as reported by the first query that
 * is available at compile time: sysconf(_SC_NPROCESSORS_ONLN),
 * sysconf(_SC_NPROC_ONLN), or mpctl() on HP-UX.
 *
 * When none of them is available, sets errno to ENOSYS and returns -1.
 * The result of the query itself is returned unchecked.
 */
static int cpu_count(void)
{
#if defined (_SC_NPROCESSORS_ONLN)
  return (int) sysconf(_SC_NPROCESSORS_ONLN);
#elif defined (_SC_NPROC_ONLN)
  return (int) sysconf(_SC_NPROC_ONLN);
#elif defined (HPUX)
#include <sys/mpctl.h>
  return mpctl(MPC_GETNUMSPUS, 0, 0);
#else
  errno = ENOSYS;
  return -1;
#endif
}