CloudWatch Logs(CWL)から Subscription Filter で Kinesis Data Firehose(Firehose) を利用して、S3 へログを保存というケースで、
Firehose で Lambda による Transform を実施したいことがある(ある情報をマスクする、秘匿化するなど)。
その時の考慮事項、実装方法など。

アーキテクチャ

アーキテクチャ図

考慮事項など

  • CWL から Firehose への転送の時点でログは既に gzip 圧縮されている
  • Firehose で Lambda による Transform を行う場合は、Lambda 関数内で gzip を展開する必要がある
  • Firehose で S3 配信時に gzip 圧縮を選択しても拡張子は gz になるが圧縮されない(CWL による gzip と、Firehose による gzip の二重圧縮を避けるため?)
  • そのため、Lambda で Transform 処理を行なった後に、Lambda 関数内で gzip する必要がある

Lambda の実装例(Python)

(内包表記使えばもう少しスマートに書けますがとりあえず。。。)

import base64
import gzip
import io
import json

def lambda_handler(event, context):

    transformed_records = []

    # 複数のレコードが event に渡されるためループで回す
    for rec in event['records']:
        record_id = rec['recordId']

        # Base64 でデコードし、バイナリバイトストリームに読み込む
        data = base64.b64decode(rec['data'])
        iodata = io.BytesIO(data)

        # gzip を展開
        with gzip.GzipFile(fileobj=iodata, mode='r') as f:
            data = json.loads(f.read())

        processed_data = ''
        for log_event in data['logEvents']:
            # 各ログイベントに対して何らかの変換処理を実施
            processed_data += transform(log_event) + '\n'

        # 変換したデータを gzip で圧縮
        iodata = io.BytesIO()
        with gzip.GzipFile(fileobj=iodata, mode='w') as f:
            f.write(processed_data.encode('utf-8'))

        iodata.seek(0)

        # レコードを再度構成する
        tmp = {
            'data': base64.b64encode(iodata.read()).decode('utf-8'),
            'result': 'Ok',
            'recordId': record_id
        }

        transformed_records.append(tmp)

    return {'records': transformed_records }

def transform(log_event):
    # ログイベントに対して何らかの処理を実施
    timestamp = log_event['timestamp']
    message = log_event['message']

    return json.dump({'timestamp': timestamp, 'message': message})

最後に・・・

この投稿は個人的なものであり、所属組織を代表するものではありません。ご了承ください。