diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb new file mode 100644 index 000000000..c58af9814 --- /dev/null +++ b/lib/puppet/indirector/queue.rb @@ -0,0 +1,78 @@ +require 'puppet/indirector/terminus' +require 'puppet/util/queue' +require 'yaml' + +# Implements the :queue abstract indirector terminus type, for storing +# model instances to a message queue, presumably for the purpose of out-of-process +# handling of changes related to the model. +# +# Relies upon Puppet::Util::Queue for registry and client object management, +# and specifies a default queue type of :stomp, appropriate for use with a variety of message brokers. +# +# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information. +# +# A single queue client is maintained for the abstract terminus, meaning that you can only use one type +# of queue client, one message broker solution, etc., with the indirection mechanism. +# +# Per-indirection queues are assumed, based on the indirection name. If the :catalog indirection makes +# use of this :queue terminus, queue operations work against the "catalog" queue. It is up to the queue +# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue +# creation is automatic and not a concern). +class Puppet::Indirector::Queue < Puppet::Indirector::Terminus + extend ::Puppet::Util::Queue + self.queue_type_default = :stomp + + # Queue has no idiomatic "find" + def find(request) + nil + end + + # Place the request on the queue + def save(request) + begin + Puppet.info "Queueing catalog for %s" % request.key + client.send_message(queue, render(request.instance)) + rescue => detail + raise Puppet::Error, "Could not write %s to queue: %s\nInstance::%s\n client : %s" % [request.key, detail,request.instance.to_s,client.to_s] + end + end + + def self.queue + indirection_name + end + + def queue + self.class.queue + end + + # Returns the singleton queue client object. + def client + self.class.client + end + + # Formats the model instance associated with _request_ appropriately for message delivery. + # Uses YAML serialization. + def render(obj) + YAML::dump(obj) + end + + # converts the _message_ from deserialized format to an actual model instance. + def self.intern(message) + YAML::load(message) + end + + # Provides queue subscription functionality; for a given indirection, use this method on the terminus + # to subscribe to the indirection-specific queue. Your _block_ will be executed per new indirection + # model received from the queue, with _obj_ being the model instance. + def self.subscribe + client.subscribe(queue) do |msg| + begin + yield(self.intern(msg)) + rescue => detail + # really, this should log the exception rather than raise it all the way up the stack; + # we don't want exceptions resulting from a single message bringing down a listener + raise Puppet::Error, "Error occured with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail] + end + end + end +end diff --git a/spec/unit/indirector/queue.rb b/spec/unit/indirector/queue.rb new file mode 100755 index 000000000..de9a27fb2 --- /dev/null +++ b/spec/unit/indirector/queue.rb @@ -0,0 +1,87 @@ +#!/usr/bin/env ruby + +require File.dirname(__FILE__) + '/../../spec_helper' +require 'puppet/indirector/queue' + +class Puppet::Indirector::Queue::TestClient + def self.reset + @queues = {} + end + + def self.queues + @queues ||= {} + end + + def subscribe(queue) + stack = self.class.queues[queue] ||= [] + while stack.length > 0 do + yield(stack.shift) + end + end + + def send_message(queue, message) + stack = self.class.queues[queue] ||= [] + stack.push(message) + queue + end +end + +class FooExampleData + attr_accessor :name +end + +describe Puppet::Indirector::Queue do + before :each do + @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil + Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection) + @store_class = Class.new(Puppet::Indirector::Queue) do + def self.to_s + 'MyQueue::MyType' + end + end + @store = @store_class.new + + @subject_class = FooExampleData + @subject = @subject_class.new + @subject.name = :me + + Puppet.settings.stubs(:value).returns("bogus setting data") + Puppet.settings.stubs(:value).with(:queue_client).returns(:test_client) + Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient) + Puppet::Indirector::Queue::TestClient.reset + + @request = stub 'request', :key => :me, :instance => @subject + end + + it 'should use the correct client type and queue' do + @store.queue.should == :my_queue + @store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient) + end + + it 'should use render() to convert object to message' do + @store.expects(:render).with(@subject).once + @store.save(@request) + end + + it 'should save and restore with the appropriate queue, and handle subscribe block' do + @subject_two = @subject_class.new + @subject_two.name = :too + @store.save(@request) + @store.save(stub('request_two', :key => 'too', :instance => @subject_two)) + + received = [] + @store_class.subscribe do |obj| + received.push(obj) + end + + received[0].name.should == @subject.name + received[1].name.should == @subject_two.name + end + + it 'should use intern() to convert message to object with subscribe()' do + @store.save(@request) + @store_class.expects(:intern).with(@store.render(@subject)).once + @store_class.subscribe {|o| o } + end +end +