Indici data

Background

Indici data is received automatically every day from Valentia. Ultimately, it is envisaged that this data will form an important part of a broader K'aute data platform. For now, however, the focus is on picking up the data and bringing it into a manageable state so that it can be used for analysis and contract reporting.

Data transfer

A summary of the indici data transfer process is shown in the diagram below:

[Diagram: indici data transfer process]

Valentia sends daily delta files (in Parquet format) to arn:aws:s3:::kpa-valentia. The IAM user valentia uses the valentia-import policy, which grants the following actions (a sketch of such a policy follows the list):

  • s3:ListBucket
  • s3:PutObject
  • s3:GetObject
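
A minimal sketch of what valentia-import might contain; the exact statements are an assumption. Note that s3:ListBucket attaches to the bucket ARN, while the object actions attach to the objects inside it:

  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Effect": "Allow",
        "Action": ["s3:ListBucket"],
        "Resource": "arn:aws:s3:::kpa-valentia"
      },
      {
        "Effect": "Allow",
        "Action": ["s3:PutObject", "s3:GetObject"],
        "Resource": "arn:aws:s3:::kpa-valentia/*"
      }
    ]
  }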

Trigger mechanism: an Amazon EventBridge rule detects file uploads and starts the Step Functions workflow (an example event pattern is sketched after the list below). S3 lifecycle policies then manage storage costs:

  1. Move files to the Intelligent-Tiering storage class after a few days
  2. Move files to an archive storage class after a longer period
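
As an illustration, an EventBridge rule matching uploads to this bucket might use an event pattern like the following (this assumes EventBridge notifications are enabled on the bucket; the actual rule is not reproduced on this page):

  {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
      "bucket": { "name": ["kpa-valentia"] }
    }
  }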

Partitioning incoming data

When EventBridge detects a new file:

  1. EventBridge triggers the Step Functions workflow
  2. The workflow calls the copyIndiciFiles lambda
  3. The lambda moves the file to a date-partitioned location in kpa-indici-partitioned

Files are partitioned using their embedded timestamp. All files are processed regardless of content.[1]
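
A minimal sketch of the copy step, assuming the embedded timestamp appears as an eight-digit date in the file name, that the target layout is year/month/day partitions, and that the workflow passes the EventBridge event through unchanged (all three are assumptions; the real naming and layout may differ):

  import re
  import boto3

  s3 = boto3.client("s3")

  def handler(event, context):
      # Source bucket and key taken from the EventBridge event payload
      bucket = event["detail"]["bucket"]["name"]
      key = event["detail"]["object"]["key"]
      # Assumed naming, e.g. "Appointments_20250901.parquet"
      year, month, day = re.search(r"(\d{4})(\d{2})(\d{2})", key).groups()
      # Assumed partition layout in the target bucket
      target_key = f"year={year}/month={month}/day={day}/{key.rsplit('/', 1)[-1]}"
      s3.copy_object(
          Bucket="kpa-indici-partitioned",
          CopySource={"Bucket": bucket, "Key": key},
          Key=target_key,
      )
      return {"bucket": "kpa-indici-partitioned", "key": target_key}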

Data processing workflow

The Step Functions workflow coordinates the following steps:

Loading data

indiciLoadAppointmentMedications lambda:

  1. Reads Parquet → pandas DataFrame
  2. Processes datetimes (NaT → 'None')
  3. Identifies dataset from filename
  4. Pulls transformation rules from DynamoDB
  5. Adds metadata (filename + timestamp)
  6. Validates row count > 0
  7. Executes SQL to load into indici_staging
  8. Logs to auditlog

DB role: lambda_writer
Requires: Psycopg2 layer
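
A condensed sketch of these steps. The DynamoDB table name, the rule fields, and the auditlog columns are assumptions; the connection string is read from the environment and connects as lambda_writer:

  import io
  import os
  from datetime import datetime, timezone

  import boto3
  import pandas as pd
  import psycopg2  # provided by the Psycopg2 layer

  def handler(event, context):
      bucket, key = event["bucket"], event["key"]

      # 1. Read Parquet into a pandas DataFrame (requires pyarrow)
      body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
      df = pd.read_parquet(io.BytesIO(body))

      # 2. Replace NaT datetimes with the string 'None'
      for col in df.select_dtypes(include=["datetime"]).columns:
          df[col] = df[col].astype(object).where(df[col].notna(), "None")

      # 3. Identify the dataset from the file name (assumed naming convention)
      dataset = key.rsplit("/", 1)[-1].split("_")[0]

      # 4. Pull transformation rules from DynamoDB (table name is an assumption)
      table = boto3.resource("dynamodb").Table("indici-transformation-rules")
      rules = table.get_item(Key={"dataset": dataset})["Item"]

      # 5. Add metadata columns
      df["source_file"] = key
      df["loaded_at"] = datetime.now(timezone.utc).isoformat()

      # 6. Validate row count; empty files are surfaced to the workflow
      if len(df) == 0:
          raise ValueError(f"Empty file: {key}")

      # 7-8. Load into indici_staging and record the outcome in auditlog
      with psycopg2.connect(os.environ["DB_DSN"]) as conn:
          with conn.cursor() as cur:
              for row in df.itertuples(index=False):
                  cur.execute(rules["insert_sql"], tuple(row))
              cur.execute(
                  "INSERT INTO indici_staging.auditlog (file_name, row_count, loaded_at) "
                  "VALUES (%s, %s, now())",
                  (key, len(df)),
              )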

Deduplication

rptDeduplication lambda:

  1. Performs UPSERTs to rpt tables
  2. Handles conflicts using primary keys
  3. Logs update/insert counts to auditlog
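
A sketch of the UPSERT pattern for a single rpt table. The table, key, and columns are assumptions; in PostgreSQL the (xmax = 0) test distinguishes freshly inserted rows from updated ones, which yields the counts for the auditlog:

  import os
  import psycopg2

  # Hypothetical table; conflicts resolve on the primary key
  UPSERT_SQL = """
      INSERT INTO rpt.appointments
      SELECT * FROM indici_staging.appointments
      ON CONFLICT (appointment_id) DO UPDATE
          SET status = EXCLUDED.status
      RETURNING (xmax = 0) AS inserted;
  """

  def handler(event, context):
      with psycopg2.connect(os.environ["DB_DSN"]) as conn:
          with conn.cursor() as cur:
              cur.execute(UPSERT_SQL)
              flags = [row[0] for row in cur.fetchall()]
              inserts = sum(flags)
              updates = len(flags) - inserts
              # Log the counts (auditlog columns are assumptions)
              cur.execute(
                  "INSERT INTO indici_staging.auditlog (table_name, inserts, updates) "
                  "VALUES (%s, %s, %s)",
                  ("rpt.appointments", inserts, updates),
              )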

Error handling

The workflow handles:

  • Empty files → Logs to CloudWatch and skips
  • Missing SQL definitions → Triggers static data loading
  • Other errors → Fails workflow and logs details
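
Branching of this kind is typically expressed with Catch rules on the task state. A fragment of what the state machine definition might look like (the state names, error names, and ARN placeholder are all assumptions, not the actual definition):

  "LoadFile": {
    "Type": "Task",
    "Resource": "${LoadLambdaArn}",
    "Catch": [
      { "ErrorEquals": ["EmptyFileError"], "Next": "LogEmptyFile" },
      { "ErrorEquals": ["MissingSqlDefinitionError"], "Next": "LoadStaticData" },
      { "ErrorEquals": ["States.ALL"], "Next": "LogError" }
    ],
    "Next": "Deduplicate"
  }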

Key changes

  Previous                        Current
  S3 → Lambda trigger             S3 → EventBridge → Step Functions
  Nightly cron deduplication      Immediate per-file deduplication
  Manual error handling           Built-in error branching
  Centralized 4am processing      Real-time file processing

Why EventBridge? It provides more reliable triggering than a direct S3 → Lambda trigger and enables complex workflows.

Audit logging

All outcomes are logged through stateMachine_LogHandler to CloudWatch:

  • SuccessLogs: Processed files
  • EmptyFilesLogs: Skipped empty files
  • ErrorLogs: Processing failures
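
A minimal sketch of such a handler, assuming the state machine passes an outcome flag and a payload, and that each file gets its own log stream (the event shape and stream naming are assumptions; the log-group names are those listed above):

  import json
  import time

  import boto3

  logs = boto3.client("logs")

  GROUP_FOR_OUTCOME = {
      "success": "SuccessLogs",
      "empty": "EmptyFilesLogs",
      "error": "ErrorLogs",
  }

  def handler(event, context):
      group = GROUP_FOR_OUTCOME[event["outcome"]]
      stream = event["file"]  # assumed: one stream per processed file
      try:
          logs.create_log_stream(logGroupName=group, logStreamName=stream)
      except logs.exceptions.ResourceAlreadyExistsException:
          pass  # the stream already exists from an earlier run
      logs.put_log_events(
          logGroupName=group,
          logStreamName=stream,
          logEvents=[{
              "timestamp": int(time.time() * 1000),
              "message": json.dumps(event),
          }],
      )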

Infrastructure

Obsolete:

  • pg_cron jobs
  • Scheduled database functions

Current:

  • EventBridge rule
  • Step Functions state machine
  • CloudWatch log groups
  • Processing Lambdas

Version control

All related code for this work should be checked into the GitLab repo malamalama.

TODO

  • Update/insert counts should be logged in indici_staging.auditlog. (It is unclear whether this has been implemented.)

References

  1. Empty files are handled in later steps.