源码阅读顺序
mjpg_streamer/mjpg_streamer.c
->int main(int argc, char argv[])
mjpg_streamer/plugins/input_uvc.c
->input_init()
->intput_run()
*->cam_thread()
mjpg-streamer\plugins\output_http.c
->output_init()
->output_run()
->*server_thread()
->*client_thread()
源码结构分析流程图
这里借用一下韦东山老师画的流程图
**从这张流程图中,我们可以看出 *.so 的文件一共有7个 ,分别为三个输入插件(动态库):视频从文件读入(input_file);视频从摄像头读入(input_uvc);视频从网页读入(input_http)。 四个输出插件:视频输出到文件(output_file);视频输出到网页(output_http)。以input_uvc.so为例,通过dlopen()函数,调用该动态库的函数input_init() 完成摄像头的初始化,再创建一条线程,该线程负责完成视频流的读入,转化,导入到缓存区。 以output_http.so 为例 主进程main会通过dlopen,调用到该动态库的函数output_init来处理用户传入的参数,之后进入循环,等待input线程的请求连接,为每一对确认连接创建一个线程,该线程会读取缓存区中的视频流,然后将其输入到浏览器。 **
mjpg_streamer.c源码分析
下面我们来看看mjpg_streamer.c中主进程的源码
//为了不使篇幅过长,进行了一些删减
int main(int argc, char *argv[])
{
//char *input = "input_uvc.so --resolution 640x480 --fps 5 --device /dev/video0";
char *input[MAX_INPUT_PLUGINS];
char *output[MAX_OUTPUT_PLUGINS];
int daemon = 0, i;
size_t tmp = 0;
output[0] = "output_http.so --port 8080";//定义默认输出的插件和端口
global.outcnt = 0;
/* parameter parsing */
while(1) {
int option_index = 0, c = 0;
static struct option long_options[] = {
{"h", no_argument, 0, 0
},
{"help", no_argument, 0, 0},
{"i", required_argument, 0, 0},
{"input", required_argument, 0, 0},
{"o", required_argument, 0, 0},
{"output", required_argument, 0, 0},
{"v", no_argument, 0, 0},
{"version", no_argument, 0, 0},
{"b", no_argument, 0, 0},
{"background", no_argument, 0, 0},
{0, 0, 0, 0}
};
/*
getopt_long_only():
用于解析命令行选项
根据传入的参数,一个一个的在struct option 数组里面进行匹配,当匹配到相同的参数,返回在数组中的下标。如传入-i选项 ,argv中自然有i选项,就会和struct option 数组项进行匹配,找到时返回数组下标。option_index索引值为2.
*/
c = getopt_long_only(argc, argv, "", long_options, &option_index);
/* no more options to parse */
if(c == -1) break;
/* unrecognized option */
if(c == '?') {
help(argv[0]);
return 0;
}
switch(option_index) {
/* h, help */
case 0:
case 1:
help(argv[0]);
return 0;
break;
/* i, input */
case 2:
case 3:
input[global.incnt++] = strdup(optarg);/*在getopt_long_only函数中,把传入的-i选项对应的字符串"input_uvc.so -f 10 -r 320*240"赋给变量optarg,
而strdup()函数用于将变量optarg的值赋予指针input */
break;
/* o, output */ //这一块就是把数据流的出入流和输出流给定义,如果执行时,用户未指定则采用默认配置->开头定义处
case 4:
case 5:
output[global.outcnt++] = strdup(optarg);/*在getopt_long_only函数中,把传入的-o选项对应的字符串"output_uvc.so ......"赋给变量optarg,
而strdup()函数用于将变量optarg的值赋予指针output */
break;
/* v, version */
case 6:
case 7:
printf("MJPG Streamer Version: %s\n" \
"Compilation Date.....: %s\n" \
"Compilation Time.....: %s\n",
#ifdef SVN_REV
SVN_REV,
#else
SOURCE_VERSION,
#endif
__DATE__, __TIME__);
return 0;
break;
/* b, background, 参数传入-b选项时,让daemon=1;让程序在后台运行*/
case 8:
case 9:
daemon = 1;
break;
default:
help(argv[0]);
return 0;
}
}
openlog("MJPG-streamer ", LOG_PID | LOG_CONS, LOG_USER);
//openlog("MJPG-streamer ", LOG_PID|LOG_CONS|LOG_PERROR, LOG_USER);
syslog(LOG_INFO, "starting application");
/* fork to the background */
if(daemon) {
LOG("enabling daemon mode");
daemon_mode();
}
/* ignore SIGPIPE (send by OS if transmitting to closed TCP sockets) */
signal(SIGPIPE, SIG_IGN);/*对于某些进程,特别是服务器进程往往在请求到来时生成子进程处理请求。如果父进程不等待子进程结束,
子进程将成为僵尸进程(zombie)从而占用系统资源。如果父进程等待子进程结束,
将增加父进程的负担,影响服务器进程的并发性能。在Linux下可以简单地将 SIGCHLD信号的操作设为SIG_IGN。*/
/* register signal handler for <CTRL>+C in order to clean up */
/*
signal()函数是信号绑定,当我们按下 <CTRL>+C 时,则调用signal_handler()函数,做一些清理工作
*/
if(signal(SIGINT, signal_handler) == SIG_ERR) {
LOG("could not register signal handler\n");
closelog();
exit(EXIT_FAILURE);
}
/*
* messages like the following will only be visible on your terminal
* if not running in daemon mode
*/
#ifdef SVN_REV
LOG("MJPG Streamer Version: svn rev: %s\n", SVN_REV);
#else
LOG("MJPG Streamer Version.: %s\n", SOURCE_VERSION);
#endif
/* check if at least one output plugin was selected */
if(global.outcnt == 0) {
/* no? Then use the default plugin instead */
global.outcnt = 1;
}
/* open input plugin */
for(i = 0; i < global.incnt; i++) {
/* this mutex and the conditional variable are used to synchronize access to the global picture buffer */
if(pthread_mutex_init(&global.in[i].db, NULL) != 0) {
LOG("could not initialize mutex variable\n");
closelog();
exit(EXIT_FAILURE);
}
if(pthread_cond_init(&global.in[i].db_update, NULL) != 0) {
LOG("could not initialize condition variable\n");
closelog();
exit(EXIT_FAILURE);
}
tmp = (size_t)(strchr(input[i], ' ') - input[i]);
global.in[i].stop = 0;
global.in[i].buf = NULL;
global.in[i].size = 0;
global.in[i].plugin = (tmp > 0) ? strndup(input[i], tmp) : strdup(input[i]);
global.in[i].handle = dlopen(global.in[i].plugin, RTLD_LAZY);//打开 "input_uvc.so" 这个动态链接库,返回连接这个动态库的句柄(**比较关键的一句)
//global.in.init等于 刚打开的动态链接库(input_uvc.c)的input_init函数
global.in[i].init = dlsym(global.in[i].handle, "input_init");
if(global.in[i].init == NULL) {
LOG("%s\n", dlerror());
exit(EXIT_FAILURE);
}
//input_stop,global.in.stop等于 刚打开的动态链接库(input_uvc.c)的input_stop函数
global.in[i].stop = dlsym(global.in[i].handle, "input_stop");
if(global.in[i].stop == NULL) {
LOG("%s\n", dlerror());
exit(EXIT_FAILURE);
}
//input_run,global.in.run等于 刚打开的动态链接库(input_uvc.c)的input_run函数
global.in[i].run = dlsym(global.in[i].handle, "input_run");
if(global.in[i].run == NULL) {
LOG("%s\n", dlerror());
exit(EXIT_FAILURE);
}
//input_cmd,global.in.cmd等于 刚打开的动态链接库(input_uvc.c)的input_cmd函数
/* try to find optional command */
global.in[i].cmd = dlsym(global.in[i].handle, "input_cmd");//命令参数的传入
global.in[i].param.parameters = strchr(input[i], ' ');
split_parameters(global.in[i].param.parameters, &global.in[i].param.argc, global.in[i].param.argv);
global.in[i].param.global = &global;
global.in[i].param.id = i;
if(global.in[i].init(&global.in[i].param, i)) {
LOG("input_init() return value signals to exit\n");
closelog();
exit(0);
}
}
/* open output plugin ,底下的这一段和上面的就一样了,是对输出的动态库的操作*/
for(i = 0; i < global.outcnt; i++) {
tmp = (size_t)(strchr(output[i], ' ') - output[i]);
global.out[i].plugin = (tmp > 0) ? strndup(output[i], tmp) : strdup(output[i]);
global.out[i].handle = dlopen(global.out[i].plugin, RTLD_LAZY);
global.out[i].init = dlsym(global.out[i].handle, "output_init");
if(global.out[i].init == NULL) {
LOG("%s\n", dlerror());
exit(EXIT_FAILURE);
}
global.out[i].stop = dlsym(global.out[i].handle, "output_stop");
if(global.out[i].stop == NULL) {
LOG("%s\n", dlerror());
exit(EXIT_FAILURE);
}
global.out[i].run = dlsym(global.out[i].handle, "output_run");
if(global.out[i].run == NULL) {
LOG("%s\n", dlerror());
exit(EXIT_FAILURE);
}
/* 将视频流读入缓存区*/
for(i = 0; i < global.incnt; i++) {
if(global.in[i].run(i)) {
LOG("can not run input plugin %d: %s\n", i, global.in[i].plugin);
closelog();
return 1;
}
}
/* 将视频流从缓存区取出并输出到浏览器*/
for(i = 0; i < global.outcnt; i++) {
syslog(LOG_INFO, "starting output plugin: %s (ID: %02d)", global.out[i].plugin, global.out[i].param.id);
global.out[i].run(global.out[i].param.id);
}
/* wait for signals */
pause();
return 0;
}
总结一下,mjpg_streamer.c的main函数是整个程序的起点,通过dlopen调用其他模块的函数,完成了对输入,输出环境的初始化,并完成了视频流的读入和导出。而视频流在输入插件和输出插件的传输是通过读取同一块缓存区实现的
接下来我们来看看input_uvc.c这个摄像头输入流程序,具体做了那些工作:
*在int input_init(input_parameter param, int id)函数中,通过对参数解析后,根据解析的参数完成了初始化配置。在int input_run(void)中开辟了一块空间空间用来存放一帧视频数据,最后开辟一个线程,线程主体由函数void *cam_thread( void *arg )来实现。这也是该文件的重点,所以我们来着重讲一讲该函数
// 有删减,只留了比较重要的部分
void *cam_thread(void *arg)
{
context *pcontext = arg;
pglobal = pcontext->pglobal;
/* 当线程执行完后,会调用 cam_cleanup 来做些清理工作 */
pthread_cleanup_push(cam_cleanup, pcontext);
/* grab a frame */
if(uvcGrab(pcontext->videoIn) < 0) { /*
获得一帧数据:
MJPEG:则将一帧视频数据存放到 videoIn->tempbuffer 中
YUV:则将一帧数据存放到 videoIn->framebuffer 中
*/
IPRINT("Error grabbing frames\n");
exit(EXIT_FAILURE);
}
DBG("received frame of size: %d from plugin: %d\n", pcontext->videoIn->buf.bytesused, pcontext->id);
/*
* Workaround for broken, corrupted frames:
* Under low light conditions corrupted frames may get captured.
* The good thing is such frames are quite small compared to the regular pictures.
* For example a VGA (640x480) webcam picture is normally >= 8kByte large,
* corrupted frames are smaller.
*/
if(pcontext->videoIn->buf.bytesused < minimum_size) {
DBG("dropping too small frame, assuming it as broken\n");//如果数据太小则认为是无效数据
continue;
}
/* copy JPG picture to global buffer */
pthread_mutex_lock(&pglobal->in[pcontext->id].db);
/*
* If capturing in YUV mode convert to JPEG now.
* This compression requires many CPU cycles, so try to avoid YUV format.
* Getting JPEGs straight from the webcam, is one of the major advantages of
* Linux-UVC compatible devices.
*/
if(pcontext->videoIn->formatIn == V4L2_PIX_FMT_YUYV) {/* 如果摄像头输出的视频数据为YUV格式,则执行该分支 */
/* 最终的MJPEG数据存放到 pglobal->buf 中 */
DBG("compressing frame from input: %d\n", (int)pcontext->id);
pglobal->in[pcontext->id].size = compress_yuyv_to_jpeg(pcontext->videoIn, pglobal->in[pcontext->id].buf, pcontext->videoIn->framesizeIn, gquality);
} else {
DBG("copying frame from input: %d\n", (int)pcontext->id);/*
如果摄像头输出的数据为MJPEG格式,则直接将它拷贝到 pglobal->buf 中
*/
pglobal->in[pcontext->id].size = memcpy_picture(pglobal->in[pcontext->id].buf, pcontext->videoIn->tmpbuffer, pcontext->videoIn->buf.bytesused);
}
return NULL;
}
在output_http.c中的output_init()和output_run()函数做的事情和input_uvc.c中是一样的,我们重点来看看server_thread()函数,在这个函数里我们会通过套接字搭建一个网络模型,当服务器与客服端建立连接后,将缓存区的视频发送给客服端(也就是浏览器),下面我们具体来看看函数实现
void *server_thread(void *arg)
{
int on;
pthread_t client;
struct addrinfo *aip, *aip2;
struct addrinfo hints;
struct sockaddr_storage client_addr;
socklen_t addr_len = sizeof(struct sockaddr_storage);
fd_set selectfds;
int max_fds = 0;
char name[NI_MAXHOST];
int err;
int i;
context *pcontext = arg;
pglobal = pcontext->pglobal;//***取出globals结构体变量,globals由主函数传入,并传入输入通道,输入通道和输出通道就是通过globals来共享数据的
/* set cleanup handler to cleanup ressources */
bzero(&hints, sizeof(hints));//对套接字进行一些初始化的设置
hints.ai_family = PF_UNSPEC;
hints.ai_flags = AI_PASSIVE;
hints.ai_socktype = SOCK_STREAM;
for(i = 0; i < MAX_SD_LEN; i++)
pcontext->sd[i] = -1;
// mjpeg-streamer的输出通道就是一个socket编程,在socket编程中充当一个服务器角色。
i = 0;
for(aip2 = aip; aip2 != NULL; aip2 = aip2->ai_next) {
if((pcontext->sd[i] = socket(aip2->ai_family, aip2->ai_socktype, 0)) < 0) {
continue;
}
if(bind(pcontext->sd[i], aip2->ai_addr, aip2->ai_addrlen) < 0) {//这里完成绑定哦!!,套接字的基本套路
perror("bind");
pcontext->sd[i] = -1;
continue;
}
if(listen(pcontext->sd[i], 10) < 0) {//监听客服端的连接请求,最多可以同时连接10个客服端
perror("listen");
pcontext->sd[i] = -1;
} else {
i++;
if(i >= MAX_SD_LEN) {
OPRINT("%s(): maximum number of server sockets exceeded", __FUNCTION__);
i--;
break;
}
}
}
for(i = 0; i < max_fds + 1; i++) {
if(pcontext->sd[i] != -1 && FD_ISSET(pcontext->sd[i], &selectfds)) {
pcfd->fd = accept(pcontext->sd[i], (struct sockaddr *)&client_addr, &addr_len);//用pcfd->fd 存放连接句柄(用来表示这对连接的描述符)
pcfd->pc = pcontext;
/* start new thread that will handle this TCP connected client */
DBG("create thread to handle client that just established a connection\n");
if(pthread_create(&client, NULL, &client_thread, pcfd) != 0) { /* 为每一个连接创建一个线程去维护 */
DBG("could not launch another client thread\n");
close(pcfd->fd);
free(pcfd);
continue;
}
pthread_detach(client);
}
}
}
DBG("leaving server thread, calling cleanup function now\n");
pthread_cleanup_pop(1);
return NULL;
}