【AWS】AWS Lambdaを使用して外部システムのデータ移行のバッチ処理を作ってみる with Python

データ移行をAWS Lambdaで実施したいということがありませんか?

外部システムのURLやアクセスキー等をアプリに持っていなく、AWS Secrets Managerに保管しているため、これを使いたいという条件もあるでしょう

今回は外部システムのデータを別の外部システムに移行する実装例を紹介してみます

あ、Pythonで実装します(ライブラリが豊富。。。)


1. 工程はこんな感じ

  1. AWS Secrets ManagerからAPI情報を取得
    ・Secrets Managerを使用してAPIのURLや認証情報を取得します。
  2. データ取得APIを実行
    ・あるシステムのAPIからデータを取得します。
  3. データ加工
    ・取得したデータをもう一つのシステムのリクエスト形式に変換します。
  4. 登録APIの実行
    ・加工済みデータを使用して、別のシステムの登録APIを呼び出します。

2. 必要なライブラリは?

  • boto3
    • AWS SDK for Python
  • requests:
    • HTTPリクエスト用ライブラリ
      (デフォルトではインストールされていないため、lambdaレイヤーを作成して利用する必要があります)

3. Secrets Managerの設定はこんな感じとします

{
    "fetch_api_url": "https://api.fetch.com/data",
    "register_api_url": "https://api.register.com/register",
    "api_key": "your-api-key"
}

4. コードはこんな感じ

import json
import boto3
import requests

def get_secret(secret_name, region_name):
    """
    AWS Secrets Managerからシークレットを取得
    """
    client = boto3.client("secretsmanager", region_name=region_name)
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])

def fetch_data(api_url, headers):
    """
    データ取得APIを実行
    """
    response = requests.get(api_url, headers=headers)
    response.raise_for_status()  # エラー時に例外をスロー
    return response.json()

def transform_data(data):
    """
    データ加工処理(具体的な形式は要件に応じて調整)
    """
    transformed_data = []
    for item in data:
        transformed_item = {
            "id": item["id"],
            "name": item["name"].upper(),  # サンプル変換(大文字化)
            "value": item["value"] * 10   # 値を10倍にする例
        }
        transformed_data.append(transformed_item)
    return transformed_data

def register_data(api_url, headers, payload):
    """
    登録APIを実行
    """
    response = requests.post(api_url, headers=headers, json=payload)
    response.raise_for_status()  # エラー時に例外をスロー
    return response.json()

def lambda_handler(event, context):
    """
    AWS Lambdaのエントリーポイント
    """
    # 1. Secrets Managerから情報を取得
    secret_name = "your-secret-name"
    region_name = "your-region-name"
    secret = get_secret(secret_name, region_name)
    
    fetch_api_url = secret["fetch_api_url"]
    register_api_url = secret["register_api_url"]
    api_key = secret["api_key"]
    
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    
    try:
        # 2. データ取得APIを実行
        raw_data = fetch_data(fetch_api_url, headers)
        
        # 3. データ加工
        transformed_data = transform_data(raw_data)
        
        # 4. 登録APIの実行
        for item in transformed_data:
            register_response = register_data(register_api_url, headers, item)
            print(f"Registered: {register_response}")
        
        return {
            "statusCode": 200,
            "body": json.dumps("Successfully processed and registered data")
        }
    except Exception as e:
        print(f"Error: {e}")
        return {
            "statusCode": 500,
            "body": json.dumps(f"Error: {str(e)}")
        }

5. 注意点

以下は一例ですが、注意して実装してみてください

1. Lambdaの最大タイムアウト

  • デフォルトでは3秒、最大で15分(900秒)まで設定可能
    • データ移行はAPIコールやデータ加工に時間がかかることが多いので、タイムアウト値を適切に設定する必要があります。

2. リトライとエラーハンドリング

  • Lambdaのリトライ動作
    • Lambdaは失敗した場合、自動的に再試行(デフォルトで2回まで)を行います。
    • これがデータ移行の処理に影響を及ぼす可能性があります(例: 重複登録)。

3. データ量に応じた設計

  • メモリ制限
    • Lambdaのメモリは128MB~10,240MBの範囲で設定可能。
    • データ移行では一度に大量のデータをメモリ上で処理すると、メモリ不足が発生する可能性があります。

これらを実現したい形に加工してぜひ活用してみてください!

是非フォローしてください

最新の情報をお伝えします