
MPI point-to-point communication to collective communication: MPI_Scatterv troubles

I am working on a project that converts point-to-point communication into collective communication.

Essentially, what I want to do is use MPI_Scatterv instead of MPI_Send and MPI_Recv. What I have not been able to work out are the correct arguments for Scatterv.
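
For reference, this is the MPI_Scatterv prototype I am trying to fill in (the standard MPI signature, annotated with what each argument means; exact const qualifiers vary with the MPI version):

int MPI_Scatterv(
    void         *sendbuf,     /* significant at root only: full buffer to scatter  */
    int          *sendcounts,  /* root only: number of elements sent to each rank   */
    int          *displs,      /* root only: element offset of each rank's chunk    */
    MPI_Datatype  sendtype,    /* element type of sendbuf                           */
    void         *recvbuf,     /* every rank: where this rank's chunk is received   */
    int           recvcount,   /* every rank: number of elements this rank receives */
    MPI_Datatype  recvtype,    /* element type of recvbuf                           */
    int           root,        /* rank that holds the data (p-1 in my case)         */
    MPI_Comm      comm);       /* communicator                                      */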

Here is the function I am working in:

void read_block_vector (
    char  *s,  /* IN - File name */ 
    void  **v,  /* OUT - Subvector */ 
    MPI_Datatype dtype, /* IN - Element type */ 
    int   *n,  /* OUT - Vector length */ 
    MPI_Comm  comm) /* IN - Communicator */ 
{ 
    int  datum_size; /* Bytes per element */ 
    int  i; 
    FILE  *infileptr; /* Input file pointer */ 
    int  local_els; /* Elements on this proc */ 
    MPI_Status status;  /* Result of receive */ 
    int  id;   /* Process rank */ 
    int  p;   /* Number of processes */ 
    int  x;   /* Result of read */ 

    datum_size = get_size (dtype); 
    MPI_Comm_size(comm, &p); 
    MPI_Comm_rank(comm, &id); 

    /* Process p-1 opens file, determines number of vector 
     elements, and broadcasts this value to the other 
     processes. */ 

    if (id == (p-1)) { 
     infileptr = fopen (s, "r"); 
     if (infileptr == NULL) *n = 0; 
     else fread (n, sizeof(int), 1, infileptr); 
    } 
    MPI_Bcast (n, 1, MPI_INT, p-1, comm); 
    if (! *n) { 
     if (!id) { 
     printf ("Input file '%s' cannot be opened\n", s); 
     fflush (stdout); 
     } 
    } 

    /* Block mapping of vector elements to processes */ 

    local_els = BLOCK_SIZE(id,p,*n); 

    /* Dynamically allocate vector. */ 

    *v = my_malloc (id, local_els * datum_size); 
    if (id == (p-1)) { 
     for (i = 0; i < p-1; i++) { 
     x = fread (*v, datum_size, BLOCK_SIZE(i,p,*n), 
      infileptr); 
     MPI_Send (*v, BLOCK_SIZE(i,p,*n), dtype, i, DATA_MSG, 
      comm); 
     } 
     x = fread (*v, datum_size, BLOCK_SIZE(id,p,*n), 
      infileptr); 
     fclose (infileptr); 
    } else { 
     MPI_Recv (*v, BLOCK_SIZE(id,p,*n), dtype, p-1, DATA_MSG, 
     comm, &status); 
    } 
    // My attempt at making this collective communication:
    if (id == (p-1)) {
        x = fread(*v, datum_size, *n, infileptr);

        for (i = 0; i < p; i++) {
            size[i] = BLOCK_SIZE(i,p,*n);
        }
        //x = fread(*v,datum_size,BLOCK_SIZE(id, p, *n),infileptr);
        fclose(infileptr);
    }
    MPI_Scatterv(v, send_count, send_disp, dtype, storage, size[id], dtype, p-1, comm);

}

Any help would be greatly appreciated.

Thanks

I assume you have already taken a look at http://www.mcs.anl.gov/research/projects/mpi/www/www3/MPI_Scatterv.html (the first result of a Google search)? What kind of errors are you getting? – adamdunson 2013-03-28 01:55:35

I did take a look at that, and it was very helpful. It turns out my errors actually came from dynamic memory allocation problems, which I was able to resolve. Thanks for your help. – user2188190 2013-03-28 18:53:27

Answer

It is much easier for people to answer your question if you post a small, self-contained, reproducible example.

For Scatterv, you need to provide the list of counts to send to each process (which seems to be your size[] array) and the displacements within the send data where each process's chunk starts. The mechanics of Scatter vs. Scatterv are described in detail in this answer. Trying to infer what all of the variables and unprovided functions/macros do, the example below scatters a file to the processes.

Note, however, that if you are doing this, it is not hard to actually use MPI-IO to coordinate the file access directly, avoiding the need to have one process read all of the data first. Code for that is given as well.
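First, as a minimal sketch of just the Scatterv piece: the snippet below builds the counts and displacements from a block decomposition and makes the call. The BLOCK_LOW/BLOCK_SIZE definitions and the scatter_block helper are assumptions for illustration (your macros were not shown), so adjust them to match your own. The complete, runnable example follows it.

#include <stdlib.h>
#include <mpi.h>

/* Assumed block-decomposition macros - replace with your own definitions */
#define BLOCK_LOW(id,p,n)  ((id)*(n)/(p))
#define BLOCK_SIZE(id,p,n) (BLOCK_LOW((id)+1,p,n) - BLOCK_LOW(id,p,n))

/* Scatter n elements of type dtype from rank p-1's contiguous buffer `all`
   into each rank's buffer `v`, which holds BLOCK_SIZE(id,p,n) elements.   */
void scatter_block(void *all, void *v, int n, MPI_Datatype dtype,
                   int id, int p, MPI_Comm comm)
{
    int *counts = malloc(p * sizeof(int));
    int *disps  = malloc(p * sizeof(int));

    for (int i = 0; i < p; i++) {
        counts[i] = BLOCK_SIZE(i,p,n);   /* how many elements rank i gets */
        disps[i]  = BLOCK_LOW(i,p,n);    /* where rank i's chunk starts   */
    }

    /* sendbuf/counts/disps are only significant on the root (p-1), but
       every rank must pass matching receive arguments.                  */
    MPI_Scatterv(all, counts, disps, dtype,
                 v, counts[id], dtype, p-1, comm);

    free(counts);
    free(disps);
}

And here is a complete example, including the MPI-IO version: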

#include <stdio.h> 
#include <stdlib.h> 
#include <mpi.h> 

int main(int argc, char **argv) { 

    int id, p; 
    int *block_size; 
    int datasize = 0; 

    MPI_Init(&argc, &argv); 

    MPI_Comm_size(MPI_COMM_WORLD, &p); 
    MPI_Comm_rank(MPI_COMM_WORLD, &id); 

    block_size = malloc(p * sizeof(int)); 
    for (int i=0; i<p; i++) { 
     block_size[i] = i + 1; 
     datasize += block_size[i]; 
    } 

    /* create file for reading */ 
    if (id == p-1) { 
     char *data = malloc(datasize * sizeof(char)); 
     for (int i=0; i<datasize; i++) 
      data[i] = 'a' + i; 

     FILE *f = fopen("data.dat","wb"); 
     fwrite(data, sizeof(char), datasize, f); 
     fclose(f); 

     printf("Initial data: "); 
     for (int i=0; i<datasize; i++) 
      printf("%c", data[i]); 
     printf("\n"); 
     free(data); 
    } 

    if (id == 0) printf("---Using MPI-Scatterv---\n"); 

    /* using scatterv */ 

    int local_els = block_size[id]; 
    char *v = malloc ((local_els + 1) * sizeof(char)); 
    char *all = NULL;   /* only allocated and filled on the root rank */

    int *counts, *disps; 
    counts = malloc(p * sizeof(int)); 
    disps = malloc(p * sizeof(int)); 

    /* counts.. */ 
    for(int i = 0; i < p; i++) 
     counts[i] = block_size[i]; 

    /* and displacements (where the data starts within the send buffer) */ 
    disps[0] = 0; 
    for(int i = 1; i < p; i++) 
     disps[i] = disps[i-1] + counts[i-1]; 

    if(id == (p-1)) 
    { 
     all = malloc(datasize*sizeof(char)); 

     FILE *f = fopen("data.dat","rb"); 
     int x = fread(all,sizeof(char),datasize,f); 
     fclose(f); 
    } 

    MPI_Scatterv(all, counts, disps, MPI_CHAR, v, local_els, MPI_CHAR, p-1, MPI_COMM_WORLD); 

    if (id == (p-1)) { 
     free(all); 
    } 

    v[local_els] = '\0'; 
    printf("[%d]: %s\n", id, v); 

    /* using MPI I/O */ 

    fflush(stdout); 
    MPI_Barrier(MPI_COMM_WORLD); /* only for syncing output to screen */ 

    if (id == 0) printf("---Using MPI-IO---\n"); 

    for (int i=0; i<local_els; i++) 
     v[i] = 'X'; 

    /* create the file layout - the subarrays within the 1d array of data */ 
    MPI_Datatype myview; 
    MPI_Type_create_subarray(1, &datasize, &local_els, &(disps[id]), 
           MPI_ORDER_C, MPI_CHAR, &myview); 
    MPI_Type_commit(&myview); 

    MPI_File mpif; 
    MPI_Status status; 

    MPI_File_open(MPI_COMM_WORLD, "data.dat", MPI_MODE_RDONLY, MPI_INFO_NULL, &mpif); 

    MPI_File_set_view(mpif, (MPI_Offset)0, MPI_CHAR, myview, "native", MPI_INFO_NULL); 
    MPI_File_read_all(mpif, v, local_els, MPI_CHAR, &status); 

    MPI_File_close(&mpif); 
    MPI_Type_free(&myview); 

    v[local_els] = '\0'; 
    printf("[%d]: %s\n", id, v); 

    free(v); 
    free(counts); 
    free(disps); 

    MPI_Finalize(); 
    return 0; 
} 
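
To build the example, something like the following should work (assuming the usual mpicc compiler wrapper; -std=c99 is needed for the loop-scoped declarations, and foo.c is just a placeholder name matching the ./foo binary below):

mpicc -std=c99 -o foo foo.c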

Running this gives the following (output reordered for clarity):

$ mpirun -np 6 ./foo 
Initial data: abcdefghijklmnopqrstu 
---Using MPI-Scatterv--- 
[0]: a 
[1]: bc 
[2]: def 
[3]: ghij 
[4]: klmno 
[5]: pqrstu 
---Using MPI-IO--- 
[0]: a 
[1]: bc 
[2]: def 
[3]: ghij 
[4]: klmno 
[5]: pqrstu 

Thank you for your patience, and for drawing my attention to MPI-IO; I will be sure to look into it. Your code was very helpful for understanding how to use Scatterv. It turns out my other major problem was with dynamic memory allocation, and I was misinterpreting those errors as problems with my Scatterv arguments. – user2188190 2013-03-28 18:57:40