Reading in Appended Data from a .CSV
How to read in new data from a CSV file that is appended to over time.
This guide demonstrates how to use HighByte Intelligence Hub to read a CSV file and write new rows to a SQL database and an MQTT broker. The example pipeline, Appended_Rows_Pipe, illustrates this workflow. Below are the details of each pipeline stage and configuration.
Pipeline Overview
The pipeline reads a CSV file at a scheduled interval, processes only the new rows, and writes the data to both a SQL database and MQTT broker.
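For reference, the CSV file in this example might look like the lines below. The column names and values are purely illustrative; the pipeline logic does not depend on any particular schema, only on new rows being appended to the end of the file.
timestamp,machine,temperature
2024-01-01T08:00:00Z,Press01,72.4
2024-01-01T08:01:00Z,Press01,72.9
2024-01-01T08:02:00Z,Press01,73.1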
Attached Files
How to Import and Test
- Import the attached pipeline into your HighByte Intelligence Hub instance.
- Ensure the sample CSV file is accessible at the configured file path.
- Start the pipeline to observe data flow into the SQL database and MQTT broker.
Pipeline Stages
Stage 1: Polled Trigger
- Purpose: Polls the CSV file at a fixed interval.
- Configuration:
- Poll Interval: Set to 1 minute by default. Adjust this value based on your use case.
Stage 2: Read
- Purpose: Reads the CSV file into an array format.
- Configuration:
- Performs an Inline Read with Key so the file contents arrive as an array of rows that the Transform stage can filter (a sketch of the resulting value follows below).
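After the Read stage, the event value passed into the Transform stage can be pictured as an object whose csv_data property holds one element per CSV row. The shape and column names below are assumptions based on the illustrative file above, not a guaranteed output format:
// Hypothetical shape of event.value after the Read stage
// (column names follow the illustrative CSV above)
const exampleEventValue = {
  csv_data: [
    { timestamp: "2024-01-01T08:00:00Z", machine: "Press01", temperature: 72.4 },
    { timestamp: "2024-01-01T08:01:00Z", machine: "Press01", temperature: 72.9 },
    { timestamp: "2024-01-01T08:02:00Z", machine: "Press01", temperature: 73.1 }
  ]
};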
Stage 3: Transform
- Purpose: Filters new rows from the CSV file.
- Steps:
- The current length of the CSV data is calculated.
- The last known length of the CSV file is retrieved from the pipeline state. If this is the first run, the length defaults to 0.
- The array of CSV data is sliced to include only the new rows based on the difference between the current and previous lengths.
- The pipeline state is updated with the current length for future comparisons.
- The new rows are passed to the next stage for further processing.
Below is the code used in this stage, along with comments explaining each step:
// Get the current length of the CSV data
const current_csv_length = event.value.csv_data.length;
// Retrieve the previously recorded length from the pipeline state
// If no previous length is stored, default to 0
const prev_length = state.pipeline.get("previous_length") || 0;
// Slice the CSV array to include only new rows
// This trims the data by skipping rows that have already been processed
const new_data = event.value.csv_data.slice(prev_length);
// Update the pipeline state with the new CSV length
// This ensures the next iteration knows what has already been processed
state.pipeline.set("previous_length", current_csv_length);
// Pass the filtered new rows to the next stage for further processing
stage.setValue(new_data);
By leveraging state variables, this stage ensures that only the new rows in the CSV file are processed. The transformation is efficient, avoiding reprocessing of previously handled data, and maintains a clean workflow for subsequent stages.
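As a quick worked example of the slicing logic, suppose the previous run recorded a length of 3 and the file now contains 5 rows. The sketch below uses plain JavaScript with made-up row values to show what slice(prev_length) returns in that case:
// Five rows are present on this poll; three were processed previously
const csv_data = ["row0", "row1", "row2", "row3", "row4"];
const prev_length = 3; // value stored in pipeline state by the last run
// slice(3) skips the first three rows and keeps only the appended ones
const new_data = csv_data.slice(prev_length);
// new_data is ["row3", "row4"]; the state is then updated to 5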
Stage 4: Breakup
- Purpose: Converts the array of new rows into individual events for processing.
- Reason: Enables subsequent stages to process each row individually.
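Conceptually, the Breakup stage behaves like the loop sketched below: one incoming array event becomes one outgoing event per element. This is only an illustration of the effect; the actual stage is configured in the Intelligence Hub UI rather than written as script.
// Illustration only: each array element is emitted as its own event
const new_rows = ["row3", "row4"]; // output of the Transform stage
new_rows.forEach((row) => {
  // Downstream stages (SQL and MQTT writes) each receive a single row
  console.log("emit event with value:", row);
});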
Stages 5 & 6: Write to SQL and MQTT
- SQL Write:
- Writes each new row to a SQLite database. For this demonstration, SQLite is configured as an in-memory database for ease of setup.
- To persist data locally, update the database settings to use a file-based SQLite instance:
- Navigate to Database Settings in the project.
- Configure the database path to point to a local file (e.g., C:\mydatabase.db).
- MQTT Write:
- Writes each new row to an MQTT broker.
- Dynamic Topic Paths: Topics are dynamically generated using a reference, ensuring a unique topic for each row (an illustrative example follows below).
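For example, if the topic path is built from a field in each row, two appended rows might be published to topics like the ones below. These paths are purely illustrative; the actual hierarchy depends on the reference configured in the MQTT output.
plantA/csv_rows/2024-01-01T08:01:00Z
plantA/csv_rows/2024-01-01T08:02:00Z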
Customizations
- Polling Interval: Adjust the polling interval to fit your requirements.
- Database Type: Replace SQLite with another SQL database (e.g., MySQL, PostgreSQL) by updating the database connection settings.
- MQTT Broker Configuration: Ensure the MQTT broker is properly configured in your project to receive the data.