Indici data

From Kautepedia

Background

Indici data is received automatically from Valentia every day. It is ultimately expected to form an important part of a broader K'aute data platform. For now, the focus is on collecting the data and bringing it into a manageable state so that it can be used for analysis and contract reporting.

Data transfer

[Figure: summary of the indici data transfer process]

Valentia sends daily delta files in Parquet format to the S3 bucket arn:aws:s3:::kpa-valentia. The IAM user valentia is granted the following permissions through the policy valentia-import:

  • s3:ListBucket
  • s3:PutObject
  • s3:GetObject
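
As an illustration only, a policy granting these three actions could look like the sketch below. The real valentia-import policy is not reproduced on this page, so the statement IDs and the split into two statements are assumptions. The split reflects how IAM scopes S3 actions: s3:ListBucket applies to the bucket ARN, while s3:PutObject and s3:GetObject apply to the objects inside it.

```python
import json

BUCKET_ARN = "arn:aws:s3:::kpa-valentia"  # bucket ARN quoted in the text above


def build_import_policy(bucket_arn: str = BUCKET_ARN) -> dict:
    """Return an IAM policy document granting the three listed S3 actions."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Bucket-level action: the resource is the bucket itself.
                "Sid": "ListImportBucket",  # hypothetical statement ID
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [bucket_arn],
            },
            {
                # Object-level actions: the resource is every object in the bucket.
                "Sid": "ReadWriteImportObjects",  # hypothetical statement ID
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject"],
                "Resource": [f"{bucket_arn}/*"],
            },
        ],
    }


if __name__ == "__main__":
    print(json.dumps(build_import_policy(), indent=2))
```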

Trigger mechanism: An Amazon EventBridge rule detects file uploads and starts the Step Functions workflow.

Lifecycle policies:

  1. Move files to the S3 Intelligent-Tiering storage class after a few days
  2. Move files to an archive storage class after a longer period
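
The trigger and the lifecycle policy can be sketched as plain configuration dictionaries. This is a minimal sketch and not the deployed configuration: the rule ID, the choice of GLACIER as the archive storage class, and the day counts passed in are all assumptions, since the page only says "a few days" and "longer periods". The event pattern uses the "Object Created" event that S3 publishes to EventBridge, which requires EventBridge notifications to be enabled on the bucket.

```python
def build_event_pattern(bucket_name: str) -> dict:
    """EventBridge pattern matching S3 'Object Created' events for one bucket."""
    return {
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {"bucket": {"name": [bucket_name]}},
    }


def build_lifecycle_rules(tiering_after_days: int, archive_after_days: int) -> dict:
    """Lifecycle configuration in the shape accepted by the S3 API.

    Both day counts are caller-supplied placeholders; the real values are
    not documented on this page.
    """
    if not 0 <= tiering_after_days < archive_after_days:
        raise ValueError("tiering must happen before archiving")
    return {
        "Rules": [
            {
                "ID": "tier-then-archive",  # hypothetical rule name
                "Status": "Enabled",
                "Filter": {"Prefix": ""},  # empty prefix: applies to every object
                "Transitions": [
                    {"Days": tiering_after_days, "StorageClass": "INTELLIGENT_TIERING"},
                    # GLACIER is an assumed choice of archive storage class.
                    {"Days": archive_after_days, "StorageClass": "GLACIER"},
                ],
            }
        ]
    }
```

For example, build_lifecycle_rules(3, 90) would describe a move to Intelligent-Tiering after 3 days and to the archive class after 90 days; those numbers are purely illustrative.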

Partitioning incoming data

When EventBridge detects a new file:

  1. The rule triggers the Step Functions workflow
  2. The workflow calls the copyIndiciFiles lambda
  3. The file is moved to a date-partitioned location in kpa-indici-partitioned

Files are partitioned using their embedded timestamp. All files are processed regardless of content.[1]
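
A minimal sketch of how a date-partitioned key might be derived is given below. The actual logic in copyIndiciFiles is not shown on this page, so both the filename convention (an eight-digit YYYYMMDD date somewhere in the name) and the year=/month=/day= layout are assumptions made for illustration.

```python
import re
from datetime import datetime

# Hypothetical convention: the filename carries a YYYYMMDD date,
# for example "Appointments_20240131.parquet".
_DATE_IN_NAME = re.compile(r"(?<!\d)(\d{8})(?!\d)")


def partitioned_key(source_key: str) -> str:
    """Build a date-partitioned object key from the date embedded in the filename."""
    filename = source_key.rsplit("/", 1)[-1]
    match = _DATE_IN_NAME.search(filename)
    if match is None:
        raise ValueError(f"no YYYYMMDD date found in {filename!r}")
    # strptime also rejects impossible dates such as month 13.
    stamp = datetime.strptime(match.group(1), "%Y%m%d")
    # Assumed Hive-style layout; the real prefix scheme may differ.
    return f"year={stamp:%Y}/month={stamp:%m}/day={stamp:%d}/{filename}"
```

With these assumptions, partitioned_key("incoming/Appointments_20240131.parquet") gives "year=2024/month=01/day=31/Appointments_20240131.parquet".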

Data processing workflow

The Step Functions workflow coordinates loading and deduplication, together with error handling and audit logging.

Loading data

indiciLoadAppointmentMedications lambda:

  1. Reads Parquet → pandas DataFrame
  2. Processes datetimes (NaT → 'None')
  3. Identifies dataset from filename
  4. Pulls transformation rules from DynamoDB
  5. Adds metadata (filename + timestamp)
  6. Validates row count > 0
  7. Executes SQL to load into indici_staging
  8. Logs to auditlog

DB role: lambda_writer
Requires: psycopg2 Lambda layer
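
Steps 2, 3, 5 and 6 above can be sketched as follows. To keep the example runnable without pandas, plain dictionaries stand in for DataFrame rows and Python's None stands in for NaT. The filename convention, the metadata column names (source_filename, loaded_at) and the function names are hypothetical. The DynamoDB lookup and the SQL load are left out because their details are not documented here.

```python
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Sequence


def dataset_from_filename(filename: str) -> str:
    """Hypothetical convention: the dataset name precedes the first underscore."""
    return filename.rsplit("/", 1)[-1].split("_", 1)[0].lower()


def prepare_rows(
    rows: List[Dict[str, Any]],
    filename: str,
    datetime_columns: Sequence[str],
    loaded_at: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
    """Normalise missing datetimes, attach load metadata, reject empty input."""
    if not rows:
        # Mirrors the "row count > 0" validation step.
        raise ValueError(f"{filename} contains no rows")
    loaded_at = loaded_at or datetime.now(timezone.utc)
    prepared = []
    for row in rows:
        out = dict(row)  # copy so the caller's rows are not mutated
        for column in datetime_columns:
            if out.get(column) is None:
                out[column] = "None"  # the string 'None', as in step 2
        out["source_filename"] = filename  # hypothetical metadata column
        out["loaded_at"] = loaded_at.isoformat()  # hypothetical metadata column
        prepared.append(out)
    return prepared
```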

Deduplication

rptDeduplication lambda:

  1. Performs UPSERTs to rpt tables
  2. Handles conflicts using primary keys
  3. Logs update/insert counts to auditlog
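
An UPSERT keyed on the primary key can be expressed in PostgreSQL with INSERT ... ON CONFLICT ... DO UPDATE. The sketch below builds such a statement with psycopg2-style named placeholders. It is an illustration under assumptions, not the statement used by rptDeduplication: the helper name and the example table and column names are hypothetical.

```python
import re
from typing import Sequence

_IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")


def _check(name: str) -> str:
    """Allow only simple lowercase identifiers, since they are inlined into SQL."""
    if not _IDENTIFIER.match(name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return name


def build_upsert(table: str, columns: Sequence[str], primary_key: Sequence[str]) -> str:
    """Build an INSERT ... ON CONFLICT statement keyed on the primary key."""
    qualified = ".".join(_check(part) for part in table.split("."))
    cols = [_check(c) for c in columns]
    keys = [_check(k) for k in primary_key]
    if any(k not in cols for k in keys):
        raise ValueError("primary key columns must be among the inserted columns")
    # Non-key columns are overwritten with the incoming (EXCLUDED) values.
    updates = [c for c in cols if c not in keys]
    if updates:
        action = "DO UPDATE SET " + ", ".join(f"{c} = EXCLUDED.{c}" for c in updates)
    else:
        action = "DO NOTHING"
    placeholders = ", ".join(f"%({c})s" for c in cols)
    return (
        f"INSERT INTO {qualified} ({', '.join(cols)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(keys)}) {action}"
    )
```

The resulting string can be passed to a psycopg2 cursor together with a dictionary of values per row. Values are never interpolated into the SQL text; only validated identifiers are.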

Error handling

The workflow handles the following cases:

  • Empty files → Logs to CloudWatch and skips
  • Missing SQL definitions → Triggers static data loading
  • Other errors → Fails workflow and logs details
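
In a Step Functions state machine, this kind of routing is typically expressed with Catch clauses on the Task state. The sketch below builds one such state as a Python dictionary in Amazon States Language form. The error names (EmptyFileError, MissingSqlDefinitionError) and all state names are hypothetical; the real state machine definition is not shown on this page.

```python
def build_load_state(lambda_arn: str) -> dict:
    """Return a Task state that routes the three documented failure cases."""
    return {
        "Type": "Task",
        "Resource": lambda_arn,
        "Catch": [
            {
                # Empty file: log and skip. Error name is an assumption.
                "ErrorEquals": ["EmptyFileError"],
                "ResultPath": "$.error",
                "Next": "LogEmptyFile",  # hypothetical state name
            },
            {
                # No SQL definition: fall back to static data loading.
                "ErrorEquals": ["MissingSqlDefinitionError"],
                "ResultPath": "$.error",
                "Next": "LoadStaticData",  # hypothetical state name
            },
            {
                # States.ALL must appear alone and as the last catcher.
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.error",
                "Next": "LogErrorAndFail",  # hypothetical state name
            },
        ],
        "Next": "Deduplicate",  # hypothetical state name
    }
```

Catchers are evaluated in order, so the specific errors come first and the catch-all comes last.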

Audit logging

All outcomes are logged to CloudWatch through stateMachine_LogHandler:

  • SuccessLogs: Processed files
  • EmptyFilesLogs: Skipped empty files
  • ErrorLogs: Processing failures
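
As a rough illustration of this routing, the sketch below maps an outcome to one of the three log categories and builds a JSON message for it. The code of stateMachine_LogHandler is not reproduced here, so the outcome keys, the message fields and the function name are assumptions.

```python
import json
from datetime import datetime, timezone
from typing import Optional, Tuple

# Category names come from the list above; the outcome keys are hypothetical.
LOG_CATEGORIES = {
    "success": "SuccessLogs",
    "empty": "EmptyFilesLogs",
    "error": "ErrorLogs",
}


def build_log_entry(
    outcome: str,
    filename: str,
    detail: str = "",
    logged_at: Optional[datetime] = None,
) -> Tuple[str, str]:
    """Return (log category, JSON message) for one workflow outcome."""
    if outcome not in LOG_CATEGORIES:
        raise ValueError(f"unknown outcome: {outcome!r}")
    logged_at = logged_at or datetime.now(timezone.utc)
    message = json.dumps(
        {
            "outcome": outcome,
            "filename": filename,
            "detail": detail,
            "logged_at": logged_at.isoformat(),
        },
        sort_keys=True,  # stable field order makes log searches predictable
    )
    return LOG_CATEGORIES[outcome], message
```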

Infrastructure

Obsolete:

  • pg_cron jobs
  • Scheduled database functions

Current:

  • EventBridge rule
  • Step Functions state machine
  • CloudWatch log groups
  • Processing Lambdas

Version control

All code related to this work should be checked into the GitLab repository malamalama.

TODO

  • Update/insert counts should be logged in indici_staging.auditlog. It is unclear whether this has been implemented.

References

  1. Empty files are handled in later steps