Skip to main content

ClickHouseとApache Sparkの統合

Apache Spark Apache Spark™は、シングルノードマシンまたはクラスターでデータエンジニアリング、データサイエンス、および機械学習を実行するための多言語エンジンです。

Apache SparkとClickHouseを接続する主な方法は2つあります:

  1. Spark Connector - SparkコネクタはDataSourceV2を実装し、独自のカタログ管理を備えています。現在、ClickHouseとSparkを統合する推奨方法です。
  2. Spark JDBC - JDBCデータソースを使用してSparkとClickHouseを統合します。

Spark Connector

このコネクタは、高度なパーティショニングや述語プッシュダウンなど、ClickHouseに特化した最適化を活用してクエリ性能やデータ処理を向上させます。コネクタはClickHouseの公式JDBCコネクタに基づいており、独自のカタログを管理します。

要件

  • Java 8または17
  • Scala 2.12または2.13
  • Apache Spark 3.3または3.4または3.5

互換性マトリックス

バージョン互換性のあるSparkバージョンClickHouse JDBCバージョン
mainSpark 3.3, 3.4, 3.50.6.3
0.8.0Spark 3.3, 3.4, 3.50.6.3
0.7.3Spark 3.3, 3.40.4.6
0.6.0Spark 3.30.3.2-patch11
0.5.0Spark 3.2, 3.30.3.2-patch11
0.4.0Spark 3.2, 3.3依存なし
0.3.0Spark 3.2, 3.3依存なし
0.2.1Spark 3.2依存なし
0.1.2Spark 3.2依存なし

ライブラリのダウンロード

バイナリJARの名前パターンは:

clickhouse-spark-runtime-${spark_binary_version}_${scala_binary_version}-${version}.jar

すべてのリリースされたJARはMaven Central Repositoryで見つけることができ、すべてのデイリービルドのSNAPSHOT JARはSonatype OSS Snapshots Repositoryで確認できます。

依存関係としてのインポート

Gradle

dependencies {
implementation("com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}")
implementation("com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all") { transitive = false }
}

SNAPSHOTバージョンを使用したい場合は、次のリポジトリを追加します:

repositories {
maven { url = "https://s01.oss.sonatype.org/content/repositories/snapshots" }
}

Maven

<dependency>
<groupId>com.clickhouse.spark</groupId>
<artifactId>clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}</artifactId>
<version>{{ stable_version }}</version>
</dependency>
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<classifier>all</classifier>
<version>{{ clickhouse_jdbc_version }}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

SNAPSHOTバージョンを使用したい場合は、次のリポジトリを追加します。

<repositories>
<repository>
<id>sonatype-oss-snapshots</id>
<name>Sonatype OSS Snapshots Repository</name>
<url>https://s01.oss.sonatype.org/content/repositories/snapshots</url>
</repository>
</repositories>

Spark SQLで遊ぶ

注意:SQLのみの使用例に対しては、Apache Kyuubiを本番環境で使用することが推奨されます。

Spark SQL CLIの起動

$SPARK_HOME/bin/spark-sql \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

次の引数

  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

は次のように置き換えることができます

  --repositories https://{maven-cental-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all

これにより、JARをSparkクライアントノードにコピーする必要がなくなります。

操作

基本的な操作、例えばデータベースの作成、テーブルの作成、テーブルへの書き込み、テーブルの読み込みなど。

spark-sql> use clickhouse;
Time taken: 0.016 seconds

spark-sql> create database if not exists test_db;
Time taken: 0.022 seconds

spark-sql> show databases;
default
system
test_db
Time taken: 0.289 seconds, Fetched 3 row(s)

spark-sql> CREATE TABLE test_db.tbl_sql (
> create_time TIMESTAMP NOT NULL,
> m INT NOT NULL COMMENT 'part key',
> id BIGINT NOT NULL COMMENT 'sort key',
> value STRING
> ) USING ClickHouse
> PARTITIONED BY (m)
> TBLPROPERTIES (
> engine = 'MergeTree()',
> order_by = 'id',
> settings.index_granularity = 8192
> );
Time taken: 0.242 seconds

spark-sql> insert into test_db.tbl_sql values
> (timestamp'2021-01-01 10:10:10', 1, 1L, '1'),
> (timestamp'2022-02-02 10:10:10', 2, 2L, '2')
> as tabl(create_time, m, id, value);
Time taken: 0.276 seconds

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
Time taken: 0.116 seconds, Fetched 2 row(s)

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 1.028 seconds

spark-sql> insert into test_db.tbl_sql select * from test_db.tbl_sql;
Time taken: 0.462 seconds

spark-sql> select count(*) from test_db.tbl_sql;
6
Time taken: 1.421 seconds, Fetched 1 row(s)

spark-sql> select * from test_db.tbl_sql;
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2021-01-01 10:10:10 1 1 1
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.123 seconds, Fetched 6 row(s)

spark-sql> delete from test_db.tbl_sql where id = 1;
Time taken: 0.129 seconds

spark-sql> select * from test_db.tbl_sql;
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
2022-02-02 10:10:10 2 2 2
Time taken: 0.101 seconds, Fetched 3 row(s)

Spark Shellで遊ぶ

Spark Shellの起動

$SPARK_HOME/bin/spark-shell \
--conf spark.sql.catalog.clickhouse=com.clickhouse.spark.ClickHouseCatalog \
--conf spark.sql.catalog.clickhouse.host=${CLICKHOUSE_HOST:-127.0.0.1} \
--conf spark.sql.catalog.clickhouse.protocol=http \
--conf spark.sql.catalog.clickhouse.http_port=${CLICKHOUSE_HTTP_PORT:-8123} \
--conf spark.sql.catalog.clickhouse.user=${CLICKHOUSE_USER:-default} \
--conf spark.sql.catalog.clickhouse.password=${CLICKHOUSE_PASSWORD:-} \
--conf spark.sql.catalog.clickhouse.database=default \
--jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

次の引数

  --jars /path/clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }}.jar,/path/clickhouse-jdbc-{{ clickhouse_jdbc_version }}-all.jar

は次のように置き換えることができます

  --repositories https://{maven-cental-mirror or private-nexus-repo} \
--packages com.clickhouse.spark:clickhouse-spark-runtime-{{ spark_binary_version }}_{{ scala_binary_version }}:{{ stable_version }},com.clickhouse:clickhouse-jdbc:{{ clickhouse_jdbc_version }}:all

これにより、JARをSparkクライアントノードにコピーする必要がなくなります。

操作

基本的な操作、例えばデータベースの作成、テーブルの作成、テーブルへの書き込み、テーブルの読み込みなど。

scala> spark.sql("use clickhouse")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("create database test_db")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show databases").show
+---------+
|namespace|
+---------+
| default|
| system|
| test_db|
+---------+

scala> spark.sql("""
| CREATE TABLE test_db.tbl (
| create_time TIMESTAMP NOT NULL,
| m INT NOT NULL COMMENT 'part key',
| id BIGINT NOT NULL COMMENT 'sort key',
| value STRING
| ) USING ClickHouse
| PARTITIONED BY (m)
| TBLPROPERTIES (
| engine = 'MergeTree()',
| order_by = 'id',
| settings.index_granularity = 8192
| )
| """)
res2: org.apache.spark.sql.DataFrame = []

scala> :paste
// Pasteモードに入ります(ctrl-Dで終了)

spark.createDataFrame(Seq(
("2021-01-01 10:10:10", 1L, "1"),
("2022-02-02 10:10:10", 2L, "2")
)).toDF("create_time", "id", "value")
.withColumn("create_time", to_timestamp($"create_time"))
.withColumn("m", month($"create_time"))
.select($"create_time", $"m", $"id", $"value")
.writeTo("test_db.tbl")
.append

// Pasteモードを終了し、現在解釈中です。

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2021-01-01 10:10:10| 1| 1| 1|
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+

scala> spark.sql("DELETE FROM test_db.tbl WHERE id=1")
res3: org.apache.spark.sql.DataFrame = []

scala> spark.table("test_db.tbl").show
+-------------------+---+---+-----+
| create_time| m| id|value|
+-------------------+---+---+-----+
|2022-02-02 10:10:10| 2| 2| 2|
+-------------------+---+---+-----+

ClickHouseのネイティブSQLを実行します。

scala> val options = Map(
| "host" -> "clickhouse",
| "protocol" -> "http",
| "http_port" -> "8123",
| "user" -> "default",
| "password" -> ""
| )

scala> val sql = """
| |CREATE TABLE test_db.person (
| | id Int64,
| | name String,
| | age Nullable(Int32)
| |)
| |ENGINE = MergeTree()
| |ORDER BY id
| """.stripMargin

scala> spark.executeCommand("com.clickhouse.spark.ClickHouseCommandRunner", sql, options)

scala> spark.sql("show tables in clickhouse_s1r1.test_db").show
+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
| test_db| person| false|
+---------+---------+-----------+

scala> spark.table("clickhouse_s1r1.test_db.person").printSchema
root
|-- id: long (nullable = false)
|-- name: string (nullable = false)
|-- age: integer (nullable = true)

サポートされているデータ型

このセクションでは、SparkとClickHouse間のデータ型のマッピングについて説明します。以下の表は、ClickHouseからSparkにデータを読み取る際のデータ型の変換や、SparkからClickHouseにデータを挿入する際の変換のクイックリファレンスを提供します。

ClickHouseからSparkへのデータ読み込み

ClickHouse データ型Spark データ型サポートされているプリミティブかメモ
NothingNullTypeYes
BoolBooleanTypeYes
UInt8, Int16ShortTypeYes
Int8ByteTypeYes
UInt16,Int32IntegerTypeYes
UInt32,Int64, UInt64LongTypeYes
Int128,UInt128, Int256, UInt256DecimalType(38, 0)Yes
Float32FloatTypeYes
Float64DoubleTypeYes
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6StringTypeYes
FixedStringBinaryType, StringTypeYes設定 READ_FIXED_STRING_AS で制御されます
DecimalDecimalTypeYes精度とスケールは Decimal128 まで可能
Decimal32DecimalType(9, scale)Yes
Decimal64DecimalType(18, scale)Yes
Decimal128DecimalType(38, scale)Yes
Date, Date32DateTypeYes
DateTime, DateTime32, DateTime64TimestampTypeYes
ArrayArrayTypeNo配列要素の型も変換されます
MapMapTypeNoキーは StringType に制限されます
IntervalYearYearMonthIntervalType(Year)Yes
IntervalMonthYearMonthIntervalType(Month)Yes
IntervalDay, IntervalHour, IntervalMinute, IntervalSecondDayTimeIntervalTypeNo特定の間隔型が使用されます
Object
Nested
Tuple
Point
Polygon
MultiPolygon
Ring
IntervalQuarter
IntervalWeek
Decimal256
AggregateFunction
SimpleAggregateFunction

SparkからClickHouseへのデータ挿入

Spark データ型ClickHouse データ型サポートされているプリミティブかメモ
BooleanTypeUInt8Yes
ByteTypeInt8Yes
ShortTypeInt16Yes
IntegerTypeInt32Yes
LongTypeInt64Yes
FloatTypeFloat32Yes
DoubleTypeFloat64Yes
StringTypeStringYes
VarcharTypeStringYes
CharTypeStringYes
DecimalTypeDecimal(p, s)Yes精度とスケールは Decimal128 まで可能
DateTypeDateYes
TimestampTypeDateTimeYes
ArrayType (リスト、タプル、配列)ArrayNo配列要素の型も変換されます
MapTypeMapNoキーは StringType に制限されます
Object
Nested

Spark JDBC

Sparkでサポートされているデータソースの中で最もよく使われるのがJDBCです。このセクションでは、SparkでClickHouse公式JDBCコネクタを使用する方法について詳しく説明します。

データの読み込み

public static void main(String[] args) {
// Sparkセッションの初期化
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

// JDBC接続の詳細
String jdbcUrl = "jdbc:ch://localhost:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "123456");

// ClickHouseからテーブルを読み込む
Dataset<Row> df = spark.read().jdbc(jdbcUrl, "example_table", jdbcProperties);

// DataFrameを表示する
df.show();

// Sparkセッションを停止する
spark.stop();
}

データの書き込み

Info

現時点では、JDBCを使用して既存のテーブルにのみデータを挿入できます。

    public static void main(String[] args) {
// Sparkセッションの初期化
SparkSession spark = SparkSession.builder().appName("example").master("local").getOrCreate();

// JDBC接続の詳細
String jdbcUrl = "jdbc:ch://localhost:8123/default";
Properties jdbcProperties = new Properties();
jdbcProperties.put("user", "default");
jdbcProperties.put("password", "******");
// サンプルDataFrameの作成
StructType schema = new StructType(new StructField[]{
DataTypes.createStructField("id", DataTypes.IntegerType, false),
DataTypes.createStructField("name", DataTypes.StringType, false)
});

List<Row> rows = new ArrayList<Row>();
rows.add(RowFactory.create(1, "John"));
rows.add(RowFactory.create(2, "Doe"));

Dataset<Row> df = spark.createDataFrame(rows, schema);

df.write()
.mode(SaveMode.Append)
.jdbc(jdbcUrl, "my_table", jdbcProperties);
// DataFrameを表示する
df.show();

// Sparkセッションを停止する
spark.stop();
}
Info

Spark JDBCを使用する場合、Sparkは単一パーティションでデータを読み取ります。より高い並行性を達成するには、partitionColumnlowerBoundupperBoundnumPartitionsを指定する必要があります。これらは、複数のワーカーから並行して読み取る際のテーブル分割を説明します。より詳細な情報については、Apache Sparkの公式ドキュメントでJDBC構成をご確認ください。