diff --git a/lib/puppet/network/http.rb b/lib/puppet/network/http.rb index 4d4d5bc5f..8d27a3340 100644 --- a/lib/puppet/network/http.rb +++ b/lib/puppet/network/http.rb @@ -1,19 +1,20 @@ module Puppet::Network::HTTP HEADER_ENABLE_PROFILING = "X-Puppet-Profiling" HEADER_PUPPET_VERSION = "X-Puppet-Version" require 'puppet/network/http/issues' require 'puppet/network/http/error' require 'puppet/network/http/route' require 'puppet/network/http/api' require 'puppet/network/http/api/v1' require 'puppet/network/http/api/v2' require 'puppet/network/http/handler' require 'puppet/network/http/response' require 'puppet/network/http/request' require 'puppet/network/http/site' require 'puppet/network/http/session' require 'puppet/network/http/factory' require 'puppet/network/http/dummy_pool' + require 'puppet/network/http/pool' require 'puppet/network/http/memory_response' end diff --git a/lib/puppet/network/http/pool.rb b/lib/puppet/network/http/pool.rb new file mode 100644 index 000000000..bc9a8b08c --- /dev/null +++ b/lib/puppet/network/http/pool.rb @@ -0,0 +1,82 @@ +require 'sync' + +class Puppet::Network::HTTP::Pool + FIFTEEN_SECONDS = 15 + + def initialize(keepalive_timeout = FIFTEEN_SECONDS) + @pool_mutex = Mutex.new + @pool = {} + @keepalive_timeout = keepalive_timeout + end + + def take_connection(site) + @pool_mutex.synchronize do + now = Time.now + + sessions = @pool[site] + if sessions + sessions.each_with_index do |session, idx| + # possible optimzation, since keepalive is immutable, + # connections will be sorted in mru order, and their + # keepalive timeout will be increasing order. So as + # soon as we hit an expired connection, we know + # all that follow are expired too. + if session.expired?(now) + Puppet.debug("Closing expired connection for #{site}") + begin + session.connection.close + rescue => detail + Puppet.log_exception(detail, "Failed to close session for #{site}: #{detail}") + end + else + session = sessions.slice!(idx) + Puppet.debug("Using cached connection for #{site}") + return session.connection + end + end + end + end + + Puppet.debug("Creating new connection for #{site}") + Puppet::Network::HttpPool.http_instance(site.host, site.port, site.scheme == 'https', true) + end + + def add_connection(site, connection) + @pool_mutex.synchronize do + sessions = @pool[site] + + if sessions.nil? + sessions = [] + @pool[site] = sessions + end + + # MRU + expiration = Time.now + @keepalive_timeout + session = Puppet::Network::HTTP::Session.new(connection, expiration) + Puppet.debug("Caching connection for #{site}") + sessions.unshift(session) + end + end + + def connection_count + count = 0 + @pool_mutex.synchronize do + @pool.each_pair do |site, sessions| + count += sessions.count + end + end + count + end + + def close + @pool_mutex.synchronize do + @pool.each_pair do |site, sessions| + sessions.each do |session| + Puppet.debug("Closing connection for #{site}") + session.connection.close + end + end + @pool.clear + end + end +end diff --git a/spec/unit/network/http/pool_spec.rb b/spec/unit/network/http/pool_spec.rb new file mode 100755 index 000000000..076694be3 --- /dev/null +++ b/spec/unit/network/http/pool_spec.rb @@ -0,0 +1,156 @@ +#! /usr/bin/env ruby +require 'spec_helper' + +require 'puppet/network/http' +require 'puppet/network/http_pool' + +describe Puppet::Network::HTTP::Pool do + before :each do + Puppet::SSL::Key.indirection.terminus_class = :memory + Puppet::SSL::CertificateRequest.indirection.terminus_class = :memory + end + + let(:site) do + Puppet::Network::HTTP::Site.new('https', 'rubygems.org', 443) + end + let(:github_site) do + Puppet::Network::HTTP::Site.new('https', 'github.com', 443) + end + + def create_empty_pool + Puppet::Network::HTTP::Pool.new + end + + def create_pool_with_connection(site, connection) + pool = Puppet::Network::HTTP::Pool.new + pool.add_connection(site, connection) + pool + end + + def create_pool_with_expired_connections(site, *connections) + # setting keepalive timeout to -1 ensures any newly added + # connections have already expired + pool = Puppet::Network::HTTP::Pool.new(-1) + connections.each do |conn| + pool.add_connection(site, conn) + end + pool + end + + def create_connection(site) + Puppet::Network::HttpPool.http_instance(site.host, site.port, site.scheme == 'https', false) + end + + def expects_connection_for_site(site) + Puppet::Network::HttpPool.expects(:http_instance).with(site.host, site.port, site.scheme == 'https', true) + end + + context 'when taking a connection' do + it 'returns a new connection if the pool is empty' do + expects_connection_for_site(site) + + pool = create_empty_pool + pool.take_connection(site) + end + + it 'returns a new connection if there are no matching connections for that site' do + connection = create_connection(site) + pool = create_pool_with_connection(site, connection) + + expects_connection_for_site(github_site) + + pool.take_connection(github_site) + end + + it 'takes a matching connection from the pool' do + connection = create_connection(site) + pool = create_pool_with_connection(site, connection) + + expect(pool.take_connection(site)).to eq(connection) + end + + it 'takes the most recently used connection from the pool' do + least_recently_used = create_connection(site) + most_recently_used = create_connection(site) + + pool = create_empty_pool + pool.add_connection(site, least_recently_used) + pool.add_connection(site, most_recently_used) + + expect(pool.take_connection(site)).to eq(most_recently_used) + end + + it 'closes all expired connections' do + conn1 = create_connection(site) + conn2 = create_connection(site) + + conn1.expects(:close) + conn2.expects(:close) + + expects_connection_for_site(site) + + pool = create_pool_with_expired_connections(site, conn1, conn2) + pool.take_connection(site) + end + + it 'logs an exception if it fails to close an expired connection' do + Puppet.expects(:log_exception).with(is_a(IOError), "Failed to close session for #{site}: read timeout") + + connection = create_connection(site) + connection.expects(:close).raises(IOError, 'read timeout') + + pool = create_pool_with_expired_connections(site, connection) + pool.take_connection(site) + end + end + + context 'when adding a connection' do + it 'adds the connection to an empty pool' do + connection = create_connection(site) + pool = create_pool_with_connection(site, connection) + + expect(pool.connection_count).to eq(1) + end + + it 'adds the connection to a pool with a connection for the same site' do + conn1 = create_connection(site) + conn2 = create_connection(site) + + pool = create_empty_pool + pool.add_connection(site, conn1) + pool.add_connection(site, conn2) + + expect(pool.connection_count).to eq(2) + end + + it 'adds the connection to a pool with a connection for a different site' do + connection = create_connection(site) + + pool = create_empty_pool + pool.add_connection(site, connection) + pool.add_connection(github_site, connection) + + expect(pool.connection_count).to eq(2) + end + end + + context 'when closing the pool' do + it 'closes all cached connections' do + connection = create_connection(site) + connection.expects(:close) + + pool = create_pool_with_connection(site, connection) + pool.close + end + + it 'clears the pool' do + connection = create_connection(site) + connection.stubs(:close) + + pool = create_pool_with_connection(site, connection) + pool.close + + expect(pool.connection_count).to eq(0) + end + end +end