カピバラ好きなエンジニアブログ

興味ある技術とか検証した内容を赴くままに書いていきます。カピバラの可愛さこそ至高。

AWS 規範ガイダンス「Using Apache Iceberg on AWS」Memo #4

はじめに

こんにちは、半田です。

前回に引き続き、AWS 規範ガイダンス「Using Apache Iceberg on AWS」を読んでメモをしていきたいと思います。

docs.aws.amazon.com

前回は「Working with Apache Iceberg in AWS Glue」のページを読みました。
今回は「Working with Apache Iceberg tables by using Amazon Data Firehose」を読んでいきます。

前の記事の冒頭にも書きましたが、こういうメモをあまり書いたことがないので、「それまんま引用じゃね?」というところがあってもご容赦ください。
あと自分の理解を書いていますが、もしかしたら間違っている可能性もあるので、正しい情報は公式の各ドキュメントを参照するようにしてください。


目次


Working with Apache Iceberg tables by using Amazon Data Firehose(Amazon Data Firehoseを使ってApache Icebergテーブルを扱う)

Amazon Data Firehoseを利用すると、複数のソースからS3やRedshift等の送信先にデータを配信することができます。

docs.aws.amazon.com

Firehoseのソースを設定する際には以下の3種類から選択でき、"Direct PUT"を選択するとさらに複数のソースを指定することができます。

宛先としては以下が選択できるようです。

宛先にも含まれていますが、Data Firehoseを利用するとデータをAmazon S3Apache Icebergテーブルに直接配信することができます。
Firehoseを利用することで、1つのストリームから複数のIcebergテーブルに対して、INSERT/UPDATE/DLETEの操作を自動的に行うことができるようです。
また、S3のIcebergテーブルだけでなく、2024年12月にリリースされたS3 Tablesに対してもFirehoseからデータを配信することもできます。

AWS規範ガイダンス内に記載されているFirehoseストリームを設定するドキュメントを元に実際に設定してみます。 docs.aws.amazon.com


Firehoseストリームを設定する

ドキュメントの流れに従って進めていきます。

ソースと宛先を構成する

まずはFirehoseコンソールから新しいFirehoseストリームの作成を行い、ソースと宛先を設定します。

Apache Iceberg Tables にデータを配信するには、ストリームのソースを選択します。 ストリームのソースを構成するには、「ソース設定の構成」を参照してください。 次に、宛先としてApache Iceberg Tables を選択し、Firehose ストリーム名を指定します。

今回はシンプルな構成としたいので、ソースはDirect PUT、あて先はApache Iceberg Tablesにしました。

  • Direct PUT
  • Apache Iceberg Tables


データ変換を構成する

Firehoseストリームでは、Lambda関数を利用することで受信したデータに対して変換を行うことができます。
詳しくは以下のドキュメントに記載されていますが、データ変換を有効にすると受信したデータをバッファリングするようになっており、バッファされたデータがバッファリングサイズまで貯まるとLambdaが呼び出されます。
この時、FirehoseからLambda関数に送信するリクエストとレスポンスのペイロードサイズには制限があるため、あまり大きくなりすぎないように調整する必要があります。
また、Lambda関数が5分以上かかる場合はエラーが発生するため、複雑で時間がかかる変換は避ける必要があります。

docs.aws.amazon.com

  • 受信レコードを単一の Iceberg テーブルにルーティングする場合

docs.aws.amazon.com

受信したデータをIcebergテーブルに配信する場合、単一のIcebergテーブルに配信するのか、複数のIcebergテーブルに配信するのかによって実装の方法が異なります。
単一のIcebergテーブルに受信データをINSERTしたい場合は、以下のストリーム設定をするだけで済みます。
もしINSERTだけでなく、UPDATEやDELETEを実行したい場合は、JSONQuery(JQ)かLambdaを利用する必要があります。

[
  {
    "DestinationDatabaseName": "UserEvents",
    "DestinationTableName": "customer_id",
    "UniqueKeys": [
      "COLUMN_PLACEHOLDER"
    ],
    "S3ErrorOutputPrefix": "OPTIONAL_PREFIX_PLACEHOLDER"
  }
]
  • 受信レコードを異なる Iceberg テーブルにルーティングする場合

docs.aws.amazon.com



JSONQuery式を使用してFirehoseにルーティング情報を提供する

JSONQuery式を使用すると、Firehoseからシンプルな方法で複数のIcebergテーブルにデータを配信することができます。
JSONQuery式を利用してレコードから配信先の情報を抽出して、その配信先ごとにレコードをルーティングできます。

配信時に必要な情報は次の3つです。

  • Database Name:<配信先のデータベース名を指定>
  • Table Name:<配信先のテーブル名を指定>
  • Operation:<データの配信方法を指定(insert/update/delete)> ※デフォルトはinsert

これらの値には静的な値とレコードから取得した動的な値が設定できます。
仮に次のようなレコードがあった場合、

{
  "deviceId": "Device4567",
  "timestamp": "2023-11-28T10:40:00Z",
  "data": {
    "pressure": 1012.4,
    "location": {
      "zipcode": 24567
    }
  },
  "powerlevel": 82,
  "status": "online"
}

Database NameとOperationを静的な値、Table Nameを動的な値として設定するには次のように設定します。

Database Name : "IoTevents"
Table Name : .deviceId
Operation : "insert"

仮に次のようにレコードがネストされていた場合、

{
  "event": {
    "deviceId": "Device4567",
    "timestamp": "2023-11-28T10:40:00Z",
    "data": {
      "pressure": 1012.4,
      "location": {
        "zipcode": 24567
      }
    },
    "powerlevel": 82,
    "status": "online"
  }
}

ドット区切りで指定することでネストされたフィールド情報を取得することができます。

Database Name : "IoTevents"
Table Name :  .event.deviceId
Operation : "insert"

ただし、Operationにupdate/deleteを指定する場合は、Icebergテーブルの一意のキーを指定するか、Icebergで識別子フィールド ID を設定する必要があります。
これをしないとFirehoseはエラーが発生します。

When you specify the operation as update or delete, you must either specify unique keys for the destination table when you set up your Firehose stream, or set identifier-field-ids in Iceberg when you run create table or alter table operations in Iceberg. If you fail to specify this, then Firehose throws an error and delivers data to an S3 error bucket. 操作を「更新」または「削除」として指定する場合、Firehose ストリームを設定する際に目的のテーブルに一意のキーを指定するか、Iceberg でテーブルの作成または変更操作を実行する際に Iceberg で識別子フィールド ID を設定する必要があります。この設定を省略すると、Firehose はエラーを発生させ、データを S3 エラー バケットに送信します。

識別子フィールド IDがどういうものかについては、Icebergドキュメントの以下に記載があります。

スキーマは、オプションで、 プロパティを使用して、テーブル内の行を識別するプリミティブ フィールドのセットを追跡できますidentifier-field-ids(付録 C の JSON エンコードを参照)。 識別子フィールドが等しい場合、2つの行は「同じ」、つまり同じエンティティを表します。ただし、この識別子による行の一意性はIcebergによって保証または要求されておらず、処理エンジンまたはデータプロバイダーがそれを強制する責任があります。 識別子フィールドは構造体内でネストできますが、マップやリスト内ではネストできません。float、double、およびオプションのフィールドは識別子フィールドとして使用できません。また、ネストされたフィールドは、オプションの構造体内でネストされている場合、識別子フィールドとして使用できません。これは、識別子にnull値が含まれることを避けるためです。 iceberg.apache.org

こちらをざっと読む限りだと、同じレコードを指すフィールドとして識別子フィールド IDは機能し、Icebergとしてnull値が含まれることを避けるようにはなっている(?)が、一意性までは保証・要求されていないため、Icebergを利用する側で実装する責任がある、とあります。
つまりPKに類似した属性を持つ(ただ、一意性は持たない(保証しない))ので、このIDをキーにupdate/deleteを実施すると理解しました。

上記引用に記載されている「付録 C 」というのは以下になります。 iceberg.apache.org

識別子フィールド IDについて理解した上で実際にFirehoseに設定しようとすると、JQ設定に加えて「Unique key configuration(一意キー設定)」という項目があることに気付きました。

JQ設定と一意キー設定の違いについて、以下のドキュメントから確認してみます。 docs.aws.amazon.com

まずJQ設定はこちら

Apache Iceberg Tablesでは、受信レコードを異なる宛先テーブルにルーティングする方法と、挿入、更新、削除などの実行したい操作を指定する必要があります。 これは、FirehoseがJQ式を解析して必要な情報を取得できるようにすることで実現できます。詳細については、「JSONQuery式を使用してFirehoseにルーティング情報を提供する」を参照してください。

次に一意キー設定はこちら

複数のテーブルでの更新と削除 – ユニークキーとは、Apache Iceberg Tables 内の行を一意に識別する、ソースレコード内の1つ以上のフィールドです。 複数のテーブルで挿入のみのシナリオを使用する場合は、ユニークキーを設定する必要はありません。 特定のテーブルで更新と削除を行う場合は、必要なテーブルにユニークキーを設定する必要があります。 テーブルに行が欠落している場合、更新時に自動的に行が挿入されることに注意してください。 テーブルが1つしかない場合は、ユニークキーを設定できます。 更新操作の場合、Firehose は削除ファイルを配置し、その後に挿入を実行します。

ざっくり読み解くと、

  • JQ設定は「テーブルを動的に指定したい場合」と実装操作をINSERT以外の「UPDATE/DELETEにしたい場合」に利用
  • 一意キー設定は、INSERTの場合は「対象のデータベースとテーブルを指定」、UPDATE/DELETEの場合は「対象のデータベースとテーブルとユニークキーを指定」する際に利用

と理解しました。
JQ設定利用はチェックの有無で選択できるので、単一のテーブルにINSERTしたい場合は一意キー設定のみで問題なさそうです。
その他UPDATEを指定したケースでテーブル内に該当データがない場合はINESRTされたり、UPDATEの場合は一度削除ファイルが配置されてデータが追加される「DELTE/INSERT」の処理が行われる等の仕様についても記載されているため、そのあたりも知っておく必要があります。



AWS Lambda関数を使用してルーティング情報を提供する

受信したデータをシンプルに処理してルーティングするだけであれば JSONQuery式で十分事足りますが、もし受信データ内のデータを複雑に加工したり条件によって登録するデータを修正したい場合、Lambda関数を利用する必要があります。

FirehoseでLambda関数を利用して変換を行う場合、Lambda関数からのレスポンスには以下を含める必要があります。

  • recordId:入力ストリームレコード
  • result:変換結果 -> Ok or Dropped or ProcessingFailed
  • data:Lambda関数のBase64エンコードされた変換出力(ソースがKinesis Data StreamsかDirect PUTの場合のみ)
  • KafkaRecordValue:Lambda関数のBase64エンコードされた変換出力(ソースがAmazon MSKの場合のみ)

詳細な説明は以下に記載があります。 docs.aws.amazon.com

レスポンス例は以下になります。

{
  "recordId": "49655962066601463032522589543535113056108699331451682818000000",
  "result": "Ok",
  "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1"
}

また、Lambda関数から宛先Icebergテーブルのルーティング情報を指定したい場合、metadataを追加することで指定が可能です。
この指定方法は先ほど説明した JSONQuery式と同じ項目になります。

 {
    "recordId": "49655962066601463032522589543535113056108699331451682818000000",
    "result": "Ok",
    "data": "1IiwiI6ICJmYWxsIiwgImdgU21IiwiI6ICJmYWxsIiwg==tcHV0ZXIgU2NpZW5jZSIsICJzZW1",
    "metadata":{
        "otfMetadata":{
            "destinationTableName":"Device1234",
            "destinationDatabaseName":"IoTevents",
            "operation":"insert"
        }
    }
}

JSONQuery式とLambda関数を併用した場合は次のように処理が行われます。

  1. Lambda関数のレスポンスにmetadataに適切なルーティング設定があるかチェック
  2. なければ JSONQuery式に適切なルーティング設定があるかチェック
  3. なければ単一のテーブルとみなして、一意キー設定に適切なルーティング設定があるかチェック
  4. どれにも一致しない場合、指定したS3エラーバケットにデータを配信


データカタログを接続する

Apache IcebergはIcebergテーブルの書き込みにデータカタログが必要になります。
FirehoseはGlue Data Catalogとデフォルトで統合されており、Firehoseの設定画面から対象AWSアカウント(デフォルトはFirehoseと同じAWSアカウント)と対象リージョン(デフォルトはFirehoseと同じリージョン)を選択することができます。

もしS3 Tablesのテーブルにデータを投入したい場合、S3 Tableバケット名をCatalogで選択する必要があります。


JQ式を設定する

ここまでの内容でFirehoseでのIcebergテーブルへのデータ投入方法理解できたところで、実際に構築して動かしてみます。

まずは事前にAthenaからIcebergテーブルを作成しておきます。
以下ドキュメントにあるサンプル通りにdeviceIdをテーブル名に、デバイスデータを登録するIcebergテーブルを作成します。
後の方でUPDATEとDELETEも確認するため、ID列も追加します。

Route incoming records to different Iceberg tables - Amazon Data Firehose

  • device1234テーブル
CREATE TABLE device1234 (
    id int,
    timestamp date,
    temperature double,
    latitude double,
    longitude double,
    powerlevel int,
    status string
)
LOCATION 's3://test-tmp-20250602/ice_warehouse/iceberg_db/device1234/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG'
);

  • device4567テーブル
CREATE TABLE device4567 (
    id int,
    timestamp date,
    temperature double,
    latitude double,
    longitude double,
    powerlevel int,
    status string
)
LOCATION 's3://test-tmp-20250602/ice_warehouse/iceberg_db/device4567/'
TBLPROPERTIES (
    'table_type' = 'ICEBERG'
);

エラー時にデータが出力されるS3パスも作成しておきます。

まずはJQ式でINSERTを試してみます。
JQ式でデータベース名とテーブル名を指定するため、一意キー設定は一旦デフォルトのままとします。

  • JQ式
    • Database expression: "iceberg_db"
    • Table expression: .deviceId
    • Operation expression - optional: "insert"
  • 一意キー設定
    • "DestinationDatabaseName""DATABASE_PLACEHOLDER"
    • "DestinationTableName""TABLE_PLACEHOLDER"
    • "UniqueKeys""COLUMN_PLACEHOLDER"
    • "S3ErrorOutputPrefix""OPTIONAL_PREFIX_PLACEHOLDER"

今回は検証用途なので、早くS3に配信させるためにBuffer hintsからBuffer intervalの設定値を300 seconds→60 secondsに変更しておきます。

バックアップ設定も必須で必要のようなので、エラー時のデータ出力先Prefixを指定しておきます。

それ以外の設定値は、IAMロールを除いてデフォルトとします。

IAMロールにアタッチしているポリシーで許可している権限は以下です。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:UpdateTable"
            ],
            "Resource": [
                "arn:aws:glue:*:*:catalog",
                "arn:aws:glue:*:*:database/*",
                "arn:aws:glue:*:*:table/*/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::*",
                "arn:aws:s3:::*/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey"
            ],
            "Resource": [
                "arn:aws:kms:*:*:key/*"
            ],
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "s3.ap-northeast-1.amazonaws.com"
                },
                "StringLike": {
                    "kms:EncryptionContext:aws:s3:arn": "arn:aws:s3:::*"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "lakeformation:GetDataAccess"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogGroup",
                "logs:CreateLogStream"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:*:*"
            ]
        }
    ]
}

Firehoseストリームが作成できました。

WSL上でデータ投入スクリプトを作成します。

$ cat ./generate_device_data.sh
#!/bin/bash

# Delivery stream name
STREAM_NAME="PUT-ICE-JQ-STREAM"

# Target region
REGION_NAME="ap-northeast-1"

# Device IDs (最大2つまで指定)
# 例: DEVICE_IDS=("Device001")
# 例: DEVICE_IDS=("Device001" "Device002")
DEVICE_IDS=("device1234")

# Function to generate random decimal number
random_decimal() {
    local min=$1
    local max=$2
    local range=$(echo "$max - $min" | bc -l)
    local random_fraction=$(echo "scale=4; $RANDOM / 32767" | bc -l)
    local result=$(echo "scale=4; $min + $range * $random_fraction" | bc -l)
    printf "%.4f" $result
}

# Device index for alternating selection
DEVICE_INDEX=0

# ID counters for each device (associative array)
declare -A DEVICE_COUNTERS

# Initialize counters for all devices in DEVICE_IDS
for device in "${DEVICE_IDS[@]}"; do
    DEVICE_COUNTERS["$device"]=1
done

# Function to generate device data
generate_device_data() {
    # Select device ID in alternating order
    local device_id=${DEVICE_IDS[$DEVICE_INDEX]}

    # Get current ID for this device
    local id=${DEVICE_COUNTERS[$device_id]}

    local timestamp=$(date -u +"%Y-%m-%d")
    local temperature=$(random_decimal -20 50)
    local latitude=$(random_decimal -90 90)
    local longitude=$(random_decimal -180 180)
    local powerlevel=$((RANDOM % 101))
    local status="online"

    local json_data="{\\\"deviceId\\\":\\\"$device_id\\\",\\\"id\\\":$id,\\\"timestamp\\\":\\\"$timestamp\\\",\\\"temperature\\\":$temperature,\\\"latitude\\\":$latitude,\\\"longitude\\\":$longitude,\\\"powerlevel\\\":$powerlevel,\\\"status\\\":\\\"$status\\\"}"

    echo "{\"Data\":\"$json_data\"}"
}

echo "Starting data generation for stream: $STREAM_NAME"
echo "Press Ctrl+C to stop"

# Continuous data generation and sending
while true; do
    # Get current device for counter update
    current_device=${DEVICE_IDS[$DEVICE_INDEX]}

    data=$(generate_device_data)

    aws firehose put-record \
        --delivery-stream-name "$STREAM_NAME" \
        --record "$data" \
        --cli-binary-format raw-in-base64-out \
        --region "$REGION_NAME"

    echo "Sent: $data"

    # Increment counter for current device
    DEVICE_COUNTERS[$current_device]=$((${DEVICE_COUNTERS[$current_device]} + 1))

    # Update index for next iteration (cycle between 0 and 1)
    DEVICE_INDEX=$(((DEVICE_INDEX + 1) % ${#DEVICE_IDS[@]}))

    sleep 1
done

スクリプトを実行します。(デプロイ権限を持つAWS認証情報がAWSCLIで登録されている前提)

$ chmod +x ./generate_device_data.sh
$ ./generate_device_data.sh

3レコード送信されたところでCtrl+Cでスクリプトを止めます。

$ ./generate_device_data.sh
Starting data generation for stream: PUT-ICE-JQ-STREAM
Press Ctrl+C to stop
{
    "RecordId": "3M3LSRNrF3pjs1u+cTRI2J16ioBb5VHX0inmazwqK10BIDx+gzEzyrrPrIyJDtZvXr+87wb74r/dJ2eEH9StM9OHu3cFDaO1UvLcQ+j+6MQX8Z+E+MAkS03XjrGEeMbmP4cey7jCqpS8ZdI7cnAMJqdY5FOzcRYn8OiJQWYAAAs5EAjsr2MyJyIUVNGxp/70KUemxO9YheytHjca7GIoKBk+385XbmLo",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":1,\"timestamp\":\"2025-06-06\",\"temperature\":44.1200,\"latitude\":-33.8760,\"longitude\":-99.8640,\"powerlevel\":75,\"status\":\"online\"}"}
{
    "RecordId": "ZbD12PmRKPvceI6Sl+GjIYhnIUQMrQNSSnjPSCr2R3x5CeQvDcaZQ96oxzXI61UajcDkT2Ku1t0kEFTSohw1TFJN1roUp3TzJnQx26oXkt81jPDzoAZncsV0cIa15vGlPvOvTjDC2yOCp2rN28gb0DSwPw9G9YN/UFcqzZdL1UZgqQzGXxkeQ7bXiIKMXpdTmanVKz132DL5GIJzmbfE5khAV/GObqJJ",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":2,\"timestamp\":\"2025-06-06\",\"temperature\":6.2710,\"latitude\":-66.7440,\"longitude\":120.5640,\"powerlevel\":73,\"status\":\"online\"}"}
{
    "RecordId": "kfr3KDHq6UBohWJ2GyqivKfxJkh/cBdcXDE9BqCto9UcH79jL0s1kjvJpvlAScAb9qjdZECMdex3k+U+dp3N5WZAkCOI0DlGCHm0xQ6oy+p87iI/VZmWmY7vbxG7qctA832dN+va1DI3paxVaRIY53fVLrYyasbGHS7YLb8LdcNaaBD0PRmjkHc23EdRGBvWttkhfK8zvLkZC3A4CU6Bns8f6ld0A5ou",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":3,\"timestamp\":\"2025-06-06\",\"temperature\":44.4490,\"latitude\":47.8440,\"longitude\":-83.8800,\"powerlevel\":55,\"status\":\"online\"}"}
^C

Athenaからクエリするとデータが自動で入っていることを確認できました。

SELECT * FROM "iceberg_db"."device1234" limit 20


次はスクリプトのdeviceIdを2つにして再度実行し、2テーブルに出力されるか確認します。
先に対象のテーブルのデータを全て消しておきます。

DELETE FROM device1234;
DELETE FROM device4567;

スクリプトを修正します。

~
# Device IDs (最大2つまで指定)
# 例: DEVICE_IDS=("Device001")
# 例: DEVICE_IDS=("Device001" "Device002")
DEVICE_IDS=("device1234" "device4567")
~

スクリプトを実行した結果は以下です。2つのdeviceIdデータがに交互に送信されています。

$ ./generate_device_data.sh
Starting data generation for stream: PUT-ICE-JQ-STREAM
Press Ctrl+C to stop
{
    "RecordId": "xRCko5KhXOTt0+db3NlDFJDMdaTojLjMZaoJMuvKSddNivryhTohag5CvIIF7CUhDbkffl+8JR6l2PR6RkR0MPZ6sex7KwtYd23lWbeDKppd9NBSf862wvDOzzQwXYR3Yw7SkyqEVZ02+2KzRrsOKqEJJ6KfpH00rxXWVfzOldlZ67Ovcf6pah2hwvBvfLsXc5ltUxCwz1LH/DD7tkYQAe30nfbjfBZb",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":1,\"timestamp\":\"2025-06-06\",\"temperature\":-1.1420,\"latitude\":24.1560,\"longitude\":-57.6000,\"powerlevel\":76,\"status\":\"online\"}"}
{
    "RecordId": "8sbStIoeXRdQhfvDv7yU+CXVwpp9UJubJfevQVTKJ7dFForlg/3GUM/CbFvaQbRDyFP5CSyX1tZttolHkRx16nPBEWStB1cFs+GEdAHQgRkZMFH5WBc9z46R0qUjn4vsA+RFtwifB1W4n5yCIp/dn7ixd+ThvtdQsO7QmYdN1vCbrwrgev+z/VTujyqlPu5I/tu2qnZ821wofosF+ATTVFs4twU7MWl7",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device4567\",\"id\":1,\"timestamp\":\"2025-06-06\",\"temperature\":-0.0220,\"latitude\":6.6240,\"longitude\":7.1280,\"powerlevel\":69,\"status\":\"online\"}"}
{
    "RecordId": "pVSUgpatOZICYI1Aw4qI/0Jdvd9ukhTq7sVV2IXUA+k4KsDfAqaPI4ACgC4dOV180E7+hgviPIlcrokpAtKvlEjfoURZPJgR1S+a/YWaohAgCh3dTuPhkxRkXPID5rkBjtLj55RzUtTCqmRtjyV0AxEdyiwoP44gYajLdXFvMaQe6SCHo0GiiQfXWDgfyrrfXRteVat/r9AIxFBrYidbR1kjRTr9oWIB",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":2,\"timestamp\":\"2025-06-06\",\"temperature\":-12.1880,\"latitude\":-45.9000,\"longitude\":154.8720,\"powerlevel\":50,\"status\":\"online\"}"}
{
    "RecordId": "7sJOfab6SEXZwZaqPSlrmXRRdFHQhmtucnX5jItMREV9WmGltgXGYrw0wbHPtE8qCdvPU9wBxHTJh9+sH8XyWyjlm3/cxTE3m98GcHXtU1z/9vYenNb1Lbkz8fNcivCcjad7o5gpCn5+AURblTdAaO8Y9tWR6PBrgijLeV9cEcKcuK+LPWVlEGPLCRnv+2kXXMesmfeuR2/iDtSuvr9lkcQwV0J4byel",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device4567\",\"id\":2,\"timestamp\":\"2025-06-06\",\"temperature\":-17.9070,\"latitude\":21.7260,\"longitude\":112.6080,\"powerlevel\":50,\"status\":\"online\"}"}
{
    "RecordId": "On/27OY/uPdb+gq/WzS+ISqU82HwilodIyTIAzRlLGa0iHL5FnV96i6g79f4fGjqLUa8ji9Ejlgu5d885kOsOOMayUmhi+4pfpi2UjSQT32ytV+MVA9yUbNMVIlTwe+TYec88H21Fn9MgcuPCwYDm43nl/YFX8lijGqgslE5IGLiHs5yy5pyt7vPW6foF6yGm2kIEx/J3GGPW5rp342yCD0PCms4lSMm",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":3,\"timestamp\":\"2025-06-06\",\"temperature\":2.8270,\"latitude\":47.5200,\"longitude\":114.5520,\"powerlevel\":14,\"status\":\"online\"}"}
{
    "RecordId": "Ui59k/gG+H5vNd3X2tgvYiQiqyTn8y25B1dDRVqnJydVJEZ3N/t7Hxg8XQl59EYBg/D5gWrp8pQAVS92Z5Qk3UnVxvPJ1ima5DRiXM7eXoTBiPT4diaGusm4/RBeCYYBH2o09NCaYeHMY/2Ys4gEmAWmWPQG/JRDJWbxs9hJ5jP8w/pbU23uxyN998afkAlEtRXA1xt7Q62GevEN8ok2LR2KLWfe7kVB",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device4567\",\"id\":3,\"timestamp\":\"2025-06-06\",\"temperature\":-12.7620,\"latitude\":-44.5680,\"longitude\":155.4840,\"powerlevel\":22,\"status\":\"online\"}"}
^C

Athenaから確認すると送信したデータが2つのテーブルにそれぞれ追加されていることを確認できました。

  • device1234テーブル
SELECT * FROM "iceberg_db"."device1234" limit 20

  • device4567テーブル
SELECT * FROM "iceberg_db"."device4567" limit 20


次はJQ式と同じことをLambda関数で行います。

まず以下のSQLでテーブルのデータを消しておきます。

DELETE FROM device1234;
DELETE FROM device4567;

Python3.13のLambda関数を作成し、以下のコードをデプロイします。
ここでは、受け取った受信データのstatusカラムを"offline"に書き換える処理を行っています。
また、metadataパラメータでJQ式と同じ設定を入れています。

import json
import base64
import logging

# ログ設定
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    """
    Firehose Data Transformation Lambda Function (最小限実装)
    - statusを"online"から"offline"に変更
    - deviceIdに基づいてIcebergテーブルをルーティング
    """
    
    logger.info(f"処理開始 - レコード数: {len(event['records'])}")
    
    output = []
    success_count = 0
    error_count = 0
    
    for record in event['records']:
        record_id = record['recordId']
        
        try:
            # Base64デコードしてJSONデータを取得
            payload = base64.b64decode(record['data']).decode('utf-8')
            data = json.loads(payload)
            
            logger.info(f"レコード {record_id}: 元データ - {data}")
            
            # statusを"online"から"offline"に変更
            original_status = data.get('status', 'unknown')
            data['status'] = 'offline'
            
            # deviceIdに基づいてテーブル名を決定
            device_id = data.get('deviceId', '')
            if device_id == 'device1234':
                table_name = 'device1234'
            elif device_id == 'device4567':
                table_name = 'device4567'
            else:
                # 想定外のdevice_idの場合はエラーとして処理
                logger.error(f"レコード {record_id}: 想定外のdeviceId - {device_id}")
                output_record = {
                    'recordId': record_id,
                    'result': 'ProcessingFailed',
                    'data': record['data']
                }
                output.append(output_record)
                error_count += 1
                continue
            
            # 変更されたデータをエンコード
            output_data = json.dumps(data, ensure_ascii=False)
            encoded_data = base64.b64encode(output_data.encode('utf-8')).decode('utf-8')
            
            # メタデータ付きでレスポンス作成
            output_record = {
                'recordId': record_id,
                'result': 'Ok',
                'data': encoded_data,
                'metadata': {
                    'otfMetadata': {
                        'destinationTableName': table_name,
                        'destinationDatabaseName': 'iceberg_db',
                        'operation': 'insert'
                    }
                }
            }
            
            logger.info(f"レコード {record_id}: 処理成功 - deviceId: {device_id}, status: {original_status}→offline, テーブル: {table_name}")
            success_count += 1
            
        except Exception as e:
            # エラー時は元データをそのまま返す
            logger.error(f"レコード {record_id}: 処理エラー - {str(e)}")
            output_record = {
                'recordId': record_id,
                'result': 'ProcessingFailed',
                'data': record['data']
            }
            error_count += 1
        
        output.append(output_record)
    
    logger.info(f"処理完了 - 成功: {success_count}, エラー: {error_count}")
    
    return {'records': output}

Lambda関数にアタッチするIAMロールには、最低限以下の権限が含まれるようにします。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        }
    ]
}

Firehoseの方は次のように修正します。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetTable",
                "glue:GetDatabase",
                "glue:UpdateTable",
                "glue:CreateTable",
                "glue:BatchCreatePartition",
                "glue:BatchDeletePartition",
                "glue:BatchUpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:UpdatePartition"
            ],
            "Resource": [
                "arn:aws:glue:*:*:catalog",
                "arn:aws:glue:*:*:database/*",
                "arn:aws:glue:*:*:table/*/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:AbortMultipartUpload",
                "s3:GetBucketLocation",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListBucketMultipartUploads",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::*",
                "arn:aws:s3:::*/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt",
                "kms:GenerateDataKey"
            ],
            "Resource": [
                "arn:aws:kms:*:*:key/*"
            ],
            "Condition": {
                "StringEquals": {
                    "kms:ViaService": "s3.ap-northeast-1.amazonaws.com"
                },
                "StringLike": {
                    "kms:EncryptionContext:aws:s3:arn": "arn:aws:s3:::*"
                }
            }
        },
        {
            "Effect": "Allow",
            "Action": [
                "lakeformation:GetDataAccess"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogGroup",
                "logs:CreateLogStream"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction",
                "lambda:GetFunctionConfiguration"
            ],
            "Resource": [
                "arn:aws:lambda:ap-northeast-1:*:function:*"
            ]
        }
    ]
}

Lambda関数による変換設定とJQ式を無効化した新しいFirehoseストリームを作成します。

ストリーム名を変更したBashスクリプトを新しく作成します。

$ cat generate_device_data_lambda.sh
#!/bin/bash

# Delivery stream name
STREAM_NAME="PUT-ICE-Lambda-STREAM"

# Target region
REGION_NAME="ap-northeast-1"
~

スクリプトを実行した結果は以下です。2つのdeviceIdデータがに交互に送信されています。

$ ./generate_device_data_lambda.sh
Starting data generation for stream: PUT-ICE-Lambda-STREAM
Press Ctrl+C to stop
{
    "RecordId": "Bb4bbKvR5V6o3RQuFTtqk+BG/5Lu2wTf+In7jjiGMqlB6jXduadRAhjprXpaMr3Jht0eQ1Hpkf0jzzgFDUqBxlp7b8nvujp24Mrb2OJFtf9reIwX6Pjs3Pyo2J7jC4kqttUKRy9izYDAt5NQpVVReKN1+uuBJ2fmrNlmwFs1t4tciFN+OPUWSOsQwcTS8mqFGYgJq0EEMvbh6SRTimNHkXh2nJmtqecw",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":1,\"timestamp\":\"2025-06-06\",\"temperature\":35.2720,\"latitude\":-47.5740,\"longitude\":177.6600,\"powerlevel\":38,\"status\":\"online\"}"}
{
    "RecordId": "JxvHhipsPhchuSfT21Mj/M6Q2m2Y0q6qA2e/6tNgTpiSD3vlRBrk+L5ELI1E7lYBl924skDkThngzoozts54Xv5be1pQ9j+h2J4BLmYrDdaVpn4XhTq73oc1n9bUgldVTDhTrj0y4VQPOUUoAKdMXhEtZoNDCc6R5lh7W/fFh7HV4HZIHwo4dmUB68g5tIUtDU6sxDzwdzsu1eJCq3B31V6Lh/R+WGjQ",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device4567\",\"id\":1,\"timestamp\":\"2025-06-06\",\"temperature\":-15.1980,\"latitude\":36.8280,\"longitude\":-31.7880,\"powerlevel\":33,\"status\":\"online\"}"}
{
    "RecordId": "8Q8siPc+oPEcigtA9ddrz2dm6YtiAzP7rlsGssyPgU7JYm7eWZACRFwdm8quIpwrEMAAlvRv9IPaOtIahpZSiaDc0nP+R8p0jt+uBlGe/0enIeOyv50DyM3UJQ6gCViFFIcHvvKyIfnM7vh9aoh6pH3NvndSSIjzbgPhh4YwyzwyGCafIzfwo8nd7Z+4JpSrTWj/i1xsB5ifozbdgoHo/82LdbV55uSM",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":2,\"timestamp\":\"2025-06-06\",\"temperature\":29.9030,\"latitude\":0.4500,\"longitude\":0.9360,\"powerlevel\":91,\"status\":\"online\"}"}
{
    "RecordId": "iT3n8Ioal2xbMT0VpitXe93x2NiMgHDtZtwZezlG6EomUXNGuej0jsAI4nni3vG6eKotd9rDycf84+R84U8nJPqgnM8C9BHA0irwiaUolklRPI72BE7r4tjZqGVpjjf/UeOfBDl4cSsVmeszFImhuHDAOp0wwqBoS+sFezaRWQgnHVBiPGEPy3bdtGM+2R+l3vRf/obs3Fg6K6WH+7Whp3ne9pT+LbzE",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device4567\",\"id\":2,\"timestamp\":\"2025-06-06\",\"temperature\":-3.8160,\"latitude\":-45.2340,\"longitude\":-22.2120,\"powerlevel\":3,\"status\":\"online\"}"}
{
    "RecordId": "VfSpAWdJWlbq6YTJ0hATN0ulmBpbtm9WhXG7HIrYcErxT8U8wKJ8lUzL9kM1Z3E7oCOLroIWzIWhO6bIQkXzkaLVJORD+3hWE15xg1PSA7WhQiMJCvrmZqXuw/ZFZxDRXoIuN/G2d/3XuDFll6h1J4GaY18dRXR7alnAIZBoFGM1//L2GQsqZTxo7LkMXIg1wap99doKVZ/3/m0Hbq7Z2DtddulyKmNO",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device1234\",\"id\":3,\"timestamp\":\"2025-06-06\",\"temperature\":3.1980,\"latitude\":-66.9600,\"longitude\":-151.3800,\"powerlevel\":28,\"status\":\"online\"}"}
{
    "RecordId": "OX7gXCPXZTqh4LEkgxX3n8Omkim8AweiYL8OrCFDSIqtHB2wG14OkPnlqP50KS0g6AYMr085rp5i96E3GnuzS3cYNiB17r1UJP6xRcZwdpYdpYaN3RwGYlHT9WtEI7628QwX+tuNGQVjcoUjGvtNjh+GcMTop4UW0Pwvi0YzRY/nF0GpRppNqXxAIPUxcR/EtRHbotw591Et9QXy6Zg1bOxSX7FIt2en",
    "Encrypted": false
}
Sent: {"Data":"{\"deviceId\":\"device4567\",\"id\":3,\"timestamp\":\"2025-06-06\",\"temperature\":-9.4650,\"latitude\":-1.8000,\"longitude\":60.8040,\"powerlevel\":79,\"status\":\"online\"}"}
^C

Athenaから確認すると送信したデータが2つのテーブルにそれぞれ追加されていることを確認できました。
想定通りstatusカラムは"offline"に書き換わっていることが確認できます。

  • device1234テーブル
SELECT * FROM "iceberg_db"."device1234" limit 20

  • device4567テーブル
SELECT * FROM "iceberg_db"."device4567" limit 20


一意のキーを設定する

ここまででJQ式とLambdaを使用したデータのINSERTを試してきました。
次は一意キー設定を利用したデータのUPDATEとDELETEを試してみようと思います。

まずはUPDATEからです。

JQ式でUPDATEを行うFirehoseストリームを新規で作成します。
JQ式でOperation expression - optional"update"を設定し、一意キー設定で"UniqueKeys"識別子フィールド IDとして"id"カラムを指定します。

一意キー設定のDB名やテーブル名をデフォルト値にしていたら次のエラーが出ました。
どうやら一意キー設定のDB名・テーブル名は存在チェックがされているようです。

Role arn:aws:iam::123456789123:role/test-data-firehose-role is not authorized to perform: glue:GetTable for the given table or the table does not exist.

以下のようにデータを追加するDB名・テーブル名を記載することでエラーなくストリームが作成できました。

[
  {
    "DestinationDatabaseName": "iceberg_db",
    "DestinationTableName": "device1234",
    "UniqueKeys": [
      "id"
    ]
  },
  {
    "DestinationDatabaseName": "iceberg_db",
    "DestinationTableName": "device4567",
    "UniqueKeys": [
      "id"
    ]
  }
]

まずUPDATEを指定した状態で、device1234テーブルに対象レコードがない場合にINSERTされるかを確認します。
id=4のレコードを手動送信します。

aws firehose put-record \
    --delivery-stream-name "PUT-ICE-JQ-UPDATE-STREAM" \
    --record "{\"Data\":\"{\\\"deviceId\\\":\\\"device1234\\\",\\\"id\\\":4,\\\"timestamp\\\":\\\"$(date -u +"%Y-%m-%d")\\\",\\\"temperature\\\":25.5,\\\"latitude\\\":-35.123,\\\"longitude\\\":145.678,\\\"powerlevel\\\":75,\\\"status\\\":\\\"online\\\"}\"}" \
    --cli-binary-format raw-in-base64-out \
    --region "ap-northeast-1"

"id=4"、"status=online"のレコードが想定通り追加されました。

次はdevice4567テーブルに同じid値を持つレコードがある場合にUPDATEされるかを確認します。

aws firehose put-record \
    --delivery-stream-name "PUT-ICE-JQ-UPDATE-STREAM" \
    --record "{\"Data\":\"{\\\"deviceId\\\":\\\"device4567\\\",\\\"id\\\":3,\\\"timestamp\\\":\\\"$(date -u +"%Y-%m-%d")\\\",\\\"temperature\\\":18.2,\\\"latitude\\\":12.345,\\\"longitude\\\":-67.890,\\\"powerlevel\\\":89,\\\"status\\\":\\\"online\\\"}\"}" \
    --cli-binary-format raw-in-base64-out \
    --region "ap-northeast-1"

実行前のdevice4567テーブルの内容はこちらです。
"id=3"のレコードの内容を更新します。

少し待って"id=3"をレコードを見ると更新されていました。

以下SQLでテーブルのスナップショットのコミット履歴を取得してみると、"overwrite"となっていることが確認できました。

SELECT 
    snapshot_id,
    committed_at,
    operation,
    summary
FROM "iceberg_db"."device4567$snapshots"
ORDER BY committed_at DESC;

"overwrite"コミットのサマリをJSON成形したものが以下になります。
"added-records": 1"added-delete-files": 1となっていることから、DELETE/INSERTが行われたことが読み取れます。

{
  "added-data-files": 1,
  "total-equality-deletes": 1,
  "added-equality-delete-files": 1,
  "added-records": 1,
  "added-delete-files": 1,
  "total-records": 4,
  "changed-partition-count": 1,
  "total-position-deletes": 0,
  "added-files-size": 2552,
  "added-equality-deletes": 1,
  "total-delete-files": 1,
  "total-files-size": 4807,
  "total-data-files": 2,
  "FIREHOSE-INTERNAL-CHECKPOINT": "000000000000000001,,,000000000000000000"
}


再試行期間を指定する

FirehoseにはIcebergテーブルへのデータ配信が失敗したときに再試行する機能があります。
デフォルトでは再試行間隔は300秒間隔になっているので、必要に応じて調整する必要があります。


失敗した配信または処理の処理

再試行期間が経過後にストリームの処理・配信が失敗した場合、バックアップ設定で設定されているS3バケットに対してデータが出力されるようになっています。
Firehoseストリームの作成画面だと以下の箇所で設定が可能です。


バッファヒントを設定する

Firehoseは受信データをバッファリングしてからIcebergテーブルに配信しますが、その配信タイミングとしては、バッファリングサイスを満たした場合かバッファリング時間を経過した場合の早い方で配信されます。
設定範囲は以下になります。

  • バッファサイズ:1~128MB
  • バッファ間隔:0~900秒

バッファリングの値を大きくすると

  • S3への書き込み回数が減る
  • データファイルのサイズが大きくなるため圧縮コストが削減される
  • クエリ実行時間が短縮される

というメリットがある反面、配信までのレイテンシーは高くなります。
バッファリングの値を小さくするとレイテンシーは低くなりますが、S3の書き込み回数が増えたりクエリ実行時間が増加したり等のデメリットがあります。
そのため、そのあたりのバランスを考慮して設定する必要があります。


詳細設定を構成する

Firehoseについての詳細な設定項目については以下に記載があります。 docs.aws.amazon.com

FirehoseからIcebergテーブルにデータを配信する場合はいくつかの前提条件を満たす必要があるため、設定がうまくいかない時は以下を確認すると良いかもしれません。 docs.aws.amazon.com


次回

今回は「Working with Apache Iceberg tables by using Amazon Data Firehose」について確認しました。
次回は以下ページを見ていきながら、可能なら実際に使い方も試してみたいと思います。


終わりに

今回は思ったよりもFirehoseの検証に時間がかかったのもあって、結構なボリュームになってしまいました。

あと本文では触れませんでしたが、Firehoseの料金ページを見ると送信元先のデータ取り込み・処理などには課金されますが、ストリームを作成しているだけでは料金が発生しないことに気付きました。
複雑な変換処理などをするともちろん料金は高くなりますが、単純にIcebergテーブルへデータを取り込むツールと考えるとGlueよりも選択肢に上がるのではないかと思いました。(思っただけです)
ただ、以下にある通り、Icebergテーブルへの配信は他の送信先に比べて若干高いので、利用時にはちゃんと試算・比較するのがよいでしょう。

  • 送信先としてのApache Icebergテーブル
    • 最初の250TB/月: $0.093
    • 次の750TB/月: $0.079
    • 1PB/月を超えた場合: $0.066
  • その他の送信先
    • 取り込み、GBあたり
      • 最初の500TB/月: $0.036
      • 次の1.5PB/月: $0.031
      • 次の3PB/月: $0.025
      • 5PB/月を超えた場合: お問い合わせ
    • 形式の変換、GBあたり: $0.022
    • VPC配信
      • VPC で宛先に配信される GB あたり: $0.01
      • 1 時間あたり AZ ごとの VPC 配信: $0.014
    • Amazon S3 配信の動的パーティショニング
      • 動的パーティショニングを通じて処理される GB あたり: $0.032
      • 配信された 1,000 S3 オブジェクトあたり: $0.008
      • JQ 処理、1 時間あたり (オプション): $0.112
    • CloudWatch Logs の解凍 (解凍した GB あたり): $0.00403

aws.amazon.com