Aggregating Data using a Database
This article describes methods for recording transactional data in a database. The data can then be queried at fixed intervals using aggregate database functions.
What Does This Article Cover:
This article shows how data can be inserted into a database. Once the data is inserted, aggregate functions such as AVG (average) can be used to query the data at fixed intervals, and the results can be output to another system (e.g. MQTT).
Here is the configuration used in this article:
{
"productInfo" : {
"company" : "HighByte",
"product" : "IntelligenceHub",
"version" : "4.0.1",
"build" : "2024.11.5.4",
"stage" : "Release"
},
"project" : {
"version" : 9,
"connections" : [ {
"name" : "MQTT_KB",
"uri" : "mqtt://localhost:1885",
"tags" : [ "ProductionTransactionExample" ],
"writes" : {
"flattenModeledValues" : false
},
"subscriptions" : { },
"storeForward" : {
"enabled" : false,
"maxEntries" : 100,
"waitOnFailureInterval" : {
"duration" : 1,
"units" : "Seconds"
}
},
"settings" : {
"connectionTimeoutSeconds" : 10,
"keepAliveSeconds" : 60,
"requestTimeoutMS" : 5000,
"cleanSession" : true
}
}, {
"name" : "MSSQL_KB",
"uri" : "jdbc.sqlserver://52.10.6.4:1433",
"tags" : [ "ProductionTransactionExample" ],
"writes" : {
"flattenModeledValues" : false
},
"subscriptions" : { },
"storeForward" : {
"enabled" : false,
"maxEntries" : 100,
"waitOnFailureInterval" : {
"duration" : 1,
"units" : "Seconds"
}
},
"settings" : {
"database" : " KnowledgeBase",
"username" : "highbyte",
"password" : {
"type" : "Encrypted",
"value" : {
"keyId" : "Cf/sVXT5wrSLSprn61yJRA==",
"iv" : "L/tf6nK+xvJZ+lEwXOmgmg==",
"ciphertext" : "V6J7oYItsyfEB0r8q3nIjA=="
}
}
}
} ],
"inputs" : [ {
"name" : "Locations",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : "select top (1) * from Locations where Id > {{this.Id}} and (AltPathNumber = {{this.AltPathNumber}} or AltPathNumber = 0) order by Id",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Static",
"params" : [ {
"name" : "Id",
"pattern" : "300"
}, {
"name" : "AltPathNumber",
"pattern" : "2"
} ]
}
}, {
"name" : "ProductionTransactionExample",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : "select * from ProductionTransactionExample",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByMaterial",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : " SELECT Avg(ProducedQuantity) as average, MaterialNumber\n FROM [ProductionTransactionExample]\n group by MaterialNumber",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByMaterialByHour",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : " SELECT Avg(ProducedQuantity) as average, MaterialNumber, DATEPART(HOUR, ProductionDate) as hour\n FROM [ProductionTransactionExample]\n group by MaterialNumber, DATEPART(HOUR, ProductionDate)",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByMaterialByShift",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : " SELECT Avg(ProducedQuantity) as average, MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as shift\n FROM [ProductionTransactionExample]\n group by MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end \n\n ",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByStation",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : "SELECT Avg(ProducedQuantity) as average, station\n FROM [ProductionTransactionExample]\n group by station",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByStationByHour",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : " SELECT Avg(ProducedQuantity) as average, station, DATEPART(HOUR, ProductionDate) as hour\n FROM [ProductionTransactionExample]\n group by station, DATEPART(HOUR, ProductionDate)",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByStationByMaterialFilterDate",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : " SELECT Avg(ProducedQuantity) as average, MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as Shift\n FROM [ProductionTransactionExample]\n where ProductionDate >= '2025-01-02' and\n ProductionDate < '2025-01-03'\n group by MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end \n \n ",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByStationByShift",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : " SELECT Avg(ProducedQuantity) as average, station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as shift\n FROM [ProductionTransactionExample]\n group by station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end\n ",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
}, {
"name" : "ProductionTransactionExample_AverageByStationByShiftFilterDate",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"type" : "query",
"query" : " SELECT Avg(ProducedQuantity) as average, station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as shift\n FROM [ProductionTransactionExample]\n where ProductionDate >= {{this.StartDate}} and\n ProductionDate < {{this.EndDate}}\n group by station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end\n \n ",
"index" : {
"enabled" : false,
"indexName" : "",
"indexValue" : ""
}
},
"cacheLifetime" : {
"enabled" : false
},
"template" : {
"type" : "Off"
}
} ],
"outputs" : [ {
"name" : "ProductionTransactionExample",
"connection" : "MSSQL_KB",
"type" : "jdbc.sqlserver",
"qualifier" : {
"writeType" : "insert",
"createOption" : "off",
"tableCacheInterval" : {
"duration" : 1,
"units" : "Hours"
},
"breakupArrays" : false,
"table" : "ProductionTransactionExample"
}
} ],
"modeling" : {
"models" : [ {
"name" : "ProductionTransactionExample",
"tags" : [ "ProductionTransactionExample" ],
"attributes" : [ {
"attributeType" : "Internal",
"name" : "ProductionID",
"nullable" : false,
"required" : false,
"array" : false,
"internalType" : "Any"
}, {
"attributeType" : "Internal",
"name" : "OrderNumber",
"nullable" : false,
"required" : false,
"array" : false,
"internalType" : "Any"
}, {
"attributeType" : "Internal",
"name" : "MaterialNumber",
"nullable" : false,
"required" : false,
"array" : false,
"internalType" : "Any"
}, {
"attributeType" : "Internal",
"name" : "SerialNumber",
"nullable" : false,
"required" : false,
"array" : false,
"internalType" : "Any"
}, {
"attributeType" : "Internal",
"name" : "Station",
"nullable" : false,
"required" : false,
"array" : false,
"internalType" : "Any"
}, {
"attributeType" : "Internal",
"name" : "ProducedQuantity",
"nullable" : false,
"required" : false,
"array" : false,
"internalType" : "Any"
}, {
"attributeType" : "Internal",
"name" : "ProductionDate",
"nullable" : false,
"required" : false,
"array" : false,
"internalType" : "Any"
} ]
} ],
"instances" : [ ]
},
"conditions" : [ ],
"functions" : [ ],
"tags" : [ {
"name" : "ProductionTransactionExample"
} ],
"pipelines" : [ {
"name" : "ProductionTransactionExample_Insert",
"uri" : "pipeline",
"tags" : [ "ProductionTransactionExample" ],
"settings" : {
"inputStages" : [ "Model" ],
"trackActivity" : false,
"triggers" : [ ],
"stages" : [ {
"name" : "Model",
"outputs" : [ "Write" ],
"config" : {
"type" : ".ModelConfig",
"model" : "ProductionTransactionExample",
"objectName" : "ProductionTransactionExampleObject",
"initExpression" : "",
"attributes" : [ {
"name" : "ProductionID",
"expression" : {
"type" : "JavaScript",
"expression" : ""
},
"defaultValue" : 1
}, {
"name" : "OrderNumber",
"expression" : {
"type" : "JavaScript",
"expression" : ""
},
"defaultValue" : "1"
}, {
"name" : "MaterialNumber",
"expression" : {
"type" : "JavaScript",
"expression" : ""
},
"defaultValue" : "A"
}, {
"name" : "SerialNumber",
"expression" : {
"type" : "JavaScript",
"expression" : ""
},
"defaultValue" : "B"
}, {
"name" : "Station",
"expression" : {
"type" : "JavaScript",
"expression" : ""
},
"defaultValue" : "Station1"
}, {
"name" : "ProducedQuantity",
"expression" : {
"type" : "JavaScript",
"expression" : ""
},
"defaultValue" : 1
}, {
"name" : "ProductionDate",
"expression" : {
"type" : "JavaScript",
"expression" : "let d = new Date()\r\nreturn d"
}
} ]
},
"display" : {
"position" : {
"x" : 268,
"y" : 43
}
}
}, {
"name" : "Write",
"outputs" : [ ],
"config" : {
"type" : ".WriteConfig",
"failureOutputs" : [ ],
"references" : [ "{{Connection.MSSQL_KB.ProductionTransactionExample}}" ],
"ignoreResult" : false
},
"display" : {
"position" : {
"x" : 744,
"y" : 56
}
}
} ]
}
}, {
"name" : "ProductionTransactionExample_UpdateMQTT",
"uri" : "pipeline",
"tags" : [ "ProductionTransactionExample" ],
"settings" : {
"inputStages" : [ "Array" ],
"trackActivity" : true,
"triggers" : [ {
"name" : "FlowTrigger",
"config" : {
"type" : ".TriggerFlow",
"inReferences" : [ "{{Connection.MSSQL_KB.ProductionTransactionExample_AverageByStationByShiftFilterDate(StartDate='2024-01-02',EndDate='2025-01-03')}}" ],
"publishMode" : "All",
"flowEvaluation" : {
"type" : "Polled",
"interval" : {
"duration" : 1,
"units" : "Seconds"
},
"delay" : {
"duration" : 0,
"units" : "Seconds"
},
"mode" : "Always"
},
"template" : {
"type" : "Off"
},
"enabled" : true
},
"display" : {
"position" : {
"x" : -450,
"y" : 0
}
}
} ],
"stages" : [ {
"name" : "MQTT_KB",
"outputs" : [ ],
"config" : {
"type" : ".DynamicWriteConfig",
"failureOutputs" : [ ],
"connectionReference" : "{{Connection.MQTT_KB}}",
"ignoreResult" : false,
"qualifier" : {
"topic" : "ProductionTransactionExample/{{event.metadata.filtered.station}}/{{event.metadata.filtered.shift}}/{{event.metadata.breakupName}}",
"qos" : 0,
"namedRoot" : false,
"retained" : true,
"breakupArrays" : false,
"filterList" : [ "_timestamp" ]
},
"connectionType" : "mqtt"
},
"display" : {
"position" : {
"x" : 1590,
"y" : 0
}
}
}, {
"name" : "Filter",
"outputs" : [ "Object" ],
"config" : {
"type" : ".AttributeFilterConfig",
"filterOption" : "exclude",
"attributeList" : [ "station", "shift" ],
"retainAsMetadata" : true,
"metadataKey" : "filtered"
},
"display" : {
"position" : {
"x" : 690,
"y" : 0
}
}
}, {
"name" : "Array",
"outputs" : [ "Filter" ],
"config" : {
"type" : ".BreakupConfig",
"breakupType" : "array"
},
"display" : {
"position" : {
"x" : 240,
"y" : 0
}
}
}, {
"name" : "Object",
"outputs" : [ "MQTT_KB" ],
"config" : {
"type" : ".BreakupConfig",
"breakupType" : "object"
},
"display" : {
"position" : {
"x" : 1140,
"y" : 0
}
}
} ]
}
} ],
"namespace" : [ ]
},
"network" : {
"groups" : [ ],
"hubs" : [ ]
}
}
Example Preparation
-
Enable Intelligence Hub MQTT broker
- In the left-hand navigation panel, navigate to Manage, and click Settings.
- Under the MQTT Broker section, enable the broker. If ports 1885 and 1886 are already in use on the server hosting your Intelligence Hub, change them to ports of your choosing; otherwise, accept the defaults. Then click the Save button.
-
Import the required content
- In the left-hand navigation panel, navigate to Manage, and click Project
- Within the Import screen, ensure Full Project is off (otherwise your existing project will be overwritten).
- Copy the JSON provided in the code window above.
- Change the Import Type to JSON and paste the provided JSON into the Project box and click the Import button.
- Update the imported Connections as required
-
Navigate to Configure and click Connections, click "MQTT_KB", and update the MQTT settings as required to match the broker settings from preparation step 1 (Enable Intelligence Hub MQTT broker).
-
Navigate to Configure and click Connections, click "MSSQL_KB" and type the following in the password field.
password
-
Click the Save button.
-
-
Setup the UNS Client
-
In the left-hand navigation panel, navigate to Tools and right click UNS Client and select Open Link in New Tab.
-
Enter login information.
-
For Connection select "MQTT_KB".
-
For Subscribed Topics enter "ProductionTransactionExample/#".
-
Click Add.
-
Click the "x" to remove topic "#".
-
Click the Connect button.
-
Confirm the UNS Client says "Connected to MQTT_KB".
-
Return to the previous web browser tab.
-
ProductionTransactionExample_Insert pipeline:
Out of the box, this pipeline will not allow inserts into the example database because the provided user only has read permissions. To implement this pipeline on another system, make sure the user has write permissions.