Skip to main content
Edit this page

ClickHouse Rust クライアント

ClickHouseに接続するための公式Rustクライアントは、もともとPaul Loydによって開発されました。クライアントのソースコードはGitHubリポジトリで利用可能です。

概要

  • 行をエンコード/デコードするためにserdeを使用します。
  • serde属性をサポート:skip_serializing, skip_deserializing, rename
  • HTTPトランスポート上でRowBinaryフォーマットを使用します。
    • TCP上でNativeに切り替える計画があります。
  • TLSをサポートします(native-tlsおよびrustls-tls機能を通じて)。
  • 圧縮と解凍(LZ4)をサポートします。
  • データの選択や挿入、DDLの実行、クライアント側でのバッチ処理のためのAPIを提供します。
  • 単体テスト用の便利なモックを提供します。

インストール

クレートを使用するには、Cargo.tomlに次のように追加してください:

[dependencies]
clickhouse = "0.12.2"

[dev-dependencies]
clickhouse = { version = "0.12.2", features = ["test-util"] }

参考: crates.ioページ

Cargo機能

  • lz4(デフォルトで有効) — Compression::Lz4Compression::Lz4Hc(_)のバリアントを有効にします。有効にすると、Compression::Lz4WATCHを除くすべてのクエリに対してデフォルトで使用されます。
  • native-tlsHTTPSスキーマを持つURLをhyper-tlsを通じてサポートし、OpenSSLにリンクします。
  • rustls-tlsHTTPSスキーマを持つURLをhyper-rustlsを通じてサポートし、OpenSSLにリンクしません。
  • inserterclient.inserter()を有効にします。
  • test-util — モックを追加します。詳しくはを参照してください。dev-dependenciesのみに使用してください。
  • watchclient.watch機能を有効にします。詳細については該当セクションを参照してください。
  • uuidserde::uuidを追加し、uuidクレートと連携します。
  • timeserde::timeを追加し、timeクレートと連携します。
Info

HTTPS URL経由でClickHouseに接続する場合は、native-tlsまたはrustls-tls機能を有効にする必要があります。 両方を有効にした場合、rustls-tls機能が優先されます。

ClickHouseバージョンの互換性

クライアントはClickHouseのLTSまたはそれ以降のバージョン、およびClickHouse Cloudと互換性があります。

ClickHouseサーバーv22.6より前のバージョンは、いくつかの稀なケースでRowBinaryを正しく処理しません。 この問題を解決するには、v0.11+を使用し、wa-37420機能を有効にできます。この機能は新しいClickHouseバージョンでは使用しないでください。

クライアントリポジトリのでクライアント利用のさまざまなシナリオをカバーすることを目指しています。概要はexamples READMEで利用可能です。

例や以下のドキュメントで不明な点や不足がある場合は、お問い合わせください

使用方法

Note

ch2rs crateは、ClickHouseから行タイプを生成するのに便利です。

クライアントインスタンスの作成

Tip

作成されたクライアントを再利用するか、基礎のhyper接続プールを再利用するためにクローンしてください。

use clickhouse::Client;

let client = Client::default()
// プロトコルとポートの両方を含める必要があります
.with_url("http://localhost:8123")
.with_user("name")
.with_password("123")
.with_database("test");

HTTPSまたはClickHouse Cloud接続

HTTPSはrustls-tlsまたはnative-tlsのcargo機能で動作します。

その後、通常の方法でクライアントを作成します。この例では、環境変数を使用して接続の詳細を保存しています:

Info

URLはプロトコルとポートの両方を含める必要があります。例:https://instance.clickhouse.cloud:8443

fn read_env_var(key: &str) -> String {
env::var(key).unwrap_or_else(|_| panic!("{key} env variable should be set"))
}

let client = Client::default()
.with_url(read_env_var("CLICKHOUSE_URL"))
.with_user(read_env_var("CLICKHOUSE_USER"))
.with_password(read_env_var("CLICKHOUSE_PASSWORD"));

参考:

行の選択

use serde::Deserialize;
use clickhouse::Row;
use clickhouse::sql::Identifier;

#[derive(Row, Deserialize)]
struct MyRow<'a> {
no: u32,
name: &'a str,
}

let table_name = "some";
let mut cursor = client
.query("SELECT ?fields FROM ? WHERE no BETWEEN ? AND ?")
.bind(Identifier(table_name))
.bind(500)
.bind(504)
.fetch::<MyRow<'_>>()?;

while let Some(row) = cursor.next().await? { .. }
  • プレースホルダー?fieldsは、Rowのフィールドno, nameに置き換えられます。
  • プレースホルダー?は、以下のbind()呼び出しで値に置き換えられます。
  • 便利なfetch_one::<Row>()fetch_all::<Row>()メソッドは、それぞれ最初の行またはすべての行を取得するために使用できます。
  • sql::Identifierはテーブル名をバインドするために使用できます。

注意:応答全体がストリーミングされるため、カーソルは一部の行を生成した後でもエラーを返す場合があります。この場合、サーバー側の応答バッファリングを有効にするためにquery(...).with_option("wait_end_of_query", "1")を試すことができます。詳細はこちらbuffer_sizeオプションも役立ちます。

Danger

行を選択するときにwait_end_of_queryを慎重に使用してください。サーバー側でのメモリ消費が増加し、パフォーマンス全体が低下する可能性があります。

行の挿入

use serde::Serialize;
use clickhouse::Row;

#[derive(Row, Serialize)]
struct MyRow {
no: u32,
name: String,
}

let mut insert = client.insert("some")?;
insert.write(&MyRow { no: 0, name: "foo".into() }).await?;
insert.write(&MyRow { no: 1, name: "bar".into() }).await?;
insert.end().await?;
  • end()が呼び出されないと、INSERTは中止されます。
  • 行はネットワーク負荷を分配するストリームとして順次送信されます。
  • ClickHouseは、すべての行が同じパーティションに適合し、その数がmax_insert_block_size以下である場合にのみバッチをアトミックに挿入します。

非同期挿入(サーバー側バッチ処理)

クライアント側でデータのバッチ処理を避けるためにClickHouse非同期挿入を利用できます。これは、async_insertオプションをinsertメソッドに(またはクライアントインスタンス自体に)提供することで実現できます。これにより、すべてのinsert呼び出しに影響します。

let client = Client::default()
.with_url("http://localhost:8123")
.with_option("async_insert", "1")
.with_option("wait_for_async_insert", "0");

参考:

インサータ機能(クライアント側バッチ処理)

inserter cargo機能が必要です。

let mut inserter = client.inserter("some")?
.with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20)))
.with_max_bytes(50_000_000)
.with_max_rows(750_000)
.with_period(Some(Duration::from_secs(15)));

inserter.write(&MyRow { no: 0, name: "foo".into() })?;
inserter.write(&MyRow { no: 1, name: "bar".into() })?;
let stats = inserter.commit().await?;
if stats.rows > 0 {
println!(
"{} bytes, {} rows, {} transactions have been inserted",
stats.bytes, stats.rows, stats.transactions,
);
}

// アプリケーション終了時にinserterを最終化し、残りの行をコミットすることを忘れないでください。. `end()`は統計も提供します。
inserter.end().await?;
  • Inserterは、いずれかのしきい値(max_bytesmax_rowsperiod)に達した場合にcommit()でアクティブな挿入を終了します。
  • パラレルインサータによる負荷スパイクを避けるために、with_period_biasを使用してアクティブなINSERT間の間隔がバイアスされます。
  • Inserter::time_left()を使用して、現在の期間が終了した時を検出できます。項目がまれに生成される場合は、再びInserter::commit()を呼び出して制限をチェックしてください。
  • 時間しきい値は、インサータを高速にするためにquantaクレートを使用して実装されています。test-utilが有効な場合、使用されません(したがって、カスタムテストでtokio::time::advance()によって時間を管理できます)。
  • commit()呼び出し間のすべての行は、同じINSERTステートメントで挿入されます。
Danger

挿入を終了/最終化する場合にフラッシュすることを忘れないでください:

inserter.end().await?;

DDLの実行

単一ノードの展開では、次のようにDDLを実行するだけで十分です:

client.query("DROP TABLE IF EXISTS some").execute().await?;

しかし、負荷分散装置やClickHouse Cloudを使用している場合、すべてのレプリカでDDLが適用されるのを待つことが推奨されます。これはwait_end_of_queryオプションを使用して行うことができます:

client
.query("DROP TABLE IF EXISTS some")
.with_option("wait_end_of_query", "1")
.execute()
.await?;

ClickHouse設定

with_optionメソッドを使用して、さまざまなClickHouse設定を適用できます。例えば:

let numbers = client
.query("SELECT number FROM system.numbers")
// この設定はこの特定のクエリにのみ適用されます。グローバルなクライアント設定を上書きします。
.with_option("limit", "3")
.fetch_all::<u64>()
.await?;

queryと同様に、insertおよびinserter メソッドでも同様に機能します。さらに、Clientインスタンスでこのメソッドを呼び出して、すべてのクエリに対するグローバル設定を行うことができます。

クエリID

with_optionを使用して、ClickHouseのクエリログでクエリを識別するためにquery_idオプションを設定できます。

let numbers = client
.query("SELECT number FROM system.numbers LIMIT 1")
.with_option("query_id", "some-query-id")
.fetch_all::<u64>()
.await?;

queryと同様に、insertおよびinserter メソッドでも同様に機能します。

Danger

query_idを手動で設定する場合は、それが一意であることを確認してください。UUIDはそのための良い選択肢です。

参考:クライアントリポジトリのquery_idの例

セッションID

query_idと同様に、同じセッションでステートメントを実行するためにsession_idを設定できます。session_idは、クライアントレベルでグローバルに、またはqueryinsertinserter呼び出しごとに設定することができます。

let client = Client::default()
.with_url("http://localhost:8123")
.with_option("session_id", "my-session");
Danger

クラスター化されたデプロイメントでは、「スティッキーセッション」がないため、この機能を適切に利用するには特定のクラスター ノードに接続する必要があります。たとえば、ラウンドロビンの負荷分散装置は、後続のリクエストが同じ ClickHouse ノードによって処理されることを保証しません。

参考:クライアントリポジトリのsession_idの例

カスタムHTTPヘッダー

プロキシ認証を使用している場合やカスタムヘッダーを渡す必要がある場合、次のように行うことができます:

let client = Client::default()
.with_url("http://localhost:8123")
.with_header("X-My-Header", "hello");

参考:クライアントリポジトリのカスタムHTTPヘッダーの例

カスタムHTTPクライアント

基礎となるHTTP接続プールの設定を微調整するのに役立ちます。

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::Client as HyperClient;
use hyper_util::rt::TokioExecutor;

let connector = HttpConnector::new(); // またはHttpsConnectorBuilder
let hyper_client = HyperClient::builder(TokioExecutor::new())
// クライアント側で特定のアイドルソケットを生かす時間(ミリ秒単位)。
// これはClickHouseサーバーのKeepAliveタイムアウトよりもかなり短いことが想定されています。
// これは、デフォルトで23.11バージョン以前の3秒、以降のバージョンの後10秒でした。
.pool_idle_timeout(Duration::from_millis(2_500))
// プール内で許可される最大のアイドルKeep-Alive接続。
.pool_max_idle_per_host(4)
.build(connector);

let client = Client::with_http_client(hyper_client).with_url("http://localhost:8123");
Danger

この例はレガシーなHyper APIに依存しており、将来変更される可能性があります。

参考:クライアントリポジトリのカスタムHTTPクライアントの例

データ型

  • (U)Int(8|16|32|64|128)は対応する(u|i)(8|16|32|64|128)型またはその周りの新しい型にマッピングします。
  • (U)Int256は直接サポートされていませんが、回避策があります
  • Float(32|64)は対応するf(32|64)またはその周りの新しい型にマッピングします。
  • Decimal(32|64|128)は対応するi(32|64|128)またはその周りの新しい型にマッピングします。fixnumや他の実装のサイン付き固定小数点数を使用することがより便利です。
  • Booleanboolまたはその周りの新しい型にマッピングします。
  • Stringは任意の文字列またはバイト型にマッピングします。例:&str, &[u8], String, Vec<u8>またはSmartString。新しい型もサポートされます。バイトを保存するには、serde_bytesを使用することを考慮してください。これはより効率的です。
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow<'a> {
str: &'a str,
string: String,
#[serde(with = "serde_bytes")]
bytes: Vec<u8>,
#[serde(with = "serde_bytes")]
byte_slice: &'a [u8],
}
  • FixedString(N)はバイトの配列としてサポートされます。例:[u8; N]
#[derive(Row, Debug, Serialize, Deserialize)]
struct MyRow {
fixed_str: [u8; 16], // FixedString(16)
}
  • Enum(8|16)serde_reprを使用してサポートされています。
use serde_repr::{Deserialize_repr, Serialize_repr};

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
level: Level,
}

#[derive(Debug, Serialize_repr, Deserialize_repr)]
#[repr(u8)]
enum Level {
Debug = 1,
Info = 2,
Warn = 3,
Error = 4,
}
  • UUIDserde::uuidを使用してuuid::Uuidにマッピングされます。uuid機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::uuid")]
uuid: uuid::Uuid,
}
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4")]
ipv4: std::net::Ipv4Addr,
}
  • Dateu16またはその周りの新しい型にマッピングされ、 1970-01-01から経過した日数を表します。また、serde::time::dateを使用して、time::Dateにマッピングされます。time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: u16,
#[serde(with = "clickhouse::serde::time::date")]
date: Date,
}
  • Date32i32またはその周りの新しい型にマッピングされ、1970-01-01から経過した日数を表します。また、serde::time::date32を使用して、time::Dateにマッピングされます。time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
days: i32,
#[serde(with = "clickhouse::serde::time::date32")]
date: Date,
}
  • DateTimeu32またはその周りの新しい型にマッピングされ、UNIX時代から経過した秒数を表します。また、serde::time::datetimeを使用して、time::OffsetDateTimeにマッピングされます。time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: u32,
#[serde(with = "clickhouse::serde::time::datetime")]
dt: OffsetDateTime,
}
  • DateTime64(_)i32またはその周りの新しい型にマッピングされ、UNIX時代から経過した時間を表します。また、serde::time::datetime64::*を使用して、time::OffsetDateTimeにマッピングされます。time機能が必要です。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
ts: i64, // `DateTime64(X)`に応じた経過秒/μs/ms/ns
#[serde(with = "clickhouse::serde::time::datetime64::secs")]
dt64s: OffsetDateTime, // `DateTime64(0)`
#[serde(with = "clickhouse::serde::time::datetime64::millis")]
dt64ms: OffsetDateTime, // `DateTime64(3)`
#[serde(with = "clickhouse::serde::time::datetime64::micros")]
dt64us: OffsetDateTime, // `DateTime64(6)`
#[serde(with = "clickhouse::serde::time::datetime64::nanos")]
dt64ns: OffsetDateTime, // `DateTime64(9)`
}
  • Tuple(A, B, ...)(A, B, ...)またはその周りの新しい型にマッピングされます。
  • Array(_)は任意のスライス、例:Vec<_>, &[_]にマッピングされます。新しい型もサポートされます。
  • Map(K, V)Array((K, V))のように動作します。
  • LowCardinality(_)はシームレスにサポートされます。
  • Nullable(_)Option<_>にマッピングされます。clickhouse::serde::*ヘルパーに対しては::optionを追加します。
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(with = "clickhouse::serde::ipv4::option")]
ipv4_opt: Option<Ipv4Addr>,
}
  • Nestedはリネーミングを使用して複数の配列を提供することでサポートされます。
// CREATE TABLE test(items Nested(name String, count UInt32))
#[derive(Row, Serialize, Deserialize)]
struct MyRow {
#[serde(rename = "items.name")]
items_name: Vec<String>,
#[serde(rename = "items.count")]
items_count: Vec<u32>,
}
  • Geo型はサポートされています。 Pointはタプル(f64, f64)として動作し、その他の型は単に点のスライスです。
type Point = (f64, f64);
type Ring = Vec<Point>;
type Polygon = Vec<Ring>;
type MultiPolygon = Vec<Polygon>;
type LineString = Vec<Point>;
type MultiLineString = Vec<LineString>;

#[derive(Row, Serialize, Deserialize)]
struct MyRow {
point: Point,
ring: Ring,
polygon: Polygon,
multi_polygon: MultiPolygon,
line_string: LineString,
multi_line_string: MultiLineString,
}
  • VariantDynamic、(新しい)JSONデータ型はまだサポートされていません。

モック

このクレートは、CHサーバーをモックし、DDL、SELECTINSERTWATCHクエリをテストするためのユーティリティを提供します。この機能はtest-util機能で有効になります。dev-dependencyとしてのみ使用してください。

参考:

トラブルシューティング

CANNOT_READ_ALL_DATA

CANNOT_READ_ALL_DATAエラーの最も一般的な原因は、アプリケーション側の行定義がClickHouseのものと一致していないことです。

次のようなテーブルを考えてみます:

CREATE OR REPLACE TABLE event_log (id UInt32)
ENGINE = MergeTree
ORDER BY timestamp

そして、アプリケーション側でEventLogが次のように定義されている場合、型が一致していません:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: String, // <- 本来はu32であるべき!
}

データを挿入すると、次のエラーが発生する可能性があります:

Error: BadResponse("Code: 33. DB::Exception: Cannot read all data. Bytes read: 5. Bytes expected: 23.: (at row 1)\n: While executing BinaryRowInputFormat. (CANNOT_READ_ALL_DATA)")

この例を正確にするためには、EventLog構造体を次のように正しく定義します:

#[derive(Debug, Serialize, Deserialize, Row)]
struct EventLog {
id: u32
}

既知の制限

  • VariantDynamic、(新しい)JSONデータ型はまだサポートされていません。
  • サーバー側のパラメータバインディングはまだサポートされていません。追跡するためにこの問題を参照してください。

お問い合わせ

質問がある場合や手助けが必要な場合は、Community SlackGitHub issuesを通じてお気軽にお問い合わせください。