0
点赞
收藏
分享

微信扫一扫

[Linux]----进程间通信之管道通信


文章目录

  • ​​前言​​
  • ​​一、进程间通信目的​​
  • ​​二、进程间通信发展​​
  • ​​三、进程间通信分类​​
  • ​​四、管道​​
  • ​​1. 匿名管道​​
  • ​​2. 管道内核代码​​
  • ​​3. 站在文件描述符角度-深度理解管道​​
  • ​​4. 站在内核角度-管道本质​​
  • ​​5. 管道的特征总结​​
  • ​​五、命名管道​​
  • ​​1. 创建命名管道​​
  • ​​总结​​

前言

首先我基于通信背景来带大家了解进程间通讯!!!

  1. 进程是具有独立性的!—>进程间想交互数据数据,成本会非常高!—>如果我们多进程需要协同处理一件事情
  2. 我们要明确一个点,进程独立了,不是彻底独立,有时候,我们需要双方能够进行一定程度的信息交互。

​正文开始!​

一、进程间通信目的

  • 数据传输:一个进程需要将他的数据发送给另一个进程
  • 资源共享:多个进程之间共享同样的资源
  • 通知事件:一个进程需要向另一个或一组进程发送消息,通知他们发生可某种事件(如进程终止是要通知父进程)
  • 进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道他的状态改变。

二、进程间通信发展

  • 管道
  • System V进程间通信
  • POSIX进程间通信

三、进程间通信分类

管道

  • 匿名管道pipe
  • 命名管道

System V IPC

  • System V 消息队列
  • System V 共享内存
  • System V 信号量

POSIX IPC

  • 消息队列
  • 共享内存
  • 信号量
  • 互斥量
  • 条件变量
  • 读写锁

POSIX的通讯我们在进程的学习过程中带大家了解!
本章主要给大家分享管道和共享内存来进行进程间通讯!

四、管道

1. 匿名管道

[Linux]----进程间通信之管道通信_子进程


进程A和进程B可以同时看到一份文件来通过某种方式(接下来会将,现在先大致的理解一下)来进行通信!

所以就要求我们在通讯之前,让不同的进程看到同一份资源(文件,内存块…)!

我们要学的进程间通信,不是告诉我们如何通信。而是如何让两个进程先看到同一份资源!

资源的不同决定了不同种类的通信方式!

管道是提供共享资源的一种手段。

[Linux]----进程间通信之管道通信_#include_02

2. 管道内核代码

[Linux]----进程间通信之管道通信_运维_03

管道的特点

  1. 大部分都是单向的!
  2. 所有的管道都是传输资源的—数据

所以进程间通信中的管道一定是单向的,为了传输数据的!!

3. 站在文件描述符角度-深度理解管道

[Linux]----进程间通信之管道通信_子进程_04


[Linux]----进程间通信之管道通信_子进程_05


[Linux]----进程间通信之管道通信_子进程_06


那么为什么父进程要分别打开读和写呢?->为了让子进程继承,让子进程不用在打开了

为什么父子要关闭对应的读写?->因为管道是单向通信的!!

谁决定,父子关闭什么读写?–>不是由管道本身决定的,由用户的需求决定的!

4. 站在内核角度-管道本质

为了实现管道的通信,内核提供了接口方便我们使用!

[Linux]----进程间通信之管道通信_运维_07


[Linux]----进程间通信之管道通信_#include_08

创建管道

#include<iostream>
#include<cstdio>
#include<unistd.h>
#include<cstring>

//演示pipe通信的基本过程-----匿名管道
using namespace std;
int main()
{
//1.创建管道
int pipefd[2]={0};
if(pipe(pipefd)!=0)
{
cerr<<"pipe error"<<endl;
return 1;
}
cout<<"fd[0]="<<pipefd[0]<<endl;
cout<<"fd[1]="<<pipefd[1]<<endl;
return 0;
}

[Linux]----进程间通信之管道通信_网络_09


因为文件描述符中0,1,2是被默认打开的,所以得到我们预期的结果3,4。

在pipefd[2]这个数组中,pipefd[0]是管道中的读端!pipefd[1]是管道中的写端!!

进行管道通信

#include<iostream>
#include<cstdio>
#include<unistd.h>
#include<cstring>
#include<sys/types.h>
#include<sys/wait.h>
#include<cmath>


//演示pipe通信的基本过程-----匿名管道
using namespace std;
#define NUM 1024
int main()
{
//1.创建管道
int pipefd[2]={0};
if(pipe(pipefd)!=0)
{
cerr<<"pipe error"<<endl;
return 1;
}
//2.创建子进程
pid_t id=fork();
if(id<0)
{
cerr<<"fork error"<<endl;
return 2;
}
else if(id==0)
{
//子进程
//让子进程进行读取,子进程就应该关掉写端
close(pipefd[1]);
char buffer[NUM];
while(true)
{
memset(buffer,0,sizeof(buffer));
ssize_t s=read(pipefd[0],buffer,sizeof(buffer)-1);
if(s>0)
{
//读取成功
buffer[s]='\0';
cout<<"子进程收到消息,内容是: "<<buffer<<endl;
}
else if(s==0)
{
cout<<"父进程写完了,我也退出啦"<<endl;
break;
}
else
{
//Do Nothing
}
}
close(pipefd[0]);
exit(0);
}
else
{
//父进程
//让父进程进行写入,就应该关掉读端!
close(pipefd[0]);
const char* msg="你好子进程,我是父进程,这次发送的信息标号是:";
int cnt=0;
while(cnt<5)
{
write(pipefd[1],sendBuffer,strlen(sendBuffer));
sleep(1);//这里是为了一会看现象明显
cnt++;
}
cout<<"父进程写完啦!"<<endl;
close(pipefd[1]);
}
pid_t ret=waitpid(id,nullptr,0);
if(ret>0)
{
cout<<"等待子进程成功"<<endl;
}
return 0;
}

[Linux]----进程间通信之管道通信_运维_10

因为我们让父进程发送五条信息,子进程收到五条信息后也就推出了!此时就完成了我们通过管道进行进程间通信了!!

对父进程代码稍作修改

[Linux]----进程间通信之管道通信_#include_11


[Linux]----进程间通信之管道通信_#include_12

在父进程中我们每次写入后都sleep(1);然后子进程读入的时候也是休眠1秒后在读取,所以我们可以发现:1.当父进程没有写入数据的时候,子进程会在等!所以,父进程写入之后,子进程才能read(会返回)到数据,子进程打印读取数据要以父进程的节奏为主!

所以父进程和子进程在读写的时候,是有一定顺序的!!

但是我们以前父子进程各自printf(向显示器写入)的时候,打印出来的语句是没有顺序的!(缺乏访问控制!)

所以我们可以得出结论:管道内部没有数据,reader就必须阻塞等待(read)。管道内部,如果数据被写满,writer就不必须阻塞等待(write);

阻塞等待的本质就是将当前进程的task_struct放入等待队列中!(R->S/D/T)

[Linux]----进程间通信之管道通信_#include_13


[Linux]----进程间通信之管道通信_#include_14

[Linux]----进程间通信之管道通信_子进程_15


[Linux]----进程间通信之管道通信_网络_16

所以,pipe内部自带访问控制机制!!

父进程指派给子进程任务

#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
#include <cmath>
#include <cstdlib>
#include <assert.h>
#include <vector>
#include <unordered_map>

using namespace std;

typedef void (*functor)();
vector<functor> functors; //方法集合

// for debug
unordered_map<uint32_t, string> info;

void f1()
{
cout << "这是一个处理日志的任务,执行的进程 ID [" << getpid()
<< "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
}
void f2()
{
cout << "这是一个备份数据的任务,执行的进程 ID [" << getpid()
<< "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
}
void f3()
{
cout << "这是一个处理网络链接的任务,执行的进程 ID [" << getpid()
<< "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
}

void loadFunctor()
{
info.insert({functors.size(), "这是一个处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "这是一个备份数据的任务"});
functors.push_back(f2);
info.insert({functors.size(), "这是一个处理网络链接的任务"});
functors.push_back(f3);
}
int main()
{
// 0.加载任务列表
loadFunctor();

// 1.创建管道
int pipefd[2] = {0};
if (pipe(pipefd) != 0)
{
cerr << "pipe error" << endl;
return 1;
}

// 2.创建子进程
pid_t id = fork();
if (id < 0)
{
cerr << "fork error" << endl;
return 2;
}
else if (id == 0)
{
//子进程进行读操作---read

// 3.关闭不需要的文件fd
close(pipefd[1]);

// 4.业务处理
while (true)
{
uint32_t opeartorType = 0;
//如果有数据,就读取,如果没有数据,就阻塞等待,等待任务的到来
ssize_t s = read(pipefd[0], &opeartorType, sizeof(uint32_t));
if (s == 0)
{
cout << "我要退出啦,我是子进程,父进程都退出了!" << endl;
break;
}

assert(s == sizeof(uint32_t));
// assert断言,是变异有效的debug模式
// release模式下,断言也就没有了
//一旦断言没有了,s变量就只是被定义,没有被使用。release摸下中,可能会warning
(void)s;

if (opeartorType < functors.size())
{
functors[opeartorType]();
}
else
{
cerr << "bug opeartorType" << endl;
}
}
close(pipefd[0]);
exit(0);
}
else
{
srand((long long)time(nullptr));
//父进程进行写操作----write

// 3.关闭不需要的文件fd
close(pipefd[0]);
// 4.指派任务
uint32_t num = functors.size();
int cnt = 0;
while (cnt < 10)
{
// 5.形成任务码
int commandCode = rand() % num;
std::cout << "父进程指派任务完成,任务是: " << info[commandCode] << "任务的编号是: " << cnt << std::endl;
// 向指定的进程下达执行任务的操作
write(pipefd[1], &commandCode, sizeof(uint32_t));
sleep(1);
cnt++;
}
close(pipefd[1]);
pid_t ret = waitpid(id, nullptr, 0);
if (ret)
{
cout << "wait success" << endl;
}
}
}

[Linux]----进程间通信之管道通信_子进程_17

父进程通过给子进程发送不同的信号,让子进程完成不同的任务!!!

接下来如果通过父进程控制一批子进程呢??

[Linux]----进程间通信之管道通信_linux_18

#include <iostream>
#include <cstdio>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/wait.h>
#include <cmath>
#include <cstdlib>
#include <assert.h>
#include <vector>
#include <unordered_map>

using namespace std;

typedef void (*functor)();
vector<functor> functors; //方法集合

// for debug
unordered_map<uint32_t, string> info;

void f1()
{
cout << "这是一个处理日志的任务,执行的进程 ID [" << getpid()
<< "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
}
void f2()
{
cout << "这是一个备份数据的任务,执行的进程 ID [" << getpid()
<< "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
}
void f3()
{
cout << "这是一个处理网络链接的任务,执行的进程 ID [" << getpid()
<< "],执行的时间是[" << time(nullptr) << "]\n\n" << endl;
}

void loadFunctor()
{
info.insert({functors.size(), "这是一个处理日志的任务"});
functors.push_back(f1);
info.insert({functors.size(), "这是一个备份数据的任务"});
functors.push_back(f2);
info.insert({functors.size(), "这是一个处理网络链接的任务"});
functors.push_back(f3);
}

// int32_t:进程pid,int32_t:该进程对应的写端fd
typedef std::pair<int32_t, int32_t> elem;
int processNum = 5;

void work(int blockFd)
{
cout<<"进程 ["<<getpid()<<"] 开始工作!"<<endl;
//子进程核心工作的代码
while(true)
{
//a.阻塞等待 b.获取任务信息
uint32_t operatorCode=0;

ssize_t s=read(blockFd,&operatorCode,sizeof(uint32_t));

if(s==0)
break;
assert(s==sizeof(uint32_t));
(void)s;

//c.处理任务
if(operatorCode<functors.size())
{
functors[operatorCode]();
}
}
cout<<"进程 ["<<getpid()<<"] 结束工作!"<<endl;
}
//[子进程的pid,子进程的管道fd]
void blanceSendTask(const vector<elem>& assignMap)
{
srand((long long)time(nullptr));
while(true)
{
sleep(1);
//选择一个进程,选择进程是随机的,没有压着一个进程一直给任务
//较为均匀的将任务给所有子进程 ---负载均衡
uint32_t pick=rand()%assignMap.size();

//选择一个任务
uint32_t task=rand()%functors.size();

//把任务给一个指定的进程
write(assignMap[pick].second,&task,sizeof(uint32_t));

//打印对应的提示信息
cout<<"父进程指派任务->"<<info[task]<<" 给进程: "<<assignMap[pick].first<<" 编号是:"<<pick<<endl;
}
}
int main()
{
loadFunctor();
vector<elem> assignMap;
//创建processNum个子进程
for (int i = 0; i < processNum; i++)
{
//定义保存管道的fd的对象
int pipefd[2] = {0};
//创建管道
pipe(pipefd);
//创建子进程
pid_t id = fork();
if (id == 0)
{
//子进程读取,r
close(pipefd[1]);
//子进程执行
work(pipefd[0]);
close(pipefd[0]);
exit(0);
}
//父进程做的事情
close(pipefd[0]);
elem e(id,pipefd[1]);
assignMap.push_back(e);
}
cout<<"create all process success!"<<endl;
//父进程派发任务
blanceSendTask(assignMap);
//回收资源
for(int i=0;i<processNum;i++)
{
if(waitpid(assignMap[i].first,nullptr,0)>0)
{
cout<<"wait for:pid=["<<assignMap[i].first<<"] success!"<<" number: "<<i<<endl;
}
close(assignMap[i].second);
}
return 0;
}

[Linux]----进程间通信之管道通信_子进程_19

接下来回归命令行的’|'字符

[Linux]----进程间通信之管道通信_子进程_20

命令行’|',其实就是匿名管道!!

在这里我们使用匿名管道也可以让兄弟进程进行通信!

5. 管道的特征总结

  1. 管道只能用来进行具有血缘关系的进程之间,进行进程间通信。常用于父子通信
  2. 管道只能单向通信(内核的实现决定)—半双工的一种特殊情况
  3. 管道自带同步机制(pipe满,writer等,pipe空,reader等)—自带访问控制
  4. 管道是面向字节流的。----先写的字符,一定是先被读取的,管道内的数据没有格式边界,需要用户来定义区分内容的边界。
  5. 管道的生命周期—管道是文件—进程退出了,增加打开的文件会怎样?–退出,随进程

五、命名管道

那是不是只能父子(血缘)通信??—毫不相关的进程之间进行通信,可以吗???------命名管道!

[Linux]----进程间通信之管道通信_#include_21


[Linux]----进程间通信之管道通信_子进程_22

[Linux]----进程间通信之管道通信_运维_23

1. 创建命名管道

[Linux]----进程间通信之管道通信_运维_24

命名管道:通过一个FIFO文件–>有路径–>具有唯一性—>通过路径,找到同一个资源!

[Linux]----进程间通信之管道通信_linux_25


代码如下

“comm.h”

#pragma once
#include<iostream>
#include<string>
#include<cstdio>
#include<sys/types.h>
#include<sys/stat.h>
#include<fcntl.h>
#include<unistd.h>
#include<cerrno>
#include<cstring>

#define IPC_PATH "./.fifo"

“clientFifo.cpp”

//写入
#include"comm.h"
using namespace std;
#define NUM 1024
int main()
{
int pipeFd=open(IPC_PATH,O_WRONLY);
if(pipeFd<0)
{
cerr<<"open: "<<strerror(errno)<<endl;
return 1;
}
char line[NUM];
while(true)
{
printf("请输入你的消息# ");
fflush(stdout);
memset(line,0,sizeof(line));
if(fgets(line,sizeof(line),stdin)!=nullptr)
{
line[strlen(line)-1]='\0';
write(pipeFd,line,strlen(line));
}
else
{
break;
}
}
close(pipeFd);
cout<<"客户端退出啦!"<<endl;
unlink(IPC_PATH);
return 0;
}

“serverFifo.cpp”

//读取
#include"comm.h"
using namespace std;
#define NUM 1024

int main()
{
extern int error;
if(mkfifo(IPC_PATH,0666)!=0)
{
cerr<<"mkfifo error"<<endl;
return 1;
}
int pipeFd=open(IPC_PATH,O_RDONLY);
if(pipeFd<0)
{
cerr<<"open error"<<endl;
return 2;
}

//正常的通信过程
char buffer[NUM];
while(true)
{
ssize_t s=read(pipeFd,buffer,sizeof(buffer)-1);
if(s>0)
{
buffer[s]='\0';
cout<<"客户端->服务器# "<<buffer<<endl;
}
else if(s==0)
{
cout<<"客户端退出啦,我也退出吧!"<<endl;
break;
}
else
{
//do nothing
cout<<"read "<<strerror(errno)<<endl;
break;
}
}
close(pipeFd);
cout<<"服务端退出啦!"<<endl;
return 0;
}

[Linux]----进程间通信之管道通信_子进程_26


[Linux]----进程间通信之管道通信_#include_27

总结

(本章完!)


举报

相关推荐

0 条评论