
Reading in Appended Data from a .CSV

How to read in new data from a CSV file that is appended to over time.

This guide demonstrates how to use HighByte Intelligence Hub to read a CSV file and write new rows to a SQL database and an MQTT broker. The example pipeline, Appended_Rows_Pipe, illustrates this workflow. Below are the details of each pipeline stage and configuration.

Pipeline Overview

The pipeline reads a CSV file at a scheduled interval, processes only the new rows, and writes the data to both a SQL database and an MQTT broker.

Attached Files

How to Import and Test

  1. Import the attached pipeline into your HighByte Intelligence Hub instance.
  2. Ensure the sample CSV file is accessible at the configured file path.
  3. Start the pipeline to observe data flow into the SQL database and MQTT broker.

Pipeline Stages


Stage 1: Polled Trigger

  • Purpose: Polls the CSV file at a fixed interval.
  • Configuration:
    • Poll Interval: Set to 1 minute by default. Adjust this value based on your use case.

Stage 2: Read

  • Purpose: Reads the CSV file into an array format.
  • Configuration:
    • Performs an Inline Read with Key so the CSV rows are returned as an array that downstream stages can index and slice.
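
The exact structure of the read result depends on your CSV headers and connection settings. As a point of reference, the Transform stage in this example assumes the rows arrive as an array under event.value.csv_data, roughly as in the hypothetical sketch below (the column names are illustrative only):

// Hypothetical shape of the event after the Read stage (for reference only).
// Column names such as timestamp, machine, and temperature are examples;
// your CSV headers determine the actual keys.
const event = {
  value: {
    csv_data: [
      { timestamp: "2025-01-01T00:00:00Z", machine: "Press01", temperature: 71.3 },
      { timestamp: "2025-01-01T00:01:00Z", machine: "Press01", temperature: 71.9 }
      // ...one element per CSV row
    ]
  }
};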

Stage 3: Transform

  • Purpose: Filters new rows from the CSV file.
  • Steps:
    1. The current length of the CSV data is calculated.
    2. The last known length of the CSV file is retrieved from the pipeline state. If this is the first run, the length defaults to 0.
    3. The array of CSV data is sliced to include only the new rows based on the difference between the current and previous lengths.
    4. The pipeline state is updated with the current length for future comparisons.
    5. The new rows are passed to the next stage for further processing.

Below is the code used in this stage, along with comments explaining each step:


// Get the current length of the CSV data
const current_csv_length = event.value.csv_data.length;

// Retrieve the previously recorded length from the pipeline state
// If no previous length is stored, default to 0
const prev_length = state.pipeline.get("previous_length") || 0;

// Slice the CSV array to include only new rows
// This trims the data by skipping rows that have already been processed
const new_data = event.value.csv_data.slice(prev_length);

// Update the pipeline state with the new CSV length
// This ensures the next iteration knows what has already been processed
state.pipeline.set("previous_length", current_csv_length);

// Pass the filtered new rows to the next stage for further processing
stage.setValue(new_data);

By storing the last processed row count in the pipeline state, this stage ensures that only rows added since the previous run are processed, so previously handled data is never reprocessed by subsequent stages.
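
To see the state logic in isolation, the sketch below simulates two consecutive pipeline runs, using a plain Map as a stand-in for the hub's pipeline state (the Map is for illustration only, not the Intelligence Hub API):

// Stand-in for state.pipeline, for illustration outside the hub
const pipelineState = new Map();

function filterNewRows(csvData) {
  // Default to 0 on the first run, then slice off rows already processed
  const prevLength = pipelineState.get("previous_length") || 0;
  const newRows = csvData.slice(prevLength);
  pipelineState.set("previous_length", csvData.length);
  return newRows;
}

// First run: the file has 3 rows, so all 3 are treated as new
console.log(filterNewRows(["row1", "row2", "row3"]).length); // 3

// Second run: 2 rows were appended, so only those 2 are returned
console.log(filterNewRows(["row1", "row2", "row3", "row4", "row5"]).length); // 2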

Stage 4: Breakup

  • Purpose: Converts the array of new rows into individual events for processing.
  • Reason: Enables subsequent stages to process each row individually.
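
Conceptually, the Breakup stage behaves like the loop below, where each element of the incoming array becomes its own event. Here emitEvent is a placeholder for the hub's internal per-event dispatch, not an actual API:

// Placeholder for the hub's per-event dispatch (illustration only)
const emitEvent = (row) => console.log("event:", row);

// Each new row from the Transform stage becomes a separate event
const newRows = [{ id: 1 }, { id: 2 }, { id: 3 }];
newRows.forEach((row) => emitEvent(row));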

Stages 5 & 6: Write to SQL and MQTT

  • SQL Write:

    • Writes each new row to a SQLite database. For this demonstration, SQLite is configured as an in-memory database for ease of setup.
    • To persist data locally, update the database settings to use a file-based SQLite instance:
      • Navigate to Database Settings in the project.
      • Configure the database path to point to a local file (e.g., C:\mydatabase.db).
  • MQTT Write:

    • Writes each new row to an MQTT broker.
    • Dynamic Topic Paths: Topics are generated dynamically from a reference to the row data, so each row is published to a unique topic (see the sketch below).
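
As an illustration of how a unique topic could be composed per row, the sketch below builds a topic string from a row field. The field name machine_id and the topic prefix are hypothetical examples, not part of the sample pipeline; substitute a column and path from your own configuration:

// Hypothetical topic construction for a single row event
// (machine_id and the prefix are examples only)
const row = { machine_id: "Press01", temperature: 71.3 };
const topic = "plant/csv_rows/" + row.machine_id;
console.log(topic); // plant/csv_rows/Press01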

Customizations

  • Polling Interval: Adjust the polling interval to fit your requirements.
  • Database Type: Replace SQLite with another SQL database (e.g., MySQL, PostgreSQL) by updating the database connection settings.
  • MQTT Broker Configuration: Ensure the MQTT broker is properly configured in your project to receive the data.