【AWS】AWS Lambdaを使用して外部システムのデータ移行のバッチ処理を作ってみる with Python
データ移行をAWS Lambdaで実施したいということがありませんか?
外部システムのURLやアクセスキー等をアプリに持っていなく、AWS Secrets Managerに保管しているため、これを使いたいという条件もあるでしょう
今回は外部システムのデータを別の外部システムに移行する実装例を紹介してみます
あ、Pythonで実装します(ライブラリが豊富。。。)
1. 工程はこんな感じ
- AWS Secrets ManagerからAPI情報を取得
・Secrets Managerを使用してAPIのURLや認証情報を取得します。 - データ取得APIを実行
・あるシステムのAPIからデータを取得します。 - データ加工
・取得したデータをもう一つのシステムのリクエスト形式に変換します。 - 登録APIの実行
・加工済みデータを使用して、別のシステムの登録APIを呼び出します。
2. 必要なライブラリは?
- boto3
- AWS SDK for Python
- requests:
- HTTPリクエスト用ライブラリ
(デフォルトではインストールされていないため、lambdaレイヤーを作成して利用する必要があります)
- HTTPリクエスト用ライブラリ
3. Secrets Managerの設定はこんな感じとします
{
"fetch_api_url": "https://api.fetch.com/data",
"register_api_url": "https://api.register.com/register",
"api_key": "your-api-key"
}
4. コードはこんな感じ
import json
import boto3
import requests
def get_secret(secret_name, region_name):
"""
AWS Secrets Managerからシークレットを取得
"""
client = boto3.client("secretsmanager", region_name=region_name)
response = client.get_secret_value(SecretId=secret_name)
return json.loads(response["SecretString"])
def fetch_data(api_url, headers):
"""
データ取得APIを実行
"""
response = requests.get(api_url, headers=headers)
response.raise_for_status() # エラー時に例外をスロー
return response.json()
def transform_data(data):
"""
データ加工処理(具体的な形式は要件に応じて調整)
"""
transformed_data = []
for item in data:
transformed_item = {
"id": item["id"],
"name": item["name"].upper(), # サンプル変換(大文字化)
"value": item["value"] * 10 # 値を10倍にする例
}
transformed_data.append(transformed_item)
return transformed_data
def register_data(api_url, headers, payload):
"""
登録APIを実行
"""
response = requests.post(api_url, headers=headers, json=payload)
response.raise_for_status() # エラー時に例外をスロー
return response.json()
def lambda_handler(event, context):
"""
AWS Lambdaのエントリーポイント
"""
# 1. Secrets Managerから情報を取得
secret_name = "your-secret-name"
region_name = "your-region-name"
secret = get_secret(secret_name, region_name)
fetch_api_url = secret["fetch_api_url"]
register_api_url = secret["register_api_url"]
api_key = secret["api_key"]
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
try:
# 2. データ取得APIを実行
raw_data = fetch_data(fetch_api_url, headers)
# 3. データ加工
transformed_data = transform_data(raw_data)
# 4. 登録APIの実行
for item in transformed_data:
register_response = register_data(register_api_url, headers, item)
print(f"Registered: {register_response}")
return {
"statusCode": 200,
"body": json.dumps("Successfully processed and registered data")
}
except Exception as e:
print(f"Error: {e}")
return {
"statusCode": 500,
"body": json.dumps(f"Error: {str(e)}")
}
5. 注意点
以下は一例ですが、注意して実装してみてください
1. Lambdaの最大タイムアウト
- デフォルトでは3秒、最大で15分(900秒)まで設定可能
- データ移行はAPIコールやデータ加工に時間がかかることが多いので、タイムアウト値を適切に設定する必要があります。
2. リトライとエラーハンドリング
- Lambdaのリトライ動作
- Lambdaは失敗した場合、自動的に再試行(デフォルトで2回まで)を行います。
- これがデータ移行の処理に影響を及ぼす可能性があります(例: 重複登録)。
3. データ量に応じた設計
- メモリ制限
- Lambdaのメモリは128MB~10,240MBの範囲で設定可能。
- データ移行では一度に大量のデータをメモリ上で処理すると、メモリ不足が発生する可能性があります。
これらを実現したい形に加工してぜひ活用してみてください!

是非フォローしてください
最新の情報をお伝えします