rxExec: Run A Function on Multiple Nodes or Cores
Allows distributed execution of a function in parallel across nodes (computers) or cores of a "compute context" such as a cluster.
rxExec(FUN, ... , elemArgs, elemType = "nodes", oncePerElem = FALSE, timesToRun = -1L, packagesToLoad = NULL, execObjects = NULL, taskChunkSize = NULL, quote = FALSE, consoleOutput = NULL, autoCleanup = NULL, continueOnFailure = TRUE, RNGseed = NULL, RNGkind = NULL, foreachOpts = NULL)
FUN: the function to be executed; the nodes or cores on which it is run are determined by the currently active compute context and by the other arguments of rxExec.
...: arguments passed to the function FUN each time it is executed. Separate argument values can be sent for each computation by wrapping a vector or list of argument values in rxElemArg.
elemArgs: a vector or list specifying arguments to FUN. This allows a different set of arguments to be passed to FUN each time it is executed. The length of the vector or list must match the number of times the function will be executed. Each of these elements is passed in turn to FUN. Using a list of lists allows multiple named or unnamed parameters to be passed. If elemArgs has length 1, that argument is passed to all compute elements (and thus is an alternative to ...). The elements of elemArgs may be named; if they are node names, those elements are passed to those nodes. Alternatively, they can be named "rxElem1", "rxElem2", and so on, in which case the list of returned values has those corresponding names. See the Details section for more information. This is an alternative to using rxElemArg one or more times.
elemType: [Deprecated] The distributed computing mode to be used. This parameter is deprecated and is not honored by any of the supported compute contexts. It may be reinstated in the future.
oncePerElem: logical flag. If TRUE, FUN is run exactly once on each specified node, and each element of the return list is named with the name of the node that computed that element. If FALSE, a node may be used more than once (but never simultaneously). oncePerElem must be set to FALSE if elemType="cores". This parameter is ignored if the active compute context is local.
timesToRun: integer specifying the total number of instances of the function FUN to run. If timesToRun=-1, the default, the number of runs is set to the length of the elemArgs argument, if it exists, or else to the number of nodes or cores specified in the compute context object, if that number is exact. In the latter case, if elemType="nodes" and a single set of arguments is being passed to each node, each element of the return list is named with the name of the node that computed that element. If timesToRun is not -1, it must be consistent with this other information.
packagesToLoad: optional character vector specifying additional packages to be loaded on the nodes for this job. If provided, these packages are loaded after any packagesToLoad specified in the current distributed compute context.
execObjects: optional character vector specifying additional objects to be exported to the nodes for this job, or an environment containing these objects. The specified objects are added to FUN's environment, unless that environment is locked, in which case they are added to the environment in which FUN is evaluated. For efficiency, do not use this argument to export large data objects; instead, pass a reference to the data in a shared storage location (for example, HDFS).
taskChunkSize: optional integer scalar specifying the number of tasks to be executed per compute element, or worker. Submitting tasks in chunks avoids some of the overhead of starting new R processes over and over. For example, if you are running thousands of identical simulations on a cluster, it makes sense to set taskChunkSize so that each worker can do its allotment of tasks in a single R process. This argument is incompatible with the oncePerElem argument; if both are supplied, taskChunkSize is ignored. It is also incompatible with lists supplied to elemArgs with compute element names.
quote: logical flag. If TRUE, underlying calls to do.call have the corresponding flag set to TRUE. This is primarily of use to the doRSR package, but may be of use to other users.
consoleOutput: NULL or logical value. If TRUE, the console output from all of the processes is printed to the user console. Note that the output from different nodes or cores may be interleaved unpredictably. If FALSE, no console output is displayed; for a non-waiting job, the output can be retrieved with rxGetJobOutput. If not NULL, this flag overrides the value set in the compute context when the job was submitted. If NULL, the setting in the compute context is used. This parameter is ignored if the active compute context is local.
autoCleanup: NULL or logical value. If TRUE, artifacts created by the distributed computing job are deleted when the results are returned or retrieved using rxGetJobResults. If FALSE, the artifacts are not deleted, and the results may be obtained repeatedly using rxGetJobResults, and the console output via rxGetJobOutput, until rxCleanupJobs is used to delete the artifacts. If not NULL, this flag overrides the value set in the compute context when the job was submitted. If you routinely set autoCleanup=FALSE, you may eventually fill your hard disk with compute artifacts. If you set autoCleanup=TRUE and experience performance degradation on a Windows XP client, consider setting autoCleanup=FALSE. This parameter is ignored if the active compute context is local.
continueOnFailure: NULL or logical value. If TRUE, the default, and an individual instance of a job fails due to a hardware or network failure, an attempt is made to rerun that job. (R syntax errors, however, cause immediate failure as usual.) Furthermore, if a process instance of a job fails because of an error in user code, the remaining processes continue, and the failed process produces a warning when the output is collected. Additionally, the position in the returned list where the failure occurred contains the error rather than a result. This parameter is ignored if the active compute context is local or
RNGseed: NULL, the string "auto", or an integer to be used as the seed for parallel random number generation. See the Details section for a description of how the "auto" string is used.
RNGkind: NULL or a character string specifying the type of random number generator to be used. Allowable strings are the strings accepted by rxRngNewStream, "auto", and, if the active compute context is local parallel, "L'Ecuyer-CMRG" (for compatibility with the parallel package). See the Details section for a description of how the "auto" string is used.
foreachOpts: NULL or a list containing options to be passed to the foreach parallel computing backend. See foreach for details.
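A minimal sketch exercising timesToRun, execObjects, packagesToLoad, and taskChunkSize, assuming RevoScaleR is installed and the local compute context is active, so every run executes sequentially in the current session. The helpers scaleIt and simulateOnce, the object scaleFactor, and the four-worker cluster size are hypothetical; on a local context the chunking has no real benefit and is shown only for its syntax.

```r
library(RevoScaleR)

# Run every call sequentially in the current R session; no cluster is needed.
rxSetComputeContext("local")

## timesToRun: no rxElemArg, so all four runs receive the same argument (n = 3).
draws <- rxExec(runif, 3, timesToRun = 4)
length(draws)  # 4

## execObjects / packagesToLoad: scaleIt refers to scaleFactor without taking it
## as an argument, so the object is exported with the job. "stats" is only a
## placeholder for a package the workers would need to load.
scaleFactor <- 2.5
scaleIt <- function(v) v * scaleFactor
scaled <- rxExec(scaleIt, rxElemArg(1:3),
                 execObjects = "scaleFactor", packagesToLoad = "stats")
unname(unlist(scaled))  # 2.5 5.0 7.5

## taskChunkSize: one chunk per worker for a hypothetical 4-worker cluster,
## so each worker would run its 25 tasks in a single R process.
nTasks <- 100
nWorkers <- 4
chunkSize <- ceiling(nTasks / nWorkers)
simulateOnce <- function(i) mean(rnorm(100))
sims <- rxExec(simulateOnce, rxElemArg(seq_len(nTasks)),
               taskChunkSize = chunkSize)
```

On a real cluster, keep execObjects small and size taskChunkSize from the actual number of workers in the compute context.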
In the CTP3 release, rxExec has very limited functionality for the RxInSqlServer compute context; computations are performed sequentially.
Details: There are two primary sets of use cases. In the first, each computing element (node or core) gets the same argument values; in this case, do not use rxElemArg. In the second, each element gets a different set of arguments; use rxElemArg for each argument that has different values, or elemArgs whose length is equal to the number of times FUN is to be executed.
Set 1 (All computing elements get the same arguments):
rxExec(FUN, arg1, arg2)
Set 2: Every computing element gets a different set of arguments. If rxElemArg is used, the length of the vector or list for the enclosed argument must equal the number of compute elements. For example,
rxExec(FUN, arg1 = 1, arg2 = rxElemArg(c(1:5)))
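A small runnable version of Set 2, assuming RevoScaleR is installed and the local compute context is active. combine is a hypothetical FUN: arg1 is shared by every run, while rxElemArg supplies one value of arg2 per run, so FUN runs five times.

```r
library(RevoScaleR)
rxSetComputeContext("local")

# Hypothetical FUN with one shared argument and one per-run argument.
combine <- function(arg1, arg2) arg1 * 10 + arg2

# arg1 = 1 goes to every run; rxElemArg(1:5) hands one value of arg2 to each run.
res <- rxExec(combine, arg1 = 1, arg2 = rxElemArg(1:5))
unname(unlist(res))  # 11 12 13 14 15
```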
If elemArgs is a nested list, the individual lists are passed to the compute resources as follows:
The argument lists can be named by the compute resource to which each component list should be assigned, for example:
rxExec(FUN, elemArgs=list(compute1=list(arg1,arg2), compute2=list(arg3, arg4)))
In this case, the list of arguments must be the same length as the list of nodes requested for the current compute context, and have the same names. If oncePerElem=TRUE, the computation is performed once on each requested node, and each node is assured of getting the argument with its name. If oncePerElem=FALSE, there is no guarantee that each node will be used in the processing, so arguments intended for a particular node may not be used; they must still be provided.
The component names must be valid R syntactic names. If you have nodes on your cluster with names that are not valid R syntactic names, use the function rxMakeRNodeNames on the node name to determine the appropriate name to give the list component. When the return value is a list with elements named by compute node, the node names are as returned by the rxMakeRNodeNames function.
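A sketch of building node-named elemArgs, assuming RevoScaleR is installed. The node names are hypothetical, and the rxExec call is left commented out because it needs a cluster compute context containing those nodes; the rest runs without a cluster.

```r
library(RevoScaleR)

# Hypothetical cluster node names; substitute the nodes in your compute context.
nodeNames <- c("compute-01", "compute-02")

# Convert them to syntactic R names before using them as list component names.
rNodeNames <- rxMakeRNodeNames(nodeNames)

# One argument list per node, named so that each node receives its own list.
nodeArgs <- setNames(list(list(n = 10), list(n = 20)), rNodeNames)

# With a cluster compute context holding these nodes (not shown), FUN would run
# once per node with the matching arguments:
# rxExec(rnorm, elemArgs = nodeArgs, oncePerElem = TRUE)
```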
If the argument lists are not named, they are passed to the compute resources allowed by the compute context according to their position in the list. This is useful when you don't care which nodes or cores the function is executed on but want different arguments passed to each resource. For example,
rxExec(FUN, elemArgs=list(list(arg1,arg2), list(arg3, arg4))) or
rxExec(FUN, elemArgs=list(c(arg1, arg2), list(arg3, arg4)))
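A runnable counterpart to the unnamed form, assuming RevoScaleR and the local compute context. Each inner list is the full argument set for one call of paste (a stand-in for FUN), mixing unnamed and named arguments. The second call uses the "rxElem1", "rxElem2" naming described under elemArgs, which should carry over to the names of the returned list.

```r
library(RevoScaleR)
rxSetComputeContext("local")

# Unnamed outer list: argument sets are assigned to compute resources by position.
# Each inner list mixes unnamed and named arguments for one call of paste().
byPosition <- rxExec(paste, elemArgs = list(list("node", 1, sep = "-"),
                                            list("node", 2, sep = "_")))
unname(unlist(byPosition))  # "node-1" "node_2"

# Naming the outer list rxElem1, rxElem2, ... should name the results the same way.
byElemName <- rxExec(paste, elemArgs = list(rxElem1 = list("a", "b"),
                                            rxElem2 = list("c", "d")))
names(byElemName)
```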
RNGseed and RNGkind can be used to control random number generation in the workers. By default, both are NULL and no special random number control is used. If either RNGseed or RNGkind is set to "auto", a parallel random number stream is initialized on each worker, using the "MT2203" generator and separate substreams for each worker. If other non-null valid values are supplied for these arguments, they are used as is for the "MT2203" generator, which supports multiple substreams, but for other rxRngNewStream-supported generators, the seed is used as the starting point of a sequence of seeds, one for each worker. In the special case of a local parallel compute context, the "L'Ecuyer-CMRG" generator can be specified, in which case the parallel package's clusterSetRNGStream function is called on the internally generated parallel cluster.
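A sketch of reproducible per-worker random numbers, assuming RevoScaleR and the local compute context, and assuming RNGseed is honored there (the argument descriptions do not list it as ignored for local contexts). Running the same job twice with the same integer seed and the "MT2203" generator should yield identical draws.

```r
library(RevoScaleR)
rxSetComputeContext("local")

# Three runs, each drawing two uniforms from its own MT2203 substream.
run1 <- rxExec(runif, 2, timesToRun = 3, RNGseed = 42, RNGkind = "MT2203")
run2 <- rxExec(runif, 2, timesToRun = 3, RNGseed = 42, RNGkind = "MT2203")

# Same seed and generator, so the two jobs should return the same draws.
identical(run1, run2)
```

With RNGseed = "auto" instead, each job still gets separate substreams per worker, but the seed is not fixed, so repeated jobs are not expected to match.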
Value: If a waiting compute context is active, a list with an element for each job, where each element contains the value(s) returned by that job's function call(s). If a non-waiting compute context is active, a jobInfo object. See rxGetJobResults.
Author(s): Microsoft Technical Support
## Not run:

## Run function with no parameters
rxExec(getwd)

## Pass the same set of arguments to each compute element
rxExec(list.files, all.files=TRUE, full.names=TRUE)

## Run function with the same vector sent as the first
## argument to each compute element
## The values 1 to 10 will be printed 10 times
x <- 1:10
rxExec(print, x, elemType = "cores", timesToRun = 10)

## Pass a different argument value to each compute element
## The values 1 to 10 will be printed once each
rxExec(print, rxElemArg( x ), elemType = "cores")

## Extract different columns from a data frame on different nodes
set.seed(100)
myData <- data.frame(x = 1:100, y = rep(c("a", "b", "c", "d"), 25),
    z = rnorm(100), w = runif(100))
myVarsToKeep = list(
    c("x", "y"),
    c("x", "z"),
    c("x", "w"),
    c("y", "z"),
    c("z", "w"))
# myVarDataFrames will be a list of data frames
myVarDataFrames <- rxExec(rxDataStep, inData = myData,
    varsToKeep = rxElemArg(myVarsToKeep))

## Extract different rows from the data frame on different nodes
myRowSelection = list(
    expression(y == 'a'),
    expression(y == 'b'),
    expression(y == 'c'),
    expression(y == 'd'),
    expression(z > 0))
myRowDataFrames <- rxExec(rxDataStep, inData = myData,
    rowSelection = rxElemArg(myRowSelection))

## Use the taskChunkSize argument
rxExec(sqrt, rxElemArg(1:100), taskChunkSize=50)

## End(Not run)