Changeset View
Changeset View
Standalone View
Standalone View
wallace/__init__.py
Show All 20 Lines | |||||
import asyncore | import asyncore | ||||
from distutils import version | from distutils import version | ||||
import grp | import grp | ||||
import multiprocessing | import multiprocessing | ||||
import os | import os | ||||
import pwd | import pwd | ||||
import traceback | import traceback | ||||
from smtpd import SMTPChannel | import smtpd | ||||
import socket | import socket | ||||
import struct | import struct | ||||
import sys | import sys | ||||
import tempfile | import tempfile | ||||
from threading import _Timer | from threading import _Timer | ||||
import time | import time | ||||
import pykolab | import pykolab | ||||
from pykolab import utils | from pykolab import utils | ||||
from pykolab.translate import _ | from pykolab.translate import _ | ||||
from modules import cb_action_ACCEPT | from modules import cb_action_ACCEPT | ||||
# pylint: disable=invalid-name | # pylint: disable=invalid-name | ||||
log = pykolab.getLogger('pykolab.wallace') | log = pykolab.getLogger('pykolab.wallace') | ||||
sys.stderr = pykolab.logger.StderrToLogger(log) | |||||
conf = pykolab.getConf() | conf = pykolab.getConf() | ||||
def pickup_message(filepath, *args, **kwargs): | def pickup_message(filepath, *args, **kwargs): | ||||
wallace_modules = args[0] | wallace_modules = args[0] | ||||
if 'module' in kwargs: | if 'module' in kwargs: | ||||
▲ Show 20 Lines • Show All 70 Lines • ▼ Show 20 Lines | |||||
# pylint: disable=too-few-public-methods | # pylint: disable=too-few-public-methods | ||||
class Timer(_Timer): | class Timer(_Timer): | ||||
def run(self): | def run(self): | ||||
while True: | while True: | ||||
while not self.finished.is_set(): | while not self.finished.is_set(): | ||||
self.finished.wait(self.interval) | self.finished.wait(self.interval) | ||||
log.debug(_("Timer looping function '%s' every %ss") % ( | |||||
self.function.__name__, | |||||
self.interval | |||||
), level=8) | |||||
self.function(*self.args, **self.kwargs) | self.function(*self.args, **self.kwargs) | ||||
self.finished.set() | self.finished.set() | ||||
log.debug(_("Timer loop %s") % ('still active','finished')[self.finished.is_set()], level=8) | |||||
break | |||||
class WallaceDaemon: | class WallaceDaemon: | ||||
def __init__(self): | def __init__(self): | ||||
self.current_connections = 0 | self.current_connections = 0 | ||||
self.max_connections = 24 | self.max_connections = 24 | ||||
self.parent_pid = None | self.parent_pid = None | ||||
self.pool = None | self.pool = None | ||||
Show All 29 Lines | def __init__(self): | ||||
dest="max_threads", | dest="max_threads", | ||||
action="store", | action="store", | ||||
default=4, | default=4, | ||||
type=int, | type=int, | ||||
help=_("Number of threads to use.") | help=_("Number of threads to use.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
"--max-tasks", | |||||
dest = "max_tasks", | |||||
action = "store", | |||||
default = 10, | |||||
type = int, | |||||
help = _("Number of tasks per process.") | |||||
) | |||||
daemon_group.add_option( | |||||
"-p", "--pid-file", | "-p", "--pid-file", | ||||
dest="pidfile", | dest="pidfile", | ||||
action="store", | action="store", | ||||
default="/var/run/wallaced/wallaced.pid", | default="/var/run/wallaced/wallaced.pid", | ||||
help=_("Path to the PID file to use.") | help=_("Path to the PID file to use.") | ||||
) | ) | ||||
daemon_group.add_option( | daemon_group.add_option( | ||||
Show All 33 Lines | def __init__(self): | ||||
self.modules = conf.get_list('wallace', 'modules') | self.modules = conf.get_list('wallace', 'modules') | ||||
if not self.modules: | if not self.modules: | ||||
self.modules = [] | self.modules = [] | ||||
def do_wallace(self): | def do_wallace(self): | ||||
self.parent_pid = os.getpid() | self.parent_pid = os.getpid() | ||||
if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): | if version.StrictVersion(sys.version[:3]) >= version.StrictVersion("2.7"): | ||||
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), 1) | self.pool = multiprocessing.Pool(conf.max_threads, worker_process, (), conf.max_tasks) | ||||
else: | else: | ||||
self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ()) | self.pool = multiprocessing.Pool(conf.max_threads, worker_process, ()) | ||||
self.pickup_spool_messages(sync=True) | self.pickup_spool_messages(sync=True) | ||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||||
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | ||||
▲ Show 20 Lines • Show All 54 Lines • ▼ Show 20 Lines | def do_wallace(self): | ||||
log.debug( | log.debug( | ||||
"Wallace heartbeat is %s" % ('not alive', 'alive')[self.heartbeat.is_alive()], | "Wallace heartbeat is %s" % ('not alive', 'alive')[self.heartbeat.is_alive()], | ||||
level=8 | level=8 | ||||
) | ) | ||||
try: | try: | ||||
while 1: | while 1: | ||||
while self.current_connections >= self.max_connections: | while self.current_connections >= self.max_connections: | ||||
log.debug("Out of connections.") | log.debug(_("Reached limit of max connections of: %s. Sleeping for 0.5s") % self.max_connections, level=6) | ||||
time.sleep(0.5) | time.sleep(0.5) | ||||
pair = s.accept() | pair = s.accept() | ||||
log.info(_("Accepted connection")) | log.debug(_("Accepted connection %r with address %r") % (pair if pair is not None else (None, None)), level=8) | ||||
if pair is not None: | if pair is not None: | ||||
self.current_connections += 1 | self.current_connections += 1 | ||||
connection, address = pair | connection, address = pair | ||||
SMTPChannel(self, connection, address) | |||||
_smtpd = smtpd | |||||
# Set DEBUGSTREAM of smtpd to log to pykolab logger | |||||
if conf.debuglevel > 8: | |||||
_smtpd.DEBUGSTREAM = pykolab.logger.StderrToLogger(log) | |||||
log.debug(_("Creating SMTPChannel for accepted message"), level=8) | |||||
channel = _smtpd.SMTPChannel(self, connection, address) | |||||
asyncore.loop() | asyncore.loop() | ||||
else: | |||||
log.error(_("Socket accepted, but (conn, address) tuple is None.")) | |||||
# pylint: disable=broad-except | # pylint: disable=broad-except | ||||
except Exception: | except Exception: | ||||
traceback.print_exc() | traceback.print_exc() | ||||
s.shutdown(1) | s.shutdown(1) | ||||
s.close() | s.close() | ||||
# shut down hearbeat process | # shut down hearbeat process | ||||
▲ Show 20 Lines • Show All 108 Lines • ▼ Show 20 Lines | def process_message(self, peer, mailfrom, rcpttos, data): | ||||
# @TODO: and add line separator (\n or \r\n?) | # @TODO: and add line separator (\n or \r\n?) | ||||
# we should make sure there's only one line separator between | # we should make sure there's only one line separator between | ||||
# kolab headers and the original message (data) | # kolab headers and the original message (data) | ||||
os.write(fp, header) | os.write(fp, header) | ||||
os.write(fp, data) | os.write(fp, data) | ||||
os.close(fp) | os.close(fp) | ||||
log.debug(_("Started processing accepted message %s") % filename, level=8) | |||||
self.pool.apply_async(pickup_message, (filename, (self.modules))) | self.pool.apply_async(pickup_message, (filename, (self.modules))) | ||||
self.current_connections -= 1 | self.current_connections -= 1 | ||||
return "250 OK Message %s queued" % (filename) | return "250 OK Message %s queued" % (filename) | ||||
def reload_config(self, *args, **kwargs): | def reload_config(self, *args, **kwargs): | ||||
pass | pass | ||||
▲ Show 20 Lines • Show All 197 Lines • Show Last 20 Lines |