カピバラ好きなエンジニアブログ

興味ある技術とか検証した内容を赴くままに書いていきます。カピバラの可愛さこそ至高。

EmbulkでCSVファイルをS3にアップロードする

過去に以下のような記事を書きましたが、今回はそれらを組み合わせてサンプルで準備されているCSVファイルをAWSのS3にアップロードしてみます。

www.capybara-engineer.com

www.capybara-engineer.com


尚、S3にアップロードする設定でアクセスキーとシークレットキーを設定しないときにどのように認証されるかを確認することが本当の目的です。

実施作業

準備

本日使用するプラグインはこちらです。
github.com


過去使用したプラグインも含まれているので不要なものもありますが、今回はembulk-output-s3のversion1.5.0を使用します。
f:id:live-your-life-dd18:20210412131627p:plain


実行元となるEmbulkのymlファイルはこちらです。

in:
  type: file
  path_prefix: 'C:\work\embulk_sample\.\try1\csv\sample_'
out:
  type: stdout



guessコマンドを実行後のymlファイルはこちらです。

in:
  type: file
  path_prefix: C:\work\embulk_sample\.\try1\csv\sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out: {type: stdout}



コンソールに出力した結果も貼っておきます。
f:id:live-your-life-dd18:20210412132406p:plain


あとはアップロード先のS3バケットも作成しておきます。
f:id:live-your-life-dd18:20210412132919p:plain

Embulk ymlファイル修正

S3にアップロードするようにEmbulkのymlファイルを以下のように更新します。
後述しますが、アクセスキーとシークレットキーの項目は前回記事から引用時に削除しています。

in:
  type: file
  path_prefix: 'C:\work\embulk_sample\.\try1\csv\sample_'
out:
  type: s3
  path_prefix: test/sample-seed
  file_ext: .csv
  bucket: test-tmp-20210412
  endpoint: s3-ap-northeast-1.amazonaws.com
  formatter:
    type: csv



※過去記事ではアクセスキーとシークレットキーでアップロードしていましたが、公式のGitHubには以下のように記載があり、デフォルトではEC2にアタッチされているIAMロールで認証するようになっているようです。

access_key_id: AWS access key id. This parameter is required when your agent is not running on EC2 instance with an IAM Role. (string, defualt: null)
secret_access_key: AWS secret key. This parameter is required when your agent is not running on EC2 instance with an IAM Role. (string, defualt: null)


S3にアップロード(EC2にIAMロールが設定されていない場合)

Embulkを実行するEC2にIAMロールがアタッチされていないことを確認します。
f:id:live-your-life-dd18:20210412133748p:plain


embulk guessコマンドで実行用のymlファイルを生成します。

> gc .\config1.yml
in:
  type: file
  path_prefix: C:\work\embulk_sample\.\try1\csv\sample_
  decoders:
  - {type: gzip}
  parser:
    charset: UTF-8
    newline: LF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    null_string: 'NULL'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: account, type: long}
    - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
    - {name: purchase, type: timestamp, format: '%Y%m%d'}
    - {name: comment, type: string}
out:
  type: s3
  path_prefix: test/sample-seed
  file_ext: .csv
  bucket: test-tmp-20210412
  endpoint: s3-ap-northeast-1.amazonaws.com
  formatter: {type: csv}



AmazonS3Exceptionが出力され、Access Deniedで失敗していることがわかります。ここまでは予想通りです。

> embulk run .\config1.yml -b .\test_bundle
2021-04-12 04:40:12.347 +0000: Embulk v0.9.23
2021-04-12 04:40:13.887 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2021-04-12 04:40:17.150 +0000 [INFO] (main): BUNDLE_GEMFILE is being set: "C:\work\embulk_sample\.\test_bundle\Gemfile"
2021-04-12 04:40:17.167 +0000 [INFO] (main): Gem's home and path are being cleared.
2021-04-12 04:40:19.021 +0000 [INFO] (main): Started Embulk v0.9.23
2021-04-12 04:40:19.495 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-s3 (1.5.0)
2021-04-12 04:40:19.562 +0000 [INFO] (0001:transaction): Listing local files at directory 'C:\work\embulk_sample\.\try1\csv' filtering filename by prefix 'sample_'
2021-04-12 04:40:19.568 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2021-04-12 04:40:19.571 +0000 [INFO] (0001:transaction): Loading files [C:\work\embulk_sample\.\try1\csv\sample_01.csv.gz]
2021-04-12 04:40:19.632 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2021-04-12 04:40:19.688 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2021-04-12 04:40:21.100 +0000 [INFO] (0013:task-0000): Writing S3 file 'test/sample-seed.000.00.csv'
2021-04-12 04:40:21.792 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
org.embulk.exec.PartialExecutionException: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: K3X5KJHADPT54CFP; S3 Extended Request ID: 0D3+lwnrd/p50rEgNQdgvzeUEUYoD3iRfMoi7tNqBihITq7gZcuGxYxzVAK8JFQTa1Tc/0+cy08=), S3 Extended Request ID: 0D3+lwnrd/p50rEgNQdgvzeUEUYoD3iRfMoi7tNqBihITq7gZcuGxYxzVAK8JFQTa1Tc/0+cy08=
        at org.embulk.exec.BulkLoader$LoaderState.buildPartialExecuteException(BulkLoader.java:340)
        at org.embulk.exec.BulkLoader.doRun(BulkLoader.java:566)
        at org.embulk.exec.BulkLoader.access$000(BulkLoader.java:35)
        at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:353)
        at org.embulk.exec.BulkLoader$1.run(BulkLoader.java:350)
        at org.embulk.spi.Exec.doWith(Exec.java:22)
        at org.embulk.exec.BulkLoader.run(BulkLoader.java:350)
        at org.embulk.EmbulkEmbed.run(EmbulkEmbed.java:242)
        at org.embulk.EmbulkRunner.runInternal(EmbulkRunner.java:291)
        at org.embulk.EmbulkRunner.run(EmbulkRunner.java:155)
        at org.embulk.cli.EmbulkRun.runSubcommand(EmbulkRun.java:431)
        at org.embulk.cli.EmbulkRun.run(EmbulkRun.java:90)
        at org.embulk.cli.Main.main(Main.java:64)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: K3X5KJHADPT54CFP; S3 Extended Request ID: 0D3+lwnrd/p50rEgNQdgvzeUEUYoD3iRfMoi7tNqBihITq7gZcuGxYxzVAK8JFQTa1Tc/0+cy08=), S3 Extended Request ID: 0D3+lwnrd/p50rEgNQdgvzeUEUYoD3iRfMoi7tNqBihITq7gZcuGxYxzVAK8JFQTa1Tc/0+cy08=
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
        at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
        at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
        at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
        at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1749)
        at org.embulk.output.S3FileOutputPlugin$S3FileOutput.putFile(S3FileOutputPlugin.java:192)
        at org.embulk.output.S3FileOutputPlugin$S3FileOutput.closeCurrent(S3FileOutputPlugin.java:202)
        at org.embulk.output.S3FileOutputPlugin$S3FileOutput.finish(S3FileOutputPlugin.java:259)
        at org.embulk.spi.util.FileOutputOutputStream.close(FileOutputOutputStream.java:94)
        at sun.nio.cs.StreamEncoder.implClose(Unknown Source)
        at sun.nio.cs.StreamEncoder.close(Unknown Source)
        at java.io.OutputStreamWriter.close(Unknown Source)
        at java.io.BufferedWriter.close(Unknown Source)
        at org.embulk.spi.util.LineEncoder.finish(LineEncoder.java:90)
        at org.embulk.standards.CsvFormatterPlugin$1.finish(CsvFormatterPlugin.java:194)
        at org.embulk.spi.FileOutputRunner$DelegateTransactionalPageOutput.finish(FileOutputRunner.java:154)
        at org.embulk.spi.PageBuilder.finish(PageBuilder.java:227)
        at org.embulk.standards.CsvParserPlugin.run(CsvParserPlugin.java:388)
        at org.embulk.spi.FileInputRunner.run(FileInputRunner.java:140)
        at org.embulk.spi.util.Executors.process(Executors.java:62)
        at org.embulk.spi.util.Executors.process(Executors.java:38)
        at org.embulk.exec.LocalExecutorPlugin$DirectExecutor$1.call(LocalExecutorPlugin.java:170)
        at org.embulk.exec.LocalExecutorPlugin$DirectExecutor$1.call(LocalExecutorPlugin.java:167)
        at java.util.concurrent.FutureTask.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Error: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: K3X5KJHADPT54CFP; S3 Extended Request ID: 0D3+lwnrd/p50rEgNQdgvzeUEUYoD3iRfMoi7tNqBihITq7gZcuGxYxzVAK8JFQTa1Tc/0+cy08=), S3 Extended Request ID: 0D3+lwnrd/p50rEgNQdgvzeUEUYoD3iRfMoi7tNqBihITq7gZcuGxYxzVAK8JFQTa1Tc/0+cy08=


S3にアップロード(EC2にIAMロールが設定した場合)

今度はEC2にS3 Full Accessポリシーを付与したIAMロールをアタッチして実行してみます。
f:id:live-your-life-dd18:20210412134242p:plain


処理が正常終了しました。

> embulk run .\config1.yml -b .\test_bundle
2021-04-12 04:43:16.528 +0000: Embulk v0.9.23
2021-04-12 04:43:18.032 +0000 [WARN] (main): DEPRECATION: JRuby org.jruby.embed.ScriptingContainer is directly injected.
2021-04-12 04:43:21.283 +0000 [INFO] (main): BUNDLE_GEMFILE is being set: "C:\work\embulk_sample\.\test_bundle\Gemfile"
2021-04-12 04:43:21.301 +0000 [INFO] (main): Gem's home and path are being cleared.
2021-04-12 04:43:23.200 +0000 [INFO] (main): Started Embulk v0.9.23
2021-04-12 04:43:23.442 +0000 [INFO] (0001:transaction): Loaded plugin embulk-output-s3 (1.5.0)
2021-04-12 04:43:23.509 +0000 [INFO] (0001:transaction): Listing local files at directory 'C:\work\embulk_sample\.\try1\csv' filtering filename by prefix 'sample_'
2021-04-12 04:43:23.515 +0000 [INFO] (0001:transaction): "follow_symlinks" is set false. Note that symbolic links to directories are skipped.
2021-04-12 04:43:23.518 +0000 [INFO] (0001:transaction): Loading files [C:\work\embulk_sample\.\try1\csv\sample_01.csv.gz]
2021-04-12 04:43:23.578 +0000 [INFO] (0001:transaction): Using local thread executor with max_threads=2 / tasks=1
2021-04-12 04:43:23.628 +0000 [INFO] (0001:transaction): {done:  0 / 1, running: 0}
2021-04-12 04:43:24.093 +0000 [INFO] (0014:task-0000): Writing S3 file 'test/sample-seed.000.00.csv'
2021-04-12 04:43:24.740 +0000 [INFO] (0001:transaction): {done:  1 / 1, running: 0}
2021-04-12 04:43:24.749 +0000 [INFO] (main): Committed.
2021-04-12 04:43:24.753 +0000 [INFO] (main): Next config diff: {"in":{"last_path":"C:\\work\\embulk_sample\\.\\try1\\csv\\sample_01.csv.gz"},"out":{}}



S3上にも想定通りアップロードされているようです。
f:id:live-your-life-dd18:20210412134432p:plain

アクセスキーとシークレットキー、IAMロール以外は無理なのか

使っていて一つ疑問になったのは、"上記以外の認証方式は使えないのか"ということです。


例を挙げると、よく似たプラグインで以下のembulk-output-s3_parquetはS3アップロード時にparquet変換も可能なものなのですが、こちらはauth_methodパラメータで複数の認証方式が設定可能となっています。
github.com


embulk-output-s3の認証周りのロジックを確認してみます。
以下の実装を見る限りだと、指定したアクセスキーとシークレットキーパラメータをAWS Java SDKのBasicAWSCredentialsクラスに渡して、その後AWS S3 Clientにパラメータとして渡しているようです。
embulk-output-s3/S3FileOutputPlugin.java at 11ab151755cef8d8e2d75f2f7c52ecdb155cd4e9 · llibra/embulk-output-s3 · GitHub
BasicAWSCredentials (AWS SDK for Java - 1.11.996)


アクセスキーとシークレットキーが設定されていない場合は、AWS S3 ClientにProxy情報のみ渡されています。
認証情報についての細かい制御は実装されていないように見えます。
embulk-output-s3/S3FileOutputPlugin.java at 11ab151755cef8d8e2d75f2f7c52ecdb155cd4e9 · llibra/embulk-output-s3 · GitHub


ではembulk-output-s3_parquetの認証は同実装されているのかとGitHubをみてみると、Embulkで設定可能な認証パラメータごとに細かく制御が実装されており、アクセスキーとシークレットキーを使用する場合は実装でBasic認証か一時認証キーを発行して接続するようになっていました。
embulk-output-s3_parquet/AwsCredentials.scala at master · civitaspo/embulk-output-s3_parquet · GitHub


上記の結果から追加で実装をすればembulk-output-s3プラグインも複数の認証ができるような気がしますが、そのままではキー使用かIAMロール使用しかダメなようです。。

感想及び所感

ということでEmbulkのS3にアップロードするプラグインについて試してみました。


AWSのベストプラクティスに従うのであればIAMロールをアタッチして使用するのが最適だと思います。
docs.aws.amazon.com