diff --git a/lib/puppet/util/queue/stomp.rb b/lib/puppet/util/queue/stomp.rb
index cabc56627..4a7081bc7 100644
--- a/lib/puppet/util/queue/stomp.rb
+++ b/lib/puppet/util/queue/stomp.rb
@@ -1,47 +1,60 @@
require 'puppet/util/queue'
require 'stomp'
require 'uri'
-# Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client
-# registry, for use with the :queue indirection terminus type.
+# Implements the Ruby Stomp client as a queue type within the
+# Puppet::Indirector::Queue::Client registry, for use with the :queue
+# indirection terminus type.
#
-# Looks to Puppet[:queue_source] for the sole argument to the underlying Stomp::Client constructor;
-# consequently, for this client to work, Puppet[:queue_source] must use the Stomp::Client URL-like
-# syntax for identifying the Stomp message broker: login:pass@host.port
+# Looks to Puppet[:queue_source] for the sole argument to the
+# underlying Stomp::Client constructor; consequently, for this client to work,
+# Puppet[:queue_source] must use the Stomp::Client URL-like syntax
+# for identifying the Stomp message broker: login:pass@host.port
class Puppet::Util::Queue::Stomp
attr_accessor :stomp_client
def initialize
begin
uri = URI.parse(Puppet[:queue_source])
rescue => detail
raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is invalid: #{detail}"
end
unless uri.scheme == "stomp"
raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is not a Stomp URL: #{detail}"
end
begin
self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true)
rescue => detail
raise ArgumentError, "Could not create Stomp client instance with queue source #{Puppet[:queue_source]}: got internal Stomp client error #{detail}"
end
+
+ # Identify the supported method for sending messages.
+ @method =
+ case
+ when stomp_client.respond_to?(:publish)
+ :publish
+ when stomp_client.respond_to?(:send)
+ :send
+ else
+ raise ArgumentError, "STOMP client does not respond to either publish or send"
+ end
end
def publish_message(target, msg)
- stomp_client.publish(stompify_target(target), msg, :persistent => true)
+ stomp_client.__send__(@method, stompify_target(target), msg, :persistent => true)
end
def subscribe(target)
stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message|
yield(stomp_message.body)
stomp_client.acknowledge(stomp_message)
end
end
def stompify_target(target)
'/queue/' + target.to_s
end
Puppet::Util::Queue.register_queue_type(self, :stomp)
end
diff --git a/spec/unit/util/queue/stomp_spec.rb b/spec/unit/util/queue/stomp_spec.rb
index 91036793c..be8defe58 100755
--- a/spec/unit/util/queue/stomp_spec.rb
+++ b/spec/unit/util/queue/stomp_spec.rb
@@ -1,136 +1,141 @@
#!/usr/bin/env ruby
require File.dirname(__FILE__) + '/../../../spec_helper'
require 'puppet/util/queue'
describe Puppet::Util::Queue, :if => Puppet.features.stomp? do
it 'should load :stomp client appropriately' do
Puppet.settings.stubs(:value).returns 'faux_queue_source'
Puppet::Util::Queue.queue_type_to_class(:stomp).name.should == 'Puppet::Util::Queue::Stomp'
end
end
describe 'Puppet::Util::Queue::Stomp', :if => Puppet.features.stomp? do
before do
# So we make sure we never create a real client instance.
# Otherwise we'll try to connect, and that's bad.
- Stomp::Client.stubs(:new).returns stub("client")
+ Stomp::Client.stubs(:new).returns stub("client", :publish => true)
end
it 'should be registered with Puppet::Util::Queue as :stomp type' do
Puppet::Util::Queue.queue_type_to_class(:stomp).should == Puppet::Util::Queue::Stomp
end
describe "when initializing" do
it "should create a Stomp client instance" do
- Stomp::Client.expects(:new).returns stub("stomp_client")
+ Stomp::Client.expects(:new).returns stub("stomp_client", :publish => true)
Puppet::Util::Queue::Stomp.new
end
it "should provide helpful failures when the queue source is not a valid source" do
# Stub rather than expect, so we can include the source in the error
Puppet.settings.stubs(:value).with(:queue_source).returns "-----"
lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError)
end
it "should fail unless the queue source is a stomp URL" do
# Stub rather than expect, so we can include the source in the error
Puppet.settings.stubs(:value).with(:queue_source).returns "http://foo/bar"
lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError)
end
it "should fail somewhat helpfully if the Stomp client cannot be created" do
Stomp::Client.expects(:new).raises RuntimeError
lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError)
end
list = %w{user password host port}
{"user" => "myuser", "password" => "mypass", "host" => "foohost", "port" => 42}.each do |name, value|
it "should use the #{name} from the queue source as the queueing #{name}" do
Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser:mypass@foohost:42/"
Stomp::Client.expects(:new).with { |*args| args[list.index(name)] == value }
Puppet::Util::Queue::Stomp.new
end
end
it "should create a reliable client instance" do
Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser@foohost:42/"
Stomp::Client.expects(:new).with { |*args| args[4] == true }
Puppet::Util::Queue::Stomp.new
end
end
describe "when publishing a message" do
before do
- @client = stub 'client'
+ @client = stub 'client', :publish => true
Stomp::Client.stubs(:new).returns @client
@queue = Puppet::Util::Queue::Stomp.new
end
it "should publish it to the queue client instance" do
@client.expects(:publish).with { |queue, msg, options| msg == "Smite!" }
@queue.publish_message('fooqueue', 'Smite!')
end
it "should publish it to the transformed queue name" do
@client.expects(:publish).with { |queue, msg, options| queue == "/queue/fooqueue" }
@queue.publish_message('fooqueue', 'Smite!')
end
it "should publish it as a persistent message" do
@client.expects(:publish).with { |queue, msg, options| options[:persistent] == true }
@queue.publish_message('fooqueue', 'Smite!')
end
+
+ it "should use send when the gem does not support publish" do
+ Stomp::Client.stubs(:new).returns(stub('client', :send => true))
+ Puppet::Util::Queue::Stomp.new.publish_message('fooqueue', 'Smite!')
+ end
end
describe "when subscribing to a queue" do
before do
- @client = stub 'client', :acknowledge => true
+ @client = stub 'client', :acknowledge => true, :publish => true
Stomp::Client.stubs(:new).returns @client
@queue = Puppet::Util::Queue::Stomp.new
end
it "should subscribe via the queue client instance" do
@client.expects(:subscribe)
@queue.subscribe('fooqueue')
end
it "should subscribe to the transformed queue name" do
@client.expects(:subscribe).with { |queue, options| queue == "/queue/fooqueue" }
@queue.subscribe('fooqueue')
end
it "should specify that its messages should be acknowledged" do
@client.expects(:subscribe).with { |queue, options| options[:ack] == :client }
@queue.subscribe('fooqueue')
end
it "should yield the body of any received message" do
message = mock 'message'
message.expects(:body).returns "mybody"
@client.expects(:subscribe).yields(message)
body = nil
@queue.subscribe('fooqueue') { |b| body = b }
body.should == "mybody"
end
it "should acknowledge all successfully processed messages" do
message = stub 'message', :body => "mybode"
@client.stubs(:subscribe).yields(message)
@client.expects(:acknowledge).with(message)
@queue.subscribe('fooqueue') { |b| "eh" }
end
end
it 'should transform the simple queue name to "/queue/"' do
Puppet::Util::Queue::Stomp.new.stompify_target('blah').should == '/queue/blah'
end
end