test/mpi: add p2p benchmarks in test/mpi/bench

Add point-to-point benchmark code written in MyDef. The tests warm up
automatically and adjust the number of iterations for measurement accuracy.
They produce latency measurements with standard deviations and equivalent
bandwidths.

MYDEF_BOOT=[topsrc_dir]/modules/mydef_boot
export PATH=$MYDEF_BOOT/bin:$PATH
export PERL5LIB=$MYDEF_BOOT/lib/perl5
export MYDEFLIB=$MYDEF_BOOT/lib/MyDef

To run:
    mydef_page p2p_latency.def  # -> p2p_latency.c
    mpicc p2p_latency.c && mpirun -n 2 ./a.out

Alternatively use mydef_run (uses settings from config):
    mydef_run p2p_latency.def

Next commit will add "make testing".
Esse commit está contido em:
Hui Zhou
2023-12-17 19:05:38 -06:00
commit 30f2bbd438
5 arquivos alterados com 219 adições e 0 exclusões
+3
Ver Arquivo
@@ -0,0 +1,3 @@
module: c
CC: mpicc
run: mpirun -n 2
+93
Ver Arquivo
@@ -0,0 +1,93 @@
/*
* bench_frame : boilerplate for mpi program
* measure(iter) : measures `tf_dur` for $(iter) iterations
* run_stat(N, var) : run N measurements and obtain (avg, std) in sum1, sum2
* warm_up(iter, dur): repeat until measurements (iter, dur) stabilize
* report_latency(msgsize, MULTIPLICITY) : print a line of latency result
*/
# bench_frame: page frame that wraps a benchmark page in a complete MPI program.
#   Emits main(): MPI_Init, rank/size query, optional MIN_PROCS check,
#   allocation of a MAX_BUFSIZE-byte buffer, the result header on rank 0
#   (optional report_header hook), the page body ($call main), a trailing
#   blank line on rank 0, and MPI_Finalize.
#   The page body can use: comm, buf, grank, gsize.
#   Returns 1 from main() on either start-up failure.
subcode: bench_frame
    $include stdio
    $include stdlib
    $include mpi

    $function main
        MPI_Init(NULL, NULL);

        $my grank, gsize: int
        MPI_Comm_rank(MPI_COMM_WORLD, &grank);
        MPI_Comm_size(MPI_COMM_WORLD, &gsize);

        $(if:MIN_PROCS)
            $if gsize < $(MIN_PROCS)
                printf("! Test $(_pagename) requires $(MIN_PROCS) processes !\n");
                # every rank sees the same gsize, so all ranks leave together;
                # finalize first so no rank exits with MPI still initialized
                MPI_Finalize();
                return 1

        MPI_Comm comm = MPI_COMM_WORLD;
        char *buf = malloc(MAX_BUFSIZE)
        $if !buf
            printf("! Failed to allocate buffer (size=%d)\n", MAX_BUFSIZE)
            # allocation may fail on one rank only; abort the whole job so the
            # remaining ranks do not block waiting for this one
            MPI_Abort(MPI_COMM_WORLD, 1);
            return 1

        $if grank == 0
            printf("TEST $(_pagename):\n")
            $call @report_header

        $call main

        $if grank == 0
            printf("\n")

        free(buf);
        MPI_Finalize();
# File-level macro settings for pages built on bench_frame.
# NOTE(review): use_double presumably makes MyDef declare the tf_-prefixed
# timing variables as double rather than float -- confirm against MyDef docs.
macros:
use_double: 1
#----------------------------------------
# _autoload: type registration for this file.
# NOTE(review): $register_prefix(comm) appears to tell MyDef that variables
# whose names start with "comm" have type MPI_Comm -- confirm.
subcode: _autoload
$register_prefix(comm) MPI_Comm
# foreach_size: run BLOCK for size = 0, 1, 2, 4, 8, ... while size < $(MAX_MSG).
# MSG_SIZE is set to the loop variable's name so BLOCK can refer to $(MSG_SIZE).
subcode: foreach_size
# the step maps 0 -> 1 and then doubles
$for int size = 0; size < $(MAX_MSG); size = (size==0)?1:size*2
$(set:MSG_SIZE=size)
BLOCK
# measure(iter): time $(iter) consecutive executions of BLOCK.
# Leaves the total elapsed MPI_Wtime() difference for all iterations in tf_dur
# (not divided by the iteration count).
subcode: measure(iter)
tf_start = MPI_Wtime()
$for 0:$(iter)
BLOCK
tf_dur = MPI_Wtime() - tf_start
# run_stat(N, var): execute BLOCK N times and collect statistics of $(var).
#   N   - number of measurements to take
#   var - name of the variable BLOCK assigns on every pass
# On exit sum1 holds the mean of $(var) and sum2 its standard deviation
# (population form, sqrt(E[x^2] - E[x]^2)).
subcode: run_stat(N, var)
    $my double sum1=0, double sum2=0
    $for 0:$(N)
        BLOCK
        sum1 += $(var)
        sum2 += $(var) * $(var)
    sum1 /= $(N)
    sum2 /= $(N)
    # E[x^2] - E[x]^2 can round to a tiny negative number when the samples are
    # nearly identical; clamp it so sqrt() never produces NaN
    sum2 -= sum1 * sum1
    $if sum2 < 0
        sum2 = 0
    sum2 = sqrt(sum2)
# warm_up(iter, dur): choose an iteration count and let timings settle.
#   iter - name of the caller's iteration-count variable; reset to 2 here and
#          raised whenever it falls below MIN_ITER
#   dur  - name of the variable BLOCK fills with the elapsed time of one run
#          of $(iter) iterations
# BLOCK is repeated until, since the last change of $(iter), 10 runs were
# slower than the run before them.
subcode: warm_up(iter, dur)
# MIN_ITER expands to the iteration count that would last 0.001 at the rate
# just measured (1 ms when $(dur) is in seconds, as with MPI_Wtime)
$(set:MIN_ITER=(int) ($(iter) * 0.001 / $(dur)))
$(iter) = 2
$my double last_dur = 1.0
$my int num_best = 0
$while num_best < 10
BLOCK
$if $(iter) < $(MIN_ITER)
# run was too short to time reliably: enlarge it and restart the count
$(iter) = $(MIN_ITER)
num_best = 0
continue
# count the runs where $(dur) is no longer monotonically decreasing
$if $(dur) > last_dur
num_best++
last_dur = $(dur)
# header_latency: print the column titles for rows written by report_latency.
subcode: header_latency
printf("%12s %10s(us) %6s(us) %12s(MB/s)\n", "msgsize", "latency", "sigma", "bandwidth")
# report_latency(MSGSIZE, MULTIPLICITY): print one result row.
#   MSGSIZE      - message size in bytes, printed and used for the bandwidth
#   MULTIPLICITY - divisor applied to the mean and the standard deviation
# Reads sum1 (mean) and sum2 (standard deviation) as left by run_stat.
subcode: report_latency(MSGSIZE, MULTIPLICITY)
$my tf_latency, tf_sigma, tf_bw
# scale by 1e6 for the microsecond columns of the header
tf_latency = sum1 / ($(MULTIPLICITY)) * 1e6
tf_sigma = sum2 / ($(MULTIPLICITY)) * 1e6
# bytes per microsecond, numerically the MB/s (10^6 bytes/s) shown in the header
tf_bw = $(MSGSIZE) / tf_latency
printf("%12d %10.3f %6.3f %12.3f\n", $(MSGSIZE), tf_latency, tf_sigma, tf_bw)
+79
Ver Arquivo
@@ -0,0 +1,79 @@
/*
* Defines the following functions:
* bench_p2p
* bench_send, bench_warmup
* bench_recv
*
* For each measurement -
* First the sender tells the receiver the `iter` parameter. `iter = 0` means to quit.
* Each iteration runs `send_side` on the sender and `recv_side` on the receiver; the time measured on the sender side is taken as the latency measurement.
*
* Caller page defines -
* subcode: send_side, recv_side
* macro:
* MULTIPLICITY: divisor for each measurement
*/
# Macro defaults for pages that include this file.
# MIN_PROCS is the process count bench_frame checks for at start-up.
# NOTE(review): MAX_BUFSIZE is also set with $define in _autoload; keep the two values in sync.
macros:
MIN_PROCS: 2
MAX_BUFSIZE: 5000000 # 5 MB
# _autoload: parameter type registrations and constants used by the p2p
# benchmark functions.
# NOTE(review): $register_name presumably lets the fncode parameter lists
# name src/dst/buf/size without spelling out their types -- confirm.
subcode: _autoload
$register_name(src) int
$register_name(dst) int
$register_name(buf) void *
$register_name(size) int
# TAG marks benchmark traffic; SYNC_TAG marks the iteration-count handshake
$define TAG 0
$define SYNC_TAG 100
# size passed to malloc in bench_frame and upper bound of the pages' size loops
$define MAX_BUFSIZE 5000000
# number of measurements bench_p2p feeds into run_stat for each message size
$define NUM_REPEAT 20
# report_header: hook that bench_frame calls on rank 0 before the page body;
# for the p2p benchmarks it prints the latency column titles.
subcode: report_header
$call header_latency
# bench_p2p: benchmark one message size between ranks src and dst of comm.
#   src rank: warms up to choose iter, takes NUM_REPEAT measurements of iter
#             iterations each, prints one result row, then tells dst to stop.
#   dst rank: runs bench_recv until it is told to stop.
#   Any other rank only queries its rank and returns.
fncode: bench_p2p(comm, src, dst, buf, size)
int rank;
MPI_Comm_rank(comm, &rank)
# pages that do not set MULTIPLICITY get a divisor of 1
$(if:!MULTIPLICITY)
$(set:MULTIPLICITY=1)
$if rank == src
iter = bench_warmup(comm, dst, buf, size)
&call run_stat, NUM_REPEAT, tf_latency
tf_latency = bench_send(iter, comm, dst, buf, size)
# bench_send returns the total for iter iterations; keep the per-iteration time
tf_latency /= iter
$call report_latency, size, $(MULTIPLICITY)
$call send_stop
$elif rank == dst
bench_recv(comm, src, buf, size)
# send_stop: send iter = 0 on SYNC_TAG, which bench_recv treats as the signal to quit.
subcode: send_stop
iter = 0;
MPI_Send(&iter, 1, MPI_INT, dst, SYNC_TAG, comm)
#----------------------------------------
# bench_send: sender half of one measurement.
#   Announces iter to dst on SYNC_TAG, then times iter executions of the
#   page's send_side subcode.
#   Returns tf_dur, the total elapsed time for all iter iterations.
fncode: bench_send(int iter, comm, dst, buf, size)
# synchronize with receiver
MPI_Send(&iter, 1, MPI_INT, dst, SYNC_TAG, comm);
&call measure, iter
$call @send_side
return tf_dur
# bench_recv: receiver half of the benchmark.
#   Repeatedly receives an iteration count from src on SYNC_TAG and runs the
#   page's recv_side subcode that many times; returns when the count is 0.
fncode: bench_recv(comm, src, buf, size)
$while 1
int iter;
# synchronize with sender
MPI_Recv(&iter, 1, MPI_INT, src, SYNC_TAG, comm, MPI_STATUS_IGNORE);
$if iter == 0
# time to quit
break
$for i=0:iter
$call @recv_side
# bench_warmup: run the warm_up procedure with bench_send as the measured
# block (each pass also drives the receiver through the same iter iterations).
#   Returns the iteration count chosen by warm_up.
fncode: bench_warmup(comm, dst, buf, size): int
&call warm_up, iter, tf_dur
tf_dur = bench_send(iter, comm, dst, buf, size)
return iter
+26
Ver Arquivo
@@ -0,0 +1,26 @@
include: macros/bench_frame.def
include: macros/bench_p2p.def
# _autoload: WINDOW_SIZE is the number of non-blocking sends/receives posted
# per iteration in send_side / recv_side, and the MULTIPLICITY of this page.
subcode: _autoload
$define WINDOW_SIZE 64
# p2p_bw: windowed point-to-point benchmark between ranks 0 and 1.
#   Sweeps message sizes 1, 2, 4, ... below MAX_BUFSIZE. Each iteration posts
#   WINDOW_SIZE non-blocking transfers of `size` bytes and ends with a
#   zero-byte acknowledgement from the receiver. MULTIPLICITY is WINDOW_SIZE,
#   so the reported time is per message.
page: p2p_bw, bench_frame
    MULTIPLICITY: WINDOW_SIZE
    data: buf, size, MPI_CHAR

    $for int size = 1; size < MAX_BUFSIZE; size *= 2
        bench_p2p(comm, 0, 1, buf, size)

    # sender iteration: post the whole window, wait for it, then wait for the ack
    subcode: send_side
        $my MPI_Request reqs[WINDOW_SIZE]
        $for j=0:WINDOW_SIZE
            MPI_Isend($(data), dst, TAG, comm, &reqs[j])
        MPI_Waitall(WINDOW_SIZE, reqs, MPI_STATUSES_IGNORE)
        # zero-byte ack: pass a real datatype, because a null handle such as
        # MPI_DATATYPE_NULL is an erroneous input argument even with count 0
        MPI_Recv(NULL, 0, MPI_CHAR, dst, TAG, comm, MPI_STATUS_IGNORE)

    # receiver iteration: post the matching window, wait for it, then send the ack
    subcode: recv_side
        $my MPI_Request reqs[WINDOW_SIZE]
        $for j=0:WINDOW_SIZE
            MPI_Irecv($(data), src, TAG, comm, &reqs[j])
        MPI_Waitall(WINDOW_SIZE, reqs, MPI_STATUSES_IGNORE)
        # zero-byte ack with a valid datatype (see send_side)
        MPI_Send(NULL, 0, MPI_CHAR, src, TAG, comm)
+18
Ver Arquivo
@@ -0,0 +1,18 @@
include: macros/bench_frame.def
include: macros/bench_p2p.def
# p2p_latency: ping-pong benchmark between ranks 0 and 1.
#   Measures a 0-byte message first, then sizes 1, 2, 4, ... below MAX_BUFSIZE.
#   One iteration is a full round trip (send then receive), and MULTIPLICITY: 2
#   halves the measured time before it is reported.
page: p2p_latency, bench_frame
MULTIPLICITY: 2
data: buf, size, MPI_CHAR
bench_p2p(comm, 0, 1, buf, 0)
$for int size = 1; size < MAX_BUFSIZE; size *= 2
bench_p2p(comm, 0, 1, buf, size)
# sender iteration: send the message, then wait for it to come back
subcode: send_side
MPI_Send($(data), dst, TAG, comm);
MPI_Recv($(data), dst, TAG, comm, MPI_STATUS_IGNORE);
# receiver iteration: receive the message, then send it back
subcode: recv_side
MPI_Recv($(data), src, TAG, comm, MPI_STATUS_IGNORE);
MPI_Send($(data), src, TAG, comm);