Update Indici data load

Background

By default, we only load fields from Indici data that we absolutely need.

Occasionally it turns out that a particular job needs data that we are not currently loading. When that happens, there are some key steps required to add the missing fields cleanly.

Identify data points that are required

All target DB tables accommodate all fields, but not all of those fields are populated by our load job. Identifying the required data can be done by:

  • Picking up a parquet file with recent transactions recorded in indici_staging.auditlog.
  • Eyeballing the file contents. One way to do this is with a Python script along the lines of the following:
import pandas as pd

# Show all columns rather than letting pandas truncate the display
pd.options.display.max_columns = None

# Read the parquet file and print a sample of rows to inspect its contents
df = pd.read_parquet('your_file.parquet', engine='pyarrow')
print(df.head(450))

Note that some files have a lot of columns, and pandas will truncate the visible output without the max_columns setting.

Also note that fastparquet can be used as an engine instead of pyarrow.
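
If it helps to see which columns actually carry data, rather than scrolling through sample rows, a small variation on the script above can summarise each column. This is only a suggestion; the file name is a placeholder:

import pandas as pd

df = pd.read_parquet('your_file.parquet', engine='pyarrow')

# List every column with its dtype and how many rows actually contain a value,
# which makes it easier to spot fields worth adding to the load job.
summary = pd.DataFrame({
    'dtype': df.dtypes.astype(str),
    'non_null': df.count(),
})
print(summary.to_string())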

Prepare database

Next, truncate your target table and remove the associated records from indici_staging.auditlog.

Truncate target tables[1]

truncate table indici_staging.target_table;
truncate table rpt.target_table;

Remove auditlog entries

delete from indici_staging.auditlog 
where "table" = 'your_new_table';

Update SQL definition

Next, you must change the SQL definition stored in the DynamoDB table indiciLoadSQL and used by the Lambda insert function.

The partition key equates to the target table name and has three values associated with it:

  • df = DataFrame definition for subsetting data from the incoming file
  • sql = SQL insert into indici_staging
  • sql_dd = SQL for deduplication and insert/merge into rpt (used by Step Functions)

You must therefore update both the df and sql values to add in any new fields.

Save your changes in DynamoDB. The Lambda will use the new definition on its next run.
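
As an illustration only, the definitions can be pulled and pushed with boto3 along the lines below. This is a sketch, not the canonical process: the partition key attribute name ('table') and the key value used here are assumptions, so check the actual key schema of indiciLoadSQL before running anything like this.

import boto3

dynamodb = boto3.resource('dynamodb')
defs = dynamodb.Table('indiciLoadSQL')

# Fetch the current definition for the target table.
# NOTE: the partition key attribute name 'table' is an assumption.
item = defs.get_item(Key={'table': 'your_new_table'})['Item']

# Edit these strings locally to include the new fields.
updated_df = item['df']
updated_sql = item['sql']

# Write the updated definitions back. Expression attribute names are used
# to avoid any clash with DynamoDB reserved words.
defs.update_item(
    Key={'table': 'your_new_table'},
    UpdateExpression='SET #df = :df, #sql = :sql',
    ExpressionAttributeNames={'#df': 'df', '#sql': 'sql'},
    ExpressionAttributeValues={':df': updated_df, ':sql': updated_sql},
)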

Backload existing data

To reprocess existing files, use the bulk reupload Lambda.

This Lambda:

  • Loads objects from kpa-valentia matching a prefix (e.g. Invoices_)
  • Re-uploads them to the same bucket using PutObject logic via copy_from
  • This triggers the Step Function via EventBridge (which listens for S3 PutObject events on kpa-valentia)
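
The core of that Lambda looks roughly like the sketch below. This is a simplified illustration rather than the deployed code; replacing the metadata is shown here as one way to make a copy of an object onto itself legal (S3 otherwise rejects a self-copy that changes nothing), and the actual Lambda may handle this differently.

import boto3

BUCKET = 'kpa-valentia'
PREFIX = 'Invoices_'  # set to the prefix of the files you want to reprocess

s3 = boto3.resource('s3')

def lambda_handler(event, context):
    copied = 0
    for obj in s3.Bucket(BUCKET).objects.filter(Prefix=PREFIX):
        if not obj.key.endswith('.parquet'):
            continue
        # Copy the object onto itself. Replacing the metadata makes the
        # self-copy legal and causes S3 to emit a new object-created event,
        # which EventBridge uses to start the Step Function.
        s3.Object(BUCKET, obj.key).copy_from(
            CopySource={'Bucket': BUCKET, 'Key': obj.key},
            MetadataDirective='REPLACE',
        )
        copied += 1
    return {'copied': copied}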

Steps

  1. Set your prefix (e.g. Invoices_) at the top of the Lambda
  2. Run the Lambda — it will:
    1. Match all files starting with that prefix and ending in .parquet
    2. Re-upload each file using copy_from() to itself
    3. This will trigger the Step Function, which will:
      1. Call copyIndiciFiles
      2. Load data into indici_staging
      3. Call rptDeduplication to populate rpt

Monitoring

You can monitor execution through:

  • CloudWatch logs for the Lambda
  • Step Function execution history (each file triggers an execution)
  • Checking row counts in indici_staging.your_table and rpt.your_table

References

  1. Also be aware that some datatypes may not be set correctly for your new data. In this scenario, the target table will have to be recreated.