Skip to content
  • There are no suggestions because the search field is empty.

Aggregating Data using a Database

Methods for recording transactional data into a database.  Then based on fixed intervals, the data can be queried using aggregate database functions.

What Does This Article Cover:

The article will show how data can be inserted into a database.   Once the data is insert, then aggregate functions such as AVG (average) can be used to query the data on fixed intervals and outputted to a data source (e.g. MQTT). 

Here is the configuration used in this article:

{
  "productInfo" : {
    "company" : "HighByte",
    "product" : "IntelligenceHub",
    "version" : "4.0.1",
    "build" : "2024.11.5.4",
    "stage" : "Release"
  },
  "project" : {
    "version" : 9,
    "connections" : [ {
      "name" : "MQTT_KB",
      "uri" : "mqtt://localhost:1885",
      "tags" : [ "ProductionTransactionExample" ],
      "writes" : {
        "flattenModeledValues" : false
      },
      "subscriptions" : { },
      "storeForward" : {
        "enabled" : false,
        "maxEntries" : 100,
        "waitOnFailureInterval" : {
          "duration" : 1,
          "units" : "Seconds"
        }
      },
      "settings" : {
        "connectionTimeoutSeconds" : 10,
        "keepAliveSeconds" : 60,
        "requestTimeoutMS" : 5000,
        "cleanSession" : true
      }
    }, {
      "name" : "MSSQL_KB",
      "uri" : "jdbc.sqlserver://52.10.6.4:1433",
    "tags" : [ "ProductionTransactionExample" ],
      "writes" : {
        "flattenModeledValues" : false
      },
      "subscriptions" : { },
      "storeForward" : {
        "enabled" : false,
        "maxEntries" : 100,
        "waitOnFailureInterval" : {
          "duration" : 1,
          "units" : "Seconds"
        }
      },
      "settings" : {
        "database" : " KnowledgeBase",
        "username" : "highbyte",
        "password" : {
          "type" : "Encrypted",
          "value" : {
            "keyId" : "Cf/sVXT5wrSLSprn61yJRA==",
            "iv" : "L/tf6nK+xvJZ+lEwXOmgmg==",
            "ciphertext" : "V6J7oYItsyfEB0r8q3nIjA=="
          }
        }
      }
    } ],
    "inputs" : [ {
      "name" : "Locations",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "select top (1) * from Locations where Id > and (AltPathNumber = or AltPathNumber = 0) order by Id",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Static",
        "params" : [ {
          "name" : "Id",
          "pattern" : "300"
        }, {
          "name" : "AltPathNumber",
          "pattern" : "2"
        } ]
      }
    }, {
      "name" : "ProductionTransactionExample",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "select * from ProductionTransactionExample",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByMaterial",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : " SELECT Avg(ProducedQuantity) as average, MaterialNumber\n  FROM [ProductionTransactionExample]\n  group by MaterialNumber",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByMaterialByHour",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "  SELECT Avg(ProducedQuantity) as average, MaterialNumber, DATEPART(HOUR, ProductionDate) as hour\n  FROM [ProductionTransactionExample]\n  group by MaterialNumber, DATEPART(HOUR, ProductionDate)",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByMaterialByShift",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : " SELECT Avg(ProducedQuantity) as average, MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as shift\n  FROM [ProductionTransactionExample]\n  group by MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end \n\n  ",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByStation",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "SELECT Avg(ProducedQuantity) as average, station\n  FROM [ProductionTransactionExample]\n  group by station",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByStationByHour",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "  SELECT Avg(ProducedQuantity) as average, station, DATEPART(HOUR, ProductionDate) as hour\n  FROM [ProductionTransactionExample]\n  group by station, DATEPART(HOUR, ProductionDate)",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByStationByMaterialFilterDate",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "  SELECT Avg(ProducedQuantity) as average, MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as Shift\n  FROM [ProductionTransactionExample]\n  where ProductionDate >= '2025-01-02' and\n  ProductionDate < '2025-01-03'\n  group by MaterialNumber, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end \n  \n  ",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByStationByShift",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "  SELECT Avg(ProducedQuantity) as average, station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as shift\n  FROM [ProductionTransactionExample]\n  group by station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end\n  ",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    }, {
      "name" : "ProductionTransactionExample_AverageByStationByShiftFilterDate",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "type" : "query",
        "query" : "    SELECT Avg(ProducedQuantity) as average, station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end as shift\n  FROM [ProductionTransactionExample]\n  where ProductionDate >= {{this.StartDate}} and\n  ProductionDate < {{this.EndDate}}\n  group by station, case when DATEPART(HOUR, ProductionDate) < 8 then 'Shift1' when DATEPART(HOUR, ProductionDate) in (8, 9, 10, 11, 12, 13, 14, 15) then 'Shift2' else 'Shift3' end\n  \n  ",
        "index" : {
          "enabled" : false,
          "indexName" : "",
          "indexValue" : ""
        }
      },
      "cacheLifetime" : {
        "enabled" : false
      },
      "template" : {
        "type" : "Off"
      }
    } ],
    "outputs" : [ {
      "name" : "ProductionTransactionExample",
      "connection" : "MSSQL_KB",
      "type" : "jdbc.sqlserver",
      "qualifier" : {
        "writeType" : "insert",
        "createOption" : "off",
        "tableCacheInterval" : {
          "duration" : 1,
          "units" : "Hours"
        },
        "breakupArrays" : false,
        "table" : "ProductionTransactionExample"
      }
    } ],
    "modeling" : {
      "models" : [ {
        "name" : "ProductionTransactionExample",
        "tags" : [ "ProductionTransactionExample" ],
        "attributes" : [ {
          "attributeType" : "Internal",
          "name" : "ProductionID",
          "nullable" : false,
          "required" : false,
          "array" : false,
          "internalType" : "Any"
        }, {
          "attributeType" : "Internal",
          "name" : "OrderNumber",
          "nullable" : false,
          "required" : false,
          "array" : false,
          "internalType" : "Any"
        }, {
          "attributeType" : "Internal",
          "name" : "MaterialNumber",
          "nullable" : false,
          "required" : false,
          "array" : false,
          "internalType" : "Any"
        }, {
          "attributeType" : "Internal",
          "name" : "SerialNumber",
          "nullable" : false,
          "required" : false,
          "array" : false,
          "internalType" : "Any"
        }, {
          "attributeType" : "Internal",
          "name" : "Station",
          "nullable" : false,
          "required" : false,
          "array" : false,
          "internalType" : "Any"
        }, {
          "attributeType" : "Internal",
          "name" : "ProducedQuantity",
          "nullable" : false,
          "required" : false,
          "array" : false,
          "internalType" : "Any"
        }, {
          "attributeType" : "Internal",
          "name" : "ProductionDate",
          "nullable" : false,
          "required" : false,
          "array" : false,
          "internalType" : "Any"
        } ]
      } ],
      "instances" : [ ]
    },
    "conditions" : [ ],
    "functions" : [ ],
    "tags" : [ {
      "name" : "ProductionTransactionExample"
    } ],
    "pipelines" : [ {
      "name" : "ProductionTransactionExample_Insert",
      "uri" : "pipeline",
      "tags" : [ "ProductionTransactionExample" ],
      "settings" : {
        "inputStages" : [ "Model" ],
        "trackActivity" : false,
        "triggers" : [ ],
        "stages" : [ {
          "name" : "Model",
          "outputs" : [ "Write" ],
          "config" : {
            "type" : ".ModelConfig",
            "model" : "ProductionTransactionExample",
            "objectName" : "ProductionTrasnactionExampleObject",
            "initExpression" : "",
            "attributes" : [ {
              "name" : "ProductionID",
              "expression" : {
                "type" : "JavaScript",
                "expression" : ""
              },
              "defaultValue" : 1
            }, {
              "name" : "OrderNumber",
              "expression" : {
                "type" : "JavaScript",
                "expression" : ""
              },
              "defaultValue" : "1"
            }, {
              "name" : "MaterialNumber",
              "expression" : {
                "type" : "JavaScript",
                "expression" : ""
              },
              "defaultValue" : "A"
            }, {
              "name" : "SerialNumber",
              "expression" : {
                "type" : "JavaScript",
                "expression" : ""
              },
              "defaultValue" : "B"
            }, {
              "name" : "Station",
              "expression" : {
                "type" : "JavaScript",
                "expression" : ""
              },
              "defaultValue" : "Station1"
            }, {
              "name" : "ProducedQuantity",
              "expression" : {
                "type" : "JavaScript",
                "expression" : ""
              },
              "defaultValue" : 1
            }, {
              "name" : "ProductionDate",
              "expression" : {
                "type" : "JavaScript",
                "expression" : "let d = new Date()\r\nreturn d"
              }
            } ]
          },
          "display" : {
            "position" : {
              "x" : 268,
              "y" : 43
            }
          }
        }, {
          "name" : "Write",
          "outputs" : [ ],
          "config" : {
            "type" : ".WriteConfig",
            "failureOutputs" : [ ],
            "references" : [ "{{Connection.MSSQL_KB.ProductionTransactionExample}}" ],
            "ignoreResult" : false
          },
          "display" : {
            "position" : {
              "x" : 744,
              "y" : 56
            }
          }
        } ]
      }
    }, {
      "name" : "ProductionTransactionExample_UpdateMQTT",
      "uri" : "pipeline",
      "tags" : [ "ProductionTransactionExample" ],
      "settings" : {
        "inputStages" : [ "Array" ],
        "trackActivity" : true,
        "triggers" : [ {
          "name" : "FlowTrigger",
          "config" : {
            "type" : ".TriggerFlow",
            "inReferences" : [ "{{Connection.MSSQL_KB.ProductionTransactionExample_AverageByStationByShiftFilterDate(StartDate='2024-01-02',EndDate='2025-01-03')}}" ],
            "publishMode" : "All",
            "flowEvaluation" : {
              "type" : "Polled",
              "interval" : {
                "duration" : 1,
                "units" : "Seconds"
              },
              "delay" : {
                "duration" : 0,
                "units" : "Seconds"
              },
              "mode" : "Always"
            },
            "template" : {
              "type" : "Off"
            },
            "enabled" : true
          },
          "display" : {
            "position" : {
              "x" : -450,
              "y" : 0
            }
          }
        } ],
        "stages" : [ {
          "name" : "MQTT_KB",
          "outputs" : [ ],
          "config" : {
            "type" : ".DynamicWriteConfig",
            "failureOutputs" : [ ],
            "connectionReference" : "{{Connection.MQTT_KB}}",
            "ignoreResult" : false,
            "qualifier" : {
              "topic" : "ProductionTransactionExample/{{event.metadata.filtered.station}}/{{event.metadata.filtered.shift}}/{{event.metadata.breakupName}}",
              "qos" : 0,
              "namedRoot" : false,
              "retained" : true,
              "breakupArrays" : false,
              "filterList" : [ "_timestamp" ]
            },
            "connectionType" : "mqtt"
          },
          "display" : {
            "position" : {
              "x" : 1590,
              "y" : 0
            }
          }
        }, {
          "name" : "Filter",
          "outputs" : [ "Object" ],
          "config" : {
            "type" : ".AttributeFilterConfig",
            "filterOption" : "exclude",
            "attributeList" : [ "station", "shift" ],
            "retainAsMetadata" : true,
            "metadataKey" : "filtered"
          },
          "display" : {
            "position" : {
              "x" : 690,
              "y" : 0
            }
          }
        }, {
          "name" : "Array",
          "outputs" : [ "Filter" ],
          "config" : {
            "type" : ".BreakupConfig",
            "breakupType" : "array"
          },
          "display" : {
            "position" : {
              "x" : 240,
              "y" : 0
            }
          }
        }, {
          "name" : "Object",
          "outputs" : [ "MQTT_KB" ],
          "config" : {
            "type" : ".BreakupConfig",
            "breakupType" : "object"
          },
          "display" : {
            "position" : {
              "x" : 1140,
              "y" : 0
            }
          }
        } ]
      }
    } ],
    "namespace" : [ ]
  },
  "network" : {
    "groups" : [ ],
    "hubs" : [ ]
  }
}

Example Preparation

  1. Enable Intelligence Hub MQTT broker

    • In the left-hand navigation panel, navigate to Manage, and click Settings.
    • Under the MQTT Broker section enable the broker, if ports 1885 and 1886 are in use on the server hosting your Intelligence Hub, update to ports of your choosing, otherwise accept the defaults and click the Save button.
  2. Import the required content

    • In the left-hand navigation panel, navigate to Manage, and click Project
    • Within the Import screen, ensure Full Project is off (otherwise your existing project will be overwritten).
    • Copy the JSON provided in the code window above.
    • Change the Import Type to JSON and paste the provided JSON into the Project box and click the Import button.
  3. Update the imported Connections as required
    • Navigate to Configure and click Connections, click "MQTT_KB" and Update the MQTT settings as required based on the prior preparation step #1.

    • Navigate to Configure and click Connections, click "MSSQL_KB" and type the following in the password field. 

      Copy
      password
    • Click the Save button.

  4. Setup the UNS Client

    • In the left-hand navigation panel, navigate to Tools and right click UNS Client and select Open Link in New Tab.

    • Enter login information.

    • For Connection select "MQTT_KB".

    • For Subscribed Topics enter "ProductionTransactionExample/#".

    • Click Add.

    • Click the "x" to remove topic "#".

    • Click the Connect button.

    • Confirm the UNS Client says "Connected to MQTT_KB".

    • Return to the previous web browser tab.

ProductionTransactionExample_Insert pipeline:

This pipeline shows out the transaction data is inserted into the database.   A trigger would need to be added to this pipeline to start it.   For event type data, an Event stage should be used.  After the data is received, it should be Modeled into the format required for the destination table.  Here we are using a Model stage to transform the data.   Final, the data is inserted into the database.  
 
 

Out to the box, this pipeline will not allow inserts into the example database.  The user only has read permissions.  To implement this pipeline on other system, make sure the user has write permissions.

  

ProductionTransactionExample_UpdateMQTT:

Here is where the data is aggregated and sent to MQTT.   The FlowTrigger reads from an input that uses an SQL statement that aggregates the transaction data.
 
 
 
Here is the input being used.   The query is aggregating the ProducedQuantity column by station and shift.   There also is a where clause that enable the StartDate and EndDate to be specified to filter the query by datetime.
 
 
The query will produce multiple results.   The average is broken down by station and by shift.
 
 
Then pipeline then breaks up the array of data, filters it and the breaks up the object to get the average value.  Once done, the result is written to MQTT.
 
 
 

Other Related Material: