Apache NiFiをClickHouseに接続
Apache NiFiは、オープンソースのワークフロー管理ソフトウェアで、ソフトウェアシステム間のデータフローを自動化するために設計されています。ETLデータパイプラインを作成することができ、300以上のデータプロセッサが用意されています。このステップバイステップのチュートリアルでは、Apache NiFiをClickHouseにソースおよびデスティネーションとして接続し、サンプルデータセットをロードする方法を示します。1. 接続情報を集める
HTTP(S) を使用して ClickHouse に接続するには、以下の情報が必要です:
HOST と PORT: 通常、TLS を使用する場合のポートは 8443、TLS を使用しない場合は 8123 です。
データベース名: デフォルトで
default
という名前のデータベースがありますが、接続したいデータベースの名前を使用してください。ユーザー名とパスワード: デフォルトでユーザー名は
default
です。使用ケースに適したユーザー名を使用してください。
ClickHouse Cloud サービスの詳細は、ClickHouse Cloud コンソールで確認できます。 接続するサービスを選択し、接続 をクリックします:
HTTPS を選択すると、サンプルの curl
コマンドで詳細が確認できます。
セルフマネージドの ClickHouse を使用している場合、接続の詳細は ClickHouse 管理者によって設定されます。
2. Apache NiFiをダウンロードして実行
- 新しいセットアップのために、https://nifi.apache.org/download.html からバイナリをダウンロードし、
./bin/nifi.sh start
を実行して開始します。
3. ClickHouse JDBCドライバーをダウンロード
- GitHubのClickHouse JDBCドライバーリリースページを訪問し、最新のJDBCリリース版を探します。
- リリースバージョンで「Show all xx assets」をクリックし、「shaded」または「all」のキーワードを含むJARファイル、例えば
clickhouse-jdbc-0.5.0-all.jar
を探します。 - JARファイルをApache NiFiがアクセスできるフォルダーに配置し、絶対パスをメモしておきます。
4. DBCPConnectionPoolコントローラーサービスを追加してプロパティを設定
Apache NiFiでコントローラーサービスを設定するために、「歯車」ボタンをクリックしてNiFiフロー構成ページにアクセスします。
コントローラーサービスタブを選択し、右上の
+
ボタンをクリックして新しいコントローラーサービスを追加します。DBCPConnectionPool
を検索して「Add」ボタンをクリックします。新しく追加されたDBCPConnectionPoolはデフォルトで無効な状態になります。「歯車」ボタンをクリックして設定を開始します。
「プロパティ」セクションの下に以下の値を入力します。
プロパティ 値 備考 Database Connection URL jdbc:ch:https://HOSTNAME:8443/default?ssl=true 接続URLに応じてHOSTNAMEを置き換える Database Driver Class Name com.clickhouse.jdbc.ClickHouseDriver Database Driver Location(s) /etc/nifi/nifi-X.XX.X/lib/clickhouse-jdbc-0.X.X-patchXX-shaded.jar ClickHouse JDBCドライバJARファイルへの絶対パス Database User default ClickHouseのユーザー名 Password password ClickHouseのパスワード 設定セクションで、コントローラーサービスの名前を「ClickHouse JDBC」に変更し、参照しやすいようにします。
DBCPConnectionPoolコントローラーサービスを有効化するために「稲妻」ボタンをクリックし、「Enable」ボタンを押します。
コントローラーサービスタブを確認し、コントローラーサービスが有効になっていることを確かめます。
5. ExecuteSQLプロセッサを使用してテーブルから読み込み
ExecuteSQLプロセッサを追加し、適切な上流および下流のプロセッサを設定します。
ExecuteSQLプロセッサの「プロパティ」セクションの下に以下の値を入力します。
プロパティ 値 備考 Database Connection Pooling Service ClickHouse JDBC ClickHouse用に設定されたコントローラーサービスを選択 SQL select query SELECT * FROM system.metrics ここにクエリを入力 ExecuteSQLプロセッサを開始します。
クエリが正常に処理されたことを確認するために、出力キューのFlowFileを確認します。
出力FlowFileの結果をフォーマット表示で確認します。
6. MergeRecordとPutDatabaseRecordプロセッサを使用してテーブルに書き込み
複数の行を一度に挿入するには、まず複数のレコードを単一のレコードにマージする必要があります。これにはMergeRecordプロセッサを使用します。
MergeRecordプロセッサの「プロパティ」セクションの下に以下の値を入力します。
プロパティ 値 備考 Record Reader JSONTreeReader 適切なレコードリーダーを選択 Record Writer JSONReadSetWriter 適切なレコードライターを選択 Minimum Number of Records 1000 最低限マージされる必要がある行数を増やします。デフォルトは1行 Maximum Number of Records 10000 「Minimum Number of Records」より大きい値にします。デフォルトは1,000行 複数のレコードが1つにマージされていることを確認するために、MergeRecordプロセッサの入力と出力を確認します。出力は複数の入力レコードの配列です。
入力
出力
PutDatabaseRecordプロセッサの「プロパティ」セクションの下に以下の値を入力します。
プロパティ 値 備考 Record Reader JSONTreeReader 適切なレコードリーダーを選択 Database Type Generic デフォルトのままにします Statement Type INSERT Database Connection Pooling Service ClickHouse JDBC ClickHouseコントローラーサービスを選択 Table Name tbl テーブル名をここに入力 Translate Field Names false 挿入されるフィールド名がカラム名と一致する必要がある場合は「false」に設定 Maximum Batch Size 1000 挿入ごとの最大行数。この値はMergeRecordプロセッサの「Minimum Number of Records」の値より小さくしてはいけません。 各挿入に複数の行が含まれていることを確認するために、MergeRecordで定義された「Minimum Number of Records」の値以上にテーブルの行数が増加しているか確認します。
おめでとうございます - Apache NiFiを使用してClickHouseにデータを正常にロードしました!