diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb index fd089f431..85ffacacc 100644 --- a/lib/puppet/indirector/queue.rb +++ b/lib/puppet/indirector/queue.rb @@ -1,81 +1,81 @@ require 'puppet/indirector/terminus' require 'puppet/util/queue' require 'puppet/util' # Implements the :queue abstract indirector terminus type, for storing # model instances to a message queue, presumably for the purpose of out-of-process # handling of changes related to the model. # # Relies upon Puppet::Util::Queue for registry and client object management, # and specifies a default queue type of :stomp, appropriate for use with a variety of message brokers. # # It's up to the queue client type to instantiate itself correctly based on Puppet configuration information. # # A single queue client is maintained for the abstract terminus, meaning that you can only use one type # of queue client, one message broker solution, etc., with the indirection mechanism. # # Per-indirection queues are assumed, based on the indirection name. If the :catalog indirection makes # use of this :queue terminus, queue operations work against the "catalog" queue. It is up to the queue # client library to handle queue creation as necessary (for a number of popular queuing solutions, queue # creation is automatic and not a concern). class Puppet::Indirector::Queue < Puppet::Indirector::Terminus extend ::Puppet::Util::Queue include Puppet::Util def initialize(*args) super raise ArgumentError, "Queueing requires pson support" unless Puppet.features.pson? end # Queue has no idiomatic "find" def find(request) nil end # Place the request on the queue def save(request) result = nil benchmark :info, "Queued #{indirection.name} for #{request.key}" do - result = client.send_message(queue, request.instance.render(:pson)) + result = client.publish_message(queue, request.instance.render(:pson)) end result rescue => detail raise Puppet::Error, "Could not write #{request.key} to queue: #{detail}\nInstance::#{request.instance}\n client : #{client}" end def self.queue indirection_name end def queue self.class.queue end # Returns the singleton queue client object. def client self.class.client end # converts the _message_ from deserialized format to an actual model instance. def self.intern(message) result = nil benchmark :info, "Loaded queued #{indirection.name}" do result = model.convert_from(:pson, message) end result end # Provides queue subscription functionality; for a given indirection, use this method on the terminus # to subscribe to the indirection-specific queue. Your _block_ will be executed per new indirection # model received from the queue, with _obj_ being the model instance. def self.subscribe client.subscribe(queue) do |msg| begin yield(self.intern(msg)) rescue => detail puts detail.backtrace if Puppet[:trace] Puppet.err "Error occured with subscription to queue #{queue} for indirection #{indirection_name}: #{detail}" end end end end diff --git a/lib/puppet/util/queue.rb b/lib/puppet/util/queue.rb index 02357742a..636bdcf2e 100644 --- a/lib/puppet/util/queue.rb +++ b/lib/puppet/util/queue.rb @@ -1,96 +1,96 @@ require 'puppet/indirector' require 'puppet/util/instance_loader' # Implements a message queue client type plugin registry for use by the indirector facility. # Client modules for speaking a particular protocol (e.g. Stomp::Client for Stomp message # brokers, Memcached for Starling and Sparrow, etc.) register themselves with this module. # # Client classes are expected to live under the Puppet::Util::Queue namespace and corresponding # directory; the attempted use of a client via its typename (see below) will cause Puppet::Util::Queue # to attempt to load the corresponding plugin if it is not yet loaded. The client class registers itself # with Puppet::Util::Queue and should use the same type name as the autloader expects for the plugin file. # class Puppet::Util::Queue::SpecialMagicalClient < Messaging::SpecialMagic # ... # Puppet::Util::Queue.register_queue_type_class(self) # end # # This module reduces the rightmost segment of the class name into a pretty symbol that will # serve as the queuing client's name. Which means that the "SpecialMagicalClient" above will # be named :special_magical_client within the registry. # # Another class/module may mix-in this module, and may then make use of the registered clients. # class Queue::Fue # # mix it in at the class object level rather than instance level # extend ::Puppet::Util::Queue # end # # Queue::Fue instances can get a message queue client through the registry through the mixed-in method # +client+, which will return a class-wide singleton client instance, determined by +client_class+. # # The client plugins are expected to implement an interface similar to that of Stomp::Client: # * new should return a connected, ready-to-go client instance. Note that no arguments are passed in. -# * send_message(queue, message) should send the _message_ to the specified _queue_. +# * publish_message(queue, message) should publish the _message_ to the specified _queue_. # * subscribe(queue) _block_ subscribes to _queue_ and executes _block_ upon receiving a message. # * _queue_ names are simple names independent of the message broker or client library. No "/queue/" prefixes like in Stomp::Client. module Puppet::Util::Queue extend Puppet::Util::InstanceLoader instance_load :queue_clients, 'puppet/util/queue' # Adds a new class/queue-type pair to the registry. The _type_ argument is optional; if not provided, # _type_ defaults to a lowercased, underscored symbol programmatically derived from the rightmost # namespace of klass.name. # # # register with default name +:you+ # register_queue_type(Foo::You) # # # register with explicit queue type name +:myself+ # register_queue_type(Foo::Me, :myself) # # If the type is already registered, an exception is thrown. No checking is performed of _klass_, # however; a given class could be registered any number of times, as long as the _type_ differs with # each registration. def self.register_queue_type(klass, type = nil) type ||= queue_type_from_class(klass) raise Puppet::Error, "Queue type #{type} is already registered" if instance_hash(:queue_clients).include?(type) instance_hash(:queue_clients)[type] = klass end # Given a queue type symbol, returns the associated +Class+ object. If the queue type is unknown # (meaning it hasn't been registered with this module), an exception is thrown. def self.queue_type_to_class(type) c = loaded_instance :queue_clients, type raise Puppet::Error, "Queue type #{type} is unknown." unless c c end # Given a class object _klass_, returns the programmatic default queue type name symbol for _klass_. # The algorithm is as shown in earlier examples; the last namespace segment of _klass.name_ is taken # and converted from mixed case to underscore-separated lowercase, and interned. # queue_type_from_class(Foo) -> :foo # queue_type_from_class(Foo::Too) -> :too # queue_type_from_class(Foo::ForYouTwo) -> :for_you_too # # The implicit assumption here, consistent with Puppet's approach to plugins in general, # is that all your client modules live in the same namespace, such that reduction to # a flat namespace of symbols is reasonably safe. def self.queue_type_from_class(klass) # convert last segment of classname from studly caps to lower case with underscores, and symbolize klass.name.split('::').pop.sub(/^[A-Z]/) {|c| c.downcase}.gsub(/[A-Z]/) {|c| '_' + c.downcase }.intern end # The class object for the client to be used, determined by queue configuration # settings. # Looks to the :queue_type configuration entry in the running application for # the default queue type to use. def client_class Puppet::Util::Queue.queue_type_to_class(Puppet[:queue_type]) end # Returns (instantiating as necessary) the singleton queue client instance, according to the # client_class. No arguments go to the client class constructor, meaning its up to the client class # to know how to determine its queue message source (presumably through Puppet configuration data). def client @client ||= client_class.new end end diff --git a/lib/puppet/util/queue/stomp.rb b/lib/puppet/util/queue/stomp.rb index c18edae6a..cabc56627 100644 --- a/lib/puppet/util/queue/stomp.rb +++ b/lib/puppet/util/queue/stomp.rb @@ -1,47 +1,47 @@ require 'puppet/util/queue' require 'stomp' require 'uri' # Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client # registry, for use with the :queue indirection terminus type. # # Looks to Puppet[:queue_source] for the sole argument to the underlying Stomp::Client constructor; # consequently, for this client to work, Puppet[:queue_source] must use the Stomp::Client URL-like # syntax for identifying the Stomp message broker: login:pass@host.port class Puppet::Util::Queue::Stomp attr_accessor :stomp_client def initialize begin uri = URI.parse(Puppet[:queue_source]) rescue => detail raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is invalid: #{detail}" end unless uri.scheme == "stomp" raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is not a Stomp URL: #{detail}" end begin self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true) rescue => detail raise ArgumentError, "Could not create Stomp client instance with queue source #{Puppet[:queue_source]}: got internal Stomp client error #{detail}" end end - def send_message(target, msg) - stomp_client.send(stompify_target(target), msg, :persistent => true) + def publish_message(target, msg) + stomp_client.publish(stompify_target(target), msg, :persistent => true) end def subscribe(target) stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message| yield(stomp_message.body) stomp_client.acknowledge(stomp_message) end end def stompify_target(target) '/queue/' + target.to_s end Puppet::Util::Queue.register_queue_type(self, :stomp) end diff --git a/spec/integration/indirector/catalog/queue_spec.rb b/spec/integration/indirector/catalog/queue_spec.rb index 4581e3062..5a4a8a4ab 100755 --- a/spec/integration/indirector/catalog/queue_spec.rb +++ b/spec/integration/indirector/catalog/queue_spec.rb @@ -1,58 +1,58 @@ #!/usr/bin/env ruby Dir.chdir(File.dirname(__FILE__)) { (s = lambda { |f| File.exist?(f) ? require(f) : Dir.chdir("..") { s.call(f) } }).call("spec/spec_helper.rb") } require 'puppet/resource/catalog' describe "Puppet::Resource::Catalog::Queue", :if => Puppet.features.pson? do before do Puppet::Resource::Catalog.indirection.terminus(:queue) @catalog = Puppet::Resource::Catalog.new @one = Puppet::Resource.new(:file, "/one") @two = Puppet::Resource.new(:file, "/two") @catalog.add_resource(@one, @two) @catalog.add_edge(@one, @two) Puppet[:trace] = true end after { Puppet.settings.clear } - it "should render catalogs to pson and send them via the queue client when catalogs are saved" do + it "should render catalogs to pson and publish them via the queue client when catalogs are saved" do terminus = Puppet::Resource::Catalog.indirection.terminus(:queue) client = mock 'client' terminus.stubs(:client).returns client - client.expects(:send_message).with(:catalog, @catalog.to_pson) + client.expects(:publish_message).with(:catalog, @catalog.to_pson) request = Puppet::Indirector::Request.new(:catalog, :save, "foo", :instance => @catalog) terminus.save(request) end it "should intern catalog messages when they are passed via a subscription" do client = mock 'client' Puppet::Resource::Catalog::Queue.stubs(:client).returns client pson = @catalog.to_pson client.expects(:subscribe).with(:catalog).yields(pson) Puppet.expects(:err).never result = [] Puppet::Resource::Catalog::Queue.subscribe do |catalog| result << catalog end catalog = result.shift catalog.should be_instance_of(Puppet::Resource::Catalog) catalog.resource(:file, "/one").should be_instance_of(Puppet::Resource) catalog.resource(:file, "/two").should be_instance_of(Puppet::Resource) catalog.should be_edge(catalog.resource(:file, "/one"), catalog.resource(:file, "/two")) end end diff --git a/spec/unit/indirector/queue_spec.rb b/spec/unit/indirector/queue_spec.rb index bbe00c75f..bfd7598ad 100755 --- a/spec/unit/indirector/queue_spec.rb +++ b/spec/unit/indirector/queue_spec.rb @@ -1,124 +1,124 @@ #!/usr/bin/env ruby require File.dirname(__FILE__) + '/../../spec_helper' require 'puppet/indirector/queue' class Puppet::Indirector::Queue::TestClient end class FooExampleData attr_accessor :name def self.pson_create(pson) new(pson['data'].to_sym) end def initialize(name = nil) @name = name if name end def render(format = :pson) to_pson end def to_pson(*args) {:type => self.class.to_s, :data => name}.to_pson(*args) end end describe Puppet::Indirector::Queue, :if => Puppet.features.pson? do before :each do @model = mock 'model' @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil, :model => @model Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection) @store_class = Class.new(Puppet::Indirector::Queue) do def self.to_s 'MyQueue::MyType' end end @store = @store_class.new @subject_class = FooExampleData @subject = @subject_class.new @subject.name = :me Puppet.settings.stubs(:value).returns("bogus setting data") Puppet.settings.stubs(:value).with(:queue_type).returns(:test_client) Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient) @request = stub 'request', :key => :me, :instance => @subject end it "should require PSON" do Puppet.features.expects(:pson?).returns false lambda { @store_class.new }.should raise_error(ArgumentError) end it 'should use the correct client type and queue' do @store.queue.should == :my_queue @store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient) end describe "when saving" do it 'should render the instance using pson' do @subject.expects(:render).with(:pson) - @store.client.stubs(:send_message) + @store.client.stubs(:publish_message) @store.save(@request) end - it "should send the rendered message to the appropriate queue on the client" do + it "should publish the rendered message to the appropriate queue on the client" do @subject.expects(:render).returns "mypson" - @store.client.expects(:send_message).with(:my_queue, "mypson") + @store.client.expects(:publish_message).with(:my_queue, "mypson") @store.save(@request) end it "should catch any exceptions raised" do - @store.client.expects(:send_message).raises ArgumentError + @store.client.expects(:publish_message).raises ArgumentError lambda { @store.save(@request) }.should raise_error(Puppet::Error) end end describe "when subscribing to the queue" do before do @store_class.stubs(:model).returns @model end it "should use the model's Format support to intern the message from pson" do @model.expects(:convert_from).with(:pson, "mymessage") @store_class.client.expects(:subscribe).yields("mymessage") @store_class.subscribe {|o| o } end it "should yield each interned received message" do @model.stubs(:convert_from).returns "something" @subject_two = @subject_class.new @subject_two.name = :too @store_class.client.expects(:subscribe).with(:my_queue).multiple_yields(@subject, @subject_two) received = [] @store_class.subscribe do |obj| received.push(obj) end received.should == %w{something something} end it "should log but not propagate errors" do @store_class.client.expects(:subscribe).yields("foo") @store_class.expects(:intern).raises ArgumentError Puppet.expects(:err) @store_class.expects(:puts) @store_class.subscribe {|o| o } end end end diff --git a/spec/unit/util/queue/stomp_spec.rb b/spec/unit/util/queue/stomp_spec.rb index c33f1a670..91036793c 100755 --- a/spec/unit/util/queue/stomp_spec.rb +++ b/spec/unit/util/queue/stomp_spec.rb @@ -1,136 +1,136 @@ #!/usr/bin/env ruby require File.dirname(__FILE__) + '/../../../spec_helper' require 'puppet/util/queue' describe Puppet::Util::Queue, :if => Puppet.features.stomp? do it 'should load :stomp client appropriately' do Puppet.settings.stubs(:value).returns 'faux_queue_source' Puppet::Util::Queue.queue_type_to_class(:stomp).name.should == 'Puppet::Util::Queue::Stomp' end end describe 'Puppet::Util::Queue::Stomp', :if => Puppet.features.stomp? do before do # So we make sure we never create a real client instance. # Otherwise we'll try to connect, and that's bad. Stomp::Client.stubs(:new).returns stub("client") end it 'should be registered with Puppet::Util::Queue as :stomp type' do Puppet::Util::Queue.queue_type_to_class(:stomp).should == Puppet::Util::Queue::Stomp end describe "when initializing" do it "should create a Stomp client instance" do Stomp::Client.expects(:new).returns stub("stomp_client") Puppet::Util::Queue::Stomp.new end it "should provide helpful failures when the queue source is not a valid source" do # Stub rather than expect, so we can include the source in the error Puppet.settings.stubs(:value).with(:queue_source).returns "-----" lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError) end it "should fail unless the queue source is a stomp URL" do # Stub rather than expect, so we can include the source in the error Puppet.settings.stubs(:value).with(:queue_source).returns "http://foo/bar" lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError) end it "should fail somewhat helpfully if the Stomp client cannot be created" do Stomp::Client.expects(:new).raises RuntimeError lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError) end list = %w{user password host port} {"user" => "myuser", "password" => "mypass", "host" => "foohost", "port" => 42}.each do |name, value| it "should use the #{name} from the queue source as the queueing #{name}" do Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser:mypass@foohost:42/" Stomp::Client.expects(:new).with { |*args| args[list.index(name)] == value } Puppet::Util::Queue::Stomp.new end end it "should create a reliable client instance" do Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser@foohost:42/" Stomp::Client.expects(:new).with { |*args| args[4] == true } Puppet::Util::Queue::Stomp.new end end - describe "when sending a message" do + describe "when publishing a message" do before do @client = stub 'client' Stomp::Client.stubs(:new).returns @client @queue = Puppet::Util::Queue::Stomp.new end - it "should send it to the queue client instance" do - @client.expects(:send).with { |queue, msg, options| msg == "Smite!" } - @queue.send_message('fooqueue', 'Smite!') + it "should publish it to the queue client instance" do + @client.expects(:publish).with { |queue, msg, options| msg == "Smite!" } + @queue.publish_message('fooqueue', 'Smite!') end - it "should send it to the transformed queue name" do - @client.expects(:send).with { |queue, msg, options| queue == "/queue/fooqueue" } - @queue.send_message('fooqueue', 'Smite!') + it "should publish it to the transformed queue name" do + @client.expects(:publish).with { |queue, msg, options| queue == "/queue/fooqueue" } + @queue.publish_message('fooqueue', 'Smite!') end - it "should send it as a persistent message" do - @client.expects(:send).with { |queue, msg, options| options[:persistent] == true } - @queue.send_message('fooqueue', 'Smite!') + it "should publish it as a persistent message" do + @client.expects(:publish).with { |queue, msg, options| options[:persistent] == true } + @queue.publish_message('fooqueue', 'Smite!') end end describe "when subscribing to a queue" do before do @client = stub 'client', :acknowledge => true Stomp::Client.stubs(:new).returns @client @queue = Puppet::Util::Queue::Stomp.new end it "should subscribe via the queue client instance" do @client.expects(:subscribe) @queue.subscribe('fooqueue') end it "should subscribe to the transformed queue name" do @client.expects(:subscribe).with { |queue, options| queue == "/queue/fooqueue" } @queue.subscribe('fooqueue') end it "should specify that its messages should be acknowledged" do @client.expects(:subscribe).with { |queue, options| options[:ack] == :client } @queue.subscribe('fooqueue') end it "should yield the body of any received message" do message = mock 'message' message.expects(:body).returns "mybody" @client.expects(:subscribe).yields(message) body = nil @queue.subscribe('fooqueue') { |b| body = b } body.should == "mybody" end it "should acknowledge all successfully processed messages" do message = stub 'message', :body => "mybode" @client.stubs(:subscribe).yields(message) @client.expects(:acknowledge).with(message) @queue.subscribe('fooqueue') { |b| "eh" } end end it 'should transform the simple queue name to "/queue/"' do Puppet::Util::Queue::Stomp.new.stompify_target('blah').should == '/queue/blah' end end