diff --git a/acceptance/tests/concurrency/ticket_2659_concurrent_catalog_requests.rb b/acceptance/tests/concurrency/ticket_2659_concurrent_catalog_requests.rb new file mode 100644 index 000000000..eb3241c74 --- /dev/null +++ b/acceptance/tests/concurrency/ticket_2659_concurrent_catalog_requests.rb @@ -0,0 +1,108 @@ +test_name "concurrent catalog requests (PUP-2659)" + +# we're only testing the effects of loading a master with concurrent requests +confine :except, :platform => 'windows' + +step "setup a manifest" + +testdir = master.tmpdir("concurrent") + +apply_manifest_on(master, <<-MANIFEST, :catch_failures => true) + File { + ensure => directory, + owner => #{master['user']}, + group => #{master['group']}, + mode => 0750, + } + + file { '#{testdir}': } + file { '#{testdir}/busy': } + file { '#{testdir}/busy/one.txt': + ensure => file, + mode => 640, + content => "Something to read", + } + file { '#{testdir}/busy/two.txt': + ensure => file, + mode => 640, + content => "Something else to read", + } + file { '#{testdir}/busy/three.txt': + ensure => file, + mode => 640, + content => "Something more else to read", + } + + file { '#{testdir}/manifests': } + file { '#{testdir}/manifests/site.pp': + ensure => file, + content => ' + $foo = inline_template(" + <%- 1000.times do + Dir.glob(\\'#{testdir}/busy/*.txt\\').each do |f| + File.new(f).read + end + end + %> + \\'touched the file system for a bit\\' + ") + notify { "end": + message => $foo, + } + ', + mode => 640, + } +MANIFEST + +step "start master" +master_opts = { + 'main' => { + 'manifest' => "#{testdir}/manifests/site.pp", + } +} +with_puppet_running_on(master, master_opts, testdir) do + + step "concurrent catalog curls (with alliterative alacrity)" + agents.each do |agent| + cert_path = on(agent, puppet('config', 'print', 'hostcert')).stdout.chomp + key_path = on(agent, puppet('config', 'print', 'hostprivkey')).stdout.chomp + cacert_path = on(agent, puppet('config', 'print', 'localcacert')).stdout.chomp + agent_cert = on(agent, puppet('config', 'print', 'certname')).stdout.chomp + + run_count = 6 + agent_tmpdir = agent.tmpdir("concurrent-loop-script") + test_script = "#{agent_tmpdir}/loop.sh" + create_remote_file(agent, test_script, <<-EOF) + declare -a MYPIDS + loops=#{run_count} + + for (( i=0; i<$loops; i++ )); do + ( + sleep_for="0.$(( $RANDOM % 49 ))" + sleep $sleep_for + url='https://#{master}:8140/production/catalog/#{agent_cert}' + echo "Curling: $url" + curl -v -# -H 'Accept: text/pson' --cert #{cert_path} --key #{key_path} --cacert #{cacert_path} $url + echo "$PPID Completed" + ) > "#{agent_tmpdir}/catalog-request-$i.out" 2>&1 & + echo "Launched $!" + MYPIDS[$i]=$! + done + + for (( i=0; i<$loops; i++ )); do + wait ${MYPIDS[$i]} + done + + echo "All requests are finished" + EOF + on(agent, "chmod +x #{test_script}") + on(agent, "#{test_script}") + run_count.times do |i| + step "Checking the results of catalog request ##{i}" + on(agent, "cat #{agent_tmpdir}/catalog-request-#{i}.out") do + assert_match(%r{< HTTP/1.* 200}, stdout) + assert_match(%r{touched the file system for a bit}, stdout) + end + end + end +end diff --git a/lib/puppet/network/http/webrick/rest.rb b/lib/puppet/network/http/webrick/rest.rb index 66987151a..64c859a10 100644 --- a/lib/puppet/network/http/webrick/rest.rb +++ b/lib/puppet/network/http/webrick/rest.rb @@ -1,95 +1,102 @@ require 'puppet/network/http/handler' require 'resolv' require 'webrick' require 'puppet/util/ssl' class Puppet::Network::HTTP::WEBrickREST < WEBrick::HTTPServlet::AbstractServlet include Puppet::Network::HTTP::Handler + def self.mutex + @mutex ||= Mutex.new + end + def initialize(server) raise ArgumentError, "server is required" unless server register([Puppet::Network::HTTP::API::V2.routes, Puppet::Network::HTTP::API::V1.routes]) super(server) end # Retrieve the request parameters, including authentication information. def params(request) params = request.query || {} params = Hash[params.collect do |key, value| all_values = value.list [key, all_values.length == 1 ? value : all_values] end] params = decode_params(params) params.merge(client_information(request)) end - # WEBrick uses a service method to respond to requests. Simply delegate to the handler response method. + # WEBrick uses a service method to respond to requests. Simply delegate to + # the handler response method. def service(request, response) - process(request, response) + self.class.mutex.synchronize do + process(request, response) + end end def headers(request) result = {} request.each do |k, v| result[k.downcase] = v end result end def http_method(request) request.request_method end def path(request) request.path end def body(request) request.body end def client_cert(request) if cert = request.client_cert Puppet::SSL::Certificate.from_instance(cert) else nil end end # Set the specified format as the content type of the response. def set_content_type(response, format) response["content-type"] = format_to_mime(format) end def set_response(response, result, status = 200) response.status = status if status >= 200 and status != 304 response.body = result response["content-length"] = result.stat.size if result.is_a?(File) end end # Retrieve node/cert/ip information from the request object. def client_information(request) result = {} if peer = request.peeraddr and ip = peer[3] result[:ip] = ip end # If they have a certificate (which will almost always be true) # then we get the hostname from the cert, instead of via IP # info result[:authenticated] = false if cert = request.client_cert and cn = Puppet::Util::SSL.cn_from_subject(cert.subject) result[:node] = cn result[:authenticated] = true else result[:node] = resolve_node(result) end result end end