oslo.messaging で RPC を実装


 こんにちは、インフラ本部の大山裕泰です。
 今日は OpenStack の共通ライブラリ群 oslo に含まれる olso.messaging を使って RPC over MQ 処理を実装してみます。なぜ RPC を MQ を介して行うと良いかについては 過去のエントリ で書きましたので、興味があれば読んでみてください。

oslo って何?

 oslo は OpenStack の各コンポーネント (OpenStack の最も大きなソフトウェアモジュールの単位) で共通で利用するライブラリを集めたプロジェクトになります。
 その昔の OpenStack では設定ファイルやコマンドライン引数の解析、メッセージパッシング、ロギングなどといった処理が各コンポーネント毎に個別に存在しており、開発・メンテナンスコストが掛かる問題がありました。
 そして Grizzly サイクルで共通ライブラリ群を提供する olso プロジェクトが発足し、Havana で設定ファイルとコマンドライン引数の解析を行なう oslo.config と、通知や RPC の仕組みを提供する oslo.messaging がリリースされました。現在では、国際化や並行処理など様々な機能が加わり、本稿執筆時点では 34 のライブラリ が存在します。

 oslo 自体は OpenStack のプロジェクトですが、用途は OpenStack に限定されません。例えば oslo.config は、設定ファイルとコマンドライン引数の解析を行なうライブラリとして広く知られており、NTT Communication などが中心となって開発している SDN Framework Ryu などで採用されています。
 今回紹介する oslo.messaging では、RabbitMQZeroMQ などの MQ システムを介した通知や RPC の仕組みを提供します。

なぜ oslo.messaging なのか?

 oslo.messaging の使い方について見て行く前に、oslo.messaging についての理解を深めるために、なぜこの仕組みを使うのかについて考えてみたいと思います。
 MQ を利用した通知や RPC の仕組みは、もちろん oslo.messaging が登場する以前から存在していました。RabbitMQ を使う場合には Pika という AMQP ライブラリを用いてそれらの処理を行なうことができます。また ZeroMQ を使う場合には、PyZMQ ライブラリを用い、Kafka にも 同様の Python ライブラリ が存在します。

 しかしこれらを用いることによって、ミドルウェアないはプロトコル依存の実装になってしまいます。
 oslo.messaging では、ミドルウェア・プロトコル非依存の通知、RPC の仕組みを提供しており、これによってユーザは様々な MQ システムを選択することができるようになりました。

oslo.messaging の使い方

 以降では oslo.messaging を使って、汎用的な RPC over MQ 処理を行なう方法を紹介します。サンプルとして以下のコードを利用します。

https://github.com/userlocalhost2000/oslo-messaging-examples

 コードの中身を見る前に、システムのアウトラインについて簡単に把握したいと思います。以下は oslo.messaging と他のシステムとの関連、及び内部アーキテクチャについて表した図になります。

 処理のおおまかな流れとしては、左上の “User Application” が “RPCClient” で定義されたインターフェイスを叩き、”Transport” オブジェクトで抽象化された MQ システム (RabbitMQ など) を経由して “Server” に登録された “Endpoint(s)” の処理を実行し結果を取得します。ここでは、oslo.messaging レイヤにおけるクライアント、サーバ側の内部の仕組みについて簡単に解説します。

 クライアント側では RPCClient オブジェクトがユーザアプリケーションに対して、RPC 処理の仕組みを提供します。また Transport オブジェクトが、RabbitMQ や Kafka , ZeroMQ などの各種 MOM を抽象化し、ユーザは RPCClient が提供するインターフェイスを通して MOM 非依存な RPC リクエストをサーバに送ることができます。
 サーバ側では、ユーザからの RPC 要求を処理する Server オブジェクトを生成します。その際、ユーザ側で各 PRC 要求に対応する処理 (Endpoint) を一つ以上実装してやり、それらを Server オブジェクト生成時に指定します。
 また Server 及び Endpoint(s) に対して RPC API の名前空間と互換 version について設定することができます。これらの設定を保持したものが Target オブジェクトになり Server 及び Endpoint にひも付きます。
 Endpoint オブジェクトにおいて Target を省略した場合には、グローバルな名前空間と version1.0 が暗黙的に設定されます。ただし Server オブジェクトにおいては Target オブジェクトを topic 及び server パラメータ付きで指定しなければなりません。
 Target オブジェクトの topic パラメータは、サーバが提供する API を識別するための項目で amqp ドライバにおいては名前付きキューの名前に対応します。クライアントはこの値だけを知っていれば、サーバの場所を意識せずにサーバに対して RPC 要求を送ることができます。
 また Target オブジェクトの server パラメータは、文字通り API を提供するサーバを指定する項目ですが、実際に存在するサーバのホスト名ないしは IP アドレスを指定しなければならないわけではく、サーバを指定する任意の文字列になります。これはクライアントが topic パラメータで指定した API を持つサーバ群のうち、特定のサーバに対してメッセージを送りたい場合に利用されるパラメータになり amqp ドライバにおいては、topic 名で指定した名前付きキューに加えて、”${topic}.${server}” の名前付きキューを生成します (${topic} と ${server} にはそれぞれパラメータの設定値が入ります)。

動かしてみる

 各データ構造の役割と関連がわかったところで、サンプルコードを見ながら実際に動かしてみます。
 まずサンプルコードを動かすために oslo.messaging をインストールします。またここでは Python 2.7.6 を使用しています、尚本稿執筆時点で OpenStack がサポートしている Python のバージョンは 2.7.x と 3.4.x になります [*1](https://wiki.openstack.org/wiki/Python3)。

$ pip install oslo_messaging

 続いて次のサンプルコードを GitHub から取得します。

$ git clone git@github.com:userlocalhost2000/oslo-messaging-examples.git

 サーバ側のコード `src/server.py` について簡単に解説します。以下がその抜粋になります。

     7	URL = 'rabbit://guest:guest@localhost:5672/'
     8	
     9	class TestEndpoint(object):
    10	  target = oslo_messaging.Target(namespace='foo', version='1.2')
    11	
    12	  def hoge(self, ctx, arg):
    13	    print("[TestEndpoint] hoge(%s, %d) is called" % (ctx, arg))
    14	    return arg * 2
    15	
    16	transport = oslo_messaging.get_transport(cfg.CONF, url = URL)
    17	target = oslo_messaging.Target(topic='test01', server='server1')
    18	endpoints = [
    19	  TestEndpoint(),
    20	]
    21	
    22	server = oslo_messaging.get_rpc_server(transport, target, endpoints)

 22 行目の get_rpc_server メソッドの呼び出しで Server オブジェクトを生成します。RPCClient からの RPC 要求に対して endpoints パラメータで指定したオブジェクトにマッチするメソッドを呼び出します。その際、エンドポイントオブジェクトに target メンバが設定されている場合には、namespace と version ネゴシエーションの確認を行います。target メンバが指定されていない場合には、デフォルトの namespace (=None) と version (=1.0) が内部的に設定されます。
 コールバックメソッド `hoge()` のパラメータはそれぞれ、`ctx` がユーザ定義のオブジェクト (ディレクトリ型) を表し、`arg` がユーザ定義の引数を表します。

 次にクライアント側のコード `src/client.py` についても簡単に見て行きます。以下がその抜粋になります。

     6	URL = 'rabbit://guest:guest@localhost:5672/'
     7	
     8	class TestClient(object):
     9	  def __init__(self, transport):
    10	    target = oslo_messaging.Target(topic='test01')
    11	    self.client = oslo_messaging.RPCClient(transport, target)
    12	
    13	  def hoge(self, ctxt, arg):
    14	    cctxt= self.client.prepare(namespace='foo', version='1.1')
    15	    return cctxt.call(ctxt, 'hoge', arg = arg)
    16	
    17	transport = oslo_messaging.get_transport(cfg.CONF, url = URL)
    18	client = TestClient(transport)
    19	
    20	print(client.hoge({}, 10))

 20 行目でユーザ定義の RPCClient ラッパーの hoge を呼び出し、内部的に 15 行目でサーバに対して hoge メソッドの呼び出しを行っています。その際 14 行目の prepare() メソッドの呼び出しで RPCClient オブジェクトが内部で持つ Target オブジェクトの namespace と version パラメータを上書きします。
 version パラメータがサーバ側のエンドポイント hoge() で指定した値と一致していないことに気づいたかもしれません。Server 側のバージョンネゴシエーション処理では、クライアントからの要求 version のメジャーバージョン (上の位) が一致しており、かつマイナーバージョン (下の位) がサーバで設定した値以下であれば、互換性がある要求として当該メソッドを実行し結果を返します。

 それでは、実際にこれらを実行してみます。次のようにターミナルからサーバスクリプト `src/server.py` を実行し、次の別のターミナルを開いてクライアントスクリプト `src/client.py` を実行してみてください。


(左:サーバ、右:クライアント)

 クライアント側からの呼び出しでサーバ側で定義したエンドポイント hoge() が呼び出され、クライアント側で実行結果が受け取られていることが確認できました。

«
»

コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です

Optionally add an image (JPEG only)