カピバラ好きなエンジニアブログ

興味ある技術とか検証した内容を赴くままに書いていきます。カピバラの可愛さこそ至高。

EmbulkでSQLServerに追加されたデータのみを指定してCSVに出力してみた

前回はSQL Serverから読み込んだデータをCSVに出力しましたが、データ量が多い場合毎回全データ取得するのはDBにも負荷がかかるのであまりオススメされません。
そこでSQLServerからデータを読み出す際に追加分だけを取得するようにしてみます。

データの追加分だけSQLServerから取得するためには、embulk-input-sqlserverプラグインのincrementalパラメータとincremental_columnsパラメータを使用します。
このパラメータで、取得するテーブル内の主キーやまたは日付カラムを指定し、データを読み込んだ際にその最新の値(or日付)を保持しておき、次回の読み込み時にはその値を元に追加されたデータのみを指定して取得します。
公式ドキュメントの説明は以下からご確認ください。

github.com

以下のブログを参考にさせていただきました。
tech.griphone.co.jp


実施作業

テストデータ作成

以下のブログを参考にさせていただき、テスト用のデータを作ります。
mura-hiro.com

できたデータがこちらです。とりあえず100件ほど作成しました。
f:id:live-your-life-dd18:20200313142637p:plain

ほとんどコピペになりますが、追加したSQLは以下です。

USE TESTDB ;  
GO  

DECLARE @p_InsertNumber Bigint    -- INSERTする行数
SELECT @p_InsertNumber=100;  -- 100万に設定

WITH Base AS
  (
    SELECT
      1 AS n

    UNION ALL
    
    SELECT 
      n + 1
    FROM
      Base
    WHERE
      n < @p_InsertNumber
  ),
  Nums AS
  (
     SELECT
       Row_Number() OVER(ORDER BY n) AS n
     FROM
       Base
  )
  
  INSERT INTO Employee 
  SELECT
      n
    , 'test'
    ,  GETDATE()
  FROM
    Base
  WHERE 
    n <= @p_InsertNumber

OPTION (MaxRecursion 0); -- 再帰クエリの再帰回数の上限をなくす


設定内容確認

次に前回CSV出力した際の設定ファイルの内容を確認します。

in:
  type: sqlserver
  driver_path: C:\drivers\sqljdbc_7.2\jpn\mssql-jdbc-7.2.2.jre8.jar
  host: EC2AMAZ-L2BKFDH
  user: dbuser
  password: "******"
  database: AdventureWorks2012
  schema: Production
  table: Product
  select: "ProductID, Name, ProductNumber"
  where: "ProductID < 500"
  order_by: "ProductID ASC"
out:
  type: file
  path_prefix: C:\Users\Administrator\try1\csv\production_product
  file_ext: csv
  formatter:
    type: csv
    charset: UTF-8

今回は前回の設定からselectパラメータ他を削除し、incrementalパラメータを2つ追加する変更を行います。
selectパラメータを削除する理由ですが、incrementalパラメータを設定することで処理実行時に内部で自動でSelect文が発行されるため、selectパラメータ他が記載されているとエラーになります。
※内部の処理としては、最初にincremental_columnsで設定したパラメータでソートが行われた後、Select文が実行されます。

in:
  type: sqlserver
  driver_path: C:\drivers\sqljdbc_7.2\jpn\mssql-jdbc-7.2.2.jre8.jar
  host: EC2AMAZ-L2BKFDH
  user: dbuser
  password: "******"
  database: TESTDB
  schema: dbo
  table: Employee
  incremental: true
  incremental_columns:
    - No
out:
  type: file
  path_prefix: C:\Users\Administrator\try1\csv\testdb_employee_no
  file_ext: csv
  formatter:
    type: csv
    charset: UTF-8


guessコマンド実行

guessコマンドを実行して、設定ファイルを出力します。

embulk guess .\try1\sqlserver_incremental_to_csv.yml -o config_inc_to_csv.yml

コマンドが正常終了していることを確認します。
f:id:live-your-life-dd18:20200313143531p:plain

出力された設定内容は以下のようになっています。
f:id:live-your-life-dd18:20200313143605p:plain

runコマンド実行(初回実行)

以下のコマンドを実行します。
前回と違うのは、-cオプションで差分の情報を保持しているファイルを指定しており、処理実行後は同ファイルを最新の値で更新します。

embulk run .\config_inc_to_csv.yml -c embulk_sql_inc_csv.diff.yaml

エラーなく正常終了しました。
f:id:live-your-life-dd18:20200313143920p:plain

ファイルも想定通りに出力されています。
f:id:live-your-life-dd18:20200313145514p:plain

出力メッセージ確認(初回実行)

出力されたメッセージを見ると、以下のSQLが実行されていることがわかります。
今回は初回実行で差分情報ファイルが存在していかなかったため、全件取得するような処理になっていました。

2020-03-13 14:38:07.365 +0900 [INFO] (0014:task-0000): SQL: SELECT * FROM "dbo"."Employee" ORDER BY "No"

最後の以下のメッセージで、incremental_columnsカラムに設定したカラムの最新の値がlast_recordとして設定されていることがわかります。

2020-03-13 14:38:07.607 +0900 [INFO] (main): Next config diff: {"in":{"last_record":[100]},"out":{}}

差分情報ファイル内容は以下のようになっています。
f:id:live-your-life-dd18:20200313144855p:plain

ファイルの中身も取得したデータが出力されています。
f:id:live-your-life-dd18:20200313145639p:plain

テストデータ追加

ここからが本題になります。
追加でテストデータを100件登録します。
f:id:live-your-life-dd18:20200313145223p:plain

追加するSQLは以下です。またもやほぼコピペです。

USE TESTDB ;  
GO  

DECLARE @p_InsertNumber Bigint    -- INSERTする行数
SELECT @p_InsertNumber=200;  -- 100万に設定

WITH Base AS
  (
    SELECT
      101 AS n

    UNION ALL
    
    SELECT 
      n + 1
    FROM
      Base
    WHERE
      n < @p_InsertNumber
  ),
  Nums AS
  (
     SELECT
       Row_Number() OVER(ORDER BY n) AS n
     FROM
       Base
  )
  
  INSERT INTO Employee 
  SELECT
      n
    , 'test'
    ,  GETDATE()
  FROM
    Base
  WHERE 
    n <= @p_InsertNumber

OPTION (MaxRecursion 0); -- 再帰クエリの再帰回数の上限をなくす


runコマンド実行(2回目実行)

テストデータの追加ができたら再度runコマンドを実行して、エラーが出ていないことを確認します。
f:id:live-your-life-dd18:20200314120542p:plain

ファイルが出力されていることを確認します。
f:id:live-your-life-dd18:20200314120909p:plain

ファイルの中身を確認すると、増分データのみが出力されていることが確認できました。
f:id:live-your-life-dd18:20200314120942p:plain

出力メッセージ確認(2回目実行)

rumコマンド実行時に出力された内容を確認すると、以下のSQLが実行されていることがわかります。
設定ファイルで指定したカラムをorder byでソートした後、Where句で追加されたデータのみが取得されるように条件指定されています。

2020-03-13 15:03:36.444 +0900 [INFO] (0014:task-0000): SQL: SELECT * FROM "dbo"."Employee" WHERE (("No" > ?)) ORDER BY "
No"

メッセージの最後にカラムの最新の値を更新して処理が終了されていることも確認できました。

2020-03-13 15:03:36.755 +0900 [INFO] (main): Next config diff: {"in":{"last_record":[200]},"out":{}}


感想及び所感

ウォッチするカラム名は主キーである必要はないようですが、データのロストを考えると主キーで設定したほうが間違いないですね。
日付カラムでも指定できそうですので、次回は日付カラムを指定して、同時間帯のデータがあった場合にどのような処理となるかをみていきます。