一、进程间通信介绍
1.1、进程间通信的概念和意义
为什么要进行进程间通信?
1.2、如何进行进程间通讯及其本质
怎么办?
一般规律
具体做法
进程间通信的本质:让不同的进程看到同一份资源(一般由OS提供)
二、管道
2.1管道介绍
什么是管道?
open("log.txt",w);
open("log.txt",r);
一个文件打开两次,那么在操作系统中会有2个struct file 但是这两个struct file指向同一个缓冲区
若父进程3为读端,4为写端,子进程也一样。那么子进程写入,父进程读取缓冲区内容,这是父子进程看到了同一块资源。
管道只能被设计成单向通信
如:子进程为写(writer,关掉读端) 父进程为读(reader,关掉写端) 当子进程关掉读端/父进程关掉写端对应的struct file没有释放掉,说明 struct file有引用计数(记录多少指针指向我) 当引用计数为0才释放。struct file是允许多个进程通过指针指向的。
为什么父进程最开始用rw方式打开同一个文件呢? 如果只以r方式打开的话,子进程拷贝完后就也是r;父进程只以w打开,子进程拷贝完也只是w
3.2匿名管道
匿名管道:就是没有名字的文件
为了支持我们进行管道通信,OS提供系统调用pipe()
3.3匿名管道代码
通过系统调用接口创建一个匿名管道
#include <iostream>
#include <cerrno>
#include <cstring>
#include <unistd.h>
using namespace std;
int main()
{
int pipefd[2];
int ret = pipe(pipefd); // 一.创建管道
if(ret < 0)
{
cerr << errno << ": " << strerror(errno) << endl;
}
cout << "pipefd[0]: " << pipefd[0] << endl; // 3
cout << "pipefd[1]: " << pipefd[1] << endl; // 4
return 0;
}
然后就可以创建子进程,关闭不需要的读端或写端
#include <iostream>
#include <cerrno>
#include <cstring>
#include <unistd.h>
using namespace std;
int main()
{
int pipefd[2];
int ret = pipe(pipefd); // 一.创建管道
if(ret < 0)
{
cerr << errno << ": " << strerror(errno) << endl;
}
pid_t id = fork(); // 二.创建子进程
assert(id != -1);
if(id == 0)
{
//子进程 关掉读端,只写
close(pipefd[0]);
exit(1);
}
//父进程
//关掉写端,只读
close(pipefd[1]);
close(pipefd[0]); // 父进程,只写,关闭读
return 0;
}
这时父子进程已经可以看到同一份资源,可以开始通信了
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<unistd.h>
#include<sys/types.h>
#include<sys/wait.h>
#include<assert.h>
void writer(int wfd)
{
const char* str = "我是子进程,o.O,我在给你发消息";
char buffer[128];
int cnt = 0;
pid_t pid = getpid();
while(1)
{
snprintf(buffer,sizeof(buffer),"message:%s,pid:%d,count:%d\n",str,pid,cnt);
write(wfd, buffer, strlen(buffer));
cnt++;
sleep(1);
}
close(wfd);
}
void reader(int rfd)
{
char buffer[1024];
int cnt = 10;
while(1)
{
size_t n = read(rfd,buffer,sizeof(buffer)-1);
if(n>0)
printf("父亲获得信息是: %s\n", buffer);
else
{
printf("缓冲区读完了,文件也读完了\n");
break;
}
cnt--;
if(cnt==0)
break;
}
close(rfd);
}
int main()
{
//创建管道
int pipefd[2];
int n = pipe(pipefd);
if(n<0)
return 1;
pid_t id = fork();
if(id == 0)
{
//子进程 关掉读端,只写
close(pipefd[0]);
writer(pipefd[1]);
exit(1);
}
//父进程
//关掉写端,只读
close(pipefd[1]);
reader(pipefd[0]);
int status = 0;
pid_t rid = waitpid(id, &status, 0);
if(rid == id)
printf("退出码为:%d,信号为:%d\n",WEXITSTATUS(status), status & 0x7f);
return 0;
}
匿名管道的一些读写现象以及对应的特性
按上面代码将子进程休眠上5s,那么在子进程休眠这段时间,父进程在等待子进程退出休眠(可以理解为管道内无数据)
写端一直写,读端一直不读或者很久读一次:若一次写入一个字符"A",每次写入时cnt++,执行后会发现当cnt=65536时不在写入(也就是写入65536个字节时)65536÷1024=64
在Ubuntu20.04操作系统下默认建立的管道大小为64KB;
管道内部被写满,父进程还没有读取的时候,那子进程要等到父进程来读它
对以上两种情况的总结:
由此推断出管道的两种特性:
若把父进程休眠时间改短一点,每次父进程读完后,子进程又能继续写入,在此过程中我们不难发现:无论写端写多少个,读端都能一次读完,由此我们发现管道的另一个特性:
当子进程写入10s后退出,而父进程一直读,且打印了返回值,10s后子进程关掉写文件描述符,此时返回值为0;若父进程退出,子进程会僵尸
若写端一直在写,而读端读一会就结束,关闭读文件描述符
由此可以得出管道另外的特性
有时候公共资源有可能被两个执行流共同访问,访问时会出现信息交叉、数据混乱等问题;由此我们要有一种特性:一段数据、一块空间或一种资源我们要么不访问、要访问就把它改完了,这种特性叫原子性。
3.3进程池
processpool.cc
#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <ctime>
#include "task.hpp"
using namespace std;
enum
{
UsageError= 1,
ArgError,
PipeError
};
void Usage(const std::string &proc)
{
cout<<"Usage:"<<proc<<"sub_process_num"<<endl;
}
//用一个类封装管道
class Channel
{
public:
Channel(int wfd,pid_t sub_id,const std::string &name)//构造
:_wfd(wfd)
,_sub_process_id(sub_id)
,_name(name)
{}
void PrintDebug()
{
cout << "_wfd: " << _wfd;
cout << ",_sub_process_id: " << _sub_process_id;
cout << ", _name: " << _name << endl;
}
string name() {return _name;}
int wfd() {return _wfd;}
pid_t pid() { return _sub_process_id; }
~Channel()//析构
{
}
private:
int _wfd;//父进程通过此向channel写东西
pid_t _sub_process_id;//记录子进程
string _name;//channel名字
};
//将冗长的创建子进程封装一下
class ProcessPool
{
public:
ProcessPool(int sub_process_num) //构造
: _sub_process_num(sub_process_num)
{}
int CreateProcess(work_t work) // 回调函数
{
for (int number = 0; number < _sub_process_num; number++)
{
int pipefd[2]{0};
int n = pipe(pipefd);
if (n < 0)
return PipeError;
pid_t id = fork();
if (id == 0)
{
// child -> r
close(pipefd[1]);
// 执行任务
dup2(pipefd[0], 0);
work();
exit(0);
}
string cname = "channel-" + to_string(number);
// father
close(pipefd[0]);
channels.push_back(Channel(pipefd[1], id, cname));
}
return 0;
}
int NextChannel()
{
static int next = 0;
int c = next;
next++;
next %= channels.size();
return c;
}
void SendTaskCode(int index, uint32_t code)
{
cout << "send code: " << code << " to " << channels[index].name() << " sub prorcess id: " << channels[index].pid() << endl;
write(channels[index].wfd(), &code, sizeof(code));
}
void Debug()
{
for (auto &channel : channels)
{
channel.PrintDebug();
}
}
~ProcessPool()
{
}
private:
int _sub_process_num;
vector<Channel> channels;
};
int main(int argc ,char* argv[])
{
if(argc!=2)
{
Usage(argv[0]);
return UsageError;
}
int sub_process_num = std::stoi(argv[1]);//把进程数转整型
if(sub_process_num == 0)
return ArgError;
//vector<Channel> channels;
//把所有的channel(管道)push到vector中,那么对管道的管理就会变成对vector的增删查改
//create process
// for(int num=0;num<sub_process_num;num++)
// {
// int pipefd[2]{0};
// int n = pipe(pipefd);
// if(n<0)
// return PipeError;
// pid_t id = fork();
// if(id == 0)//子进程
// {
// close(pipefd[1]);
// sleep(1);
// exit(0);
// }
// string cname = "channel-"+to_string(num);
// //父进程
// close(pipefd[0]);
// channels.push_back(Channel(pipefd[1],id,cname));
// }
ProcessPool *proc_ptr = new ProcessPool(sub_process_num);
proc_ptr->CreateProcess(worker);
//控制子进程
// for(auto& e:channels)
// {
// e.PrintDebug();
// }
while(1)
{
// a. 选择一个进程和通道
int channel = proc_ptr->NextChannel();
// cout << channel.name() << endl;
// b. 你要选择一个任务
uint32_t code = NextTask();
// c. 发送任务
proc_ptr->SendTaskCode(channel, code);
sleep(1);
}
//回收、等待子进程
delete proc_ptr;
return 0;
}
task.hpp
#include <iostream>
#include <unistd.h>
using namespace std;
typedef void(*work_t)(); //函数指针类型
typedef void(*task_t)(); //函数指针类型
void PrintLog()
{
cout << "printf log task" << endl;
}
void ReloadConf()
{
cout << "reload conf task" << endl;
}
void ConnectMysql()
{
cout << "connect mysql task" << endl;
}
task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};
uint32_t NextTask()
{
return rand() % 3;
}
void worker()
{
// 从0中读取任务即可!
while(true)
{
uint32_t command_code = 0;
ssize_t n = read(0, &command_code, sizeof(command_code));
if(n == sizeof(command_code))
{
if(command_code >= 3) continue;
tasks[command_code]();
}
cout << "I am worker: " << getpid() << endl;
sleep(1);
}
}
makefile
processpool:processpool.cc
g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
rm -f processpool
三、命名管道
管道应用的一个限制就是只能在具有共同祖先(具有亲缘关系)的进程间通信。
如果我们想在不相关的进程之间交换数据,可以使用FIFO文件来做这项工作,它经常被称为命名管道。
命名管道是一种特殊类型的文件
man mkfifo:
命名管道可以从命令行上创建,命令行方法是使用下面这个命令
mkfifo filename
此时就成功地建立了一个命名管道,可以发现它的(文件类型)权限前面的字母是p(pipe),而目录的文件类型是d(directory)。命名管道文件类型是p,而且该文件还有inode,说明在磁盘上是真实存在的。
当磁盘中有了命名管道文件以后,两个进程将可以通过这个管道文件进行通信了,步骤和匿名管道非常相似。一个进程以写方式打开管道文件,另一个进程以读端方式打开管道文件。
直接写入的话可以发现会阻塞在这里
它需要被另一个进程读取
可以通过unlink或者rm删掉命名管道
系统调用mkfifo以及unlink
man 2 unlink