Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117986552
D617.1775528060.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
8 KB
Referenced Files
None
Subscribers
None
D617.1775528060.diff
View Options
diff --git a/wallace/__init__.py b/wallace/__init__.py
--- a/wallace/__init__.py
+++ b/wallace/__init__.py
@@ -29,6 +29,7 @@
import struct
import sys
import tempfile
+from threading import _Timer
import time
import traceback
@@ -99,6 +100,15 @@
def worker_process(*args, **kw):
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1)
+class Timer(_Timer):
+ def run(self):
+ while True:
+ while not self.finished.is_set():
+ self.finished.wait(self.interval)
+ self.function(*self.args, **self.kwargs)
+
+ self.finished.set()
+
class WallaceDaemon(object):
def __init__(self):
self.current_connections = 0
@@ -190,6 +200,8 @@
else:
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ())
+ self.pickup_spool_messages(sync=True)
+
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
@@ -226,54 +238,8 @@
s.listen(5)
- # Mind you to include the trailing slash
- pickup_path = '/var/spool/pykolab/wallace/'
- for root, directory, files in os.walk(pickup_path):
- for filename in files:
- filepath = os.path.join(root, filename)
-
- if not root == pickup_path:
- module = os.path.dirname(root).replace(pickup_path, '')
-
- # Compare uppercase status (specifically, DEFER) with
- # lowercase (plugin names).
- #
- # The messages in DEFER are supposed to be picked up by
- # another thread, whereas the messages in other directories
- # are pending being handled by their respective plugins.
- #
- # TODO: Handle messages in spool directories for which a
- # plugin had been enabled, but is not enabled any longer.
- #
-
- if module.lower() == "defer":
- # Wallace was unable to deliver to re-injection smtpd.
- # Skip it, another thread is picking up the deferred
- # messages.
- continue
-
- stage = root.replace(pickup_path, '').split('/')
- if len(stage) < 2:
- stage = None
- else:
- stage = stage[1]
-
- if stage.lower() == "hold":
- continue
-
- # Do not handle messages in a defer state.
- if stage.lower() == "defer":
- continue
-
- self.current_connections += 1
- self.pool.apply_async(pickup_message, (filepath, (self.modules), {'module': module, 'stage': stage}))
- self.current_connections -= 1
-
- continue
-
- self.current_connections += 1
- self.pool.apply_async(pickup_message, (filepath, (self.modules)))
- self.current_connections -= 1
+ self.timer = Timer(180, self.pickup_spool_messages)
+ self.timer.start()
# start background process to run periodic jobs in active modules
self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules])
@@ -306,6 +272,88 @@
return "X-Kolab-From: " + mailfrom + "\r\n" + \
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n"
+ def pickup_spool_messages(self, sync=False):
+ # Mind you to include the trailing slash
+ pickup_path = '/var/spool/pykolab/wallace/'
+
+ messages = []
+ for root, directory, files in os.walk(pickup_path):
+ for filename in files:
+ messages.append((root, filename))
+
+
+ for root, filename in messages:
+ filepath = os.path.join(root, filename)
+
+ try:
+ if os.stat(filepath).st_mtime + 150 > time.time():
+ log.debug("Skipping %s" % (filepath), level=8)
+ continue
+
+ except:
+ continue
+
+ if not root == pickup_path:
+ module = os.path.dirname(root).replace(pickup_path, '')
+
+ # Compare uppercase status (specifically, DEFER) with
+ # lowercase (plugin names).
+ #
+ # The messages in DEFER are supposed to be picked up by
+ # another thread, whereas the messages in other directories
+ # are pending being handled by their respective plugins.
+ #
+ # TODO: Handle messages in spool directories for which a
+ # plugin had been enabled, but is not enabled any longer.
+ #
+
+ if module.lower() == "defer":
+ # Wallace was unable to deliver to re-injection smtpd.
+ # Skip it, another thread is picking up the deferred
+ # messages.
+ continue
+
+ stage = root.replace(pickup_path, '').split('/')
+
+ if len(stage) < 2:
+ stage = None
+ else:
+ stage = stage[1]
+
+ if stage.lower() == "hold":
+ continue
+
+ # Do not handle messages in a defer state.
+ if stage.lower() == "defer":
+ continue
+
+ self.current_connections += 1
+
+ if sync:
+ pickup_message(filepath, self.modules, module=module, stage=stage)
+ else:
+ self.pool.apply_async(
+ pickup_message,
+ (
+ filepath,
+ (self.modules),
+ {'module': module, 'stage': stage}
+ )
+ )
+
+ self.current_connections -= 1
+
+ continue
+
+ self.current_connections += 1
+
+ if sync:
+ pickup_message(filepath, self.modules)
+ else:
+ self.pool.apply_async(pickup_message, (filepath, (self.modules)))
+
+ self.current_connections -= 1
+
def process_message(self, peer, mailfrom, rcpttos, data):
"""
We have retrieved the message. This should be as fast as possible,
diff --git a/wallace/module_invitationpolicy.py b/wallace/module_invitationpolicy.py
--- a/wallace/module_invitationpolicy.py
+++ b/wallace/module_invitationpolicy.py
@@ -19,6 +19,7 @@
import datetime
import os
+import signal
import tempfile
import time
from urlparse import urlparse
@@ -1290,8 +1291,14 @@
msg['From'] = Header(utils.str2unicode('%s' % orgname) if orgname else '')
msg['From'].append("<%s>" % orgemail)
+ seed = random.randint(0, 6)
+ alarm_after = (seed * 10) + 60
+ log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+ signal.alarm(alarm_after)
+
result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
log.debug(_("Sent update notification to %r: %r") % (receiving_user['mail'], result), level=8)
+ signal.alarm(0)
def send_cancel_notification(object, receiving_user, deleted=False, sender=None, comment=None):
"""
@@ -1354,8 +1361,14 @@
msg['From'] = Header(utils.str2unicode('%s' % orgname) if orgname else '')
msg['From'].append("<%s>" % orgemail)
+ seed = random.randint(0, 6)
+ alarm_after = (seed * 10) + 60
+ log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+ signal.alarm(alarm_after)
+
result = modules._sendmail(orgemail, receiving_user['mail'], msg.as_string())
log.debug(_("Sent cancel notification to %r: %r") % (receiving_user['mail'], result), level=8)
+ signal.alarm(0)
def is_auto_reply(user, sender_email, type):
accept_available = False
diff --git a/wallace/module_resources.py b/wallace/module_resources.py
--- a/wallace/module_resources.py
+++ b/wallace/module_resources.py
@@ -22,6 +22,7 @@
import os
import pytz
import random
+import signal
import tempfile
import time
from urlparse import urlparse
@@ -1355,8 +1356,14 @@
resource['cn'], participant_status_label(status) if success else _('failed')
))
+ seed = random.randint(0, 6)
+ alarm_after = (seed * 10) + 60
+ log.debug(_("Set alarm to %s seconds") % (alarm_after), level=8)
+ signal.alarm(alarm_after)
+
result = modules._sendmail(resource['mail'], owner['mail'], msg.as_string())
log.debug(_("Owner notification was sent successfully: %r") % result, level=8)
+ signal.alarm(0)
def owner_notification_text(resource, owner, event, success):
organizer = event.get_organizer()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, Apr 7, 2:14 AM (5 h, 58 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18834797
Default Alt Text
D617.1775528060.diff (8 KB)
Attached To
Mode
D617: Manage the pool processes such that they die after a limited quantity of time, and pick up messages from the spool asynchronously.
Attached
Detach File
Event Timeline