Changeset View
Changeset View
Standalone View
Standalone View
wallace/__init__.py
Show All 23 Lines | |||||
import os | import os | ||||
import pwd | import pwd | ||||
import traceback | import traceback | ||||
from smtpd import SMTPChannel | from smtpd import SMTPChannel | ||||
import socket | import socket | ||||
import struct | import struct | ||||
import sys | import sys | ||||
import tempfile | import tempfile | ||||
from threading import _Timer | |||||
import time | import time | ||||
import traceback | import traceback | ||||
import pykolab | import pykolab | ||||
from pykolab import utils | from pykolab import utils | ||||
from pykolab.translate import _ | from pykolab.translate import _ | ||||
log = pykolab.getLogger('pykolab.wallace') | log = pykolab.getLogger('pykolab.wallace') | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | while True: | ||||
time.sleep(60) | time.sleep(60) | ||||
except (SystemExit, KeyboardInterrupt), e: | except (SystemExit, KeyboardInterrupt), e: | ||||
log.info("Terminating heartbeat process") | log.info("Terminating heartbeat process") | ||||
break | break | ||||
def worker_process(*args, **kw): | def worker_process(*args, **kw): | ||||
log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) | log.debug(_("Worker process %s initializing") % (multiprocessing.current_process().name), level=1) | ||||
class Timer(_Timer): | |||||
def run(self): | |||||
while True: | |||||
while not self.finished.is_set(): | |||||
self.finished.wait(self.interval) | |||||
self.function(*self.args, **self.kwargs) | |||||
self.finished.set() | |||||
class WallaceDaemon(object): | class WallaceDaemon(object): | ||||
def __init__(self): | def __init__(self): | ||||
self.current_connections = 0 | self.current_connections = 0 | ||||
self.max_connections = 24 | self.max_connections = 24 | ||||
self.pool = None | self.pool = None | ||||
daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) | daemon_group = conf.add_cli_parser_option_group(_("Daemon Options")) | ||||
▲ Show 20 Lines • Show All 75 Lines • ▼ Show 20 Lines | def __init__(self): | ||||
self.modules = [] | self.modules = [] | ||||
def do_wallace(self): | def do_wallace(self): | ||||
if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): | if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): | ||||
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1) | self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1) | ||||
else: | else: | ||||
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ()) | self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ()) | ||||
self.pickup_spool_messages(sync=True) | |||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | ||||
bound = False | bound = False | ||||
shutdown = False | shutdown = False | ||||
while not bound: | while not bound: | ||||
try: | try: | ||||
if shutdown: | if shutdown: | ||||
Show All 20 Lines | def do_wallace(self): | ||||
time.sleep(1) | time.sleep(1) | ||||
s.close() | s.close() | ||||
time.sleep(1) | time.sleep(1) | ||||
s.listen(5) | s.listen(5) | ||||
self.timer = Timer(180, self.pickup_spool_messages) | |||||
self.timer.start() | |||||
# start background process to run periodic jobs in active modules | |||||
self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules]) | |||||
self.heartbeat.daemon = True | |||||
self.heartbeat.start() | |||||
try: | |||||
while 1: | |||||
while self.current_connections >= self.max_connections: | |||||
time.sleep(0.5) | |||||
pair = s.accept() | |||||
log.info(_("Accepted connection")) | |||||
if not pair == None: | |||||
self.current_connections += 1 | |||||
connection, address = pair | |||||
channel = SMTPChannel(self, connection, address) | |||||
asyncore.loop() | |||||
except Exception, errmsg: | |||||
traceback.print_exc() | |||||
s.shutdown(1) | |||||
s.close() | |||||
# shut down hearbeat process | |||||
self.heartbeat.terminate() | |||||
def data_header(self, mailfrom, rcpttos): | |||||
COMMASPACE = ', ' | |||||
return "X-Kolab-From: " + mailfrom + "\r\n" + \ | |||||
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n" | |||||
def pickup_spool_messages(self, sync=False): | |||||
# Mind you to include the trailing slash | # Mind you to include the trailing slash | ||||
pickup_path = '/var/spool/pykolab/wallace/' | pickup_path = '/var/spool/pykolab/wallace/' | ||||
messages = [] | |||||
for root, directory, files in os.walk(pickup_path): | for root, directory, files in os.walk(pickup_path): | ||||
for filename in files: | for filename in files: | ||||
messages.append((root, filename)) | |||||
for root, filename in messages: | |||||
filepath = os.path.join(root, filename) | filepath = os.path.join(root, filename) | ||||
try: | |||||
if os.stat(filepath).st_mtime + 150 > time.time(): | |||||
log.debug("Skipping %s" % (filepath), level=8) | |||||
continue | |||||
except: | |||||
continue | |||||
if not root == pickup_path: | if not root == pickup_path: | ||||
module = os.path.dirname(root).replace(pickup_path, '') | module = os.path.dirname(root).replace(pickup_path, '') | ||||
# Compare uppercase status (specifically, DEFER) with | # Compare uppercase status (specifically, DEFER) with | ||||
# lowercase (plugin names). | # lowercase (plugin names). | ||||
# | # | ||||
# The messages in DEFER are supposed to be picked up by | # The messages in DEFER are supposed to be picked up by | ||||
# another thread, whereas the messages in other directories | # another thread, whereas the messages in other directories | ||||
# are pending being handled by their respective plugins. | # are pending being handled by their respective plugins. | ||||
# | # | ||||
# TODO: Handle messages in spool directories for which a | # TODO: Handle messages in spool directories for which a | ||||
# plugin had been enabled, but is not enabled any longer. | # plugin had been enabled, but is not enabled any longer. | ||||
# | # | ||||
if module.lower() == "defer": | if module.lower() == "defer": | ||||
# Wallace was unable to deliver to re-injection smtpd. | # Wallace was unable to deliver to re-injection smtpd. | ||||
# Skip it, another thread is picking up the deferred | # Skip it, another thread is picking up the deferred | ||||
# messages. | # messages. | ||||
continue | continue | ||||
stage = root.replace(pickup_path, '').split('/') | stage = root.replace(pickup_path, '').split('/') | ||||
if len(stage) < 2: | if len(stage) < 2: | ||||
stage = None | stage = None | ||||
else: | else: | ||||
stage = stage[1] | stage = stage[1] | ||||
if stage.lower() == "hold": | if stage.lower() == "hold": | ||||
continue | continue | ||||
# Do not handle messages in a defer state. | # Do not handle messages in a defer state. | ||||
if stage.lower() == "defer": | if stage.lower() == "defer": | ||||
continue | continue | ||||
self.current_connections += 1 | self.current_connections += 1 | ||||
self.pool.apply_async(pickup_message, (filepath, (self.modules), {'module': module, 'stage': stage})) | |||||
self.current_connections -= 1 | |||||
continue | if sync: | ||||
pickup_message(filepath, self.modules, module=module, stage=stage) | |||||
else: | |||||
self.pool.apply_async( | |||||
pickup_message, | |||||
( | |||||
filepath, | |||||
(self.modules), | |||||
{'module': module, 'stage': stage} | |||||
) | |||||
) | |||||
self.current_connections += 1 | |||||
self.pool.apply_async(pickup_message, (filepath, (self.modules))) | |||||
self.current_connections -= 1 | self.current_connections -= 1 | ||||
# start background process to run periodic jobs in active modules | continue | ||||
self.heartbeat = multiprocessing.Process(target=modules_heartbeat, args=[self.modules]) | |||||
self.heartbeat.daemon = True | |||||
self.heartbeat.start() | |||||
try: | |||||
while 1: | |||||
while self.current_connections >= self.max_connections: | |||||
time.sleep(0.5) | |||||
pair = s.accept() | |||||
log.info(_("Accepted connection")) | |||||
if not pair == None: | |||||
self.current_connections += 1 | self.current_connections += 1 | ||||
connection, address = pair | |||||
channel = SMTPChannel(self, connection, address) | |||||
asyncore.loop() | |||||
except Exception, errmsg: | if sync: | ||||
traceback.print_exc() | pickup_message(filepath, self.modules) | ||||
s.shutdown(1) | else: | ||||
s.close() | self.pool.apply_async(pickup_message, (filepath, (self.modules))) | ||||
# shut down hearbeat process | |||||
self.heartbeat.terminate() | |||||
def data_header(self, mailfrom, rcpttos): | self.current_connections -= 1 | ||||
COMMASPACE = ', ' | |||||
return "X-Kolab-From: " + mailfrom + "\r\n" + \ | |||||
"X-Kolab-To: " + COMMASPACE.join(rcpttos) + "\r\n" | |||||
def process_message(self, peer, mailfrom, rcpttos, data): | def process_message(self, peer, mailfrom, rcpttos, data): | ||||
""" | """ | ||||
We have retrieved the message. This should be as fast as possible, | We have retrieved the message. This should be as fast as possible, | ||||
and not ever block. | and not ever block. | ||||
""" | """ | ||||
header = self.data_header(mailfrom, rcpttos) | header = self.data_header(mailfrom, rcpttos) | ||||
▲ Show 20 Lines • Show All 179 Lines • Show Last 20 Lines |