Update Indici data load
Background
By default, we only load fields from Indici data that we absolutely need.
In some cases, it turns out that we are not loading data that is actually needed for a particular job. When that happens, there are some key steps required to add the missing fields cleanly.
Identify data points that are required
All target DB tables accommodate all fields, but not all of them are populated by our load job. The required data can be identified by:
- Picking up a parquet file with recent transactions recorded in indici_staging.auditlog.
- Eyeballing the file contents. One way to do this is a Python script like the following:
import pandas as pd
pd.options.display.max_columns = None  # show all columns, not just a truncated subset
# Read the parquet file and print a large sample of rows for inspection
df = pd.read_parquet('your_file.parquet', engine='pyarrow')
print(df.head(450))
Note that some files have a lot of columns; without the max_columns setting, pandas truncates the visible output.
Also note that fastparquet can be used as the engine instead of pyarrow.
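If row-by-row inspection is unwieldy, a minimal alternative is to list the column names and inferred types and compare them against what the load job currently populates. A small sketch, using the same placeholder file name as above:

import pandas as pd
df = pd.read_parquet('your_file.parquet', engine='pyarrow')
# Print every column name with its inferred dtype so candidate new fields are easy to spot
print(df.dtypes.to_string())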
Prepare database
Next, truncate your target tables and remove the associated records in indici_staging.auditlog.
Truncate target tables[1]
truncate table indici_staging.target_table;
truncate table rpt.target_table;
Remove auditlog entries
delete from indici_staging.auditlog
where "table" = 'your_new_table';
Update SQL definition
Next, update the SQL definition stored in the DynamoDB table indiciLoadSQL, which is used by the Lambda insert function.
The partition key will equate to the target table name, and have three values associated with it:
- df = DataFrame definition for subsetting data from the incoming file
- sql = SQL insert into indici_staging
- sql_dd = SQL for deduplication and insert/merge into rpt (used by Step Functions)
You must therefore update both the df and sql values to add in any new fields.
Save your changes in DynamoDB. The Lambda will use the new definition on its next run.
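Editing the item directly in the DynamoDB console is the simplest approach. If you prefer to script it, the sketch below shows one way with boto3; it assumes the partition key attribute is named table and uses placeholder values, so check the real key name and item layout before running anything like this:

import boto3

TARGET_TABLE = "your_new_table"  # placeholder: target table name used as the partition key value

dynamodb = boto3.resource("dynamodb")
definitions = dynamodb.Table("indiciLoadSQL")

# Fetch the current definition (partition key attribute name "table" is an assumption)
item = definitions.get_item(Key={"table": TARGET_TABLE})["Item"]

# Amend the df and sql values to include the new fields before writing them back
new_df = item["df"]    # edit: DataFrame definition with the new columns added
new_sql = item["sql"]  # edit: INSERT statement with the new columns added

definitions.update_item(
    Key={"table": TARGET_TABLE},
    UpdateExpression="SET #df = :df, #sql = :sql",
    ExpressionAttributeNames={"#df": "df", "#sql": "sql"},
    ExpressionAttributeValues={":df": new_df, ":sql": new_sql},
)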
Backload existing data
To reprocess existing files, use the bulk reupload Lambda.
This Lambda:
- Loads objects from kpa-valentia matching a prefix (e.g. Invoices_)
- Re-uploads them to the same bucket using PutObject logic via copy_from
- This triggers the Step Function via EventBridge (which listens for S3 PutObject events on kpa-valentia)
Steps
- Set your prefix (e.g. Invoices_) at the top of the Lambda
- Run the Lambda; it will:
  - Match all files starting with that prefix and ending in .parquet
  - Re-upload each file using copy_from() to itself (a sketch of this is shown below)
  - This will trigger the Step Function, which will:
    - Call copyIndiciFiles
    - Load data into indici_staging
    - Call rptDeduplication to populate rpt
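For reference, a minimal sketch of what the re-upload logic might look like, not the actual Lambda code. It assumes the bucket kpa-valentia and prefix Invoices_ from above and the standard Lambda handler signature; note that S3 only accepts a copy of an object onto itself when the metadata directive is set to REPLACE:

import boto3

BUCKET = "kpa-valentia"
PREFIX = "Invoices_"  # set your prefix here

def lambda_handler(event, context):
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(BUCKET)
    for obj in bucket.objects.filter(Prefix=PREFIX):
        if not obj.key.endswith(".parquet"):
            continue
        # Copy the object onto itself; MetadataDirective="REPLACE" is required for a
        # self-copy, and the resulting object-created event fires EventBridge, which
        # starts the Step Function.
        s3.Object(BUCKET, obj.key).copy_from(
            CopySource={"Bucket": BUCKET, "Key": obj.key},
            MetadataDirective="REPLACE",
        )
    return {"reprocessed_prefix": PREFIX}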
Monitoring
You can monitor execution through:
- CloudWatch logs for the Lambda
- Step Function execution history (each file triggers an execution)
- Checking row counts in indici_staging.your_table and rpt.your_table
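Checking executions in the console is usually enough; if you would rather do it from a script, something along these lines lists recent Step Function runs with boto3. The state machine ARN is a placeholder and must be replaced with the real one:

import boto3

# Placeholder: substitute the ARN of the Step Function that loads the Indici data
STATE_MACHINE_ARN = "arn:aws:states:REGION:ACCOUNT_ID:stateMachine:YOUR_STATE_MACHINE"

sfn = boto3.client("stepfunctions")
# List recent executions and their status (each re-uploaded file triggers one execution)
response = sfn.list_executions(stateMachineArn=STATE_MACHINE_ARN, maxResults=25)
for execution in response["executions"]:
    print(execution["startDate"], execution["status"], execution["name"])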
References
- ↑ Also be aware that some datatypes may not be set correctly for your new data. In this scenario, the target table will have to be recreated.