diff --git a/lib/puppet/indirector/queue.rb b/lib/puppet/indirector/queue.rb
new file mode 100644
index 000000000..c58af9814
--- /dev/null
+++ b/lib/puppet/indirector/queue.rb
@@ -0,0 +1,78 @@
+require 'puppet/indirector/terminus'
+require 'puppet/util/queue'
+require 'yaml'
+
+# Implements the :queue abstract indirector terminus type, for storing
+# model instances to a message queue, presumably for the purpose of out-of-process
+# handling of changes related to the model.
+#
+# Relies upon Puppet::Util::Queue for registry and client object management,
+# and specifies a default queue type of :stomp, appropriate for use with a variety of message brokers.
+#
+# It's up to the queue client type to instantiate itself correctly based on Puppet configuration information.
+#
+# A single queue client is maintained for the abstract terminus, meaning that you can only use one type
+# of queue client, one message broker solution, etc., with the indirection mechanism.
+#
+# Per-indirection queues are assumed, based on the indirection name. If the :catalog indirection makes
+# use of this :queue terminus, queue operations work against the "catalog" queue. It is up to the queue
+# client library to handle queue creation as necessary (for a number of popular queuing solutions, queue
+# creation is automatic and not a concern).
+class Puppet::Indirector::Queue < Puppet::Indirector::Terminus
+ extend ::Puppet::Util::Queue
+ self.queue_type_default = :stomp
+
+ # Queue has no idiomatic "find"
+ def find(request)
+ nil
+ end
+
+ # Place the request on the queue
+ def save(request)
+ begin
+ Puppet.info "Queueing catalog for %s" % request.key
+ client.send_message(queue, render(request.instance))
+ rescue => detail
+ raise Puppet::Error, "Could not write %s to queue: %s\nInstance::%s\n client : %s" % [request.key, detail,request.instance.to_s,client.to_s]
+ end
+ end
+
+ def self.queue
+ indirection_name
+ end
+
+ def queue
+ self.class.queue
+ end
+
+ # Returns the singleton queue client object.
+ def client
+ self.class.client
+ end
+
+ # Formats the model instance associated with _request_ appropriately for message delivery.
+ # Uses YAML serialization.
+ def render(obj)
+ YAML::dump(obj)
+ end
+
+ # converts the _message_ from deserialized format to an actual model instance.
+ def self.intern(message)
+ YAML::load(message)
+ end
+
+ # Provides queue subscription functionality; for a given indirection, use this method on the terminus
+ # to subscribe to the indirection-specific queue. Your _block_ will be executed per new indirection
+ # model received from the queue, with _obj_ being the model instance.
+ def self.subscribe
+ client.subscribe(queue) do |msg|
+ begin
+ yield(self.intern(msg))
+ rescue => detail
+ # really, this should log the exception rather than raise it all the way up the stack;
+ # we don't want exceptions resulting from a single message bringing down a listener
+ raise Puppet::Error, "Error occured with subscription to queue %s for indirection %s: %s" % [queue, indirection_name, detail]
+ end
+ end
+ end
+end
diff --git a/spec/unit/indirector/queue.rb b/spec/unit/indirector/queue.rb
new file mode 100755
index 000000000..de9a27fb2
--- /dev/null
+++ b/spec/unit/indirector/queue.rb
@@ -0,0 +1,87 @@
+#!/usr/bin/env ruby
+
+require File.dirname(__FILE__) + '/../../spec_helper'
+require 'puppet/indirector/queue'
+
+class Puppet::Indirector::Queue::TestClient
+ def self.reset
+ @queues = {}
+ end
+
+ def self.queues
+ @queues ||= {}
+ end
+
+ def subscribe(queue)
+ stack = self.class.queues[queue] ||= []
+ while stack.length > 0 do
+ yield(stack.shift)
+ end
+ end
+
+ def send_message(queue, message)
+ stack = self.class.queues[queue] ||= []
+ stack.push(message)
+ queue
+ end
+end
+
+class FooExampleData
+ attr_accessor :name
+end
+
+describe Puppet::Indirector::Queue do
+ before :each do
+ @indirection = stub 'indirection', :name => :my_queue, :register_terminus_type => nil
+ Puppet::Indirector::Indirection.stubs(:instance).with(:my_queue).returns(@indirection)
+ @store_class = Class.new(Puppet::Indirector::Queue) do
+ def self.to_s
+ 'MyQueue::MyType'
+ end
+ end
+ @store = @store_class.new
+
+ @subject_class = FooExampleData
+ @subject = @subject_class.new
+ @subject.name = :me
+
+ Puppet.settings.stubs(:value).returns("bogus setting data")
+ Puppet.settings.stubs(:value).with(:queue_client).returns(:test_client)
+ Puppet::Util::Queue.stubs(:queue_type_to_class).with(:test_client).returns(Puppet::Indirector::Queue::TestClient)
+ Puppet::Indirector::Queue::TestClient.reset
+
+ @request = stub 'request', :key => :me, :instance => @subject
+ end
+
+ it 'should use the correct client type and queue' do
+ @store.queue.should == :my_queue
+ @store.client.should be_an_instance_of(Puppet::Indirector::Queue::TestClient)
+ end
+
+ it 'should use render() to convert object to message' do
+ @store.expects(:render).with(@subject).once
+ @store.save(@request)
+ end
+
+ it 'should save and restore with the appropriate queue, and handle subscribe block' do
+ @subject_two = @subject_class.new
+ @subject_two.name = :too
+ @store.save(@request)
+ @store.save(stub('request_two', :key => 'too', :instance => @subject_two))
+
+ received = []
+ @store_class.subscribe do |obj|
+ received.push(obj)
+ end
+
+ received[0].name.should == @subject.name
+ received[1].name.should == @subject_two.name
+ end
+
+ it 'should use intern() to convert message to object with subscribe()' do
+ @store.save(@request)
+ @store_class.expects(:intern).with(@store.render(@subject)).once
+ @store_class.subscribe {|o| o }
+ end
+end
+