0
点赞
收藏
分享

微信扫一扫

基于消息传递的并发编程(MPI)之同步收发


4、MPI的同步收发函数介绍及其编程模型

  • MPI相关接口概述
  • 头文件:每一个C/C++的MPI程序必须包含​​"mpi.h"​​头文件;
  • MPI中的函数特性:
  • 都有相对统一的命名格式和返回类型:​​int MPI_Xxxx(param1, ...);​​,即以MPI开头,以下划线分隔,单词首字母大写;
  • 如果执行成功,则返回​​MPI_SUCCESS​​,失败则返回代表错误类型的错误代码。
  • MPI程序结构
  • MPI程序的基本框架如下:

#include "mpi.h"
int main( int argc, char *argv[] )
{
MPI_Init( &argc, &argv );
.....
.....
MPI_Finalize();
}

  • MPI程序的第一条语句永远是:​​MPI_Init( &argc, &argv );​
  • 默认的MPI“通信组”
  • 在MPI规范中最常用的默认通信组称为:​​MPI_COMM_WORLD​
  • ​MPI_COMM_WORLD​​包含了当前程序中使用额所有进程;
  • 最常用的MPI函数

函数名

描述

MPI_Init(int *argc, char*argv[])

初始化MPI系统,必须是第一条语句;

MPI_Finalize()

终止MPI系统

MPI_Comm_size(MPI_Comm comm, int *size)

返回通信组comm的大小,通过输出参数size返回。

MPI_Comm_rank(MPI_Comm comm, int *rank)

返回当前通信组中正在调用的进程号,通过输出参数rank返回。

MPI_Send(void *buff,
int count,
MPI_Datatype type,
int dest,
int tag,
int comm)

向通信组comm中的进程dest发送点对点消息,消息地址是buff,存放了count个type类型的数据,消息标签是tag。该函数是阻塞的,仅当消息被目标进程接收后返回。(这样可以确保安全的再次使用buff空间)

MPI_Recv(void *buff,
int count,
MPI_Datatype type,
int source,
int tag,
int comm,
MPI_Status *status)

接收点对点消息。该消息必须是通信组comm中某一进程source发的,且标签值是tag。接收到的消息将会存在大小能存count个type类型的数据的buff中。当该函数退出时,退出状态存放于输出参数status中。例如可以通过status.MPI_TAG查看接收消息的标签,可以通过status.MPI_SOURCE查看消息的发送进程。当然,如果你对消息类型很明确的话不必关心接受消息的status的话,此时可以传一个NULL给status参数。该函数也是阻塞的,即它会一直等到期望的消息被接收后才会返回,否则会返回错误代码。

  • 一个简单的hello world例子
  • 注意:在一个通信组中,只有0号进程才会输出到终端,所以在通信组的其它进程中,不能使用printf打印函数!!!
  • 本例中,通过0号进程发送hello消息给其他进程,其它进程收到后回复消息给0号进程。
  • 代码如下:

/*
function name:mpi_test.c
*/

#include <stdio.h>
#include <string.h>

#include "mpi.h"

int main(int argc, char **argv)
{
char idstr[128];
char buff[128];
int numprocs;
int myid;
int i;

MPI_Status stat;

/*********************************************
* Begin program
*********************************************/

MPI_Init(&argc,&argv); // Initialize MPI group

MPI_Comm_size(MPI_COMM_WORLD, &numprocs); // Get # processors
MPI_Comm_rank(MPI_COMM_WORLD, &myid); // Get my rank (id)

if( myid == 0 )
{ // Master
printf("WE have %d processors\n", numprocs);

for( i = 1; i < numprocs; i++)
{
sprintf(buff, "Hello %d", i);
MPI_Send(buff, 128, MPI_CHAR, i, 0, MPI_COMM_WORLD);
}

for( i = 1; i < numprocs; i++)
{
MPI_Recv(buff, 128, MPI_CHAR, i, 0, MPI_COMM_WORLD, &stat);
printf("%s\n", buff);
}
}
else
{ // Slave

MPI_Recv(buff, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD, &stat);

sprintf(idstr, "Hello 0, Processor %d is present and accounted for !",myid);

MPI_Send(idstr, 128, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
}
MPI_Finalize();
}

  • 编译:​​mpicc mpi_test.c -o mpi_test​​,ubuntu下mpicc的安装命令为:​​sudo apt install mpich​
  • 运行:​​mpirun -np 4 ./mpi_test​​,
  • -np选项指定MPI使用的处理器个数(计算机的cpu核数);
  • 每个进程ID依次被赋以0,1,2,3。
  • 但只有0号进程可以打印输出,其它是不能打印的
  • 其它进程可以通过进程号(rank)来执行不同的语句。即​​if (rank == xx) {...}​
  • 结果:

WE have 4 processors
Hello 0, Processor 1 is present and accounted for !
Hello 0, Processor 2 is present and accounted for !
Hello 0, Processor 3 is present and accounted for !

4.1 点对点阻塞发送/接收MPI函数

  • 阻塞意味着函数会一直等待,直到任务(发送/接收)完成为止。
  • MPI_Recv()只接受其参数中指定的进程号(source)发送的标签为指定的tag的消息。
  • 如果你想接收任意进程的消息,需要将参数source指定为​​MPI_ANY_SOURCE​​。
  • 如果你想接收source进程的任意标签的消息,需要将参数tag指定为​​MPI_ANY_TAG​​。
  • 一个点对点阻塞收发的例子:

#include "mpi.h"

int main(int argc, char **argv)
{
char reply[100];
char buff[128];

int numprocs;
int myid;
int i;
MPI_Status stat;

MPI_Init(&argc,&argv); //必须是第一条语句!
MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
MPI_Comm_rank(MPI_COMM_WORLD, &myid);

/* -----------------------------------------
Master process(0号主进程)
----------------------------------------- */
if(myid == 0)
{
printf("WE have %d processors\n", numprocs);

/* -----------------------------------------
Master process: send msg with tag 1234
----------------------------------------- */
for(i=1; i < numprocs; i++)
{
sprintf(buff, "Hello %d", i);
MPI_Send(buff, 128, MPI_CHAR, i, 1234, MPI_COMM_WORLD);
}

/* ---------------------------------------------
Master process: wait for msg with tag 4444
--------------------------------------------- */
for(i=1;i < numprocs;i++)
{
MPI_Recv(buff, 128, MPI_CHAR, i, 4444, MPI_COMM_WORLD, &stat);
printf("%s\n",buff);
}
}
else //Slave process: receive msg with tag 1234
{
MPI_Recv(buff, 128, MPI_CHAR, 0, 1234, MPI_COMM_WORLD, &stat);
sprintf(reply," |--> Hello 0, Processor %d is present and accounted for !",myid);
strcat(buff, reply);

/* --------------------------------------------
Slave process: send back msg with tag 4444
-------------------------------------------- */
MPI_Send(buff, 128, MPI_CHAR, 0, 4444, MPI_COMM_WORLD);
}
MPI_Finalize();
}

  • 分析:
  • 改变发送消息的tag值为1111,接收进程将不会接收到消息!
  • 将接收函数的消息标签改为MPI_ANY_TAG后,又会接收到消息了!
  • MPI_Status数据结构
  • 该结构用于接收函数返回消息接收情况
  • MPI_Status至少包含如下数据域

数据域

作用

MPI_SOURCE

消息发送者的进程号(rank),整形

MPI_TAG

消息标签(整形)

MPI_ERROR

错误代码(整形)

  • 其它数据与为MPI系统保留的,用户不可以直接访问
  • MPI中的数据类型符号
  • MPI符号常数包含了C语言中常用的数据类型

C/C++数据类型

相对应的MPI符号常量

char

MPI_CHAR

int

MPI_INT

float

MPI_FLOAT

double

MPI_DOUBLE

  • 发送和接收使用不同的数据类型的例子
  • 进程1发送4个字符到进程0,而进程0当作1个4字节整形数据来接收;

int main(int argc, char **argv)
{
char in[4]; // Send 4 characters
int out; // Interprete as an integer

int numprocs;
int myid;
int i;

MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&numprocs);
MPI_Comm_rank(MPI_COMM_WORLD,&myid);

if(myid == 0)
{
MPI_Recv(&out, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, NULL);
printf("Received this number from proc 1: %d\n",out);
}
else if ( myid == 1 )
{
in[0] = '2'; in[1] = 1; in[2] = 0; in[3] = 0;
MPI_Send(in, 4, MPI_CHAR, 0, 0, MPI_COMM_WORLD);
}

MPI_Finalize();
}

  • 我的电脑在运行时提示​​MPI_Recv(&out, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, NULL);​​中的状态参数不能为NULL,所有你有相同情况发生的话,可以在程序中定义一个状态变量,并将其地址传给它。
  • 程序输出为:​​Received this number from proc 1: 306​
  • 因为我的CPU的字节序是小端存储(低地址存数据低位,高地址存数据高位)
  • 所以0x[00] [00] [01] [32] = 306
  • MPI并发编程应用举例——计算PI值
  • 并行算法(采用积分法)
  • 将计算任务分解为N个子任务,每个处理器(进程)负责一部分,包括主进程
  • 从进程将计算出的自己那部分结果传给主进程
  • 主进程接收各从进程的结果后,相加得到PI的值
  • 代码:

#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include "mpi.h"

double f(double a)
{
return( 2.0 / sqrt(1 - a*a) );
}

int main(int argc, char *argv[])
{
int N,num_procs; // Number of intervals
double w, x; // width and x point
int i, myid;
double mypi, others_pi;
MPI_Status st;

MPI_Init(&argc,&argv); // Initialize
MPI_Comm_size(MPI_COMM_WORLD, &num_procs); // Get # processors
MPI_Comm_rank(MPI_COMM_WORLD, &myid);

N = atoi(argv[1]);
w = 1.0/(double) N;
mypi = 0.0;

for (i = myid; i < N; i = i + num_procs) //sub_task for processors
{
x = w*(i + 0.5);
mypi = mypi + w*f(x);
}

/* ----------------------------------------------------
Proc 0 collects and others send data to proc 0
---------------------------------------------------- */
if ( myid == 0 )
{
for (i = 1; i < num_procs; i++)
{
MPI_Recv(&others_pi, 1, MPI_DOUBLE, i, 0, MPI_COMM_WORLD, &st);
mypi += others_pi;
}
printf("Pi = %f\n", mypi);
}
else
{
MPI_Send(&mypi, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
}

MPI_Finalize();
}

  • 编译:​​mpicc mpi_pi.c -o pi -lm​
  • 运行:​​ mpirun -np 4 ./pi 4​
  • 结果:Pi = 2.716621
  • **注意:**上述代码中不存在内存共享,因为每一个MPI进程都会运行上述代码,所以都有它自己的变量(包括全局变量),且互不影响;
  • 当在编程时并不知道消息发送者的rank及消息标志tag时,可以使用MPI系统的常数宏来代替:

MPI_Recv(buff, count, type, MPI_ANY_SOURCE, MPI_ANY_TAG, status);

  • 当消息接收完成后,再通过状态变量status来分辨消息来源:
  • ​status.SOURCE​​ = source id of sender
  • ​status.TAG ​​ = tag of message
  • 但在接收消息时,你必须知道消息的数据类型!!!
  • 当在编程时并不知道消息长度时怎么办?
  • 因为无法指定接收缓存区的大小,而无法使用接收函数;
  • 为了解决上述问题,可以先使用探测函数来探测一下消息的到来:
  • ​MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);​
  • **MPI_Probe()**将会阻塞等待指定的源进程和标签的消息的到来;
  • 但它不会接收该消息,而是会根据消息去填充状态变量的值;
  • 进而,根据以下3步来解决未知消息长度的问题:
  1. 等待消息的到来,利用**MPI_Probe()**函数得到消息的大小size;
  2. 利用**MPI_Get_count()**函数,得到消息的长度;
  3. 而后就可以分配一个大小合适的缓存区去接收该消息。
  • 示例代码:

MPI_Status status;
int nbytes;

MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);

// Allocate memory to receive data
MPI_Get_count(&status, MPI_CHAR, &nbytes);

if ( nbytes != MPI_UNDEFINED )
buff = malloc( nbytes ); // Allocate buffer to receive message

MPI_Recv(buff, nbytes, MPI_CHAR, status.SOURCE, status.TAG, MPI_COMM_WORLD, &status);

  • 写在最后
    如果你想传输任意格式的数据,你必须使用与实际数据格式近似的meta data类型,该方法类似与数据库中meta data使用相同,更多请参考​​meta data​​


举报

相关推荐

0 条评论