カピバラ好きなエンジニアブログ

ITのこととか趣味のカメラのことなどを赴くままに書いていきます。カピバラの可愛さこそ至高。

Embulkで自分で作成したSQLによる増分取り込みを実施してみた

以前の記事で、Embulkを使用してSQL Serverから増分データのみの取得を実施しましたが、その際の設定内容ではテーブル内のカラムすべてを取得するようになっていました。

ただ、どうしても特定のカラムのみ取り込みたい、抽出したいという要望が出てくることもあると思います。
そこで、今回は増分データを取得するのは変わりませんが、抽出するカラムを指定してみます。

前回の記事
capybara-engineer.hatenablog.com


自分で作成したSQLを使うためのパラメータはuse_raw_query_with_incrementalパラメータです。
※説明は以下のURL参照
github.com

実施作業

準備

GitHubの内容と以前使用したファイルを参考に、設定用のyamlファイルを作成します。

in:
  type: sqlserver
  query:
    SELECT
      foo.id as foo_id, bar.name
    FROM
      foo LEFT JOIN bar ON foo.id = bar.id
    WHERE
      foo.hoge IS NOT NULL
      AND foo.id > :foo_id
    ORDER BY
      foo.id ASC
  use_raw_query_with_incremental: true
  incremental_columns:
    - foo_id
  incremental: true
  last_record: [1]
  • 実際の設定内容
in:
  type: sqlserver
  driver_path: C:\drivers\sqljdbc_7.2\jpn\mssql-jdbc-7.2.2.jre8.jar
  host: EC2AMAZ-JGN0VFT
  user: dbuser
  password: "******"
  database: TESTDB
  query: ※②
    SELECT No, Name, RegDate, Add_No, Add_Name
    FROM dbo.Employee
  use_raw_query_with_incremental: true ※①
  incremental_columns: [No]
  incremental: true
  last_record: [1] ※③
out:
  type: stdout

以前のファイルと大きく異なる点は以下の3つです。

  • ①use_raw_query_with_incrementalパラメータを設定している
  • SQL文をファイル内に記述している
  • ③last_recordパラメータをyamlファイル内に記載している

順番に説明すると、
①は自分で作成したSQLで増分読み込みを使用する場合に設定しています。
②は実際に実行されるSQL文を記載しています。
③は増分読み込みする際に、基準となる値を設定しています。

outputはわかりやすくするために、コンソールに出力するようにしました。

yamlファイルが作成できたら実際に実行していきます。

Embulk実行

guessコマンドでconfigファイルを作成します。

embulk guess .\try1\sqlserver_incremental_to_sqlserver_query.yml -o .\config_inc_to_sqlserver_query.yml

f:id:live-your-life-dd18:20200330172804p:plain

configファイルが作成できたら、previewコマンドでデータが取得されるか確認してみます。

embulk preview .\config_inc_to_sqlserver_query.yml

f:id:live-your-life-dd18:20200330173010p:plain

うまくいくと思いきや以下のエラーで失敗しました。

Error: Column ":No" doesn't exist in query string



原因を調査してみましたが、NoカラムはちゃんとSQL文内に設定していますし、incremental_columnsパラメータにも指定してます。

データの取得先のSQL ServerSQLを実行してみましたが、問題なくデータは取得できました。
f:id:live-your-life-dd18:20200330173455p:plain


それならとGitHubのソースを見てみたところ、不足していたパラメータがわかりました。

previewコマンドを実行した際に以下のif文が実行されているのですが、よく見るとここで「:」+「カラム名」がrawQuery(つまり設定したSQL文)内に存在しているかを確認しています。

  • エラー箇所
if (!rawQuery.contains(":" + columnName)) {

embulk-input-jdbc/AbstractJdbcInputPlugin.java at 00e9855de0be1642b878c029abe0b107405d1f64 · embulk/embulk-input-jdbc · GitHub


Readmeを読み返すと以下のようにありました。

  • 本文
Prepared statement starts with : is available instead of fixed value. last_record value is necessary when you use this option. Please use prepared statement that is well distinguishable in SQL statement. Using too simple prepared statement like :a might cause SQL parse failure.

In the following example, prepared statement :foo_id will be replaced with value "1" which is specified in last_record.
  • 日本語訳(DeepL翻訳使用)
固定値の代わりに : で始まるprepared文を使用することができます。SQL文の中で区別しやすいようなprepared文を使用してください。aのような単純すぎるprepared文を使用すると、SQLのパースに失敗する可能性があります。

以下の例では、準備された文 :foo_id を last_record で指定された値 "1" に置き換えています。



どうやらlast_recordパラメータはただ設定するだけではダメで、last_recordで設定した値を埋め込むためのprepared文をSQL内に設定する必要があるようです。

それを踏まえて修正したファイル内容がこちらです。

in:
  type: sqlserver
  driver_path: C:\drivers\sqljdbc_7.2\jpn\mssql-jdbc-7.2.2.jre8.jar
  host: EC2AMAZ-JGN0VFT
  user: dbuser
  password: "******"
  database: TESTDB
  query:
    SELECT No, Name, RegDate, Add_No, Add_Name
    FROM dbo.Employee
    WHERE No > :No
    ORDER BY No ASC
  use_raw_query_with_incremental: true
  incremental_columns: [No]
  incremental: true
  last_record: [1]
out:
  type: stdout

このように設定することで、Embulkを実行した際にlast_recordパラメータの値がqueryパラメータ内のWHERE句にある「:No」に設定されて実行されるようになります。

再度guessコマンドとpreviewコマンドを実行してみたところ以下のようになりました。
f:id:live-your-life-dd18:20200330175159p:plain


runコマンドも実行してみます。

embulk run .\config_inc_to_sqlserver_query.yml -c .\embulk_sql_inc_sqlserver_query.diff.yaml

問題なく出力できました。
f:id:live-your-life-dd18:20200330175633p:plain

最新のレコードの値もちゃんとファイルに出力されていました。
f:id:live-your-life-dd18:20200330175715p:plain

おまけ

今回実施していて思ったのが、このやり方だとlast_recordがファイル内に直接書き込む形になっているため、実行のたびにyamlファイルを修正する必要が出てきてしまいます。

試しにlast_recordパラメータを削除してやってみましたが、必須パラメータなのかエラーで失敗してしまいました。
f:id:live-your-life-dd18:20200330180219p:plain

use_raw_query_with_incrementalパラメータを使わないやり方だと出力したファイルからの取得ができているので、ロジック的には可能だと思うのですが、現時点ではできないようです。

どうにかできないか方法を考えてみましたが、
①出力した最新のレコード値が設定されているファイルを読み込んで、yamlファイルを更新する
②embulkプラグインを自分で改修して、外部ファイルから読み込みできるようにする
ぐらいしか思いつきませんでした。

恒久的で汎用的にすることを考えると②をしたほうがよさそうな気もするので、時間見つけてやり方を調べてみようかと思います。

感想及び所感

かゆいところに微妙に手が届かないの、どうにかならないものか...