Skip to content
  • There are no suggestions because the search field is empty.

How-To: Run Pipeline based on a Cron Interval

What Does This Article Cover?

  • What is Cron?
  • How to set it up?
  • Example
  • Other Related Material

What is Cron?

Cron is a time-based job scheduling syntax that automates repetitive tasks by scheduling them at a fixed interval.  It originated on Unix operating systems but can be used on any OS.  It is a powerful tool to automate to run jobs at a specific times or intervals without manual intervention.

How to set it up?

The setting of the Cron interval uses a third party npm library "cron-parser".  Downloading and installing Node.js and npm is covered here.  Importing an npm library is covered here.

  • Enable expression-imports on the Settings page.
  • This will create a /appData/expression-imports folder
  • Run the following command to install the npm package from the /expression-imports folder, or copy and paste the library files into the directory:
npm install cron-parser 

Example

The example will use a Transform stage to set a flag in the event.value that will then be read by a Switch stage to either run or not run the remaining logic in the pipeline. 

The expression below is use in the CheckCronExpression Transform stage.   It is used to read a cron expression that is stored in the pipeline state and determine if the pipeline should be run the current poll interval.   To work correctly, the poll interval on the pipeline should be more frequent than the cron interval.  

// get next interval based on cron expression
function nextInterval (inputString)  {
    let parser = require('cron-parser');
    let interval = parser.parseExpression(inputString);
    return interval.next(); 
}

// setup
const currentDateTime = new Date();
const twoMinutesAgo = new Date(currentDateTime.getTime() - 1 * 60 * 1000);
let lastPoll = new Date(state.pipeline.get("lastPoll", twoMinutesAgo))
event.value.RunJob = false;

// cron Expression
let cronExpression = state.pipeline.get("cronExpression", "0 0/2 * * * ?");
state.pipeline.set("cronExpression", cronExpression);
let nextCronInterval = new Date(nextInterval (cronExpression)); 

// check within cron interval
let lastCronInterval = new Date(state.pipeline.get("lastCronInterval", twoMinutesAgo))
if (nextCronInterval > lastCronInterval )
{
    let nextPoll = new Date(currentDateTime.getTime() + (currentDateTime.getTime() - lastPoll.getTime()));
    if (nextCronInterval <= nextPoll)
    {
        event.value.RunJob = true;
        state.pipeline.set("lastCronInterval", nextCronInterval);
    }
}
state.pipeline.set("lastPoll", currentDateTime);
stage.setValue(event.value);

In this example, the pipeline poll happens once a minute while the cron expression happen every 2 minutes.  With this setup, the switch stage will be set to True when the current minute is odd and will be set to False if the current minute is True.  The next interval when the cron expression is "0 0/2 * * * ?" is the next even minute.   Then, based on the if statement, it will set RunJob = true only when it the next interval is less than a minute into the future.   See the cron-parser package and cron expression in the Other Related Material section.

Here is the configuration for the pipeline, MQTT connector and KB tag.

{
    "productInfo": {
        "company": "HighByte",
        "product": "IntelligenceHub",
        "version": "4.0.0",
        "build": "2024.10.7.595",
        "stage": "Release"
    },
    "project": {
        "version": 9,
        "connections": [
            {
                "name": "MQTT",
                "uri": "mqtt://localhost:1885",
                "tags": [
                    "KB"
                ],
                "writes": {
                    "flattenModeledValues": false
                },
                "subscriptions": {},
                "storeForward": {
                    "enabled": false,
                    "maxEntries": 100,
                    "waitOnFailureInterval": {
                        "duration": 1,
                        "units": "Seconds"
                    }
                },
                "settings": {
                    "connectionTimeoutSeconds": 10,
                    "keepAliveSeconds": 60,
                    "requestTimeoutMS": 5000,
                    "cleanSession": true,
                    "ssl": false,
                    "redundantBrokers": [],
                    "inputDiscovery": ""
                }
            }
        ],
        "inputs": [],
        "outputs": [],
        "modeling": {
            "models": [],
            "instances": []
        },
        "conditions": [],
        "functions": [],
        "tags": [
            {
                "name": "KB"
            }
        ],
        "pipelines": [
            {
                "name": "CronInterval",
                "uri": "pipeline",
                "tags": [
                    "KB"
                ],
                "settings": {
                    "inputStages": [
                        "CheckCronExpression"
                    ],
                    "trackActivity": false,
                    "triggers": [
                        {
                            "name": "PolledTrigger",
                            "config": {
                                "type": ".TriggerPolled",
                                "enabled": true,
                                "interval": {
                                    "duration": 1,
                                    "units": "Minutes"
                                }
                            },
                            "display": {
                                "position": {
                                    "x": -451.60875759425267,
                                    "y": 0
                                }
                            }
                        }
                    ],
                    "stages": [
                        {
                            "name": "MQTT",
                            "outputs": [],
                            "config": {
                                "type": ".DynamicWriteConfig",
                                "failureOutputs": [],
                                "connectionReference": "{{Connection.MQTT}}",
                                "qualifier": {
                                    "topic": "CronInterval",
                                    "qos": 0,
                                    "namedRoot": false,
                                    "retained": true,
                                    "breakupArrays": false,
                                    "filterList": [
                                        "_name",
                                        "_model",
                                        "_timestamp"
                                    ]
                                },
                                "qualifierExpression": "",
                                "ignoreResult": false,
                                "connectionType": "mqtt"
                            },
                            "display": {
                                "position": {
                                    "x": 1140,
                                    "y": -3.217515188505331
                                }
                            }
                        },
                        {
                            "name": "CheckCronExpression",
                            "outputs": [
                                "Switch"
                            ],
                            "config": {
                                "type": ".TransformConfig",
                                "transformExpression": "// get next interval based on cron expression\r\nfunction nextInterval (inputString)  {\r\n    let parser = require('cron-parser');\r\n    let interval = parser.parseExpression(inputString);\r\n    return interval.next(); \r\n}\r\n\r\n// setup\r\nconst currentDateTime = new Date();\r\nconst twoMinutesAgo = new Date(currentDateTime.getTime() - 1 * 60 * 1000);\r\nlet lastPoll = new Date(state.pipeline.get(\"lastPoll\", twoMinutesAgo))\r\nevent.value.RunJob = false;\r\n\r\n// cron Expression\r\nlet cronExpression = state.pipeline.get(\"cronExpression\", \"0 0/2 * * * ?\");\r\nstate.pipeline.set(\"cronExpression\", cronExpression);\r\nlet nextCronInterval = new Date(nextInterval (cronExpression)); \r\n\r\n// check within cron interval\r\nlet lastCronInterval = new Date(state.pipeline.get(\"lastCronInterval\", twoMinutesAgo))\r\nif (nextCronInterval > lastCronInterval )\r\n{\r\n    let nextPoll = new Date(currentDateTime.getTime() + (currentDateTime.getTime() - lastPoll.getTime()));\r\n    if (nextCronInterval <= nextPoll)\r\n    {\r\n        event.value.RunJob = true;\r\n        state.pipeline.set(\"lastCronInterval\", nextCronInterval);\r\n    }\r\n}\r\nstate.pipeline.set(\"lastPoll\", currentDateTime);\r\nstage.setValue(event.value);"
                            },
                            "display": {
                                "position": {
                                    "x": 240,
                                    "y": 0
                                }
                            }
                        },
                        {
                            "name": "Switch",
                            "outputs": [],
                            "config": {
                                "type": ".SwitchConfig",
                                "evaluationType": "onFirst",
                                "switchCases": [
                                    {
                                        "expression": "return Boolean(event.value.RunJob)",
                                        "outputs": [
                                            "MQTT"
                                        ],
                                        "description": "Value Is True"
                                    },
                                    {
                                        "expression": "return true",
                                        "outputs": [],
                                        "description": "Default"
                                    }
                                ]
                            },
                            "display": {
                                "position": {
                                    "x": 690,
                                    "y": -1.6087575942526655
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "namespace": []
    },
    "network": {
        "groups": [],
        "hubs": []
    }
}

Feel free to reach out to us if you have any issues or have any questions.

Other Related Material: