diff --git a/wallace/__init__.py b/wallace/__init__.py
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -29,6 +29,7 @@
 import struct
 import sys
 import tempfile
+from threading import _Timer
 import time
 import traceback
 
@@ -99,6 +100,23 @@ def worker_process(*args, **kw):
 
     log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
 
+class Timer(_Timer):
+    """
+        A timer that keeps calling its function every ``interval``
+        seconds, rather than just once, until it is cancelled.
+    """
+
+    def run(self):
+        while not self.finished.is_set():
+            self.finished.wait(self.interval)
+
+            # cancel() sets the event and wakes up wait(); do not call
+            # the function once more on the way out.
+            if self.finished.is_set():
+                break
+
+            self.function(*self.args, **self.kwargs)
+
 class WallaceDaemon(object):
     def __init__(self):
         self.current_connections = 0
@@ -190,6 +208,8 @@
         else:
             self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ())
 
+        self.pickup_spool_messages(sync=True)
+
         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 
@@ -226,54 +246,8 @@
 
         s.listen(5)
 
-        # Mind you to include the trailing slash
-        pickup_path = '/var/spool/pykolab/wallace/'
-        for root, directory, files in os.walk(pickup_path):
-            for filename in files:
-                filepath = os.path.join(root, filename)
-
-                if not root == pickup_path:
-                    module = os.path.dirname(root).replace(pickup_path, '')
-
-                    # Compare uppercase status (specifically, DEFER) with
-                    # lowercase (plugin names).
-                    #
-                    # The messages in DEFER are supposed to be picked up by
-                    # another thread, whereas the messages in other directories
-                    # are pending being handled by their respective plugins.
-                    #
-                    # TODO: Handle messages in spool directories for which a
-                    # plugin had been enabled, but is not enabled any longer.
-                    #
-
-                    if module.lower() == "defer":
-                        # Wallace was unable to deliver to re-injection smtpd.
-                        # Skip it, another thread is picking up the deferred
-                        # messages.
-                        continue
-
-                    stage = root.replace(pickup_path, '').split('/')
-                    if len(stage) < 2:
-                        stage = None
-                    else:
-                        stage = stage[1]
-
-                    if stage.lower() == "hold":
-                        continue
-
-                    # Do not handle messages in a defer state.
-                    if stage.lower() == "defer":
-                        continue
-
-                    self.current_connections += 1
-                    self.pool.apply_async(pickup_message, (filepath, (self.modules), {'module': module, 'stage': stage}))
-                    self.current_connections -= 1
-
-                    continue
-
-                self.current_connections += 1
-                self.pool.apply_async(pickup_message, (filepath, (self.modules)))
-                self.current_connections -= 1
+        self.timer = Timer(180, self.pickup_spool_messages)
+        self.timer.start()
 
         # start background process to run periodic jobs in active modules
         self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
@@ -306,6 +280,95 @@
         return "X-Kolab-From: " + mailfrom + "\r\n" + \
                 "X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
 
+    def pickup_spool_messages(self, sync=False):
+        """
+            Pick up the messages that are waiting in the spool directory.
+
+            Every message that has not been modified during the last 150
+            seconds is handed to pickup_message(), either one by one in
+            the calling process (sync=True), or through the worker pool.
+        """
+
+        # Mind you to include the trailing slash
+        pickup_path = '/var/spool/pykolab/wallace/'
+
+        # List the spool first, and only then start handing out messages.
+        messages = []
+        for root, directory, files in os.walk(pickup_path):
+            for filename in files:
+                messages.append((root, filename))
+
+        for root, filename in messages:
+            filepath = os.path.join(root, filename)
+
+            try:
+                mtime = os.stat(filepath).st_mtime
+            except OSError:
+                # The message is no longer there, nothing to pick up.
+                continue
+
+            # NOTE(review): Presumably a recently modified message is still
+            # being worked on -- confirm the 150 seconds against the pickup
+            # interval.
+            if mtime + 150 > time.time():
+                log.debug("Skipping %s" % (filepath), level=8)
+                continue
+
+            if root == pickup_path:
+                self._pickup_spool_message(filepath, sync)
+                continue
+
+            # Below the pickup path, the first directory level names the
+            # module (or a status such as DEFER), and the second level,
+            # if any, names the stage.
+            location = root.replace(pickup_path, '').split('/')
+            module = location[0]
+
+            # Compare uppercase status (specifically, DEFER) with
+            # lowercase (plugin names).
+            #
+            # The messages in DEFER are supposed to be picked up by
+            # another thread, whereas the messages in other directories
+            # are pending being handled by their respective plugins.
+            #
+            # TODO: Handle messages in spool directories for which a
+            # plugin had been enabled, but is not enabled any longer.
+            #
+
+            if module.lower() == "defer":
+                # Wallace was unable to deliver to re-injection smtpd.
+                # Skip it, another thread is picking up the deferred
+                # messages.
+                continue
+
+            if len(location) < 2:
+                stage = None
+            else:
+                stage = location[1]
+
+            # Do not handle messages that are on hold or in a defer state.
+            # A message directly in the module directory has no stage.
+            if stage is not None and stage.lower() in ("hold", "defer"):
+                continue
+
+            self._pickup_spool_message(filepath, sync, module=module, stage=stage)
+
+    def _pickup_spool_message(self, filepath, sync, **kw):
+        """
+            Hand a single spooled message to pickup_message(), passing
+            on any module and stage as keyword arguments.
+        """
+
+        self.current_connections += 1
+
+        try:
+            if sync:
+                pickup_message(filepath, self.modules, **kw)
+            else:
+                self.pool.apply_async(pickup_message, (filepath, self.modules), kw)
+        finally:
+            self.current_connections -= 1
+
     def process_message(self, peer, mailfrom, rcpttos, data):
         """
             We have retrieved the message. This should be as fast as possible,
diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py
--- a/wallace/module_invitationpolicy.py
+++ b/wallace/module_invitationpolicy.py
@@ -19,6 +19,8 @@
 
 import datetime
 import os
+import random
+import signal
 import tempfile
 import time
 from urlparse import urlparse
@@ -1290,8 +1292,19 @@
     msg['From'] = Header(utils.str2unicode('%s' % orgname) if orgname else '')
     msg['From'].append("<%s>" % orgemail)
 
+    seed = random.randint(0, 6)
+    alarm_after = (seed * 10) + 60
+    log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+    signal.alarm(alarm_after)
+
-    result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
+    try:
+        result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
+    finally:
+        # Disarm the alarm even when sending fails, so that it cannot
+        # go off later in unrelated code.
+        signal.alarm(0)
+
     log.debug(_("Sent update notification to %r: %r") % (receiving_user['mail'], result), level=8)
 
 def send_cancel_notification(object, receiving_user, deleted=False, sender=None, comment=None):
     """
@@ -1354,8 +1367,19 @@
     msg['From'] = Header(utils.str2unicode('%s' % orgname) if orgname else '')
     msg['From'].append("<%s>" % orgemail)
 
+    seed = random.randint(0, 6)
+    alarm_after = (seed * 10) + 60
+    log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+    signal.alarm(alarm_after)
+
-    result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
+    try:
+        result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
+    finally:
+        # Disarm the alarm even when sending fails, so that it cannot
+        # go off later in unrelated code.
+        signal.alarm(0)
+
     log.debug(_("Sent cancel notification to %r: %r") % (receiving_user['mail'], result), level=8)
 
 def is_auto_reply(user, sender_email, type):
     accept_available = False
diff --git a/wallace/module_resources.py b/wallace/module_resources.py
--- a/wallace/module_resources.py
+++ b/wallace/module_resources.py
@@ -22,6 +22,7 @@
 import os
 import pytz
 import random
+import signal
 import tempfile
 import time
 from urlparse import urlparse
@@ -1355,8 +1356,19 @@
         resource['cn'], participant_status_label(status) if success else _('failed')
     ))
 
+    seed = random.randint(0, 6)
+    alarm_after = (seed * 10) + 60
+    log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+    signal.alarm(alarm_after)
+
-    result = modules._sendmail(resource['mail'], owner['mail'], msg.as_string())
+    try:
+        result = modules._sendmail(resource['mail'], owner['mail'], msg.as_string())
+    finally:
+        # Disarm the alarm even when sending fails, so that it cannot
+        # go off later in unrelated code.
+        signal.alarm(0)
+
     log.debug(_("Owner notification was sent successfully: %r") % result, level=8)
 
 def owner_notification_text(resource, owner, event, success):
     organizer = event.get_organizer()