diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb
index fd089f431..85ffacacc 100644
--- a/lib/puppet/indirector/queue.rb
+++ b/lib/puppet/indirector/queue.rb
@@ -1,81 +1,81 @@
require 'puppet/indirector/terminus'
require 'puppet/util/queue'
require 'puppet/util'
# Implements the :queue abstract indirector terminus type, for storing
# model instances to a message queue, presumably for the purpose of out-of-process
# handling of changes related to the model.
#
# Relies upon Puppet::Util::Queue for registry and client object management,
# and specifies a default queue type of :stomp, appropriate for use with a variety of message brokers.
#
# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information.
#
# A single queue client is maintained for the abstract terminus, meaning that you can only use one type
# of queue client, one message broker solution, etc., with the indirection mechanism.
#
# Per-indirection queues are assumed, based on the indirection name. If the :catalog indirection makes
# use of this :queue terminus, queue operations work against the "catalog" queue. It is up to the queue
# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue
# creation is automatic and not a concern).
class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
extend ::Puppet::Util::Queue
include Puppet::Util
def initialize(*args)
super
raise ArgumentError, "Queueing requires pson support" unless Puppet.features.pson?
end
# Queue has no idiomatic "find"
def find(request)
nil
end
# Place the request on the queue
def save(request)
result = nil
benchmark :info, "Queued #{indirection.name} for #{request.key}" do
- result = client.send_message(queue, request.instance.render(:pson))
+ result = client.publish_message(queue, request.instance.render(:pson))
end
result
rescue => detail
raise Puppet::Error, "Could not write #{request.key} to queue: #{detail}\nInstance::#{request.instance}\n client : #{client}"
end
def self.queue
indirection_name
end
def queue
self.class.queue
end
# Returns the singleton queue client object.
def client
self.class.client
end
# converts the _message_ from deserialized format to an actual model instance.
def self.intern(message)
result = nil
benchmark :info, "Loaded queued #{indirection.name}" do
result = model.convert_from(:pson, message)
end
result
end
# Provides queue subscription functionality; for a given indirection, use this method on the terminus
# to subscribe to the indirection-specific queue. Your _block_ will be executed per new indirection
# model received from the queue, with _obj_ being the model instance.
def self.subscribe
client.subscribe(queue) do |msg|
begin
yield(self.intern(msg))
rescue => detail
puts detail.backtrace if Puppet[:trace]
Puppet.err "Error occured with subscription to queue #{queue} for indirection #{indirection_name}: #{detail}"
end
end
end
end
diff --git a/lib/puppet/util/queue.rb b/lib/puppet/util/queue.rb
index 02357742a..636bdcf2e 100644
--- a/lib/puppet/util/queue.rb
+++ b/lib/puppet/util/queue.rb
@@ -1,96 +1,96 @@
require 'puppet/indirector'
require 'puppet/util/instance_loader'
# Implements a message queue client type plugin registry for use by the indirector facility.
# Client modules for speaking a particular protocol (e.g. Stomp::Client for Stomp message
# brokers, Memcached for Starling and Sparrow, etc.) register themselves with this module.
#
# Client classes are expected to live under the Puppet::Util::Queue namespace and corresponding
# directory; the attempted use of a client via its typename (see below) will cause Puppet::Util::Queue
# to attempt to load the corresponding plugin if it is not yet loaded. The client class registers itself
# with Puppet::Util::Queue and should use the same type name as the autloader expects for the plugin file.
# class Puppet::Util::Queue::SpecialMagicalClient < Messaging::SpecialMagic
# ...
# Puppet::Util::Queue.register_queue_type_class(self)
# end
#
# This module reduces the rightmost segment of the class name into a pretty symbol that will
# serve as the queuing client's name. Which means that the "SpecialMagicalClient" above will
# be named :special_magical_client within the registry.
#
# Another class/module may mix-in this module, and may then make use of the registered clients.
# class Queue::Fue
# # mix it in at the class object level rather than instance level
# extend ::Puppet::Util::Queue
# end
#
# Queue::Fue instances can get a message queue client through the registry through the mixed-in method
# +client+, which will return a class-wide singleton client instance, determined by +client_class+.
#
# The client plugins are expected to implement an interface similar to that of Stomp::Client:
# * new should return a connected, ready-to-go client instance. Note that no arguments are passed in.
-# * send_message(queue, message) should send the _message_ to the specified _queue_.
+# * publish_message(queue, message) should publish the _message_ to the specified _queue_.
# * subscribe(queue) _block_ subscribes to _queue_ and executes _block_ upon receiving a message.
# * _queue_ names are simple names independent of the message broker or client library. No "/queue/" prefixes like in Stomp::Client.
module Puppet::Util::Queue
extend Puppet::Util::InstanceLoader
instance_load :queue_clients, 'puppet/util/queue'
# Adds a new class/queue-type pair to the registry. The _type_ argument is optional; if not provided,
# _type_ defaults to a lowercased, underscored symbol programmatically derived from the rightmost
# namespace of klass.name.
#
# # register with default name +:you+
# register_queue_type(Foo::You)
#
# # register with explicit queue type name +:myself+
# register_queue_type(Foo::Me, :myself)
#
# If the type is already registered, an exception is thrown. No checking is performed of _klass_,
# however; a given class could be registered any number of times, as long as the _type_ differs with
# each registration.
def self.register_queue_type(klass, type = nil)
type ||= queue_type_from_class(klass)
raise Puppet::Error, "Queue type #{type} is already registered" if instance_hash(:queue_clients).include?(type)
instance_hash(:queue_clients)[type] = klass
end
# Given a queue type symbol, returns the associated +Class+ object. If the queue type is unknown
# (meaning it hasn't been registered with this module), an exception is thrown.
def self.queue_type_to_class(type)
c = loaded_instance :queue_clients, type
raise Puppet::Error, "Queue type #{type} is unknown." unless c
c
end
# Given a class object _klass_, returns the programmatic default queue type name symbol for _klass_.
# The algorithm is as shown in earlier examples; the last namespace segment of _klass.name_ is taken
# and converted from mixed case to underscore-separated lowercase, and interned.
# queue_type_from_class(Foo) -> :foo
# queue_type_from_class(Foo::Too) -> :too
# queue_type_from_class(Foo::ForYouTwo) -> :for_you_too
#
# The implicit assumption here, consistent with Puppet's approach to plugins in general,
# is that all your client modules live in the same namespace, such that reduction to
# a flat namespace of symbols is reasonably safe.
def self.queue_type_from_class(klass)
# convert last segment of classname from studly caps to lower case with underscores, and symbolize
klass.name.split('::').pop.sub(/^[A-Z]/) {|c| c.downcase}.gsub(/[A-Z]/) {|c| '_' + c.downcase }.intern
end
# The class object for the client to be used, determined by queue configuration
# settings.
# Looks to the :queue_type configuration entry in the running application for
# the default queue type to use.
def client_class
Puppet::Util::Queue.queue_type_to_class(Puppet[:queue_type])
end
# Returns (instantiating as necessary) the singleton queue client instance, according to the
# client_class. No arguments go to the client class constructor, meaning its up to the client class
# to know how to determine its queue message source (presumably through Puppet configuration data).
def client
@client ||= client_class.new
end
end
diff --git a/lib/puppet/util/queue/stomp.rb b/lib/puppet/util/queue/stomp.rb
index c18edae6a..cabc56627 100644
--- a/lib/puppet/util/queue/stomp.rb
+++ b/lib/puppet/util/queue/stomp.rb
@@ -1,47 +1,47 @@
require 'puppet/util/queue'
require 'stomp'
require 'uri'
# Implements the Ruby Stomp client as a queue type within the Puppet::Indirector::Queue::Client
# registry, for use with the :queue indirection terminus type.
#
# Looks to Puppet[:queue_source] for the sole argument to the underlying Stomp::Client constructor;
# consequently, for this client to work, Puppet[:queue_source] must use the Stomp::Client URL-like
# syntax for identifying the Stomp message broker: login:pass@host.port
class Puppet::Util::Queue::Stomp
attr_accessor :stomp_client
def initialize
begin
uri = URI.parse(Puppet[:queue_source])
rescue => detail
raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is invalid: #{detail}"
end
unless uri.scheme == "stomp"
raise ArgumentError, "Could not create Stomp client instance - queue source #{Puppet[:queue_source]} is not a Stomp URL: #{detail}"
end
begin
self.stomp_client = Stomp::Client.new(uri.user, uri.password, uri.host, uri.port, true)
rescue => detail
raise ArgumentError, "Could not create Stomp client instance with queue source #{Puppet[:queue_source]}: got internal Stomp client error #{detail}"
end
end
- def send_message(target, msg)
- stomp_client.send(stompify_target(target), msg, :persistent => true)
+ def publish_message(target, msg)
+ stomp_client.publish(stompify_target(target), msg, :persistent => true)
end
def subscribe(target)
stomp_client.subscribe(stompify_target(target), :ack => :client) do |stomp_message|
yield(stomp_message.body)
stomp_client.acknowledge(stomp_message)
end
end
def stompify_target(target)
'/queue/' + target.to_s
end
Puppet::Util::Queue.register_queue_type(self, :stomp)
end
diff --git a/spec/integration/indirector/catalog/queue_spec.rb b/spec/integration/indirector/catalog/queue_spec.rb
index 569f096bf..940c8bab8 100755
--- a/spec/integration/indirector/catalog/queue_spec.rb
+++ b/spec/integration/indirector/catalog/queue_spec.rb
@@ -1,57 +1,57 @@
#!/usr/bin/env rspec
require 'spec_helper'
require 'puppet/resource/catalog'
describe "Puppet::Resource::Catalog::Queue", :if => Puppet.features.pson? do
before do
Puppet::Resource::Catalog.indirection.terminus(:queue)
@catalog = Puppet::Resource::Catalog.new
@one = Puppet::Resource.new(:file, "/one")
@two = Puppet::Resource.new(:file, "/two")
@catalog.add_resource(@one, @two)
@catalog.add_edge(@one, @two)
Puppet[:trace] = true
end
after { Puppet.settings.clear }
- it "should render catalogs to pson and send them via the queue client when catalogs are saved" do
+ it "should render catalogs to pson and publish them via the queue client when catalogs are saved" do
terminus = Puppet::Resource::Catalog.indirection.terminus(:queue)
client = mock 'client'
terminus.stubs(:client).returns client
- client.expects(:send_message).with(:catalog, @catalog.to_pson)
+ client.expects(:publish_message).with(:catalog, @catalog.to_pson)
request = Puppet::Indirector::Request.new(:catalog, :save, "foo", :instance => @catalog)
terminus.save(request)
end
it "should intern catalog messages when they are passed via a subscription" do
client = mock 'client'
Puppet::Resource::Catalog::Queue.stubs(:client).returns client
pson = @catalog.to_pson
client.expects(:subscribe).with(:catalog).yields(pson)
Puppet.expects(:err).never
result = []
Puppet::Resource::Catalog::Queue.subscribe do |catalog|
result << catalog
end
catalog = result.shift
catalog.should be_instance_of(Puppet::Resource::Catalog)
catalog.resource(:file, "/one").should be_instance_of(Puppet::Resource)
catalog.resource(:file, "/two").should be_instance_of(Puppet::Resource)
catalog.should be_edge(catalog.resource(:file, "/one"), catalog.resource(:file, "/two"))
end
end
diff --git a/spec/unit/indirector/queue_spec.rb b/spec/unit/indirector/queue_spec.rb
index b84ed2aea..eba136bbc 100755
--- a/spec/unit/indirector/queue_spec.rb
+++ b/spec/unit/indirector/queue_spec.rb
@@ -1,121 +1,121 @@
#!/usr/bin/env rspec
require 'spec_helper'
require 'puppet/indirector/queue'
class Puppet::Indirector::Queue::TestClient
end
class FooExampleData
attr_accessor :name
def self.pson_create(pson)
new(pson['data'].to_sym)
end
def initialize(name = nil)
@name = name if name
end
def render(format = :pson)
to_pson
end
def to_pson(*args)
{:type => self.class.to_s, :data => name}.to_pson(*args)
end
end
describe Puppet::Indirector::Queue, :if => Puppet.features.pson? do
before :each do
@model = mock 'model'
@indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil, :model => @model
Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection)
module MyQueue; end
@store_class = class MyQueue::MyType < Puppet::Indirector::Queue
self
end
@store = @store_class.new
@subject_class = FooExampleData
@subject = @subject_class.new
@subject.name = :me
Puppet[:queue_type] = :test_client
Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient)
@request = stub 'request', :key => :me, :instance => @subject
end
it "should require PSON" do
Puppet.features.expects(:pson?).returns false
lambda { @store_class.new }.should raise_error(ArgumentError)
end
it 'should use the correct client type and queue' do
@store.queue.should == :my_queue
@store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient)
end
describe "when saving" do
it 'should render the instance using pson' do
@subject.expects(:render).with(:pson)
- @store.client.stubs(:send_message)
+ @store.client.stubs(:publish_message)
@store.save(@request)
end
- it "should send the rendered message to the appropriate queue on the client" do
+ it "should publish the rendered message to the appropriate queue on the client" do
@subject.expects(:render).returns "mypson"
- @store.client.expects(:send_message).with(:my_queue, "mypson")
+ @store.client.expects(:publish_message).with(:my_queue, "mypson")
@store.save(@request)
end
it "should catch any exceptions raised" do
- @store.client.expects(:send_message).raises ArgumentError
+ @store.client.expects(:publish_message).raises ArgumentError
lambda { @store.save(@request) }.should raise_error(Puppet::Error)
end
end
describe "when subscribing to the queue" do
before do
@store_class.stubs(:model).returns @model
end
it "should use the model's Format support to intern the message from pson" do
@model.expects(:convert_from).with(:pson, "mymessage")
@store_class.client.expects(:subscribe).yields("mymessage")
@store_class.subscribe {|o| o }
end
it "should yield each interned received message" do
@model.stubs(:convert_from).returns "something"
@subject_two = @subject_class.new
@subject_two.name = :too
@store_class.client.expects(:subscribe).with(:my_queue).multiple_yields(@subject, @subject_two)
received = []
@store_class.subscribe do |obj|
received.push(obj)
end
received.should == %w{something something}
end
it "should log but not propagate errors" do
@store_class.client.expects(:subscribe).yields("foo")
@store_class.expects(:intern).raises(ArgumentError)
expect { @store_class.subscribe {|o| o } }.should_not raise_error
@logs.length.should == 1
@logs.first.message.should =~ /Error occured with subscription to queue my_queue for indirection my_queue: ArgumentError/
@logs.first.level.should == :err
end
end
end
diff --git a/spec/unit/util/queue/stomp_spec.rb b/spec/unit/util/queue/stomp_spec.rb
index f67189cf5..99c77d0b4 100755
--- a/spec/unit/util/queue/stomp_spec.rb
+++ b/spec/unit/util/queue/stomp_spec.rb
@@ -1,135 +1,135 @@
#!/usr/bin/env rspec
require 'spec_helper'
require 'puppet/util/queue'
describe Puppet::Util::Queue, :if => Puppet.features.stomp? do
it 'should load :stomp client appropriately' do
Puppet.settings.stubs(:value).returns 'faux_queue_source'
Puppet::Util::Queue.queue_type_to_class(:stomp).name.should == 'Puppet::Util::Queue::Stomp'
end
end
describe 'Puppet::Util::Queue::Stomp', :if => Puppet.features.stomp? do
before do
# So we make sure we never create a real client instance.
# Otherwise we'll try to connect, and that's bad.
Stomp::Client.stubs(:new).returns stub("client")
end
it 'should be registered with Puppet::Util::Queue as :stomp type' do
Puppet::Util::Queue.queue_type_to_class(:stomp).should == Puppet::Util::Queue::Stomp
end
describe "when initializing" do
it "should create a Stomp client instance" do
Stomp::Client.expects(:new).returns stub("stomp_client")
Puppet::Util::Queue::Stomp.new
end
it "should provide helpful failures when the queue source is not a valid source" do
# Stub rather than expect, so we can include the source in the error
Puppet.settings.stubs(:value).with(:queue_source).returns "-----"
lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError)
end
it "should fail unless the queue source is a stomp URL" do
# Stub rather than expect, so we can include the source in the error
Puppet.settings.stubs(:value).with(:queue_source).returns "http://foo/bar"
lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError)
end
it "should fail somewhat helpfully if the Stomp client cannot be created" do
Stomp::Client.expects(:new).raises RuntimeError
lambda { Puppet::Util::Queue::Stomp.new }.should raise_error(ArgumentError)
end
list = %w{user password host port}
{"user" => "myuser", "password" => "mypass", "host" => "foohost", "port" => 42}.each do |name, value|
it "should use the #{name} from the queue source as the queueing #{name}" do
Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser:mypass@foohost:42/"
Stomp::Client.expects(:new).with { |*args| args[list.index(name)] == value }
Puppet::Util::Queue::Stomp.new
end
end
it "should create a reliable client instance" do
Puppet.settings.expects(:value).with(:queue_source).returns "stomp://myuser@foohost:42/"
Stomp::Client.expects(:new).with { |*args| args[4] == true }
Puppet::Util::Queue::Stomp.new
end
end
- describe "when sending a message" do
+ describe "when publishing a message" do
before do
@client = stub 'client'
Stomp::Client.stubs(:new).returns @client
@queue = Puppet::Util::Queue::Stomp.new
end
- it "should send it to the queue client instance" do
- @client.expects(:send).with { |queue, msg, options| msg == "Smite!" }
- @queue.send_message('fooqueue', 'Smite!')
+ it "should publish it to the queue client instance" do
+ @client.expects(:publish).with { |queue, msg, options| msg == "Smite!" }
+ @queue.publish_message('fooqueue', 'Smite!')
end
- it "should send it to the transformed queue name" do
- @client.expects(:send).with { |queue, msg, options| queue == "/queue/fooqueue" }
- @queue.send_message('fooqueue', 'Smite!')
+ it "should publish it to the transformed queue name" do
+ @client.expects(:publish).with { |queue, msg, options| queue == "/queue/fooqueue" }
+ @queue.publish_message('fooqueue', 'Smite!')
end
- it "should send it as a persistent message" do
- @client.expects(:send).with { |queue, msg, options| options[:persistent] == true }
- @queue.send_message('fooqueue', 'Smite!')
+ it "should publish it as a persistent message" do
+ @client.expects(:publish).with { |queue, msg, options| options[:persistent] == true }
+ @queue.publish_message('fooqueue', 'Smite!')
end
end
describe "when subscribing to a queue" do
before do
@client = stub 'client', :acknowledge => true
Stomp::Client.stubs(:new).returns @client
@queue = Puppet::Util::Queue::Stomp.new
end
it "should subscribe via the queue client instance" do
@client.expects(:subscribe)
@queue.subscribe('fooqueue')
end
it "should subscribe to the transformed queue name" do
@client.expects(:subscribe).with { |queue, options| queue == "/queue/fooqueue" }
@queue.subscribe('fooqueue')
end
it "should specify that its messages should be acknowledged" do
@client.expects(:subscribe).with { |queue, options| options[:ack] == :client }
@queue.subscribe('fooqueue')
end
it "should yield the body of any received message" do
message = mock 'message'
message.expects(:body).returns "mybody"
@client.expects(:subscribe).yields(message)
body = nil
@queue.subscribe('fooqueue') { |b| body = b }
body.should == "mybody"
end
it "should acknowledge all successfully processed messages" do
message = stub 'message', :body => "mybode"
@client.stubs(:subscribe).yields(message)
@client.expects(:acknowledge).with(message)
@queue.subscribe('fooqueue') { |b| "eh" }
end
end
it 'should transform the simple queue name to "/queue/"' do
Puppet::Util::Queue::Stomp.new.stompify_target('blah').should == '/queue/blah'
end
end