#chiroito ’s blog

Java を中心とした趣味の技術について

電車情報を使ったIoTリアルタイムストリーム処理

この記事はOracle Cloud Advent Calendar 2018 - Adventar」の12月22日の記事として書かれています。

今回やること

これまで仕事で散々 IoT やストリーム処理をしてきたので簡単に作ってみたいと思います。

作ってみたシステムは下図のように東京メトロさんの電車を GoogleMap に表示して、電車が今どこを走っているのかを見られるようにしてみましょう。

f:id:chiroito:20181223004925p:plain

今回、このシステムで使用する情報は電車の情報、駅の情報、路線の情報、行き先の情報です。電車の情報には、電車がどの駅とどの駅の間を走っているか、またどの駅に止まっているかが含まれていますが、電車が今いる座標は分かりません。電車から得られた情報をただ単に表示するだけでは芸が無いので、複数の情報を結合してみます。

システムの構成

システムの全体像は以下のとおりです。今回のシステムはOracleクラウドサービスである Oracle Cloud を使用して開発しています。左側がデバイスからのデータが投入される入口で、右側はユーザが情報を見る入口になります。

f:id:chiroito:20181223005552p:plain

OracleOSS

システムの全体像を見ていただくと、OSSが目に入ると思います。Oracle は昔からたくさんの OSS やそのコミュニティに対してスポンサーやコントリビュートをしており、Oracle Cloud が提供しているサービスの大半はこれらの OSS をベースにエンタープライズレベルでも使用可能なクラウドサービスを開発しています。

Data Source Layer

今回のシステムでは、電車の情報を自分のシステムに投入してもらうのは無理なので、東京メトロさんが公開している API から電車の情報を取って自分のシステムに投げるアプリを作っています(App1)。このアプリケーションは Kubernates のクラウドサービスである Oracle Container for Kubernates 上で動かします。

Ingestion Layer, Speed Layer, Storage Layer

電車の情報は、Kafka のクラウドサービスである Oracle Event Hub によって受け取ります。受け取った情報は、Hadoopクラウドサービスである Oracle Big Data Cloud Service - Compute Edtion 上で動いている Apache Spark のアプリケーションが処理します(App2) 。このアプリケーションは、電車がどの駅間を走っているかと各駅の座標情報から電車の座標を算出しています。今回は出発した駅と向かっている駅の中間点を算出してます。算出された情報は Oracle Coherence がベースとなっている Java Cloud Service 上に保存します。

このアプリケーションは GUI で Spark アプリケーションを開発できる Oracle Stream Analytics (OSA)を使って開発しました(後述)。OSAを使って開発することで、データのキャッシュや永続化などを自動的にインメモリ分散データキャッシュである Oracle Coherence へ格納・読み込みできるようになりエンタープライズなどで要求される高速なデータアクセスなどが可能となります。

Serving Layer, Visualization Layer

最後に電車を表示する側です。 Google Map のアプリケーションは定期的に電車の情報を取得して、得られた電車の情報を表示します。このアプリケーションの取得先として、時間を指定するとその時間に走っていた全ての電車の情報を先ほどの Oracle Coherence へ取りにいくアプリケーションがあります(App3)。このアプリケーションも Kubernates のクラウドサービスである Oracle Container for Kubernates 上で動きます。

Speed Layer の詳細

Spark アプリケーションの開発は Oracle Stream Analytics (OSA)を使って開発します。OSAでは、下図のような GUI を使って開発します。

f:id:chiroito:20181223011407p:plain

この図では、入力となる電車の生データに「丸ノ内線や銀座線」などの路線の情報、「荻窪行きや渋谷行き」などの行き先の情報、「西新宿駅外苑前駅の座標情報」などの駅情報を結合していきます。これらの情報は1度目はRDBMS である Exadata ベースの Autonomous Transaction Processing から読込まれますが、2度目からは Oracle Coherence から高速に読込まれます。

この例では、結合された情報を使って「稼働中の電車数」「各電車の座標」「遅延している電車数」を算出しています。これらの中で、「各電車の座標」を Oracle Coherence へ格納し、JavaScript のアプリケーションから App3 経由で情報を取りだしてGoogle Map 上に表示しています。