-
Notifications
You must be signed in to change notification settings - Fork 2
/
mpi_par.c
133 lines (118 loc) · 4.11 KB
/
mpi_par.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <dirent.h>
#include <unistd.h>
#include <errno.h>
#include <mpi.h>
#include "util/queue.h"
#include "util/util.h"
/* Maximum number of chars of a file name a worker accepts in one message
 * (used as the element count passed to MPI_Recv for file-name messages). */
#define FILE_NAME_BUF_SIZE 50
/* Message tag: a worker asks the master (rank 0) for work; payload is one MPI_INT (the worker's rank). */
#define TAG_COMM_REQ_DATA 0
/* Message tag: the master answers a request; payload is MPI_CHARs holding a file name,
 * or a single character when there is no more work. */
#define TAG_COMM_FILE_NAME 1
/*
 * Master side (rank 0) of the on-demand work distribution.
 *
 * Repeatedly waits for a request from any worker and answers it with the file
 * name at the front of the queue. Once `file_count` names have been handed
 * out, every further request is answered with a single character, which the
 * worker interprets as "no more work". Returns after all `size - 1` workers
 * have received that end marker. With no workers (`size == 1`) it returns
 * immediately instead of blocking forever in MPI_Recv.
 *
 * outfile          log stream for this rank
 * file_name_queue  queue whose front entry supplies `line` / `len` to send
 * file_count       number of entries to hand out before sending end markers
 * size             number of ranks in MPI_COMM_WORLD
 *
 * NOTE(review): a queued name longer than FILE_NAME_BUF_SIZE would overflow
 * the worker's receive count, and a name of length 1 is indistinguishable
 * from the end marker — confirm get_file_list never produces either.
 */
static void serve_file_requests(FILE *outfile, struct Queue *file_name_queue,
                                int file_count, int size)
{
    int count = 0;             /* file names handed out so far */
    int done_sent_p_count = 0; /* workers that have been sent the end marker */

    while (done_sent_p_count < size - 1) {
        int recv_pid = -1;
        MPI_Status status;

        fprintf(outfile, "process 0 is waiting for a request..\n");
        MPI_Recv(&recv_pid, 1, MPI_INT, MPI_ANY_SOURCE,
                 TAG_COMM_REQ_DATA, MPI_COMM_WORLD, &status);

        fprintf(outfile, "process 0 received request from process %d\n", recv_pid);
        /* reply to whichever rank the request actually came from */
        fprintf(outfile, "sending file name to recv_pid %d from process 0\n", recv_pid);

        if (count < file_count) {
            MPI_Send(file_name_queue->front->line,
                     file_name_queue->front->len, MPI_CHAR, status.MPI_SOURCE,
                     TAG_COMM_FILE_NAME, MPI_COMM_WORLD);
            deQueue(file_name_queue);
            count++;
        } else {
            /* exactly one character signals that there is no more work */
            char end[] = ".";
            MPI_Send(end, 1, MPI_CHAR, status.MPI_SOURCE,
                     TAG_COMM_FILE_NAME, MPI_COMM_WORLD);
            done_sent_p_count++;
        }

        fprintf(outfile, "pid: %d, done: %d\n", 0, done_sent_p_count == size - 1);
        fflush(outfile);
    }
}

/*
 * Worker side (every rank except 0) of the on-demand work distribution.
 *
 * Repeatedly sends its own rank to the master as a request, then receives a
 * file name. A reply of exactly one character means the master has no more
 * work and ends the loop.
 *
 * outfile  log stream for this rank
 * pid      this process's rank in MPI_COMM_WORLD
 */
static void request_files(FILE *outfile, int pid)
{
    /* One reusable buffer (no per-iteration allocation); the extra byte
     * guarantees room for a terminator even for a maximum-length message. */
    char file_name[FILE_NAME_BUF_SIZE + 1];
    int done = 0;

    while (!done) {
        MPI_Status status;
        int recv_len = 0;

        fprintf(outfile, "requesting data to process %d from process 0\n", pid);
        MPI_Send(&pid, 1, MPI_INT, 0, TAG_COMM_REQ_DATA, MPI_COMM_WORLD);
        fprintf(outfile, "send finished\n");

        fprintf(outfile, "receiving file name to process %d\n", pid);
        MPI_Recv(file_name, FILE_NAME_BUF_SIZE, MPI_CHAR, 0,
                 TAG_COMM_FILE_NAME, MPI_COMM_WORLD, &status);
        MPI_Get_count(&status, MPI_CHAR, &recv_len);

        /* The payload is raw chars and need not carry a terminator, so add
         * one before treating it as a C string. MPI_Get_count may report
         * MPI_UNDEFINED (negative); treat that as an empty name. */
        if (recv_len < 0 || recv_len > FILE_NAME_BUF_SIZE) {
            recv_len = 0;
        }
        file_name[recv_len] = '\0';

        fprintf(outfile, "received file name [%s] to pid %d from process 0, recv len %d\n",
                file_name, pid, recv_len);

        if (recv_len == 1) {
            done = 1;
        } else {
            // process file -- do work related to reading
        }

        fprintf(outfile, "pid: %d, done: %d\n", pid, done);
        fflush(outfile);
    }
}

/*
 * Entry point. Rank 0 acts as master: it builds a queue of file names from
 * the files directory and hands them out one at a time to whichever worker
 * asks next. All other ranks are workers that request names until told there
 * is no more work. Each rank logs to its own file "./pout/f<rank>".
 *
 * Returns 0.
 */
int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);

    int size, pid, p_name_len;
    char p_name[MPI_MAX_PROCESSOR_NAME];
    MPI_Comm_size(MPI_COMM_WORLD, &size);
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    MPI_Get_processor_name(p_name, &p_name_len);

    char files_dir[] = "./files/"; // TODO: This should be taken from argv

    /* Per-rank log file. The buffer is large enough for the prefix plus any
     * int, so ranks with two or more digits no longer get a truncated (and
     * therefore shared) path. */
    char log_path[32];
    snprintf(log_path, sizeof log_path, "./pout/f%d", pid);
    FILE *outfile = fopen(log_path, "w");
    if (outfile == NULL) {
        /* Keep the run going rather than dereferencing a NULL stream. */
        fprintf(stderr, "process %d: cannot open %s (%s); logging to stderr\n",
                pid, log_path, strerror(errno));
        outfile = stderr;
    }

    /* Only the master builds the work queue.
     * Presumably get_file_list enqueues the entries found under files_dir
     * and returns how many it added — verify against util/util.h. */
    struct Queue *file_name_queue = NULL;
    int file_count = 0;
    if (pid == 0) {
        file_name_queue = createQueue();
        file_count = get_file_list(file_name_queue, files_dir);
    }

    /* Workers start requesting only after the master has its queue ready. */
    MPI_Barrier(MPI_COMM_WORLD);

    if (pid == 0) {
        serve_file_requests(outfile, file_name_queue, file_count, size);
    } else {
        request_files(outfile, pid);
    }

    MPI_Barrier(MPI_COMM_WORLD);
    // this indicates the end of reading section of the MPI

    if (outfile != stderr) {
        fclose(outfile);
    }

    MPI_Finalize();
    return 0;
}