Step Functions

From Kautepedia
Revision as of 04:23, 27 June 2025 by Solomon.pidoke (talk | contribs) (Log Streams)

Background

We currently have two state machines: Indici-State-Machine, which processes Indici files, and Kotahi-State-Machine, which processes client data files uploaded to the kpt-kotahi S3 bucket.

Indici-State-Machine

The Indici-State-Machine is an AWS Step Functions state machine that processes Indici files through a series of AWS Lambda functions. An EventBridge rule starts the state machine automatically whenever a file is uploaded to the kpa-valentia S3 bucket, so each new file is processed as soon as it arrives. The state machine consists of the following steps:

Transform Event

  • Type: Pass
  • Purpose: Prepares and reorganises the incoming file details (the bucket name and object key) so they can be easily processed by the following steps.
  • Trigger: Starts when an EventBridge rule detects a file upload to the kpa-valentia S3 bucket.
  • Output Location: Stores the transformed event details under `$.TransformedEvent`.
  • Key Details Processed: Extracts the bucket name and file name from the uploaded event and formats them into a list for easy processing.
  • Next Step: Copy Indici Files
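
The transformation described above can be sketched in Python. This is only an illustration: the input follows the general shape of an S3 "Object Created" event delivered through EventBridge, while the object key and the output field names (`files`, `bucket`, `key`) are assumptions, since the real Pass state parameters are not shown on this page.

```python
from typing import Any


def transform_event(event: dict[str, Any]) -> dict[str, Any]:
    """Mimic the Pass state: extract bucket and key, store them under TransformedEvent."""
    detail = event["detail"]
    transformed = {
        # "files", "bucket" and "key" are illustrative names, not the real schema.
        "files": [
            {"bucket": detail["bucket"]["name"], "key": detail["object"]["key"]}
        ]
    }
    # Keep the original event and add the new field, as a ResultPath would.
    return {**event, "TransformedEvent": transformed}


sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "kpa-valentia"},
        "object": {"key": "incoming/Appointments.csv"},  # hypothetical key
    },
}

state = transform_event(sample_event)
print(state["TransformedEvent"])
```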

Copy Indici Files

  • Type: Task
  • Purpose: Copies the uploaded file to a specific location in a partitioned S3 bucket for further processing.
  • Function Used: Calls the copyIndiciFiles Lambda function.
  • Data Passed: Uses the TransformedEvent output by the Transform Event step.
  • Output Location: Stores the result under `$.CopyIndiciFilesResult`.
  • Retry Rules: Automatically retries if there are errors with the function, with increasing wait times between retries.
  • Catch: Catches all errors (States.ALL) and transitions to Check Error Type with the error details in $.ErrorInfo.
  • Next Step: Data Load to indici_staging RDS
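
A state with these properties might look roughly like the following Amazon States Language definition, written here as a Python dictionary so it can be printed as JSON. The `Resource` value, the `Payload.$` mapping, and the `BackoffRate` are assumptions made for illustration; the retry errors, the catch target, and the result paths are taken from this page.

```python
import json

# Transient Lambda errors that trigger a retry (listed under Retries on this page).
LAMBDA_RETRY_ERRORS = [
    "Lambda.ServiceException",
    "Lambda.AWSLambdaException",
    "Lambda.SdkClientException",
    "Lambda.TooManyRequestsException",
]

copy_indici_files_state = {
    "Type": "Task",
    "Resource": "arn:aws:states:::lambda:invoke",  # assumed integration type
    "Parameters": {
        "FunctionName": "copyIndiciFiles",
        "Payload.$": "$.TransformedEvent",  # assumed payload mapping
    },
    "ResultPath": "$.CopyIndiciFilesResult",
    "Retry": [
        {
            "ErrorEquals": LAMBDA_RETRY_ERRORS,
            "IntervalSeconds": 1,
            "MaxAttempts": 3,
            "BackoffRate": 2,  # assumed; the page only says waits increase
        }
    ],
    "Catch": [
        {
            "ErrorEquals": ["States.ALL"],
            "ResultPath": "$.ErrorInfo",
            "Next": "Check Error Type",
        }
    ],
    "Next": "Data Load to indici_staging RDS",
}

print(json.dumps(copy_indici_files_state, indent=2))
```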

Data Load to indici_staging RDS

  • Type: Task
  • Purpose: Moves the copied file data into the indici_staging database table for validation.
  • Function Used: Calls the indiciLoadAppointmentMedications Lambda function.
  • Data Passed: The partitioned bucket name together with the key of the newly copied file, which is read from $.CopyIndiciFilesResult.Payload.body.new_key.
  • Output Location: Stores the result under `$.DataLoadResult`.
  • Retry Rules: Automatically retries if there are errors with the function, with increasing wait times between retries.
  • Catch: Catches all errors and transitions to Check Error Type.
  • Next Step: Deduplicate Indici Data
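
To make the data hand-off concrete, the sketch below resolves the `new_key` path against a sample execution state and combines it with a bucket name. The helper `get_path`, the payload field names, the bucket name, and the sample key are all hypothetical; only the path itself comes from this page.

```python
from typing import Any


def get_path(state: dict[str, Any], path: str) -> Any:
    """Resolve a simple dotted path such as '$.A.B.c' (no wildcards or array indexes)."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    value: Any = state
    for part in path.lstrip("$").strip(".").split("."):
        if part:
            value = value[part]
    return value


def build_load_payload(state: dict[str, Any], partitioned_bucket: str) -> dict[str, str]:
    """Combine the partitioned bucket with the key written by the copy step."""
    return {
        "bucket": partitioned_bucket,
        "key": get_path(state, "$.CopyIndiciFilesResult.Payload.body.new_key"),
    }


sample_state = {
    "CopyIndiciFilesResult": {
        "Payload": {"body": {"new_key": "year=2025/month=06/Appointments.csv"}}
    }
}

payload = build_load_payload(sample_state, "example-partitioned-bucket")
print(payload)
```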

Deduplicate Indici Data

  • Type: Task
  • Purpose: Removes duplicate records from the database to ensure clean and accurate data.
  • Function Used: Calls the rptDeduplication Lambda function.
  • Data Passed: Uses the key from $.CopyIndiciFilesResult.Payload.body.new_key.
  • Output Location: Stores the result under `$.DeduplicationResult`.
  • Retry Rules: Automatically retries if there are errors with the function, with increasing wait times between retries.
  • Catch: Catches all errors and transitions to Check Error Type.
  • Next Step: Deduplicated

Deduplicated

  • Type: Pass
  • Purpose: A simple pass state that reorganises data from the previous steps where needed.
  • Parameters: Retains CopyIndiciFilesResult, DataLoadResult, and DeduplicationResult. Initialises NoSQLDDResult to null so the field exists even though only the No sql_dd Data Load path sets it.
  • Output Location: Stores the transformed event details under `$.TransformedEvent`.
  • Key Details Processed: Extracts the bucket name and file name from the uploaded event and formats them into a list for easy processing.
  • Next Step: Write Success Log

Write Success Log

  • Type: Task
  • Purpose: Logs the successful completion of the state machine and includes key file details for tracking.
  • Function Used: Calls the `stateMachine_LogHandler` Lambda function.
  • Log Details:
 * Log Group: stateMachine  
 * Log Stream: SuccessLogs  
 * Status: Success  
 * Additional Information: Includes bucket name, file name, file size, and unique file identifier.  
 * Success Message: "Execution completed successfully."  
  • Next Step: Success State
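
As a rough illustration, the input passed to `stateMachine_LogHandler` for a successful run could be assembled as follows. The key names (`logGroup`, `logStream`, `additionalInfo`, and so on) and the sample values are assumptions; the page specifies only the log group, log stream, status, message, and the kinds of file details included.

```python
from typing import Any


def build_success_log_event(
    bucket: str, file_name: str, file_size: int, file_id: str
) -> dict[str, Any]:
    """Assemble the log details listed above into one payload (key names are hypothetical)."""
    return {
        "logGroup": "stateMachine",
        "logStream": "SuccessLogs",
        "status": "Success",
        "message": "Execution completed successfully.",
        "additionalInfo": {
            "bucket": bucket,
            "fileName": file_name,
            "fileSize": file_size,
            "fileId": file_id,
        },
    }


log_event = build_success_log_event(
    bucket="kpa-valentia",
    file_name="Appointments.csv",  # hypothetical file
    file_size=2048,
    file_id="example-id",
)
print(log_event)
```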

Error Handling - Errors are routed to Check Error Type, where conditions direct the execution to the appropriate error-handling state.

Remark: The State Machine CloudWatch logs do not currently capture the filename when a Lambda function times out. If timeouts become more frequent, we can capture it by using "ErrorEquals" and "Catch" to match the error strings produced by Lambda timeout exceptions, and then writing a State Machine log entry that includes the filename.

The only downside of this workaround is added complexity: every Lambda function needs its own Catch to capture the timeout keyword. We cannot use "ClientError" because the Lambda job quits as soon as the timeout occurs.
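
A Catch with a ResultPath keeps the original state input, so the file details are still available once the error has been caught. The sketch below shows how a timeout could then be recognised and logged with the filename. It rests on two assumptions: that the error cause contains the phrase "Task timed out", and that the file key sits at a hypothetical `TransformedEvent.files[0].key` location.

```python
from typing import Any, Optional

# Assumed wording: Lambda timeout messages usually contain this phrase.
TIMEOUT_MARKER = "Task timed out"


def timeout_log_entry(state: dict[str, Any]) -> Optional[dict[str, str]]:
    """Return a log entry with the file key if the caught error looks like a timeout."""
    cause = state.get("ErrorInfo", {}).get("Cause", "")
    if TIMEOUT_MARKER not in cause:
        return None
    # Hypothetical location of the file key in the execution state.
    file_key = state["TransformedEvent"]["files"][0]["key"]
    return {"status": "Timeout", "file": file_key, "cause": cause}


timed_out_state = {
    "TransformedEvent": {"files": [{"bucket": "kpa-valentia", "key": "Appointments.csv"}]},
    "ErrorInfo": {"Error": "States.TaskFailed", "Cause": "Task timed out after 900.00 seconds"},
}

entry = timeout_log_entry(timed_out_state)
print(entry)
```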

Check Error Type

  • Type: Choice
  • Purpose: Evaluates the error captured in $.ErrorInfo.Cause and decides which path to follow.
  • Branches:
  1. File is Empty: If StringMatches *File is Empty!*, transitions to Write Empty File Error Log.
  2. Missing sql_dd attribute: If StringMatches *No sql_dd attribute found*, transitions to No sql_dd Data Load.
  3. Default: All other errors go to Write General Error Log.
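
The branching can be simulated in a few lines of Python. This sketch uses `fnmatchcase` as a stand-in for the `StringMatches` comparison; the two behave alike for the `*` wildcard used here, but they are not identical in general. The sample error causes are invented for the example.

```python
from fnmatch import fnmatchcase
from typing import Any

# (pattern, next state) pairs, checked in order like Choice rules.
ERROR_ROUTES = [
    ("*File is Empty!*", "Write Empty File Error Log"),
    ("*No sql_dd attribute found*", "No sql_dd Data Load"),
]
DEFAULT_ROUTE = "Write General Error Log"


def check_error_type(state: dict[str, Any]) -> str:
    """Return the name of the next state for the error stored in ErrorInfo.Cause."""
    cause = state["ErrorInfo"]["Cause"]
    for pattern, next_state in ERROR_ROUTES:
        if fnmatchcase(cause, pattern):
            return next_state
    return DEFAULT_ROUTE


empty = {"ErrorInfo": {"Cause": '{"errorMessage": "File is Empty!"}'}}
no_sql_dd = {"ErrorInfo": {"Cause": '{"errorMessage": "No sql_dd attribute found"}'}}
other = {"ErrorInfo": {"Cause": '{"errorMessage": "connection refused"}'}}

print(check_error_type(empty))
print(check_error_type(no_sql_dd))
print(check_error_type(other))
```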

Write Empty File Error Log

  • Type: Task
  • Purpose: Logs empty file errors to a dedicated CloudWatch Log Stream (EmptyFilesLogs).
  • Function Used: Calls the `stateMachine_LogHandler` Lambda function.
  • Log Details:
  1. LogGroup: stateMachine
  2. LogStream: EmptyFilesLogs
  • Next Step: Success State

Write General Error Log

  • Type: Task
  • Purpose: Logs any error that is not recognised as "File is Empty!" or as a missing sql_dd attribute.
  • Function Used: Calls the `stateMachine_LogHandler` Lambda function.
  • Log Details:
 * Log Group: stateMachine  
 * Log Stream: ErrorLogs
 * Error Message: Extracted from the error details found in `$.ErrorInfo.Cause`.  
  • Next Step: Fail State

Handling Missing sql_dd Data - Certain files may lack the sql_dd attribute. In this case, the state machine routes the error to a special path to run an additional data load step.

No sql_dd Data Load

  • Type: Task
  • Purpose: Runs an alternate data load process for files that do not contain the sql_dd attribute.
  • Function Used: Calls the `noSql_ddStaticDataLoadtoRpt` Lambda function.
  • Data Passed: The entire execution state ($)
  • Output Location: $.NoSQLDDResult
  • Catch: If an error occurs here, the execution transitions to Write General Error Log and then to Fail State.
  • Next Step: Check Last updated date

Check Last updated date

  • Type: Choice
  • Purpose: Determines whether a refresh is needed based on the last kptinsertedat value in the target rpt table.
  • Function Used: None. As a Choice state it does not invoke a Lambda function; it evaluates the status returned by `noSql_ddStaticDataLoadtoRpt` in the No sql_dd Data Load step.
  • Data Passed: Reads the value of $.NoSQLDDResult.Payload.status.
  • Branches:
  1. Refreshed: The data load now always proceeds, regardless of how recent the last insert was, so the Lambda returns {"status": "Refreshed"} and the state machine transitions to Refreshed.
  2. Skipped: Transitions to Skipped. This path is currently unused.
  3. Default: Any other status or unexpected response transitions to the Write General Error Log step.
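
The three branches can be expressed as a small Python function. This is a simulation of the routing only, assuming the status is compared as an exact string; the sample states are made up for the example.

```python
from typing import Any


def check_last_updated_date(state: dict[str, Any]) -> str:
    """Pick the next state from the status returned by the static data load."""
    result = state.get("NoSQLDDResult") or {}  # tolerate a missing or null result
    payload = result.get("Payload") or {}
    status = payload.get("status")
    if status == "Refreshed":
        return "Refreshed"
    if status == "Skipped":
        return "Skipped"
    return "Write General Error Log"


refreshed = {"NoSQLDDResult": {"Payload": {"status": "Refreshed"}}}
skipped = {"NoSQLDDResult": {"Payload": {"status": "Skipped"}}}
unexpected = {"NoSQLDDResult": None}

print(check_last_updated_date(refreshed))
print(check_last_updated_date(skipped))
print(check_last_updated_date(unexpected))
```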

Refreshed

  • Type: Pass
  • Purpose: For cases when the data load was successfully refreshed.
  • Next Step: Write Success Log

Skipped

  • Type: Pass
  • Purpose: For cases when no refresh was needed.
  • Next Step: Write Success Log

Success State

  • Type: Succeed
  • Purpose: Confirms that the state machine finished successfully.

Fail State

  • Type: Fail
  • Purpose: Indicates that the state machine could not complete due to an error.
  • Error Details: Includes a generic error name ("StateMachineError") and the cause of the failure.

Error Handling

The Indici-State-Machine handles errors through retries, catch blocks, conditional routing, and logging, so that failures can be traced back to the file that caused them. The key mechanisms are:

Retries

  • Purpose: Automatically retries tasks to handle transient errors.
  • Where Implemented:
 * Copy Indici Files
 * Data Load to indici_staging RDS
 * Deduplicate Indici Data
  • Retry Rules:
 * Errors handled: `Lambda.ServiceException`, `Lambda.AWSLambdaException`, `Lambda.SdkClientException`, `Lambda.TooManyRequestsException`
 * Retry mechanism:
   * Interval between retries starts at 1 second.
   * Maximum of 3 attempts.
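
The retry rules can be illustrated with a short calculation of the wait before each retry. The backoff rate of 2.0 is an assumption (the page states only that wait times increase), and the sketch treats the maximum of 3 as the number of retries after the first attempt.

```python
RETRYABLE_ERRORS = {
    "Lambda.ServiceException",
    "Lambda.AWSLambdaException",
    "Lambda.SdkClientException",
    "Lambda.TooManyRequestsException",
}


def should_retry(error_name: str, retries_made: int, max_attempts: int = 3) -> bool:
    """Retry only listed errors, and only while retries remain."""
    return error_name in RETRYABLE_ERRORS and retries_made < max_attempts


def retry_waits(
    interval_seconds: float = 1, max_attempts: int = 3, backoff_rate: float = 2.0
) -> list[float]:
    """Seconds to wait before each retry: the interval multiplied by the backoff rate each time."""
    return [interval_seconds * backoff_rate**n for n in range(max_attempts)]


print(retry_waits())  # [1.0, 2.0, 4.0]
print(should_retry("Lambda.ServiceException", retries_made=2))
print(should_retry("States.Timeout", retries_made=0))
```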

Error Catching

  • Purpose: Captures errors to enable conditional handling and logging.
  • Where Implemented:
 * Each task contains a `Catch` block to handle `States.ALL` errors.
  • Error Storage: Errors are saved under `$.ErrorInfo`.
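
The effect of storing the error under `$.ErrorInfo` can be sketched as a merge: the task input is preserved and the error details are added alongside it. `Error` and `Cause` are the fields Step Functions places in error output; the sample input and error text below are invented for the example.

```python
from typing import Any


def apply_catch(state_input: dict[str, Any], error: str, cause: str) -> dict[str, Any]:
    """Mimic a Catch with ResultPath '$.ErrorInfo': keep the input, add the error details."""
    return {**state_input, "ErrorInfo": {"Error": error, "Cause": cause}}


task_input = {"TransformedEvent": {"bucket": "kpa-valentia"}}  # illustrative input
caught = apply_catch(task_input, "States.TaskFailed", "File is Empty!")

print(caught["ErrorInfo"]["Cause"])
```

Because the original fields survive the merge, later states such as Check Error Type can read both the error and the file details.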

Error Logging

  • Purpose: Logs errors for debugging and analysis.
  • LogGroup: All logs are stored in the stateMachine LogGroup, with the following streams:
 * ErrorLogs: Captures general errors during state machine execution.
 * SuccessLogs: Logs the successful completion of tasks.
 * EmptyFilesLogs: Logs specific errors when a file is empty.
  • Lambda Function Used for Logging: `stateMachine_LogHandler`
  • Write Empty File Error Log:
 * Triggered if the error cause matches `File is Empty!`.
 * Logs to the EmptyFilesLogs stream in the stateMachine LogGroup.
 * After logging, the state machine transitions to the Success State.
  • Write General Error Log:
 * Logs all other errors to the ErrorLogs stream in the stateMachine LogGroup.
 * Error details are extracted from `$.ErrorInfo.Cause`.

Failure Handling

  • Fail State:
 * Indicates that the state machine could not complete successfully.
 * Includes a generic error name ("StateMachineError") and the specific cause.

Error-Specific Decision Making

  • Check Error Type State:
 * Inspects `$.ErrorInfo.Cause` to determine error type.
 * Routes execution to:
   * File is Empty: If StringMatches *File is Empty!*, transitions to Write Empty File Error Log.
   * Missing sql_dd attribute: If StringMatches *No sql_dd attribute found*, transitions to No sql_dd Data Load.
   * Default: All other errors go to Write General Error Log.

Flow for Handling Errors

  1. Each task attempts execution.
  2. If a transient error occurs, retries are triggered.
  3. Errors that are not resolved by retries are caught and stored in `$.ErrorInfo`.
  4. The `Check Error Type` state evaluates the error and routes it to the appropriate logging step or, for a missing sql_dd attribute, to No sql_dd Data Load.
  5. Errors are logged in CloudWatch under the appropriate streams within the stateMachine LogGroup using the `stateMachine_LogHandler` Lambda function.
  6. After an empty file error is logged, the state machine transitions to the Success State. A missing sql_dd attribute also ends in the Success State if the alternate data load reports Refreshed. All other errors end in the Fail State.

Kotahi-State-Machine

The Kotahi-State-Machine is an AWS Step Functions workflow that automatically processes client data files uploaded to the kpt-kotahi S3 bucket. Designed to complement our existing Indici pipeline, it handles file validation, data loading, and deduplication.

Workflow Stages

1. File Ingestion

  • Triggered automatically via EventBridge on file upload
  • Skips the following files which are either irrelevant or always empty:
    • Iwi, IwiGroupings, MessageTemplates, CorrespondenceType, ClientCorrespondence
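
The skip rule could be implemented along these lines. The sketch assumes that the file name without its extension matches one of the listed names exactly; how the real state machine compares names is not documented here, and the sample keys are hypothetical.

```python
from pathlib import PurePosixPath

# Files that are irrelevant or always empty (from the list above).
SKIPPED_FILES = {
    "Iwi",
    "IwiGroupings",
    "MessageTemplates",
    "CorrespondenceType",
    "ClientCorrespondence",
}


def should_skip(object_key: str) -> bool:
    """Return True if the S3 object key names a file on the skip list."""
    # Assumption: the base name without extension equals the listed name.
    return PurePosixPath(object_key).stem in SKIPPED_FILES


print(should_skip("uploads/Iwi.csv"))
print(should_skip("uploads/Clients.csv"))
```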

2. Data Processing

  1. Copy valid files → partitioned storage
  2. Load data → PostgreSQL kotahi_staging
  3. Remove duplicates → Clean dataset

3. Success Handling

  • Consolidate execution results
  • Log key metrics (file/size/status)
  • Mark successful completion

Error Handling

Common Errors & Solutions:

  • Empty files → Logged to kotahiStateMachine/EmptyFilesLogs → Treated as success
  • Missing sql_dd attribute:
    1. Attempt static data load
    2. Check status:
      • Refreshed → Success
      • Skipped → Success
      • Other → Failure
  • General errors → Detailed error logs → Marked as failed

Log Streams

All logs go to CloudWatch using these streams:

  • kotahiStateMachine/ErrorLogs – General processing errors.
  • kotahiStateMachine/EmptyFilesLogs – Empty file-specific errors.
  • kotahiStateMachine/SuccessLogs – Successful completions.
