diff --git a/lib/puppet/util/queue/stomp.rb b/lib/puppet/util/queue/stomp.rb index cabc56627..4a7081bc7 100644 --- a/lib/puppet/util/queue/stomp.rb +++ b/lib/puppet/util/queue/stomp.rb @@ -1,47 +1,60 @@ require 'puppet/util/queue' require 'stomp' require 'uri' -# Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client -# registry, for use with the :queue indirection terminus type. +# Implements the Ruby Stomp client as a queue type within the +# Puppet::Indirector::Queue::Client registry, for use with the :queue +# indirection terminus type. # -# Looks to Puppet[:queue_source] for the sole argument to the underlying Stomp::Client constructor; -# consequently, for this client to work, Puppet[:queue_source] must use the Stomp::Client URL-like -# syntax for identifying the Stomp message broker: login:pass@host.port +# Looks to Puppet[:queue_source] for the sole argument to the +# underlying Stomp::Client constructor; consequently, for this client to work, +# Puppet[:queue_source] must use the Stomp::Client URL-like syntax +# for identifying the Stomp message broker: login:pass@host.port class Puppet::Util::Queue::Stomp attr_accessor :stomp_client def initialize begin uri = URI.parse(Puppet[:queue_source]) rescue => detail raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is invalid: #{detail}" end unless uri.scheme == "stomp" raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is not a Stomp URL: #{detail}" end begin self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true) rescue => detail raise ArgumentError, "Could not create Stomp client instance with queue source #{Puppet[:queue_source]}: got internal Stomp client error #{detail}" end + + # Identify the supported method for sending messages. + @method = + case + when stomp_client.respond_to?(:publish) + :publish + when stomp_client.respond_to?(:send) + :send + else + raise ArgumentError, "STOMP client does not respond to either publish or send" + end end def publish_message(target, msg) - stomp_client.publish(stompify_target(target), msg, :persistent => true) + stomp_client.__send__(@method, stompify_target(target), msg, :persistent => true) end def subscribe(target) stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message| yield(stomp_message.body) stomp_client.acknowledge(stomp_message) end end def stompify_target(target) '/queue/' + target.to_s end Puppet::Util::Queue.register_queue_type(self, :stomp) end diff --git a/spec/unit/util/queue/stomp_spec.rb b/spec/unit/util/queue/stomp_spec.rb index 6799becea..7730ab7cb 100755 --- a/spec/unit/util/queue/stomp_spec.rb +++ b/spec/unit/util/queue/stomp_spec.rb @@ -1,135 +1,140 @@ #!/usr/bin/env rspec require 'spec_helper' require 'puppet/util/queue' describe Puppet::Util::Queue, :if => Puppet.features.stomp?, :'fails_on_ruby_1.9.2' => true do it 'should load :stomp client appropriately' do Puppet.settings.stubs(:value).returns 'faux_queue_source' Puppet::Util::Queue.queue_type_to_class(:stomp).name.should == 'Puppet::Util::Queue::Stomp' end end describe 'Puppet::Util::Queue::Stomp', :if => Puppet.features.stomp?, :'fails_on_ruby_1.9.2' => true do before do # So we make sure we never create a real client instance. # Otherwise we'll try to connect, and that's bad. - Stomp::Client.stubs(:new).returns stub("client") + Stomp::Client.stubs(:new).returns stub("client", :publish => true) end it 'should be registered with Puppet::Util::Queue as :stomp type' do Puppet::Util::Queue.queue_type_to_class(:stomp).should == Puppet::Util::Queue::Stomp end describe "when initializing" do it "should create a Stomp client instance" do - Stomp::Client.expects(:new).returns stub("stomp_client") + Stomp::Client.expects(:new).returns stub("stomp_client", :publish => true) Puppet::Util::Queue::Stomp.new end it "should provide helpful failures when the queue source is not a valid source" do # Stub rather than expect, so we can include the source in the error Puppet.settings.stubs(:value).with(:queue_source).returns "-----" lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError) end it "should fail unless the queue source is a stomp URL" do # Stub rather than expect, so we can include the source in the error Puppet.settings.stubs(:value).with(:queue_source).returns "http://foo/bar" lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError) end it "should fail somewhat helpfully if the Stomp client cannot be created" do Stomp::Client.expects(:new).raises RuntimeError lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError) end list = %w{user password host port} {"user" => "myuser", "password" => "mypass", "host" => "foohost", "port" => 42}.each do |name, value| it "should use the #{name} from the queue source as the queueing #{name}" do Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser:mypass@foohost:42/" Stomp::Client.expects(:new).with { |*args| args[list.index(name)] == value } Puppet::Util::Queue::Stomp.new end end it "should create a reliable client instance" do Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser@foohost:42/" Stomp::Client.expects(:new).with { |*args| args[4] == true } Puppet::Util::Queue::Stomp.new end end describe "when publishing a message" do before do - @client = stub 'client' + @client = stub 'client', :publish => true Stomp::Client.stubs(:new).returns @client @queue = Puppet::Util::Queue::Stomp.new end it "should publish it to the queue client instance" do @client.expects(:publish).with { |queue, msg, options| msg == "Smite!" } @queue.publish_message('fooqueue', 'Smite!') end it "should publish it to the transformed queue name" do @client.expects(:publish).with { |queue, msg, options| queue == "/queue/fooqueue" } @queue.publish_message('fooqueue', 'Smite!') end it "should publish it as a persistent message" do @client.expects(:publish).with { |queue, msg, options| options[:persistent] == true } @queue.publish_message('fooqueue', 'Smite!') end + + it "should use send when the gem does not support publish" do + Stomp::Client.stubs(:new).returns(stub('client', :send => true)) + Puppet::Util::Queue::Stomp.new.publish_message('fooqueue', 'Smite!') + end end describe "when subscribing to a queue" do before do - @client = stub 'client', :acknowledge => true + @client = stub 'client', :acknowledge => true, :publish => true Stomp::Client.stubs(:new).returns @client @queue = Puppet::Util::Queue::Stomp.new end it "should subscribe via the queue client instance" do @client.expects(:subscribe) @queue.subscribe('fooqueue') end it "should subscribe to the transformed queue name" do @client.expects(:subscribe).with { |queue, options| queue == "/queue/fooqueue" } @queue.subscribe('fooqueue') end it "should specify that its messages should be acknowledged" do @client.expects(:subscribe).with { |queue, options| options[:ack] == :client } @queue.subscribe('fooqueue') end it "should yield the body of any received message" do message = mock 'message' message.expects(:body).returns "mybody" @client.expects(:subscribe).yields(message) body = nil @queue.subscribe('fooqueue') { |b| body = b } body.should == "mybody" end it "should acknowledge all successfully processed messages" do message = stub 'message', :body => "mybode" @client.stubs(:subscribe).yields(message) @client.expects(:acknowledge).with(message) @queue.subscribe('fooqueue') { |b| "eh" } end end it 'should transform the simple queue name to "/queue/"' do Puppet::Util::Queue::Stomp.new.stompify_target('blah').should == '/queue/blah' end end