CloudWatch Logs(CWL)から Subscription Filter で Kinesis Data Firehose(Firehose) を利用して、S3 へログを保存というケースで、
Firehose で Lambda による Transform を実施したいことがある(ある情報をマスクする、秘匿化するなど)。
その時の考慮事項、実装方法など。
アーキテクチャ
考慮事項など
- CWL から Firehose への転送の時点でログは既に gzip 圧縮されている
- Firehose で Lambda による Transform を行う場合は、Lambda 関数内で gzip を展開する必要がある
- Firehose で S3 配信時に gzip 圧縮を選択しても拡張子は gz になるが圧縮されない(CWL による gzip と、Firehose による gzip の二重圧縮を避けるため?)
- そのため、Lambda で Transform 処理を行なった後に、Lambda 関数内で gzip する必要がある
Lambda の実装例(Python)
(内包表記使えばもう少しスマートに書けますがとりあえず。。。)
import base64
import gzip
import io
import json
def lambda_handler(event, context):
transformed_records = []
# 複数のレコードが event に渡されるためループで回す
for rec in event['records']:
record_id = rec['recordId']
# Base64 でデコードし、バイナリバイトストリームに読み込む
data = base64.b64decode(rec['data'])
iodata = io.BytesIO(data)
# gzip を展開
with gzip.GzipFile(fileobj=iodata, mode='r') as f:
data = json.loads(f.read())
processed_data = ''
for log_event in data['logEvents']:
# 各ログイベントに対して何らかの変換処理を実施
processed_data += transform(log_event) + '\n'
# 変換したデータを gzip で圧縮
iodata = io.BytesIO()
with gzip.GzipFile(fileobj=iodata, mode='w') as f:
f.write(processed_data.encode('utf-8'))
iodata.seek(0)
# レコードを再度構成する
tmp = {
'data': base64.b64encode(iodata.read()).decode('utf-8'),
'result': 'Ok',
'recordId': record_id
}
transformed_records.append(tmp)
return {'records': transformed_records }
def transform(log_event):
# ログイベントに対して何らかの処理を実施
timestamp = log_event['timestamp']
message = log_event['message']
return json.dump({'timestamp': timestamp, 'message': message})
最後に・・・
この投稿は個人的なものであり、所属組織を代表するものではありません。ご了承ください。