ScalikeJDBCを使ってBigQueryにアクセスするための拡張ライブラリscalikejdbc-bigquery
について紹介します。
こんにちは、@ocadarumaです。
おもに広告効果計測ツールの開発/運用などをやっています。
この記事では、最近作ったscalikejdbc-bigqueryというライブラリを紹介します。
scalikejdbc-bigqueryとは
ScalikeJDBCは、名前の通りJDBCドライバ経由でDBにアクセスする機能を提供するScalaライブラリで、弊社でも多くのScalaプロジェクトで採用しています。(参考: Migrating to ScalikeJDBC 2.4.x)
一方、近ごろ飛ぶ鳥を落とす勢いのGoogle BigQueryは、JDBCドライバを提供していないため、直接ScalikeJDBCから繋ぐことができません。(3rd partyのドライバはあります。後述)
scalikejdbc-bigqueryを使うと、ScalikeJDBCのDSLを用いて組み立てたクエリをBigQueryに対して発行し、結果を取得することができます。
リポジトリはこちら : https://github.com/ocadaruma/scalikejdbc-bigquery
対象読者
- ScalaおよびScalikeJDBCを使っていて、BigQueryをデータストアとして使うアプリケーションを作っている方
なお、JDBCやScalikeJDBCの基礎的な事項の説明は省きます。
環境
この記事の内容は、次の環境に基づいて書かれています。
作った背景
弊社のあるシステムにて、パフォーマンスなどの観点から、アクセスログのデータストアをAmazon RedshiftからBigQueryへ移行することになりました。
既存のRedshiftを使った集計部分はScala/ScalikeJDBC(QueryDSL)で書かれていたため、方針としては、以下の3パターンあたりが考えられました。
- BigQuery Client Libraryを使って既存の集計ロジックを移植する
- 3rd partyのBigQuery用JDBCドライバを使用する (https://cloud.google.com/bigquery/partners/simba-drivers/)
- クエリの組み立てのみScalikeJDBCを使い、実行する部分のみBigQueryへアクセスするよう差し替える
まず1については、SQLの組み立てが複雑な業務ロジックに基づいていたことと、実質的にScalikeJDBCのクエリビルダ相当のことをやる形になるため、実装コストが高いと判断しました。
次の2について、Simba Driverを試してみた結果、以下の懸念が上がりました。
- PreparedStatementを使用した際に正しく動作しない
- https://issuetracker.google.com/issues/35906079
- version 1.0.5にて直ったようだが、他にもバグを踏みそう
- かつオープンソースで無いため、踏んだときに迅速な対応が困難
- ジョブのキャンセルやScanサイズの取得といったBigQuery固有の操作を行いたいが、ドキュメントを見る限りSimba driverにはそのような機能は無さそう
したがって3の方針をとり、BigQueryへのアクセス部分をscalikejdbc-bigqueryとしてライブラリ化した、という経緯です。
scalikejdbc-bigqueryの使い方
ではさっそく、Githubの公開データセットへのアクセスを通して、scalikejdbc-bigqueryの使い方を見てみましょう。(併せてREADMEもご参照ください。)
なお課金にはご注意ください。
以下のクエリを投げることをゴールとします。
Github上のリポジトリで、もっとも選択されているライセンスとそのリポジトリ数
あらかじめ、gcloud auth login
しておくか、keyファイルのパスをGOOGLE_APPLICATION_CREDENTIALS
環境変数にセットしておくなどしておきます。(参考: How the Application Default Credentials work)
まずbuild.sbt
に以下の依存を追加します。
libraryDependencies ++= Seq( "com.mayreh" %% "scalikejdbc-bigquery" % "0.0.4", "org.scalikejdbc" %% "scalikejdbc" % "3.0.0-RC3" )
次に、licenseテーブルを表すentityおよびdaoを作ります。
import scalikejdbc._ case class Licenses( repoName: String, license: String ) object Licenses extends SQLSyntaxSupport[Licenses] { override val columns = Seq("repo_name", "license") val li = this.syntax("li") }
scalikejdbc-bigqueryはBigQuery Client Libraryを使いますので、BigQueryサービスをインスタンス化しておきます。
import com.google.auth.oauth2.GoogleCredentials import com.google.cloud.bigquery.{BigQueryOptions, DatasetId} val credentials = GoogleCredentials.getApplicationDefault val bigQuery = BigQueryOptions.newBuilder() .setCredentials(credentials) .setProjectId("your-gcp-project-id") .build() .getService
これで、以下のコードで目的の集計を行うことができます。
import scalikejdbc.bigquery._ import Licenses.li val executor = new QueryExecutor(bigQuery, QueryConfig()) val dataset = DatasetId.of("bigquery-public-data", "github_repos") val cnt = sqls"cnt" val Some((license, count)) = bq { select(li.result.license, sqls"count(1) as $cnt") .from(Licenses in dataset as li) .groupBy(li.license) .orderBy(cnt) .limit(1) }.map { rs => (rs.string(li.resultName.license), rs.int(cnt)) }.single.run(executor).result println(s"license: $license, count: $count")
実行してみます。
license: mit, count: 1667029
と、通常のQueryDSLとほぼ変わらないsyntaxで、BigQueryへSQLを発行できました。
ちなみに、普通にJDBCドライバ経由で発行する場合は以下のようになります。(コード上の違いはConnectionを作るところと、テーブル参照のとこおよびapply
の部分くらいです)
import Licenses.li DB.readOnly { implicit session => val cnt = sqls"cnt" val Some((license, count)) = bq { select(li.result.license, sqls"count(1) as $cnt") .from(Licenses as li) .groupBy(li.license) .orderBy(cnt) .limit(1) }.map { rs => (rs.string(li.resultName.license), rs.int(cnt)) }.single.apply() println(s"license: $license, count: $count") }
scalikejdbc-bigqueryでは、(当然といえば当然ですが)ScalikeJDBCのConnectionPool周りの機能はまったく使用しません。
scalikejdbc-bigqueryの設計
前述の背景より、scalikejdbc-bigqueryには「QueryDSLをほぼそのまま使用できる」という要件が求められました。
ここでは、それをどのように実現しているかについて解説します。
まず、すごく大雑把にScalikeJDBCのQueryDSLの仕組みを書くと、以下のような流れです。
selectFrom
とか.where.eq
とかを使って、SQLSyntax
インスタンスが作られる (SQLSyntax
には、パラメータがplaceholderになったstatementと、パラメータが分かれて保持される)SQLSyntax
からPreparedStatement
を作り、パラメータをバインドしていく- 作った
PreparedStatement
を実行し、ResultSet
を取得する ResultSet
をScalaのcollectionなどに変換する (One-to-X APIなど)
1〜4のうち、JDBCに特有の部分をBigQuery Client Libraryに置き換えれば実現できる、というわけです。
PreparedStatementを作る部分(1, 2)
BigQuery Client LibraryにはParameterを埋め込むAPIがあるため、QueryDSLで作られたSQLSyntax
をそのまま使えばよさそうな感じがします。
一方、DSLを構成するwhere.eq
などのメソッドのシグネチャを見てみると、以下のようにB
がParameterBinderFactory
型クラスに属することを要求していることがわかります。
def eq[B: ParameterBinderFactory](column: SQLSyntax, value: B): ConditionSQLBuilder[A]
ParameterBinderFactory[B]
はざっくり言うと、「B型の値を受け取って『PreparedStatement
を受け取ってそれにB型の値をsetする関数』を返す機能」を持つ型クラスです。
したがって、以下のようなものを実装し、BigQuery Client Libraryを使ったRequestを組み立てる作りとしました。
- パラメータを、BigQuery Client Libraryの
QueryParameterValue
に変換して保持するPreparedStatement
実装 SQLSyntax
に保持されたstatement/パラメータから、BigQuery Client LibraryのQueryRequest
を作るクラス
ResultSetを取得する部分(3, 4)
素のScalikeJDBCではPreparedStatement#executeQuery
でResultSetを取得していますが、
BigQueryの場合は、Client LibraryのQueryResultから作ればOKです。
また、One-to-X APIについてもJDBCに依存していますので、再実装しています。(現状OneToManyのみ)
まとめ
背景の通り、あるシステムのBigQueryへの移行のために開発したscalikejdbc-bigqueryですが、当初の意図通り、最小限の書き直しで既存の集計部分を動作させることができました。
ただし、まだ本格的な移行のために絶賛作業中の段階ですので、しばらくはAPIが安定しないかもしれません。
お使いいただく際は、その点にご留意くだされば幸いです。