4、MPI的同步收发函数介绍及其编程模型
- MPI相关接口概述
- 头文件:每一个C/C++的MPI程序必须包含
"mpi.h"
头文件; - MPI中的函数特性:
- 都有相对统一的命名格式和返回类型:
int MPI_Xxxx(param1, ...);
,即以MPI开头,以下划线分隔,单词首字母大写; - 如果执行成功,则返回
MPI_SUCCESS
,失败则返回代表错误类型的错误代码。
- MPI程序结构
- MPI程序的基本框架如下:
#include "mpi.h"
int main( int argc, char *argv[] )
{
MPI_Init( &argc, &argv );
.....
.....
MPI_Finalize();
}
- MPI程序的第一条语句永远是:
MPI_Init( &argc, &argv );
- 默认的MPI“通信组”
- 在MPI规范中最常用的默认通信组称为:
MPI_COMM_WORLD
-
MPI_COMM_WORLD
包含了当前程序中使用额所有进程;
- 最常用的MPI函数
函数名 | 描述 |
MPI_Init(int *argc, char*argv[]) | 初始化MPI系统,必须是第一条语句; |
MPI_Finalize() | 终止MPI系统 |
MPI_Comm_size(MPI_Comm comm, int *size) | 返回通信组comm的大小,通过输出参数size返回。 |
MPI_Comm_rank(MPI_Comm comm, int *rank) | 返回当前通信组中正在调用的进程号,通过输出参数rank返回。 |
MPI_Send(void *buff, | 向通信组comm中的进程dest发送点对点消息,消息地址是buff,存放了count个type类型的数据,消息标签是tag。该函数是阻塞的,仅当消息被目标进程接收后返回。(这样可以确保安全的再次使用buff空间) |
MPI_Recv(void *buff, | 接收点对点消息。该消息必须是通信组comm中某一进程source发的,且标签值是tag。接收到的消息将会存在大小能存count个type类型的数据的buff中。当该函数退出时,退出状态存放于输出参数status中。例如可以通过status.MPI_TAG查看接收消息的标签,可以通过status.MPI_SOURCE查看消息的发送进程。当然,如果你对消息类型很明确的话不必关心接受消息的status的话,此时可以传一个NULL给status参数。该函数也是阻塞的,即它会一直等到期望的消息被接收后才会返回,否则会返回错误代码。 |
- 一个简单的hello world例子
- 注意:在一个通信组中,只有0号进程才会输出到终端,所以在通信组的其它进程中,不能使用printf打印函数!!!
- 本例中,通过0号进程发送hello消息给其他进程,其它进程收到后回复消息给0号进程。
- 代码如下:
/*
function name:mpi_test.c
*/
#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main(int argc, char **argv)
{
char idstr[128];
char buff[128];
int numprocs;
int myid;
int i;
MPI_Status stat;
/*********************************************
* Begin program
*********************************************/
MPI_Init(&argc,&argv); // Initialize MPI group
MPI_Comm_size(MPI_COMM_WORLD, &numprocs); // Get # processors
MPI_Comm_rank(MPI_COMM_WORLD, &myid); // Get my rank (id)
if( myid == 0 )
{ // Master
printf("WE have %d processors\n", numprocs);
for( i = 1; i < numprocs; i++)
{
sprintf(buff, "Hello %d", i);
MPI_Send(buff, 128, MPI_CHAR, i, 0, MPI_COMM_WORLD);
}
for( i = 1; i < numprocs; i++)
{
MPI_Recv(buff, 128, MPI_CHAR, i, 0, MPI_COMM_WORLD, &stat);
printf("%s\n", buff);
}
}
else
{ // Slave
MPI_Recv(buff, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD, &stat);
sprintf(idstr, "Hello 0, Processor %d is present and accounted for !",myid);
MPI_Send(idstr, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
}
MPI_Finalize();
}
- 编译:
mpicc mpi_test.c -o mpi_test
,ubuntu下mpicc的安装命令为:sudo apt install mpich
- 运行:
mpirun -np 4 ./mpi_test
,
- -np选项指定MPI使用的处理器个数(计算机的cpu核数);
- 每个进程ID依次被赋以0,1,2,3。
- 但只有0号进程可以打印输出,其它是不能打印的
- 其它进程可以通过进程号(rank)来执行不同的语句。即
if (rank == xx) {...}
- 结果:
WE have 4 processors
Hello 0, Processor 1 is present and accounted for !
Hello 0, Processor 2 is present and accounted for !
Hello 0, Processor 3 is present and accounted for !
4.1 点对点阻塞发送/接收MPI函数
- 阻塞意味着函数会一直等待,直到任务(发送/接收)完成为止。
- MPI_Recv()只接受其参数中指定的进程号(source)发送的标签为指定的tag的消息。
- 如果你想接收任意进程的消息,需要将参数source指定为
MPI_ANY_SOURCE
。 - 如果你想接收source进程的任意标签的消息,需要将参数tag指定为
MPI_ANY_TAG
。
- 一个点对点阻塞收发的例子:
#include "mpi.h"
int main(int argc, char **argv)
{
char reply[100];
char buff[128];
int numprocs;
int myid;
int i;
MPI_Status stat;
MPI_Init(&argc,&argv); //必须是第一条语句!
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
/* -----------------------------------------
Master process(0号主进程)
----------------------------------------- */
if(myid == 0)
{
printf("WE have %d processors\n", numprocs);
/* -----------------------------------------
Master process: send msg with tag 1234
----------------------------------------- */
for(i=1; i < numprocs; i++)
{
sprintf(buff, "Hello %d", i);
MPI_Send(buff, 128, MPI_CHAR, i, 1234, MPI_COMM_WORLD);
}
/* ---------------------------------------------
Master process: wait for msg with tag 4444
--------------------------------------------- */
for(i=1;i < numprocs;i++)
{
MPI_Recv(buff, 128, MPI_CHAR, i, 4444, MPI_COMM_WORLD, &stat);
printf("%s\n",buff);
}
}
else //Slave process: receive msg with tag 1234
{
MPI_Recv(buff, 128, MPI_CHAR, 0, 1234, MPI_COMM_WORLD, &stat);
sprintf(reply," |--> Hello 0, Processor %d is present and accounted for !",myid);
strcat(buff, reply);
/* --------------------------------------------
Slave process: send back msg with tag 4444
-------------------------------------------- */
MPI_Send(buff, 128, MPI_CHAR, 0, 4444, MPI_COMM_WORLD);
}
MPI_Finalize();
}
- 分析:
- 改变发送消息的tag值为1111,接收进程将不会接收到消息!
- 将接收函数的消息标签改为MPI_ANY_TAG后,又会接收到消息了!
- MPI_Status数据结构
- 该结构用于接收函数返回消息接收情况
- MPI_Status至少包含如下数据域
数据域 | 作用 |
MPI_SOURCE | 消息发送者的进程号(rank),整形 |
MPI_TAG | 消息标签(整形) |
MPI_ERROR | 错误代码(整形) |
- 其它数据与为MPI系统保留的,用户不可以直接访问
- MPI中的数据类型符号
- MPI符号常数包含了C语言中常用的数据类型
C/C++数据类型 | 相对应的MPI符号常量 |
char | MPI_CHAR |
int | MPI_INT |
float | MPI_FLOAT |
double | MPI_DOUBLE |
- 发送和接收使用不同的数据类型的例子
- 进程1发送4个字符到进程0,而进程0当作1个4字节整形数据来接收;
int main(int argc, char **argv)
{
char in[4]; // Send 4 characters
int out; // Interprete as an integer
int numprocs;
int myid;
int i;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&myid);
if(myid == 0)
{
MPI_Recv(&out, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, NULL);
printf("Received this number from proc 1: %d\n",out);
}
else if ( myid == 1 )
{
in[0] = '2'; in[1] = 1; in[2] = 0; in[3] = 0;
MPI_Send(in, 4, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
}
MPI_Finalize();
}
- 我的电脑在运行时提示
MPI_Recv(&out, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, NULL);
中的状态参数不能为NULL,所有你有相同情况发生的话,可以在程序中定义一个状态变量,并将其地址传给它。 - 程序输出为:
Received this number from proc 1: 306
- 因为我的CPU的字节序是小端存储(低地址存数据低位,高地址存数据高位)
- 所以0x[00] [00] [01] [32] = 306
- MPI并发编程应用举例——计算PI值
- 并行算法(采用积分法)
- 将计算任务分解为N个子任务,每个处理器(进程)负责一部分,包括主进程
- 从进程将计算出的自己那部分结果传给主进程
- 主进程接收各从进程的结果后,相加得到PI的值
- 代码:
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include "mpi.h"
double f(double a)
{
return( 2.0 / sqrt(1 - a*a) );
}
int main(int argc, char *argv[])
{
int N,num_procs; // Number of intervals
double w, x; // width and x point
int i, myid;
double mypi, others_pi;
MPI_Status st;
MPI_Init(&argc,&argv); // Initialize
MPI_Comm_size(MPI_COMM_WORLD, &num_procs); // Get # processors
MPI_Comm_rank(MPI_COMM_WORLD, &myid);
N = atoi(argv[1]);
w = 1.0/(double) N;
mypi = 0.0;
for (i = myid; i < N; i = i + num_procs) //sub_task for processors
{
x = w*(i + 0.5);
mypi = mypi + w*f(x);
}
/* ----------------------------------------------------
Proc 0 collects and others send data to proc 0
---------------------------------------------------- */
if ( myid == 0 )
{
for (i = 1; i < num_procs; i++)
{
MPI_Recv(&others_pi, 1, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, &st);
mypi += others_pi;
}
printf("Pi = %f\n", mypi);
}
else
{
MPI_Send(&mypi, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
}
MPI_Finalize();
}
- 编译:
mpicc mpi_pi.c -o pi -lm
- 运行:
mpirun -np 4 ./pi 4
- 结果:Pi = 2.716621
- **注意:**上述代码中不存在内存共享,因为每一个MPI进程都会运行上述代码,所以都有它自己的变量(包括全局变量),且互不影响;
- 当在编程时并不知道消息发送者的rank及消息标志tag时,可以使用MPI系统的常数宏来代替:
MPI_Recv(buff, count, type, MPI_ANY_SOURCE, MPI_ANY_TAG, status);
- 当消息接收完成后,再通过状态变量status来分辨消息来源:
-
status.SOURCE
= source id of sender -
status.TAG
= tag of message
- 但在接收消息时,你必须知道消息的数据类型!!!
- 当在编程时并不知道消息长度时怎么办?
- 因为无法指定接收缓存区的大小,而无法使用接收函数;
- 为了解决上述问题,可以先使用探测函数来探测一下消息的到来:
-
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
- **MPI_Probe()**将会阻塞等待指定的源进程和标签的消息的到来;
- 但它不会接收该消息,而是会根据消息去填充状态变量的值;
- 进而,根据以下3步来解决未知消息长度的问题:
- 等待消息的到来,利用**MPI_Probe()**函数得到消息的大小size;
- 利用**MPI_Get_count()**函数,得到消息的长度;
- 而后就可以分配一个大小合适的缓存区去接收该消息。
- 示例代码:
MPI_Status status;
int nbytes;
MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// Allocate memory to receive data
MPI_Get_count(&status, MPI_CHAR, &nbytes);
if ( nbytes != MPI_UNDEFINED )
buff = malloc( nbytes ); // Allocate buffer to receive message
MPI_Recv(buff, nbytes, MPI_CHAR, status.SOURCE, status.TAG, MPI_COMM_WORLD, &status);
- 写在最后
如果你想传输任意格式的数据,你必须使用与实际数据格式近似的meta data类型,该方法类似与数据库中meta data使用相同,更多请参考meta data