Citation
J. Bachan, S.B. Baden, D. Bonachea, J. Corbino, M. Grossman, P.H. Hargrove, S. Hofmeyr, M. Jacquelin, A. Kamil, B. van Straalen, D. Waters, “UPC++ v1.0 Programmer’s Guide, Revision 2023.9.0”, Lawrence Berkeley National Laboratory Technical Report (LBNL-2001560), December 2023. https://doi.org/10.25344/S4P01J
Copyright
This manuscript has been authored by an author at Lawrence Berkeley National Laboratory under Contract No. DE-AC02-05CH11231 with the U.S. Department of Energy. The U.S. Government retains, and the publisher, by accepting the article for publication, acknowledges, that the U.S. Government retains a non-exclusive, paid-up, irrevocable, world-wide license to publish or reproduce the published form of this manuscript, or allow others to do so, for U.S. Government purposes.
Legal Disclaimer
This document was prepared as an account of work sponsored by the United States Government. While this document is believed to contain correct information, neither the United States Government nor any agency thereof, nor the Regents of the University of California, nor any of their employees, makes any warranty, express or implied, or assumes any legal responsibility for the accuracy, completeness, or usefulness of any information, apparatus, product, or process disclosed, or represents that its use would not infringe privately owned rights. Reference herein to any specific commercial product, process, or service by its trade name, trademark, manufacturer, or otherwise, does not necessarily constitute or imply its endorsement, recommendation, or favoring by the United States Government or any agency thereof, or the Regents of the University of California. The views and opinions of authors expressed herein do not necessarily state or reflect those of the United States Government or any agency thereof or the Regents of the University of California.
Acknowledgments
This work was supported by the Exascale Computing Project (17-SC-20-SC), a collaborative effort of the U.S. Department of Energy Office of Science and the National Nuclear Security Administration.
Early development of UPC++ was supported by the Director, Office of Science, Office of Advanced Scientific Computing Research, of the U.S. Department of Energy under Contract No. DE-AC02-05CH11231.
UPC++ is a C++ library that supports Partitioned Global Address Space (PGAS) programming. It is designed for writing efficient, scalable parallel programs on distributed-memory parallel computers. The key communication facilities in UPC++ are one-sided Remote Memory Access (RMA) and Remote Procedure Call (RPC). The UPC++ control model is single program, multiple data (SPMD), with each separate constituent process having access to local memory as it would in C++. The PGAS memory model additionally provides one-sided RMA communication to a global address space, which is allocated in shared segments that are distributed over the processes (see Figure 1). UPC++ also features Remote Procedure Call (RPC) communication, making it easy to move computation to operate on data that resides on remote processes.
UPC++ was designed to support exascale high-performance computing, and the library interfaces and implementation are focused on maximizing scalability. In UPC++, all communication operations are syntactically explicit, which encourages programmers to consider the costs associated with communication and data movement. Moreover, all communication operations are asynchronous by default, encouraging programmers to seek opportunities for overlapping communication latencies with other useful work. UPC++ provides expressive and composable abstractions designed for efficiently managing aggressive use of asynchrony in programs. Together, these design principles are intended to enable programmers to write applications using UPC++ that perform well even on hundreds of thousands of cores.
This guide describes the LBNL implementation of UPC++, which uses GASNet for communication across a wide variety of platforms, ranging from Ethernet-connected laptops to commodity InfiniBand clusters and supercomputers with custom high-performance networks. GASNet is a language-independent, networking middleware layer that provides network-independent, high-performance communication primitives tailored for implementing parallel global address space languages and libraries such as UPC, UPC++, Fortran coarrays, Legion, Chapel, and many others. For more information about GASNet, visit https://gasnet.lbl.gov.
Although our implementation of UPC++ uses GASNet, only the Getting Started with UPC++ and Advanced Job Launch sections of this guide are specific to that implementation. The LBNL implementation of UPC++ adheres to the implementation-independent UPC++ Specification. Both are available at the UPC++ homepage at https://upcxx.lbl.gov/.
See Additional Resources for links to additional sites that are helpful in learning to use UPC++. For example, the UPC++ training site at https://upcxx.lbl.gov/training has links to introductory video tutorials that give overviews of using UPC++ and some hands-on exercises to get started.
UPC++ has been designed with modern object-oriented concepts in mind. Novices to C++ should avail themselves of good-quality tutorials and documentation to refresh their knowledge of C++ templates, the C++ standard library (std::
), and lambda functions, which are used heavily in this guide.
We present a brief description of how to install UPC++ and compile and run UPC++ programs. For more detail, consult the INSTALL.md and README.md files that come with the distribution.
UPC++ installation follows the same steps as many GNU autotools-based projects: configure
, make all
, make install
. To install UPC++, extract the library source code to a clean directory, and from the top-level of this directory, run the following steps (output has been omitted for brevity):
$ ./configure --prefix=<upcxx-install-path>
$ make all
$ make install
This will configure, build and install the UPC++ library in the chosen <upcxx-install-path>
directory. We recommend that users choose an installation path which is a non-existent or empty directory path, so that uninstallation is as trivial as rm -rf <upcxx-install-path>
.
Optionally, make check
(after make all
) and make test_install
(after make install
) can be run to validate the build and installation steps.
In many cases additional arguments to configure
may be required or advisable in order to deploy the best UPC++ installation for your system. Please consult the INSTALL.md file that comes with the distribution for full details.
To compile a program using UPC++, the <upcxx-install-path>/bin/upcxx
wrapper can be used. This works analogously to the familiar mpicxx
wrapper, in that it invokes the underlying C++ compiler with the provided options, and automatically prepends the options necessary for including/linking the UPC++ library. For example to build the hello world code shown in an upcoming section, one could execute:
<upcxx-install-path>/bin/upcxx -O3 hello-world.cpp
Note that an option starting either with -O
(for optimized) or -g
(for debug mode) should be specified, which has a side-effect of selecting the variant of the UPC++ library built with optimization (for production runs) or with assertions and debugging symbols (for debugging and development). Alternatively, one can effect the same choice by passing upcxx -codemode={opt,debug}.
The debug-mode library is highly recommended for all application development, as it includes many useful correctness assertions that can help reveal application-level bugs. Conversely, the opt-mode library should be used for all performance testing or production runs. Note that most upcxx
options are passed straight through to the C++ compiler, which includes options controlling the optimization level (if any). If no -O
options are provided (or only a bare “-O
”), the wrapper provides a compiler-specific default optimization level that might not be ideal for your program (selecting the best optimization options for your C++ compiler is application-specific and beyond the scope of this guide).
If features from a C++ standard beyond C++11 are required, the C++ standard may also need to be specified (for example, by passing -std=c++14
or -std=c++17
). For a complete example, look at the Makefile
in the <upcxx-source-path>/example/prog-guide/
directory. That directory also has code for all of the examples given in this guide. To use the Makefile
, first set the UPCXX_INSTALL
shell variable to the install path.
UPC++ supports multithreading within a process, e.g. using OpenMP. In these cases, to ensure that the application is compiled against a thread-safe UPC++ library, pass upcxx -threadmode=par
. Note that this option is less efficient than the default, upcxx -threadmode=seq
, which enables the use of a UPC++ library that is synchronization-free in most of its internals; thus, the parallel, thread-safe mode should only be used when multithreading within processes. For detailed restrictions associated with -threadmode=seq
, consult the docs/implementation-defined.md
document provided by the distribution.
UPC++’s communication services are implemented over GASNet, and on a given system there may be several communication backends available corresponding to different network hardware and/or software stacks. The communication backend can be selected using upcxx -network=<name>
.
For example, to select the debug, thread-safe version of the UDP communication backend:
<upcxx-install-path>/bin/upcxx -g -network=udp -threadmode=par hello-world.cpp
Environment variables are available to establish defaults for several of the common options. The variables UPCXX_CODEMODE
, UPCXX_THREADMODE
and UPCXX_NETWORK
provide defaults for -codemode
, -threadmode
and -network
, respectively.
There are several other options that can be passed to upcxx
. Execute upcxx --help
to get the full list of options, e.g.:
upcxx is a compiler wrapper that is intended as a drop-in replacement for your
C++ compiler that appends the flags necessary to compile/link with the UPC++ library.
Most arguments are passed through without change to the C++ compiler.
Citing UPC++ in publication? Please see: https://upcxx.lbl.gov/publications
Usage: upcxx [options] file...
upcxx Wrapper Options:
----------------------
Informational queries (no compilation):
-help This message
-version Print UPC++ and compiler version information
-info Print full configuration info
UPC++ library configuration:
-network={ibv|aries|ofi|ucx|smp|udp|mpi}
Use the indicated GASNet network backend for communication.
The default and availability of backends is system-dependent.
-codemode={opt|debug}
Select the optimized or debugging variant of the UPC++ library.
-threadmode={seq|par}
Select the single-threaded or thread-safe variant of the UPC++ library.
All other C++ compiler options:
-Wc,<anything> <anything> is passed-through uninterpreted to the underlying compiler
<anything-else> Passed-through uninterpreted to the underlying compiler
C++ compiler --help:
Usage: g++ [options] file...
[...snip...]
More details about compilation can be found in the README.md file that comes with the distribution.
To run a parallel UPC++ application, use the upcxx-run
launcher provided in the installation directory:
<upcxx-install-path>/bin/upcxx-run -n <processes> <exe> <args...>
The launcher will run the executable and arguments <exe> <args...>
in a parallel context with <processes>
number of UPC++ processes. For multiple nodes, specify the node count with -N <nodes>
.
Upon startup, each UPC++ process creates a fixed-size shared memory heap that will never grow. By default, this heap is 128 MB per process. This heap size can be set at startup by passing a -shared-heap
parameter to the run script, which takes a suffix of KB, MB or GB; e.g. to reserve 1GB per process, call:
<upcxx-install-path>/bin/upcxx-run -shared-heap 1G -n <processes> <exe> <args...>
One can also specify the shared heap size as a percentage of physical memory, split evenly between processes sharing the same node, e.g., -shared-heap=50%
. There are several other options that can be passed to upcxx-run
. Execute upcxx-run --help
to get the full list of options, e.g.:
usage: upcxx-run [-h] [-n NUM] [-N NUM] [-shared-heap HEAPSZ] [-backtrace] [-show]
[-info] [-ssh-servers HOSTS] [-localhost] [-v] [-E VAR1[,VAR2]] [--]
command ...
A portable parallel job launcher for UPC++ programs
options:
-h, --help show this help message and exit
-n NUM, -np NUM Spawn NUM number of UPC++ processes. Required.
-N NUM Run on NUM of nodes.
-shared-heap HEAPSZ Requests HEAPSZ size of shared memory per process. HEAPSZ must
include a unit suffix matching the pattern "[KMGT]B?" or be
"[0-100]%" (case-insensitive).
-backtrace Enable backtraces. Compile with -g for full debugging
information.
-show Testing: don't start the job, just output the command line
that would have been executed
-info Display useful information about the executable
-ssh-servers HOSTS List of SSH servers, comma separated.
-localhost Run UDP-conduit program on local host only
-v Generate verbose output. Multiple applications increase
verbosity.
-E VAR1[,VAR2] Adds provided arguments to the list of environment variables
to propagate to compute processes.
-- Optional separator between upcxx-run arguments and command to
execute
command to execute:
command UPC++ executable
... arguments
Use of -v
, -vv
and -vvv
provides additional information which may be useful. Among other things, -v
(or higher) will print job layout and shared-heap usage information.
The upcxx-run
utility covers the basic usage cases, but does not have options to cover all possibilities, such as establishing core and memory affinity. However, upcxx-run
is just a wrapper around system-provided job launch tools with their associated options. For information on use of system-provided job launch tools with UPC++ applications see Advanced Job Launch.
The header upcxx.hpp
defines certain preprocessor macros to reflect the version and compilation environment:
UPCXX_VERSION
An integer literal providing the release version of the implementation, in the format [YYYY][MM][PP]
corresponding to release YYYY.MM.PP
.
UPCXX_SPEC_VERSION
An integer literal providing the revision of the UPC++ specification to which this implementation adheres. See the specification for its corresponding value.
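For example, a translation unit can verify at compile time that it is built against a sufficiently recent implementation. The following is a minimal sketch; the version threshold shown is illustrative:

#include <upcxx/upcxx.hpp>
// guard against building with an older UPC++ release (hypothetical threshold)
#if !defined(UPCXX_VERSION) || UPCXX_VERSION < 20230900
  #error This code requires UPC++ release 2023.9.0 or newer
#endif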
UPCXX_THREADMODE
This is either undefined (for the default “seq” threadmode) or defined to an unspecified non-zero integer value for the “par” threadmode. Recommended usage is to test using #if
, as follows, to identify the need for thread-safety:
#if UPCXX_THREADMODE
// Obtain a lock or do other thread-safety work
#endif
UPCXX_CODEMODE
This is either undefined (for the “debug” codemode) or defined to an unspecified non-zero integer value for the “opt” (production) codemode.
UPCXX_NETWORK_*
The network being targeted is indicated by a preprocessor macro with a UPCXX_NETWORK_
prefix followed by the network name in capitals. The macro for the network being targeted is defined to a non-zero integer value, while those corresponding to other networks are undefined.
Currently supported networks and their macros include:
| Network/System | Macro |
|---|---|
| InfiniBand OpenIB/OpenFabrics Verbs | UPCXX_NETWORK_IBV |
| Aries (Cray XC) | UPCXX_NETWORK_ARIES |
| OFI Libfabric (HPE Cray EX, Intel Omni-Path) | UPCXX_NETWORK_OFI |
| UDP (e.g. for Ethernet) | UPCXX_NETWORK_UDP |
| No network (single node) | UPCXX_NETWORK_SMP |
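These macros allow network-specific code paths to be compiled conditionally. A minimal sketch:

#include <upcxx/upcxx.hpp>
#if UPCXX_NETWORK_SMP
  // single-node build: no network communication is involved
#elif UPCXX_NETWORK_IBV
  // InfiniBand Verbs build
#endif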
The following code implements “Hello World” in UPC++:
#include <iostream>
#include <upcxx/upcxx.hpp>
// we will assume this is always used in all examples
using namespace std;
int main(int argc, char *argv[])
{
// setup UPC++ runtime
  // setup UPC++ runtime
  upcxx::init();
  cout << "Hello world from process " << upcxx::rank_me()
       << " out of " << upcxx::rank_n() << " processes" << endl;
  // close down UPC++ runtime
  upcxx::finalize();
  return 0;
}
The UPC++ runtime is initialized with a call to upcxx::init()
, after which there are multiple processes running, each executing the same code. The runtime must be closed down with a call to upcxx::finalize()
. In this example, the call to upcxx::rank_me()
gives the index for the running process, and upcxx::rank_n()
gives the total number of processes. The use of rank in the function names refers to the rank within a team, which in this case contains all processes, i.e. team upcxx::world
. Teams are described in detail in the Teams section.
When this “hello world” example is run on four processes, it will produce output something like the following (there is no expected order across the processes):
Hello world from process 0 out of 4 processes
Hello world from process 2 out of 4 processes
Hello world from process 3 out of 4 processes
Hello world from process 1 out of 4 processes
A UPC++ program can allocate global memory in shared segments, which are accessible by all processes. A global pointer points at storage within the global memory, and is declared as follows:
upcxx::global_ptr<int> gptr = upcxx::new_<int>(upcxx::rank_me());
The call to upcxx::new_<int>
allocates a new integer in the calling process’s shared segment, and returns a global pointer (upcxx::global_ptr
) to the allocated memory. This is illustrated in Figure 2, which shows that each process has its own private pointer (gptr
) which points to an integer object in its local shared segment. By contrast, a conventional C++ dynamic allocation (int *mine = new int
) allocates an integer object in private local memory. Note that we use the integer type in this paragraph as an example, but any type T
can be allocated using the upcxx::new_<T>()
function call.
A UPC++ global pointer is fundamentally different from a conventional C++ pointer: it cannot be dereferenced using the *
operator; it does not support dynamic casts (e.g., polymorphic conversion of pointers between base and derived types); and it cannot be constructed by the C++ std::addressof
operator. However, UPC++ global pointers do support pointer arithmetic and are trivially copyable.
The upcxx::new_
function calls the class constructor in addition to allocating memory. Since we are allocating a scalar, we can pass arguments to the constructor. Thus, we don’t have to invoke the default constructor. The upcxx::new_
function is paired with upcxx::delete_
:
upcxx::global_ptr<int> x = upcxx::new_<int>(42);
// work with x ...
upcxx::delete_(x);
UPC++ provides the function upcxx::new_array
for allocating a 1-dimensional array in the shared segment. This is a non-collective call and the array is NOT distributed across processes. Each call allocates an independent, contiguous array object in the caller’s shared segment. upcxx::new_array
operates analogously to C++ array new and default-initializes objects (meaning it calls the default constructor for class types, and leaves other types with indeterminate value). The destruction operation is upcxx::delete_array
. For example, to allocate a shared array of 10 uninitialized integers and then free it:
upcxx::global_ptr<int> x_arr = upcxx::new_array<int>(10);
// work with x_arr ...
upcxx::delete_array(x_arr);
UPC++ also provides functions for allocating and deallocating uninitialized shared storage without calling constructors and destructors. The upcxx::allocate
function allocates enough (uninitialized) space for n
shared objects of type T
on the current process, with a specified alignment, and upcxx::deallocate
frees the memory.
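For example, one might reserve uninitialized space for ten doubles and later return it. This is a minimal sketch; consult the specification for the precise overloads of upcxx::allocate:

// allocate uninitialized storage for 10 doubles in the local shared segment
upcxx::global_ptr<double> gp = upcxx::allocate<double>(10);
// ... construct and use objects in this storage ...
upcxx::deallocate(gp);  // free the storage; no destructors are run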
If the shared memory pointed to by a global pointer has affinity to a local process, UPC++ enables a programmer to use a global pointer as an ordinary C++ pointer, through downcasting, via the local
method of upcxx::global_ptr
, for example:
upcxx::global_ptr<int> x_arr = upcxx::new_array<int>(10);
UPCXX_ASSERT(x_arr.is_local()); // a precondition of global_ptr<T>::local()
int *local_ptr = x_arr.local();
local_ptr[i] = ... // work with local ptr
UPCXX_ASSERT()
is a macro analogous to the C library assert()
macro from <cassert>
, except the assertion check is conditionally enabled using UPCXX_CODEMODE
instead of the NDEBUG
define. There is also a UPCXX_ASSERT_ALWAYS()
variant that performs an unconditional check. Both macros accept an optional second argument that can be used to provide a console message to print upon failure, which can be a simple string or a stream insertion expression.
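For example, the locality check from the previous snippet could be written with a diagnostic message:

UPCXX_ASSERT_ALWAYS(x_arr.is_local(),
    "rank " << upcxx::rank_me() << ": x_arr is not local and cannot be downcast");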
Using this downcasting feature, we can treat all shared objects allocated by a process as local objects (global references are needed by remote processes only). Storage with local affinity can be accessed more efficiently via an ordinary C++ pointer. Note that the local()
method is safe only if the global pointer is local. To this end, UPC++ provides the is_local
method on global pointers to check for locality; it returns true if the global pointer is local and can be downcast. This call can help to avoid catastrophic errors, as it is incorrect to attempt to downcast a global pointer that is not local to a process. In "debug" codemode, such attempts are automatically diagnosed with a fatal error at runtime.
We illustrate the use of global shared memory with a simple program that implements a one-dimensional (1D) stencil. Stencil codes are widely used for modeling physical systems, e.g. for solving differential equations. Stencil codes are iterative algorithms that operate on a mesh or partition of data, where the value at each mesh location is computed based on the values at neighboring locations. A trivial example is where at each iteration the value for each mesh location u[i]
is computed as the average of its neighbors:
u[i] = (u[i - 1] + u[i + 1]) / 2;
In many applications, these updates are iterated until the computed solution converges to a desired degree of accuracy, epsilon
.
One way to parallelize the 1D stencil computation is with red-black updates. The mesh is divided into two groups: the odd-numbered indexes (red) and the even-numbered indexes (black). When iterating, first all the red indexes are updated, and then all the black. This enables us to compute the values for all of a color in any order with only dependencies on the other color, as shown in the following code sketch:
for (int step = 0; step < max_steps; step++) {
  for (int i = 1 + step % 2; i < n - 1; i += 2)
    u[i] = (u[i - 1] + u[i + 1]) / 2;
  if (error(u) <= epsilon) break;
}
Note that the red updates are done on even-numbered steps, and the black updates on odd-numbered.
To implement the parallel algorithm in UPC++, we split the solution into panels, one per process. Each process can operate independently on its own subset of the mesh, except at the boundaries, where a process will need to access the neighbors’ values. In order to do this, we can allocate the panels in the shared segment, using UPC++’s array allocation:
upcxx::global_ptr<double> gptr_panel = upcxx::new_array<double>(n_local);
Here n_local
is the number of points assigned to the local panel. Assuming that the total number of processes divides n
evenly, we have n_local = n / upcxx::rank_n() + 2
. The final +2
is extra space for ghost cells to store the neighboring values. For simplicity, we also assume n_local
is even.
Processes in UPC++ are not automatically aware of new allocations in another process’s shared segment, so we must ensure that the processes obtain the global pointers that they require. There are several different ways of doing this; we will show how to do it using a convenient construct provided by UPC++, called a distributed object.
A distributed object is a single logical object partitioned over a set of processes (a team), where every process has the same global name for the object (i.e. a universal name), but its own local value. Distributed objects are created with the upcxx::dist_object<T>
type. For example, in our stencil, we can allocate our panels and declare a distributed object to hold the global pointers to the panels:
upcxx::dist_object<upcxx::global_ptr<double>> u_g(upcxx::new_array<double>(n_local));
Each process in a given team (in this case the default team containing all processes) collectively calls this constructor for upcxx::dist_object<T>
, with a value of type T
representing the process’s instance value for the object (a global pointer to an array of doubles in the example above). Although this dist_object
constructor is collective, there is no guarantee that when the constructor returns on a given process it will be complete on any other process. To avoid this hazard, UPC++ provides an interlock to ensure that accesses to a dist_object
are delayed until the local representative has been fully constructed and activated.
We can still use global_ptr::local()
to downcast the pointer contained in a distributed object, provided the allocation has affinity to the process doing the downcast. We use this feature to get an ordinary C++ pointer to the panel:
double *u = u_g->local();
To access the remote value of a distributed object, we use the upcxx::dist_object::fetch
member function, which, given a process rank argument, will get the T
value from the sibling distributed object representative on the remote process. For simplicity we will use periodic boundary conditions, so process 0 has process upcxx::rank_n()-1
as its left neighbor, and process upcxx::rank_n()-1
has process 0 as its right neighbor. In our algorithm, we need to fetch the global pointers for the left (uL
) and right (uR
) ghost cells:
int l_nbr = (upcxx::rank_me() + upcxx::rank_n() - 1) % upcxx::rank_n();
int r_nbr = (upcxx::rank_me() + 1) % upcxx::rank_n();
upcxx::global_ptr<double> uL = u_g.fetch(l_nbr).wait();
upcxx::global_ptr<double> uR = u_g.fetch(r_nbr).wait();
Because the fetch function is asynchronous, we have to synchronize on completion using a call to wait()
. In the Asynchronous Computation section, we will see how to overlap asynchronous operations, that is, when communication is split-phased. For now, we do not separate the asynchronous initiation from the wait()
.
Now, when we execute the red-black iterations, we can use the global pointers to the ghost cells to get the remote processes’ values. To do this, we invoke a remote get:
if (!(step % 2)) u[0] = upcxx::rget(uL + block).wait();
else u[n_local - 1] = upcxx::rget(uR + 1).wait();
The remote get function is part of the one-sided Remote Memory Access (RMA) communication supported by UPC++. Also supported is a remote put function, upcxx::rput
. These operations initiate transfer of the value
object to (put) or from (get) the remote process; no coordination is needed with the remote process (this is why it is one-sided). The type T
transferred must be TriviallySerializable, generally by satisfying the C++ TriviallyCopyable requirements. Like many asynchronous communication operations, rget
and rput
default to returning a future object that becomes ready when the transfer is complete (futures are discussed in more detail in the Asynchronous Computation section, and completions in general are described in more detail in the Completions section).
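For instance, a put of a single value through a global pointer might look like the following sketch, where gptr is a hypothetical upcxx::global_ptr<double> obtained as shown earlier:

double value = 3.14;
upcxx::future<> fut = upcxx::rput(value, gptr); // one-sided, asynchronous write
fut.wait(); // synchronize on operation completion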
Putting all the components described together, the main
function for the red-black solver is:
int main(int argc, char **argv) {
  upcxx::init();
  // initialize parameters - simple test case
  const long N = 1024;
  const long MAX_ITER = N * N * 2;
  const double EPSILON = 0.1;
  const int MAX_VAL = 100;
  const double EXPECTED_VAL = MAX_VAL / 2;
  // get the bounds for the local panel, assuming num procs divides N into an even block size
  long block = N / upcxx::rank_n();
  UPCXX_ASSERT(block % 2 == 0);
  UPCXX_ASSERT(N == block * upcxx::rank_n());
  long n_local = block + 2; // plus two for ghost cells
  // set up the distributed object
  upcxx::dist_object<upcxx::global_ptr<double>> u_g(upcxx::new_array<double>(n_local));
  // downcast to a regular C++ pointer
  double *u = u_g->local();
  // init to uniform pseudo-random distribution, independent of job size
  mt19937_64 rgen(1); rgen.discard(upcxx::rank_me() * block);
  for (long i = 1; i < n_local - 1; i++)
    u[i] = 0.5 + rgen() % MAX_VAL;
  // fetch the left and right pointers for the ghost cells
  int l_nbr = (upcxx::rank_me() + upcxx::rank_n() - 1) % upcxx::rank_n();
  int r_nbr = (upcxx::rank_me() + 1) % upcxx::rank_n();
  upcxx::global_ptr<double> uL = u_g.fetch(l_nbr).wait();
  upcxx::global_ptr<double> uR = u_g.fetch(r_nbr).wait();
  upcxx::barrier(); // optional - wait for all ranks to finish init
  // iteratively solve
  for (long stepi = 0; stepi < MAX_ITER; stepi++) {
    // alternate between red and black
    int phase = stepi % 2;
    // get the values for the ghost cells
    if (!phase) u[0] = upcxx::rget(uL + block).wait();
    else u[n_local - 1] = upcxx::rget(uR + 1).wait();
    // compute updates and error
    for (long i = phase + 1; i < n_local - 1; i += 2)
      u[i] = (u[i - 1] + u[i + 1]) / 2.0;
    upcxx::barrier(); // wait until all processes have finished calculations
    if (stepi % 10 == 0) { // periodically check convergence
      if (check_convergence(u, n_local, EXPECTED_VAL, EPSILON, stepi))
        break;
    }
  }
  upcxx::finalize();
  return 0;
}
We have one helper function, check_convergence
, which determines if the solution has converged. It uses a UPC++ collective, upcxx::reduce_all
, to enable all the processes to obtain the current maximum error:
bool check_convergence(double *u, long n_local, const double EXPECTED_VAL,
                       const double EPSILON, long stepi)
{
  double err = 0;
  for (long i = 1; i < n_local - 1; i++)
    err = max(err, fabs(EXPECTED_VAL - u[i]));
  // upcxx collective to get max error over all processes
  double max_err = upcxx::reduce_all(err, upcxx::op_fast_max).wait();
  // check for convergence
  if (max_err / EXPECTED_VAL <= EPSILON) {
    if (!upcxx::rank_me())
      cout << "Converged at " << stepi << ", err " << max_err << endl;
    return true;
  }
  return false;
}
Because the collective function is asynchronous, we synchronize on completion, using wait()
, to retrieve the result.
An RPC enables the calling process to invoke a function at a remote process, using parameters sent to the remote process via the RPC. For example, to execute a function square
on process r
, we would call:
int square(int a, int b) { return a * b; }
upcxx::future<int> fut_result = upcxx::rpc(r, square, a, b);
By default, an RPC returns the result in an appropriately typed upcxx::future
, and we can obtain the value using wait
, i.e.
int result = fut_result.wait();
(for more on futures, see the Asynchronous Computation section). The function passed in can be a lambda or another function, but note that currently the function cannot be in a shared library. The arguments to an RPC must be Serializable, generally either by satisfying the C++ TriviallyCopyable requirement (which includes all builtin types and many class types) or by being one of the commonly used types from the C++ standard template library. See Serialization for details, including interfaces for defining non-trivial serialization of user-defined types.
We illustrate the use of RPCs with a simple distributed hash table, a distributed analog of the C++ unordered map container. The hash table is implemented in a header file, and provides insert and find operations. It relies on each process having a local std::unordered_map
to store the key-value pairs, and in order for the local unordered maps to be accessible from the RPCs, we wrap them in a distributed object:
using dobj_map_t = upcxx::dist_object<std::unordered_map<std::string, std::string> >;
dobj_map_t local_map;
This is initialized in the constructor:
DistrMap() : local_map({}) {}
For the insert operation, the target process is determined by a hash function, get_target_rank(key)
, which maps the key to one of the processes. The data is inserted using a lambda function, which takes a key and a value as parameters. The insert operation returns the result of the lambda, which in this case is an empty future:
upcxx::future<> insert(const std::string &key, const std::string &val) {
  return upcxx::rpc(get_target_rank(key),
    [](dobj_map_t &lmap, const std::string &key, const std::string &val) {
      lmap->insert({key, val});
    }, local_map, key, val);
}
Note the dist_object
argument local_map
to the RPC. As a special case, when a dist_object
is passed as the argument to an RPC, the RPC activation at the target receives a reference to its local representative of the same dist_object
. Also note the use of const-reference for other arguments, to avoid the cost of copying the RPC arguments upon entry to the callback.
The find operation for our distributed map is similar, and is given in the full header example:
#include <string>
#include <unordered_map>
#include <upcxx/upcxx.hpp>

class DistrMap {
private:
  // store the local unordered map in a distributed object to access from RPCs
  using dobj_map_t = upcxx::dist_object<std::unordered_map<std::string, std::string> >;
  dobj_map_t local_map;
  // map the key to a target process
  int get_target_rank(const std::string &key) {
    return std::hash<std::string>{}(key) % upcxx::rank_n();
  }
public:
  // initialize the local map
  DistrMap() : local_map({}) {}
  // insert a key-value pair into the hash table
  upcxx::future<> insert(const std::string &key, const std::string &val) {
    // the RPC returns an empty upcxx::future by default
    return upcxx::rpc(get_target_rank(key),
      // lambda to insert the key-value pair
      [](dobj_map_t &lmap, const std::string &key, const std::string &val) {
        // insert into the local map at the target
        lmap->insert({key, val});
      }, local_map, key, val);
  }
  // find a key and return associated value in a future
  upcxx::future<std::string> find(const std::string &key) {
    return upcxx::rpc(get_target_rank(key),
      // lambda to find the key in the local map
      [](dobj_map_t &lmap, const std::string &key) -> std::string {
        auto elem = lmap->find(key);
        if (elem == lmap->end()) return std::string(); // not found
        else return elem->second; // key found: return value
      }, local_map, key);
  }
};
A test example of using the distributed hash table is shown below. Each process generates a set of N
key-value pairs, guaranteed to be unique, inserts them all into the hash table, and then retrieves the set of keys for the next process over, asserting that each one was found and correct. The insertion phase is followed by a barrier, to ensure all insertions have completed:
#include <iostream>
#include "dmap.hpp"
using namespace std;
int main(int argc, char *argv[])
{
  upcxx::init();
  const long N = 1000;
  DistrMap dmap;
  // insert set of unique key, value pairs into hash map, wait for completion
  for (long i = 0; i < N; i++) {
    string key = to_string(upcxx::rank_me()) + ":" + to_string(i);
    string val = key;
    dmap.insert(key, val).wait();
  }
  // barrier to ensure all insertions have completed
  upcxx::barrier();
  // now try to fetch keys inserted by neighbor
  for (long i = 0; i < N; i++) {
    string key = to_string((upcxx::rank_me() + 1) % upcxx::rank_n()) + ":" + to_string(i);
    string val = dmap.find(key).wait();
    // check that value is correct
    UPCXX_ASSERT(val == key);
  }
  upcxx::barrier(); // wait for finds to complete globally
  if (!upcxx::rank_me()) cout << "SUCCESS" << endl;
  upcxx::finalize();
  return 0;
}
In the previous example, the rpc
operations are synchronous because of the call to the wait
method. To achieve maximum performance, UPC++ programs should always take advantage of asynchrony when possible.
We’ve seen several RPC examples that specify a remote callback by passing a C++ lambda expression, which is concise and convenient for small blocks of code. RPC actually accepts any form of C++ FunctionObject for the callback, which includes global and file-scope functions, class static functions, function pointers, lambda expressions and even user-defined objects implementing operator()
. RPC includes system-specific logic to automatically ensure these various callables are correctly serialized and invoked at the target process. However, it's important to understand that when passing a lambda expression as the FunctionObject, RPC assumes the lambda is trivially copyable, and it will be byte-copied when transmitted to the target process. As a result, special care needs to be taken when using lambda captures with RPC.
In particular:
When passing a C++ lambda expression to an RPC for execution on another process, by-reference lambda captures should never be used.
This is because the C++ compiler implements reference captures using raw virtual memory addresses, which are generally not meaningful at the remote process that executes the RPC callback. Note this rule does not apply to LPC and future::then()
callbacks (discussed in upcoming sections), which always execute in the same process, but the normal C++ precautions regarding reference captures and object lifetimes still apply.
Our examples thus far have used lambda expressions with an empty capture list, i.e.: [/*empty*/](...) { ... }
, which trivially avoids this pitfall. It is possible to safely use value captures (also known as capture by-copy) of trivial types in a lambda expression passed to RPC; however, keep in mind the lambda object (including any captures) is still assumed to be trivially copyable:
When passing a C++ lambda expression to an RPC, lambda value captures (capture by-copy) should only include objects which are trivially copyable.
When in doubt, it’s always safer to pass data to an RPC callback (lambda or otherwise) using explicit RPC callback arguments as we’ve done in our examples so far, instead of relying upon lambda captures or data otherwise embedded in the FunctionObject. In section Serialization we’ll learn about the UPC++ machinery that enables RPC callback arguments to transmit rich data types.
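The following sketch contrasts the unsafe and safe idioms, where r is a hypothetical target rank:

int x = 42;
// UNSAFE: by-reference capture; the address of x is meaningless on the target
// upcxx::rpc(r, [&x]() { return x; }).wait();
// OK: value capture of a trivially copyable object
upcxx::rpc(r, [x]() { return x; }).wait();
// SAFEST: pass the data as an explicit RPC argument
upcxx::rpc(r, [](int v) { return v; }, x).wait();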
Most communication operations in UPC++ are asynchronous. So, in our red-black solver example, when we made the call to upcxx::rget
, we explicitly waited for it to complete using wait()
. However, in split-phase algorithms, we can perform the wait at a later point, allowing us to overlap computation and communication.
The return type for upcxx::rget
is dependent on the UPC++ completion object passed to the UPC++ call. The default completion is a UPC++ future, which holds a value (or tuple of values) and a state (ready or not ready). We will use this default completion here, and will return to the subject in detail in the Completions section. When the rget
completes, the future becomes ready and can be used to access the results of the operation. The call to wait()
can be replaced by the following equivalent polling loop, which exits when communication has completed:
upcxx::future<double> fut = upcxx::rget(uR + 1);
while (!fut.ready()) upcxx::progress();
u[n_local - 1] = fut.result();
First, we get the future object, and then we loop on it until it becomes ready. This loop must include a call to the upcxx::progress
function, which progresses the library and transitions futures to a ready state when their corresponding operation completes (see the Progress section for more details on progress). This common idiom is embodied in the wait()
method of upcxx::future
.
Using futures, the process waiting for a result can do computation while waiting, effectively overlapping computation and communication. For example, in our distributed hash table, we can do the generation of the key-value pair asynchronously as follows:
// initialize key and value for first insertion
string key = to_string(upcxx::rank_me()) + ":" + to_string(0);
string val = key;
for (long i = 0; i < N; i++) {
  upcxx::future<> fut = dmap.insert(key, val);
  // compute new key while waiting for RPC to complete
  if (i < N - 1) {
    key = to_string(upcxx::rank_me()) + ":" + to_string(i + 1);
    val = key;
  }
  // wait for operation to complete before next insert
  fut.wait();
}
UPC++ also provides callbacks or completion handlers that can be attached to futures. These are functions that are executed by the local process when the future is ready (this is sometimes also referred to as “future chaining”). In our distributed hash table example, we want to check the value once the find
operation returns. To do this asynchronously, we can attach a callback to the future using the .then
method of upcxx::future
. The callback is executed on the initiating process when the find
completes, and is passed the result of the future as a parameter. In this example, the callback is a lambda, which checks the returned value:
for (long i = 0; i < N; i++) {
  string key = to_string((upcxx::rank_me() + 1) % upcxx::rank_n()) + ":" + to_string(i);
  // attach callback, which itself returns a future
  upcxx::future<> fut = dmap.find(key).then(
    // lambda to check the return value
    [key](const string &val) {
      UPCXX_ASSERT(val == key);
    });
  // wait for future and its callback to complete
  fut.wait();
}
An important feature of UPC++ is that there are no implicit ordering guarantees with respect to asynchronous operations. In particular, there is no guarantee that operations will complete in the order they were initiated. This allows for more efficient implementations, but the programmer must not assume any ordering that is not enforced by explicit synchronization.
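For example, in the following sketch (where gptr is a hypothetical global pointer), the two puts may complete in either order, so the final remote value cannot be assumed:

upcxx::future<> f1 = upcxx::rput(1, gptr);
upcxx::future<> f2 = upcxx::rput(2, gptr); // NOT ordered relative to the first put
f1.wait(); f2.wait();
// the remote location now holds either 1 or 2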
When many asynchronous operations are launched, it can be cumbersome to individually track the futures, and wait on all of them for completion. In the previous example of asynchronous finds, we might have preferred to issue all the finds asynchronously and wait on all of them to complete at the end. UPC++ provides an elegant solution to do this, allowing futures to be conjoined (i.e. aggregated), so that the results of one future are dependent on others, and we need only wait for completion explicitly on one future. The example below illustrates how the hash table lookups can be performed asynchronously using this technique:
// the start of the conjoined future
upcxx::future<> fut_all = upcxx::make_future();
for (long i = 0; i < N; i++) {
  string key = to_string((upcxx::rank_me() + 1) % upcxx::rank_n()) + ":" + to_string(i);
  // attach callback, which itself returns a future
  upcxx::future<> fut = dmap.find(key).then(
    // lambda to check the return value
    [key](const string &val) {
      UPCXX_ASSERT(val == key);
    });
  // conjoin the futures
  fut_all = upcxx::when_all(fut_all, fut);
  // periodically call progress to allow incoming RPCs to be processed
  if (i % 10 == 0) upcxx::progress();
}
// wait for all the conjoined futures to complete
fut_all.wait();
The future conjoining begins by invoking upcxx::make_future
to construct a trivially ready future fut_all
. We then loop through each iteration, calling DistrMap::find
asynchronously, and obtaining the future fut
returned by the .then
callback attached to the find
operation. This future is passed to the upcxx::when_all
function, in combination with the previous future, fut_all
. The upcxx::when_all
constructs a new future representing readiness of all its arguments (in this case fut
and fut_all
), and returns a future with a concatenated results tuple of the arguments. By setting fut_all
to the future returned by when_all
, we can extend the conjoined futures. Once all the operations are linked into the conjoined future, we simply wait on the final future, i.e. the fut_all.wait()
call. Figure 3 depicts the dependency graph of operations constructed when this code executes, with each blue box conceptually corresponding to a future constructed synchronously at runtime. The boxes are labeled with a fragment of the code that activates after any/all incoming dependencies (depicted by arrows) are readied, and eventually leads to readying of the future associated with this box. For instance, the callback passed to each then
will execute at some time after its preceding dmap.find()
has completed asynchronously. Note that within the loop, we have a periodic call to upcxx::progress
, which gives the UPC++ runtime an opportunity to process incoming RPCs and run completion callbacks (progress is described in more detail in the Progress section).
It is also possible to keep track of futures with an array or some other container, such as a vector, waiting on each element in turn (a sketch of this alternative is shown below). However, conjoining futures has several advantages: only a single .wait() call is needed to complete a group of operations, whereas without conjoined futures the code must loop through multiple .wait()
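For comparison, here is a minimal sketch of the container-based alternative, reusing the dmap, N, and key scheme from the example above:

// track each find's future in a vector instead of conjoining
std::vector<upcxx::future<>> futs;
for (long i = 0; i < N; i++) {
  string key = to_string((upcxx::rank_me() + 1) % upcxx::rank_n()) + ":" + to_string(i);
  futs.push_back(dmap.find(key).then([key](const string &val) {
    UPCXX_ASSERT(val == key);
  }));
  if (i % 10 == 0) upcxx::progress();
}
for (auto &f : futs) f.wait(); // one wait per outstanding operation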
calls, potentially adding overhead.

We saw above that futures can encapsulate the statuses of multiple asynchronous operations. Additionally, they can encapsulate multiple values. These values are contained in a std::tuple
that can be accessed using the result()
or wait()
member functions of upcxx::future
. Using the C++11 standard, these values can be accessed as shown below. The make_future
function is variadic, and constructs a ready future from the arguments.
int a;
double b;
upcxx::future<int,double> fut = upcxx::rpc(0, []() { return upcxx::make_future(42, 3.14); });
std::tie(a, b) = fut.wait();
However, the code can be expressed much more concisely using C++17 structured bindings.
upcxx::future<int,double> fut = upcxx::rpc(0, []() { return upcxx::make_future(42, 3.14); });
auto [a, b] = fut.wait();
In both of the above cases a
and b
are copies of the values contained within fut
. If copying these values is expensive or unnecessary, wait_reference()
can be used to obtain const
references, instead.
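For example, with the distributed hash table's find operation from earlier sections, one might write the following sketch; note the reference remains valid only while the future object stays alive:

upcxx::future<std::string> fut = dmap.find(key);
const std::string &val = fut.wait_reference(); // no copy of the string is made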
Quiescence is a state in which no process is performing computation that will result in communication injection, and no communication operations are currently in-flight in the network or queues on any process. Quiescence is of particular importance for applications using anonymous asynchronous operations on which no synchronization is possible on the sender’s side. For example, quiescence may need to be achieved before destructing resources and/or exiting a UPC++ computational phase. It’s important to note that quiescence is also a precondition for calling upcxx::finalize()
during an orderly exit of the parallel job.
To illustrate a simple approach to quiescence, we use the distributed hash table again. In this case, we use a variant of RPC that does not return a future, whose simplest overload looks like this:
template<typename Func, typename ...Args>
void upcxx::rpc_ff(upcxx::intrank_t recipient, Func &&func, Args &&...args);
The _ff
stands for “fire-and-forget”. From a performance standpoint, upcxx::rpc_ff
has the advantage that it does not send a response message to satisfy a future
back to the process which has issued the RPC. However, because no acknowledgment is returned to the initiator, the caller does not get a future to indicate when the upcxx::rpc_ff
invocation has completed execution of func
at the target. We can modify the distributed hash table insert operation to use upcxx::rpc_ff
as follows:
// insert a key-value pair into the hash table
void insert(const std::string &key, const std::string &val) {
  // this RPC does not return anything
  upcxx::rpc_ff(get_target_rank(key),
    // lambda to insert the key-value pair
    [](dobj_map_t &lmap, const std::string &key, const std::string &val) {
      UPCXX_ASSERT(lmap->count(key) == 0); // assume no duplicate keys
      // insert into the local map at the target
      lmap->insert({key, val});
    }, local_map, key, val);
}
The only way to establish quiescence is to use additional code. For example, assuming the number of inserts performed by each rank is unpredictable, we could change the distributed hash table insert loop as follows:
// keep track of how many inserts this rank has injected
long n_inserts_injected = 0;
// insert all key-value pairs into the hash map
for (long i = 0; i < N; i++) {
  string key = to_string(upcxx::rank_me()) + ":" + to_string(i);
  string val = key;
  if (should_perform_insert(i)) { // unpredictable condition
    // insert mapping from key to value in our distributed map.
    // insert has no return because it uses rpc_ff.
    dmap.insert(key, val);
    // increment the local count
    n_inserts_injected++;
  }
  // periodically call progress to allow incoming RPCs to be processed
  if (i % 10 == 0) upcxx::progress();
}
bool done;
do { // Loop while not all insert rpc_ff have completed.
  upcxx::progress(); // process incoming work
  // On each rank, capture the counts of inserts injected and completed
  long local[2] = {n_inserts_injected, dmap.local_size()};
  // Globally count the number of inserts injected and completed by computing
  // an element-wise sum reduction of each of the two counters in the local
  // array, delivering the results in the global array.
  long global[2];
  upcxx::reduce_all(local, global, 2, upcxx::op_fast_add).wait();
  // Test if all inserts have now completed
  UPCXX_ASSERT(global[0] >= global[1]);
  done = (global[0] == global[1]);
} while (!done);
To track completion, each process tracks how many inserts it has injected in the local variable n_inserts_injected
. The number of inserts that have arrived and completed on each process is tracked implicitly by the local size of the distributed map which is defined in DistrMap
simply as:
int local_size() { return local_map->size(); }
Once all processes have finished injecting their inserts, a global sum reduction (upcxx::reduce_all
) is used to simultaneously compute the total number of inserts injected and completed across all processes. The call to upcxx::reduce_all
computes an element-wise sum reduction of each of the two counters in the local
array, storing the results in the global
array. Reductions are discussed in more detail in the Reduction section. This reduction is repeated until the two values are equal, indicating that all inserts (i.e., rpc_ff
) have executed.
Note that to safely perform repeated rpc_ff
quiescence (e.g. in a loop) using the above method requires double buffering of the counters for injected and executed rpc_ff
– a more complete example of this is provided in the UPC++ Extras package under examples/rpc_ff_quiescence
.
This counting algorithm is one mechanism to achieve quiescence, which relies on counting the number of one-way messages injected and received and establishing global consensus for when they match. There are many alternative ways to establish quiescence. For example, when the number of messages to be received by each rank is known beforehand or can be efficiently computed, one can deploy a simpler quiescence algorithm that skips the communication step to establish consensus and has each rank simply await the arrival of the expected number of messages.
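As a sketch, if each rank could compute n_expected (a hypothetical application-derived count of the inserts targeting it), the consensus loop reduces to:

// await the locally expected number of inserts, processing incoming RPCs
while (dmap.local_size() < n_expected) upcxx::progress();
upcxx::barrier(); // optional: ensure every rank has finished before proceeding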
UPC++ provides atomic operations on shared objects. These operations are handled differently from C++ atomics and rely on the notion of an atomic domain, a concept inherited from UPC. In UPC++, all atomic operations are associated with an atomic domain, an abstraction that encapsulates a supported type and set of operations. Currently, the supported types include: float
, double
, and any signed or unsigned integral type with a 32-bit or 64-bit representation. The full list of operations can be found in the UPC++ specification.
Each atomic domain is a collective object comprised of instances of the atomic_domain
class, and the operations are defined as methods on that class. The use of atomic domains permits selection (at construction) of the most efficient available implementation which can provide correct results for the given set of operations on the given data type. This implementation may be hardware-dependent and vary for different platforms. To get the best possible performance from atomics, the user should be aware of which atomics are supported in hardware on their platform, and set up the domains accordingly.
Similar to a mutex, an atomic domain exists independently of the data it applies to. User code is responsible for ensuring that data accessed via a given atomic domain is only accessed via that domain, never via a different domain or without use of a domain. Users may create as many domains as needed to describe their uses of atomic operations, so long as there is at most one domain per atomic datum. If distinct data of the same type are accessed using differing sets of operations, then creation of distinct domains for each operation set is recommended to achieve the best performance on each set.
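For example, if one set of int64_t objects is only ever incremented and read, while a disjoint set is only updated via compare-and-swap, two separate domains could be constructed. A minimal sketch:

upcxx::atomic_domain<int64_t> ad_counters({upcxx::atomic_op::add, upcxx::atomic_op::load});
upcxx::atomic_domain<int64_t> ad_flags({upcxx::atomic_op::compare_exchange});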
We illustrate atomics by showing how they can be used to solve the quiescence counting problem for the distributed hash table, discussed in the previous section. The code is as follows:
// keep track of how many inserts have been made to each target process
std::unique_ptr<int64_t[]> inserts_per_rank(new int64_t[upcxx::rank_n()]());
// insert all key-value pairs into the hash map
for (long i = 0; i < N; i++) {
  string key = to_string(upcxx::rank_me()) + ":" + to_string(i);
  string val = key;
  if (should_perform_insert(i)) { // unpredictable condition
    dmap.insert(key, val);
    inserts_per_rank[dmap.get_target_rank(key)]++;
  }
  // periodically call progress to allow incoming RPCs to be processed
  if (i % 10 == 0) upcxx::progress();
}
// setup atomic domain with only the operations needed
upcxx::atomic_domain<int64_t> ad({upcxx::atomic_op::load, upcxx::atomic_op::add});
// distributed object to keep track of number of inserts expected at every process
upcxx::dist_object<upcxx::global_ptr<int64_t> > n_inserts(upcxx::new_<int64_t>(0));
// get pointers for all other processes and use atomics to update remote counters
for (long i = 0; i < upcxx::rank_n(); i++) {
  if (inserts_per_rank[i]) {
    upcxx::global_ptr<int64_t> remote_n_inserts = n_inserts.fetch(i).wait();
    // use atomics to increment the remote process's expected count of inserts
    ad.add(remote_n_inserts, inserts_per_rank[i], memory_order_relaxed).wait();
  }
}
upcxx::barrier();
// Note: once a memory location is accessed with atomics, it should only be
// subsequently accessed using atomics to prevent unexpected results
int64_t expected_inserts = ad.load(*n_inserts, memory_order_relaxed).wait();
// wait until we have received all the expected updates, spinning on progress
while (dmap.local_size() < expected_inserts) upcxx::progress();
This quiescence algorithm is likely to be slower and less scalable than the example in the Quiescence section, but it’s used here to demonstrate the use of remote atomic memory operations. Each rank allocates an int64_t
counter in the shared space and shares the global pointer to this location with other ranks using a dist_object
. This is the location that remote ranks will atomically update, which must be an object of one of the supported scalar types that lives in the shared segment. Next we declare an atomic domain, which includes only the two operations we will actually use:
upcxx::atomic_domain<int64_t> ad({upcxx::atomic_op::load, upcxx::atomic_op::add});
Each process maintains an array, inserts_per_rank
, of the expected counts for all other processes. Once it has finished all its inserts, it loops over all the remote processes, and for each one it first obtains the remote global pointer using fetch
, and then atomically updates the target process’s counter. Finally, each process spins, waiting for the size of the local unordered map to match its expected number of inserts.
Like all atomic operations (and indeed, nearly all UPC++ communication operations), the atomic load and add operations are asynchronous. The load operation returns a completion object, which defaults to a future. In the example in this section, we synchronously wait for each dist_object::fetch
and atomic_domain::add
operation to complete to simplify the presentation. However we could easily use future chaining and conjoining or promises to track completion of both operations and achieve full communication/communication overlap for all the operations in the loop; the details are left as an exercise for the reader.
Note that an atomic domain must be explicitly destroyed via a collective call to the atomic_domain::destroy()
method, before the allocated atomic_domain
object goes out of scope or is deleted (this action meets the precondition of the object’s destructor, that the object has been destroyed). If the atomic_domain
object hasn’t been destroyed appropriately, the program will crash with an error message that upcxx::atomic_domain::destroy()
must be called collectively before the destructor, which has been invoked implicitly.
In the previous examples in this guide, we have relied on futures to inform us about the completion of asynchronous operations. However, UPC++ provides several additional mechanisms for determining completion, including promises, remote procedure calls (RPCs) and local procedure calls (LPCs). Further on in this section, we give examples of completions using promises and RPCs, and a subsequent section demonstrates LPC completions.
A completion object is constructed by a call to a static member function of one of the completion classes: upcxx::source_cx
, upcxx::remote_cx
or upcxx::operation_cx
. These classes correspond to the different stages of completion of an asynchronous operation: source completion indicates that the source memory resources needed for the operation are no longer in use by UPC++ at the source process, whereas operation and remote completion indicate that the operation is fully complete from the perspective of the initiating process and remote target process, respectively. As we have seen in the previous examples in this guide, most operations default to notification of operation completion using a future, e.g.:
template <typename T, typename Cx=/*unspecified*/>
RType upcxx::rput(T const *src, upcxx::global_ptr<T> dest, std::size_t count,
                  Cx &&completions=upcxx::operation_cx::as_future());
As shown above, the completion object is constructed as a completion stage combined with a mechanism of completion notification. There are restrictions on which completion notifications can be associated with which stages: futures, promises and LPCs are only valid for source and operation completions, whereas RPCs are only valid for remote completions.
It is possible to request multiple completion notifications from one operation using the pipe (|) operator to combine completion objects. For example, future completion can be combined with RPC completion as follows:
auto cxs = (upcxx::remote_cx::as_rpc(some_func) | upcxx::operation_cx::as_future());
The completion argument passed to an asynchronous communication initiation function influences the polymorphic return type of that function (abbreviated as RType in prototypes). In particular, if one future completion is requested (as with the default completion), then the return type RType is an appropriately typed future (as we’ve seen in prior examples). If a completion argument is passed that requests no future-based completions, then the return type RType is void. If more than one future completion is requested (for example, source and operation completions) then the return type RType is a std::tuple of the requested futures:
upcxx::future<> fut_src, fut_op;
std::tie(fut_src, fut_op) = upcxx::rput(p_src, gptr_dst, 1,
    upcxx::source_cx::as_future() | upcxx::operation_cx::as_future());
fut_src.wait();
// ... source memory now safe to overwrite
fut_op.wait(); // wait for the rput operation to be fully complete
Every asynchronous communication operation has an associated promise object, which is either created explicitly by the user or implicitly by the runtime when a non-blocking operation is invoked requesting the default future-based completion. A promise represents the producer side of an asynchronous operation, and it is through the promise that the results of the operation are supplied and its dependencies fulfilled. A future is the interface through which the status of the operation can be queried and the results retrieved, and multiple future objects may be associated with the same promise. A future thus represents the consumer side of a non-blocking operation.
Promises are particularly efficient at keeping track of multiple asynchronous operations, essentially acting as a dependency counter. Here is an example using RMA:
upcxx::promise<> p;
upcxx::global_ptr<int> gps[10] = /* . . . */;
for (int i = 0; i < 10; ++i) // register some RMA operations on p:
  upcxx::rput(i, gps[i], upcxx::operation_cx::as_promise(p));
upcxx::future<> f = p.finalize(); // end registration on p, obtain future
f.wait(); // await completion of all RMAs above
The first line explicitly constructs a promise, where the empty template arguments indicate it will only track readiness status and not contain/produce a value (an “empty promise”). The default promise constructor initializes its encapsulated dependency counter to 1, placing it in a non-readied state (the promise is ready when the counter reaches zero). Member functions on promise allow direct manipulation of the dependency counter, but those are unnecessary for many simple use cases.
Next we issue a series of RMA put operations, overriding the default future-based completion by instead passing operation_cx::as_promise(p) to request promise-based completion notification on the supplied promise. Each communication operation has a side-effect of incrementing the dependency counter of the supplied promise at injection time, in a process known as “registering” the communication operation with the promise. When each asynchronous operation later completes, the dependency counter of the same promise will be decremented to provide completion notification. After registering our communication on the promise, we invoke p.finalize(), which decrements the counter once, matching the counter initialization and ending the registration stage of the promise. finalize() returns the upcxx::future<> handle corresponding to our promise, which will be readied once the encapsulated dependency counter reaches zero, indicating that all of the registered asynchronous operations have reached operation completion.
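The direct counter manipulation mentioned above can also be driven manually through the promise’s anonymous dependency interface; a minimal sketch:

upcxx::promise<> p;               // dependency counter starts at 1
p.require_anonymous(3);           // register three dependencies (counter is now 4)
p.fulfill_anonymous(3);           // fulfill them (counter is back to 1)
upcxx::future<> f = p.finalize(); // counter reaches 0, readying the future
UPCXX_ASSERT(f.ready());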
In the following example, we show how promises can be used to track completion of the distributed hash table inserts. First, we modify the insert operation to use a promise instead of returning a future:
// insert a key, value pair into the hash table, track completion with promises
void insert(const std::string &key, const std::string &val, upcxx::promise<> &prom) {
  upcxx::rpc(get_target_rank(key),
             // completion is a promise
             upcxx::operation_cx::as_promise(prom),
             // lambda to insert the key, value pair
             [](dobj_map_t &lmap, const std::string &key, const std::string &val) {
               // insert into the local map at the target
               lmap->insert({key, val});
             }, local_map, key, val);
}
Now we modify the insert loop to use promises instead of futures for tracking completion:
// create an empty promise, to be used for tracking operations
upcxx::promise<> prom;
// insert all key, value pairs into the hash map
for (long i = 0; i < N; i++) {
  string key = to_string(upcxx::rank_me()) + ":" + to_string(i);
  string val = key;
  // pass the promise to the dmap insert operation
  dmap.insert(key, val, prom);
  // periodically call progress to allow incoming RPCs to be processed
  if (i % 10 == 0) upcxx::progress();
}
upcxx::future<> fut = prom.finalize(); // finalize the promise
fut.wait(); // wait for the operations to complete
In our distributed hash table example, we create an empty promise, prom, which has a dependency count of one. Then we register each insert operation in turn on the promise, incrementing the dependency count each time, representing the unfulfilled results of the RPC operation used in the insert. Finally, when registration is complete, the original dependency is fulfilled to signal the end of the registration, with the prom.finalize call. This call returns the future associated with the promise, so we can now wait on that future for all the operations to complete.
It’s instructive to compare this example with the similar example shown in the earlier section on Conjoining Futures. In that prior example, we used future conjoining to dynamically construct a dependency graph of futures. In that graph, each asynchronous find operation led to asynchronous execution of a local callback to process the resulting value, and completion of that callback was subsequently conjoined into the graph of dependencies, all leading to one future representing completion of the entire set of operations.
In the example above, we use a single promise object to track all of the outstanding asynchronous dependencies generated by the asynchronous insert operations, and the single corresponding future it produces to represent overall completion of the entire set of operations. We could have applied the future conjoining approach to this example, but that would have dynamically constructed many futures (each with a corresponding implicit promise), instead of a single promise object. Because the promise-based approach shown here constructs fewer objects at runtime, it incurs lower CPU overhead and should generally be favored in situations where it applies; namely, when launching many asynchronous operations whose dependency graph is a trivial aggregation of all the completions, with no ordering constraints or asynchronous callbacks for individually processing results. This general recommendation applies not only to RPC-based communication, but to all forms of asynchrony (e.g. RMA, LPC and even collective communication), all of which support promise-based completion as an alternative to the default future-based completion.
Alternatively, the completion object provided to an RMA put can specify a remote procedure call. While the data payload involved can be of any size and the RPC body can be as simple as modifying a variable on the target process, use of upcxx::rput(remote_cx::as_rpc) requires that the payload be TriviallySerializable and the destination address be known to the initiator. Satisfying these requirements enables use of zero-copy RMA (via zero-copy RDMA where appropriate). For large payloads, this can yield a performance advantage over alternatives like upcxx::rpc_ff() that rely on copying the payload. By querying the value of a variable modified by the callback, RPC completion objects can be used to mimic the coupling of data transfer with synchronization found in message-passing programming models.
As an example, consider a heat transfer simulation in a 3D domain with a 1D decomposition. During each timestep, each process needs to update its interior cells, exchange halos with neighboring ranks, and update boundary cells once the necessary halos are received. RPC completion objects can be used to determine when these halos have arrived. The body of the RPC simply consists of incrementing a counter variable, which is later compared to the number of neighboring processes. As was the case in the last example, upcxx::progress (discussed later) must be called to give the runtime an opportunity to execute incoming RPCs. The execution of the RPC callback guarantees that the data contained in the rput has been delivered; for this example, this means ghost cells from neighboring processes have arrived and can be used for calculation of the process’s boundary cells.
Pseudocode for this technique is shown below. A complete compilable code for the heat transfer example is available in the example/prog-guide directory of the implementation.
int count = 0; // declared as a file-scope global variable

for (n in neighbors) {
  // RMA put halo to remote process
  upcxx::rput(n.outbuf, n.inbuf, n.outbuf.size(),
              remote_cx::as_rpc([]() { count++; }));
}
// await incoming copies
while (count < neighbors.size())
  upcxx::progress();
count = 0; // reset for next timestep
Progress is a key concept of the UPC++ execution model that programmers must understand to make the most effective use of asynchrony. The UPC++ framework does not spawn any hidden OS threads to perform asynchronous work such as delivering completion notifications, invoking user callbacks, or advancing its internal state. All such work is performed synchronously inside application calls to the library. The rationale is that this keeps the performance characteristics of the UPC++ runtime as lightweight and configurable as possible, and simplifies synchronization. Without its own threads, UPC++ is reliant on each application process to periodically grant it access to a thread so that it may make “progress” on its internals. Progress is divided into two levels: internal progress and user-level progress. With internal progress, UPC++ may advance its internal state, but no notifications will be delivered to the application; thus the application cannot easily track or be influenced by this level of progress. With user-level progress, UPC++ may advance its internal state as well as signal completion of user-initiated operations. This could include many things: readying futures, running callbacks dependent on those futures, or invoking inbound RPCs.
The upcxx::progress function, as already used in our examples, is the primary means by which the application performs the crucial task of temporarily granting UPC++ a thread:

void upcxx::progress(upcxx::progress_level lev = upcxx::progress_level::user);
A call to upcxx::progress() with its default arguments will invoke user-level progress. These calls make for the idiomatic points in the program where the user should expect locally registered callbacks and remotely injected RPCs to execute. There are also other UPC++ functions which invoke user-level progress, notably including upcxx::barrier and upcxx::future::wait. For the programmer, understanding where these functions are called is crucial, since any invocation of user-level progress may execute RPCs or callbacks, and the application code run by those will typically expect certain invariants of the process’s local state to be in place.
Many UPC++ operations have a mechanism to signal completion to the application. However, for performance-oriented applications, UPC++ provides an additional asynchronous operation status indicator called progress_required. This status indicates that further advancements of the calling thread’s internal-level progress are necessary so that completion of outstanding operations on remote entities (e.g. notification of delivery) can be reached. Once the progress_required state has been left, UPC++ guarantees that remote processes will see their side of the completions without any further progress by the current thread. The programmer can query UPC++ to determine whether all operations initiated by the calling thread have reached a state at which they no longer require progress using the following function:
bool upcxx::progress_required();
UPC++ provides a function called upcxx::discharge() which polls upcxx::progress_required() and invokes internal progress until progress is no longer required. upcxx::discharge() is equivalent to the following code:
while (upcxx::progress_required())
  upcxx::progress(upcxx::progress_level::internal);
Any application entering a long lapse of inattentiveness (e.g. to perform expensive computations) is highly encouraged to call upcxx::discharge() first.
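For example (a sketch, where expensive_local_computation() stands in for any long stretch of work that makes no UPC++ calls):

upcxx::discharge();            // outgoing operations no longer need this thread
expensive_local_computation(); // long lapse of inattentiveness to UPC++
upcxx::progress();             // resume servicing incoming work afterwards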
Thought exercise:
The red-black example in an earlier section has a barrier marked “optional”. Is this barrier required for correctness? Why or why not? Are there any potential negative consequences to its removal?
As mentioned earlier, UPC++ does not spawn background threads for progressing asynchronous operations, but rather leaves control of when such progress is permissible to the user. To help the user manage the coordination of internal state and threads, UPC++ introduces the concept of personas. An object of type upcxx::persona represents a collection of UPC++’s internal state. Each persona may be active with at most one thread at any time. The active personas of a thread are organized in a stack, with the top persona denoted the current persona of that thread. When a thread enters progress, UPC++ will advance the progress of all personas it holds active.
When a thread is created, it is assigned a default persona. This persona is always at the bottom of the persona stack, and cannot be pushed onto the persona stack of another thread. A reference to the default persona of a thread can be retrieved using:
upcxx::persona& upcxx::default_persona();
Additional personas can also be created by the application and pushed onto a thread’s persona stack, as described in subsequent sections. When a thread initiates an asynchronous operation, that operation is registered in the internal state managed by the thread’s current (top-of-the-stack) persona. A reference to the current persona of a thread can be retrieved using:
upcxx::persona& upcxx::current_persona();
For any UPC++ operation issued by the current persona, the completion notification (e.g. future readying) will be sent to that same persona. This is still the case even if that upcxx::persona object has been transferred to a different thread by the time the asynchronous operation completes. The key takeaway here is that a upcxx::persona can be used by one thread to issue operations, then passed to another thread (together with the futures corresponding to these operations). That second thread will then be notified of the completion of these operations via their respective futures. This can be used, for instance, to build a progress thread: a thread dedicated to progressing asynchronous operations. A more robust approach (as we’ll see in an upcoming section) is to use completion objects (see Completions) to execute a callback (more precisely a local procedure call, or LPC) on another persona when the operation is complete. We recommend this second option, as upcxx::future and upcxx::promise objects are not thread-safe and thus can only be safely referenced by a thread holding the persona used to create them.
The thread on each process that first initializes UPC++ via a call to upcxx::init() (this is usually done in the main function) is also assigned the master persona in addition to its default persona. The master persona is special in that it is the only persona in each process that can execute RPCs destined for this process or initiate collective operations (e.g., upcxx::barrier()).
A thread can retrieve a reference to the master persona object by calling:
upcxx::persona& upcxx::master_persona();
It’s important to understand the interaction of the master persona with RPC:
Remote Procedure Call (RPC) callbacks always execute sequentially and synchronously inside user-level progress for the thread holding the master persona, which is unique to each process. User-level progress only occurs inside a handful of explicit UPC++ library calls (e.g. upcxx::progress() and upcxx::future::wait()).
This implies the following useful properties:
- RPC callbacks never “interrupt” an application thread. They only execute synchronously inside a UPC++ library call (specified with user-level progress) from the thread holding the master persona.
- RPC callbacks never run concurrently with other RPC callbacks on the same process. This means there can never be a data race between two RPC callbacks running on the same process. This remains true even with multiple application threads concurrently invoking UPC++ library calls, because the thread holding the master persona is unique to each process.
- In a single-threaded application, RPC callbacks are not concurrent with any other code in the same process. This means single-threaded UPC++ applications should never need inter-thread concurrency control mechanisms (std::mutex, std::atomic, etc.).
Note that all the properties above apply specifically to RPC (Remote Procedure Call, as injected by upcxx::rpc() and upcxx::rpc_ff()). This should not be confused with LPC (Local Procedure Call, as injected by upcxx::persona::lpc() and upcxx::persona::lpc_ff() and discussed in an upcoming section). LPCs are not restricted to the master persona, thus LPC callbacks associated with different personas can exhibit concurrency in multi-threaded applications.
Persona objects do not directly provide comparison operators, but they can be compared by address. For example:
if (&upcxx::current_persona() == &upcxx::master_persona())
std::cout << "This thread's current persona is the master persona" << std::endl;
However, it’s important to keep in mind that each thread owns a stack of one or more active personas, all of which are progressed during UPC++ calls with internal- or user-level progress. Also, initiation of collective operations requires the calling thread to hold the master persona, but it need not be the current persona at the top of the persona stack. The persona::active_with_caller() query may be used to search the entire active persona stack of the calling thread for a particular persona. For example:
if (upcxx::master_persona().active_with_caller())
std::cout << "This thread holds the master persona" << std::endl;
UPC++ provides a upcxx::persona_scope class for modifying the current thread’s active stack of personas. The application is responsible for ensuring that a given persona is only ever on one thread’s active stack at any given time. Pushing and popping personas from the stack (hence changing the current persona) is accomplished via RAII semantics using the upcxx::persona_scope constructor/destructor.
Here are the constructors:
upcxx::persona_scope(upcxx::persona &p);

template<typename Lock>
upcxx::persona_scope(Lock &lock, upcxx::persona &p);
The upcxx::persona_scope constructor takes the upcxx::persona that needs to be pushed. Only one thread is permitted to use a given persona at a time. To assist in maintaining this invariant, the upcxx::persona_scope constructor accepts an optional thread locking mechanism to acquire during construction and release during destruction (the Lock template argument can be any type of lock, such as C++ std::mutex).
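For example, several threads could take turns servicing a shared persona; in this sketch, worker_persona and worker_mutex are assumed to be shared among the threads:

upcxx::persona worker_persona;
std::mutex worker_mutex;
// ... executed by any thread that wants to service worker_persona:
{
  // constructor acquires worker_mutex, then pushes worker_persona;
  // the destructor pops the persona and releases the lock
  upcxx::persona_scope scope(worker_mutex, worker_persona);
  upcxx::progress(); // service work destined for worker_persona
}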
The following toy example introduces several of these concepts, using two threads to exchange an integer containing a rank id fetched from a neighbor process (defined as (upcxx::rank_me() + 1) % upcxx::rank_n()). A upcxx::persona object progress_persona is first created. This persona object is used by a thread called submit_thread to construct a completion object which will execute an LPC on the progress_persona as the completion notification for the subsequently issued rget operation (the value fetched by the rget is passed as an argument to the LPC callback that is eventually queued for execution by progress_persona). The submit_thread thread then waits and makes progress until the atomic variable thread_barrier is set to 1. Meanwhile, another thread called progress_thread pushes progress_persona onto its persona stack by constructing an appropriate upcxx::persona_scope (in this case we know by construction that only one thread will push this persona, so we can safely use the lock-free constructor). This thread then calls upcxx::progress() repeatedly until the LPC attached to the completion object is delivered to progress_persona, where the callback sets the done boolean to true.
int main () {
  upcxx::init();
  // create a landing zone, and share it through a dist_object
  // allocate and initialize with local rank
  upcxx::dist_object<upcxx::global_ptr<int>> dptrs(upcxx::new_<int>(upcxx::rank_me()));
  upcxx::global_ptr<int> my_ptr = *dptrs;
  // fetch my neighbor's pointer to its landing zone
  upcxx::intrank_t neigh_rank = (upcxx::rank_me() + 1) % upcxx::rank_n();
  upcxx::global_ptr<int> neigh_ptr = dptrs.fetch(neigh_rank).wait();
  // declare an agreed upon persona for the progress thread
  upcxx::persona progress_persona;
  atomic<int> thread_barrier(0);
  bool done = false;
  // create the progress thread
  thread progress_thread([&]() {
    // push progress_persona onto this thread's persona stack
    upcxx::persona_scope scope(progress_persona);
    // progress thread drains progress until work is done
    while (!done)
      upcxx::progress();
    cout << "Progress thread on process " << upcxx::rank_me() << " is done" << endl;
    // unlock the other threads
    thread_barrier += 1;
  });
  // create another thread to issue the rget
  thread submit_thread([&]() {
    // create a completion object to execute a LPC on the progress_thread
    // which verifies that the value we got was the rank of our neighbor
    auto cx = upcxx::operation_cx::as_lpc(progress_persona, [&done, neigh_rank](int got) {
      UPCXX_ASSERT(got == neigh_rank);
      // signal that work is complete
      done = true;
    });
    // use this completion object on the rget
    upcxx::rget(neigh_ptr, cx);
    // block here until the progress thread has executed all LPCs
    while (thread_barrier.load(memory_order_acquire) != 1) {
      sched_yield();
      upcxx::progress();
    }
  });
  // wait until all threads finish their work
  submit_thread.join();
  progress_thread.join();
  // wait until all processes are done
  upcxx::barrier();
  if (upcxx::rank_me() == 0)
    cout << "SUCCESS" << endl;
  // delete my landing zone
  upcxx::delete_(my_ptr);
  upcxx::finalize();
}
As described in an earlier section, each process has a master persona, which is special in that it is the only persona that can execute RPCs destined for this process or initiate collective operations.
The master persona can also be transferred between threads using upcxx::persona_scope objects. However, due to its special nature, the thread which was initially assigned the master persona (the primordial thread) must first release it before other threads can push it. This is done by invoking the upcxx::liberate_master_persona() function after upcxx::init() and before any other changes to the persona stack on the primordial thread. This is of particular importance if one desires to implement a progress thread which will execute RPCs issued by remote processes.
As an example, we show how to implement such a progress thread in the distributed hash table example. We first modify the DistrMap class so that a completion object is used to track the completion of the RPC issued by the find function. When the RPC completes, a function func provided by the caller is executed as an LPC on the persona provided by the caller as well:
// find a key; deliver the associated value to an LPC on the given persona
template <typename Func>
void find(const std::string &key, upcxx::persona &persona, Func func) {
  // the value returned by the RPC is passed as an argument to the LPC
  // used in the completion object
  auto cx = upcxx::operation_cx::as_lpc(persona, func);
  upcxx::rpc(get_target_rank(key), cx,
             // lambda to find the key in the local map
             [](dobj_map_t &lmap, const std::string &key) -> std::string {
               auto elem = lmap->find(key);
               if (elem == lmap->end()) return std::string(); // not found
               else return elem->second; // key found: return value
             }, local_map, key);
}
Let’s now review how this can be used to implement a progress thread. A thread progress_thread is created to execute incoming RPCs issued to this process, as well as operations submitted to a progress_persona (in this example, this thread will execute N LPCs). The master persona is therefore first released before the creation of the progress_thread. Both the master persona and progress_persona are pushed onto progress_thread’s persona stack by constructing two upcxx::persona_scope objects. The progress thread then calls upcxx::progress until lpc_count reaches 0.
Concurrently, ten threads are created to perform a total of N find operations, using the progress_persona to handle the completion of these operations. This completion notification enqueues an LPC to the progress_persona which verifies that the received value corresponds to what was expected and decrements the lpc_count counter by one. It’s noteworthy that although the LPCs were concurrently enqueued to progress_persona by several worker threads, the LPC callbacks are all executed (serially) on the progress thread; for this reason no synchronization is necessary when accessing the lpc_count variable.
In the last few lines of the example, the primordial thread awaits completion of the progress thread and worker threads by calling std::thread::join. When the progress thread exits, the upcxx::persona_scope objects it declared are destructed, releasing the progress_persona and master persona, and destroying its default persona (which was not used). The master persona is required for all collective operations, including upcxx::barrier() and upcxx::finalize(), but it was liberated upon exit of the progress thread. Therefore the primordial thread must re-acquire the master persona using a new upcxx::persona_scope in order to coordinate an orderly job teardown:
// try to fetch keys inserted by neighbor
// note that in this example, keys and values are assumed to be the same
const int num_threads = 10;
thread *threads[num_threads];
// declare an agreed upon persona for the progress thread
upcxx::persona progress_persona;
int lpc_count = N;
// liberate the master persona to allow the progress thread to use it
upcxx::liberate_master_persona();
// create a thread to execute the assertions while lpc_count is greater than 0
thread progress_thread([&]() {
  // push the master persona onto this thread's stack
  upcxx::persona_scope scope(upcxx::master_persona());
  // push the progress_persona as well
  upcxx::persona_scope progress_scope(progress_persona);
  // wait until all assertions in LPCs are complete
  while (lpc_count > 0) {
    sched_yield();
    upcxx::progress();
  }
  cout << "Progress thread on process " << upcxx::rank_me() << " is done" << endl;
});
// launch multiple threads to perform find operations
for (int tid = 0; tid < num_threads; tid++) {
  threads[tid] = new thread([&, tid]() {
    // split the work across threads
    long num_asserts = N / num_threads;
    long i_beg = tid * num_asserts;
    long i_end = tid == num_threads-1 ? N : (tid+1) * num_asserts;
    for (long i = i_beg; i < i_end; i++) {
      string key = to_string((upcxx::rank_me() + 1) % upcxx::rank_n()) + ":" + to_string(i);
      // attach callback, which itself runs a LPC on progress_persona on completion
      dmap.find(key, progress_persona,
                [key, &lpc_count](const string &val) {
                  UPCXX_ASSERT(val == key);
                  lpc_count--;
                });
    }
    // discharge outgoing find operations before thread exit:
    upcxx::discharge();
  });
}
// wait until all threads are done
progress_thread.join();
for (int tid = 0; tid < num_threads; tid++) {
  threads[tid]->join();
  delete threads[tid];
}
{
  // push the master persona onto the initial thread's persona stack
  // before calling barrier and finalize
  upcxx::persona_scope scope(upcxx::master_persona());
  // wait until all processes are done
  upcxx::barrier();
  if (upcxx::rank_me() == 0)
    cout << "SUCCESS" << endl;
  upcxx::finalize();
}
In the first example of this section, the submit_thread waited in a progress loop until the progress_thread completed its operations. However, this stall was not necessary in the algorithm, because the submit thread was not waiting to receive any callbacks or perform any additional tasks; the stall loop just incurred wasted CPU overhead.
In the second example, we instead allow the worker threads to exit once they’ve completed injecting their find operations. However, because each worker thread has launched asynchronous operations (using its default persona) that might remain in-flight, it is critical that we call upcxx::discharge() before allowing the thread to exit. As described in the section on Discharge, this instructs the runtime to ensure that outgoing operations initiated by this thread’s personas no longer need further internal progress by this thread to reach completion. Once discharge returns, each worker thread may safely terminate, implicitly destroying its default persona.
As a general rule, application threads which have launched UPC++ asynchronous operations may not exit, destroying their default persona, until they have:
- reaped any completion notifications scheduled for that persona (e.g. future readying), AND
- completed a upcxx::discharge() call to ensure any outgoing operations initiated by thread personas no longer require internal progress.
The examples presented in this chapter thus far use C++ std::thread for simplicity of presentation. However, UPC++ is designed to be agnostic to the specific threading model, in order to improve interoperability with a variety of on-node programming models. UPC++ interoperates with other threading mechanisms, including OpenMP and POSIX threads, where the same general concepts and techniques apply.
The next section will demonstrate use of UPC++ with OpenMP.
Some care must be exercised when mixing blocking operations provided by two threaded libraries (such as UPC++ and OpenMP) to avoid deadlock. It’s important to understand that a thread blocked by one library will generally only advance the internal state of that library. This means if one thread is blocking for an OpenMP condition and another thread is blocking for a UPC++ condition, then they might not be attentive to servicing work from the opposite library, potentially creating a deadlock cycle.
A simple example involves a thread waiting at an OpenMP barrier (which is implicit upon the closing brace of an OpenMP parallel region) while another thread from that same region sends it a UPC++ LPC. The recipient thread is inattentive to UPC++ progress, meaning the sending thread will never have its LPC completed no matter how many times upcxx::progress is called by the initiating thread. As such, it is unable to join the other thread at the barrier, leading to deadlock.
A similar deadlock scenario appears below, where one thread stalls in upcxx::future::wait() awaiting completion of a round-trip UPC++ RPC, but the thread holding the master persona at the target process has stalled in an OpenMP barrier, preventing the RPC from executing.
upcxx::intrank_t me = upcxx::rank_me();
upcxx::intrank_t n = upcxx::rank_n();

#pragma omp parallel num_threads(2)
{
  // Assume thread 0 has the master persona...
  if (omp_get_thread_num() == 1) {
    // Bounce an rpc off buddy rank and wait for it.
    upcxx::rpc((me^1) % n, [=](){}).wait();
  }
  // Here comes the implicit OpenMP barrier at the end of the parallel region.
  // While in the OpenMP barrier, the master persona can't respond to RPCs. So
  // we can't reply to our buddy's message and therefore delay the buddy's
  // sending thread from making it to the OpenMP barrier. And if our buddy is
  // in the same situation with respect to our message then we deadlock.
}
Two solutions will be provided in this section for avoiding this type of deadlock. The first involves separating the thread with the master persona from other threads so it can explicitly service incoming work. Inside the omp parallel region, the other threads will perform the same work as before, while the master persona thread will execute a upcxx::progress loop. The satisfaction of that loop condition should indicate that all local worker threads have had their RPCs completed, which implies that the master persona can safely proceed to the implicit OpenMP barrier and be guaranteed that the thread barrier will complete immediately. An example is shown below, which uses an atomic integer to track whether all non-master threads have completed the RPC they issued. Once the counter indicates this, the master thread joins the other threads at the OpenMP barrier.
const int me = upcxx::rank_me();
const int n = upcxx::rank_n();
const int buddy = (me^1) % n;
const int tn = thread_count; // threads per process
std::atomic<int> done(1); // master thread doesn't do worker loop
#pragma omp parallel num_threads(tn)
{
  UPCXX_ASSERT(tn == omp_get_num_threads());
  // OpenMP guarantees master thread has rank 0
  if (omp_get_thread_num() == 0) {
    UPCXX_ASSERT(upcxx::master_persona().active_with_caller());
    do {
      upcxx::progress();
    } while (done.load(std::memory_order_relaxed) != tn);
  } else { // worker threads send RPCs
    upcxx::future<> fut_all = upcxx::make_future();
    for (int i = 0; i < 10; i++) { // RPC with buddy rank
      upcxx::future<> fut = upcxx::rpc(buddy, [](int tid, int rank) {
        std::cout << "RPC from thread " << tid << " of rank "
                  << rank << std::endl;
      }, omp_get_thread_num(), me);
      fut_all = upcxx::when_all(fut_all, fut);
    }
    fut_all.wait(); // wait for thread quiescence
    done++; // worker threads can sleep at OpenMP barrier
  }
} // <-- this closing brace implies an OpenMP thread barrier
upcxx::barrier(); // wait for other ranks to finish incoming work
As mentioned previously, UPC++ futures and promises are persona-specific, even with UPCXX_THREADMODE=par. As such, when using them with OpenMP threads it’s important to make sure a future or promise is not shared between threads. This example is safe because, by default, variables declared inside an OpenMP parallel region are thread-private.
It should be noted that this method only works when just the master thread is needed to execute incoming code. When all threads are needed, they all must enter a loop that calls upcxx::progress() until local UPC++ quiescence is achieved. An example of this approach appears below. The body of the OpenMP for loop has an rput that puts the process’s rank in global memory. The completion object is an LPC that executes on an arbitrary local thread; it issues an rget which simply validates that the correct value was put and increments an atomic counter to check that all remote operations are done. However, the rget is in an LPC that runs on a (potentially) different persona than the rput. The while loop on the final two lines before the closing brace ensures that none of the threads holding the personas are put to sleep by the OpenMP runtime until all callbacks have finished. Until then, progress is called to service them.
const int n = upcxx::rank_n();
const int me = upcxx::rank_me();
const int tn = thread_count; // threads per process
vector<upcxx::global_ptr<int>> ptrs = setup_pointers(n);
std::vector<upcxx::persona*> workers(tn);
std::atomic<int> done_count(0);
#pragma omp parallel num_threads(tn)
{
  // all threads publish their default persona as worker
  int tid = omp_get_thread_num();
  workers[tid] = &upcxx::default_persona();
  #pragma omp barrier
  // launch one rput to each rank
  #pragma omp for
  for (int i = 0; i < n; i++) {
    upcxx::rput(&me, ptrs[(me + i)%n] + me, 1,
      // rput is resolved in continuation on another thread
      upcxx::operation_cx::as_lpc(*workers[(tid + i) % tn], [&, i]() {
        // fetch the value just put
        upcxx::rget(ptrs[(me + i)%n] + me).then([&](int got) {
          UPCXX_ASSERT_ALWAYS(got == me);
          done_count++;
        });
      })
    );
  }
  // each thread drains progress until all work is quiesced
  while (done_count.load(std::memory_order_relaxed) != n)
    upcxx::progress();
} // <-- this closing brace implies an OpenMP thread barrier
upcxx::barrier();
Lastly, while the examples in this section used OpenMP constructs to launch asynchronous operations, OpenMP can also be used within the operations themselves. However, it should be noted that in a UPC++ context the same limitations on parallelism apply as in standalone OpenMP code. This means, for instance, that if a callback containing #pragma omp parallel is dynamically nested within a code block that has its own parallel construct, then there won’t be any additional parallelism to utilize, since all threads are in use. Developers who want to ensure that callbacks have access to more threads than the code block from which they are run can use the omp_set_max_active_levels() routine.
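For example, calling the following before entering the outer parallel region permits a nested parallel construct (such as one inside a callback) to obtain additional threads, subject to availability (a minimal sketch):

omp_set_max_active_levels(2); // allow two levels of active parallelism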
A UPC++ team is an ordered set of processes and is represented by a upcxx::team object. For readers familiar with MPI, teams are similar to MPI communicators and groups. The default team for most operations is upcxx::world(), which includes all processes. Creating a team is a collective operation and may be expensive; it is therefore best done in the set-up phase of a calculation.
team::split
New upcxx::team objects can be created by collectively splitting another team using the upcxx::team::split() member function. This is demonstrated in the following example, which creates teams consisting of the processes having odd and even ranks in upcxx::world() by using (upcxx::rank_me() % 2) as the color argument to split(). It is worth noting that the key argument is used to sort the members of the newly created teams, and need not be precisely the new rank as in this example.
upcxx::team & world_team = upcxx::world();
int color = upcxx::rank_me() % 2;
int key = upcxx::rank_me() / 2;
upcxx::team new_team = world_team.split(color, key);
A team object has several member functions. The local rank of the calling process within a team is given by the rank_me() function, while rank_n() returns the number of processes in the team.
The team::split() function is a very powerful and convenient means for subdividing teams into smaller teams, and/or creating a new team that renumbers process ranks in an existing team. However, this function semantically requires collective communication across the parent team to establish new team boundaries, and thus can entail non-trivial cost when the parent team is large.
team::create
In cases where processes can cheaply/locally compute the membership of a new team they wish to construct, the team::create() member function enables team construction with lower communication overheads than team::split(). Here is an example that further partitions the new_team created in the prior example, assigning processes into pair-wise teams:
upcxx::intrank_t group = new_team.rank_me() / 2; // rounds-down
upcxx::intrank_t left = group * 2;
upcxx::intrank_t right = left + 1;
std::vector<upcxx::intrank_t> members({left});
if (right != new_team.rank_n()) // right member exists
  members.push_back(right);
upcxx::team sub_team = new_team.create(members);
As with team::split(), the team::create() function is collective over a parent team, in this case new_team. Each caller passes an ordered sequence of ranks in that parent team, enumerating the participants of the new sub-team it will join. Here we see each process uses new_team.rank_me() to query its rank in the parent team and uses that to construct a std::vector of rank indexes selecting members for the team it will construct. team::create() accepts any ordered C++ Container, or alternatively a pair of InputIterators delimiting the rank sequence. Either way, the sequence passed is required to match across all callers joining the same team, and (as with team::split()) each process joins (at most) one team per call.
As previously mentioned, the team member functions rank_me() and rank_n() can be used to respectively retrieve the rank of the calling process in that team and the number of ranks in the team. There is also an id() function that returns a trivially copyable team_id object representing a universal name identifying the team.
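For instance, a team_id can be captured locally and later used to retrieve the team it names (a sketch using the new_team from the example above):

upcxx::team_id id = new_team.id(); // trivially copyable, safe to send in an RPC
upcxx::team &t = id.here();        // retrieve the team object on this process
UPCXX_ASSERT(&t == &new_team);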
The global rank (in the world() team) of any team member can be retrieved using the [] operator. The upcxx::team::from_world() function converts a global rank from the world() team into a local rank within a given team. This function takes either one or two arguments:
upcxx::intrank_t upcxx::team::from_world(upcxx::intrank_t world_index) const;
upcxx::intrank_t upcxx::team::from_world(upcxx::intrank_t world_index,
                                         upcxx::intrank_t otherwise) const;
In the first case, the process with rank world_index in world() MUST be part of the team, while in the second overload, the otherwise value will be returned if that process is not part of the team. We can therefore modify the first example of this section to do the following (for simplicity, we assume an even number of processes):
upcxx::team & world_team = upcxx::world();
int color = upcxx::rank_me() % 2;
int key = upcxx::rank_me() / 2;
upcxx::team new_team = world_team.split(color, key);

upcxx::intrank_t local_rank = new_team.rank_me();
upcxx::intrank_t local_count = new_team.rank_n();

upcxx::intrank_t world_rank = new_team[(local_rank+1)%local_count];
upcxx::intrank_t expected_world_rank = (upcxx::rank_me() + 2) % upcxx::rank_n();
UPCXX_ASSERT(world_rank == expected_world_rank);

upcxx::intrank_t other_local_rank = new_team.from_world(world_rank);
UPCXX_ASSERT(other_local_rank == (local_rank+1)%local_count);
upcxx::intrank_t non_member_rank =
    new_team.from_world((upcxx::rank_me()+1)%upcxx::rank_n(), -1);
UPCXX_ASSERT(non_member_rank == -1);

new_team.destroy(); // collectively release the sub-team
In addition to the upcxx::world() team, another special team is available that represents all the processes sharing the same physical shared-memory node. This team is obtained by calling the upcxx::local_team() function, and it has the following very useful property:
All members of local_team() with a upcxx::global_ptr<T> gptr referencing shared memory allocated by any member of that team are guaranteed to see gptr.is_local() == true, and gptr.local() will return a valid T* (raw C++ pointer) to the memory.
This is particularly important if one wants to optimize for shared-memory bypass. For example, it means that one can apply the techniques described in the section Downcasting global pointers to obtain direct C++ pointers to shared objects owned by other processes who are members of local_team(). This works because at startup the UPC++ runtime ensures that all processes in the local_team() automatically map each other’s shared host memory segments into virtual memory, allowing for direct load/store access to that memory by the CPU.
In the following example, one process per physical node loads data from a file into shared memory, then it allows access to this data by broadcasting a global pointer to it. By doing this, we avoid multiple processes loading the same data, and we improve memory scalability by a factor of local_team().rank_n().
size_t n = 0;
upcxx::global_ptr<double> data;

if (!upcxx::local_team().rank_me()) { // I'm the leader process in this node
  // Open the file in binary mode
  std::ifstream input_file(filename, std::ios::binary);
  // Ensure any I/O errors will throw an exception
  input_file.exceptions(std::ifstream::failbit);

  // How many elements am I supposed to read?
  input_file.read(reinterpret_cast<char*>(&n), sizeof(n));

  data = upcxx::new_array<double>(n); // Allocate space in shared memory

  // Read the entire array of doubles from the file
  input_file.read(reinterpret_cast<char*>(data.local()), sizeof(double)*n);
}
// Leader broadcasts data to other processes in the local team (implicit barrier)
std::tie(n, data) =
    upcxx::broadcast(std::make_tuple(n, data), 0, upcxx::local_team()).wait();

double *ldata = data.local(); // Downcast global pointer
process_data(n, ldata); // Access shared data using local ptr
Collectives are intimately tied to the notion of teams. Collectives in UPC++ all operate on the upcxx::world() team by default, and always have to be initiated by a thread holding the master persona on each process (see Personas). As in other programming models, such as MPI, each collective call in a UPC++ program must be initiated in the same order (with compatible arguments) in every participating process.
One of the most useful and basic collective operations in parallel programming is the barrier operation. UPC++ provides two flavors of barrier synchronization:
void upcxx::barrier(upcxx::team &team = upcxx::world());

template<typename Cx=/*unspecified*/>
RType upcxx::barrier_async(upcxx::team &team = upcxx::world(),
                           Cx &&completions=upcxx::operation_cx::as_future());
The first variant is a blocking barrier on team and will return only after all processes in the team have entered the call. The second variant is an asynchronous barrier which by default returns a future (see Completions). This future is signaled when all processes in the team have initiated the asynchronous barrier.
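This enables overlapping useful work with barrier synchronization, as in the following sketch (do_local_work() is a hypothetical placeholder):

upcxx::future<> fut = upcxx::barrier_async();
while (!fut.ready()) {
  do_local_work();   // overlap computation with the barrier
  upcxx::progress(); // advance the barrier and other UPC++ internals
}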
It is important to note that although collectives must be issued in the same order by all participants, asynchronous operations (collective or otherwise) are not guaranteed to complete in any particular order relative to other in-flight operations. In particular:
Neither barrier nor barrier_async acts as a “flush” of outstanding asynchronous operations. UPC++ does not provide any calls that implicitly “fence” or “flush” outstanding asynchronous operations; all operations are synchronized explicitly via completions. In other words, issuing and completing a barrier does NOT guarantee completion of asynchronous operations issued before the barrier and not yet explicitly synchronized.
This property may seem counter-intuitive, but it enables the communication layer to maximize utilization of the network fabric, which is often fundamentally unordered at the lowest levels. As discussed in Asynchronous Computation, relaxed ordering guarantees for asynchronous operations enable more efficient implementations, but the programmer must not assume any ordering of asynchronous operation completion that is not enforced by explicit synchronization. As discussed in Quiescence, issuing an asynchronous operation and failing to later synchronize its completion is an error. A corollary of these properties:
Discarding a upcxx::future returned from a UPC++ call is almost always a bug.
In fact, the implementation uses compiler annotations (where supported) to encourage a compiler warning for this type of potential defect.
Another fundamental collective operation is the broadcast operation, which is always an asynchronous operation in UPC++ and defaults to returning a upcxx::future. There are two variants:
template <typename T, typename Cx=/*unspecified*/>
RType upcxx::broadcast(T value, upcxx::intrank_t root,
                       upcxx::team &team = upcxx::world(),
                       Cx &&completions=upcxx::operation_cx::as_future());

template <typename T, typename Cx=/*unspecified*/>
RType upcxx::broadcast(T *buffer, std::size_t count, upcxx::intrank_t root,
                       upcxx::team &team = upcxx::world(),
                       Cx &&completions=upcxx::operation_cx::as_future());
The first variant transfers an object of type T from the process with (team-relative) rank root to all processes in team. The return value is a upcxx::future<T> containing the broadcast value. The second variant transfers count objects of type T stored in an array referenced by buffer, and its default return value is an object of type upcxx::future<>. All participants in a broadcast collective must “agree” on which team member is the source of the broadcast, and in particular all callers must pass the same value for the root argument. This is referred to as a single-valued constraint in a collective operation; in the second broadcast variant the count argument is also required to be single-valued.
These functions can be used in the following way:
std::vector<int> buffer;
size_t elem_count{};
if (upcxx::rank_me() == 0) {
  elem_count = 42; // initially only rank 0 knows the element count
}
// launch a scalar broadcast of element count from rank 0
upcxx::future<size_t> fut = upcxx::broadcast(elem_count, 0);

// do some overlapped work like preparing buffer for next broadcast
if (upcxx::rank_me() == 0) {
  buffer.resize(elem_count);
  for (size_t i = 0; i < elem_count; i++)
    buffer[i] = (int)i;
}
elem_count = fut.wait(); // complete first broadcast
buffer.resize(elem_count); // non-zero ranks allocate vector space

// launch a bulk broadcast of element data from rank 0
upcxx::future<> fut_bulk = upcxx::broadcast(buffer.data(), elem_count, 0);

// wait until the second broadcast is complete
fut_bulk.wait();

// consume elements
for (size_t i = 0; i < buffer.size(); i++)
  UPCXX_ASSERT_ALWAYS(buffer[i] == (int)i);
UPC++ also provides collectives to perform reductions. Each of these variants applies a binary operator op to the input data. This operator can either be one of the built-in operators provided by the library, or a user-defined function (for instance a lambda).
template <typename T, typename BinaryOp, typename Cx=/*unspecified*/>
RType upcxx::reduce_one(T value, BinaryOp &&op, upcxx::intrank_t root,
                        upcxx::team &team = upcxx::world(),
                        Cx &&completions=upcxx::operation_cx::as_future());

template <typename T, typename BinaryOp, typename Cx=/*unspecified*/>
RType upcxx::reduce_all(T value, BinaryOp &&op,
                        upcxx::team &team = upcxx::world(),
                        Cx &&completions=upcxx::operation_cx::as_future());

template <typename T, typename BinaryOp, typename Cx=/*unspecified*/>
RType upcxx::reduce_one(const T *src, T *dst, size_t count, BinaryOp &&op,
                        upcxx::intrank_t root, upcxx::team &team = upcxx::world(),
                        Cx &&completions=upcxx::operation_cx::as_future());

template <typename T, typename BinaryOp, typename Cx=/*unspecified*/>
RType upcxx::reduce_all(const T *src, T *dst, size_t count, BinaryOp &&op,
                        upcxx::team &team = upcxx::world(),
                        Cx &&completions=upcxx::operation_cx::as_future());
Similar to upcxx::broadcast, the first two variants reduce an object value of type T and by default return a upcxx::future<T> containing the resulting value. The second set of variants performs a “multi-field” element-wise reduction on contiguous buffers of count objects of type T pointed to by src (where count must be single-valued). They place the reduced values in a contiguous buffer pointed to by dst, and default to returning a upcxx::future<> for tracking completion.
When using the upcxx::reduce_one functions, the reduction result will be available only at the root process (which must be single-valued), and the result is undefined on other processes. For example:
int sum_to_P0 = upcxx::reduce_one(upcxx::rank_me()*42, upcxx::op_fast_add, 0).wait();
On the other hand, when using the upcxx::reduce_all functions, the reduction result will be provided to all processes belonging to team. For example:
int sum_to_all = upcxx::reduce_all(upcxx::rank_me()*42, upcxx::op_fast_add).wait();
UPC++ provides built-in reduction operators (op_fast_add, op_fast_mul, op_fast_min, op_fast_max, op_fast_bit_and, op_fast_bit_or, and op_fast_bit_xor) which may be hardware-accelerated on systems with collectives offload support.
In the following example, each process finds its local minimum value (of type std::complex<double>), and then calls upcxx::reduce_one, providing a lambda as the op argument and 0 as the root process:
// Find local minimum
Complex lmin = *std::min_element(arr, arr + N, [](const Complex& a, const Complex& b) {
  return std::norm(a) < std::norm(b);
});
// Find global minimum and its location (MINLOC)
auto gmin = upcxx::reduce_one(std::make_pair(lmin, upcxx::rank_me()),
    [](const std::pair<Complex, int>& a, const std::pair<Complex, int>& b) {
      auto norma = std::norm(a.first), normb = std::norm(b.first);
      if (norma == normb && b.second < a.second) return b;
      else return (norma <= normb ? a : b);
    }, 0).wait();
Notice that the comparison involves both components of the pair. That is because reduction routines require op to be associative and commutative. In the above example, if two or more processes provide the same Complex value, we use their rank to break the tie and guarantee the reproducibility of the operation. In general, user-provided reduction operators over a class type that return a provided input value (e.g., min or max) will need to consult all the non-static fields of the operands when some fields happen to be equal, in order to ensure commutativity of the operation.
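As a final illustration, the bulk (“multi-field”) variants shown earlier reduce entire buffers element-wise; for example, summing a small histogram across all processes might look like this sketch:

int64_t local_hist[8] = /* ... computed locally ... */;
int64_t global_hist[8];
upcxx::reduce_all(local_hist, global_hist, 8, upcxx::op_fast_add).wait();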
The rput and rget operations assume a contiguous buffer of data at the source and destination processes. There are specialized forms of these RMA functions for moving non-contiguous groups of buffers in a single UPC++ call. These functions are denoted by a suffix: [rput,rget]_[irregular,regular,strided]. The rput_irregular operation is the most general: it takes a set of buffers at the source and destination, where the total data size of the combined buffers and their element data type need to match on both ends. The rput_regular operation is a specialization of rput_irregular, where every buffer in the collection is the same size. The rput_strided operation is even more specialized: there is just one base address specified for each of the source and destination regions, together with stride vectors describing a dense multi-dimensional array transfer.
constexpr int sdim[] = {32, 64, 32};
constexpr int ddim[] = {16, 32, 64};
constexpr ptrdiff_t elem_sz = (ptrdiff_t)sizeof(float);

upcxx::future<> rput_strided_example(float* src_base, upcxx::global_ptr<float> dst_base) {
  return upcxx::rput_strided<3>(
      src_base, {{elem_sz, sdim[0]*elem_sz, sdim[0]*sdim[1]*elem_sz}},
      dst_base, {{elem_sz, ddim[0]*elem_sz, ddim[0]*ddim[1]*elem_sz}},
      {{4, 3, 2}});
}

upcxx::future<> rget_strided_example(upcxx::global_ptr<float> src_base, float* dst_base) {
  return upcxx::rget_strided<3>(
      src_base, {{elem_sz, sdim[0]*elem_sz, sdim[0]*sdim[1]*elem_sz}},
      dst_base, {{elem_sz, ddim[0]*elem_sz, ddim[0]*ddim[1]*elem_sz}},
      {{4, 3, 2}});
}
The strided example code snippet above corresponds to the translation data motion shown in Figure 4. By altering the arguments to the _strided functions a user can implement various transpose and reflection operations in an N-dimensional space within their non-contiguous RMA operation. Another common use case is performing RMA on a non-contiguous boundary plane of a dense multi-dimensional array, for example as part of a rectangular halo exchange in a simulation with a multi-dimensional domain decomposition. Note the strides are expressed in units of bytes; this enables users to work with data structures that include padding.
For more general data structures a user can use the _irregular functions:
pair<particle_t*, size_t> src[] = {{srcP+12, 22}, {srcP+66, 12}, {srcP, 4}};
pair<upcxx::global_ptr<particle_t>, size_t> dest[] = {{destP, 38}};
auto f = upcxx::rput_irregular(src, end(src), dest, end(dest));
f.wait();
The user here is taking subsections of a source array with particle_t element type pointed to by srcP and copying them to the location pointed to by destP. This example also shows how data can be shuffled in a non-contiguous RMA operation; the user is responsible for ensuring their source and destination areas specify equivalent amounts of data. As with all RMA operations, all variants of rput and rget assume the element type is TriviallySerializable (byte-copyable).
RPCs transmit their arguments across process boundaries using the mechanism of serialization, which is type-specific logic responsible for encoding potentially rich C++ objects to and from raw byte streams. UPC++ provides built-in support for serializing trivially copyable types (e.g. primitive types, and C-like structs or other classes which report as std::is_trivially_copyable). It also supports serialization for most STL containers (e.g. std::vector, std::list) when the elements they contain are trivially copyable (or otherwise Serializable). However, in many cases it may be necessary to add application-specific serialization for application-specific objects, for example when a class is not trivially copyable, or when transmitting only a subset of an object's state would reduce communication costs.
UPC++ defines the concepts TriviallySerializable and Serializable that describe what form of serialization a C++ type supports. Figure 5 helps summarize the relationship of these concepts.
A type T is TriviallySerializable if it is semantically valid to copy an object by copying its underlying bytes, and UPC++ serializes such types by making a byte copy. A type T is considered to be TriviallySerializable if either of the following holds:

1. T is TriviallyCopyable (i.e. std::is_trivially_copyable<T>::value is true), and does not implement any of the class serialization interfaces described in the following sections.
2. upcxx::is_trivially_serializable<T> is specialized to provide a member constant value that is true.

In the latter case, UPC++ treats the type T as if it were TriviallyCopyable for the purposes of serialization. Thus, UPC++ will serialize an object of type T by making a byte copy, and it will assume T is TriviallyDestructible when destroying a deserialized object of type T.
Certain UPC++-provided types such as upcxx::global_ptr<T>
and upcxx::team_id
are also defined as TriviallySerializable.
A type T is Serializable (the more general concept) if one of the following holds:

1. T is TriviallySerializable.
2. T is of class type and implements one of the class serialization interfaces described in the following sections.
3. T is explicitly defined as Serializable by the UPC++ specification. This includes STL containers (std::vector, std::tuple, std::array, std::map, etc.) where the elements are Serializable, and certain other types including std::string and upcxx::view<T>.

The type trait upcxx::is_trivially_serializable<T> provides a member constant value that is true if T is TriviallySerializable and false otherwise. This trait may be specialized for user types (types that are not defined by the C++ or UPC++ standards), for example:
namespace upcxx {
template<>
struct is_trivially_serializable<::T> : ::std::true_type {};
}
The type trait upcxx::is_serializable<T>
provides a member constant value
that is true
if T
is Serializable and false
otherwise. This trait may NOT be specialized by the user for any types.
Many UPC++ communication operations (notably RMA and collectives) assert that the objects to be transferred are of TriviallySerializable type. This is to ensure these operations are amenable to hardware acceleration, such as the use of RDMA network hardware to accelerate rput
and rget
operations. UPC++ RPC operations are inherently two-sided (they semantically require CPU interaction on both sides), and thus arguments to RPC are only required to be Serializable (not restricted to TriviallySerializable).
Note that serializability of a type T
does not imply that objects of type T
are meaningful on another process. In particular, C++ pointer-to-object and pointer-to-function types are TriviallySerializable, but it is generally invalid to dereference a local pointer that originated from another process. More generally, objects that represent local process resources (e.g., file descriptors) are usually not meaningful on other processes, whether their types are Serializable or not.
The following sections discuss three ways in which UPC++ programmers can implement application-specific serialization logic: field/value-based serialization, custom serialization, and view-based serialization. Each method offers different tradeoffs on productivity, flexibility, and performance.
The simplest and easiest-to-use mechanisms for defining serialization of application-specific class types in UPC++ are the UPCXX_SERIALIZED_FIELDS
and UPCXX_SERIALIZED_VALUES
macros, which specify a set of member fields or values to comprise the serialized representation of a given type.
In UPCXX_SERIALIZED_FIELDS
the programmer passes a list of non-static member field names in a class, and the UPC++ library automatically generates the needed serialization logic to transmit those fields (and only those fields) whenever an instance of that class is sent as part of an RPC. These fields themselves must be Serializable. For UPCXX_SERIALIZED_FIELDS
to be applied to a class, that class must also have a default constructor.
In UPCXX_SERIALIZED_VALUES
the programmer passes a list of expressions to be evaluated every time an object of the given class type is serialized. The results of these expressions are then transmitted, and must themselves be Serializable. For a class to use UPCXX_SERIALIZED_VALUES
it must also declare a constructor that accepts the values resulting from the expressions passed to UPCXX_SERIALIZED_VALUES
.
The example below illustrates using UPCXX_SERIALIZED_FIELDS
to serialize a large object containing a large nested array. While this object is Serializable without UPCXX_SERIALIZED_FIELDS
, using UPCXX_SERIALIZED_FIELDS
allows us to significantly reduce the number of bytes transmitted each time an instance of this class is serialized (assuming only the partial_sum_reduction
field is needed in the remote logic, and not values
).
class dist_reduction {
 public:
  // The values to perform a sum reduction across
  double values[N];
  // Used to store a local sum reduction result on each rank
  double partial_sum_reduction;

  // Default constructor used by UPC++ deserialization
  dist_reduction() {
    for (int i = 0; i < N; i++) { values[i] = 1.; }
  }

  void calculate_partial_sum_reduction() {
    partial_sum_reduction = 0.0;
    for (int i = 0; i < N; i++) { partial_sum_reduction += values[i]; }
  }

  UPCXX_SERIALIZED_FIELDS(partial_sum_reduction)
};
The following example implements similar semantics to the previous example code, but using UPCXX_SERIALIZED_VALUES
. Note the addition of a constructor that accepts the single double
value that comprises the serialized representation. This constructor is invoked at the target process to instantiate the deserialized object.
class dist_reduction {
 public:
  // The values to perform a sum reduction across
  double values[N];
  // Used to store a local sum reduction result on each rank
  double partial_sum_reduction;

  // Constructor used by UPC++ deserialization
  dist_reduction(double _partial_sum_reduction) {
    partial_sum_reduction = _partial_sum_reduction;
  }

  dist_reduction() {
    for (int i = 0; i < N; i++) { values[i] = 1.; }
  }

  double calculate_partial_sum_reduction() const {
    double partial_sum_reduction = 0.0;
    for (int i = 0; i < N; i++) { partial_sum_reduction += values[i]; }
    return partial_sum_reduction;
  }

  UPCXX_SERIALIZED_VALUES(calculate_partial_sum_reduction())
};
The decision tree below can help users determine how to implement serialization for a class type T. It is not exhaustive of all possible use cases, but rather is designed to provide general guidance. Fields F1,...,Fn are defined as the non-static members of T that encompass all the object state required to represent a serialized T.

Are ALL non-static fields of T TriviallyCopyable?
  YES:
    Is ANY non-static field of T meaningful only on the local process (such as a raw C++ pointer, C++ reference, or other “process-local resource”)?
      YES: goto <NON-TRIVIAL-SERIALIZATION>
      NO:
        Is T a TriviallyCopyable class?
          YES: Result: Use trivial serialization; T should already be TriviallySerializable.
          NO:
            Does T have any non-trivial constructors, assignment operators or destructors that need to perform actions during (de)serialization?
              YES: goto <NON-TRIVIAL-SERIALIZATION>
              NO: Result: Force trivial serialization by specializing upcxx::is_trivially_serializable<T> (see Serialization Concepts).
  NO: goto <NON-TRIVIAL-SERIALIZATION>

NON-TRIVIAL-SERIALIZATION:
Do you have access to change the implementation of T?
  YES:
    Is any field F1,...,Fn a raw C++ pointer, a “process-local resource”, or a std container around these?
      YES:
        Can you write an expression to convert each field needed by serialization into a meaningfully Serializable object?
          YES: Result: UPCXX_SERIALIZED_VALUES(expr(F1),...,expr(Fn))
          NO: Result: Use Custom Serialization by defining T::upcxx_serialization
      NO:
        Are all of the fields F1,...,Fn Serializable and Destructible?
          YES:
            Is T DefaultConstructible AND none of the fields F1,...,Fn are references, const-qualified, or a std container around these?
              YES: Result: UPCXX_SERIALIZED_FIELDS(F1,...,Fn)
              NO: Result: UPCXX_SERIALIZED_VALUES(F1,...,Fn)
          NO: Result: Implement Serialization for the relevant field types that need it and restart from the top of this decision tree.
  NO: Result: Use Custom Serialization by specializing upcxx::serialization<T>

Definitions:
* “process-local resource”: data that is only meaningful to the calling process. Examples include I/O objects like std::ofstream or a POSIX file descriptor, non-Serializable library types like std::thread, std::atomic or upcxx::device_allocator, and other opaque process-specific resource handles.
* TriviallyCopyable type: all scalar types, any class type where every copy/move constructor, copy/move assignment operator and destructor is trivial or deleted, and C++ arrays of such types.
While UPCXX_SERIALIZED_FIELDS
and UPCXX_SERIALIZED_VALUES
are powerful in their simplicity, there are many use cases that require more complex serialization logic than can be expressed in a simple expression. One common example is deep copying pointers in member fields. While the pointer itself is trivially copyable, it is not useful on a remote process. Usually, the desired behavior is that the object referenced by the pointer is itself serialized into the same byte stream as the object pointing to it so that they can both be reconstructed on the remote rank. However in other cases, we may want such a pointer to be cleared or set to reference a pre-existing object at the target.
For this and other complex logic, UPC++ offers custom serialization. In custom serialization, the programmer is responsible for manually serializing and deserializing a C++ object to/from a byte stream using Writer
and Reader
objects to push and pull values into and out of that stream. The programmer implements (1) a serialize
method that accepts a Writer
and the object to be serialized, and (2) a deserialize
method that accepts a Reader
and a pointer to a block of memory the same size as the type being deserialized.
There are two ways in which these serialize
and deserialize
methods can be declared. First, they can be declared as member methods in a public, nested member type named upcxx_serialization
of the class to be serialized/deserialized. Second, they can be declared in a specialization of upcxx::serialization<T>
where T
is the type to be serialized/deserialized, with public serialize
and deserialize
methods. These two mechanisms are respectively intended to serve the separate use cases of (1) serialization implemented by the class author, and (2) serialization added to a pre-existing class.
The example below illustrates custom serialization using a public, nested upcxx_serialization
class for an application-defined vertex
class which stores a list of neighbor vertices. The serialize
method writes the ID of the vertex, the number of neighbors it has, and then the ID of each of those neighbors. The deserialize
method then extracts this information on the remote rank and uses it to reconstruct an instance of vertex
.
class vertex {
  int id;
  std::vector<vertex *> neighbors;
 public:
  vertex(int _id) : id(_id) { }
  int get_id() const { return id; }
  bool has_edge(int other) const;
  void add_neighbor(int neighbor_id);

  // An example of using a member struct upcxx_serialization to implement
  // custom serialization for the vertex class:
  struct upcxx_serialization {
    template<typename Writer>
    static void serialize(Writer& writer, vertex const & object) {
      writer.write(object.get_id());
      writer.write(object.neighbors.size());
      for (vertex *neighbor : object.neighbors) {
        writer.write(neighbor->get_id());
      }
    }
    template<typename Reader, typename Storage>
    static vertex* deserialize(Reader& reader, Storage storage) {
      int id = reader.template read<int>();
      size_t n_neighbors = reader.template read<size_t>();
      vertex *v = storage.construct(id);
      for (size_t n = 0; n < n_neighbors; n++) {
        v->add_neighbor(reader.template read<int>());
      }
      return v;
    }
  };
};
Note that custom serialization can also be applied to classes that are not defined by the application, which instead might be defined by a library the application is using. A specialization of upcxx::serialization<T>
can be created to (de)serialize the accessible state of T
, where T
is the type to be serialized/deserialized, regardless of where T
is defined. A more complete version of the above example is available in the examples/serialization
directory of the implementation repository, which also demonstrates this technique.
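As a brief sketch of that technique: suppose a third-party library defines a class point3d (a hypothetical name) with accessible members x, y and z, whose definition we cannot modify. Serialization can be attached externally:

// Sketch: non-intrusive serialization for a hypothetical library type.
namespace upcxx {
  template<>
  struct serialization<point3d> {
    template<typename Writer>
    static void serialize(Writer& writer, point3d const &p) {
      writer.write(p.x);
      writer.write(p.y);
      writer.write(p.z);
    }
    template<typename Reader, typename Storage>
    static point3d* deserialize(Reader& reader, Storage storage) {
      double x = reader.template read<double>();
      double y = reader.template read<double>();
      double z = reader.template read<double>();
      return storage.construct(point3d{x, y, z});
    }
  };
}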
During deserialization there are multiple routines available for reading data from the byte stream. The best one to use depends on the amount of data being read, as well as the destination they are being read into. The decision tree helps to illustrate when to use which routine, using U
as an alias for deserialized_type_t<T>
.
Implementing Custom Deserialization
Is there a sequence of objects of the same type being read from the byte stream?
  YES:
    Does the memory they’re being read into already contain existing objects?
      YES: U *ptr = read_sequence_overwrite<T>(array, num_items)
      NO:  U *ptr = read_sequence_into<T>(storage, num_items)
  NO:
    Is the object being read into a variable or heap storage?
      VARIABLE:
        Does the variable contain an object that needs to be destructed before the object in the byte stream is read in?
          YES: U *ptr = read_overwrite<T>(var)
          NO:  U var = read<T>()
      STORAGE:
        Does the storage point to an object that needs to be destructed before the object in the byte stream is read in?
          YES: U *ptr = read_overwrite<T>(*storage)
          NO:  U *ptr = read_into<T>(storage)
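For instance, the following is a minimal sketch (the record type and its field are hypothetical) that reads a fixed-length sequence directly into a default-constructed object using the overwrite variant from the tree above:

struct record {
  double samples[16];
  struct upcxx_serialization {
    template<typename Writer>
    static void serialize(Writer& writer, record const &r) {
      writer.write_sequence(r.samples, r.samples + 16);
    }
    template<typename Reader, typename Storage>
    static record* deserialize(Reader& reader, Storage storage) {
      record *r = storage.construct(); // default-constructs the 16 doubles
      // The doubles already exist, so use the overwrite variant:
      reader.template read_sequence_overwrite<double>(r->samples, 16);
      return r;
    }
  };
};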
It is often the case that an application-defined class requiring custom serialization itself contains member fields which are also of application-defined types. While a programmer could implement the serialization/deserialization of the wrapping class by extracting data from those member fields and then re-initializing them during deserialization, this logic would have to be duplicated in every place where instances of those classes were also being serialized/deserialized. Instead, UPC++ supports recursive serialization wherein the serialization logic for one class may implicitly call the serialization logic for another.
The example below illustrates how this looks in the case of UPCXX_SERIALIZED_FIELDS
. custom_class_1
is Serializable because it uses UPCXX_SERIALIZED_FIELDS
to serialize its only member field (msg
) which is itself of a Serializable type (std::string
). custom_class_2
then uses UPCXX_SERIALIZED_FIELDS
to serialize its member field std::vector<custom_class_1> msgs
. msgs
is Serializable thanks to UPC++’s built-in support for std::vector
, and because custom_class_1
has already used UPCXX_SERIALIZED_FIELDS
to declare how it should be serialized.
This enables a separation of concerns, wherein programmers do not need to recursively and manually serialize deep object hierarchies themselves so long as the nested objects are themselves Serializable.
class custom_class_1 {
 public:
  std::string msg;
  custom_class_1() { }
  custom_class_1(const std::string &_msg) : msg(_msg) { }

  UPCXX_SERIALIZED_FIELDS(msg)
};

class custom_class_2 {
 public:
  std::vector<custom_class_1> msgs;
  custom_class_2() { }
  void add_msg(const std::string &m) { msgs.push_back(custom_class_1(m)); }

  UPCXX_SERIALIZED_FIELDS(msgs)
};
UPC++ also supports hierarchical serialization of a base class from within a derived class. Both UPCXX_SERIALIZED_FIELDS
and UPCXX_SERIALIZED_VALUES
accept UPCXX_SERIALIZED_BASE(B)
as an argument, where B
is the type of the base class (not the member within the derived class). Derived classes can also use custom serialization as long as the base class has a serializable type. For details, see the UPC++ Specification.
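For instance, a minimal sketch (with hypothetical classes base and derived) of combining base-class state with a derived field:

class base {
 public:
  int tag = 0;
  UPCXX_SERIALIZED_FIELDS(tag)
};

class derived : public base {
 public:
  double payload = 0.;
  // Serialize the base sub-object first, then the derived field:
  UPCXX_SERIALIZED_FIELDS(UPCXX_SERIALIZED_BASE(base), payload)
};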
For substantially large objects, deserializing them from UPC++’s internal network buffers can have non-trivial performance cost, and, if the object was built up only to be consumed and immediately torn down within the RPC, then it’s likely that performance can be regained by eliminating the superfluous build-up/tear-down. Notably this can happen with containers: Suppose process A
would like to send a collection of values to process B
which will assimilate them into its local state. If process A were to transmit these values by RPC’ing them in a std::vector<T>
(which along with many other std::
container types, is supported as Serializable in the UPC++ implementation) then upon receipt of the RPC, the UPC++ program would enact the following steps during deserialization:
1. Construct a vector by visiting each T element in the network buffer and copying it into the vector container.
2. Invoke the RPC callback, passing it the vector.
3. Traverse the T’s in the vector and consume them, likely by copying them out to the process’s private state.
4. Destruct the vector.

This process works, but can entail considerable unnecessary overhead that might be problematic in performance-critical communication. The remedy for eliminating the overheads associated with steps 1 and 4 is to allow the application direct access to the T
elements in the internal network buffer. UPC++ grants such access with the upcxx::view<T>
type. A view is little more than a pair of iterators delimiting the beginning and end of an ordered sequence of T
values. Since a view only stores iterators, it is not responsible for managing the resources supporting those iterators. Most importantly, when being serialized, a view will serialize each T
it encounters in the sequence, and when deserialized, the view will “remember” special network buffer iterators delimiting its contents directly in the incoming buffer. The RPC can then ask the view for its begin/end iterators and traverse the T
sequence in-place.
The following example demonstrates how a user could easily employ views to implement a remote vector accumulate, i.e., adding the values contained in a local buffer to the values in a remote shared array. Views enable the transmission of the local array of double
’s with minimal intermediate copies. On the sender side, the user acquires begin and end iterators to the value sequence they wish to send (in this case double*
acts as the iterator type) and calls upcxx::make_view(beg,end)
to construct the view. That view is bound to an rpc
whose lambda accepts a upcxx::view<double>
on the receiver side, and traverses the view to consume the sequence, adding each element to the corresponding element in a shared array on the target rank.
upcxx::future<> add_accumulate(upcxx::global_ptr<double> remote_dst,
                               double *buf, std::size_t buf_len) {
  return upcxx::rpc(remote_dst.where(),
    [](const upcxx::global_ptr<double>& dst, const upcxx::view<double>& buf_in_rpc) {
      // Traverse `buf_in_rpc` like a container, adding each element to the
      // corresponding element in dst. Views fulfill most of the container
      // contract: begin, end, size, and if the element type is trivial, even operator[].
      double *local_dst = dst.local();
      std::size_t index = 0;
      for(double x : buf_in_rpc) {
        local_dst[index++] += x;
      }
    },
    remote_dst, upcxx::make_view(buf, buf + buf_len));
}
Beyond just simple pointers to contiguous data, arbitrary iterator types can be used to make a view. This allows the user to build views from the sequence of elements within std
containers using upcxx::make_view(container)
, or, given any compliant ForwardIterator
, upcxx::make_view(begin_iter, end_iter)
.
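For instance (a trivial sketch with a hypothetical std::list named data), both forms construct the same view:

std::list<int> data = {1, 2, 3};
auto v1 = upcxx::make_view(data);                     // uses data.begin()/data.end()
auto v2 = upcxx::make_view(data.begin(), data.end()); // explicit iterator pair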
For a more involved example, we will demonstrate one process contributing histogram values to a histogram distributed over all the processes. We will use std::string
as the key-type for naming histogram buckets, double
for the accumulated bucket value, and std::unordered_map
as the container type for mapping the keys to the values. Assignment of bucket keys to owning process is done by a hash function. We will demonstrate transmission of the histogram update with and without views, illustrating the performance advantages that views enable.
// Hash a key to its owning rank.
upcxx::intrank_t owner_of(std::string const &key) {
  std::uint64_t h = 0x1234abcd5678cdef;
  for(char c: key) h = 63*h + std::uint64_t(c);
  return h % upcxx::rank_n();
}

using histogram1 = std::unordered_map<std::string, double>;

// The target rank's histogram which is updated by incoming rpc's.
histogram1 my_histo1;

// Sending histogram updates by value.
upcxx::future<> send_histo1_byval(histogram1 const &histo) {
  std::unordered_map<upcxx::intrank_t, histogram1> clusters;
  // Cluster histogram elements by owning rank.
  for(auto const &kv: histo) clusters[owner_of(kv.first)].insert(kv);

  upcxx::promise<> all_done;
  // Send per-owner histogram clusters.
  for(auto const &cluster: clusters) {
    upcxx::rpc(cluster.first, upcxx::operation_cx::as_promise(all_done),
      [](histogram1 const &histo) {
        // Pain point: UPC++ already traversed the key-values once to build the
        // `histo` container. Now we traverse again within the RPC body.
        for(auto const &kv: histo)
          my_histo1[kv.first] += kv.second;
        // Pain point: UPC++ will now destroy the container.
      },
      cluster.second
    );
  }
  return all_done.finalize();
}

// Sending histogram updates by view.
upcxx::future<> send_histo1_byview(histogram1 const &histo) {
  std::unordered_map<upcxx::intrank_t, histogram1> clusters;
  // Cluster histogram elements by owning rank.
  for(auto const &kv: histo) clusters[owner_of(kv.first)].insert(kv);

  upcxx::promise<> all_done;
  // Send per-owner histogram clusters.
  for(auto const &cluster: clusters) {
    upcxx::rpc(cluster.first, upcxx::operation_cx::as_promise(all_done),
      [](upcxx::view<std::pair<const std::string, double>> const &histo_view) {
        // Pain point from `send_histo1_byval`: Eliminated.
        // Traverse key-values directly from network buffer.
        for(auto const &kv: histo_view)
          my_histo1[kv.first] += kv.second;
        // Pain point from `send_histo1_byval`: Eliminated.
      },
      upcxx::make_view(cluster.second) // build view from container's begin()/end()
    );
  }
  return all_done.finalize();
}
There is a further benefit to using view-based serialization: the ability for the sender to serialize a subset of elements directly out of a container without preprocessing it (as is done in the two examples above). This is most efficient if we take care to use a container that natively stores its elements in an order grouped according to the destination process. The following example demonstrates the same histogram update as before, but with a data structure that permits sender-side subset serialization.
// Hash a key to its owning rank.
upcxx::intrank_t owner_of(std::string const &key) {
  std::uint64_t h = 0x1234abcd5678cdef;
  for(char c: key) h = 63*h + std::uint64_t(c);
  return h % upcxx::rank_n();
}

// This comparison functor orders keys such that they are sorted by
// owning rank at the expense of rehashing the keys in each invocation.
// A better strategy would be to modify the map's key type to compute this
// information once and store it in the map.
struct histogram2_compare {
  bool operator()(std::string const &a, std::string const &b) const {
    using augmented = std::pair<upcxx::intrank_t, std::string const&>;
    return augmented(owner_of(a), a) < augmented(owner_of(b), b);
  }
};

using histogram2 = std::map<std::string, double, histogram2_compare>;

// The target rank's histogram which is updated by incoming rpc's.
histogram2 my_histo2;

// Sending histogram updates by view.
upcxx::future<> send_histo2_byview(histogram2 const &histo) {
  histogram2::const_iterator run_begin = histo.begin();

  upcxx::promise<> all_done;
  while(run_begin != histo.end()) {
    histogram2::const_iterator run_end = run_begin;
    upcxx::intrank_t owner = owner_of(run_begin->first);

    // Compute the end of this run as the beginning of the next run.
    while(run_end != histo.end() && owner_of(run_end->first) == owner) run_end++;

    upcxx::rpc(owner, upcxx::operation_cx::as_promise(all_done),
      [](upcxx::view<std::pair<const std::string, double>> const &histo_view) {
        // Traverse key-values directly from network buffer.
        for(auto const &kv: histo_view)
          my_histo2[kv.first] += kv.second;
      },
      // Serialize from a subset of `histo` in-place.
      upcxx::make_view(run_begin, run_end)
    );

    run_begin = run_end;
  }
  return all_done.finalize();
}
view’s Iterator Type

The above text presented correct and functional code, but it oversimplified the C++ type of the UPC++ view by relying on some of its default characteristics and type inference. The full type signature for view is:

upcxx::view<T, Iter = /*internal buffer iterator*/>
Notably, the view type has a second type parameter which is the type of its underlying iterator. If omitted, this parameter defaults to a special UPC++ provided iterator that deserializes from a network buffer, hence this is the correct type to use when specifying the incoming bound-argument in the RPC function. But this will almost never be the correct type for the view on the sender side of the RPC. For instance, upcxx::make_view(...)
deduces the iterator type provided in its arguments as the Iter
type to use in the returned view. If you were to attempt to assign that to a temporary variable you might be surprised:
std::vector<T> vec = /*...*/;
upcxx::view<T> tmp = upcxx::make_view(vec); // Type error: mismatched Iter types
auto tmp = upcxx::make_view(vec); // OK: deduced upcxx::view<T, std::vector<T>::const_iterator>
Given that the lifetime of a view does not influence the lifetime of its underlying data, UPC++ must make guarantees to the application about the lifetime of the network buffer when referenced by a view. From the examples above, it should be clear that UPC++ will ensure the buffer will live for at least as long as the RPC callback function is executing. In fact, UPC++ will actually extend this lifetime until the future (if any) returned by the RPC callback is ready. This gives the application a convenient means to dispatch the processing of the incoming view to another concurrent execution agent (e.g. a thread), thereby returning from the RPC callback nearly immediately and allowing the UPC++ runtime to resume servicing additional user progress events.
The following example demonstrates how a process can send sparse updates to a remote matrix via rpc
. The updates are not done in the execution context of the rpc
itself, instead the rpc
uses lpc
’s to designated worker personas (backed by dedicated threads) to dispatch the arithmetic update of the matrix element depending on which worker owns it. Views and futures are used to extend the lifetime of the network buffer until all lpc
’s have completed, thus allowing those lpc
’s to use the elements directly from the buffer.
double my_matrix[1000][1000] = {/*0...*/}; // Rank's local matrix.
constexpr int worker_n = 8; // Number of worker threads/personas.
// Each persona has a dedicated thread spinning on its progress engine.
upcxx::persona workers[worker_n];

struct element {
  int row, col;
  double value;
};

upcxx::future<> update_remote_matrix(upcxx::intrank_t rank,
                                     element const *elts, int elt_n) {
  return upcxx::rpc(rank,
    [](upcxx::view<element> const &elts_in_rpc) {
      upcxx::future<> all_done = upcxx::make_future();
      for(int w=0; w < worker_n; w++) {
        // Launch task on respective worker.
        auto task_done = workers[w].lpc(
          [w, elts_in_rpc]() {
            // Sum subset of elements into `my_matrix` according to a
            // round-robin mapping of matrix rows to workers.
            for(element const &elt: elts_in_rpc) {
              if(w == elt.row % worker_n)
                my_matrix[elt.row][elt.col] += elt.value;
            }
          }
        );
        // Conjoin task completion into `all_done`.
        all_done = upcxx::when_all(all_done, task_done);
      }
      // Returned future has a dependency on each task lpc so the network
      // buffer (thus `elts_in_rpc` view) will remain valid until all tasks
      // have completed.
      return all_done;
    },
    upcxx::make_view(elts, elts + elt_n)
  );
}
These lifetime extension semantics for RPC callback arguments actually apply not only to upcxx::view
(as demonstrated above), but also to all deserialized arguments that an RPC callback accepts by const reference.
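For instance, a minimal sketch (process_async and target are hypothetical): because the callback takes its std::string argument by const reference and returns a future, the runtime-managed storage behind msg remains valid until that future is ready:

upcxx::rpc(target,
  [](std::string const &msg) -> upcxx::future<> {
    return process_async(msg); // hypothetical asynchronous consumer of `msg`
  },
  big_string); // big_string: a std::string local to the sender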
The memory kinds interface enables the UPC++ programmer to identify regions of memory requiring different access methods or having different performance properties, and subsequently rely on the UPC++ communication services to perform transfers among such regions (both local and remote) in a manner transparent to the programmer. With GPU devices, HBM, scratch-pad memories, NVRAM and various types of storage-class and fabric-attached memory technologies featured in vendors’ public road maps, UPC++ must be prepared to deal efficiently with data transfers among all the memory technologies in any given system. UPC++ currently supports three memory kinds, corresponding to device-resident memory onboard the following popular GPU accelerator lines:

* NVIDIA GPUs (CUDA)
* AMD GPUs (ROCm/HIP)
* Intel GPUs (oneAPI Level Zero)
In the future it’s anticipated that UPC++ will be extended to handle additional kinds of accelerator and non-host memories.
We demonstrate how to use memory kinds to transfer data between device and host memories, and then between host and device residing on different nodes. For simplicity of presentation our examples use a single GPU device per process, however UPC++ can also handle nodes with multiple GPU devices or heterogeneous mixes of devices. See the UPC++ Specification for the details.
In our first example, we allocate a block of storage in device and host memories and then move some data from host to GPU.
To allocate storage, we first construct a device segment allocator using the make_gpu_allocator()
factory function, which we then use to allocate shared objects in GPU memory. (This function was first made available in UPC++ release 2022.3.0 to simplify allocator management.) Of note, we pass the segment size to the allocator, in our case 4 MiB. The object is templated on the device type, and defaults to gpu_default_device
. This type alias is set by compile definitions and usually indicates the type of GPU located on the machine being used. Valid device types include upcxx::cuda_device
, upcxx::hip_device
and upcxx::ze_device
, corresponding to the three GPU hardware variants mentioned in the previous section. In this first example we explicitly request a device segment on a CUDA device. The device_allocator
object returned here manages a CUDA shared memory segment of the specified size.
std::size_t segsize = 4*1024*1024; // 4 MiB
// Allocate GPU memory segment
auto gpu_alloc = upcxx::make_gpu_allocator<upcxx::cuda_device>(segsize);
In addition to requesting a particular memory kind, function arguments are also available to request a specific CUDA device ID (for systems with multiple GPUs), or to optionally provide a pointer to previously allocated device memory. The full prototype is:
template <typename Device = gpu_default_device>
device_allocator<Device> make_gpu_allocator(size_t sz_in_bytes,
                   Device::id_type device_id = Device::auto_device_id,
                   void *device_memory = nullptr);
The device_id
argument is an integer that defaults to a “smart” choice in the range [0,Device::device_n())
, which is also the range of valid values to provide. In the case of no available GPUs at the calling process, the device_allocator
returned from the function will be inactive. By default the function will allocate memory from the GPU driver library of the specified size to serve as the device segment.
If we need a larger segment, we need to specify that at the time we construct the allocator. Currently, there is no way to dynamically extend the segment. Note that as with any memory allocator, a small amount of external padding may be inserted around allocated objects to ensure proper object alignment within the segment. As such, device segments should be sized slightly larger than the sum of the anticipated data allocation sizes.
UPC++ also supports creation of device segments around pre-existing regions of device memory, for example a device memory area obtained from an on-node programming library, or one provisioned with non-default device-specific properties. The device_memory
argument to make_gpu_allocator()
supports this use case (not demonstrated in this example). If device_memory
is specified, then device_id
must correspond to the device where the memory resides, and must point to a segment of at least sz_in_bytes
bytes. Any existing data in provided device memory will be preserved within the UPC++ segment, and the device_allocator::to_global_ptr()
function can be used to obtain global_ptr
’s to pre-existing device objects in such a segment.
The call to make_gpu_allocator()
must be done collectively across all the processes. However different processes are permitted to pass different arguments. For example, processes may open devices with different IDs. If a given process wants to opt out of opening a device it can set the device ID to invalid_device_id
; this is the default behavior of auto_device_id
when called on a process that has no available devices. A different segment size can also be specified by each process. Once the device_allocator
is constructed, allocation and deallocation of shared memory in the device segment is non-collective.
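For instance, the following is a minimal sketch (the even/odd split is purely illustrative) in which odd ranks opt out of opening a device:

// Every rank participates in the collective call, but odd ranks pass
// invalid_device_id and receive an inactive allocator.
auto dev_id = (upcxx::rank_me() % 2 == 0)
                ? upcxx::gpu_default_device::auto_device_id
                : upcxx::gpu_default_device::invalid_device_id;
auto alloc = upcxx::make_gpu_allocator(4*1024*1024, dev_id);
if (!alloc.is_active()) { /* this rank opened no device segment */ }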
The next step is to allocate an array of 1024 doubles on the GPU, by calling the allocate
function of the allocator we have just created.
// Allocate an array of 1024 doubles on GPU
global_ptr<double, memory_kind::cuda_device> gpu_array = gpu_alloc.allocate<double>(1024);
Note the second template argument to the global_ptr
which specifies the kind of target memory, i.e., memory_kind::cuda_device
. This statically indicates a global pointer that references memory in a CUDA memory segment. This differs from pointers to host memory, where the second template argument to global_ptr
defaults to memory_kind::host
, indicating regular host memory. There is also a memory_kind::any
kind which acts as a wildcard and generates a global_ptr
type that can reference memory of any kind.
It’s also worth noting that the global_ptr
stack variables in this example could have been more concisely declared using the auto
type specifier to leverage C++ type deduction; we’ve explicitly written out their types here to help clarify the presentation.
Next, we allocate the host storage using the familiar new_array
method:
global_ptr<double> host_array = upcxx::new_array<double>(1024);
Data movement that potentially involves non-host memory is handled by the upcxx::copy()
function. We transfer 1024 doubles from the host buffer to the GPU buffer. As with rput
and rget,
copy
is an asynchronous operation that returns a future. In this case, we immediately wait on that future, but the full machinery of completions is available to enable overlap and schedule dependent operations (see Completions).
upcxx::copy(host_array, gpu_array, 1024).wait();
If we wanted to move the data in the opposite direction, we’d simply swap the host_array
and gpu_array
arguments. Note the absence of explicit CUDA data movement calls.
After transferring the data, most users will need to invoke a computational kernel on the device, and will therefore need a raw pointer to the data allocated on the device. The device_allocator::allocate
function returns a global_ptr
to the data in the specific segment associated with that allocator. This global_ptr
can be downcast to a raw device pointer using the allocator’s local
function.
Device::pointer<T> device_allocator<Device>::local(global_ptr<T, Device::kind> g);
This function will return the raw device pointer (a T *
) which can then be used as an argument to a CUDA computational kernel.
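For instance, a minimal sketch (the kernel name and launch configuration are hypothetical):

// Downcast the global_ptr from our earlier allocation to a raw device pointer.
double *dev_ptr = gpu_alloc.local(gpu_array);
// my_kernel<<<blocks, threads>>>(dev_ptr, 1024); // hypothetical CUDA kernel launch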
In a similar fashion, a raw device pointer into the segment managed by a device_allocator
may be upcast into a global_ptr
using the to_global_ptr
function of that allocator.
template<typename Device>
template<typename T>
global_ptr<T, Device::kind> device_allocator<Device>::to_global_ptr(Device::pointer<T> ptr);
In the case where nodes have multiple devices, it is important to obtain the ID of the device on which an allocation resides in order to properly launch a kernel. This can be retrieved using the device_allocator::device_id()
method.
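For instance (a sketch; the explicit CUDA call is only needed when mixing in native device APIs):

int dev = gpu_alloc.device_id(); // ID of the device hosting this segment
// cudaSetDevice(dev);           // e.g., select it before a native kernel launch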
Here is a complete example:
#include <upcxx/upcxx.hpp>
#include <iostream>

#if !UPCXX_KIND_CUDA
#error "This example requires UPC++ to be built with CUDA support."
#endif

using namespace std;
using namespace upcxx;

int main() {
  upcxx::init();

  std::size_t segsize = 4*1024*1024; // 4 MiB
  auto gpu_alloc = upcxx::make_gpu_allocator<cuda_device>(segsize); // alloc GPU segment
  UPCXX_ASSERT_ALWAYS(gpu_alloc.is_active(),
                      "Failed to open GPU:\n" << cuda_device::kind_info());

  // alloc some arrays of 1024 doubles on GPU and host
  global_ptr<double,memory_kind::cuda_device> gpu_array = gpu_alloc.allocate<double>(1024);
  global_ptr<double> host_array1 = new_array<double>(1024);
  global_ptr<double> host_array2 = new_array<double>(1024);

  double *h1 = host_array1.local();
  double *h2 = host_array2.local();
  for (int i=0; i<1024; i++) h1[i] = i; // initialize h1

  // copy data from host memory to GPU
  upcxx::copy(host_array1, gpu_array, 1024).wait();
  // copy data back from GPU to host memory
  upcxx::copy(gpu_array, host_array2, 1024).wait();

  int nerrs = 0;
  for (int i=0; i<1024; i++){
    if (h1[i] != h2[i]){
      if (nerrs < 10) cout << "Error at element " << i << endl;
      nerrs++;
    }
  }
  if (nerrs) cout << "ERROR: " << nerrs << " errors detected" << endl;
  else if (!upcxx::rank_me()) cout << "SUCCESS" << endl;

  gpu_alloc.deallocate(gpu_array);
  delete_array(host_array2);
  delete_array(host_array1);

  gpu_alloc.destroy();
  upcxx::finalize();
}
The power of the UPC++ memory kinds facility lies in its unified interface for moving data not only between host and device on the same node, but between source and target of any memory kind that reside on different nodes, by leveraging UPC++’s global pointer abstraction. We next show how to move data between a host and device that reside on different nodes. This time we will also use gpu_default_device
for the device allocator instead of specifying cuda_device
. As such, the code can be run without modifications using NVIDIA, AMD or Intel GPU hardware.
upcxx::device_allocator<upcxx::gpu_default_device> gpu_alloc =
    upcxx::make_gpu_allocator(segsize);
Once we have set up a device allocator, we allocate host and device buffers as before, although this time the device memory global_ptr
will have a memory kind of gpu_default_device::kind
. Recall this is not the default template argument, so it needs to be specified (or deduced using auto
). We use dist_object
as a directory for fetching remote global pointers, which we can then use in the copy()
method:
dist_object<global_ptr<double,gpu_default_device::kind>> dobj(gpu_array);
int neighbor = (rank_me() + 1) % rank_n();
global_ptr<double,gpu_default_device::kind> other_gpu_array = dobj.fetch(neighbor).wait();

// copy data from local host memory to remote GPU
upcxx::copy(host_array1, other_gpu_array, 1024).wait();
// copy data back from remote GPU to local host memory
upcxx::copy(other_gpu_array, host_array2, 1024).wait();

upcxx::barrier();
Again, if we want to reverse the direction of the RMA data motion, we need only swap the first two arguments in the call to copy().
Instead of immediately waiting for each transfer to complete, we could use completion objects, such as the remote_cx::as_rpc mechanism mentioned in the Completions section. To modify the heat transfer simulation mentioned there for GPU compatibility, the memory buffers of both interior and halo cells could be allocated in device memory. This would mean that copy() operations between neighboring processes take the form of a device-to-device memory transfer. On compatible architectures, UPC++ can implement this using offload technologies, such as GPUDirect RDMA and ROCmRDMA. For a more thorough discussion and analysis of how to write a GPU version of the heat transfer example (which is decomposed in three dimensions and uses the Kokkos programming model for on-node parallelism), the reader is referred to this paper (doi:10.25344/S4630V).
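As a minimal sketch of that approach (halo_ready is a hypothetical function at the target), the wait() in the earlier snippet could be replaced by a remote completion that runs once the transferred data has landed:

upcxx::copy(host_array1, other_gpu_array, 1024,
            upcxx::remote_cx::as_rpc([]() { halo_ready(); }));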
Our code concludes by deallocating host and device buffers. The device allocator has its own method for object deallocation:

gpu_alloc.deallocate(gpu_array);
Note that in this second example, we have preceded this deallocation with a call to upcxx::barrier
to prevent a process from deallocating the array from its device segment while a neighboring process may still be accessing it using upcxx::copy()
(See Quiescence). Once all accesses to a device segment are quiesced, it can safely be destroyed using a collective call to the device_allocator::destroy()
function:
gpu_alloc.destroy();
In this section, we’ve shown how to use memory kinds in UPC++. This powerful feature relies on a unified notion of the global pointer abstraction to enable code using the pointer to interpret the kind of memory the pointer refers to, while hiding the details of the actual access method from the programmer. We’ve not demonstrated how to move data directly between GPUs, but the details follow the same patterns presented in the preceding examples.
Intermixing communication on device memory kinds with computational kernels on devices is straightforward, but GPU programming is beyond the scope of this guide. Please consult the UPC++ Extras package for several examples that demonstrate efficient use of the GPU with UPC++, as well as the gpu_vecadd example in the UPC++ distribution demonstrating vector addition.
The upcxx-run
utility provides a convenience wrapper around a system-provided “native” job launcher, to provide a uniform interface for the most-common scenarios. However, for the “power user” the facilities of the native job launcher (such as core binding) are indispensable. Therefore, this section describes how to launch UPC++ applications with the native launcher.
The operation of upcxx-run
includes the following three conceptual steps:

1. Parse the command line to identify the executable and the network it targets.
2. Set environment variables, including those implementing the -shared-heap and -backtrace command line options.
3. Construct and execute a next-level launcher command.

To run a UPC++ application with the native launcher, you must first use upcxx-run -v -show ...
to display the results of the three steps above (echoing the next-level launcher rather than executing it). Until you are familiar with the operation of upcxx-run
for your scenario, take care to include all of the options appropriate to your desired execution. If you cannot pass the intended executable, at least pass one compiled with the same installation and for the same network.
The following shows a simple example:
$ upcxx-run -v -show -n2 ./test-hello_upcxx-smp
UPCXX_RUN: ./test-hello_upcxx-smp is compiled with smp conduit
UPCXX_RUN: Environment:
UPCXX_RUN: GASNET_MAX_SEGSIZE = 128MB/P
UPCXX_RUN: GASNET_PSHM_NODES = 2
UPCXX_RUN: UPCXX_SHARED_HEAP_SIZE = 128 MB
UPCXX_RUN: Command line:
./test-hello_upcxx-smp
The first line of the output above identifies the network as smp
. The three lines indented below Environment:
show the environment variables set by upcxx-run
as well as any (potentially) relevant ones set in the environment prior to running upcxx-run
. Finally, after Command line:
we see the next-level launcher command (which is trivial in the case of smp
).
The key to launching without upcxx-run
is to run the native launcher with the same environment one would have if using it. In some cases, the command line in the -show
output is the native launch command. In others it is a second-level wrapper provided by GASNet. The following sub-sections describe what to do with the command line, based on the network. However, since the presentation is incremental, you should read in order and not skip ahead to the network you are using.
The UPC++ shared segment is created at job startup and used to service all shared heap allocations (see Global Memory). The shared segment size at each process is determined at job creation, as directed by the user’s upcxx-run -shared-heap
argument (see Running UPC++ Programs). This is currently implemented by setting the UPCXX_SHARED_HEAP_SIZE
and GASNET_MAX_SEGSIZE
environment variables, as one can see in the output above.
Specifying an appropriate shared heap size is important to the correct operation of many applications. However the details of how these variables interact is subtle (notably when passing a percentage argument) and subject to change. As such, we highly recommend that users wishing to bypass the upcxx-run
wrapper first invoke upcxx-run -v -show -shared-heap HEAPSZ
as suggested above with the actual desired HEAPSZ in order to obtain the appropriate values for these environment variables.
In the case of the smp
network, communication is performed through shared memory on a single host, and there is no actual network used. This leaves very little motivation to bypass upcxx-run
in order to execute an application in this case. However, we present this case both for completeness and because it is the simplest case. Using the environment output shown prior to the start of this sub-section, one may launch a 2-process instance of ./test-hello_upcxx-smp
as follows:
$ env GASNET_MAX_SEGSIZE=128MB/P GASNET_PSHM_NODES=2 \
      UPCXX_SHARED_HEAP_SIZE='128 MB' ./test-hello_upcxx-smp
Hello from 0 of 2
Hello from 1 of 2
The udp
network case uses amudprun
as its second-level launcher. The -show
output may look like one of the following two examples depending on use of -localhost
or -ssh-servers
:
$ upcxx-run -show -v -n4 -localhost ./test-hello_upcxx-udp
UPCXX_RUN: ./test-hello_upcxx-udp is compiled with udp conduit
UPCXX_RUN: Looking for spawner "amudprun" in:
UPCXX_RUN: /usr/local/pkg/upcxx/bin/../gasnet.opt/bin
UPCXX_RUN: Found spawner "/usr/local/pkg/upcxx/bin/../gasnet.opt/bin/amudprun"
UPCXX_RUN: Environment:
UPCXX_RUN: GASNET_MAX_SEGSIZE = 128MB/P
UPCXX_RUN: GASNET_SPAWNFN = L
UPCXX_RUN: UPCXX_SHARED_HEAP_SIZE = 128 MB
UPCXX_RUN: Command line:
/usr/local/pkg/upcxx/bin/../gasnet.opt/bin/amudprun -np 4 ./test-hello_upcxx-udp
$ upcxx-run -show -v -n4 -ssh-servers pcp-d-5,pcp-d-6,pcp-d-7,pcp-d-8 ./test-hello_upcxx-udp
UPCXX_RUN: ./test-hello_upcxx-udp is compiled with udp conduit
UPCXX_RUN: Looking for spawner "amudprun" in:
UPCXX_RUN: /usr/local/pkg/upcxx/bin/../gasnet.opt/bin
UPCXX_RUN: Found spawner "/usr/local/pkg/upcxx/bin/../gasnet.opt/bin/amudprun"
UPCXX_RUN: Environment:
UPCXX_RUN: GASNET_IBV_SPAWNFN = ssh
UPCXX_RUN: GASNET_MAX_SEGSIZE = 128MB/P
UPCXX_RUN: GASNET_SPAWNFN = S
UPCXX_RUN: GASNET_SSH_SERVERS = pcp-d-5,pcp-d-6,pcp-d-7,pcp-d-8
UPCXX_RUN: SSH_SERVERS = pcp-d-5,pcp-d-6,pcp-d-7,pcp-d-8
UPCXX_RUN: UPCXX_SHARED_HEAP_SIZE = 128 MB
UPCXX_RUN: Command line:
/usr/local/pkg/upcxx/bin/../gasnet.opt/bin/amudprun -np 4 ./test-hello_upcxx-udp
As with smp
, running this ./test-hello_upcxx-udp
is simply a matter of running the indicated command line with the indicated environment. Taking the simpler -localhost
case and folding-away bin/../
:
$ env GASNET_MAX_SEGSIZE=128MB/P GASNET_SPAWNFN=L UPCXX_SHARED_HEAP_SIZE='128 MB' \
      /usr/local/pkg/upcxx/gasnet.opt/bin/amudprun -np 4 ./test-hello_upcxx-udp
Hello from 0 of 4
Hello from 1 of 4
Hello from 2 of 4
Hello from 3 of 4
For the aries
, ibv
and ofi
networks the output of upcxx-run -v -show ...
will display a command line involving a next-level launcher gasnetrun_[network]
. For instance (with line continuations added for readability):
$ upcxx-run -v -show -n4 -N2 ./test-hello_upcxx-aries
UPCXX_RUN: ./hello_upcxx is compiled with aries conduit
[... environment output elided ...]
UPCXX_RUN: Command line:
<upcxx-install-path>/bin/../gasnet.opt/bin/gasnetrun_aries \
-E UPCXX_INSTALL,UPCXX_NETWORK,UPCXX_SHARED_HEAP_SIZE \
-N 2 -n 4 ./test-hello_upcxx-aries
Similar to upcxx-run -show
, the gasnetrun_[network]
utilities take -t
to echo some verbose output which concludes with the command to be run (without running it). Continuing the example above (noting the -t
inserted as the first argument to gasnetrun_aries
):
$ env [environment-settings-from-upcxx-run] \
      [full-path-to]/gasnetrun_aries -t \
      -E UPCXX_INSTALL,UPCXX_NETWORK,UPCXX_SHARED_HEAP_SIZE \
      -N 2 -n 4 ./test-hello_upcxx-aries
[... some output elided ...]
gasnetrun: running: /usr/bin/srun -C knl -K0 -W30 -v -mblock \
      -n 4 -N 2 [full-path-to]/test-hello_upcxx-aries
In this case /usr/bin/srun
is the native job launcher on a SLURM-based Cray XC system. Other native launchers one might see include aprun
, jsrun
or even mpirun
.
Together with the environment from the initial upcxx-run -v -show ...
the printed /usr/bin/srun ...
command is sufficient to reproduce the behavior of the upcxx-run
command without -v -show
. With an understanding of the native launcher one can modify this command to suit one’s needs, such as by addition of options to bind processes to CPU cores.
The preceding subsection describes a two-step process for determining the native launcher for certain networks. There is also a single-step approach: upcxx-run -vv ...
. The output from both steps described above are produced in a single step, but with two key differences. The first is that this approach will run the provided executable (and thus may require you to run in an appropriate environment). The second is that in addition to the output required to uncover the native launcher, a large volume of additional verbose output is generated by the added execution.
The ibv
network case may default to using a GASNet-provided “ssh-spawner”. As the name implies, this utilizes ssh
to perform job launch. This can be identified in the output of gasnetrun_ibv -t ...
by the presence of gasnetrun: set GASNET_SPAWN_ARGS=...
.
Since ssh-spawner bypasses the native job launcher, it is not suitable for obtaining more control over the job than is available via upcxx-run
. You may try GASNET_IBV_SPAWNER=mpi
or GASNET_IBV_SPAWNER=pmi
in the environment to demand a different spawner. However, this may result in a failure from gasnetrun_ibv
such as the following:
Spawner is set to PMI, but PMI support was not compiled in
In some cases, running gasnetrun_[network] -t ...
may yield yet another level of launcher above the native one. When present, this is most often a site-specific wrapper intended to apply default behaviors (such as for core binding) or to collect usage statistics. Since there is no uniformity in such utilities, we cannot provide detailed instructions. However, there is often some way to enable a “show” or “verbose” behavior in order to find the native spawn command hidden beneath. Take care in such cases to seek out not only the final launch command, but also any environment variables which may be set to influence its behavior.
This chapter contains a snapshot of Frequently Asked Questions (FAQ) for UPC++. The most up-to-date version of this UPC++ FAQ is available online at: upcxx.lbl.gov/faq
For training materials to get started using UPC++, see the UPC++ Training Page
UPC++ runs on a variety of systems, ranging from laptops to custom supercomputers. The complete list of officially supported platforms (and C++ compilers) appears in the UPC++ INSTALL document. This currently includes systems with x86_64, ARM64 and PowerPC64 architectures running Linux, macOS or Windows (with WSL). It’s also been successfully used on other POSIX-like systems, despite not being officially supported.
There are several ways to access a working UPC++ environment:
We maintain public installs of UPC++ at several DOE centers.
See UPC++ at NERSC, OLCF, and ALCF
UPC++ supports a wide range of UNIX-like systems and architectures, ranging from supercomputers to laptops, including Linux, macOS and Windows (via WSL).
Visit the UPC++ download page for links to download and install UPC++ on your own system.
We maintain a Linux Docker container with UPC++.
Assuming you have a Linux-compatible Docker environment, you can get a working UPC++ environment in seconds with the following command:
docker run -it --rm upcxx/linux-amd64
The UPC++ commands inside the container are upcxx
(compiler wrapper) and upcxx-run
(run wrapper), and the home directory contains some example codes. Note this container is designed for test-driving UPC++ on a single node, and is not intended for long-term development or use in production environments.
The container above supports the SMP and UDP backends, and does not include an MPI install. If you additionally need MPI support, there is also a larger “full” variant of the container that includes both UPC++ and MPI:
docker run -it --rm upcxx/linux-amd64-full
For details on hybrid programming with MPI, see Mixing UPC++ with MPI.
We provide minimal Docker containers intended for test-driving the latest UPC++ release (see above answer).
The E4S project provides a variety of full-featured containers in Docker, Singularity, OVA and EC2 that include UPC++, along with the entire E4S SDK. See documentation at the E4S website for usage instructions.
The UPC++ publications page provides the recommended general citations for citing UPC++ and/or GASNet-EX, along with a BibTeX file you can import directly to make it easy.
We’re especially interested to hear about work-in-progress publications using UPC++, please contact us to start a discussion!
No. See the guide section on Personas for more details.
No. UPC++ provides per-operation completions to this end. See the guide section on the Interaction of Collectives and Operation Completion for more details.
Does calling sleep() flip a future to the ready state?

No. Futures returned by communication operations will only become readied during upcxx::progress()
or other library calls with user-level progress such as future::wait()
. No amount of sleeping (sleep()
, usleep()
, std::this_thread::sleep_for()
etc.) will flip a future even if the transfer has actually completed.
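For instance, a minimal sketch (gptr is a hypothetical global_ptr<int>) contrasting a working polling loop with futile sleeping:

auto f = upcxx::rget(gptr);
while (!f.ready()) upcxx::progress(); // progress() can ready the future
// f.wait() is the simpler equivalent; substituting sleep() for progress()
// in the loop body would spin forever.
int value = f.result();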
No. There are no implicit ordering guarantees between any concurrent/overlapped communication operations. The only ordering guarantees are those explicitly enforced through the use of operation completions and synchronization. See the guide section on Completions for more details.
In particular, there are no implicit point-to-point ordered delivery guarantees for overlapped RPC operations between the same pair of processes (in contrast to, for example, the message ordering semantics provided by MPI_Isend/MPI_Recv
). Similarly, two concurrent RMA put operations issued by the same process are not guaranteed to update the target memory and complete in the order they were issued.
For example:
// UNORDERED puts:
auto f1 = upcxx::rput(1, gptr1); // first put begins
auto f2 = upcxx::rput(2, gptr2); // second put begins
// both puts are *concurrently* "in-flight"
// They are not guaranteed to update memory or signal completion in any particular order
f1.wait(); // returns when first put is completed
f2.wait(); // returns when second put is completed

// Explicitly ORDERED puts: (using operation completion and program order)
auto f1 = upcxx::rput(1, gptr1); // first put begins
f1.wait(); // returns when first put is completed
auto f2 = upcxx::rput(2, gptr2); // second put begins
f2.wait(); // returns when second put is completed

// Explicitly ORDERED puts: (using future continuations)
auto f = upcxx::rput(1, gptr1) // first put begins
  .then([=]() {
    // completion runs after first put is completed
    return upcxx::rput(2, gptr2); // second put begins
  });
f.wait(); // returns when both puts are completed
These same examples also apply analogously to upcxx::rpc()
operations.
The lack of implicit ordering guarantees enables more efficient communication, because it helps ensure one only incurs synchronization overheads where explicitly required for program correctness.
When conjoining operations with when_all(), in what order do the operations complete? Is there a possibility of a race condition?

Futures progress sequentially on their owning thread and in an unspecified order. UPC++ futures are not a tool for expressing parallelism, but rather non-deterministic sequential behavior. Futures allow a thread to react to events in the order they actually occur, since the order in which communication operations complete is not deterministic. While UPC++ does support multiple threads within each process, and each thread can be managing its own futures, the same future must never be accessed concurrently by multiple threads. Also, UPC++ will never “spawn” new threads. See the section “Futures and Promises” of the UPC++ specification.
Yes. Here is the exact wording from the UPC++ Specification:
Operation completion implies both source and remote completion. However, it does not imply that notifications associated with source and remote completion have occurred. Similarly, remote completion implies source completion, but it does not imply that notifications associated with source completion have occurred.
So for example in an RMA injection function call of the abstract form:
upcxx::rput(psrc, pdst, sz,
            operation_cx::N1 | remote_cx::N2 | source_cx::N3)
The operation completion event implies the other two completion events have also occurred, and the remote completion event implies the source completion event has also occurred. However, all of these events reflect the state of rput’s underlying RMA data transfer (i.e. the state of the memory referenced by psrc
and pdst
), and do NOT imply that any notifications (N1,N2,N3) associated with those events have been signaled yet. So for example if the actual call was:
upcxx::rput(psrc, pdst, sz,
            operation_cx::as_promise(p1) | remote_cx::as_rpc(C2) | source_cx::as_promise(p2));
Observing a signal on promise p1
does not guarantee that callback C2
has executed at the destination process, nor that promise p2
has been signaled yet. It only implies that those event notifications are queued with the UPC++ runtime at the appropriate process and will eventually be delivered during a call with user-level progress. The order in which those notifications are actually delivered to the application is unspecified, and may depend on both system-dependent details and execution timing. The same remains true when the as_promise()
notification is replaced by other notification mechanisms including as_future()
or as_lpc()
.
The discussion above is focused on bulk rput()
, because it is one of the few communication injection calls that supports all three types of completion event. Other put-like RMA calls are similar. The same properties also apply in the obvious way to communication functions that only support two types of completion event. However, note that the discussion above should not be misread with regard to round-trip RPC injection, e.g. upcxx::rpc(tgtrank, operation_cx::N4, C4)
; in that particular case, operation completion event N4 does imply that remote callback C4 has completed execution at the target process and delivered its return value (if any) back to the initiator.
The short answer is: NEVER use by-reference lambda captures for RPC, and by-value captures are ONLY safe if the types are TriviallySerializable (TriviallyCopyable). See the guide section on RPC and Lambda Captures for more details.
In general this is NOT guaranteed to work correctly. In particular, systems using Address space layout randomization (ASLR) (a security feature which is enabled by default on many Linux and macOS systems) will load static data and possibly even code segments at different randomized base addresses in virtual memory for each process in the job.
Note your callbacks can safely refer to static/global variables by name, which will be resolved correctly wherever the callback runs. For variables with a dynamic lifetime, the recommended solution is to use a dist_object
wrapper; sending a dist_object
reference as an RPC argument automatically ensures the callback receives a reference to the corresponding local representative of the dist_object
.
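For example, a minimal sketch of this pattern (the choice of peer is illustrative):
// Collectively construct a dist_object wrapping a per-process counter
upcxx::dist_object<int> counter(0);
// Pass it by reference as an RPC argument; the callback automatically
// receives the target process's own local representative
int peer = (upcxx::rank_me() + 1) % upcxx::rank_n();
upcxx::rpc(peer, [](upcxx::dist_object<int> &c) { (*c)++; }, counter).wait();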
No.
Remote Procedure Call (RPC) callbacks always execute sequentially and synchronously inside user-level progress for the thread holding the unique “master persona” (a special persona that is initially granted to the thread calling upcxx::init()
on each process). User-level progress only occurs inside a handful of explicit UPC++ library calls (e.g. upcxx::progress()
and upcxx::future::wait()
).
For more details, see the guide section on the Master Persona.
You can absolutely launch RPC, RMA or other communication from within an RPC callback. However you cannot wait on the completion of communication from within the RPC callback. If you run in debug mode you’ll get an error message explaining that, e.g.: “You have attempted to wait() on a non-ready future within upcxx progress, this is prohibited because it will never complete.”
So if you launch communication from RPC callback context you need to use UPC++’s other completion and asynchrony features to synchronize completion instead of calling future::wait()
. There are many different ways to do this, and the best choice depends on the details of what the code is doing. For example, one could request future-based completion on that op and either stash the resulting future in storage outside the callback’s stack frame, or better yet schedule a completion callback on the future using future::then()
with a continuation of whatever you would have done after the wait (see the guide section on Asynchronous Computation for more details). There are also operations (like rpc_ff
) and completion types (remote_cx::as_rpc
) that signal completion at the remote side (see the guide section on Remote Completions for more details), so you can perform synchronization at the target instead of at the initiator.
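As a sketch of the future-chaining approach (next_rank and the payload are illustrative), an RPC callback can return the future from a nested operation, which defers the RPC's own completion until that future is ready:
// next_rank and gp are assumed to be provided by the surrounding application
void forward_put(upcxx::intrank_t next_rank, upcxx::global_ptr<double> gp) {
  upcxx::rpc(next_rank,
    [](upcxx::global_ptr<double> dest) {
      // launch communication from inside the callback, but do NOT wait() here;
      // returning the future defers this RPC's completion until the put is done
      return upcxx::rput(3.14, dest).then([]() {
        // continuation: whatever would have followed the wait()
      });
    }, gp).wait(); // the initiator may wait; it is not inside progress
}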
Is RPC to the same process (rank_me()) specially optimized? Do I need to explicitly do anything to make that more efficient?
There is an optimization applied to all targets in the local_team() (shared-memory bypass avoids one payload copy for large payloads using the rendezvous-get protocol), but nothing specific to same-process “loopback” (rank_me()).
If you think same-process/loopback RPCs comprise a significant fraction of your RPC traffic, you should probably consider that serialization semantics require copying the argument payload at least once, even for a loopback RPC. Also, the progress semantics require deferring callback invocation until the next user-level progress, so if the input arguments are hot in-cache then you may lose that locality by the time the RPC callback runs. There is also some intrinsic cost associated with callback deferment that makes it significantly more expensive than a synchronous direct function call, even for lightweight RPC arguments.
So if the RPC arguments are large (e.g., a large upcxx::view
) or otherwise expensive to serialize (e.g., lots of std::string
or other containers requiring individual fine-grained allocations), and/or are invoked with very high frequency, then you might consider converting a loopback RPC into a direct function call, ideally passing the arguments by reference (assuming the callback can safely share the input argument data without copying it). The library API semantics prohibit performing such a transformation automatically (because it would not be transparent), but a user with knowledge of the application and callback semantics could often do it safely and potentially get a big win.
Is a dist_object<T> a shared object?
In general NO - these concepts are orthogonal.
The upcxx::dist_object<T>
comprising a distributed object need not involve any storage in the shared memory segment. In fact, in many common use cases for dist_object<T>
these objects live on the program stack, meaning both the dist_object
and embedded T
object reside entirely in private memory (and attempting to call upcxx::try_global_pointer()
on the address of such an object would yield a null pointer). You can of course explicitly construct a upcxx::dist_object<T>
object in the shared heap using upcxx::new_<upcxx::dist_object<T>>(...)
, but this is probably not a very useful pattern.
A more common and useful pattern is dist_object<global_ptr<T>>
, where the dist_object
and global pointer objects themselves usually live in private memory, but the contained global pointer references an object allocated in the shared heap.
It might be helpful to think of distributed objects as a collective abstraction which is mostly useful for RPC and dist_object::fetch()
. This abstraction is mostly orthogonal to object storage in the shared segment, which is mostly used for RMA operations (rput
/rget
/copy
) or local_team
shared-memory bypass.
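A minimal sketch of the dist_object<global_ptr<T>> pattern (array size and neighbor choice are illustrative):
// Each process allocates in its shared heap and publishes the pointer
upcxx::dist_object<upcxx::global_ptr<double>> dptrs(upcxx::new_array<double>(100));
// Fetch a neighbor's published pointer, then target it with RMA
int neighbor = (upcxx::rank_me() + 1) % upcxx::rank_n();
upcxx::global_ptr<double> remote = dptrs.fetch(neighbor).wait();
upcxx::rput(3.14, remote).wait(); // writes element 0 of the neighbor's array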
Many of the standard library container types are automatically serialized, as are TriviallyCopyable types. For more complicated types there are several mechanisms for customizing serialization: see the guide section on Serialization for more details. Note that if you are transferring a container of data elements “consumed” by the callback, you should consider using view-based serialization for transferring those elements to reduce extraneous data copies: see the guide section on View-Based Serialization for details.
The error you are getting refers to TriviallySerializable because the RMA operation you are attempting does not support the serialization you seek. Operations that are intended to be accelerated by network RMA hardware (such as rput, rget) can only move flat byte sequences, hence we assert the type given can be meaningfully moved that way. RPCs are not restricted in the types they transmit since the CPU is always involved, and we do indeed accept and serialize most std::
containers when given as RPC arguments. There are also now interfaces for defining custom serialization on your own types for use with RPC: see the guide section on Serialization for more details.
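For instance, here is a minimal sketch using the UPCXX_SERIALIZED_FIELDS member macro (the struct itself is illustrative):
#include <string>
#include <upcxx/upcxx.hpp>

struct Particle {
  double x, y, z;
  std::string label;  // non-trivial member makes the type non-TriviallyCopyable
  UPCXX_SERIALIZED_FIELDS(x, y, z, label) // serialize exactly these fields
};
// Particle objects can now be passed by value as RPC arguments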
How does an RPC taking a upcxx::view<T> argument compare to other mechanisms for passing the same TriviallySerializable T elements to an RPC?
Background info is available in the guide section on View-Based Serialization. A view argument over a TriviallySerializable type adds 8 bytes of serialized data payload to store the number of elements. Aside from that there are no additional overheads to using a view, and there are no incremental costs associated with direct access to the network buffer from the RPC callback at the target.
If your alternative is to send those elements in a container like a std::vector
, std::list
, std::set
, etc., those also all add 8 bytes of element count in the serialized representation (std::array
notably does not, as the element count is static info), but these containers all also add additional element data copies and allocation calls to construct the container at the target. So a view should never be worse than a container over the same elements, and can be significantly better.
If your alternative is to send each element as an individual unboxed RPC argument, that might be comparable to a view for a very small number of elements. But the view should quickly win once you amortize the extra 8 bytes on-the-wire and reap the efficiency benefit of passing the arguments to the callback function via pointer indirection instead of forcing the compiler to copy/move each element to the program stack (beyond the point where they all fit in registers).
What are the benefits of upcxx::rput and upcxx::rget RMA communication?
The main benefits are:
- The transfers are fully one-sided: no coordination with, or attentiveness from, application code at the target process is required.
- On networks with RDMA capability, the transfer can be offloaded to the network hardware, enabling zero-copy data movement with low latency and high bandwidth.
These same benefits also apply to upcxx::copy
when applied to host memory, or to device memory with native memory kinds support.
Yes, with a caveat. UPC++ allows RMA transfers (rput
and copy
) to request target-side remote completion notification, sometimes known as “signaling put”. This feature is described in the guide section on Remote Completions.
Here’s a simple example:
double *lp_src = ...;
global_ptr<double> gp_dst = ...;
upcxx::rput(lp_src, gp_dst, count,
            remote_cx::as_rpc([](t1 f1, t2 f2){ ... }, a1, a2));
The RMA data payload transfer (from lp_src
to gp_dst
) remains fully one-sided and zero-copy using RDMA hardware on networks that provide that capability (e.g., InfiniBand and Cray networks).
The RPC arguments (a1
, a2
and the lambda itself) are subject to serialization semantics, so that part of the payload is not zero-copy; however in a well-written application this is typically small (under a few cache lines) so the extra copies are usually irrelevant to performance. Also as usual, the RPC callback will not execute until the target process enters UPC++ user-level progress.
If two or more non-atomic RMA operations concurrently access the same shared memory location with no intervening synchronization and at least one of them is a write/put, that constitutes a data race, just like in thread-style programming. This property holds whether the accesses are performed via RMA function calls (e.g. upcxx::rput
) or via dereference operations on raw C++ pointers by a process local to the shared memory. An RMA data race is not immediately fatal, but any values returned and/or deposited in memory will be indeterminate, i.e. potentially garbage. Well-written programs should use synchronization to avoid data races.
UPC++ also provides remote atomic operations that guarantee atomicity and data coherence in the presence of conflicting RMAs to locations in shared memory. These can be used in ways analogous to atomic operations in thread-style programming to ensure deterministically correct behavior.
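A minimal sketch of remote atomics (gp_counter is an illustrative global_ptr<int64_t> referencing a location in some process's shared segment):
// Collectively construct a domain naming the operations to be used
upcxx::atomic_domain<int64_t> ad({upcxx::atomic_op::load, upcxx::atomic_op::fetch_add});
// Atomically increment a shared counter, racing safely with other processes
int64_t prev = ad.fetch_add(gp_counter, 1, std::memory_order_relaxed).wait();
int64_t cur  = ad.load(gp_counter, std::memory_order_relaxed).wait();
// ...
ad.destroy(); // collective teardown, required before upcxx::finalize()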
UPC++ provides the Memory Kinds interface which enables RMA communication between GPU memories and host memories anywhere in the system, all within a convenient and uniform abstraction. UPC++ global_ptr
’s can reference memory in host or GPU memory, local or remote, and the upcxx::copy()
function moves data between them.
For more details see the guide section on Memory Kinds.
The UPC++ Memory Kinds interface currently supports:
- host memory
- CUDA devices (NVIDIA GPUs)
- ROCm/HIP devices (AMD GPUs)
- oneAPI Level Zero devices (Intel GPUs)
On systems with Mellanox/NVIDIA-branded InfiniBand or HPE Slingshot network hardware, this additionally includes native RMA acceleration for GPU memory implemented using technologies such as GPUDirect RDMA and ROCmRDMA. Forthcoming releases will add native RMA acceleration for GPU memory on additional platforms.
UPC++ provides several options for constructing a device segment, which is managed by a upcxx::device_allocator
object. The upcxx::make_gpu_allocator()
factory function is usually the most convenient way to construct a device segment, and is discussed in the guide section on Memory Kinds.
By default, device segment creation allocates a segment of the requested size on the device (i.e. the UPC++ library performs the allocation). However there is also an argument that optionally accepts a pointer to previously allocated device memory (e.g. allocated using cudaMalloc
or some other GPU-enabled library).
Once constructed, the device_allocator
abstraction can optionally be used to carve up the segment into individual objects using device_allocator::allocate<T>().
Alternatively you may construct a UPC++ device segment around a provided region of device memory that already contains objects and data (e.g. created using a different programming model). Constructing a device_allocator
in this case allows UPC++ to register the device memory with the network for RMA and provides the means to create a global_ptr
referencing the device memory that can be shared with other processes.
A local pointer into the device segment can be converted into a global_ptr
of appropriate memory kind via device_allocator::to_global_ptr()
, and back again via device_allocator::local()
. The global_ptr
version of the pointer can be shared with other processes and used in the upcxx::copy
communication calls, and the local version of the device pointer can be passed to GPU compute kernels and other GPU-aware libraries.
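Putting those pieces together, a sketch (segment size and array length are illustrative):
// Create a device segment on the default GPU device kind
auto gpu_alloc = upcxx::make_gpu_allocator(4*1024*1024); // 4 MiB segment
// Carve out an array of 1024 doubles in device memory
upcxx::global_ptr<double, upcxx::gpu_default_device::kind> gp =
    gpu_alloc.allocate<double>(1024);
// Convert to a raw device pointer for use in GPU kernels and GPU-aware libraries
double *dev_ptr = gpu_alloc.local(gp);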
CUDA Managed Memory (sometimes referred to as “Unified Memory”) is a feature that allows memory allocated in a special way to transparently migrate back-and-forth between physical pages resident in GPU memory and system DRAM, based on accesses. This migration uses page fault hardware to detect accesses and perform page migration on-demand, which can have a substantial performance cost.
The upcxx::device_allocator
constructors either allocate a (non-managed) device segment in GPU memory or accept a pre-allocated device-resident memory segment. In both cases the entire device segment must be resident in GPU memory. When native Memory Kinds are active (enabled by default for supported networks) the pages comprising the cuda_device
memory segment are pinned/locked in physical memory and registered with the network adapter as a prerequisite to enabling accelerated RDMA access (e.g. GPUDirect RDMA) on behalf of remote processes. This currently precludes Managed Memory on device segments, as it prevents migration of pages in the memory kinds segment (prohibiting direct/implicit access from the host processor). The host processor can explicitly copy data to/from a memory kinds device segment using upcxx::copy
, or (for segments it owns) using an appropriate cudaMemcpy
call.
Nothing prevents use of CUDA Managed Memory for objects outside a UPC++ device segment.
The upcxx
compiler wrapper is the best and recommended way to compile most programs using UPC++ from the command line or Makefiles. It’s a script that works analogously to the familiar mpicxx
wrapper, in that it invokes the underlying C++ compiler with the provided options, and automatically prepends the options necessary for including/linking the UPC++ library.
For details on upcxx
, see the guide section on Compiling UPC++ Programs.
For CMake projects, a UPCXX CMake package is provided in the installation directory. To use it in a CMake project, append the UPC++ installation directory to the CMAKE_PREFIX_PATH
variable (cmake ... -DCMAKE_PREFIX_PATH=/path/to/upcxx/install/prefix ...
), then use find_package(UPCXX)
in the CMakeLists.txt file of the project.
Some advanced use cases (notably when composing a hybrid of UPC++ with certain other parallel programming models) might require more manual control of the underlying compiler commands. For those use cases there is also a upcxx-meta
script that directly supplies the relevant compiler flags, analogously to the pkg-config
tool. For details on upcxx-meta
, see the README section.
As in MPI, a UPC++ rank is a process. How processes are created is platform dependent, but you can count on fork() being a popular case on non-supercomputers. Typically, UPC++ processes use process shared memory (POSIX shm_*
) to communicate within a node. Several other OS-level shared memory bypass mechanisms are also supported: see GASNet documentation on “GASNet inter-Process SHared Memory (PSHM)” for details.
The shared heap segment size for each process is determined at job creation and remains fixed for the lifetime of the job. The easiest way to control the shared segment size is the upcxx-run -shared-heap=HEAPSZ
argument, where HEAPSZ
can be either an absolute per-process size (e.g., -shared-heap=512MB
, -shared-heap=2GB
, etc) or a percentage of physical memory (e.g., -shared-heap=50%
, which devotes half of compute-node physical memory to shared heap, divided evenly between co-located processes).
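For example (process count and size illustrative):
$ upcxx-run -n 4 -shared-heap=1GB ./my-app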
For details on upcxx-run
see the guide section on Running UPC++ Programs.
See docs/debugging for tips on debugging UPC++ codes.
See docs/debugging for information about using the Valgrind instrumentation framework in UPC++ programs.
As in MPI and other SPMD models, the maximum amount of potential parallelism has to be specified at job launch. Dynamically asking for more processes to join the job is not possible, but asking for more than you need up front and “waking” them dynamically is doable. Consider having all ranks other than rank 0 spin in a loop on upcxx::progress(). When rank 0 wants to offload work to rank 1, it can send an RPC that kicks rank 1 out of its spin loop to do the real work, as sketched below.
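Here is a minimal sketch of that pattern (the work_ready flag and wake-up protocol are illustrative application code, not a library feature):
#include <upcxx/upcxx.hpp>

bool work_ready = false; // per-process flag, flipped by an incoming RPC

int main() {
  upcxx::init();
  if (upcxx::rank_me() != 0) {
    // park: spinning in progress() keeps this rank attentive to incoming RPCs
    while (!work_ready) upcxx::progress();
    // ... perform the offloaded work ...
  } else {
    // rank 0 wakes the parked ranks when it has work for them
    for (int r = 1; r < upcxx::rank_n(); r++)
      upcxx::rpc_ff(r, []() { work_ready = true; });
    // ... rank 0's own work ...
  }
  upcxx::finalize();
}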
The UPC++ configure script should auto-detect the availability of InfiniBand (done during a step called “configuring gasnet”). To build a program to run over InfiniBand, make sure that:
- CXX=mpicxx is in your environment when calling the configure script.
- your program is compiled with upcxx -network=ibv.
Then to launch just use the upcxx-run
script.
You have three options:
- Use upcxx-run (which internally invokes gasnetrun_ibv) to perform ssh-based spawning. You may need to set GASNET_SSH_SERVERS="host1 host2 host3... in your environment to name the compute nodes. The gasnetrun_ibv -v option is often useful for troubleshooting site-specific problems that may arise here; repeat the -v option one or more times (for increasing levels of verbosity).
- Use PMI-based spawning via a batch-system launcher (e.g. srun), where supported.
- Use MPI-based spawning (this requires UPC++ to have been configured with an MPI C++ compiler, e.g. CXX=mpicxx).
For more details about job spawning see the guide section on Advanced Job Launch.
Appropriate binding of processes to CPU cores can be crucial to maximizing computational performance on dedicated computational resources. Core binding helps prevent the performance loss that stems from processes periodically migrating between cores, sacrificing cache state, and otherwise sharing CPU core resources in undesirable ways. Process binding can also help ensure processes acquire and maintain affinity to physical memory pages that are “nearest” to the cores executing the process; this can be especially important on modern many-core/multi-socket systems that often exhibit highly non-uniform memory access (NUMA) performance characteristics.
The best means of core-binding is system-specific, and documentation from your system administrator may provide the most accurate resource. Parallel job spawners such as SLURM srun
and IBM jsrun
have options to control core binding as part of parallel process launch, and on systems with those spawners that’s often the best place to start. This might mean a trial run with upcxx-run -show
to get the necessary “starting point” for envvars and spawner command, before manually adding some core-binding options. Failing those, tools like hwloc’s hwloc-bind
can be invoked as a wrapper around your application to achieve specific binding, although it might require some scripting trickery to bind different ranks to distinct resources.
Either way on a system with the hwloc
package installed, you can programmatically verify cores were bound as expected using code like the following, which prints the current core binding to stdout:
std::string s("echo ");
+= std::to_string(upcxx::rank_me()) + ": `hwloc-bind --get`";
s (s.c_str()); system
For more details about job spawning see the guide section on Advanced Job Launch.
Sometimes job startup can fail with shared library loading errors of the form:
/lib64/libstdc++.so.6: version `GLIBCXX_3.4.26' not found
These are especially likely to crop up in multi-node runs, where a job spawning program is starting processes across remote nodes on your behalf, but those processes might not see appropriate LD_LIBRARY_PATH
/LD_RUN_PATH
environment variables. There are many different possible causes for such errors, and the identity of the missing shared library is usually the best clue of how to proceed.
One common case of the error above is configuring UPC++ with a GNU C++ compiler that is not the default compiler for the Linux distro, meaning the supporting shared libraries are not present in the loader’s default shared library search path (e.g. /lib64
). Please see docs/local-gcc for tips on using UPC++ with a non-default GCC compiler on Linux.
Similar issues can arise from configuring UPC++ to use a non-GNU compiler that is not fully integrated with the system programming environment. Please see docs/alt-compilers for tips on using UPC++ with non-GNU compilers on Linux.
Another common failure mode in multi-node runs is a failure to find networking or GPU libraries, e.g.:
error while loading shared libraries: libfabric.so.1:
cannot open shared object file: No such file or directory
This particular failure can often be solved by configuring with --enable-rpath
, which requests GASNet to add the link-time shared library locations to the executable RPATH. Alternatively most of the same solutions suggested for the C++ shared library in docs/local-gcc can also be applied to other missing shared libraries.
Example program:
("Hello world from rank %d\n", upcxx::rank_me());
printf(stdout);
fflush::barrier();
upcxx("Goodbye world from rank %d\n", upcxx::rank_me());
printf(stdout); fflush
Possible output from a 2-node job, as seen on the console:
$ upcxx-run -n 2 -N 2 ./a.out
Hello world from rank 1
Goodbye world from rank 1
Hello world from rank 0
Goodbye world from rank 0
The issue here is that while upcxx::barrier()
DOES synchronize the execution of all the processes, it does NOT necessarily flush all their stdout/stderr output all the way back to the central console on a multi-node system. The call to fflush(stdout)
ensures that each process has flushed output from its private buffers to the file descriptor owned by each local kernel, however in a distributed-memory run this output stream still needs to travel over the network (e.g., via a socket connection) from the kernel of each node back to the login/batch node running the upcxx-run
command before it reaches the console and is multiplexed with output from the other compute nodes to appear on your screen. upcxx::barrier()
does not synchronize any part of that output-forwarding channel, so it’s common to see such output “races” on distributed-memory systems. The same is true of MPI_Barrier()
for the same reasons.
The only way to reliably work around this issue is to perform all console output from a single process (conventionally rank 0). This is also a good idea for scalability reasons, as generally nobody wants to scroll through 100k copies of “hello world” ;-). It’s also worth noting that console output usually vastly underperforms file system I/O on a good parallel file system, so the latter should always be preferred for large-volume I/O.
UPC++ supports several orthogonal optimizations for systems with hierarchical locality. Most modern systems have multi-core CPUs, and it’s common (although not required) to layout distributed UPC++ jobs with multiple processes sharing a physical node. In such a layout, processes sharing a physical node (and hence a coherent physical memory domain) are “closer” to each other (both in terms of wire distance and access time) than processes running on different physical nodes connected by a network.
By default, UPC++/GASNet-EX will optimize communication between processes sharing a physical node to use a fast shared-memory transport, bypassing the network adapter. This optimization is automatic (no code changes required) and can result in orders-of-magnitude improvement in communication latency and bandwidth for peers sharing a physical node, relative to processes communicating over a network.
UPC++ provides a special team called upcxx::local_team()
that contains all the processes co-located on a physical node. Processes sharing a local_team()
additionally have direct load/store access to each other’s UPC++ shared segment, via virtual memory cross-mappings established at job creation.
UPC++ applications can leverage upcxx::local_team()
to explicitly optimize their performance in several ways:
- Since processes sharing a local_team() (a physical node) communicate much more efficiently than those which do not, application data distribution decisions can sometimes be made to favor communication between local_team() peers.
- Processes sharing a local_team() can create raw C++ pointers and references directly to objects in each other’s shared segments, allowing for direct load/store access without the need for explicit UPC++ RMA calls (see the sketch below). This technique eliminates overheads associated with library calls, and potentially eliminates the overhead of an in-memory copy for the accessed data (i.e. instead moving data directly between a peer’s shared memory and local CPU registers).
- Processes sharing a local_team() can arrange to maintain a single node-wide copy of any replicated data, potentially improving memory scalability by a factor of local_team().rank_n().
See the guide section on local_team for further details and an example of optimization using local_team().
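A sketch of the direct load/store technique (the global pointer is assumed to have been obtained from a local_team() peer, e.g. via a dist_object<global_ptr<int>> exchange):
#include <upcxx/upcxx.hpp>

void touch_peer(upcxx::global_ptr<int> gp) {
  UPCXX_ASSERT(gp.is_local()); // true for segments cross-mapped on this node
  int *p = gp.local();         // raw C++ pointer into the peer's shared segment
  *p += 1;                     // direct load/store, no RMA call required
}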
UPC++ provides built-in teams for the entire job (upcxx::world()
) and for the processes co-located on the same physical node (upcxx::local_team()
). There is no built-in singleton team for the calling process, but it’s easy to construct one using team::create()
– see example below.
#include <upcxx/upcxx.hpp>
upcxx::team *team_self_p;
inline upcxx::team &self() { return *team_self_p; }

int main() {
  upcxx::init();
  int world_rank = upcxx::rank_me();
  // setup a self-team:
  team_self_p = new upcxx::team(upcxx::world().create(std::vector<int>{world_rank}));

  // use the new team
  UPCXX_ASSERT(self().rank_me() == 0);
  UPCXX_ASSERT(self().rank_n() == 1);
  upcxx::barrier(self());
  upcxx::dist_object<int> dobj(world_rank, self());
  UPCXX_ASSERT(dobj.fetch(0).wait() == world_rank);

  // cleanup (optional)
  self().destroy();
  delete team_self_p;
  upcxx::finalize();
}
For more info on teams, see the guide section on Teams.
Absolutely! The main caveat is that if more than one thread will make UPC++ calls then you probably need to build using the thread-safe version of UPC++. This usually just means compiling with upcxx -threadmode=par
(or with envvar UPCXX_THREADMODE=par
).
The UPC++ model is fully thread-aware, and even provides LPC messaging queues between threads as a convenience. For details, see the guide section on Personas.
Absolutely! You’ll usually need to build using the thread-safe version of UPC++, which just means compiling with upcxx -threadmode=par
(or with envvar UPCXX_THREADMODE=par
). The other main caveat is you may need to be careful with blocking synchronization operations (such as the implicit thread barrier at the end of an OpenMP parallel region). Specifically, blocking OpenMP operations don’t advance UPC++ progress, so incautious mixing of OpenMP and UPC++ synchronization can sometimes lead to deadlock.
For details, see the guide section on OpenMP Interoperability.
If exactly one thread per process ever makes UPC++ calls over the lifetime of the program, then yes. This is analogous to MPI_THREAD_FUNNELED
, and is permitted under UPC++ threadmode=seq. However threadmode=seq does NOT allow UPC++ communication calls from multiple threads per process, even if those calls are serialized by the caller to avoid concurrency (the analog of MPI_THREAD_SERIALIZED
).
For more details on what is permitted in threadmode=seq see docs/implementation-defined.
Yes! The communication facilities provided by UPC++ nicely complement the on-node computational abstractions provided by Kokkos. Here’s a simple example showing UPC++ with Kokkos. You can even integrate UPC++ memory kinds (GPU support) with GPU computation managed by Kokkos - here’s an example of that, which is also the topic of a recent paper showing how this code can outperform an equivalent Kokkos+MPI program.
Use the UPC++ Issue Tracker to report issues or enhancement requests for the implementation or documentation.
The UPC++ Support Forum (email) is a public forum where you can ask questions or browse prior discussions.
You may also contact us via the staff email for private communication or non-technical questions.
The GASNet website includes links to documentation relevant to the networking layer inside UPC++. This includes:
The GASNet Issue Tracker contains the latest detailed information about known issues or enhancement requests in the GASNet-EX networking layer. Note that GASNet is an internal component of UPC++ that is also used by other programming models, so many of the issues there may not be relevant to UPC++.
What does an ECONGESTION error from the UDP network backend mean?
Under some circumstances with udp-conduit you might see a runtime error like this:
*** FATAL ERROR(Node 0): An active message was returned to sender,
and trapped by the default returned message handler (handler 0):
Error Code: ECONGESTION: Congestion at destination endpoint
ECONGESTION
is the error that indicates a sending process timed out and gave up trying to send a packet to a receiver that never answered. Under default settings, this means the udp-conduit retransmit protocol gave up after around 60 seconds with no network attentiveness at the target process. The most common cause is a target process that is genuinely frozen, generally because it is either (1) stuck in a prolonged compute loop with no UPC++ internal progress or communication calls from any thread, or (2) stopped by a debugger. We have also occasionally seen DDoS filters or firewalls step in and stop delivering UDP traffic once it exceeds an activity threshold.
The udp-conduit retransmission protocol parameters can be tweaked using environment variables, as described in the udp-conduit README. So for example if you are using a debugger to temporarily freeze some processes, you probably want to set the timeout to infinite via envvar export GASNET_REQUESTTIMEOUT_MAX=0
which tells sending processes to never time out waiting for a network-level response from a potentially frozen process.
Here are some helpful additional resources for learning and using UPC++:
Main UPC++ Site : upcxx.lbl.gov
UPC++ Training Materials Site: upcxx.lbl.gov/training
UPC++ Frequently Asked Questions: upcxx.lbl.gov/faq
UPC++ Extras: upcxx.lbl.gov/extras
IPDPS’19 Paper: doi.org/10.25344/S4V88H
UPC++ Issue Tracker: upcxx-bugs.lbl.gov
UPC++ User Support Forum: upcxx.lbl.gov/forum
Thanks for your interest in UPC++ - we welcome your participation and feedback!