STOMP と rabbitmq-stomp の話

はじめに

 先日 CodeZine に寄稿した記事 で簡単に触れた STOMP について紹介します。
 後ほど解説しますが、STOMP は非常にシンプルなプロトコルで読もうと思えば数時間で読めてしまいます。
 今回は STOMP プロトコルがどんなものかという事と、STOMP の実装として RabbitMQ の STOMP アダプタ (以下、’rabbitmq-stomp’) に注目し、どういう事ができて AMQP と比べてどうなのかということを簡単に示したいと思います。

STOMP とは?

 STOMP は非常にシンプルで拡張可能なメッセージパッシングプロトコルになります。STOMP は AMQP や MQTT などのバイナリプロトコルと異なり、テキストベースのプロトコルになります。
 どういうことかというと、STOMP ではフレームという単位でメッセージ (content) のやり取りを行います。キューにメッセージを送る際には SEND フレームを STOMP サーバに対して送り、キューに格納されているメッセージを取得する際にはサーバに対して SUBSCRIBE フレームを送信し、メッセージ本体が格納された MESSAGE フレームの送信をサーバから待ち受けるというように、フレームを単位として処理が行われます。そして STOMP は一般にテキストベースのプロトコルと呼ばれる、フレームのヘッダー及びメッセージ本体をテキストデータとしてネットワークに流します。これに対して AMQP や MQTT では、ネットワークに流すデータを圧縮するために、こうしたデータ (AMQP の場合フレーム (Frame)、MQTT ではコマンドメッセージとそれぞれ呼ばれている) をバイナリにエンコードした上でネットワークに流しています。
 こうしたテキストベースのプロトコルでは、実装が非常に簡単に行える利点があります。例えば、クライアント側の処理であれば、telnet からメッセージを送信するなんて事も出来てしまいます。
 以下では、telnet から STOMP サーバに対してメッセージ ‘hogefuga’ を送信し、その後そのメッセージを取り出すという処理を行っています。

 (太文字の部分が STOMP サーバから送られた内容になります)

 更にクライアントがサーバからメッセージを取得した際に送る確認応答 (ACK) を TCP などのように後から累積して返す方法 (累積応答確認) が選択でき、短時間に大量のメッセージを受信することができます。
 しかし AMQP 0.9.1 にあるような fanout や topic exchange による複数のキューに対するメッセージのブロードキャストや、柔軟なメッセージルーティングを実現させることが出来ません(ただ rabbitmq-stomp では拡張機能として topic 機能によるメッセージブロードキャストをサポートしています)。またトランザクション処理について、STOMP ではサポートされていますが AMQP のようにシステムが自動でトランザクション処理をしてくれるのに対して、ユーザがトランザクション (複数のフレームから成る処理のまとまり) を規定してやらなければなりません。
 このように STOMP ではリッチな機能やトランザクション処理の抽象化が行われている AMQP と比べて非常に簡素なプロトコルで、特定のプロセスとメッセージパッシングを行う上でプリミティブな機能を提供しています。

STOMP 実装について

 STOMP の実装は いろいろ ありますが、人気が無いためか最新の STOMP v1.2 に対応し且つキチンとメンテされているプロジェクトは ActiveMQ Artemisrabbitmq-stomp くらいです。今回は rabbitmq-stomp に注目して特徴を解説します。
 rabbitmq-stomp のセットアップの方法については 公式マニュアル をご参照ください。

使い方

 以下は Ruby の STOMP パッケージ stomp を用いて、キュー ‘hoge’ に対してメッセージを送信するサンプルプログラムになります。

送信処理(sender.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.publish "/queue/fuga", "hello world!"
受信処理(receiver.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.subscribe '/queue/fuga'
loop { puts conn.receive.body }

 簡単ですね。stomp パッケージでは Stomp::Client クラスの publish, subscribe の第一引数で指定しる文字列がそれぞれ SEND フレームと SUBSCRIBE フレームの destination ヘッダで指定されます。先頭の ‘/queue/’ は STOMP のキューを表す予約語で、その後にキュー名 ‘hoge’ を指定しています。
 rabbitmq-stomp ではこうした STOMP v1.2 で規定されている機能に加えて、STOMP クライアントから AMQP のキューに対して送る機能やメッセージを broadcast する機能拡張を行っています。以下では、rabbitmq-stomp の拡張機能について簡単なサンプルと共に紹介してゆきます。

/topic/xxx

 送信キュー名 (SEND フレームの destination ヘッダで指定されてる値) のプレフィックスが ‘/topic’ の場合、rabbitmq-stomp ブローカーは AMQP の topic exchange(*1) の要領でメッセージを STOMP キューに格納します。
 STOMP v1.2 の仕様では destination ヘッダのセマンティクスは規定されていませんが、rabbitmq-stomp ではこうした独自のセマンティクスを持たせる事で、様々なオリジナル機能を提供しています。逆に rabbitmq-stomp で規定されていない値を SEND や SUBSCRIBE フレームの destination ヘッダで指定した場合、ERROR が帰ってきます。

送信処理(sender.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.publish("/topic/fuga", "hello world!")
受信処理(receiver.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.subscribe '/topic/fuga'
loop { puts conn.receive.body }

(*1) AMQP にどのような exchange があって、それぞれどのような働きをするかについては こちらの記事 が詳しいです。

/exchange/(exchange-name)/xxx

 destination ヘッダーの冒頭に予約語 ‘/exchange’ が指定された場合、それに続く (exchange-name) で指定される RabbitMQ の exchange によってメッセージの転送を実施することができます。RabbitMQ によって提供されている exchange は ‘rabbitmqctl list_exchanges’ コマンドによって取得できます。以下ではメッセージのブロードキャストを行う fanout exchange を使って、メッセージを送受信する処理を記述しています。

送信処理(sender.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.publish("/exchange/amq.fanout/foo", "hello world!")
受信処理(receiver.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.subscribe '/exchange/amq.fanout/foo'
loop { puts conn.receive.body }

/amq/xxx

 ’/amq’ が destination で指定された場合、AMQP 側(RabbitMQ 本体)の処理で作成されたキューに対してメッセージを送ることができます。これによって STOMP クライアントから AMQP のキューに対してメッセージが送れるわけですが、用途は謎です。RabbitMQ 本体で作成した AMQP の名前付きキュー ‘amq-test’ に対して STOMP クライアントからメッセージを送受信する処理を以下に示します。

送信処理(sender.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.publish("/amq/queue/amq-test", "hello world!")
受信処理(receiver.rb)
require 'bunny'

c = Bunny.new(:host => 'localhost', :user => 'guest', :password => 'guest')
c.start

ch = c.create_channel
q = ch.queue('amq-test')

begin
  q.subscribe(:block => true) do |_, _, data|
    puts data
  end
rescue Exception => _
  ch.close
  c.close
end

/temp-queue/xxx

 最後に ‘/temp-queue’ が指定された場合について紹介します。これまでの機能は全て ‘destination’ ヘッダに対して指定する予約語でしたが、これは SEND フレームの ‘reply-to’ ヘッダに記述します。SEND フレームの ‘reply-to’ に ‘/temp-queue’ を指定すると、rabbitmq-stomp はランダムな名前が設定された一時キューを作成し、メッセージを受信する側が受け取る MESSAGE フレームの reply-to ヘッダに一時キューの名前を格納します。MESSAGE フレームを受け取った側は ‘reply-to’ ヘッダに指定されたキューに対してメッセージを送信すると、最初に SEND フレームを送った側において ‘reply-to’ ヘッダで指定したラベルから、メッセージを受け取ることができます。図にすると以下のようになります。

 この仕組みは、特定の相手からのメッセージの返信があるケースで便利です。特に RPC などにおいて効果を発揮します。また client から送った最初の SEND フレームの ‘reply-to’ ヘッダで指定する受信用のキュー名はセッション毎に独立しており、別のプロセスが同名の ‘/temp-queue’ を指定したとしても、別のプロセス宛の返信メッセージが混在することはありません。最後にサンプルプログラムを示します。

送信処理(sender.rb)
require 'stomp'

conn = Stomp::Connection.open
conn.publish("/queue/test-fuga", "hello world!", {
  'reply-to' => '/temp-queue/test'
})
puts conn.receive.body
受信処理(receiver.rb)
conn = Stomp::Connection.open
conn.subscribe '/queue/test-fuga'
loop do
  msg = conn.receive

  puts "received: #{msg.body}"

  conn.publish(msg.headers['reply-to'], '(reply) ' + msg.body)
end

AMQP との比較

 ここまでの話で STOMP がどんなもので、STOMP の実装の一つ rabbitmq-stomp でどんなことが出来るかについて紹介しました。最後に RabbitMQ の AMQP 実装と STOMP 実装での簡単なパフォーマンスベンチマークを取った結果を紹介したいと思います。ベンチマークで用いたスクリプトは こちら になります。またベンチマークの実行環境は以下のとおりです。

 ベンチマークでは、複数のサイズのメッセージを大量に送受信し、処理が完了するまでの時間でスループットの傾向を調べています。以下が実行結果になります。

 横軸が送信した各メッセージサイズを表し、縦軸が処理を終えるまでの時間になります。また棒グラフと折れ線グラフはそれぞれ、送信処理と受信処理を表しています。
 メッセージストアは STOMP の方が高速ですが、メッセージの取得処理は AMQP の方が 2 ~ 3 割ほど早いです。恐らく、rabbitmq-stomp において何がしかの実装上のオーバーヘッドがかかっていることが予想されます。

最後に

 rabbitmq-stomp の Contributor に名前を連ねました☆(ゝω・)vキャピ


PAGE TOP