Skip to main content

PostgreSQL テーブルエンジン

PostgreSQLエンジンを使用すると、リモートのPostgreSQLサーバーに保存されているデータに対してSELECTおよびINSERTクエリを実行できます。

Note

現在、PostgreSQLバージョン12以上のみがサポートされています。

Postgresデータのレプリケーションまたは移行にPeerDBを使用する

Postgresテーブルエンジンに加えて、ClickHouseによるPeerDBを使用して、PostgresからClickHouseへの継続的なデータパイプラインを設定できます。PeerDBは、変化データキャプチャ(CDC)を使用してPostgresからClickHouseにデータをレプリケートするために設計されたツールです。

テーブルの作成

CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 type1 [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 type2 [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
...
) ENGINE = PostgreSQL({host:port, database, table, user, password[, schema, [, on_conflict]] | named_collection[, option=value [,..]]})

CREATE TABLE クエリの詳細な説明を参照してください。

テーブル構造は、オリジナルのPostgreSQLテーブル構造と異なる場合があります:

  • カラム名はオリジナルのPostgreSQLテーブルと同じである必要がありますが、これらのカラムの一部を任意の順序で使用できます。
  • カラムタイプは、オリジナルのPostgreSQLテーブルのタイプと異なる場合があります。ClickHouseは値をClickHouseデータタイプにキャストしようとします。
  • external_table_functions_use_nulls設定は、Nullableカラムをどのように処理するかを定義します。デフォルト値: 1。0の場合、テーブル関数はNullableカラムを作成せず、nullの代わりにデフォルト値を挿入します。これは配列内のNULL値にも適用されます。

エンジンパラメータ

  • host:port — PostgreSQLサーバーのアドレス。
  • database — リモートのデータベース名。
  • table — リモートのテーブル名。
  • user — PostgreSQLユーザー。
  • password — ユーザーパスワード。
  • schema — デフォルト以外のテーブルスキーマ。オプション。
  • on_conflict — 衝突解決戦略。例: ON CONFLICT DO NOTHING。オプション。このオプションを追加すると挿入が非効率になります。

プロダクション環境では名前付きコレクション(バージョン21.11以降で利用可能)が推奨されます。以下は例です:

<named_collections>
<postgres_creds>
<host>localhost</host>
<port>5432</port>
<user>postgres</user>
<password>****</password>
<schema>schema1</schema>
</postgres_creds>
</named_collections>

一部のパラメータはキー値引数によって上書き可能です:

SELECT * FROM postgresql(postgres_creds, table='table1');

実装の詳細

PostgreSQL側のSELECTクエリは、読み取り専用のPostgreSQLトランザクション内でCOPY (SELECT ...) TO STDOUTとして実行され、各SELECTクエリの後にコミットされます。

=, !=, >, >=, <, <=, INのような単純なWHERE句はPostgreSQLサーバーで実行されます。

すべての結合、集計、並べ替え、IN [ array ]条件、およびLIMITサンプリング制約は、PostgreSQLへのクエリが終了した後にのみClickHouseで実行されます。

PostgreSQL側のINSERTクエリは、各INSERTステートメントの後に自動コミットされるPostgreSQLトランザクション内でCOPY "table_name" (field1, field2, ... fieldN) FROM STDINとして実行されます。

PostgreSQLのArray型はClickHouseの配列に変換されます。

Note

注意 - PostgreSQLでは、type_name[]のように作成された配列データは、同じカラム内の異なるテーブル行に異なる次元の多次元配列を含むことがあります。しかしClickHouseでは、同じカラム内のすべてのテーブル行に同じ次元数の多次元配列を持つことのみが許可されています。

複数のレプリカをサポートしており、|でリストする必要があります。例:

CREATE TABLE test_replicas (id UInt32, name String) ENGINE = PostgreSQL(`postgres{2|3|4}:5432`, 'clickhouse', 'test_replicas', 'postgres', 'mysecretpassword');

PostgreSQL Dictionary ソース用のレプリカプライオリティがサポートされています。マップ内の数字が大きいほど優先順位は低く、最高優先順位は0です。

以下の例では、レプリカexample01-1が最高優先順位を持っています:

<postgresql>
<port>5432</port>
<user>clickhouse</user>
<password>qwerty</password>
<replica>
<host>example01-1</host>
<priority>1</priority>
</replica>
<replica>
<host>example01-2</host>
<priority>2</priority>
</replica>
<db>db_name</db>
<table>table_name</table>
<where>id=10</where>
<invalidate_query>SQL_QUERY</invalidate_query>
</postgresql>
</source>

使用例

PostgreSQLのテーブル

postgres=# CREATE TABLE "public"."test" (
"int_id" SERIAL,
"int_nullable" INT NULL DEFAULT NULL,
"float" FLOAT NOT NULL,
"str" VARCHAR(100) NOT NULL DEFAULT '',
"float_nullable" FLOAT NULL DEFAULT NULL,
PRIMARY KEY (int_id));

CREATE TABLE

postgres=# INSERT INTO test (int_id, str, "float") VALUES (1,'test',2);
INSERT 0 1

postgresql> SELECT * FROM test;
int_id | int_nullable | float | str | float_nullable
--------+--------------+-------+------+----------------
1 | | 2 | test |
(1 row)

ClickHouseでのテーブル作成と上記で作成したPostgreSQLテーブルへの接続

この例では、PostgreSQLテーブルエンジンを使用して、ClickHouseテーブルをPostgreSQLテーブルに接続し、PostgreSQLデータベースに対してSELECTINSERTの両方のステートメントを使用します:

CREATE TABLE default.postgresql_table
(
`float_nullable` Nullable(Float32),
`str` String,
`int_id` Int32
)
ENGINE = PostgreSQL('localhost:5432', 'public', 'test', 'postges_user', 'postgres_password');

SELECTクエリを使用してPostgreSQLテーブルからClickHouseテーブルに初期データを挿入

postgresqlテーブル関数は、PostgreSQLからClickHouseへのデータをコピーし、ClickHouseでクエリを実行または分析を行うことでデータのクエリパフォーマンスを向上させるか、PostgreSQLからClickHouseへのデータ移行に使用されます。ここではPostgreSQLからClickHouseへデータをコピーするので、ClickHouseではMergeTreeテーブルエンジンを使用し、postgresql_copyと呼びます:

CREATE TABLE default.postgresql_copy
(
`float_nullable` Nullable(Float32),
`str` String,
`int_id` Int32
)
ENGINE = MergeTree
ORDER BY (int_id);
INSERT INTO default.postgresql_copy
SELECT * FROM postgresql('localhost:5432', 'public', 'test', 'postges_user', 'postgres_password');

PostgreSQLテーブルからClickHouseテーブルへの増分データの挿入

初回の挿入後にPostgreSQLテーブルとClickHouseテーブルの間で継続的な同期を行う場合、ClickHouseでWHERE句を使用して、タイムスタンプまたはユニークなシーケンスIDに基づいてPostgreSQLに追加されたデータのみを挿入できます。

これは以前に追加された最大のIDまたはタイムスタンプを追跡するといった作業を必要とします。例:

SELECT max(`int_id`) AS maxIntID FROM default.postgresql_copy;

その後、PostgreSQLテーブルからmaxIDより大きい値を挿入します:

INSERT INTO default.postgresql_copy
SELECT * FROM postgresql('localhost:5432', 'public', 'test', 'postges_user', 'postgres_password');
WHERE int_id > maxIntID;

ClickHouseテーブルからのデータ選択

SELECT * FROM postgresql_copy WHERE str IN ('test');
┌─float_nullable─┬─str──┬─int_id─┐
│ ᴺᵁᴸᴸ │ test │ 1 │
└────────────────┴──────┴────────┘

デフォルト以外のスキーマを使用する

postgres=# CREATE SCHEMA "nice.schema";

postgres=# CREATE TABLE "nice.schema"."nice.table" (a integer);

postgres=# INSERT INTO "nice.schema"."nice.table" SELECT i FROM generate_series(0, 99) as t(i)
CREATE TABLE pg_table_schema_with_dots (a UInt32)
ENGINE PostgreSQL('localhost:5432', 'clickhouse', 'nice.table', 'postgrsql_user', 'password', 'nice.schema');

関連項目

関連コンテンツ