はじめに
こんにちは、半田です。
前回に引き続き、AWS 規範ガイダンス「Using Apache Iceberg on AWS」を読んでメモをしていきたいと思います。
前回は「Getting started with Apache Iceberg tables in Amazon Athena SQL」のページを読みました。
今回は「Working with Apache Iceberg in AWS Glue」を読んでいきます。
前の記事の冒頭にも書きましたが、こういうメモをあまり書いたことがないので、「それまんま引用じゃね?」というところがあってもご容赦ください。
あと自分の理解を書いていますが、もしかしたら間違っている可能性もあるので、正しい情報は公式の各ドキュメントを参照するようにしてください。
目次
Working with Apache Iceberg in AWS Glue(AWS GlueでApache Icebergを使う)
このページではAWS GlueでApache Icebergを利用する際の使い方について紹介されています。
AWS Glue(以後Glue)はデータの変換やメタデータを管理する機能などが提供されるサービスで、データ基盤におけるETLを行うために利用されます。
Glue JobではApache SparkやPythonを利用したプログラムを書くことによってETL処理を実現します。
GlueでIcebergテーブルを操作するためには専用のコネクタが必要で、既に組み込まれているものを利用するか、カスタムコネクタを利用することが可能です。
ネイティブIcebergインテグレーションを使用する
2025年6月4日時点ではGlueバージョンは1.0 , 2.0 , 3.0 , 4.0 , 5.0の5つから選択できるようになっており、IcebergをはじめとするOpen Table Formatに対応しているのは3.0以上になります。
Glue JobでIcebergを利用するには
- [ジョブの詳細]タブを選択する
- [詳細プロパティ]の[ジョブパラメータ]で以下の値を設定する
--datalake-formats iceberg
Notebookを利用する場合は最初のセルで以下を実行して設定が可能です。
%%configure { "--conf" : <job-specific Spark configuration discussed later>, #ここは後述 "--datalake-formats" : "iceberg" }
Glueにバージョンがあるように、Apache Icebergにもバージョンがあります。
GlueバージョンとサポートされるIcebergバージョンの対応表は以下になります。
docs.aws.amazon.com
2025年6月4日時点のApache Icebergの最新バージョンは 1.9.1 のため、最新バージョンの機能を利用したい場合は後の方で説明する方法を利用する必要があります。 iceberg.apache.org
検証のために事前にAthenaで以下のSQLを実行して、GlueからデータをINSERTするIcebergテーブルを作成しておきます。
CREATE TABLE iceberg_glue_table ( no bigint, user string, age bigint, favorite string) LOCATION 's3://test-tmp-20250602/ice_warehouse/iceberg_db/iceberg_glue_table/' TBLPROPERTIES ( 'table_type' ='ICEBERG' )
GlueコンソールのVisual ETLからGUIでETL処理を実装します。
今回は過去に作成したIcebergテーブルのデータからfavoriteカラムが"banana"のユーザーを取得して別のテーブルに追加する処理にしています。
Visual ETLのData previewでは"test-user3"が表示されており、
Athenaでクエリした結果を見る限りは一致してそうです。
実際のScriptは以下になります。こちらは全て自動生成されたものです。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue import DynamicFrame import re args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # Script generated for node AWS Glue Data Catalog AWSGlueDataCatalog_node1749004732351_df = glueContext.create_data_frame.from_catalog(database="iceberg_db", table_name="iceberg_ctas_table") AWSGlueDataCatalog_node1749004732351 = DynamicFrame.fromDF(AWSGlueDataCatalog_node1749004732351_df, glueContext, "AWSGlueDataCatalog_node1749004732351") # Script generated for node Filter Filter_node1749004836995 = Filter.apply(frame=AWSGlueDataCatalog_node1749004732351, f=lambda row: (bool(re.match("banana", row["favorite"]))), transformation_ctx="Filter_node1749004836995") # Script generated for node AWS Glue Data Catalog AWSGlueDataCatalog_node1749005130956_df = Filter_node1749004836995.toDF() AWSGlueDataCatalog_node1749005130956 = glueContext.write_data_frame.from_catalog(frame=AWSGlueDataCatalog_node1749005130956_df, database="iceberg_db", table_name="iceberg_glue_table", additional_options={}) job.commit()
「Job details」タブからGlue Jobの設定をしていきます。最低限以下の設定を行います。
- IAM Role:Glueを実行する権限を持つIAMロールを選択
- Glue version:今回はGlue 5.0を選択
- number of workers:デフォルトが10なので、最小の2に変更
あと画面下部にあるジョブパラメータからIcebergパラメータを指定します。
- Key:
--datalake-formats
- Value:
iceberg
ここまでできたら保存して実行してみます。
1分と少しで正常終了しました。
先ほど作成したテーブルをAthenaからクエリすると、想定通りfavoriteカラムが"banana"のユーザーのみ追加されていました。
SELECT * FROM "iceberg_db"."iceberg_glue_table" limit 20
このようにスクリプトを実装せずに少しの操作でGlueからIcebergテーブルへの操作をすることができます。
アイスバーグのカスタムバージョンを使う(カスタムコネクタの使用)
事前準備
先ほども書きましたが、Glueに組み込まれていないIcebergバージョンを利用したいケースやアップグレードするタイミングを自分でコントロールしたい場合、カスタムコネクタや独自のJARファイルを使用する必要があります。
どのようなJARが必要かですが、AWS規範ガイダンスには次のように記載されています。
ここを見る限りだと以下の3つが必要なようです。
IcebergのJARが必要なのは分かりますが、残り2つは何故必要なのかまでは説明されていません。
- iceberg-spark-runtime-3.1_2.12-0.13.1.jar
- bundle-2.17.161.jar
- url-connection-client-2.17.161.jar
たとえば、Icebergバージョン0.13.1用のカスタマーコネクターを作成するには、次の手順に従います: ファイルiceberg-spark-runtime-3.1_2.12-0.13.1.jar、bundle-2.17.161.jar、およびurl-connection-client-2.17.161.jarをAmazon S3バケットにアップロードします。 これらのファイルは、それぞれのApache Mavenリポジトリからダウンロードできます。
IcebergドキュメントのAWS統合のページを見ると以下の記載が見つけられました。
IcebergをAWS上で利用する場合はAWS v2 SDKとApache HTTP Clientが必要なようです。
IcebergはAWS v2 SDKに依存しているため、AWS v2 SDKをご用意いただく必要があります。 すべてのデフォルトのAWSクライアントは、 HTTP接続管理にApache HTTP Clientを使用します 。この依存関係はAWS SDKバンドルには含まれていないため、別途追加する必要があります。 iceberg.apache.org
ただ、同ページのSparkの説明を読む限りだと iceberg-aws-bundle
にまとめてパッケージされているようなので、2つでも良さそうです。
例えば、Spark 3.4(scala 2.12)とAWSクライアント(iceberg-aws-bundleにパッケージされている)でAWSの機能を使用するには、Spark SQLシェルを次のように起動します iceberg.apache.org
記載されているSparkコマンドを見るとiceberg-spark-runtimeとiceberg-aws-bundleの2つだけが指定されていました。
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.9.1,org.apache.iceberg:iceberg-aws-bundle:1.9.1 \
ダウンロードするJARのバージョン指定のためにGlue 5.0でサポートしているランタイムバージョンも確認しておきます。
・Spark 3.5.4 ・Python 3.11 ・Scala 2.12.18 docs.aws.amazon.com
以下のページで「iceberg-spark-runtime-3.5」を検索して、一番上に出てきたIcebergアイコンのものを押下します。
https://mvnrepository.com/artifact/org.apache.iceberg
今回は1.9.1バージョンを使ってみます。1.9.1
の数字の部分を押下します。
Filesにあるjar
を押下してJARファイルをダウンロードします。
同じ要領で以下のページで「iceberg-aws-bundle」を検索して、一番上に出てきたIcebergアイコンのものを押下します。
https://mvnrepository.com/artifact/org.apache.iceberg
こちらも1.9.1バージョンを使うので1.9.1
の数字の部分を押下します。
先ほどと同じようにFilesにあるjar
を押下してJARファイルをダウンロードします。
これで2つのJARファイルが用意できました。
Glue Connectionの作成
以下に記載の手順に従って設定していきます。 docs.aws.amazon.com
まずダウンロードしたJARファイルをS3バケットにアップロードします。
GlueコンソールのConnectionsから「Create custom connector 」を押下します。
先ほどJARファイルをアップロードしたS3パスを指定して、その他必要そうな情報を入力して作成します。
- Connector S3 URL:s3://test-tmp-20250602/iceberg/jar/
- Name:iceberg-1.9.0-connector
- Connector type:Spark
- Class name:org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.9.1
- Description - optional:org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.9.1
カスタムコネクタが作成できました。次にこのコネクタを利用したGlue Connectionを作成していきます。
カスタムコネクタのページの右上にある「Create connection」を押下します。
今回はGlue Jobのみで利用するため、AWS SecretやNetwork Optionsは利用せず、名前だけ入力して作成します。
- Name:iceberg-1_9_1-connection
Glue Connectionが作成できました。
カスタムコネクタを利用したIcebergテーブル操作
一から作るのは手間のため、先ほど作成したGlue JobのFilter条件を変更して使用します。
先ほどはfavoriteカラムが"banana"のユーザーのみ取得しましたが、次は"orange"のユーザーを取得するようにします。
ジョブパラメータは削除します。
あとは作成したGlue Connectionを設定するだけ...と思ったのですが、エラーがでてここからうまくいきませんでした。
カスタムコネクタは使う人が少ないのか情報があまりなく、ドキュメントにも詳しく載っていないので、一旦ここでのエラー解消は諦めて次にいきます。
もし設定方法が分かったら更新します。
アイスバーグのカスタムバージョンを使う(独自のJARファイルの使用)
独自のJARファイルを持ち込む
AWS規範ガイダンスの記載は少ないので、以下Glueユーザーガイドを参考に進めます。
別バージョンの Iceberg の使用 AWS Glue でサポートされないバージョンの Iceberg を使用するには、--extra-jars ジョブパラメータを使用して独自の Iceberg JAR ファイルを指定します。--datalake-formats パラメータの値として、iceberg を含めないようにしてください。AWS Glue 5.0 を使用する場合、--user-jars-first true ジョブパラメータを設定する必要があります。
上記を見る限りだと以下の手順ができれば問題なさそうです。
- --extra-jars パラメータを使用したJARファイルの指定
- --user-jars-firstパラメータを
true
で設定 - --datalake-formatsパラメータを設定しない
--extra-jars
パラメータはカンマ区切りのフルパスを設定すれば良さそうです。
AWS Glue がドライバーおよびエグゼキュターにコピーする追加ファイルへの Amazon S3 パス。AWSGlue は、スクリプトを実行する前にこれらのファイルを Java クラスパスにも追加します。複数の値はコンマ (,) で区切られた完全なパスでなければなりません。拡張機能は .jar である必要はありません docs.aws.amazon.com
以下のように設定しました。iceberg-aws-bundle
は必須かわからないので一旦設定せずにやってみます。
--extra-jars
:s3://test-tmp-20250602/iceberg/jar/iceberg-spark-runtime-3.5_2.13-1.9.1.jar
--user-jars-first
:true
ETL処理内容は先ほど実施しようとした内容と同じにしています。
java.lang.AbstractMethodError: Receiver class org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser does not define or inherit an implementation of the resolved method 'abstract scala.collection.Seq parseMultipartIdentifier(java.lang.String)' of interface org.apache.spark.sql.catalyst.parser.ParserInterface.
エラーが発生しました。ClaudeによるとJARのScalaバージョンが2.13ベースで、Glue 5.0でサポートしているScala 2.12でないことが理由のようです。
以下からScala 2.12のJARをダウンロードして設定を修正して再実行してみます。 https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.5_2.12/1.9.1
java.lang.IllegalArgumentException: Property 'write.object-storage.path' has been deprecated and will be removed in 2.0, use 'write.data.path' instead.
次のエラーはiceberg 1.9.1で廃止予定のプロパティが使用されていることが原因でエラーとなりました。
詳しく調べてもらったところAthenaで作成したicebergテーブルにプロパティが残っているのが理由で、書き込み時にadditional_optionsで指定してもうまくいかないようです。
そのため、処理の最初にテーブルプロパティを修正するSQLを実行する処理を追加しました。
# 非推奨プロパティエラーを修正 spark.sql("ALTER TABLE glue_catalog.iceberg_db.iceberg_glue_table SET TBLPROPERTIES ('write.data.path' = 's3://test-tmp-20250602/ice_warehouse/iceberg_db/iceberg_glue_table/data/')") spark.sql("ALTER TABLE glue_catalog.iceberg_db.iceberg_glue_table UNSET TBLPROPERTIES IF EXISTS ('write.object-storage.path')")
その上で書き込み時にadditional_optionsパラメータを追加して、iceberg 1.9.1でサポートしている"write.data.path"パラメータを指定するようにしました。
AWSGlueDataCatalog_node1749019848763 = glueContext.write_data_frame.from_catalog( frame=AWSGlueDataCatalog_node1749019848763_df, database="iceberg_db", table_name="iceberg_glue_table", additional_options={ "write.data.path": "s3://test-tmp-20250602/ice_warehouse/iceberg_db/iceberg_glue_table/data/" })
修正後のコード全量は以下です。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue import DynamicFrame import re args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # 非推奨プロパティエラーを修正 spark.sql("ALTER TABLE glue_catalog.iceberg_db.iceberg_glue_table SET TBLPROPERTIES ('write.data.path' = 's3://test-tmp-20250602/ice_warehouse/iceberg_db/iceberg_glue_table/data/')") spark.sql("ALTER TABLE glue_catalog.iceberg_db.iceberg_glue_table UNSET TBLPROPERTIES IF EXISTS ('write.object-storage.path')") # Script generated for node AWS Glue Data Catalog AWSGlueDataCatalog_node1749019842037_df = glueContext.create_data_frame.from_catalog(database="iceberg_db", table_name="iceberg_ctas_table") AWSGlueDataCatalog_node1749019842037 = DynamicFrame.fromDF(AWSGlueDataCatalog_node1749019842037_df, glueContext, "AWSGlueDataCatalog_node1749019842037") # Script generated for node Filter Filter_node1749019846654 = Filter.apply(frame=AWSGlueDataCatalog_node1749019842037, f=lambda row: (bool(re.match("orange", row["favorite"]))), transformation_ctx="Filter_node1749019846654") # Script generated for node AWS Glue Data Catalog AWSGlueDataCatalog_node1749019848763_df = Filter_node1749019846654.toDF() AWSGlueDataCatalog_node1749019848763 = glueContext.write_data_frame.from_catalog( frame=AWSGlueDataCatalog_node1749019848763_df, database="iceberg_db", table_name="iceberg_glue_table", additional_options={ "write.data.path": "s3://test-tmp-20250602/ice_warehouse/iceberg_db/iceberg_glue_table/data/" }) job.commit()
また、ジョブパラメータを2つ追加しています。
--conf
のほうはGlueでIcebergを扱うためのSpark設定で、--environment-variables
の方はログにAWS SDK v1の警告が出ていたのでついでに出ないように設定しました。
--environment-variables
は設定しなくても問題ありません。
--conf
:extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.warehouse=s3://test-tmp-20250602/ice_warehouse/
--environment-variables
:CUSTOMER_AWS_JAVA_V1_DISABLE_DEPRECATION_ANNOUNCEMENT=true
修正と設定が完了後、再実行したところ無事ジョブが正常終了しました。
Athenaでもクエリしてみましたが、想定通りfavoriteカラムが"orange"のユーザーが追加されていました。
以下SQLを実行してスナップショット情報を見ると、summaryカラムにIceberg 1.9.1で作成されたスナップショットがあることが確認できました。
SELECT summary FROM "iceberg_db"."iceberg_glue_table$snapshots" LIMIT 2;
Spark configurations for Iceberg in AWS Glue
ここではAWS Glue ETLジョブでIcebergデータを扱うために必要なSpark設定について解説されています。
先ほども説明しましたが、Spark設定は--conf
キーを利用してGlueのジョブパラメータとして渡せます。(コード内に実装も可能です)
以下はnotebookで利用する際の設定値です。
%glue_version 3.0 %connections <name-of-the iceberg-connection> %%configure { "--conf" : "spark.sql.extensions=org.apache.iceberg.spark.extensions...", "--datalake-formats" : "iceberg" }
具体的な設定内容は以下になります。
- <catalog_name>:Iceberg Sparkセッションのカタログ名
- <catalog_name>.
:データとメタデータを保存するAmazon S3のパスを指定 - <catalog_name>.catalog-impl:AWS Glue Data Catalogをカタログに利用
- <catalog_name>.io-impl:Amazon S3マルチパートアップロード利用
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.catalog.<catalog_name>=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.<catalog_name>.<warehouse>=s3://<your-warehouse-dir>/ --conf spark.sql.catalog.<catalog_name>.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.<catalog_name>.io-impl=org.apache.iceberg.aws.s3.S3FileIO
参考として、glue_iceberg
というカタログがある場合の--conf
設定が以下になります。
%%configure { "‐‐datalake-formats" : "iceberg", "‐‐conf" : "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "‐‐conf" : "spark.sql.catalog.glue_iceberg=org.apache.iceberg.spark.SparkCatalog", "‐‐conf" : "spark.sql.catalog.glue_iceberg.warehouse=s3://<your-warehouse-dir>=>/", "‐‐conf" : " spark.sql.catalog.glue_iceberg.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog ", "‐‐conf" : " spark.sql.catalog.glue_iceberg.io-impl=org.apache.iceberg.aws.s3.S3FileIO }
以下のようにSparkコードに含めることも可能です。
spark = SparkSession.builder\ .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\ .config("spark.sql.catalog.glue_iceberg", "org.apache.iceberg.spark.SparkCatalog")\ .config("spark.sql.catalog.glue_iceberg.warehouse","s3://<your-warehouse-dir>/")\ .config("spark.sql.catalog.glue_iceberg.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.glue_iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") \ .getOrCreate()
Lake Formationに登録されているIcebergテーブルを読み書きする場合、Glue 4.0は以下の設定を追加します。
--conf spark.sql.catalog.glue_catalog.glue.lakeformation-enabled=true --conf spark.sql.catalog.glue_catalog.glue.id=<table-catalog-id>
Glue 5.0以降の場合はこちらのドキュメントに記載があるようです。 docs.aws.amazon.com
Best practices for AWS Glue jobs
AWS GlueのSparkジョブを利用してIcebergテーブルへのデータ読み書きを最適化するためのガイドラインがあります。 以下にドキュメントに記載されている内容を書き出します。
- AWS Glue の最新バージョンを使用し、可能な限りアップグレードしてください
- 新しいGlueバージョンの方がパフォーマンスや起動時間等が向上しており、新機能なども提供されているため
- AWS Glue ジョブのメモリを最適化する
- AWS Glue Auto Scaling を使用する
- AWS Glueワーカー数が動的に調整され、リソースとコストの最適化ができるため
- カスタムコネクタの使用、またはライブラリ依存関係の追加
- Icebergの使い始めはAWS Glueに組み込まれたIcebergライブラリの利用が最適
- ただ、本番環境などでIcebergバージョン等のコントールが必要であればカスタムコネクタや独自JARの導入を検討したほうがよい
- モニタリングとデバッグのために Spark UI を有効にする
- AWS GlueのSpark UIを利用することでジョブのステージをDAGで視覚的にモニタリングでき、トラブルシュートにかかる時間を減らすことができる
次回
今回はWorking with Apache Iceberg in AWS Glueについて確認しました。 流れで行くと次は以下ですが、内容的にGlueと近しいところがあるのでこれもスキップします。
次回は以下ページを見ていきながら、可能なら実際に使い方も試してみたいと思います。
- Working with Apache Iceberg tables by using Amazon Data Firehose(Amazon Data Firehoseを使ってApache Icebergテーブルを扱う)
終わりに
カスタムコネクタのところが最終的に上手くいかなかったのか少し心残りですが、もう一つのほうはうまくいったので良かったです。
↑は必要なところだけ書いていますが、裏ではかなりの試行錯誤があったことだけ書いておきます。