diff --git a/lib/puppet/transaction.rb b/lib/puppet/transaction.rb index 40e17e901..f7cb128c7 100644 --- a/lib/puppet/transaction.rb +++ b/lib/puppet/transaction.rb @@ -1,363 +1,371 @@ require 'puppet' require 'puppet/util/tagging' require 'puppet/application' require 'digest/sha1' require 'set' # the class that actually walks our resource/property tree, collects the changes, # and performs them # # @api private class Puppet::Transaction require 'puppet/transaction/additional_resource_generator' require 'puppet/transaction/event' require 'puppet/transaction/event_manager' require 'puppet/transaction/resource_harness' require 'puppet/resource/status' attr_accessor :catalog, :ignoreschedules, :for_network_device # The report, once generated. attr_reader :report # Routes and stores any events and subscriptions. attr_reader :event_manager # Handles most of the actual interacting with resources attr_reader :resource_harness attr_reader :prefetched_providers include Puppet::Util include Puppet::Util::Tagging def initialize(catalog, report, prioritizer) @catalog = catalog @report = report || Puppet::Transaction::Report.new("apply", catalog.version, catalog.environment) @prioritizer = prioritizer @report.add_times(:config_retrieval, @catalog.retrieval_duration || 0) @event_manager = Puppet::Transaction::EventManager.new(self) @resource_harness = Puppet::Transaction::ResourceHarness.new(self) @prefetched_providers = Hash.new { |h,k| h[k] = {} } end # Invoke the pre_run_check hook in every resource in the catalog. # This should (only) be called by Transaction#evaluate before applying # the catalog. # # @see Puppet::Transaction#evaluate # @see Puppet::Type#pre_run_check # @raise [Puppet::Error] If any pre-run checks failed. # @return [void] def perform_pre_run_checks prerun_errors = {} @catalog.vertices.each do |res| begin res.pre_run_check rescue Puppet::Error => detail prerun_errors[res] = detail end end unless prerun_errors.empty? prerun_errors.each do |res, detail| res.log_exception(detail) end raise Puppet::Error, "Some pre-run checks failed" end end # This method does all the actual work of running a transaction. It # collects all of the changes, executes them, and responds to any # necessary events. def evaluate(&block) block ||= method(:eval_resource) generator = AdditionalResourceGenerator.new(@catalog, relationship_graph, @prioritizer) @catalog.vertices.each { |resource| generator.generate_additional_resources(resource) } perform_pre_run_checks Puppet.info "Applying configuration version '#{catalog.version}'" if catalog.version continue_while = lambda { !stop_processing? } post_evalable_providers = Set.new pre_process = lambda do |resource| prov_class = resource.provider.class post_evalable_providers << prov_class if prov_class.respond_to?(:post_resource_eval) prefetch_if_necessary(resource) # If we generated resources, we don't know what they are now # blocking, so we opt to recompute it, rather than try to track every # change that would affect the number. relationship_graph.clear_blockers if generator.eval_generate(resource) end providerless_types = [] overly_deferred_resource_handler = lambda do |resource| # We don't automatically assign unsuitable providers, so if there # is one, it must have been selected by the user. return if missing_tags?(resource) if resource.provider resource.err "Provider #{resource.provider.class.name} is not functional on this host" else providerless_types << resource.type end resource_status(resource).failed = true end canceled_resource_handler = lambda do |resource| resource_status(resource).skipped = true resource.debug "Transaction canceled, skipping" end teardown = lambda do # Just once per type. No need to punish the user. providerless_types.uniq.each do |type| Puppet.err "Could not find a suitable provider for #{type}" end post_evalable_providers.each do |provider| begin provider.post_resource_eval rescue => detail Puppet.log_exception(detail, "post_resource_eval failed for provider #{provider}") end end end relationship_graph.traverse(:while => continue_while, :pre_process => pre_process, :overly_deferred_resource_handler => overly_deferred_resource_handler, :canceled_resource_handler => canceled_resource_handler, :teardown => teardown) do |resource| if resource.is_a?(Puppet::Type::Component) Puppet.warning "Somehow left a component in the relationship graph" else resource.info "Starting to evaluate the resource" if Puppet[:evaltrace] and @catalog.host_config? seconds = thinmark { block.call(resource) } resource.info "Evaluated in %0.2f seconds" % seconds if Puppet[:evaltrace] and @catalog.host_config? end end Puppet.debug "Finishing transaction #{object_id}" end # Wraps application run state check to flag need to interrupt processing def stop_processing? Puppet::Application.stop_requested? && catalog.host_config? end # Are there any failed resources in this transaction? def any_failed? report.resource_statuses.values.detect { |status| status.failed? } end # Find all of the changed resources. def changed? report.resource_statuses.values.find_all { |status| status.changed }.collect { |status| catalog.resource(status.resource) } end def relationship_graph catalog.relationship_graph(@prioritizer) end def resource_status(resource) report.resource_statuses[resource.to_s] || add_resource_status(Puppet::Resource::Status.new(resource)) end # The tags we should be checking. def tags self.tags = Puppet[:tags] unless defined?(@tags) super end def prefetch_if_necessary(resource) provider_class = resource.provider.class return unless provider_class.respond_to?(:prefetch) and !prefetched_providers[resource.type][provider_class.name] resources = resources_by_provider(resource.type, provider_class.name) if provider_class == resource.class.defaultprovider providerless_resources = resources_by_provider(resource.type, nil) providerless_resources.values.each {|res| res.provider = provider_class.name} resources.merge! providerless_resources end prefetch(provider_class, resources) end private # Apply all changes for a resource def apply(resource, ancestor = nil) status = resource_harness.evaluate(resource) add_resource_status(status) event_manager.queue_events(ancestor || resource, status.events) unless status.failed? rescue => detail resource.err "Could not evaluate: #{detail}" end # Evaluate a single resource. def eval_resource(resource, ancestor = nil) if skip?(resource) resource_status(resource).skipped = true resource.debug("Resource is being skipped, unscheduling all events") event_manager.dequeue_all_events_for_resource(resource) else resource_status(resource).scheduled = true apply(resource, ancestor) event_manager.process_events(resource) end end def failed?(resource) s = resource_status(resource) and s.failed? end # Does this resource have any failed dependencies? def failed_dependencies?(resource) # First make sure there are no failed dependencies. To do this, # we check for failures in any of the vertexes above us. It's not # enough to check the immediate dependencies, which is why we use # a tree from the reversed graph. found_failed = false # When we introduced the :whit into the graph, to reduce the combinatorial # explosion of edges, we also ended up reporting failures for containers # like class and stage. This is undesirable; while just skipping the # output isn't perfect, it is RC-safe. --daniel 2011-06-07 suppress_report = (resource.class == Puppet::Type.type(:whit)) relationship_graph.dependencies(resource).each do |dep| next unless failed?(dep) found_failed = true # See above. --daniel 2011-06-06 unless suppress_report then resource.notice "Dependency #{dep} has failures: #{resource_status(dep).failed}" end end found_failed end # A general method for recursively generating new resources from a # resource. def generate_additional_resources(resource) return unless resource.respond_to?(:generate) begin made = resource.generate rescue => detail resource.log_exception(detail, "Failed to generate additional resources using 'generate': #{detail}") end return unless made made = [made] unless made.is_a?(Array) made.uniq.each do |res| begin res.tag(*resource.tags) @catalog.add_resource(res) res.finish add_conditional_directed_dependency(resource, res) generate_additional_resources(res) rescue Puppet::Resource::Catalog::DuplicateResourceError res.info "Duplicate generated resource; skipping" end end end # Should we ignore tags? def ignore_tags? ! @catalog.host_config? end def resources_by_provider(type_name, provider_name) unless @resources_by_provider @resources_by_provider = Hash.new { |h, k| h[k] = Hash.new { |h1, k1| h1[k1] = {} } } @catalog.vertices.each do |resource| if resource.class.attrclass(:provider) prov = resource.provider && resource.provider.class.name @resources_by_provider[resource.type][prov][resource.name] = resource end end end @resources_by_provider[type_name][provider_name] || {} end # Prefetch any providers that support it, yo. We don't support prefetching # types, just providers. def prefetch(provider_class, resources) type_name = provider_class.resource_type.name return if @prefetched_providers[type_name][provider_class.name] Puppet.debug "Prefetching #{provider_class.name} resources for #{type_name}" begin provider_class.prefetch(resources) rescue LoadError, Puppet::MissingCommand => detail Puppet.log_exception(detail, "Could not prefetch #{type_name} provider '#{provider_class.name}': #{detail}") end @prefetched_providers[type_name][provider_class.name] = true end def add_resource_status(status) report.add_resource_status(status) end # Is the resource currently scheduled? def scheduled?(resource) self.ignoreschedules or resource_harness.scheduled?(resource) end # Should this resource be skipped? def skip?(resource) if missing_tags?(resource) resource.debug "Not tagged with #{tags.join(", ")}" elsif ! scheduled?(resource) resource.debug "Not scheduled" elsif failed_dependencies?(resource) # When we introduced the :whit into the graph, to reduce the combinatorial # explosion of edges, we also ended up reporting failures for containers # like class and stage. This is undesirable; while just skipping the # output isn't perfect, it is RC-safe. --daniel 2011-06-07 unless resource.class == Puppet::Type.type(:whit) then resource.warning "Skipping because of failed dependencies" end elsif resource.virtual? resource.debug "Skipping because virtual" elsif !host_and_device_resource?(resource) && resource.appliable_to_host? && for_network_device resource.debug "Skipping host resources because running on a device" elsif !host_and_device_resource?(resource) && resource.appliable_to_device? && !for_network_device resource.debug "Skipping device resources because running on a posix host" else return false end true end def host_and_device_resource?(resource) resource.appliable_to_host? && resource.appliable_to_device? end # Is this resource tagged appropriately? def missing_tags?(resource) return false if ignore_tags? return false if tags.empty? not resource.tagged?(*tags) end + + # These two methods are only made public to enable the existing spec tests to run + # under rspec 3 (apparently rspec 2 didn't enforce access controls?). Please do not + # treat these as part of a public API. + # Possible future improvement: rewrite to not require access to private methods. + public :skip? + public :missing_tags? + end require 'puppet/transaction/report' diff --git a/lib/puppet/transaction/resource_harness.rb b/lib/puppet/transaction/resource_harness.rb index b9caaf247..5968d79fb 100644 --- a/lib/puppet/transaction/resource_harness.rb +++ b/lib/puppet/transaction/resource_harness.rb @@ -1,250 +1,256 @@ require 'puppet/resource/status' class Puppet::Transaction::ResourceHarness NO_ACTION = Object.new extend Forwardable def_delegators :@transaction, :relationship_graph attr_reader :transaction def initialize(transaction) @transaction = transaction end def evaluate(resource) status = Puppet::Resource::Status.new(resource) begin context = ResourceApplicationContext.from_resource(resource, status) perform_changes(resource, context) if status.changed? && ! resource.noop? cache(resource, :synced, Time.now) resource.flush if resource.respond_to?(:flush) end rescue => detail status.failed_because(detail) ensure status.evaluation_time = Time.now - status.time end status end def scheduled?(resource) return true if Puppet[:ignoreschedules] return true unless schedule = schedule(resource) # We use 'checked' here instead of 'synced' because otherwise we'll # end up checking most resources most times, because they will generally # have been synced a long time ago (e.g., a file only gets updated # once a month on the server and its schedule is daily; the last sync time # will have been a month ago, so we'd end up checking every run). schedule.match?(cached(resource, :checked).to_i) end def schedule(resource) unless resource.catalog resource.warning "Cannot schedule without a schedule-containing catalog" return nil end return nil unless name = resource[:schedule] resource.catalog.resource(:schedule, name) || resource.fail("Could not find schedule #{name}") end # Used mostly for scheduling and auditing at this point. def cached(resource, name) Puppet::Util::Storage.cache(resource)[name] end # Used mostly for scheduling and auditing at this point. def cache(resource, name, value) Puppet::Util::Storage.cache(resource)[name] = value end private def perform_changes(resource, context) cache(resource, :checked, Time.now) return [] if ! allow_changes?(resource) # Record the current state in state.yml. context.audited_params.each do |param| cache(resource, param, context.current_values[param]) end ensure_param = resource.parameter(:ensure) if ensure_param && ensure_param.should ensure_event = sync_if_needed(ensure_param, context) else ensure_event = NO_ACTION end if ensure_event == NO_ACTION if context.resource_present? resource.properties.each do |param| sync_if_needed(param, context) end else resource.debug("Nothing to manage: no ensure and the resource doesn't exist") end end capture_audit_events(resource, context) end def allow_changes?(resource) if resource.purging? and resource.deleting? and deps = relationship_graph.dependents(resource) \ and ! deps.empty? and deps.detect { |d| ! d.deleting? } deplabel = deps.collect { |r| r.ref }.join(",") plurality = deps.length > 1 ? "":"s" resource.warning "#{deplabel} still depend#{plurality} on me -- not purging" false else true end end + # allow_changes? is only made public to enable the existing spec tests to run + # under rspec 3 (apparently rspec 2 didn't enforce access controls?). Please do not + # treat this as part of a public API. + # Possible future improvement: rewrite to not require access to private methods. + public :allow_changes? + def sync_if_needed(param, context) historical_value = context.historical_values[param.name] current_value = context.current_values[param.name] do_audit = context.audited_params.include?(param.name) begin if param.should && !param.safe_insync?(current_value) event = create_change_event(param, current_value, historical_value) if do_audit event = audit_event(event, param) end brief_audit_message = audit_message(param, do_audit, historical_value, current_value) if param.noop noop(event, param, current_value, brief_audit_message) else sync(event, param, current_value, brief_audit_message) end event else NO_ACTION end rescue => detail # Execution will continue on StandardErrors, just store the event Puppet.log_exception(detail) event = create_change_event(param, current_value, historical_value) event.status = "failure" event.message = "change from #{param.is_to_s(current_value)} to #{param.should_to_s(param.should)} failed: #{detail}" event rescue Exception => detail # Execution will halt on Exceptions, they get raised to the application event = create_change_event(param, current_value, historical_value) event.status = "failure" event.message = "change from #{param.is_to_s(current_value)} to #{param.should_to_s(param.should)} failed: #{detail}" raise ensure if event context.record(event) event.send_log context.synced_params << param.name end end end def create_change_event(property, current_value, historical_value) event = property.event event.previous_value = current_value event.desired_value = property.should event.historical_value = historical_value event end # This method is an ugly hack because, given a Time object with nanosecond # resolution, roundtripped through YAML serialization, the Time object will # be truncated to microseconds. # For audit purposes, this code special cases this comparison, and compares # the two objects by their second and microsecond components. tv_sec is the # number of seconds since the epoch, and tv_usec is only the microsecond # portion of time. def are_audited_values_equal(a, b) a == b || (a.is_a?(Time) && b.is_a?(Time) && a.tv_sec == b.tv_sec && a.tv_usec == b.tv_usec) end private :are_audited_values_equal def audit_event(event, property) event.audited = true event.status = "audit" if !are_audited_values_equal(event.historical_value, event.previous_value) event.message = "audit change: previously recorded value #{property.is_to_s(event.historical_value)} has been changed to #{property.is_to_s(event.previous_value)}" end event end def audit_message(param, do_audit, historical_value, current_value) if do_audit && historical_value && !are_audited_values_equal(historical_value, current_value) " (previously recorded value was #{param.is_to_s(historical_value)})" else "" end end def noop(event, param, current_value, audit_message) event.message = "current_value #{param.is_to_s(current_value)}, should be #{param.should_to_s(param.should)} (noop)#{audit_message}" event.status = "noop" end def sync(event, param, current_value, audit_message) param.sync event.message = "#{param.change_to_s(current_value, param.should)}#{audit_message}" event.status = "success" end def capture_audit_events(resource, context) context.audited_params.each do |param_name| if context.historical_values.include?(param_name) if !are_audited_values_equal(context.historical_values[param_name], context.current_values[param_name]) && !context.synced_params.include?(param_name) parameter = resource.parameter(param_name) event = audit_event(create_change_event(parameter, context.current_values[param_name], context.historical_values[param_name]), parameter) event.send_log context.record(event) end else resource.property(param_name).notice "audit change: newly-recorded value #{context.current_values[param_name]}" end end end # @api private ResourceApplicationContext = Struct.new(:resource, :current_values, :historical_values, :audited_params, :synced_params, :status) do def self.from_resource(resource, status) ResourceApplicationContext.new(resource, resource.retrieve_resource.to_hash, Puppet::Util::Storage.cache(resource).dup, (resource[:audit] || []).map { |p| p.to_sym }, [], status) end def resource_present? resource.present?(current_values) end def record(event) status << event end end end