こんにちは、 @cedretaber です。 先日の記事で、納会用アプリのクライアントサイドの解説がなされましたが、今回はサーバサイドの解説をしていきます。
納会用アプリの詳しい仕様については、前記事をご覧ください。
Akka HTTP
納会用アプリでは、サーバサイドをAkka HTTPを利用して開発しました。 Akka HTTPはAkkaで構築されたHTTPライブラリです。 Play2フレームワークのような比較的リッチなフレームワークに比べてシンプルで、かつWebSocketの利用が難しく無さそうに思えたので、今回利用しました。
なおAkka HTTPについては、公式ドキュメントに、
The Akka HTTP modules implement a full server- and client-side HTTP stack on top of akka-actor and akka-stream. It’s not a web-framework but rather a more general toolkit for providing and consuming HTTP-based services. While interaction with a browser is of course also in scope it is not the primary focus of Akka HTTP.
とある通り、単なるウェブフレームワークではなく、サーバサイド・クライアントサイドを包括する、HTTPベースのサービスの為のツールキットという位置付けです。マイクロサービスアーキテクチャに基くようなものを作る際に活用できそうです。
が、今回は単にウェブサーバを構築する為に利用しました。
Akka HTTPでWebSocketを扱う
Akka HTTPにはWebSocketを扱う為のモジュールが用意されています。上で述べた通り、Akka HTTPはサーバサイド、クライアントサイド両面のライブラリなので、WebSocketについてもサーバとして扱う手段とクライアントとして扱う手段が提供されています。
コード解説
では、納会用のクイズアプリについての解説を行っていきます。
ライブラリの依存性は以下のようになっております。 (Akka HTTPに関する部分のみ抜粋、見やすく書き直しています。)
val akkaVersion = "2.4.7" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-http-core", "com.typesafe.akka" %% "akka-http-experimental" ).map(_ % akkaVersion)
なお、当アプリは、こちらのコードを大いに参考致しております。 github.com
クイズへの参加
WebSocketコネクションが確立される時、クエリストリングから参加者の名前と部署とを読み取って、これをキーに参加者登録を行います。
以下がルーティングです。
// WebSocket path("quiz") { (for { dept: String <- parameter('dept) name: String <- parameter('name) } yield handleWebSocketMessages(noukaiFlow(dept, name)) ).apply(identity)
クエリストリングは parameter
メソッドを使って取得します。このメソッドは path
のような他のメソッドと同様、
parameter('dept) { dept => parameter('name) { name => handleWebSocketMessages(noukaiFlow(dept, name)) } }
と書けるのですが、今回はネストが深くなるのを嫌ってfor式で書きました。
ここで handler
に渡している noukaiFlow
の定義は次のようなものです。
def noukaiFlow(dept: String, name: String): Flow[Message, Message, Any] = Flow[Message] .collect { case TextMessage.Strict(msg) => msg } .via(noukai.quizFlow(dept, name)) .map { case msg: SendMessage => msg.toJsonString |> TextMessage.Strict case e => println(e) Map("message" -> "an error.").asJson.toString |> TextMessage.Strict }
受け取るメッセージは TextMessage.Strict
型のみで、また送り出すメッセージも SendMessage
型だけを用いています。それ以外のメッセージが送り出されようとする場合、エラーとしています。
主要な処理は全て quizFlow
メソッドの中にあります。
このメソッドは次のようになっています。
def quizFlow(dept: String, name: String): Flow[String, SendMessage, Any] = { val sink = Flow[String] .map(ReceiveMessage.fromJsonString(Identify(dept, name))) .collect { case Some(m) => m } .to(Sink.actorRef[ReceiveMessage](quizMaster, AnswererLeft(Identify(dept, name)))) val source = Source .actorRef[SendMessage](1023, OverflowStrategy.fail) .mapMaterializedValue(quizMaster ! NewAnswerer(Identify(dept, name), _)) Flow.fromSinkAndSource(sink, source) }
まず、メッセージを受け取る処理(Sink)です。
受け取るメッセージは全てJSON形式の文字列である事を想定しており、ReceiveMessageの fromJsonString
メソッドでそれをScalaのオブジェクトへと変換しています。この時、変換に失敗した場合は次の collect
メソッドで切り捨てます。
そしてこのメッセージを、quizMasterという名前の変数のActorに送信しています。
また、メッセージを送り出す処理(Source)でも、SourceとなるActorRefをquizMasterに、部署名と名前と一緒に送りつけています( NewAnswerer(Identify(dept, name), _))
の部分)。このNewAnswererが受け付けられて、参加者登録が完了します。
quizMasterの処理を見てみましょう。
class QuizMasterActor extends Actor { // 中略 var answerers: Map[Identify, ActorRef] = Map.empty def receive = { // 新規参加者 case NewAnswerer(id, subscriber) => val info = InitInfo(AnswererInfo(id, Some(subscriber)), currentProblem) answerers.get(id).fold { // 新規作成する場合 context.actorOf(Props(classOf[AnswererActor])) |> { answerer => answerers += (id -> answerer) answerer ! info } } { answerer => // 既に存在していて、切断していた場合 answerer ! info } case m@AnswererLeft(id) => answerers.get(id).foreach(_ ! m) // 回線が切れても、情報は保持し続ける。
quizMasterは参加者一覧をIdentify型(部署名と名前との組)をキー、ActorRefを値としたマップであるanswerersで保持しています。値のActorRefは、参加者情報を保持するAnswererActor型のActorです。
新規参加者が来た場合、まず、既にそのキーで参加者が存在するかを確認します。というのもこのアプリでは、再接続が可能なように、切断時にも今までの回答情報を保持し続けるからです。
answerersの中にそのキーの参加者が見つからなかった場合は、新規作成します。
そして最後に、その参加者のAnswererActorに、参加者の部署名、名前、SourceとなるActorRef等が入ったcase classであるInitInfoを送りつけます。
では、参加者情報を扱うAnswererActorを見ましょう。
class AnswererActor extends Actor { // 中略 var answerInfo: AnswererInfo = AnswererInfo.default def receive = { // 初期化 // 再接続の際に、subscriberを掛け替えるのにも呼ばれる case InitInfo(info, currentProblem) => answerInfo = info if(currentProblem != 0) { // ここで現在の問題に切り替える処理 answerInfo.ref.foreach(_ ! BroadcastProblem(currentProblem)) } // 回答者の回線が切れた場合。データは保持、接続先は消しておく。 case _: AnswererLeft => answerInfo = answerInfo.copy(ref = None)
AnswererActorの中では、参加者情報をAnswererInfoクラスで保持しているので、単純にそれを掛け替える事で情報を更新します。
こうして、参加者に対してAnswererActorが用意され、クイズの準備が整いました。
クイズ開始
参加者が揃ったらクイズを始めましょう。 クイズの進行は、管理画面から行います。クイズの進行に関わる処理はquizMasterが全て引き受けていますが、これは独立したActorですので、HTTPリクエストを受けて、このActorにメッセージを送る事も可能です。
post { pathPrefix("api") { path("quizs" / IntNumber) { n => noukai.broadcastProblem(n) HttpEntity(ContentTypes.`application/json`, s"""{"num":$n}""") |> toComplete } ~ path("answers" / IntNumber) { n => noukai.answeringProblem(n) HttpEntity(ContentTypes.`application/json`, s"""{"num":$n}""") |> toComplete } } }
POSTリクエストを受けて、 broadcastProblem
メソッドや answeringProblem
を叩いています。
これらのメソッドはこのようになっています。
def broadcastProblem(id: Int) = quizMaster ! BroadcastProblem(id) def answeringProblem(id: Int) = (correctAnswers ? QueryCorrect(Seq(id))).foreach { case ResultCorrect(Seq(correct)) => quizMaster ! AnsweringProblem(id, correct) case _ => Unit }
単純に、Actorにメッセージを送っているだけです。WebSocketで接続されているクライアントへの通信はquizMasterが行うので、管理画面から操作を受けた時に行うのは、これだけで良いのです。
では、メッセージを受け取ったActorがどのように処理をしているのか見てみましょう。
// 次の問題配信 case p@BroadcastProblem(n) => currentProblem = n answeredNumber = 0 answerers.values.foreach(_ ! p)
問題配信の処理で、quizMasterが行っているのはこれだけです。
- 現在の問題番号(
currentProblem
)を架け替えて - 回答済み人数(
answeredNumber
)を0で初期化し - 全ての回答者のAnswererActorに送られてきたメッセージをそのまま送りつける
ブロードキャスト用メッセージは、quizMasterを通して全てのAnswererActorに到達します。
// 問題切り替え case p@BroadcastProblem(n) => startTime = LocalDateTime.now answerInfo.ref.foreach(_ ! p)
個々のAnswererActorでは、まずstartTime(回答開始時刻)を現在時で初期化して、それから、そのメッセージを更に answerInfo.ref
へと転送しています。
つまりBroadcastProblemメッセージは、
HTTPサーバ -> quizMaster -> 個々のAnswererActor -> SourceとなるActor
という経路を通って、最終的に全てのクライアントに配信されます。経由するActorでは、このメッセージを受けて状態を変更します。またクライアントもこのメッセージを受け取り、適切に画面を更新する事になります。
回答
問題が配信されたら、画面から回答を行う事ができるようになります。
回答の通信はJSON形式で、ReceiveMessageの fromJsonString
メソッドで適切に AnswerReach
型へと変換されます。
case class AnswerReach(identify: Identify, id: Int, answer: Int) extends ReceiveMessage
この時、 id
が問題番号、 answer
が回答番号です。
さて、このメッセージはFlowを通してまずquizMasterに送りつけられます。受け取った際のquizMasterの処理は次のようなものです。
// 回答到着 case a@AnswerReach(id, _, _) => answeredNumber += 1 answerers.get(id).foreach(_ ! a) answerers.values.foreach(_ ! SomeoneAnswered(answeredNumber))
- まず、回答済み人数を1増やし
- そして回答者のAnswererActorにメッセージを転送し
- 最後に全てのAnswererActorにSomeoneAnsweredメッセージを送りつける
2種類のメッセージを送っている事が分かります。順に見てみます。
// 回答 case a@AnswerReach(_, id, ans) => answers += (id -> (ans, Duration.between(startTime, LocalDateTime.now)))
回答が受け付けられると、問題IDをキーにして、 回答番号と、問題開始時間から回答受付時間までの期間 を記録しています。後者は、回答にかかった時間を集計するのに利用します。
そしてSomeoneAnsweredメッセージですが、
case s: SomeoneAnswered =>
answerInfo.ref.foreach(_ ! s)
これはそのままクライアントに投げているだけです。クライアントはこのメッセージを受け取って、画面の回答済み人数を更新します。
正解発表
回答できた人も、まだできていない人もいるでしょうが、時間が来たら締め切って正解を発表しないといけません。正解発表も、問題配信と同じく画面から行います。
ルーティングとその処理を再掲します。
post { pathPrefix("api") { path("quizs" / IntNumber) { n => noukai.broadcastProblem(n) HttpEntity(ContentTypes.`application/json`, s"""{"num":$n}""") |> toComplete } ~ path("answers" / IntNumber) { n => noukai.answeringProblem(n) HttpEntity(ContentTypes.`application/json`, s"""{"num":$n}""") |> toComplete } } }
def broadcastProblem(id: Int) = quizMaster ! BroadcastProblem(id) def answeringProblem(id: Int) = (correctAnswers ? QueryCorrect(Seq(id))).foreach { case ResultCorrect(Seq(correct)) => quizMaster ! AnsweringProblem(id, correct) case _ => Unit }
問題配信に比べて、正解発表はちょっと複雑な事を行っているように見えますが、難しくはないです。
まず、このアプリでは問題の正解一覧をcorrectAnswersというActorで管理しています。このActorにQueryCorrectというメッセージを送ると、ResultCorrectというメッセージで正解番号を返してくれます。
そこで、まず ask
メソッドを使ってcorrectAnswersに正解番号を問い合わせます。
correctAnswers ? QueryCorrect(Seq(id))
その後、戻ってきた正解番号と問題番号をAnsweringProblemメッセージに詰めて、quizMasterに送っています。
quizMaster ! AnsweringProblem(id, correct)
これを受け取ったquizMasterの処理が、
// 回答配信 case a: AnsweringProblem => currentProblem = 0 // 回答配信 〜 次の問題 の間に来た参加者には、「お待ち下さい」画面を見せる為 answerers.values.foreach(_ ! a)
で、まず現在の問題番号を0にして、その後で、メッセージを全てのAnswererActorに転送します。現在の問題番号を0にする理由はコメントに書いてある通りで、次の問題が開始されるまで待機画面にする為です。
転送されたメッセージを受け取ったAnswererActorは、
case a@AnsweringProblem(id, _, _) => val (ans, _) = answers.getOrElse(id, (-1, null)) answerInfo.ref.foreach(_ ! a.putAnswer(ans))
という処理を行います。
まず回答一覧であるanswersから、その問題のIDの回答を読み取ります。この時、見つからなかった場合、つまり未回答の場合は、 (-1, null)
というタプルを返します。nullなどという恐ろし気な文字列が見えますが、すぐ左を見れば分かる通り、この値は利用されないので安心して下さい。
val (ans, _) = answers.getOrElse(id, (-1, null))
この行で、当該問題の回答番号を ans
に束縛します。
そして、AnsweringProblemの putAnswer
メソッドを呼び出しています。この辺りは、AnsweringProblemの定義を見ていただくのが早いでしょう。
case class AnsweringProblem(id: Int, correctAnswer: Int, answer: Int = 0) extends SendMessage { def putAnswer(ans: Int): AnsweringProblem = this.copy(answer = ans) def toJsonString = s"""{ "isCorrect": ${correctAnswer == answer} }""" }
メッセージはクライアントに送信される前に toJsonString()
メソッドで文字列になります。AnsweringProblemはこの時、実際の回答と正しい回答を比較して、それが等しいか否かをBoolean型で示し、JSONにしてクライアントに送っているのです。
メッセージを受け取ったクライアントは、正否によって、画面に「正解」とか「不正解」とかの表示を行います。
アクションボタン
アクションボタンの意義については、フロントエンドの記事を参照して下さい。 アクションボタン押下も同じくWebSocketのメッセージとして送られてきます。どのような処理を行っているかは、コードを見るのが一番わかりやすいでしょう。
// アクションボタンが押された case a@ActionButtonPushed(id) => answerers.get(id).foreach(_ ! a)
例によって、quizMasterはAnswererActorにメッセージを転送するだけです。
// アクションボタン case _: ActionButtonPushed => actionCounter += 1
AnswererActorではアクションボタンが押された回数を記録していて、単純にそれを増加させています。
特に、説明する事は、無いです。
優秀者発表
クイズが終わりました! 成績発表を行いましょう。 今回のアプリでは、上位者一覧を管理者画面に表示して、それをスクリーンに映して会場に見せる、という形式を取りました。なので、成績発表APIはHTTPのAPIです。
val route = get { // 中略 pathPrefix("api") { path("rankList") { onComplete(noukai.getRanks(50)) { case Success(list) => HttpEntity(ContentTypes.`application/json`, s"""[${list.map(_.asJson.toString).mkString(", ") }]""") |> toComplete case Failure(e) => onError(e) } } ~
private[this] def makeSummary = (quizMaster ? RequestAnswers).mapTo[Seq[AnswersInfo]] def getRanks(max: Int = 50): Future[Seq[Score]] = for { list <- makeSummary validityRanking: Seq[(Identify, (Int, Double))] <- list.map { case AnswersInfo(id, ans, _) => val (pnums, rets) = ans.toList.sorted.unzip val (anss, durs) = rets.unzip (correctAnswers ? QueryAnswers(pnums, anss)).map { case ResultAnswers(rwList) => id -> ( rwList.count(identity), (durs.map { dur => dur.getSeconds + dur.getNano / 1000000 / 1000.0 }.sum * 1000).toInt / 1000.0 ) } }.toList.sequence.map(_.sortWith { case((_, (r1, t1)), (_, (r2, t2))) => if(r1 == r2) t1 < t2 else r1 > r2 }) } yield validityRanking.take(max).zipWithIndex.map { case ((Identify(dept, name), (corrects, dur)), i) => Score(i+1, s"$name($dept)", corrects, dur) } |> { list => val size = list.length if(size >= max ) list else list ++ (size + 1 to max).map { i => Score(i, "-", 0, 9999.999) } }
getRanks
メソッドは大分辛い事になっていますが、これには、
- 正解数、回答時間の短さの順で順位付けがしたかった
- 回答番号は保持していても正解番号は持っていないので、問題毎に正否の確認が必要
- 規定人数だけ上位から取得し、かつ全体の数が人数に満たない場合には無効な値で埋めたかった
といった処理を無理矢理1つのメソッドに詰め込んだ結果です。
まず、
(quizMaster ? RequestAnswers).mapTo[Seq[AnswersInfo]]
という処理で、全参加者の回答情報を問い合わせています。RequestAnswersメッセージを受け取るquizMasterの処理は以下です。
// 全参加者の結果問い合わせ case RequestAnswers => answerers.values.toList.map { ref => (ref ? RequestAnswers).mapTo[AnswersInfo] }.sequence.pipeTo(sender)
全てのAnswererActorに回答状況を問い合わせ、戻ってきた結果のリストを List[Future[AnswersInfo]]
型から sequence
メソッドで Future[List[AnswersInfo]]
型に組み換え、それを pipeTo
メソッドでメッセージの送り主に返しています。
問い合わせを受けたAnswererActorは次のような処理を行っています。
// 回答結果確認 case RequestAnswers => sender ! AnswersInfo(answerInfo.identify, answers, actionCounter)
自分自身の情報と、回答結果と、「アクション」した回数を返していますね。
このように、
問い合わせ 問い合わせ +---------+ +------------+ +---------------+ | | ~> | | ~> | |-+ | 集計処理 | | quizMaster | | AnswererActor | | | | <~ | | <~ | | | +---------+ +------------+ +---------------+ | 回答 回答 +----------------+
と問い合わせを繋いで、全ての参加者の回答を集計しています。
得られた結果をmapして、
val (pnums, rets) = ans.toList.sorted.unzip val (anss, durs) = rets.unzip
の辺りで問題番号のリストと回答番号のリストを取り出し、
correctAnswers ? QueryAnswers(pnums, anss)
correctAnswersに正否を問い合わせています。 こうして調べた正解の数と、回答にかかった時間をタプルにし、
id -> ( rwList.count(identity), (durs.map { dur => dur.getSeconds + dur.getNano / 1000000 / 1000.0 }.sum * 1000).toInt / 1000.0 )
ソートの、「回答数が多い方が上、同じならば回答時間が短い方が上」という条件に使っています。
}.toList.sequence.map(_.sortWith { case((_, (r1, t1)), (_, (r2, t2))) => if(r1 == r2) t1 < t2 else r1 > r2 })
一番「アクション」していた人
さて、最後は「アクション」の回数を集計して結果発表します。
path("mostClicker") { onComplete(noukai.getClicker) { case Success(clicker) => HttpEntity(ContentTypes.`application/json`, clicker.asJson.toString) |> toComplete case Failure(e) => onError(e) } } ~
ルーティングは成績発表とほとんど同じですね。
override def getClicker: Future[Clicker] = for { list <- makeSummary actionRanking = list.map { case AnswersInfo(id, _, count) => id -> count }.sortBy(_._2).reverse } yield actionRanking.map { case (Identify(dept, name), num) => Clicker(num, s"$name($dept)")}.head
集計処理も、成績発表と同じようなものですが、
- 正解を確認しなくていい
- 並び替えが複雑ではない
- 最優秀者のみ知る事ができれば足りる
といった条件があるので、非常にシンプルになっています。
以上で、クイズアプリの動きを概ね説明し終えました。
付記
Githubで公開しております納会用アプリを試す方法について、そしてJSONライブラリについて少々付け加えます。
問題について
今回解説したアプリですが、問題用のJSONを入れ替える事で、好きな問題を出す事ができます。是非とも試してみて下さい。
問題用のJSONは noukai_quiz_app/client/questions.json
に置かれています。
[ { "id": 1, "choices": [ "answer 1", "answer 2", "answer 3", "answer 4" ], "answer": 2, "description": "Question 1" }, { "id": 2, "choices": [ "answer 1", "answer 2", "answer 3", "answer 4" ], "answer": 2, "description": "Question 2" } ]
あまり解説するような事もないですが、中身はオブジェクトの配列になっていて、 id
に問題ID(数値で、かつ一意)を、 choices
に問題の選択肢を、 answer
に正解を、 description
に問題の解説文を入れます。
answer
は選択肢の番号ですが、0から始まる事に注意して下さい(3番目なら2、最初なら0)。
JSONライブラリについて
今回は、JSONライブラリとしてcirceを利用しました。case classをデコードしたりエンコードしたりする際に、 io.circe.generic.auto
をインポートしておけば何もしなくても自動的に変換してくれるから、というのが理由でした。
しかし、spray-jsonを使うべきだったかもしれません。というのも、Akka HTTPにはspray-jsonを利用する為のモジュールが用意されているので、例えば適切にFormatを定義しておけば、レスポンスにcase classを与えるだけで自動的にJSONに変換してくれる、といったサポートを利用できるからです。
また同僚に教えてもらったのですが、akka-http-jsonというライブラリもあるようですね。バックエンドのJSONライブラリを自由に選べて、circeを使う事もできます。
結び
以上、納会用アプリのサーバサイドを解説しました。 少し前に書いたコードなのですが、今見直すと多数の厳しい点や、余り望ましくない挙動をする箇所が見つかってしまい、力不足を痛感しました。 とはいえサーバサイドにAkka HTTPを、クライアントサイドにReact.jsを採用した当アプリは、納会本番を通して安定稼働する事ができました。
今回の記事を書くにあたって、コードを読み返し、またライブラリのコードリーディングを行った事で、Akka HTTPやAkka Streamsについての理解を深める事ができたかと思います。
Opt Technologiesでは共に働き、共に学んでいけるエンジニアを募集しています!