Skip to main content

Apache NiFiをClickHouseに接続

Apache NiFiは、オープンソースのワークフロー管理ソフトウェアで、ソフトウェアシステム間のデータフローを自動化するために設計されています。ETLデータパイプラインを作成することができ、300以上のデータプロセッサが用意されています。このステップバイステップのチュートリアルでは、Apache NiFiをClickHouseにソースおよびデスティネーションとして接続し、サンプルデータセットをロードする方法を示します。

1. 接続情報を集める

HTTP(S) を使用して ClickHouse に接続するには、以下の情報が必要です:

  • HOST と PORT: 通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合は 8123 です。

  • データベース名: デフォルトで default という名前のデータベースがありますが、接続したいデータベースの名前を使用してください。

  • ユーザー名とパスワード: デフォルトでユーザー名は default です。使用ケースに適したユーザー名を使用してください。

ClickHouse Cloud サービスの詳細は、ClickHouse Cloud コンソールで確認できます。 接続するサービスを選択し、接続 をクリックします:

ClickHouse Cloud service connect button

HTTPS を選択すると、サンプルの curl コマンドで詳細が確認できます。

ClickHouse Cloud HTTPS connection details

セルフマネージドの ClickHouse を使用している場合、接続の詳細は ClickHouse 管理者によって設定されます。

2. Apache NiFiをダウンロードして実行

  1. 新しいセットアップのために、https://nifi.apache.org/download.html からバイナリをダウンロードし、./bin/nifi.sh start を実行して開始します。

3. ClickHouse JDBCドライバーをダウンロード

  1. GitHubのClickHouse JDBCドライバーリリースページを訪問し、最新のJDBCリリース版を探します。
  2. リリースバージョンで「Show all xx assets」をクリックし、「shaded」または「all」のキーワードを含むJARファイル、例えば clickhouse-jdbc-0.5.0-all.jar を探します。
  3. JARファイルをApache NiFiがアクセスできるフォルダーに配置し、絶対パスをメモしておきます。

4. DBCPConnectionPoolコントローラーサービスを追加してプロパティを設定

  1. Apache NiFiでコントローラーサービスを設定するために、「歯車」ボタンをクリックしてNiFiフロー構成ページにアクセスします。

    Nifi Flow Configuration
  2. コントローラーサービスタブを選択し、右上の+ボタンをクリックして新しいコントローラーサービスを追加します。

    Add Controller Service
  3. DBCPConnectionPoolを検索して「Add」ボタンをクリックします。

    Search for DBCPConnectionPool
  4. 新しく追加されたDBCPConnectionPoolはデフォルトで無効な状態になります。「歯車」ボタンをクリックして設定を開始します。

    Nifi Flow Configuration
  5. 「プロパティ」セクションの下に以下の値を入力します。

    プロパティ備考
    Database Connection URLjdbc:ch:https://HOSTNAME:8443/default?ssl=true接続URLに応じてHOSTNAMEを置き換える
    Database Driver Class Namecom.clickhouse.jdbc.ClickHouseDriver
    Database Driver Location(s)/etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jarClickHouse JDBCドライバJARファイルへの絶対パス
    Database UserdefaultClickHouseのユーザー名
    PasswordpasswordClickHouseのパスワード
  6. 設定セクションで、コントローラーサービスの名前を「ClickHouse JDBC」に変更し、参照しやすいようにします。

    Nifi Flow Configuration
  7. DBCPConnectionPoolコントローラーサービスを有効化するために「稲妻」ボタンをクリックし、「Enable」ボタンを押します。

    Nifi Flow Configuration
    Nifi Flow Configuration
  8. コントローラーサービスタブを確認し、コントローラーサービスが有効になっていることを確かめます。

    Nifi Flow Configuration

5. ExecuteSQLプロセッサを使用してテーブルから読み込み

  1. ​​ExecuteSQLプロセッサを追加し、適切な上流および下流のプロセッサを設定します。

    ​​ExecuteSQL processor
  2. ​​ExecuteSQLプロセッサの「プロパティ」セクションの下に以下の値を入力します。

    プロパティ備考
    Database Connection Pooling ServiceClickHouse JDBCClickHouse用に設定されたコントローラーサービスを選択
    SQL select querySELECT * FROM system.metricsここにクエリを入力
  3. ​​ExecuteSQLプロセッサを開始します。

    ​​ExecuteSQL processor
  4. クエリが正常に処理されたことを確認するために、出力キューのFlowFileを確認します。

    ​​ExecuteSQL processor
  5. 出力FlowFileの結果をフォーマット表示で確認します。

    ​​ExecuteSQL processor

6. MergeRecordとPutDatabaseRecordプロセッサを使用してテーブルに書き込み

  1. 複数の行を一度に挿入するには、まず複数のレコードを単一のレコードにマージする必要があります。これにはMergeRecordプロセッサを使用します。

  2. MergeRecordプロセッサの「プロパティ」セクションの下に以下の値を入力します。

    プロパティ備考
    Record ReaderJSONTreeReader適切なレコードリーダーを選択
    Record WriterJSONReadSetWriter適切なレコードライターを選択
    Minimum Number of Records1000最低限マージされる必要がある行数を増やします。デフォルトは1行
    Maximum Number of Records10000「Minimum Number of Records」より大きい値にします。デフォルトは1,000行
  3. 複数のレコードが1つにマージされていることを確認するために、MergeRecordプロセッサの入力と出力を確認します。出力は複数の入力レコードの配列です。

    入力

    ​​ExecuteSQL processor

    出力

    ​​ExecuteSQL processor
  4. PutDatabaseRecordプロセッサの「プロパティ」セクションの下に以下の値を入力します。

    プロパティ備考
    Record ReaderJSONTreeReader適切なレコードリーダーを選択
    Database TypeGenericデフォルトのままにします
    Statement TypeINSERT
    Database Connection Pooling ServiceClickHouse JDBCClickHouseコントローラーサービスを選択
    Table Nametblテーブル名をここに入力
    Translate Field Namesfalse挿入されるフィールド名がカラム名と一致する必要がある場合は「false」に設定
    Maximum Batch Size1000挿入ごとの最大行数。この値はMergeRecordプロセッサの「Minimum Number of Records」の値より小さくしてはいけません。
  5. 各挿入に複数の行が含まれていることを確認するために、MergeRecordで定義された「Minimum Number of Records」の値以上にテーブルの行数が増加しているか確認します。

    ​​ExecuteSQL processor
  6. おめでとうございます - Apache NiFiを使用してClickHouseにデータを正常にロードしました!