| Title: | Reproducible Pipeline Infrastructure for Neuroscience |
|---|---|
| Description: | Defines the underlying pipeline structure for reproducible neuroscience, adopted by 'RAVE' (reproducible analysis and visualization of intracranial electroencephalography); provides high-level class definitions to build, compile, set, execute, and share analysis pipelines. Both R and 'Python' are supported, with 'Markdown' and 'shiny' dashboard templates for extending and building customized pipelines. See the full documentation at <https://rave.wiki>; to cite us, check out our paper by Magnotti, Wang, and Beauchamp (2020, <doi:10.1016/j.neuroimage.2020.117341>), or run citation("ravepipeline") for details. |
| Authors: | Zhengjia Wang [aut, cre, cph], John Magnotti [ctb, res], Xiang Zhang [ctb, res], Michael Beauchamp [ctb, res], Trustees of University of Pennsylvania [cph] (Copyright Holder) |
| Maintainer: | Zhengjia Wang <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 0.1.0 |
| Built: | 2026-05-29 21:55:32 UTC |
| Source: | https://github.com/dipterix/ravepipeline |
Encode or decode 'base64' raw or url-safe string
base64_urlencode(x)
base64_encode(x)
base64_urldecode(x)
base64_decode(x)
base64_plot(
  expr,
  width = 480,
  height = 480,
  ...,
  quoted = FALSE,
  envir = parent.frame()
)
x |
for encoders, an R raw or character vector; for decoders, 'base64'-encoded strings |
expr |
expression for plot, will saved to a |
width, height
|
image size in pixels |
... |
passed to |
quoted, envir
|
non-standard evaluation settings |
base64_encode and base64_plot return a 'base64' string in
raw format; base64_urlencode returns a 'base64' string in url-safe format;
base64_urldecode returns the original string; base64_decode
returns the original raw vector.
# ---- For direct base64URI ------------------------------------
file_raw <- as.raw(1:255)
# raw base64
base64_raw <- base64_encode(file_raw)
base64_raw
as.integer(base64_decode(base64_raw))
# ---- For URL-safe base64 ------------------------------------
# Can be used in URL
base64_url <- base64_urlencode(
  paste(c(letters, LETTERS, 0:9), collapse = ""))
base64_url
base64_urldecode(base64_url)
# ---- Convert R plots to base64 --------------------------------
img <- base64_plot({
  plot(1:10)
}, width = 320, height = 320)
# summary
print(img)
# get base64 content
img_base64 <- format(img, type = "content")
# save to png
tmppng <- tempfile(fileext = ".png")
writeBin(base64_decode(img_base64), con = tmppng)
# cleanup
unlink(tmppng)
# Format as svg
format(img, type = "html_svg")
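The difference between the raw and url-safe formats can be illustrated with base R alone. The sketch below assumes the common url-safe convention (RFC 4648, section 5): `+` becomes `-`, `/` becomes `_`, and trailing `=` padding is dropped. Whether base64_urlencode strips padding is an assumption here, and `to_url_safe` and `from_url_safe` are hypothetical helper names, not part of the package.

```r
# Hypothetical helpers illustrating the url-safe alphabet; not package code.
to_url_safe <- function(b64) {
  b64 <- chartr("+/", "-_", b64)   # swap the two URL-unsafe characters
  sub("=+$", "", b64)              # drop trailing padding (assumed convention)
}

from_url_safe <- function(b64url) {
  b64 <- chartr("-_", "+/", b64url)
  pad <- (4 - nchar(b64) %% 4) %% 4  # restore padding to a multiple of 4
  paste0(b64, strrep("=", pad))
}

standard <- "+/8="                  # standard base64 of the bytes 0xFB 0xFF
url_safe <- to_url_safe(standard)
url_safe
round_trip <- from_url_safe(url_safe)
round_trip
```

The round trip recovers the standard string, so a decoder only needs the reverse character swap plus re-padding before ordinary base64 decoding.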
Force creating directory with checks
dir_create2(x, showWarnings = FALSE, recursive = TRUE, check = TRUE, ...)
x |
path to create |
showWarnings, recursive, ...
|
passed to |
check |
whether to check the directory after creation |
Normalized path
path <- file.path(tempfile(), 'a', 'b', 'c')
# The following are equivalent
dir.create(path, showWarnings = FALSE, recursive = TRUE)
dir_create2(path)
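As a rough picture of what "create, then check, then return the normalized path" could look like, here is a minimal base-R sketch. `dir_create_checked` is a hypothetical name, and the error message and exact checks are assumptions; the actual dir_create2 implementation may differ.

```r
# Hypothetical stand-in mirroring the documented behavior of dir_create2.
dir_create_checked <- function(path) {
  dir.create(path, showWarnings = FALSE, recursive = TRUE)
  # The "check" step: fail loudly if the directory still does not exist
  if (!dir.exists(path)) {
    stop("Cannot create directory: ", path)
  }
  normalizePath(path)
}

root <- tempfile()
created <- dir_create_checked(file.path(root, "a", "b"))
existed <- dir.exists(created)
existed

# cleanup
unlink(root, recursive = TRUE)
```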
html_embed_write encodes JSON strings, plain-text strings, and
binary files as base64 <script> tags and injects them into an HTML
file.
html_embed_read reads <script> tags back out of a saved
HTML file and reconstructs the original data.
html_embed_read(path, name = NULL, parse_json = TRUE, update = FALSE)
html_embed_write(
  html_path,
  json_string = list(),
  text_string = list(),
  binary_paths = list(),
  missing_action = c("error", "warning", "ignore")
)
path |
character path to an HTML file, or a manifest object returned
by a previous call to |
name |
character; the |
parse_json |
logical; when |
update |
logical; when |
html_path |
character; path to the HTML file to write. If the file
does not exist, behavior is controlled by |
json_string |
named list of character strings; each element is a
UTF-8 JSON string. The list name becomes the |
text_string |
named list of character strings; each element is an
arbitrary UTF-8 plain-text string. The list name becomes the
|
binary_paths |
named list of character strings; each element is an
absolute path to a binary file to embed. The list name becomes the
|
missing_action |
character; what to do when |
html_embed_write streams data after </body> (or before
</html>, or appends when neither tag is found). Large inputs are
split into chunks; each chunk gets its own
<script> tag with a sequential data-partition index.
html_embed_read: when name is NULL it returns a
manifest object that lists all embedded entries; subsequent calls with a
specific name use seek positions stored in the manifest to retrieve
only the requested partitions. Files written by compatible tools
(e.g. threeBrain) are handled transparently.
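The placement rule for html_embed_write (after `</body>`, else before `</html>`, else append) can be sketched on a character vector of lines. This is a simplified illustration that assumes each tag sits on its own line; `find_insert_after` is a hypothetical helper, and the real function streams data rather than working line by line.

```r
# Hypothetical helper: returns the line index after which new <script>
# lines would be inserted, following the documented priority order.
find_insert_after <- function(lines) {
  body_close <- grep("</body>", lines, fixed = TRUE)
  if (length(body_close) > 0) {
    return(max(body_close))        # insert after the last </body> line
  }
  html_close <- grep("</html>", lines, fixed = TRUE)
  if (length(html_close) > 0) {
    return(max(html_close) - 1L)   # insert just before </html>
  }
  length(lines)                    # neither tag found: append at the end
}

page <- c("<html>", "<head></head>", "<body></body>", "</html>")
idx <- find_insert_after(page)
updated <- append(page, "<script>...</script>", after = idx)
updated
```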
The per-entry <script> tag format:
<script type='text/plain;charset=UTF-8'
data-for='<name>'
data-partition='<N>'
data-type='application/json|text/plain|application/octet-stream'
data-size='<total bytes>'
data-start='<byte offset>'
data-partition-size='<this chunk bytes>'>
BASE64 (72-character wrapped lines)
</script>
html_embed_write: html_path, invisibly.
html_embed_read: when name is NULL, a manifest object
of class ravepipeline_html_embed_manifest listing all embedded
entries. When name is specified the manifest is returned with the
requested entry decoded and cached; access it via
manifest$content[[name]]: a character string for JSON/text data,
or a raw vector for binary data.
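To make the partition attributes concrete, the following base-R sketch computes a byte offset and size for each chunk of an entry. The chunk size, the zero-based partition index, and the zero-based byte offset are all assumptions for illustration; `partition_layout` is a hypothetical helper and the package's actual chunking parameters are not documented here.

```r
# Hypothetical helper: lay out chunks for an entry of `total_bytes` bytes.
partition_layout <- function(total_bytes, chunk_bytes) {
  # Byte offset of each chunk (assumed zero-based, like data-start)
  starts <- seq(0, max(total_bytes - 1, 0), by = chunk_bytes)
  # The last chunk may be shorter than chunk_bytes
  sizes <- pmin(chunk_bytes, total_bytes - starts)
  data.frame(
    partition = seq_along(starts) - 1L,  # assumed zero-based index
    start = starts,                      # would map to data-start
    size = sizes                         # would map to data-partition-size
  )
}

layout <- partition_layout(total_bytes = 10, chunk_bytes = 4)
layout
```

Because each partition records its own offset and size, a reader can fetch a single entry without decoding the others, which is consistent with the seek-based retrieval described for html_embed_read.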
html_file <- tempfile(fileext = ".html")
writeLines(
  c("<html>", "<head></head>", "<body></body>", "</html>"),
  html_file
)
# ---- Write: embed JSON and binary data into an HTML file --------
tmp <- tempfile(fileext = ".bin")
writeBin(as.raw(0:255), tmp)
html_embed_write(
  html_file,
  json_string = list(meta = '{"version":1}'),
  text_string = list(note = "hello world"),
  binary_paths = list(data = tmp)
)
# ---- Read: list all embedded entries ----------------------------
manifest <- html_embed_read(html_file)
print(manifest)
# ---- Read: decode a specific entry ------------------------------
manifest <- html_embed_read(html_file, name = "meta")
manifest$content[["meta"]] # character (JSON string or parsed object)
manifest <- html_embed_read(manifest, name = "data")
manifest$content[["data"]] # raw vector
unlink(c(tmp, html_file))
Low-level function exported for down-stream 'RAVE' packages.
install_modules(modules, dependencies = FALSE)
modules |
a character vector of repository names; by default, determined automatically from a public registry |
dependencies |
whether to update dependent packages; default is false |
nothing
Keep track of messages printed by modules or functions
logger(
  ...,
  level = c("info", "success", "warning", "error", "fatal", "debug", "trace"),
  calc_delta = "auto",
  .envir = parent.frame(),
  .sep = "",
  use_glue = FALSE,
  reset_timer = FALSE
)
set_logger_path(root_path, max_bytes, max_files)
logger_threshold(
  level = c("info", "success", "warning", "error", "fatal", "debug", "trace"),
  module_id,
  type = c("console", "file", "both")
)
logger_error_condition(cond, level = "error")
..., .envir, .sep
|
passed to |
level |
the level of message, choices are |
calc_delta |
whether to calculate time difference between current
message and previous message; default is |
use_glue |
whether to use |
reset_timer |
whether to reset timer used by |
root_path |
root directory if you want log messages to be saved to
hard disks; if |
max_bytes |
maximum file size for each logger partition |
max_files |
maximum number of partition files to hold the log; old files will be deleted. |
module_id |
'RAVE' module identification string, or name-space; default
is |
type |
which type of logging should be set; default is |
cond |
condition to log |
The message without time-stamps
logger("This is a message")
a <- 1
logger("A message with glue: a={a}", use_glue = TRUE)
logger("A message without glue: a={a}", use_glue = FALSE)
logger("Message A", calc_delta = TRUE, reset_timer = TRUE)
logger("Seconds before logging another message", calc_delta = TRUE)
# by default, debug and trace messages won't be displayed
logger('debug message', level = 'debug')
# adjust logger level, make sure `module_id` is a valid RAVE module ID
logger_threshold('debug', module_id = NULL)
# Debug message will display
logger('debug message', level = 'debug')
# Trace message will not display as it's lower than debug level
logger('trace message', level = 'trace')
Creates a 'RAVE' pipeline with an additional dashboard module from a template.
module_add(
  module_id,
  module_label,
  path = ".",
  type = c("default", "bare", "scheduler", "python"),
  ...,
  pipeline_name = module_id,
  overwrite = FALSE
)
module_id |
module ID to create; must be unique, since users cannot install
two modules with identical module IDs. We recommend that
a module ID follow snake-case format, starting with the lab name, for example,
|
module_label |
a friendly label to display in the dashboard |
path |
project root path; default is current directory |
type |
template to choose, options are |
... |
additional configurations to the module such as |
pipeline_name |
the pipeline name to create along with the module;
default is identical to |
overwrite |
whether to overwrite existing module if module with same ID exists; default is false |
Nothing.
# For demonstrating this example only
project_root <- tempfile()
dir.create(project_root, showWarnings = FALSE, recursive = TRUE)
# Add a module
module_id <- "mylab_my_first_module"
module_add(
  module_id = module_id,
  module_label = "My Pipeline",
  path = project_root
)
# show the structure
cat(
  list.files(
    project_root,
    recursive = TRUE,
    full.names = FALSE,
    include.dirs = TRUE
  ),
  sep = "\n"
)
unlink(project_root, recursive = TRUE)
Create, view, or reserve the module registry
module_registry(
  title,
  repo,
  modules,
  authors,
  url = sprintf("https://github.com/%s", repo)
)
module_registry2(repo, description)
get_modules_registries(update = NA)
get_module_description(path)
add_module_registry(title, repo, modules, authors, url, dry_run = FALSE)
title |
title of the registry, usually identical to the description
title in |
repo |
'Github' repository |
modules |
character vector of module IDs; each must contain only letters, digits, underscores, and dashes, and must not duplicate an existing registered module |
authors |
a list of module authors; there must be one and only one
author with |
url |
the web address of the repository |
update |
whether to force updating the registry |
path, description
|
path to |
dry_run |
whether to generate and preview message content instead of opening an email link |
A 'RAVE' registry contains the following data entries: repository title, name, 'URL', authors, and a list of module IDs. 'RAVE' requires each module to use a unique module ID; two modules sharing the same ID will conflict. 'RAVE' therefore maintains a public registry list so that module maintainers can register their own module IDs and prevent others from using them.
To register your own module ID, please use add_module_registry to
validate and send an email to the 'RAVE' development team.
a registry object, or a list of registries
library(ravepipeline)
# create your own registry
module_registry(
  repo = "rave-ieeg/rave-pipelines",
  title = "A Collection of 'RAVE' Builtin Pipelines",
  authors = list(
    list("Zhengjia", "Wang", role = c("cre", "aut"),
         email = "[email protected]")
  ),
  modules = "brain_viewer"
)
## Not run:
# This example will need access to Github and will open an email link
# get current registries
get_modules_registries(FALSE)
# If your repository is on Github and RAVE-CONFIG file exists
module_registry2("rave-ieeg/rave-pipelines")
# send a request to add your registry
registry <- module_registry2("rave-ieeg/rave-pipelines")
add_module_registry(registry)
## End(Not run)
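Before requesting registration, the documented module ID rules (letters, digits, underscores, and dashes only; no clash with registered IDs) can be pre-checked locally. The sketch below uses base R only; `validate_module_ids` is a hypothetical helper, the `registered` vector is made-up sample data, and the package's own validation in add_module_registry may be stricter.

```r
# Hypothetical pre-check of module IDs against the documented rules.
validate_module_ids <- function(ids, registered = character()) {
  # Only letters, digits, underscore, and dash are allowed
  bad_chars <- ids[!grepl("^[A-Za-z0-9_-]+$", ids)]
  # IDs already claimed in the (sample) registry
  taken <- intersect(ids, registered)
  # IDs repeated within the request itself
  repeated <- ids[duplicated(ids)]
  list(
    valid = length(bad_chars) == 0 && length(taken) == 0 &&
      length(repeated) == 0,
    bad_chars = bad_chars,
    taken = taken,
    repeated = repeated
  )
}

res <- validate_module_ids(
  c("mylab_my_first_module", "bad id!", "brain_viewer"),
  registered = "brain_viewer"   # sample data, not the real registry
)
res
```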
Set pipeline inputs, execute, and read pipeline outputs
pipeline(
  pipeline_name,
  settings_file = "settings.yaml",
  paths = pipeline_root(),
  temporary = FALSE
)
pipeline_from_path(path, settings_file = "settings.yaml")
pipeline_name |
the name of the pipeline, usually title field in the
|
settings_file |
the name of the settings file, usually stores user inputs |
paths |
the paths to search for the pipeline, usually the parent
directory of the pipeline; default is |
temporary |
see |
path |
the pipeline folder |
A PipelineTools instance
library(ravepipeline)
if(interactive()) {
  # ------------ Set up a bare minimal example pipeline ---------------
  root_path <- tempdir()
  pipeline_root_folder <- file.path(root_path, "modules")
  # create pipeline folder
  pipeline_path <- pipeline_create_template(
    root_path = pipeline_root_folder, pipeline_name = "raveio_demo",
    overwrite = TRUE, activate = FALSE, template_type = "rmd-bare")
  # Set initial user inputs
  yaml::write_yaml(
    x = list(
      n = 100,
      pch = 16,
      col = "steelblue"
    ),
    file = file.path(pipeline_path, "settings.yaml")
  )
  # build the pipeline for the first time
  # this is a one-time setup
  pipeline_build(pipeline_path)
  # Temporarily redirect the pipeline project root
  # to `root_path`
  old_opt <- options("raveio.pipeline.project_root" = root_path)
  # Make sure the options are reset
  on.exit({ options(old_opt) })
  # Compile the pipeline document
  pipeline_render(
    module_id = "raveio_demo",
    project_path = root_path
  )
  ## Not run:
  # Open web browser to see compiled report
  utils::browseURL(file.path(pipeline_path, "main.html"))
  ## End(Not run)
  # --------------------- Example starts ------------------------
  # Load pipeline
  pipeline <- pipeline(
    pipeline_name = "raveio_demo",
    paths = pipeline_root_folder,
    temporary = TRUE
  )
  # Check which pipeline targets to run
  pipeline$target_table
  # Run to `plot_data`, RAVE pipeline will automatically
  # calculate which up-stream targets need to be updated
  # and evaluate these targets
  pipeline$run("plot_data")
  # Customize settings
  pipeline$set_settings(pch = 2)
  # Run again with the new inputs, since input_data does not change,
  # the pipeline will skip that target automatically
  pipeline$run("plot_data")
  # Read intermediate data
  head(pipeline$read("input_data"))
  # or use `[]` to get results
  pipeline[c("n", "pch", "col")]
  pipeline[-c("input_data")]
  # Check evaluating status
  pipeline$progress("details")
  # result summary & cache table
  pipeline$result_table
  # visualize the target dependency graph
  pipeline$visualize(glimpse = TRUE)
  # --------------------- Clean up ------------------------
  unlink(pipeline_path, recursive = TRUE)
}
Experimental, subject to change in the future.
pipeline_collection(root_path = NULL, overwrite = FALSE)
root_path |
directory to store pipelines and results |
overwrite |
whether to overwrite if |
A PipelineCollections instance
Install 'RAVE' pipelines
pipeline_install_local(
  src,
  to = c("default", "custom", "workdir", "tempdir"),
  upgrade = FALSE,
  force = FALSE,
  set_default = NA,
  ...
)
pipeline_install_github(
  repo,
  to = c("default", "custom", "workdir", "tempdir"),
  upgrade = FALSE,
  force = FALSE,
  set_default = NA,
  ...
)
src |
pipeline directory |
to |
installation path; choices are |
upgrade |
whether to upgrade the dependencies; default is |
force |
whether to force installing the pipelines |
set_default |
whether to set the current pipeline module folder as the default; set automatically when the pipeline comes from the official 'Github' repository. |
... |
other parameters not used |
repo |
'Github' repository in user-repository combination, for example,
|
nothing
## Not run:
pipeline_install_github("rave-ieeg/pipelines")
# or download github.com/rave-ieeg/pipelines repository, extract
# to a folder, and call
pipeline_install_local("path/to/pipeline/folder")
## End(Not run)
Tags an R object so that calling plot on it outside the
pipeline can still dispatch the correct S3 method, even though that
method is only defined inside the pipeline's shared R scripts.
pipeline_plot_data(
  x,
  name = substitute(x),
  strip_oldclasses = TRUE,
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  pipeline_name = NULL
)
x |
R object to be used as plot data. |
name |
|
strip_oldclasses |
if |
pipe_dir |
path to the active pipeline directory. Do not set this
when calling from within a pipeline make-file; the default reads the
RAVE_PIPELINE environment variable which is set automatically
during |
pipeline_name |
character string overriding the pipeline name stored in
the returned object. When |
Object x with the class vector
c(name, "ravepipeline_plot_data", <original classes>) and two
extra attributes: pipeline_name and pipeline_plot_class.
A RAVE pipeline keeps its plot helpers in files whose names start with
shared inside the pipeline's R/ folder (e.g.
R/shared-plots.R). Those files are sourced automatically every
time the pipeline runs, but they are not available in an ordinary
interactive R session.
pipeline_plot_data bridges the two contexts by:
Prepending name and the sentinel class
"ravepipeline_plot_data" to the class vector of x.
Attaching the pipeline name as an attribute so the object can be re-associated with its pipeline later.
When plot() is subsequently called:
Inside the pipeline (pipeline_run): the environment variable RAVE_PIPELINE_ACTIVE is "true",
the shared scripts have already been sourced, and plot.<name>
is in scope. plot.ravepipeline_plot_data simply calls
NextMethod() so dispatch falls through to plot.<name>.
Outside the pipeline (e.g. an ordinary interactive R session):
plot.ravepipeline_plot_data locates the pipeline by
pipeline_name, calls $shared_env() to source all
R/shared*.R files in an isolated environment, and then
evaluates plot(x) inside that environment, where
plot.<name> is now available.
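The role of the tagged class vector can be seen with a toy S3 generic in base R. This is only an illustration of standard S3 dispatch on a vector of the documented shape `c(name, sentinel, <original classes>)`; the generic `describe`, the helper `tag_object`, and the class `sentinel_plot_data` are hypothetical names, and the sketch does not reproduce how the package sources shared scripts.

```r
# Hypothetical tagging helper: prepend a user class and a sentinel class.
tag_object <- function(x, name) {
  class(x) <- c(name, "sentinel_plot_data", class(x))
  x
}

# Toy generic standing in for plot()
describe <- function(x, ...) UseMethod("describe")

# Sentinel method: reached only when no method for the user class exists
describe.sentinel_plot_data <- function(x, ...) {
  "fallback: the user-class method is not in scope"
}

obj <- tag_object(list(data = 1:10), name = "toy_example")
class(obj)

# No describe.toy_example yet, so dispatch lands on the sentinel method
before <- describe(obj)

# Once the user-class method is defined, it takes priority
describe.toy_example <- function(x, ...) "toy_example method"
after <- describe(obj)

c(before = before, after = after)
```

The sentinel class therefore acts as a safety net: it gives the package a place to intervene whenever the user-defined method has not been loaded.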
Step 1: define the S3 method in any file whose name starts
with shared inside the pipeline's R/ directory (e.g.
R/shared-plots.R). The function receives the original object x
with its user-defined class prepended, so standard R dispatch applies:
# R/shared-plots.R (inside the pipeline source tree)
plot.my_pipeline_result <- function(x, ...) {
graphics::plot(
x$time, x$signal,
type = "l",
xlab = "Time (s)",
ylab = "Amplitude",
main = x$title
)
}
Step 2: wrap the target inside main.Rmd (or any pipeline
make-file) by calling pipeline_plot_data with the same name
you used for the S3 method:
# main.Rmd (pipeline make-file target block)
result_plot <- {
ravepipeline::pipeline_plot_data(
list(time = seq(0, 1, by = 0.01),
signal = sin(2 * pi * 10 * seq(0, 1, by = 0.01)),
title = "10 Hz sine wave"),
name = "my_pipeline_result"
)
}
Step 3: call plot() anywhere:
# Interactive session or report
p <- pipeline("my_pipeline")
result <- p$read("result_plot")
plot(result) # sources R/shared-plots.R automatically, then calls
# plot.my_pipeline_result(result)
# 1. R/shared-plots.R -- define the S3 method
plot.toy_example <- function(x, ...) {
  graphics::plot(x$data, xlab = "Index", ylab = "Value",
                 main = x$title %||% "")
}
# 2. main.Rmd target block -- wrap the data
plot_data <- ravepipeline::pipeline_plot_data(
  list(data = 1:10, title = "Toy example"),
  name = "toy_example",
  pipeline_name = "toy_pipeline"
)
# 3. Interactive session -- just call plot()
plot(plot_data) # dispatches to plot.toy_example via shared_env
Get or change pipeline input parameter settings
pipeline_settings_set(
  ...,
  pipeline_path = Sys.getenv("RAVE_PIPELINE", "."),
  pipeline_settings_path = file.path(pipeline_path, "settings.yaml")
)
pipeline_settings_get(
  key,
  default = NULL,
  constraint = NULL,
  pipeline_path = Sys.getenv("RAVE_PIPELINE", "."),
  pipeline_settings_path = file.path(pipeline_path, "settings.yaml")
)
pipeline_path |
the root directory of the pipeline |
pipeline_settings_path |
the settings file of the pipeline, must be
a 'yaml' file; default is |
key, ...
|
the character key(s) to get or set |
default |
the default value if key is missing |
constraint |
the constraint of the resulting value; if not |
pipeline_settings_set returns a list of all the settings.
pipeline_settings_get returns the value of given key.
root_path <- tempfile()
pipeline_root_folder <- file.path(root_path, "modules")
# create pipeline folder
pipeline_path <- pipeline_create_template(
  root_path = pipeline_root_folder, pipeline_name = "raveio_demo",
  overwrite = TRUE, activate = FALSE, template_type = "rmd-bare")
# Set initial user inputs
yaml::write_yaml(
  x = list(
    n = 100,
    pch = 16,
    col = "steelblue"
  ),
  file = file.path(pipeline_path, "settings.yaml")
)
# build the pipeline for the first time
# this is a one-time setup
pipeline_build(pipeline_path)
# get pipeline settings
pipeline_settings_get(
  key = "n",
  pipeline_path = pipeline_path
)
# get variable with default if missing
pipeline_settings_get(
  key = "missing_variable",
  default = "missing",
  pipeline_path = pipeline_path
)
pipeline_settings_set(
  missing_variable = "A",
  pipeline_path = pipeline_path
)
pipeline_settings_get(
  key = "missing_variable",
  default = "missing",
  pipeline_path = pipeline_path
)
unlink(root_path, recursive = TRUE)
Translate pipeline settings between pipelines using export and/or import
wizard functions defined in each pipeline's
R/import-export-wizard.R file. pipeline_export_wizard and
pipeline_import_wizard register those wizard functions.
pipeline_translate_settings(src_pipeline, dst_pipeline, settings = NULL)
pipeline_export_wizard(fun, pipeline_name, env = parent.frame())
pipeline_import_wizard(fun, pipeline_name, env = parent.frame())
src_pipeline |
character name or a |
dst_pipeline |
character name or a |
settings |
named list of settings to translate. If |
fun |
a function with signature |
pipeline_name |
character; the pipeline name this wizard handles, with context-dependent meaning:
|
env |
environment in which to register the wizard. Defaults to the
calling frame, i.e. the sourced |
Translation proceeds in up to two passes:
Export pass: the source pipeline may declare an export wizard
keyed by dst_pipeline_name; if present it converts the settings
into the destination format.
Import pass: the destination pipeline may declare an import wizard keyed by the (possibly already-converted) source pipeline name; if present it applies an additional filter. This also handles the case where the destination pipeline defines a self-filter applied after the export pass.
At least one wizard must exist; otherwise an error is raised.
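The two-pass logic above can be sketched with plain lists of functions. This is a simplified model under stated assumptions: the wizard registries are represented as hypothetical named lists (`export_wizards`, `import_wizards`) rather than the package's internal storage, the import wizard is looked up by the source pipeline name only, and `translate_sketch` is not a package function.

```r
# Hypothetical registries: named lists of wizard functions.
# The source pipeline's export wizards are keyed by destination name.
export_wizards <- list(
  pipelineB = function(settings) {
    settings$frequency_range <- settings$freq_range
    settings$freq_range <- NULL
    settings
  }
)
# The destination pipeline's import wizards are keyed by source name.
import_wizards <- list()

translate_sketch <- function(settings, src_name, dst_name,
                             export_wizards, import_wizards) {
  export_fun <- export_wizards[[dst_name]]
  import_fun <- import_wizards[[src_name]]
  # At least one wizard must exist
  if (is.null(export_fun) && is.null(import_fun)) {
    stop("No export or import wizard found for ", src_name, " -> ", dst_name)
  }
  # Export pass, then import pass
  if (is.function(export_fun)) settings <- export_fun(settings)
  if (is.function(import_fun)) settings <- import_fun(settings)
  settings
}

translated <- translate_sketch(
  settings = list(freq_range = c(1, 100), n = 10),
  src_name = "pipelineA", dst_name = "pipelineB",
  export_wizards = export_wizards, import_wizards = import_wizards
)
names(translated)
```

Calling `translate_sketch` with a destination that has no matching wizard on either side raises an error, mirroring the documented rule.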
pipeline_translate_settings: a named list of translated
settings compatible with dst_pipeline_name.
pipeline_export_wizard, pipeline_import_wizard:
fun, invisibly. Called for the side effect of registering the
wizard in env.
## Not run:
# Translate settings from "pipelineA" to "pipelineB"
new_settings <- pipeline_translate_settings(
  src_pipeline = "pipelineA",
  dst_pipeline = "pipelineB"
)
# To achieve this, you would define export and/or import wizards in the
# respective pipelines.
# Option 1: Inside the source pipeline (pipelineA):
# file `R/import-export-wizard.R`, define an export wizard for pipelineB:
pipeline_export_wizard(
  pipeline_name = "pipelineB",
  fun = function(settings) {
    # settings is the current settings list of pipelineA
    settings$frequency_range <- settings$freq_range
    settings$freq_range <- NULL
    settings
  }
)
# Option 2: Inside the destination pipeline (pipelineB):
# file `R/import-export-wizard.R`, define an import wizard for pipelineA:
pipeline_import_wizard(
  pipeline_name = "pipelineA",
  fun = function(settings) {
    # settings is the current settings list of pipelineA
    settings$frequency_range <- settings$freq_range
    settings$freq_range <- NULL
    settings
  }
)
## End(Not run)
'rmarkdown' files to build 'RAVE' pipelines
Allows building 'RAVE' pipelines from 'rmarkdown' files.
Please use it in 'rmarkdown' scripts only. Use
pipeline_create_template to create an example.
configure_knitr(languages = c("R", "python"))

pipeline_setup_rmd(
  module_id,
  env = parent.frame(),
  collapse = TRUE,
  comment = "#>",
  languages = c("R", "python"),
  project_path = getOption("raveio.pipeline.project_root", default =
    rs_active_project(child_ok = TRUE, shiny_ok = TRUE))
)

pipeline_render(
  module_id,
  ...,
  env = new.env(parent = parent.frame()),
  entry_file = "main.Rmd",
  project_path = getOption("raveio.pipeline.project_root", default =
    rs_active_project(child_ok = TRUE, shiny_ok = TRUE))
)
languages |
one or more programming languages to support; options are
|
module_id |
the module ID, usually the name of direct parent folder containing the pipeline file |
env |
environment to set up the pipeline translator |
collapse, comment
|
passed to |
project_path |
the project path containing all the pipeline folders, usually the active project folder |
... |
passed to internal function calls |
entry_file |
the file to compile; default is |
A function, intended to be called later, that builds the pipeline scripts
configure_knitr("R")

## Not run: 

# Requires Python to be configured
configure_knitr("python")

# This function must be called in an Rmd file setup block
# for example, see
# https://rave.wiki/posts/customize_modules/python_module_01.html

pipeline_setup_rmd("my_module_id")

## End(Not run)
Experimental, subject to change in the future.
verbose: whether to print verbose messages during the build
root_path: path to the directory that contains pipelines and scheduler
collection_path: path to the pipeline collections
pipeline_ids: pipeline ID codes
PipelineCollection$new(): Constructor
PipelineCollection$new(root_path = NULL, overwrite = FALSE)
root_path: where to store the pipelines and intermediate results
overwrite: whether to overwrite if root_path exists
PipelineCollection$add_pipeline(): Add pipeline into the collection
PipelineCollection$add_pipeline(
x,
names = NULL,
deps = NULL,
pre_hook = NULL,
post_hook = NULL,
cue = c("always", "thorough", "never"),
search_paths = pipeline_root(),
standalone = TRUE,
hook_envir = parent.frame()
)
x: a pipeline name (can be found via pipeline_list), or a PipelineTools instance
names: pipeline targets to execute
deps: pipeline IDs to depend on; see 'Values' below
pre_hook: function to run before the pipeline; the function needs two arguments: the input map (can be edited in place), and the path to a directory where temporary files can be stored
post_hook: function to run after the pipeline; the function needs two arguments: the pipeline object, and the path to a directory where intermediate results can be stored
cue: whether to always run dependencies
search_paths: where to search for the pipeline if x is a character; ignored when x is a pipeline object
standalone: whether the pipeline should be standalone; set to TRUE if the same pipeline added multiple times should run independently; default is true
hook_envir: environment in which to look up global variables used by pre_hook or post_hook; default is the calling environment
A list containing
id: the pipeline ID that can be used by deps
pipeline: forked pipeline instance
target_names: copy of names
depend_on: copy of deps
cue: copy of cue
standalone: copy of standalone
PipelineCollection$build_pipelines(): Build pipelines and visualize
PipelineCollection$build_pipelines(visualize = TRUE)
visualize: whether to visualize the pipeline; default is true
PipelineCollection$run(): Run the collection of pipelines
PipelineCollection$run(
error = c("error", "warning", "ignore"),
.scheduler = c("none", "future", "clustermq"),
.type = c("callr", "smart", "vanilla"),
.as_promise = FALSE,
.async = FALSE,
rebuild = NA,
...
)
error: what to do when an error occurs; default is 'error' (throw the error); other choices are 'warning' and 'ignore'
.scheduler, .type, .as_promise, .async, ...: passed to pipeline_run
rebuild: whether to re-build the pipeline; default is NA (if the pipeline has been built before, do not rebuild)
PipelineCollection$get_scheduler(): Get scheduler object
PipelineCollection$get_scheduler()
Class definition for 'RAVE' pipeline results
progressor: progress bar object, usually a progress instance
promise: a promise instance that monitors the pipeline progress
verbose: whether to print warning messages
names: names of the pipeline to build
async_callback: function callback to call in each check loop; only used when the pipeline is running in async=TRUE mode
check_interval: used when async=TRUE in pipeline_run; interval in seconds to check the progress
variables: target variables of the pipeline
variable_descriptions: readable descriptions of the target variables
valid: logical; whether the result instance is still valid (has not been invalidated)
status: result status; possible values are 'initialize', 'running', 'finished', 'canceled', and 'errored'. Note that 'finished' only means the pipeline process has finished.
process: (read-only) process object if the pipeline is running in 'async' mode, or NULL; see r_bg.
PipelineResult$validate(): check if the result is valid; raises errors when invalidated
PipelineResult$validate()
PipelineResult$invalidate(): invalidate the pipeline result
PipelineResult$invalidate()
PipelineResult$get_progress(): get pipeline progress
PipelineResult$get_progress()
PipelineResult$new(): constructor (internal)
PipelineResult$new(path = character(0L), verbose = FALSE)
path: pipeline path
verbose: whether to print warnings
PipelineResult$run(): run pipeline (internal)
PipelineResult$run(expr, env = parent.frame(), quoted = FALSE, async = FALSE, process = NULL)
expr: expression to evaluate
env: environment of expr
quoted: whether expr has been quoted
async: whether the process runs in other sessions
process: the process object, inheriting from process; will be inferred from expr if process=NULL, and an error is raised if it cannot be found
PipelineResult$await(): wait until some targets get finished
PipelineResult$await(names = NULL, timeout = Inf)
names: target names to wait for; default is NULL, i.e., wait for all targets that have been scheduled
timeout: maximum waiting time in seconds
TRUE if the target is finished, or FALSE if timeout is reached
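The TRUE/FALSE contract described above (finished versus timed out) can be illustrated with a generic polling loop in base R. This is only a sketch of the idea: `wait_until()`, its `is_done` argument, and the `interval` default are hypothetical and are not how the package implements `await()`.

```r
# Poll `is_done()` until it returns TRUE or `timeout` seconds have elapsed.
# `wait_until()` is a hypothetical helper, not part of the package.
wait_until <- function(is_done, timeout = Inf, interval = 0.05) {
  start <- Sys.time()
  repeat {
    if (isTRUE(is_done())) return(TRUE)
    elapsed <- as.numeric(difftime(Sys.time(), start, units = "secs"))
    if (elapsed >= timeout) return(FALSE)
    Sys.sleep(interval)
  }
}

# A pretend job that reports "finished" on the third check
n_checks <- 0
finished <- wait_until(function() {
  n_checks <<- n_checks + 1
  n_checks >= 3
}, timeout = 5)

# A pretend job that never finishes, with a short timeout
timed_out <- wait_until(function() FALSE, timeout = 0.2)
```

Here `finished` is TRUE because the condition is met before the timeout, while `timed_out` is FALSE because the timeout is reached first.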
PipelineResult$print(): print method
PipelineResult$print()
PipelineResult$get_values(): get results
PipelineResult$get_values(names = NULL, ...)
names: the target names to read
...: passed to pipeline_read
PipelineResult$clone(): The objects of this class are cloneable with this method.
PipelineResult$clone(deep = FALSE)
deep: Whether to make a deep clone.
Class definition for 'RAVE' pipelines
RAVESerializable -> PipelineTools
description: pipeline description
settings_path: absolute path to the settings file
extdata_path: absolute path to the user-defined pipeline data folder
preference_path: directory to the pipeline preference folder
target_table: table of target names and their descriptions
result_table: summary of the results, including signatures of data and commands
pipeline_path: the absolute path of the pipeline
pipeline_name: the code name of the pipeline
available_reports: available reports and their configurations
task: shiny task object, see method 'run_as_task'
PipelineTools$@marshal(): Create an atomic list that can be serialized
PipelineTools$@marshal(...)
...: ignored
PipelineTools$@unmarshal(): Restore an object from an atomic list
PipelineTools$@unmarshal(object, ...)
object: a list from '@marshal'
...: ignored
PipelineTools$new(): constructor
PipelineTools$new( pipeline_name, settings_file = "settings.yaml", paths = pipeline_root(), temporary = FALSE )
pipeline_name: name of the pipeline, usually in the pipeline 'DESCRIPTION' file, or pipeline folder name
settings_file: the file name of the settings file, where the user inputs are stored
paths: the paths to find the pipeline, usually the parent folder of the pipeline; default is pipeline_root()
temporary: whether to skip saving paths to the current pipeline root registry; set this to TRUE when importing pipelines from subject pipeline folders
PipelineTools$set_settings(): set inputs
PipelineTools$set_settings(..., .list = NULL)
..., .list: named list of inputs; all inputs should be named, otherwise errors will be raised
PipelineTools$get_settings(): get current inputs
PipelineTools$get_settings(key, default = NULL, constraint)
key: the input name; default is missing, i.e., to get all the settings
default: default value if not found
constraint: the constraint of the results; if the input value is not from constraint, then only the first element of constraint will be returned.
The value of the inputs, or a list if key is missing
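The constraint rule stated above (fall back to the first element of constraint when the stored value is not one of the allowed choices) can be sketched in a few lines of base R. `pick_with_constraint()` and the example choices are hypothetical and only illustrate the documented behavior; they are not the package code.

```r
# Return `value` if it is one of the allowed choices, otherwise fall back to
# the first element of `constraint`.
# `pick_with_constraint()` is a hypothetical helper, not the package code.
pick_with_constraint <- function(value, constraint) {
  if (length(value) == 1 && value %in% constraint) {
    value
  } else {
    constraint[[1]]
  }
}

choices <- c("mean", "median")
kept <- pick_with_constraint("median", choices)      # value is allowed
replaced <- pick_with_constraint("mode", choices)    # value is not allowed
missing_value <- pick_with_constraint(NULL, choices) # nothing stored
```

This mirrors the common pattern of listing the default choice first in constraint, so that an unexpected or missing setting degrades to that default.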
PipelineTools$import_settings(): import input settings from a file; this can be a 'settings.yaml' from an exported pipeline, or a report HTML
PipelineTools$import_settings(
path,
format = c("auto", "yaml", "html"),
src_pipeline = NULL,
settings_names = NULL,
dry_run = TRUE
)
path: path to the file containing settings information
format: format of the file; default is to derive from the file extension
src_pipeline: pipeline or pipeline name from which the settings file was generated. For HTML reports, this is automatically derived; default is the current pipeline if the information cannot be obtained
settings_names: names of the input settings to import; default is NULL to import all. This option helps avoid changing the underlying data, such as the project and subject that have been loaded, and only adjusts analysis parameters.
dry_run: whether to perform a dry run instead of setting the current pipeline immediately; note that the usage above lists the default as TRUE.
Imported settings as a list
PipelineTools$read(): read intermediate variables
PipelineTools$read(var_names, ifnotfound = NULL, ..., simplify = TRUE)
var_names: the target names, can be obtained via the x$target_table member; default is missing, i.e., to read all the intermediate variables
ifnotfound: variable default value if not found
simplify, ...: other parameters passed to pipeline_read
The values of the targets
PipelineTools$run(): run the pipeline
PipelineTools$run(
names = NULL,
async = FALSE,
as_promise = async,
scheduler = c("none", "future", "clustermq"),
type = c("smart", "callr", "vanilla"),
envir = new.env(parent = globalenv()),
callr_function = NULL,
return_values = TRUE,
debug = FALSE,
...
)
names: pipeline variable names to calculate; default is to calculate all the targets
async: whether to run asynchronously in another process
as_promise: whether to return a PipelineResult instance
scheduler, type, envir, callr_function, return_values, debug, ...: passed to pipeline_run if as_promise is true, otherwise these arguments will be passed to pipeline_run_bare
A PipelineResult instance if as_promise or async is true; otherwise a list of values for input names
PipelineTools$run_as_task(): Run pipeline as shiny extended task; requires package shiny
PipelineTools$run_as_task(names = NULL, with_progress = TRUE, check_internals = 0.5, ...)
names: target names to build, see method 'run'
with_progress: whether to show progress; default is true
check_internals: progress update frequency in seconds; default is 0.5 seconds
...: arguments passed to rave_progress
shiny extended task; see ExtendedTask
# pipeline <- ... (initialize pipeline somewhere)
# runs within shiny
server <- function(input, output, session) {
  pipeline$run_as_task()
  shiny::observe({
    shiny::showNotification(pipeline$task$status())
  })
}
PipelineTools$eval(): run the pipeline in order; unlike $run(), this method does not use the targets infrastructure, hence the pipeline results will not be stored, and the order of names will be respected.
PipelineTools$eval( names, env = parent.frame(), shortcut = FALSE, clean = TRUE, ... )
names: pipeline variable names to calculate; must be specified
env: environment to evaluate and store the results
shortcut: logical or characters; default is FALSE, meaning names and all the dependencies (if missing from env) will be evaluated; set to TRUE if only names are to be evaluated. When shortcut is a character vector, it should be a list of targets (including their ancestors) whose values can be assumed to be up-to-date, and the evaluation of those targets can be skipped.
clean: whether to evaluate without polluting env
...: passed to pipeline_eval
PipelineTools$shared_env(): run the pipeline shared library scripts, i.e., scripts whose paths start with R/shared
PipelineTools$shared_env(callr_function = callr::r)
callr_function: either callr::r or NULL; when callr::r, the environment will be loaded in an isolated R session and serialized back to the main session to avoid contaminating the main session environment; when NULL, the code will be sourced directly in the current environment.
An environment of shared variables
PipelineTools$python_module(): get 'Python' module embedded in the pipeline
PipelineTools$python_module(
type = c("info", "module", "shared", "exist"),
must_work = TRUE
)
type: return type; choices are 'info' (get basic information such as module path, default), 'module' (load the module and return it), 'shared' (load a shared sub-module from the module, which is also shared with the report script), and 'exist' (returns true or false on whether the module exists)
must_work: whether the module must exist. If TRUE, an error is raised when the module does not exist; default is TRUE; ignored when type is 'exist'.
See type
PipelineTools$progress(): get progress of the pipeline
PipelineTools$progress(method = c("summary", "details"))
method: either 'summary' or 'details'
A table of the progress
PipelineTools$attach(): attach pipeline tool to environment (internally used)
PipelineTools$attach(env)
env: an environment
PipelineTools$visualize(): visualize pipeline target dependency graph
PipelineTools$visualize( glimpse = FALSE, aspect_ratio = 2, node_size = 30, label_size = 40, ... )
glimpse: whether to glimpse the graph network or render the state
aspect_ratio: controls node spacing
node_size, label_size: size of nodes and node labels
...: passed to pipeline_visualize
a list where the names are target names and values are the corresponding dependencies
PipelineTools$target_ancestors(): a helper function to get target ancestors
PipelineTools$target_ancestors(names, skip_names = NULL)
names: targets whose ancestor targets need to be queried
skip_names: targets that are assumed to be up-to-date, hence will be excluded. Note that this exclusion is recursive: not only skip_names but also their ancestors will be excluded from the result.
ancestor target names (including names)
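The recursive exclusion described for skip_names can be made concrete with a toy dependency map. The sketch below is an assumption-laden illustration in base R: the target names, the `deps` list, and the helpers `all_ancestors()` and `ancestors()` are hypothetical and do not reproduce the package's internals.

```r
# Toy dependency map: each target lists its direct parents (hypothetical names)
deps <- list(
  raw = character(0),
  filtered = "raw",
  power = "filtered",
  baseline = "power",
  plot = c("power", "baseline")
)

# Collect the given targets plus everything they depend on, transitively
all_ancestors <- function(names, deps) {
  found <- character(0)
  queue <- names
  while (length(queue) > 0) {
    current <- queue[[1]]
    queue <- queue[-1]
    if (current %in% found) next
    found <- c(found, current)
    queue <- c(queue, deps[[current]])
  }
  found
}

# Exclude the skipped targets together with their own ancestors
ancestors <- function(names, skip_names = NULL, deps) {
  setdiff(all_ancestors(names, deps), all_ancestors(skip_names, deps))
}

full <- ancestors("plot", deps = deps)
partial <- ancestors("plot", skip_names = "filtered", deps = deps)
```

With nothing skipped, `full` contains all five targets; skipping `filtered` also drops `raw`, leaving `plot`, `power`, and `baseline` in `partial`.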
PipelineTools$fork(): fork (copy) the current pipeline to a new directory
PipelineTools$fork(path, policy = "default")
path: path to the new pipeline, a folder will be created there
policy: fork policy defined by the module author, see text file 'fork-policy' under the pipeline directory; if missing, then default to avoid copying main.html and shared folder
A new pipeline object based on the path given
PipelineTools$fork_to_subject(): fork (copy) the current pipeline to a 'RAVE' subject
PipelineTools$fork_to_subject(subject, label = "NA", policy = "default", delete_old = FALSE, sanitize = TRUE)
subject: subject ID or instance in which the pipeline will be saved
label: pipeline label describing the pipeline
policy: fork policy defined by the module author, see text file 'fork-policy' under the pipeline directory; if missing, then default to avoid copying main.html and shared folder
delete_old: whether to delete old pipelines with the same label; default is false
sanitize: whether to sanitize the registry at save. This will remove missing folders and import manually copied pipelines to the registry (only for the pipelines with the same name)
A new pipeline object based on the path given
PipelineTools$with_activated(): run code with the pipeline activated; some environment variables and function behaviors might change under such a condition (for example, targets package functions)
PipelineTools$with_activated(expr, quoted = FALSE, env = parent.frame())
expr: expression to evaluate
quoted: whether expr is quoted; default is false
env: environment to run expr
PipelineTools$clean(): clean all or part of the data store
PipelineTools$clean(
destroy = c("all", "cloud", "local", "meta", "process", "preferences", "progress",
"objects", "scratch", "workspaces"),
ask = FALSE
)
destroy, ask: see tar_destroy
PipelineTools$save_data(): save data to pipeline data folder
PipelineTools$save_data(
data,
name,
format = c("json", "yaml", "csv", "fst", "rds"),
overwrite = FALSE,
...
)
data: R object
name: the name of the data to save, must start with letters
format: serialization format; choices are 'json', 'yaml', 'csv', 'fst', 'rds'; default is 'json'. To save arbitrary objects such as functions or environments, use 'rds'
overwrite: whether to overwrite existing files; default is no
...: passed to saver functions
the saved file path
PipelineTools$load_data(): load data from pipeline data folder
PipelineTools$load_data(
name,
error_if_missing = TRUE,
default_if_missing = NULL,
format = c("auto", "json", "yaml", "csv", "fst", "rds"),
...
)
name: the name of the data
error_if_missing: whether to raise errors if the name is missing
default_if_missing: default values to return if the name is missing
format: the format of the data; default is automatically obtained from the file extension
...: passed to loader functions
the data if file is found or a default value
PipelineTools$set_preferences(): set persistent preferences for the pipeline. Preferences should not affect how the pipeline works, hence they usually store minor variables such as graphic options. Changing preferences will not invalidate the pipeline cache.
PipelineTools$set_preferences(..., .list = NULL)
..., .list: key-value pairs of initial preference values. The keys must start with 'global' or the module ID, followed by a dot and the preference type and name. For example, 'global.graphics.continuous_palette' sets palette colors for continuous heat-maps; "global" means the setting should be applied to all 'RAVE' modules. The module-level preference 'power_explorer.export.default_format' sets the default format for the power-explorer export dialogue.
name: preference name, must contain only letters, digits, underscore, and hyphen; will be coerced to lower case (case-insensitive)
A list of key-value pairs
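The key naming rule above can be checked before calling `set_preferences()`. The sketch below assumes, for illustration only, that a key has exactly three dot-separated parts (scope, type, name); `is_valid_key()` is a hypothetical helper, and the package may validate keys differently.

```r
# Check that a preference key looks like "<scope>.<type>.<name>", where
# <scope> is "global" or the module ID, and <type>/<name> contain only
# letters, digits, underscores, and hyphens.
# `is_valid_key()` is a hypothetical checker, not part of the package; the
# three-part structure is an assumption made for this sketch.
is_valid_key <- function(key, module_id) {
  parts <- strsplit(key, ".", fixed = TRUE)[[1]]
  length(parts) == 3 &&
    parts[[1]] %in% c("global", module_id) &&
    all(grepl("^[A-Za-z0-9_-]+$", parts[-1]))
}

ok_global <- is_valid_key("global.graphics.continuous_palette", "power_explorer")
ok_module <- is_valid_key("power_explorer.export.default_format", "power_explorer")
bad_scope <- is_valid_key("other_module.export.default_format", "power_explorer")
too_short <- is_valid_key("global.graphics", "power_explorer")
```

The two keys taken from the documentation pass the check, while a key with a foreign scope or a missing name part does not.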
PipelineTools$get_preferences(): get persistent preferences from the pipeline.
PipelineTools$get_preferences(keys, simplify = TRUE, ifnotfound = NULL, validator = NULL, modes = NULL, ...)
keys: characters to get the preferences
simplify: whether to simplify the results when the length of keys is 1; default is true; set to false to always return a list of preferences
ifnotfound: default value when the key is missing
validator: NULL or function to validate the values; see 'Examples'
modes: length of zero (no type-constraint), or a character vector with length of one or length(keys) specifying the type of preference values; see pipeline_get_preferences
...: passed to validator if validator is a function
A list of the preferences. If simplify is true and the length of keys is 1, then returns the value of that preference
library(ravepipeline)
if(interactive() && length(pipeline_list()) > 0) {
  pipeline <- pipeline("power_explorer")
  # set dummy preference
  pipeline$set_preferences("global.example.dummy_preference" = 1:3)
  # get preference
  pipeline$get_preferences("global.example.dummy_preference")
  # get preference with validator to ensure the value length to be 1
  pipeline$get_preferences(
    "global.example.dummy_preference",
    validator = function(value) {
      stopifnot(length(value) == 1)
    },
    ifnotfound = 100
  )
  pipeline$has_preferences("global.example.dummy_preference")
}
PipelineTools$has_preferences(): whether pipeline has preference keys
PipelineTools$has_preferences(keys, ...)
keys: character names of the preferences
...: passed to internal methods
logical whether the keys exist
PipelineTools$source_document(): obtain the source document
PipelineTools$source_document()
characters if the source document (main.Rmd) is found, otherwise NULL
PipelineTools$generate_report(): generate pipeline report
PipelineTools$generate_report(name, subject = NULL, output_dir = NULL, output_format = "auto", clean = FALSE, callback = NULL, ...)
name: report name, see field 'available_reports'
subject: subject that helps determine the output_dir and working directories
output_dir: parent folder where output will be stored
output_format: output format
clean: whether to clean the output; default is false
callback: callback function (if not NULL) to run once the report is created; typically used for actions such as zipping the report directory or sending out the report via email. The function must take exactly one argument, which is the directory where the report resides. The callback function will be evaluated in a separate session, so please make sure the function itself is self-contained.
...: passed to 'rmarkdown' render function
A job identification number, see resolve_job for querying job details
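A callback that satisfies the requirements above takes exactly one argument (the report directory) and refers to nothing outside its own body. The sketch below is a hypothetical example in base R: `write_manifest()` and the file name `MANIFEST.txt` are made up for illustration, and the temporary folder merely stands in for a real report directory.

```r
# A self-contained callback: one argument, no references to outside variables.
# `write_manifest()` and "MANIFEST.txt" are hypothetical names for this sketch.
write_manifest <- function(report_dir) {
  files <- list.files(report_dir, recursive = TRUE)
  manifest <- file.path(report_dir, "MANIFEST.txt")
  writeLines(files, manifest)
  invisible(manifest)
}

# Try the callback on a temporary folder standing in for a report directory
demo_dir <- tempfile("report_")
dir.create(demo_dir)
writeLines("<html></html>", file.path(demo_dir, "report.html"))

write_manifest(demo_dir)
manifest_lines <- readLines(file.path(demo_dir, "MANIFEST.txt"))

# cleanup
unlink(demo_dir, recursive = TRUE)
```

Testing the callback this way, in a fresh directory and without relying on objects in the calling session, is a cheap check that it will still work when evaluated in a separate R session.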
PipelineTools$clone(): The objects of this class are cloneable with this method.
PipelineTools$clone(deep = FALSE)
deep: Whether to make a deep clone.
## ------------------------------------------------
## Method `PipelineTools$run_as_task()`
## ------------------------------------------------

# pipeline <- ... (initialize pipeline somewhere)
# runs within shiny
server <- function(input, output, session) {
  pipeline$run_as_task()
  shiny::observe({
    shiny::showNotification(pipeline$task$status())
  })
}

## ------------------------------------------------
## Method `PipelineTools$get_preferences()`
## ------------------------------------------------

library(ravepipeline)
if(interactive() && length(pipeline_list()) > 0) {
  pipeline <- pipeline("power_explorer")
  # set dummy preference
  pipeline$set_preferences("global.example.dummy_preference" = 1:3)
  # get preference
  pipeline$get_preferences("global.example.dummy_preference")
  # get preference with validator to ensure the value length to be 1
  pipeline$get_preferences(
    "global.example.dummy_preference",
    validator = function(value) {
      stopifnot(length(value) == 1)
    },
    ifnotfound = 100
  )
  pipeline$has_preferences("global.example.dummy_preference")
}
Automatically displays 'shiny' progress when shiny is present, or text messages to track progress
rave_progress(
  title,
  max = 1,
  ...,
  quiet = FALSE,
  session = get_shiny_session(),
  shiny_auto_close = FALSE,
  log = NULL
)
title |
progress title |
max |
maximum steps |
... |
passed to shiny progress |
quiet |
whether to suppress the progress |
session |
shiny session |
shiny_auto_close |
whether to automatically close the progress bar when the parent function is closed |
log |
alternative log function if not default ( |
A list of functions to control the progress bar
# Naive example
progress <- rave_progress(title = "progress", max = 10)
progress$inc("job 1")
progress$inc("job 2")
progress$close()

# Within function
slow_sum <- function(n = 11) {
  p <- rave_progress(title = "progress", max = n, shiny_auto_close = TRUE)
  s <- 0
  for( i in seq(1, n) ) {
    Sys.sleep(0.1)
    p$inc(sprintf("adding %d", i))
    s <- s + i
  }
  invisible(s)
}
slow_sum()
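The "list of functions" return value can be understood as a set of closures sharing one counter. The following base R sketch shows that pattern only; `text_progress()` and its `get_value` element are hypothetical, and the real rave_progress() may track state and report progress differently.

```r
# A closure-based text progress: the returned functions share `value` and
# `closed`. `text_progress()` is a hypothetical stand-in, not rave_progress().
text_progress <- function(title, max = 1) {
  value <- 0
  closed <- FALSE
  inc <- function(detail = "") {
    if (closed) stop("progress has been closed")
    value <<- min(value + 1, max)
    message(sprintf("[%s] %d/%d %s", title, value, max, detail))
    invisible(value)
  }
  close <- function() {
    closed <<- TRUE
    invisible(NULL)
  }
  get_value <- function() value
  list(inc = inc, close = close, get_value = get_value)
}

p <- text_progress("demo", max = 3)
p$inc("step 1")
p$inc("step 2")
current <- p$get_value()
p$close()
```

Keeping the counter inside the enclosing function means callers can only change it through `inc()` and `close()`, which is the same usage shape as `progress$inc()` and `progress$close()` in the example above.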
Utility functions for 'RAVE' pipelines, currently designed for internal development use. The infrastructure will be deployed to 'RAVE' in the future to facilitate the "self-expanding" aim. Please check the official 'RAVE' website.
pipeline_root(root_path, temporary = FALSE)

pipeline_list(root_path = pipeline_root())

pipeline_find(name, root_path = pipeline_root())

pipeline_attach(name, root_path = pipeline_root())

pipeline_run(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  scheduler = c("none", "future", "clustermq"),
  type = c("smart", "callr", "vanilla"),
  envir = new.env(parent = globalenv()),
  callr_function = NULL,
  names = NULL,
  async = FALSE,
  check_interval = 0.5,
  progress_quiet = !async,
  progress_max = NA,
  progress_title = "Running pipeline",
  return_values = TRUE,
  debug = FALSE,
  ...
)

pipeline_clean(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  destroy = c("all", "cloud", "local", "meta", "process", "preferences", "progress",
    "objects", "scratch", "workspaces"),
  ask = FALSE
)

pipeline_run_bare(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  scheduler = c("none", "future", "clustermq"),
  type = c("smart", "callr", "vanilla"),
  envir = new.env(parent = globalenv()),
  callr_function = NULL,
  names = NULL,
  return_values = TRUE,
  debug = FALSE,
  ...
)

load_targets(..., env = NULL)

pipeline_target_names(pipe_dir = Sys.getenv("RAVE_PIPELINE", "."))

pipeline_debug(
  quick = TRUE,
  env = parent.frame(),
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  skip_names
)

pipeline_dep_targets(
  names,
  skip_names = NULL,
  pipe_dir = Sys.getenv("RAVE_PIPELINE", ".")
)

pipeline_eval(
  names,
  env = new.env(parent = parent.frame()),
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  settings_path = file.path(pipe_dir, "settings.yaml"),
  shortcut = FALSE
)

pipeline_visualize(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  glimpse = FALSE,
  targets_only = TRUE,
  shortcut = FALSE,
  zoom_speed = 0.1,
  ...
)

pipeline_progress(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  method = c("summary", "details", "custom"),
  func = targets::tar_progress_summary
)

pipeline_fork(
  src = Sys.getenv("RAVE_PIPELINE", "."),
  dest = tempfile(pattern = "rave_pipeline_"),
  policy = "default",
  activate = FALSE,
  ...
)

pipeline_build(pipe_dir = Sys.getenv("RAVE_PIPELINE", "."))

pipeline_read(
  var_names,
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  branches = NULL,
  ifnotfound = NULL,
  dependencies = c("none", "ancestors_only", "all"),
  simplify = TRUE,
  ...
)

pipeline_vartable(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  targets_only = TRUE,
  complete_only = FALSE,
  ...
)

pipeline_hasname(var_names, pipe_dir = Sys.getenv("RAVE_PIPELINE", "."))

pipeline_watch(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  targets_only = TRUE,
  ...
)

pipeline_create_template(
  root_path,
  pipeline_name,
  overwrite = FALSE,
  activate = TRUE,
  template_type = c("rmd", "r", "rmd-bare", "rmd-scheduler", "rmd-python")
)

pipeline_create_subject_pipeline(
  subject,
  pipeline_name,
  overwrite = FALSE,
  activate = TRUE,
  template_type = c("rmd", "r", "rmd-python")
)

pipeline_description(file)

pipeline_load_extdata(
  name,
  format = c("auto", "json", "yaml", "csv", "fst", "rds"),
  error_if_missing = TRUE,
  default_if_missing = NULL,
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  ...
)

pipeline_save_extdata(
  data,
  name,
  format = c("json", "yaml", "csv", "fst", "rds"),
  overwrite = FALSE,
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  ...
)

pipeline_shared(
  pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  callr_function = callr::r
)
root_path |
the root directory for pipeline templates |
temporary |
whether not to save |
name, pipeline_name
|
the pipeline name to create; usually also the folder |
pipe_dir |
where the pipeline directory is; can be set via the
system environment variable RAVE_PIPELINE |
scheduler |
how to schedule the target jobs: default is "none"; other choices are "future" and "clustermq" |
type |
how the pipeline should be executed; current choices are
"smart", "callr", and "vanilla" |
callr_function |
function that will be passed to
|
names |
the names of pipeline targets that are to be executed; default
is |
async |
whether to run pipeline without blocking the main session |
check_interval |
when running in background (non-blocking mode), how often to check the pipeline |
progress_title, progress_max, progress_quiet
|
control the progress |
return_values |
whether to return pipeline target values; default is
true; only works in |
debug |
whether to debug the process; default is false |
... |
other parameters, targets, etc. |
destroy |
what part of data repository needs to be cleaned |
ask |
whether to ask |
env, envir
|
environment to execute the pipeline |
quick |
whether to skip finished targets to save time |
skip_names |
hint of target names to fast skip provided they are
up-to-date; only used when |
settings_path |
path to settings file name within subject's pipeline path |
shortcut |
whether to display shortcut targets |
glimpse |
whether to hide network status when visualizing the pipelines |
targets_only |
whether to return the variable table for targets only; default is true |
zoom_speed |
zoom speed when visualizing the pipeline dependence |
method |
how the progress should be presented; choices are
"summary", "details", and "custom" |
func |
function to call when reading customized pipeline progress;
default is targets::tar_progress_summary |
src, dest
|
pipeline folder to copy the pipeline script from and to |
policy |
fork policy defined by module author, see text file
'fork-policy' under the pipeline directory; if missing, then default to
avoid copying |
activate |
whether to activate the new pipeline folder from |
var_names |
variable name to fetch or to check |
branches |
branch to read from; see |
ifnotfound |
default values to return if variable is not found |
dependencies |
whether to load dependent targets, choices are
"none", "ancestors_only", and "all" |
simplify |
whether to simplify the output |
complete_only |
whether only to show completed and up-to-date target variables; default is false |
overwrite |
whether to overwrite existing pipeline; default is false so users can double-check; if true, then existing pipeline, including the data will be erased |
template_type |
which template type to create; choices are |
subject |
character indicating valid 'RAVE' subject ID, or a
|
file |
path to the 'DESCRIPTION' file under the pipeline folder, or pipeline collection folder that contains the pipeline information, structures, dependencies, etc. |
format |
format of the extended data, default is |
error_if_missing, default_if_missing
|
what to do if the extended data is not found |
data |
extended data to be saved |
pipeline_root: the root directories of the pipelines
pipeline_list: the available pipeline names under pipeline_root
pipeline_find: the path to the pipeline
pipeline_run: a PipelineResult instance
load_targets: a list of targets to build
pipeline_target_names: a vector of characters indicating the pipeline target names
pipeline_visualize: a widget visualizing the target dependence structure
pipeline_progress: a table of building progress
pipeline_fork: a normalized path of the forked pipeline directory
pipeline_read: the value of corresponding var_names, or a named list if var_names has more than one element
pipeline_vartable: a table of summaries of the variables; can raise errors if pipeline has never been executed
pipeline_hasname: logical, whether the pipeline has variable built
pipeline_watch: a basic shiny application to monitor the progress
pipeline_description: the list of descriptions of the pipeline or pipeline collection
Run a function (job) in another session
start_job(
  fun,
  fun_args = list(),
  packages = NULL,
  workdir = NULL,
  method = c("callr", "rs_job", "mirai"),
  name = NULL,
  ensure_init = TRUE,
  digest_key = NULL,
  envvars = NULL,
  log_path = NULL
)

check_job(job_id)

resolve_job(
  job_id,
  timeout = Inf,
  auto_remove = TRUE,
  must_init = TRUE,
  unresolved = c("warning", "error", "silent"),
  log_maxline = getOption("ravepipeline.log_maxline", 0L)
)

remove_job(job_id)
fun |
function to evaluate |
fun_args |
list of function arguments |
packages |
list of packages to load |
workdir |
working directory; default is temporary path |
method |
job type; choices are "callr", "rs_job", and "mirai" |
name |
name of the job |
ensure_init |
whether to make sure the job has been started; default is true |
digest_key |
a string that will affect how job ID is generated; used internally |
envvars |
additional environment variables to set; must be a named list of environment variables |
log_path |
path to a log file for capturing both standard output
and messages ( |
job_id |
job identification number |
timeout |
timeout in seconds before the resolve ends; jobs that
are still running are subject to |
auto_remove |
whether to automatically remove the job if resolved; default is true |
must_init |
whether the resolve should error out if the job is not
initialized: typically meaning either the resolving occurs too soon
(only when |
unresolved |
what to do if the job is still running after timing out;
default is "warning" |
log_maxline |
maximum number of log lines to read from the tail of
the log file when resolving a job; default is
|
start_job returns a string containing the job identification number;
check_job returns the job status; resolve_job returns
the function result.
## Not run:

# Basic use
job_id <- start_job(function() {
  Sys.sleep(1)
  Sys.getpid()
})

check_job(job_id)

result <- resolve_job(job_id)

# As promise
library(promises)
as.promise(
  start_job(function() {
    Sys.sleep(1)
    Sys.getpid()
  })
) %...>%
  print()

## End(Not run)
Get, set, and check persistent preference values for 'RAVE' pipelines and modules. Preferences are stored in a global on-disk store that survives across R sessions.
pipeline_set_preferences(
  ...,
  .list = NULL,
  .pipe_dir = Sys.getenv("RAVE_PIPELINE", "."),
  .preference_instance = NULL
)

pipeline_get_preferences(
  keys,
  simplify = TRUE,
  ifnotfound = NULL,
  validator = NULL,
  modes = NULL,
  ...,
  .preference_instance = NULL
)

pipeline_has_preferences(keys, ..., .preference_instance = NULL)
..., .list
|
for |
.pipe_dir |
the active pipeline directory used to determine the allowed
key prefix; defaults to the |
.preference_instance |
pipeline preference instance: this is
automatically filled when calling from |
keys |
one or more preference key strings following the
|
simplify |
if |
ifnotfound |
value returned when a requested key is absent or fails
validation; default is |
validator |
|
modes |
|
Preference keys must follow a three-part dot-separated naming convention
[prefix].[type].[key]:
prefix: Either "global" (shared across all modules) or
a specific module ID such as "power_explorer". When calling
pipeline_set_preferences from within a pipeline, the allowed
prefixes are automatically restricted to "global" and the
current pipeline name.
type: A category string such as "graphics" or
"export".
key: The individual preference item, e.g.
"use_ggplot" or "default_format".
Valid examples: "global.graphics.use_ggplot",
"power_explorer.export.default_format".
Setting a preference value to NULL removes the key from the store.
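The naming convention above can be sketched as a small base-R check. This is only an illustration: is_valid_preference_key is a hypothetical helper, not a function exported by 'ravepipeline', and the assumption that no part may itself contain a dot is ours, not the package's documented rule.

```r
# Hypothetical helper (not part of 'ravepipeline'): checks that keys follow
# the [prefix].[type].[key] convention described above.
# Assumption: each of the three parts is non-empty and contains no dot.
is_valid_preference_key <- function(keys, allowed_prefix = NULL) {
  # Exactly three non-empty, dot-free parts separated by dots
  well_formed <- grepl("^[^.]+\\.[^.]+\\.[^.]+$", keys)
  if (is.null(allowed_prefix)) {
    return(well_formed)
  }
  # Everything before the first dot is the prefix
  prefix <- sub("\\..*$", "", keys)
  well_formed & prefix %in% allowed_prefix
}

keys <- c(
  "global.graphics.use_ggplot",
  "power_explorer.export.default_format",
  "graphics.cex",
  "other_module.export.default_format"
)

# Format check only
is_valid_preference_key(keys)

# Format check plus the prefix restriction that applies inside a pipeline
is_valid_preference_key(keys, allowed_prefix = c("global", "power_explorer"))
```

The second call mimics the restriction to "global" and the current pipeline name; the last key is well formed but would be rejected there because its prefix belongs to another module.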
pipeline_set_preferences: Invisibly returns the named list of values that were passed in.
pipeline_get_preferences: The preference value(s): a single
value when simplify = TRUE and one key is requested, otherwise
a named list with one element per key.
pipeline_has_preferences: A logical vector the same length as
keys indicating which keys currently exist in the preference
store.
## Not run:
# Set preferences (keys use [prefix].[type].[key] convention)
pipeline_set_preferences(
  "global.graphics.use_ggplot" = TRUE,
  "global.graphics.cex" = 1.2
)

# Check whether keys exist
pipeline_has_preferences(
  c("global.graphics.use_ggplot", "global.graphics.cex")
)

# Retrieve a single preference (returns the value directly)
pipeline_get_preferences("global.graphics.cex")

# Retrieve multiple preferences as a named list
pipeline_get_preferences(
  keys = c("global.graphics.use_ggplot", "global.graphics.cex"),
  simplify = FALSE
)

# Return a default when the key is absent
pipeline_get_preferences("global.graphics.missing_key", ifnotfound = FALSE)

# Validate the stored mode; fall back to default on mismatch
pipeline_get_preferences(
  "global.graphics.cex",
  modes = "numeric",
  ifnotfound = 1.0
)

# Remove a preference by setting it to NULL
pipeline_set_preferences("global.graphics.cex" = NULL)

## End(Not run)
Serialization reference hook generic functions
rave_serialize_refhook(object)

rave_serialize_impl(object)

## Default S3 method:
rave_serialize_impl(object)

## S3 method for class 'RAVESerializable'
rave_serialize_impl(object)

## S3 method for class 'rave-brain'
rave_serialize_impl(object)

rave_unserialize_refhook(x)

rave_unserialize_impl(x)

## Default S3 method:
rave_unserialize_impl(x)

## S3 method for class 'rave_serialized'
rave_unserialize_impl(x)

## S3 method for class 'rave_serialized_r6'
rave_unserialize_impl(x)

## S3 method for class 'rave_serialized_rave-brain'
rave_unserialize_impl(x)
object |
Object to serialize (environment or external pointers) |
x |
raw or string objects that will be passed to
|
rave_serialize_refhook returns either serialized objects
in string (raw vector converted to char via rawToChar), or NULL
indicating the object undergoing default serialization;
rave_unserialize_refhook returns the reconstructed object.
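The return contract above (a string to stand in for the object, or NULL for default serialization) is the general contract of the refhook argument of base R's serialize and unserialize. A minimal base-R sketch of that mechanism follows; registry, ser_hook, and unser_hook are hypothetical names used only for illustration and do not reflect how 'ravepipeline' implements its hooks.

```r
# Base-R illustration of the refhook mechanism (hypothetical names throughout)
registry <- new.env()

# A reference object that we do not want to serialize by value
big_env <- new.env()
big_env$data <- runif(10000)
assign("big_env_key", big_env, envir = registry)

# Serialization hook: return a string to be written in place of the object,
# or NULL to request the default serialization
ser_hook <- function(object) {
  if (is.environment(object) && identical(object, big_env)) {
    return("big_env_key")
  }
  NULL
}

# Unserialization hook: receives the stored string, returns the object
unser_hook <- function(x) {
  get(x, envir = registry)
}

payload <- list(label = "demo", env = big_env)
with_hook <- serialize(payload, NULL, refhook = ser_hook)
without_hook <- serialize(payload, NULL)
restored <- unserialize(with_hook, refhook = unser_hook)

# The hooked payload stores only the key, so it is much smaller
c(with_hook = length(with_hook), without_hook = length(without_hook))

# The restored element is the very same environment
identical(restored$env, big_env)
```

The hook is only consulted for reference objects such as environments and external pointers, which is why the plain character element of the list is serialized as usual.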
# This example requires additional `filearray` package
# If you are a RAVE user (installed RAVE via rave.wiki)
# then this package was installed

x0 <- array(rnorm(240000), c(200, 300, 4))
x1 <- filearray::as_filearray(x0)
x2 <- RAVEFileArray$new(x1, temporary = TRUE)

r0 <- serialize(x0, NULL, refhook = rave_serialize_refhook)
r1 <- serialize(x1, NULL, refhook = rave_serialize_refhook)
r2 <- serialize(x2, NULL, refhook = rave_serialize_refhook)

# Compare the serialization sizes
c(length(r0), length(r1), length(r2))

y0 <- unserialize(r0, refhook = rave_unserialize_refhook)
y1 <- unserialize(r1, refhook = rave_unserialize_refhook)
y2 <- unserialize(r2, refhook = rave_unserialize_refhook)

all(y0 == x0)
all(y1[] == x0)
all(y2[] == x0)

## Not run:

# 3D Brain, this example needs RAVE installation, not included in
# this package, needs extra installations available at rave.wiki

# 4 MB
brain <- ravecore::rave_brain("demo/DemoSubject")

# 52 KB
rbrain <- serialize(brain, NULL, refhook = rave_serialize_refhook)

brain2 <- unserialize(rbrain, refhook = rave_unserialize_refhook)

brain2$plot()

## End(Not run)
Run snippet code
update_local_snippet(force = TRUE)

install_snippet(path)

list_snippets()

load_snippet(topic, local = TRUE)
force |
whether to force updating the snippets; default is true |
path |
for installing code snippets locally only; can be an R script, a zip file, or a directory |
topic |
snippet topic |
local |
whether to use local snippets first before requesting online repository |
load_snippet returns snippet as a function, others return nothing
# This example script requires running in an interactive session

if(interactive()) {

  # ---- Example 1: Install built-in pipeline snippets ------------
  update_local_snippet(force = TRUE)

  # ---- Example 2: Install customized pipeline snippets ----------
  snippets <- file.path(
    "https://github.com/rave-ieeg/rave-gists",
    "archive/refs/heads/main.zip",
    fsep = "/"
  )
  tempf <- tempfile(fileext = ".zip")
  utils::download.file(url = snippets, destfile = tempf)

  install_snippet(tempf)

}

# ---- List snippets --------------------------------------------

# list all topics
list_snippets()

# ---- Run snippets as functions --------------------------------

topic <- "image-burn-contacts-to-t1"

# check whether this example can run
# This snippet requires installing package `raveio`
# which is currently not on CRAN (but soon will be)

condition_met <- topic %in% list_snippets() &&
  (system.file(package = "raveio") != "")

if( interactive() && condition_met ) {

  snippet <- load_snippet(topic)

  # Read snippet documentation
  print(snippet)

  results <- snippet(
    subject_code = "DemoSubject",
    project_name = "demo",
    save_path = NA,
    blank_underlay = FALSE
  )

  plot(results)
}
'FileArray'
Wrapper for better serialization (check 'See also')
RAVESerializable -> RAVEFileArray
temporary: whether this file array is to be deleted upon garbage collection;
default is false. The file array will be deleted if the
temporary flag is set to true and the array mode is 'readwrite'
valid: whether the array is valid and ready to read
@impl: the underlying array object
RAVEFileArray$@marshal(): Serialization helper; converts the object to a descriptive list
RAVEFileArray$@marshal(...)
...: ignored
RAVEFileArray$@unmarshal(): Serialization helper; restores the object from a descriptive list
RAVEFileArray$@unmarshal(object, ...)
object: serialized list
...: ignored
RAVEFileArray$new(): Constructor
RAVEFileArray$new(x, temporary = FALSE)
x: a file array, or an object that can be converted to one via as_filearray
temporary: whether this file array is to be deleted once the object is out-of-scope; default is false
RAVEFileArray$clone(): The objects of this class are cloneable with this method.
RAVEFileArray$clone(deep = FALSE)
deep: Whether to make a deep clone.
RAVESerializable rave-serialize-refhook
Persist settings on local configuration file
raveio_setopt(key, value, .save = TRUE)

raveio_resetopt(all = FALSE)

raveio_getopt(key, default = NA, temp = TRUE)

raveio_confpath(cfile = "settings.yaml")
key |
character, option name |
value |
character or logical of length 1, option value |
.save |
whether to save to the local drive; used internally to temporarily change an option. Using it directly is not recommended. |
all |
whether to reset all non-default keys |
default |
if the key is not found, return this default value |
temp |
when saving, whether the key-value pair should be considered
temporary (temporary settings are ignored when saving); when getting
options, setting |
cfile |
file name in configuration path |
raveio_setopt stores key-value pair in local path.
The values are persistent and shared across multiple sessions.
There are some read-only keys such as "session_string". Trying to
set those keys will result in an error.
The following keys are reserved by 'RAVE':
data_dir: Directory path, where processed data are stored;
default is at home directory, folder ~/rave_data/data_dir
raw_data_dir: Directory path, where raw data files are stored,
mainly the original signal files and imaging files;
default is at home directory, folder ~/rave_data/raw_dir
max_worker: Maximum number of CPU cores to use; default is one less than the total number of CPU cores
mni_template_root: Directory path, where 'MNI' templates are stored
raveio_getopt returns the value corresponding to the key. If key is
missing, the whole option list is returned.
With all=TRUE, raveio_resetopt resets all keys, including
non-standard ones; however, "session_string" is never reset.
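The behavior described in these details (persistent key-value storage, a default for absent keys, returning everything when no key is given, and read-only keys that raise an error) can be illustrated with a small stand-in store. This is a sketch only: make_option_store is hypothetical, and it persists to an RDS file for simplicity, whereas the package keeps its settings in a 'YAML' file (see raveio_confpath).

```r
# Hypothetical stand-in for a persistent option store (not the package's
# implementation). Assumption: an RDS file replaces the real settings file.
make_option_store <- function(path, read_only = "session_string") {
  read_all <- function() {
    if (file.exists(path)) readRDS(path) else list()
  }
  get_option <- function(key, default = NA) {
    opts <- read_all()
    if (missing(key)) {
      # No key given: return the whole option list
      return(opts)
    }
    if (key %in% names(opts)) opts[[key]] else default
  }
  set_option <- function(key, value) {
    if (key %in% read_only) {
      stop("Option '", key, "' is read-only")
    }
    opts <- read_all()
    opts[[key]] <- value
    # Written to disk, so the value survives across sessions
    saveRDS(opts, path)
    invisible(value)
  }
  list(get = get_option, set = set_option)
}

store <- make_option_store(tempfile(fileext = ".rds"))
store$set("max_worker", 2L)
store$get("max_worker")
store$get("unknown_key", default = "fallback")

# Setting a read-only key raises an error
set_result <- tryCatch(
  store$set("session_string", "abc"),
  error = function(e) conditionMessage(e)
)
set_result
```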
raveio_setopt returns modified value;
raveio_resetopt returns current settings as a list;
raveio_confpath returns absolute path for the settings file;
raveio_getopt returns the settings value to the given key, or
default if not found.
The following options alter other packages and might change their behavior:
'disable_fork_clusters': This option changes the
options 'dipsaus.no.fork' and
'dipsaus.cluster.backup', which control parallel computing
'threeBrain_template_subject': This option sets and persists the
option 'threeBrain.template_subject', which changes the default
group-level template brain.
R_user_dir
# get one RAVE option
ncore <- raveio_getopt("max_worker")
print(ncore)

# get all options
raveio_getopt()

# set option
raveio_setopt("disable_fork_clusters", FALSE)
The official built-in pipeline repository is located at https://github.com/rave-ieeg/rave-pipelines; the code snippet repository is located at https://github.com/rave-ieeg/rave-gists.
ravepipeline_finalize_installation(
  upgrade = c("ask", "always", "never", "config-only", "data-only"),
  async = FALSE,
  ...
)
upgrade |
rules to upgrade dependencies; default is to ask if needed |
async |
whether to run in the background; ignored for now |
... |
ignored; reserved for external calls. |
A list of built-in pipelines will be installed; the function itself returns nothing.
## Not run:

# This function requires connection to the Github, and must run
# under interactive session since a user prompt will be displayed

ravepipeline_finalize_installation()

## End(Not run)
Regular expression PIPELINE_FORK_PATTERN defines the file matching
rules when forking a pipeline; see pipeline_fork for details.
PIPELINE_FORK_PATTERN
For package inheritance only; do not instantiate the class directly.
RAVESerializable$new(): Abstract constructor
RAVESerializable$new(...)
...: ignored
RAVESerializable$@marshal(): Create an atomic list that can be serialized
RAVESerializable$@marshal(...)
...: ignored
RAVESerializable$@unmarshal(): Restore an object from an atomic list
RAVESerializable$@unmarshal(object, ...)
object: a list from '@marshal'
...: ignored
RAVESerializable$@compare(): How two objects are compared with each other
RAVESerializable$@compare(other)
other: another object to compare with self
RAVESerializable$clone(): The objects of this class are cloneable with this method.
RAVESerializable$clone(deep = FALSE)
deep: Whether to make a deep clone.
RAVEFileArray rave-serialize-refhook
'YAML' format
Supports reading data into a map object, and writing the map to files with names sorted for consistency
load_yaml(file, ..., map = NULL)

save_yaml(x, file, ..., sorted = FALSE)
file |
file to read from or write to |
... |
passed to |
map |
a |
x |
list or map to write |
sorted |
whether to sort the list by name; default is false |
A map object
tfile <- tempfile(fileext = ".yml")

save_yaml(list(b = 2, a = 1), tfile, sorted = TRUE)

cat(readLines(tfile), sep = "\n")

load_yaml(tfile)

unlink(tfile)
Experimental parallel functions, currently intended for internal use. The goal is to let 'RAVE' functions benefit from parallel computing while letting users control whether it is used.
with_rave_parallel(expr, .workers = 0)

lapply_jobs(
  x,
  fun,
  ...,
  .globals = list(),
  .workers = 0,
  .always = FALSE,
  callback = NULL
)
expr |
expression to evaluate with parallel workers |
.workers |
number of workers: note the actual numbers may differ, depending on the options and number of input elements |
x |
a list, vector, array of R objects |
fun |
function to apply to each element of |
... |
additional arguments to be passed to |
.globals |
global variables to be serialized |
.always |
whether to always use workers; only considered when the number of workers is one; default is false, in which case jobs run in the main process when only one worker is required |
callback |
callback function, input is each element of |
workers |
number of workers |
By default, lapply_jobs is almost identical to lapply.
It only runs in parallel when running inside of with_rave_parallel.
The hard upper limit on the number of workers is determined by the 'RAVE' option
raveio_getopt('max_worker'). Users can manually lower this number for
memory-intensive tasks via the argument .workers.
The actual number of workers might be less than requested; this
is often the result of a short input x. If the number of input elements
is smaller than the maximum number of workers, then the number of workers automatically
shrinks to the length of the input. All workers are child processes
running separately from the main session, except when only one worker
is needed and .always=FALSE: in that case the jobs are executed in the
main session.
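The rules in the two paragraphs above can be summarized in a short base-R sketch. resolve_worker_count is a hypothetical helper written for illustration, not a package function, and treating a non-positive request as "use the hard limit" is an assumption about how .workers = 0 behaves inside with_rave_parallel.

```r
# Hypothetical helper (not part of 'ravepipeline') mirroring the rules above.
# Assumption: requested <= 0 means "use the hard limit" (max_worker).
resolve_worker_count <- function(requested, n_inputs, max_worker,
                                 always = FALSE) {
  # Never exceed the hard limit from the 'max_worker' option
  n <- if (requested <= 0) max_worker else min(requested, max_worker)
  # Shrink to the input length, but keep at least one worker
  n <- as.integer(max(1L, min(n, n_inputs)))
  list(
    workers = n,
    # A single worker runs in the main session unless `always` is set
    in_main_session = (n == 1L && !always)
  )
}

# Five inputs with a hard limit of eight: shrinks to five workers
resolve_worker_count(requested = 0, n_inputs = 5, max_worker = 8)

# Request above the hard limit: capped at eight
resolve_worker_count(requested = 16, n_inputs = 100, max_worker = 8)

# One input only: runs in the main session unless always = TRUE
resolve_worker_count(requested = 2, n_inputs = 1, max_worker = 8)
resolve_worker_count(requested = 2, n_inputs = 1, max_worker = 8, always = TRUE)
```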
Each worker session runs as a completely isolated new process. A start-up
serialization step is needed for global objects (objects that
are defined elsewhere or outside of the function). Please make sure
the global objects are specified explicitly in .globals, a named list.
Unlike with the future package, users must specify the global objects themselves.
The global objects might be large to serialize. Please optimize the code
to avoid serializing big objects, especially environments or functions.
All objects inheriting RAVESerializable class with
@marshal and @unmarshal methods implemented correctly will
be serialized with reference hook rave_serialize_refhook, making
them extremely efficient.
# Run without `with_rave_parallel`
res <- lapply_jobs(1:5, function(x, ...) {
  c(child = Sys.getpid(), ...)
}, main = Sys.getpid())

simplify2array(res)

# Comparison
f <- function(n = 5, workers = 0) {
  system.time({
    ravepipeline::lapply_jobs(seq_len(n), function(x, ...) {
      Sys.sleep(1)
      c(child = Sys.getpid(), ...)
    }, main = Sys.getpid(), .workers = workers, callback = I)
  })
}

## Not run:
# Without parallel
f()
#>    user  system elapsed
#>   0.022   0.019   5.010

# with parallel
with_rave_parallel({
  f()
})
#>    user  system elapsed
#>   0.729   0.190   1.460

## End(Not run)