util: add mpir_async_things

A generic progress hook for all async things.
Esse commit está contido em:
Hui Zhou
2023-12-11 22:35:26 -06:00
commit 1b60cdf445
5 arquivos alterados com 153 adições e 1 exclusões
+1 -1
Ver Arquivo
@@ -3,7 +3,7 @@
## See COPYRIGHT in top-level directory
##
AM_CPPFLAGS += -I$(top_srcdir)/src/include
AM_CPPFLAGS += -I$(top_srcdir)/src/include -I$(top_srcdir)/src/util
include $(top_srcdir)/src/mpi/Makefile.mk
include $(top_srcdir)/src/util/Makefile.mk
+7
Ver Arquivo
@@ -7,6 +7,7 @@
#include "mpir_info.h"
#include "mpi_init.h"
#include <strings.h>
#include "mpir_async_things.h"
/*
=== BEGIN_MPI_T_CVAR_INFO_BLOCK ===
@@ -212,6 +213,9 @@ int MPII_Init_thread(int *argc, char ***argv, int user_required, int *provided,
mpi_errno = MPIR_Datatype_init_predefined();
MPIR_ERR_CHECK(mpi_errno);
mpi_errno = MPIR_Async_things_init();
MPIR_ERR_CHECK(mpi_errno);
if (MPIR_CVAR_DEBUG_HOLD) {
MPII_debugger_hold();
}
@@ -426,6 +430,9 @@ int MPII_Finalize(MPIR_Session * session_ptr)
mpi_errno = MPII_Coll_finalize();
MPIR_ERR_CHECK(mpi_errno);
mpi_errno = MPIR_Async_things_finalize();
MPIR_ERR_CHECK(mpi_errno);
/* Call the low-priority (post Finalize) callbacks */
MPII_Call_finalize_callbacks(0, MPIR_FINALIZE_CALLBACK_PRIO);
+1
Ver Arquivo
@@ -14,4 +14,5 @@ mpi_core_sources += \
src/util/mpir_netloc.c \
src/util/mpir_hwtopo.c \
src/util/mpir_nettopo.c \
src/util/mpir_async_things.c \
src/util/mpir_progress_hook.c
+80
Ver Arquivo
@@ -0,0 +1,80 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/
#include "mpidimpl.h"
#include "mpir_async_things.h"
static MPIR_Async_thing *async_things_list;
static MPID_Thread_mutex_t async_things_mutex;
static int async_things_progress_hook_id;
int MPIR_Async_things_init(void)
{
int mpi_errno = MPI_SUCCESS;
int err;
MPID_Thread_mutex_create(&async_things_mutex, &err);
MPIR_Assert(err == 0);
mpi_errno = MPIR_Progress_hook_register(MPIR_Async_things_progress,
&async_things_progress_hook_id);
return mpi_errno;
}
int MPIR_Async_things_finalize(void)
{
int mpi_errno = MPI_SUCCESS;
int err;
MPID_Thread_mutex_destroy(&async_things_mutex, &err);
MPIR_Assert(err == 0);
mpi_errno = MPIR_Progress_hook_deregister(async_things_progress_hook_id);
return mpi_errno;
}
int MPIR_Async_things_add(int (*poll_fn) (MPIR_Async_thing * entry), void *state)
{
MPIR_Async_thing *entry = MPL_malloc(sizeof(MPIR_Async_thing), MPL_MEM_OTHER);
entry->poll_fn = poll_fn;
entry->state = state;
entry->new_entries = NULL;
MPID_THREAD_CS_ENTER(VCI, async_things_mutex);
bool was_empty = (async_things_list == NULL);
DL_APPEND(async_things_list, entry);
MPID_THREAD_CS_EXIT(VCI, async_things_mutex);
if (was_empty) {
MPIR_Progress_hook_activate(async_things_progress_hook_id);
}
return MPI_SUCCESS;
}
int MPIR_Async_things_progress(int *made_progress)
{
MPIR_Async_thing *entry, *tmp;
MPID_THREAD_CS_ENTER(VCI, async_things_mutex);
DL_FOREACH_SAFE(async_things_list, entry, tmp) {
int ret = entry->poll_fn(entry);
if (ret != MPIR_ASYNC_THING_NOPROGRESS) {
*made_progress = 1;
if (entry->new_entries) {
DL_CONCAT(async_things_list, entry->new_entries);
entry->new_entries = NULL;
}
if (ret == MPIR_ASYNC_THING_DONE) {
DL_DELETE(async_things_list, entry);
MPL_free(entry);
if (async_things_list == NULL) {
MPIR_Progress_hook_deactivate(async_things_progress_hook_id);
}
}
}
}
MPID_THREAD_CS_EXIT(VCI, async_things_mutex);
return MPI_SUCCESS;
}
+64
Ver Arquivo
@@ -0,0 +1,64 @@
/*
* Copyright (C) by Argonne National Laboratory
* See COPYRIGHT in top-level directory
*/
#ifndef MPIR_ASYNC_THINGS_H_INCLUDED
#define MPIR_ASYNC_THINGS_H_INCLUDED
/* Async_things is the most general interface for managing asynchronous progress.
* The async things are a collection items that needs progress. Each
* "thing" is opaque with all the logic hidden inside the "poll_fn"
* callback. The callback is passed with the entry pointer itself, thus
* it can -
* * update the entry as a state machine.
* * cleanup the entry states as it progresses.
* * return a new list of new entries to be progressed.
* * return a done state for the entry to be removed.
*
* Each async things are independent items may be progressed in any order.
* A single MPI_Test may progress all or just a few of the items. We only promise
* to progress all items eventually after repeated MPI_Test calls.
*/
/* poll_fn return following states. */
enum {
MPIR_ASYNC_THING_NOPROGRESS = 0,
MPIR_ASYNC_THING_UPDATED = 1,
MPIR_ASYNC_THING_DONE = 2,
};
typedef struct MPIR_Async_thing {
int (*poll_fn) (struct MPIR_Async_thing * entry);
void *state;
/* doubly-linked list */
struct MPIR_Async_thing *next, *prev;
/* poll_fn may add new async thing entries */
struct MPIR_Async_thing *new_entries;
} MPIR_Async_thing;
typedef int (*MPIR_Async_thing_poll_fn) (MPIR_Async_thing *);
/* two access functions to make MPIR_Async_thing opaque */
static inline void *MPIR_Async_thing_get_state(MPIR_Async_thing * thing)
{
return thing->state;
}
static inline void MPIR_Async_thing_spawn(MPIR_Async_thing * thing,
MPIR_Async_thing_poll_fn poll_fn, void *state)
{
MPIR_Async_thing *entry = MPL_malloc(sizeof(MPIR_Async_thing), MPL_MEM_OTHER);
entry->poll_fn = poll_fn;
entry->state = state;
entry->new_entries = NULL;
DL_APPEND(thing->new_entries, entry);
}
int MPIR_Async_things_init(void);
int MPIR_Async_things_finalize(void);
int MPIR_Async_things_add(MPIR_Async_thing_poll_fn poll_fn, void *state);
int MPIR_Async_things_progress(int *made_progress);
#endif /* MPIR_ASYNC_THINGS_H_INCLUDED */