Indici data: Difference between revisions
No edit summary |
|||
| Line 57: | Line 57: | ||
* Missing SQL definitions → Triggers static data loading | * Missing SQL definitions → Triggers static data loading | ||
* Other errors → Fails workflow and logs details | * Other errors → Fails workflow and logs details | ||
==Audit logging== | ==Audit logging== | ||
Latest revision as of 21:40, 1 September 2025
Background[edit | edit source]
Indici data is received automatically every day from Valentia. Ultimately, it is envisaged that this will form an important part of a broader K'aute data platform. But, for now, the focus is on picking up the data and bringing it to a manageable state so it can be used for analysis and contract reporting.
Data transfer[edit | edit source]
Summary of the indici data transfer process is shown in the graph below:
Valentia sends daily delta files (Parquet format) to arn:aws:s3:::kpa-valentia. The IAM user valentia uses policy valentia-import for:
s3:ListBuckets3:PutObjects3:GetObject
Trigger mechanism: An Amazon EventBridge rule detects file uploads and starts the Step Functions workflow. Lifecycle policies:
- Move files to Intelligent tiering after a few days
- Move to Archive format after longer periods
Partitioning incoming data[edit | edit source]
When EventBridge detects a new file:
- Triggers Step Functions workflow
- Workflow calls
copyIndiciFileslambda - File moved to date-partitioned location in
kpa-indici-partitioned
Files are partitioned using their embedded timestamp. All files are processed regardless of content.[1]
Data processing workflow[edit | edit source]
The Step Functions workflow coordinates:
Loading data[edit | edit source]
indiciLoadAppointmentMedications lambda:
- Reads Parquet → pandas DataFrame
- Processes datetimes (NaT → 'None')
- Identifies dataset from filename
- Pulls transformation rules from DynamoDB
- Adds metadata (filename + timestamp)
- Validates row count > 0
- Executes SQL to load into
indici_staging - Logs to
auditlog
DB role: lambda_writer
Requires: Psycopg2 layer
Deduplication[edit | edit source]
rptDeduplication lambda:
- Performs UPSERTs to
rpttables - Handles conflicts using primary keys
- Logs update/insert counts to
auditlog
Error handling[edit | edit source]
Workflow handles:
- Empty files → Logs to CloudWatch and skips
- Missing SQL definitions → Triggers static data loading
- Other errors → Fails workflow and logs details
Audit logging[edit | edit source]
All outcomes logged through stateMachine_LogHandler to CloudWatch:
SuccessLogs: Processed filesEmptyFilesLogs: Skipped empty filesErrorLogs: Processing failures
Infrastructure[edit | edit source]
Obsolete:
pg_cronjobsScheduled database functions
Current:
- EventBridge rule
- Step Functions state machine
- CloudWatch log groups
- Processing Lambdas
Version control[edit | edit source]
All related code for this work should be checked into gitlab repo malamalama.
TODO[edit | edit source]
- Update/Insert counts should be logged in
indici_staging.auditlog. - Not sure if this has been implemented?
References[edit | edit source]
- ↑ Empty files are handled in later steps