Controlling Pipeline Flow
Since Version: 4.0
In its basic form, a pipeline is executed as a linear flow of commands: each command is executed one after another, from start to end.
Sometimes it is necessary to change this linear flow dynamically, depending on given conditions. PIPEFORCE offers different tools to control the flow inside a pipeline dynamically. Most of these tools are themselves implemented as commands and can therefore be used like any other command.
Trigger
A trigger is an external action which causes a pipeline to be executed. Depending on the input data of a trigger, the execution flow of a pipeline can vary.
These are the most common triggers in PIPEFORCE:
Job
Triggers a pipeline as a job which is executed after a certain amount of time. For more details see Jobs.
Message
Triggers a pipeline in case a message of interest is on the message queue / bus. For more details see Message.
Event
Triggers a pipeline in case an internal event of interest has occurred. Events are quite similar to messages, except that their origin is always the hub backend. For more details see Events. Common event examples are:
property.created = A new property has been created in the Property Store.
property.deleted = A property has been deleted from the Property Store.
iam.bruteforce.detected = A potential brute force attack has been detected.
See Events Reference for more events.
Webhook
Triggers a pipeline in case an external system sends a request to one of the custom webhook endpoints. For more details see Webhooks.
If, Else
In some situations it is handy to disable the execution of a command depending on a given condition.
if-Parameter
One way of skipping a command execution is by using the common parameter if. It is available on any command. Setting this parameter to a value of false on a command will skip the execution of this command. By default, this parameter is set to true. Also see Common Parameters.
Example:
vars:
  logging: "debug"
pipeline:
  - drive.read:
      path: "my.doc"
  - log:
      message: "Document loaded from my.doc."
      if: "#{vars.logging == 'debug'}"
  - drive.save:
      path: "folder/my.doc"
  - log:
      message: "Document stored to folder/my.doc."
      if: "#{vars.logging == 'debug'}"
if-Command
In case a whole block of commands must be skipped, you can use the commands if, if.else and if.end.
Example:
pipeline:
  - if:
      true: "#{1 < 2}"
  - log:
      message: "1 is smaller than 2"
  - if.else:
  - log:
      message: "This should never happen..."
  - if.end:
Also nesting is supported. For example:
vars:
  name: "Sabrina"
  age: 24
pipeline:
  - if:
      true: "#{vars.name != ''}"
  - if:
      true: "#{vars.age > 21}"
  - log:
      message: "#{vars.name} may have a drink..."
  - if.else:
  - log:
      message: "#{vars.name} is too young to have a drink..."
  - if.end:
  - if.end:
Foreach / Iterator
Looping over a set of data is also called "iterating" over this set of data. Or in other words: "for each" item of the data set, do some action.
To iterate over a set of data (for example a list, array, map or another type of collection), you can use the command foreach. With this approach you can also implement the Splitter Pattern from the Enterprise Integration Patterns.
For each entry in the data set, the foreach command will execute all subsequent commands until a foreach.end has been found.
For example:
vars:
  people: ["Sam", "Sarah", "Carol"]
pipeline:
  - foreach:
      in: "#{vars.people}"
      as: "person"
  - log:
      message: "Hello #{vars.person}"
  - foreach.end:
Nesting of foreach is also possible:
vars:
  people: ["Sam", "Carol"]
  activities: "biking, swimming, hiking"
pipeline:
  - foreach:
      in: "#{vars.people}"
      as: person
  - foreach:
      in: "#{vars.activities}"
      as: activity
  - eval:
      expr: "#{@list.add(body, vars.person + ' could do: ' + vars.activity)}"
  - foreach.end:
  - foreach.end:
body: []
This would produce an output like this:
[
  "Sam could do: biking",
  "Sam could do: swimming",
  "Sam could do: hiking",
  "Carol could do: biking",
  "Carol could do: swimming",
  "Carol could do: hiking"
]
You can simplify this by using the eval parameter instead of the eval command:
vars:
  people: ["Sam", "Carol"]
  activities: "biking, swimming, hiking"
pipeline:
  - foreach:
      in: "#{vars.people}"
      as: person
  - foreach:
      in: "#{vars.activities}"
      as: activity
      eval: "#{@list.add(body, vars.person + ' could do: ' + vars.activity)}"
  - foreach.end:
  - foreach.end:
body: []
You can also combine the foreach with the if command:
vars:
  people: [{"name": "Sam", "age": 15}, {"name": "Carol", "age": 35}]
  activities: "biking, clubbing"
pipeline:
  - foreach:
      in: "#{vars.people}"
      as: person
  - foreach:
      in: "#{vars.activities}"
      as: activity
  - if:
      true: "#{vars.person.age < 18 and vars.activity == 'clubbing'}"
  - set.var:
      key: "allowed"
      value: "NOT "
  - if.else:
  - set.var:
      key: "allowed"
      value: ""
  - if.end:
      eval: "#{@list.add(body, vars.person.name + ' could ' + vars.allowed + 'do: ' + vars.activity)}"
  - foreach.end:
  - foreach.end:
body: []
This would result in an output like this:
[
  "Sam could do: biking",
  "Sam could NOT do: clubbing",
  "Carol could do: biking",
  "Carol could do: clubbing"
]
Exit
Based on a condition, you can exit the pipeline execution using the exit command.
In case there is a finally command in the pipeline, it will be executed before exiting. See Finally.
Example:
pipeline:
  - exit:
      if: "#{2 > 1}"
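Since exit supports the common if parameter, it can also be used to leave a loop and the whole pipeline early. A minimal sketch, reusing only commands shown on this page (the variable names and messages are illustrative):

```yaml
vars:
  people: ["Sam", "Sarah", "Carol"]
pipeline:
  - foreach:
      in: "#{vars.people}"
      as: "person"
  - log:
      message: "Hello #{vars.person}"
  # exit ends the whole pipeline as soon as the condition matches
  - exit:
      if: "#{vars.person == 'Sarah'}"
  - foreach.end:
```

Here a greeting is logged for Sam and Sarah; once the condition matches, the pipeline exits and Carol is never reached.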
Retry
In case an error occurred in a command, you can automatically let it retry for a certain amount of time before giving up and exiting the pipeline flow.
For more details see Error Handling.
Rollback
In case an error occurred in a command, you can automatically call a rollback action.
For more details see Error Handling.
Sub-Pipeline
In case you would like to delegate control to another persisted pipeline, you can use the command pipeline.start.
For example, let's assume you have a persisted pipeline stored under the key global/app/myapp/pipeline/concat which loads a user from IAM and concatenates the name and email address like this:
vars:
  userUuid: null
pipeline:
  - iam.user.get:
      uuid: "#{vars.userUuid}"
  - body:
      value: "#{body.firstName} #{body.lastName} (#{body.email})"
This is the sub-pipeline. The result of the sub-pipeline will be stored in the body.
Now, let's have an example of a pipeline which calls this sub-pipeline and uses its result:
pipeline:
  - pipeline.start:
      key: "global/app/myapp/pipeline/concat"
      vars: {"userUuid": "18a887b4-194e-4aac-82a5-ff7b33710594"}
This pipeline will call the sub-pipeline global/app/myapp/pipeline/concat with the parameter userUuid and place the result in the body by default. So the output will be something like this:
Sam Smith (s.smith@company.com)
Error
You can control what should happen if a command produces an error. Depending on your configuration, the pipeline flow will change. For example, an error could exit the pipeline flow or trigger some other commands.
For more details see section Error Handling.
Finally
The command finally can be used in a pipeline in order to make sure a set of commands is executed in any case at the very end of a pipeline, even if an error has occurred or the pipeline execution has been cancelled by an exit command. This approach is useful, for example, in case you need to clean up data or would like to get informed about the pipeline execution result in any case.
For more details see Error Handling.
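A minimal sketch of how this could look. Note: the exact block syntax of finally is an assumption here (it is assumed that the commands following finally form the block that runs at the very end); see Error Handling for the precise semantics. Only the commands log and exit from this page are reused:

```yaml
pipeline:
  - log:
      message: "Doing some work..."
  - exit:
  - log:
      message: "This is never logged, since exit ends the flow."
  # assumption: commands after finally are executed in any case,
  # even after exit or an error
  - finally:
  - log:
      message: "Cleanup runs in any case."
```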
Wait
Sometimes it is necessary to pause the execution flow of a pipeline for a certain amount of time.
You can do so using the command wait.
Example:
pipeline:
  - log:
      message: "Let's wait 2 seconds..."
  - wait:
      ms: 2000
  - log:
      message: "2 seconds later."
Assert
In case you would like to make sure a condition in the pipeline is true, you can use the assert command to check that. In case the given condition is false, the pipeline execution will end and an error will be thrown. This is especially useful when writing tests.
This example will end the pipeline execution since it expects the condition to be true, but it is false:
pipeline:
  - assert:
      true: "#{1 > 2}"
      message: "1 is not greater than 2!"
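In contrast, an assertion whose condition holds simply lets the pipeline continue. A minimal sketch, reusing the same parameters (the messages are illustrative):

```yaml
pipeline:
  - assert:
      true: "#{1 < 2}"
      message: "This error message is never shown."
  - log:
      message: "Assertion passed, pipeline continues."
```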