Generally, one should keep pipeline steps as simple as possible, basically following the principle “one step, one task”. This means that usually a lot of pipeline steps are used to calculate intermediate results and only a few steps contain the final results that we are interested in. This vignette shows how to conveniently collect and possibly group the output of those final steps.
Output steps are flagged by settting the keepOut
argument to TRUE
when adding a step to the pipeline. In the
following example, we will want to keep the output of the steps
data_summary
, model_summary
, and
model_plot
.
library(pipeflow)
library(ggplot2)
pip <- pipe_new(
"my-pipeline",
data = airquality
) |>
pipe_add(
"data_prep",
function(data = ~data) {
replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
}
) |>
pipe_add(
"data_summary",
function(
data = ~data_prep,
xVar = "Temp.Celsius",
yVar = "Ozone"
) {
data[, c(xVar, yVar)]
},
keepOut = TRUE # <- keep this
) |>
pipe_add(
"model_fit",
function(
data = ~data_prep,
xVar = "Temp.Celsius",
yVar = "Ozone"
) {
lm(paste(yVar, "~", xVar), data = data)
}
) |>
pipe_add(
"model_summary",
function(
fit = ~model_fit
) {
summary(fit)
},
keepOut = TRUE # <- keep this
) |>
pipe_add(
"model_plot",
function(
model = ~model_fit,
data = ~data_prep,
xVar = "Temp.Celsius",
yVar = "Ozone",
title = "Linear model fit"
) {
coeffs <- coefficients(model)
ggplot(data) +
geom_point(aes(.data[[xVar]], .data[["Ozone"]])) +
geom_abline(intercept = coeffs[1], slope = coeffs[2]) +
labs(title = title)
},
keepOut = TRUE # <- keep this
)
Looking at the pipeline, we see that the steps
data_summary
, model-summary
, and
model_plot
have been flagged accordingly (see column
keepOut
).
pip
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data [NULL] FALSE data New
# 2: data_prep data [NULL] FALSE data_prep New
# 3: data_summary data_prep [NULL] TRUE data_summary New
# 4: model_fit data_prep [NULL] FALSE model_fit New
# 5: model_summary model_fit [NULL] TRUE model_summary New
# 6: model_plot model_fit,data_prep [NULL] TRUE model_plot New
Graphically, steps flagged with keepOut = TRUE
are
displayed with a circle shape while “normal” steps are shown as
rectangle boxes.
Now let’s run and collect the output of the flagged steps using the
collect_out
method, which returns a list with the output of
the flagged steps.
pip$run()
# INFO [2024-12-04 18:01:42.274] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-04 18:01:42.275] Step 1/6 data
# INFO [2024-12-04 18:01:42.278] Step 2/6 data_prep
# INFO [2024-12-04 18:01:42.281] Step 3/6 data_summary
# INFO [2024-12-04 18:01:42.282] Step 4/6 model_fit
# INFO [2024-12-04 18:01:42.285] Step 5/6 model_summary
# INFO [2024-12-04 18:01:42.288] Step 6/6 model_plot
# INFO [2024-12-04 18:01:42.293] Finished execution of steps.
# INFO [2024-12-04 18:01:42.293] Done.
out <- pip$collect_out()
names(out)
# [1] "data_summary" "model_summary" "model_plot"
As expected, the output list contains the output of the flagged steps.
Often certain output steps are related and should be grouped
together. This can be achieved conveniently by setting the
group
argument when adding a step to the pipeline. Let’s
illustrate this by slightly modifying the previous example.
pip <- Pipeline$new("my-pipeline", data = airquality) |>
pipe_add(
"data_prep",
function(data = ~data) {
replace(data, "Temp.Celsius", (data[, "Temp"] - 32) * 5/9)
}
) |>
pipe_add(
"used_data",
function(
data = ~data_prep,
xVar = "Temp.Celsius",
yVar = "Ozone"
) {
data[, c(xVar, yVar)]
},
keepOut = TRUE,
group = "Data" # <- define 'Data' group here
) |>
pipe_add(
"model_fit",
function(
data = ~data_prep,
xVar = "Temp.Celsius",
yVar = "Ozone"
) {
lm(paste(yVar, "~", xVar), data = data)
}
) |>
pipe_add(
"model_summary",
function(
fit = ~model_fit
) {
summary(fit)
},
keepOut = TRUE,
group = "Model" # <- define 'Model' group here
) |>
pipe_add(
"model_plot",
function(
model = ~model_fit,
data = ~data_prep,
xVar = "Temp.Celsius",
yVar = "Ozone",
title = "Linear model fit"
) {
coeffs <- coefficients(model)
ggplot(data) +
geom_point(aes(.data[[xVar]], .data[["Ozone"]])) +
geom_abline(intercept = coeffs[1], slope = coeffs[2]) +
labs(title = title)
},
keepOut = TRUE,
group = "Model" # <- define 'Model' group here
)
Looking at the pipeline, the defined groups are shown in the
group
column.
pip
# step depends out keepOut group state
# <char> <list> <list> <lgcl> <char> <char>
# 1: data [NULL] FALSE data New
# 2: data_prep data [NULL] FALSE data_prep New
# 3: used_data data_prep [NULL] TRUE Data New
# 4: model_fit data_prep [NULL] FALSE model_fit New
# 5: model_summary model_fit [NULL] TRUE Model New
# 6: model_plot model_fit,data_prep [NULL] TRUE Model New
As you see, by default, the group is identical to the step name, that is, each step represents the trivial case of a one-sized group. Again, we run the pipeline and collect the output.
pip$run()
# INFO [2024-12-04 18:01:42.435] Start run of 'my-pipeline' pipeline:
# INFO [2024-12-04 18:01:42.436] Step 1/6 data
# INFO [2024-12-04 18:01:42.439] Step 2/6 data_prep
# INFO [2024-12-04 18:01:42.442] Step 3/6 used_data
# INFO [2024-12-04 18:01:42.443] Step 4/6 model_fit
# INFO [2024-12-04 18:01:42.446] Step 5/6 model_summary
# INFO [2024-12-04 18:01:42.448] Step 6/6 model_plot
# INFO [2024-12-04 18:01:42.453] Finished execution of steps.
# INFO [2024-12-04 18:01:42.454] Done.
out <- pip$collect_out()
names(out)
# [1] "Data" "Model"
As we can see, the output related to the modelling has been grouped
into one sublist named Model
.
str(out, max.level = 2)
# List of 2
# $ Data :'data.frame': 153 obs. of 2 variables:
# ..$ Temp.Celsius: num [1:153] 19.4 22.2 23.3 16.7 13.3 ...
# ..$ Ozone : int [1:153] 41 36 12 18 NA 28 23 19 8 NA ...
# $ Model:List of 2
# ..$ model_summary:List of 12
# .. ..- attr(*, "class")= chr "summary.lm"
# ..$ model_plot :List of 11
# .. ..- attr(*, "class")= chr [1:2] "gg" "ggplot"