CloudWatch Logs(CWL)から Subscription Filter で Kinesis Data Firehose(Firehose) を利用して、S3 へログを保存というケースで、
Firehose で Lambda による Transform を実施したいことがある(ある情報をマスクする、秘匿化するなど)。
その時の考慮事項、実装方法など。
アーキテクチャ
考慮事項など
- CWL から Firehose への転送の時点でログは既に gzip 圧縮されている
 - Firehose で Lambda による Transform を行う場合は、Lambda 関数内で gzip を展開する必要がある
 - Firehose で S3 配信時に gzip 圧縮を選択しても拡張子は gz になるが圧縮されない(CWL による gzip と、Firehose による gzip の二重圧縮を避けるため?)
 - そのため、Lambda で Transform 処理を行なった後に、Lambda 関数内で gzip する必要がある
 
Lambda の実装例(Python)
(内包表記使えばもう少しスマートに書けますがとりあえず。。。)
import base64
import gzip
import io
import json
def lambda_handler(event, context):
    transformed_records = []
    # 複数のレコードが event に渡されるためループで回す
    for rec in event['records']:
        record_id = rec['recordId']
        # Base64 でデコードし、バイナリバイトストリームに読み込む
        data = base64.b64decode(rec['data'])
        iodata = io.BytesIO(data)
        # gzip を展開
        with gzip.GzipFile(fileobj=iodata, mode='r') as f:
            data = json.loads(f.read())
        processed_data = ''
        for log_event in data['logEvents']:
            # 各ログイベントに対して何らかの変換処理を実施
            processed_data += transform(log_event) + '\n'
        # 変換したデータを gzip で圧縮
        iodata = io.BytesIO()
        with gzip.GzipFile(fileobj=iodata, mode='w') as f:
            f.write(processed_data.encode('utf-8'))
        iodata.seek(0)
        # レコードを再度構成する
        tmp = {
            'data': base64.b64encode(iodata.read()).decode('utf-8'),
            'result': 'Ok',
            'recordId': record_id
        }
        transformed_records.append(tmp)
    return {'records': transformed_records }
def transform(log_event):
    # ログイベントに対して何らかの処理を実施
    timestamp = log_event['timestamp']
    message = log_event['message']
    return json.dump({'timestamp': timestamp, 'message': message})
最後に・・・
この投稿は個人的なものであり、所属組織を代表するものではありません。ご了承ください。