読者です 読者をやめる 読者になる 読者になる

Opt Technologies Magazine

オプトテクノロジーズ 公式Webマガジン

ScalikeJDBC-BigQuery

f:id:opttechnologies2015:20170516143356p:plain

ScalikeJDBCを使ってBigQueryにアクセスするための拡張ライブラリscalikejdbc-bigqueryについて紹介します。

こんにちは、@ocadarumaです。

おもに広告効果計測ツールの開発/運用などをやっています。

この記事では、最近作ったscalikejdbc-bigqueryというライブラリを紹介します。

scalikejdbc-bigqueryとは

ScalikeJDBCは、名前の通りJDBCドライバ経由でDBにアクセスする機能を提供するScalaライブラリで、弊社でも多くのScalaプロジェクトで採用しています。(参考: Migrating to ScalikeJDBC 2.4.x)

一方、近ごろ飛ぶ鳥を落とす勢いのGoogle BigQueryは、JDBCドライバを提供していないため、直接ScalikeJDBCから繋ぐことができません。(3rd partyのドライバはあります。後述)

scalikejdbc-bigqueryを使うと、ScalikeJDBCのDSLを用いて組み立てたクエリをBigQueryに対して発行し、結果を取得することができます。

リポジトリはこちら : https://github.com/ocadaruma/scalikejdbc-bigquery

対象読者

  • ScalaおよびScalikeJDBCを使っていて、BigQueryをデータストアとして使うアプリケーションを作っている方

なお、JDBCやScalikeJDBCの基礎的な事項の説明は省きます。

環境

この記事の内容は、次の環境に基づいて書かれています。

  • Scala 2.11.8
  • Java 8
  • scalikejdbc-bigquery 0.0.4
  • ScalikeJDBC 3.0.0-RC3

作った背景

弊社のあるシステムにて、パフォーマンスなどの観点から、アクセスログのデータストアをAmazon RedshiftからBigQueryへ移行することになりました。

既存のRedshiftを使った集計部分はScala/ScalikeJDBC(QueryDSL)で書かれていたため、方針としては、以下の3パターンあたりが考えられました。

  1. BigQuery Client Libraryを使って既存の集計ロジックを移植する
  2. 3rd partyのBigQuery用JDBCドライバを使用する (https://cloud.google.com/bigquery/partners/simba-drivers/)
  3. クエリの組み立てのみScalikeJDBCを使い、実行する部分のみBigQueryへアクセスするよう差し替える

まず1については、SQLの組み立てが複雑な業務ロジックに基づいていたことと、実質的にScalikeJDBCのクエリビルダ相当のことをやる形になるため、実装コストが高いと判断しました。

次の2について、Simba Driverを試してみた結果、以下の懸念が上がりました。

  • PreparedStatementを使用した際に正しく動作しない
  • ジョブのキャンセルやScanサイズの取得といったBigQuery固有の操作を行いたいが、ドキュメントを見る限りSimba driverにはそのような機能は無さそう

したがって3の方針をとり、BigQueryへのアクセス部分をscalikejdbc-bigqueryとしてライブラリ化した、という経緯です。

scalikejdbc-bigqueryの使い方

ではさっそく、Githubの公開データセットへのアクセスを通して、scalikejdbc-bigqueryの使い方を見てみましょう。(併せてREADMEもご参照ください。)

なお課金にはご注意ください。

以下のクエリを投げることをゴールとします。

Github上のリポジトリで、もっとも選択されているライセンスとそのリポジトリ数

あらかじめ、gcloud auth loginしておくか、keyファイルのパスをGOOGLE_APPLICATION_CREDENTIALS環境変数にセットしておくなどしておきます。(参考: How the Application Default Credentials work

まずbuild.sbtに以下の依存を追加します。

libraryDependencies ++= Seq(
  "com.mayreh" %% "scalikejdbc-bigquery" % "0.0.4",
  "org.scalikejdbc" %% "scalikejdbc" % "3.0.0-RC3"
)

次に、licenseテーブルを表すentityおよびdaoを作ります。

import scalikejdbc._

case class Licenses(
  repoName: String,
  license: String
)

object Licenses extends SQLSyntaxSupport[Licenses] {
  override val columns = Seq("repo_name", "license")

  val li = this.syntax("li")
}

scalikejdbc-bigqueryはBigQuery Client Libraryを使いますので、BigQueryサービスをインスタンス化しておきます。

import com.google.auth.oauth2.GoogleCredentials
import com.google.cloud.bigquery.{BigQueryOptions, DatasetId}

val credentials = GoogleCredentials.getApplicationDefault
val bigQuery = BigQueryOptions.newBuilder()
  .setCredentials(credentials)
  .setProjectId("your-gcp-project-id")
  .build()
  .getService

これで、以下のコードで目的の集計を行うことができます。

import scalikejdbc.bigquery._
import Licenses.li

val executor = new QueryExecutor(bigQuery, QueryConfig())

val dataset = DatasetId.of("bigquery-public-data", "github_repos")

val cnt = sqls"cnt"

val Some((license, count)) = bq {
  select(li.result.license, sqls"count(1) as $cnt")
    .from(Licenses in dataset as li)
    .groupBy(li.license)
    .orderBy(cnt)
    .limit(1)
}.map { rs =>
  (rs.string(li.resultName.license), rs.int(cnt))
}.single.run(executor).result

println(s"license: $license, count: $count")

実行してみます。

license: mit, count: 1667029

と、通常のQueryDSLとほぼ変わらないsyntaxで、BigQueryへSQLを発行できました。

ちなみに、普通にJDBCドライバ経由で発行する場合は以下のようになります。(コード上の違いはConnectionを作るところと、テーブル参照のとこおよびapplyの部分くらいです)

import Licenses.li

DB.readOnly { implicit session =>
  val cnt = sqls"cnt"

  val Some((license, count)) = bq {
    select(li.result.license, sqls"count(1) as $cnt")
      .from(Licenses as li)
      .groupBy(li.license)
      .orderBy(cnt)
      .limit(1)
  }.map { rs =>
    (rs.string(li.resultName.license), rs.int(cnt))
  }.single.apply()

  println(s"license: $license, count: $count")
}

scalikejdbc-bigqueryでは、(当然といえば当然ですが)ScalikeJDBCのConnectionPool周りの機能はまったく使用しません。

scalikejdbc-bigqueryの設計

前述の背景より、scalikejdbc-bigqueryには「QueryDSLをほぼそのまま使用できる」という要件が求められました。

ここでは、それをどのように実現しているかについて解説します。

まず、すごく大雑把にScalikeJDBCのQueryDSLの仕組みを書くと、以下のような流れです。

  1. selectFromとか.where.eqとかを使って、SQLSyntaxインスタンスが作られる (SQLSyntaxには、パラメータがplaceholderになったstatementと、パラメータが分かれて保持される)
  2. SQLSyntaxからPreparedStatementを作り、パラメータをバインドしていく
  3. 作ったPreparedStatementを実行し、ResultSetを取得する
  4. ResultSetをScalaのcollectionなどに変換する (One-to-X APIなど)

1〜4のうち、JDBCに特有の部分をBigQuery Client Libraryに置き換えれば実現できる、というわけです。

PreparedStatementを作る部分(1, 2)

BigQuery Client LibraryにはParameterを埋め込むAPIがあるため、QueryDSLで作られたSQLSyntaxをそのまま使えばよさそうな感じがします。

一方、DSLを構成するwhere.eqなどのメソッドのシグネチャを見てみると、以下のようにBParameterBinderFactory型クラスに属することを要求していることがわかります。

def eq[B: ParameterBinderFactory](column: SQLSyntax, value: B): ConditionSQLBuilder[A]

ParameterBinderFactory[B]はざっくり言うと、「B型の値を受け取って『PreparedStatementを受け取ってそれにB型の値をsetする関数』を返す機能」を持つ型クラスです。

したがって、以下のようなものを実装し、BigQuery Client Libraryを使ったRequestを組み立てる作りとしました。

  • パラメータを、BigQuery Client LibraryのQueryParameterValueに変換して保持するPreparedStatement実装
  • SQLSyntaxに保持されたstatement/パラメータから、BigQuery Client LibraryのQueryRequestを作るクラス

ResultSetを取得する部分(3, 4)

素のScalikeJDBCではPreparedStatement#executeQueryでResultSetを取得していますが、 BigQueryの場合は、Client LibraryのQueryResultから作ればOKです。

また、One-to-X APIについてもJDBCに依存していますので、再実装しています。(現状OneToManyのみ)

まとめ

背景の通り、あるシステムのBigQueryへの移行のために開発したscalikejdbc-bigqueryですが、当初の意図通り、最小限の書き直しで既存の集計部分を動作させることができました。

ただし、まだ本格的な移行のために絶賛作業中の段階ですので、しばらくはAPIが安定しないかもしれません。

お使いいただく際は、その点にご留意くだされば幸いです。