diff --git a/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py b/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
index fb2c244b..43b9a3b9 100755
--- a/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
+++ b/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
@@ -1,706 +1,816 @@
#!/bin/env python3
"""
kolabendpointtester.py
--host apps.kolabnow.com
--user user@kolab.org
--password Secret
--dav https://apps.kolabnow.com
--fb https://apps.kolabnow.com/calendars/user@kolab.org/6f552d35-95c4-41f6-a7d2-cfd02dd867db
"""
import sys
import traceback
import socket
import ssl
import argparse
from base64 import b64encode
import http.client
import urllib.parse
from imaplib import IMAP4
from imaplib import IMAP4_SSL
from smtplib import SMTP
from smtplib import SMTP_SSL
import dns.resolver
+import re
# print('\033[31m' + 'some red text')
RED='\033[31m'
GREEN='\033[32m'
RESET='\033[39m'
SSLNOVERIFY = True
def print_error(msg):
print(RED + f"=> ERROR: {msg}")
print(RESET) # and reset to default color
def print_success(msg):
print(GREEN + f"=> {msg}")
print(RESET) # and reset to default color
def print_assertion_failure():
"""
Print an error message about a failed assertion
"""
_, _, trace = sys.exc_info()
tb_info = traceback.extract_tb(trace)
_filename, line, _func, text = tb_info[-1]
print(f" ERROR assertion on line {line} failed on {text}")
def http_request(url, method, params=None, headers=None, body=None, verbose=False):
"""
Perform an HTTP request.
"""
parsed_url = urllib.parse.urlparse(url)
# print("Connecting to ", parsed_url.netloc)
if url.startswith('https://'):
conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = (ssl._create_unverified_context() if SSLNOVERIFY else None))
else:
conn = http.client.HTTPConnection(parsed_url.netloc, 80)
if verbose:
conn.set_debuglevel(9)
conn.connect()
if params is None:
params = {}
if headers is None:
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
}
if body is None:
body = urllib.parse.urlencode(params)
# Assemble a relative url
url = urllib.parse.urlunsplit(["", "", parsed_url.path, parsed_url.query, parsed_url.fragment])
print(f"Requesting {url} From {parsed_url.netloc} Using {method}")
conn.request(method, url, body, headers)
response = conn.getresponse()
# Handle redirects
if response.status in (301, 302,):
print("Following redirect ", response.getheader('location', ''))
return http_request(
urllib.parse.urljoin(url, response.getheader('location', '')),
method,
params,
headers,
body,
verbose)
return response
def basic_auth_headers(username, password):
user_and_pass = b64encode(
f"{username}:{password}".encode("ascii")
).decode("ascii")
return {
"Authorization": "Basic {}".format(user_and_pass)
}
def try_get(name, url, verbose, headers = None, body = None):
try:
response = http_request(
url,
"GET",
None,
headers,
body,
verbose
)
success = response.status == 200
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error(f"{name} is not available")
return False
if not success:
print_error(f"{name} is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_caldav_redirect(host, username, password, verbose):
return discover_principal("https://" + host + "/.well-known/caldav", username, password, verbose)
def discover_principal(url, username, password, verbose):
body = ''
headers = {
"Content-Type": "application/xml; charset=utf-8",
"Depth": "0",
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"{url}",
"PROPFIND",
None,
headers,
body,
verbose
)
except http.client.RemoteDisconnected:
print_error(f"Caldav is not available at {url} (Remote disconnected)")
return False
success = response.status == 207
if not success:
print_error(f"Caldav is not available at {url} (status != 207)")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_freebusy_authenticated(url, username, password, verbose = False):
# Request our own freebusy authenticated
return try_get("Authenticated Freebusy", f"{url}/{username}.ifb", verbose, headers = basic_auth_headers(username, password))
def test_freebusy_unauthenticated(url, username, password, verbose = False):
return try_get("Unauthenticated Freebusy", f"{url}/{username}.ifb", verbose)
def test_autoconfig(host, username, password, verbose = False):
if not try_get("Autoconf .well-known", f"https://{host}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
if not try_get("Autoconf /mail", f"https://{host}/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
return True
# TODO
# def test_007_well_known_outlook():
# body = '''
#
# admin@example.local
#
# http://schemas.microsoft.com/exchange/autodiscover/outlook/responseschema/2006a
#
#
# '''
# headers = {
# "Content-Type": "text/xml; charset=utf-8"
# }
# response = http_post(
# "https://kolab-vanilla.{}.local/autodiscover/autodiscover.xml".format(hostname),
# None,
# headers,
# body
# )
# assert response.status == 200
# data = response.read()
# decoded = codecs.decode(data)
# # Sanity check of the data
# assert 'example.local' in decoded
# assert "admin@example.local" in decoded
# # Ensure the alternative urls also work
# assert http_post(
# "https://kolab-vanilla.{}.local/Autodiscover/Autodiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
# assert http_post(
# "https://kolab-vanilla.{}.local/AutoDiscover/AutoDiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
def test_autodiscover_activesync(host, activesynchost, username, password, verbose = False):
"""
We expect something along the lines of
User Name
user@example.com
MobileSync
https://kolab.example.com/Microsoft-Server-ActiveSync
https://kolab.example.com/Microsoft-Server-ActiveSync
"""
body = f'''
{username}
http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006
'''
headers = {
"Content-Type": "text/xml; charset=utf-8",
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"https://{host}/autodiscover/autodiscover.xml",
"POST",
None,
headers,
body,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync autodiscover is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
# Sanity check of the data
assert "MobileSync" in data
assert f"https://{activesynchost}/Microsoft-Server-ActiveSync" in data
assert username in data
except AssertionError:
print(data)
print_assertion_failure()
success = False
if not success:
print_error("Activesync autodiscover is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_activesync(host, username, password, verbose = False):
headers = {
"Host": host,
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"https://{host}/Microsoft-Server-ActiveSync",
"OPTIONS",
None,
headers,
None,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
assert response.getheader('MS-Server-ActiveSync', '')
assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
except AssertionError:
print_assertion_failure()
success = False
if not success:
print_error("Activesync is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_dns(host, verbose = False):
success = True
try:
answers = dns.resolver.resolve(host, 'MX')
for rdata in answers:
print(' MX Host', rdata.exchange, 'has preference', rdata.preference)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on MX record")
except dns.resolver.NoAnswer:
success = False
print(" ERROR on MX record")
try:
answers = dns.resolver.resolve(f"autodiscover.{host}", 'CNAME')
for rdata in answers:
print(' autodiscover CNAME', rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
except dns.resolver.NoAnswer:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
srv_records = [
f"_autodiscover._tcp.{host}",
f"_caldav._tcp.{host}",
f"_caldavs._tcp.{host}",
f"_carddav._tcp.{host}",
f"_carddavs._tcp.{host}",
f"_imap._tcp.{host}",
f"_imaps._tcp.{host}",
f"_sieve._tcp.{host}",
f"_submission._tcp.{host}",
f"_webdav._tcp.{host}",
f"_webdavs._tcp.{host}",
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
- print_error(f"Dns entires on {host} not available")
+ print_error(f"Dns entries on {host} not available")
return success
def test_email_dns(host, verbose = False):
success = True
srv_records = [
f"_autodiscover._tcp.{host}"
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
- print_error(f"Dns entires on {host} not available")
+ print_error(f"Dns entries on {host} not available")
return success
+def test_dmarc_dns(host, verbose = False):
+ success = True
+
+ try:
+ answers = dns.resolver.resolve(f"_dmarc.{host}", 'TXT')
+ for rdata in answers:
+ print(" _dmarc TXT", rdata)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on _dmarc TXT")
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on _dmarc TXT")
+
+ if not success:
+ print_error(f"DMARC dns entries on {host} not available")
+
+ return success
+
+def validate_spf_record(rdata):
+ # Something like "v=spf1 mx a:kolab.klab.cc mx:kolab.klab.cc -all"
+ data = str(rdata)
+ if not data.startswith('"v=spf1'):
+ return False
+
+ if not data.endswith('-all"'):
+ return False
+
+ return True
+
+def test_spf_dns(host, verbose = False):
+ success = False
+
+ try:
+ answers = dns.resolver.resolve(f"{host}", 'TXT')
+ for rdata in answers:
+ if validate_spf_record(rdata):
+ print(" SPF TXT", rdata)
+ success = True
+ break
+ else:
+ print(f" ERROR while validating spf record {data}")
+
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on SPF TXT")
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on SPF TXT")
+
+ if not success:
+ print_error(f"SPF dns entry on {host} not available")
+
+ return success
+
+def validate_dkim_record(rdata):
+ data = str(rdata)
+ # Re-assemble the quoted string
+ data = "".join(re.findall('"([^"]*)"', data))
+ print(data)
+ if not data.startswith("v=DKIM1;"):
+ return False
+ return True
+
+def test_dkim_dns(host, selector, verbose = False):
+ success = False
+
+ try:
+ answers = dns.resolver.resolve(f"{selector}._domainkey.{host}", 'TXT')
+ for rdata in answers:
+ print(f" DKIM {selector}", rdata)
+ if validate_dkim_record(rdata):
+ success = True
+ else:
+ print(f" ERROR while validating dkim key {data}")
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on DKIM TXT")
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on DKIM TXT")
+
+ if not success:
+ print_error(f"DKIM dns entries on {host} not available")
+
+ return success
def test_imap(host, user, password, verbose):
success = True
hosts = [
(host, 993, True, False),
# (host, 143, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
imap = IMAP4_SSL(host=host, port=port)
else:
imap = IMAP4(host=host, port=port)
if starttls:
imap.starttls()
imap.login(user, password)
status, list_response = imap.list()
assert status == 'OK'
for folder in list_response:
if 'INBOX' in folder.decode('utf-8'):
inbox_found = True
assert inbox_found
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("IMAP failed")
return success
def test_smtp(host, user, password, verbose):
success = True
hosts = [
(host, 465, True, False),
(host, 587, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
smtp = SMTP_SSL(host=host, port=port)
else:
smtp = SMTP(host=host, port=port)
if starttls:
smtp.starttls()
# check we have an open socket
assert smtp.sock
# run a no-operation, which is basically a server-side pass-through
status, _response = smtp.noop()
assert status == 250
status, _response = smtp.login(user, password)
assert status == 235
status, _response = smtp.quit()
assert status == 221
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("SMTP failed")
return success
def test_certificates(host, davhost, imaphost, verbose):
success = True
hosts = [
(host, 443),
]
if davhost:
hosts.append((urllib.parse.urlparse(davhost).netloc, 443))
if imaphost:
hosts.append((imaphost, 993))
hosts.append((imaphost, 465))
context = ssl.create_default_context()
for hosttuple in hosts:
hostname, _port = hosttuple
try:
conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=hostname)
conn.connect(hosttuple)
cert = conn.getpeercert()
if verbose:
print(f"Certificate for {hosttuple}: {cert}")
except OSError as err:
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("Not all certificates are valid")
return success
def test_meet(host, verbose):
headers = {
"Host": host
}
try:
response = http_request(
f"https://{host}/meetmedia/signaling",
"OPTIONS",
None,
headers,
None,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
assert "Transport unknown" in data
except AssertionError:
print_assertion_failure()
success = False
if not success:
print_error("Meet signaling is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", help="Host")
parser.add_argument("--username", help="Username")
parser.add_argument("--password", help="User password")
parser.add_argument("--imap", help="IMAP URI")
parser.add_argument("--smtp", help="SMTP URI")
parser.add_argument("--dav", help="DAV URI")
parser.add_argument("--meet", help="MEET URI")
parser.add_argument("--autoconfig", help="Check autoconfig")
parser.add_argument("--dns", action='store_true', help="Check dns")
+ parser.add_argument("--dkim", help="Check DKIM dns record")
parser.add_argument("--activesync", help="ActiveSync URI")
parser.add_argument("--certificates", action='store_true', help="Check Certificates")
parser.add_argument("--fb", help="Freebusy url as displayed in roundcube")
parser.add_argument("--verbose", action='store_true', help="Verbose output")
parser.add_argument("--default", action='store_true', help="Standard checks with only username and password")
options = parser.parse_args()
error = False
+ # Maybe structure it like this.
+ # Then print report, and upload results to prometheus
+ # [testName, testEnabled, testLambda] => [testName, testResult]
+
if options.default:
options.host = options.username.split('@')[1]
options.dav = "https://" + options.host + "/.well-known/caldav"
options.imap = options.host
options.smtp = options.host
options.activesync = options.host
options.certificates = True
if options.dav:
if discover_principal(options.dav, options.username, options.password, options.verbose):
print_success("Caldav is available")
else:
error = True
if options.host:
if test_caldav_redirect(options.host, options.username, options.password, options.verbose):
print_success("Caldav on .well-known/caldav is available")
# Kolabnow doesn't support this atm (it offers the redirect on apps.kolabnow.com),
# so we ignore the error for now
if options.autoconfig:
if test_autoconfig(options.host, options.username, options.password, options.verbose):
print_success("Autoconf available")
else:
error = True
if options.activesync:
if options.autoconfig:
if test_autodiscover_activesync(options.host, options.activesync, options.username, options.password, options.verbose):
print_success("Activesync Autodsicovery available")
# Kolabnow doesn't support this, so we ignore the error for now
if test_activesync(options.activesync, options.username, options.password, options.verbose):
print_success("Activesync available")
else:
error = True
if options.fb:
if test_freebusy_authenticated(options.fb, options.username, options.password, options.verbose):
print_success("Authenticated Freebusy is available")
else:
error = True
# We rely on the activesync test to have generated the token for unauthenticated access.
if test_freebusy_unauthenticated(options.fb, options.username, options.password, options.verbose):
print_success("Unauthenticated Freebusy is available")
else:
error = True
if options.dns:
if test_dns(options.host, options.verbose):
- print(f"=> DNS entries on {options.host} available")
+ print_success(f"DNS entries on {options.host} available")
+ else:
+ error = True
+
+ if test_dmarc_dns(options.host, options.verbose):
+ print_success(f"DMARC DNS entries on {options.host} available")
+ else:
+ error = True
+
+ if test_spf_dns(options.host, options.verbose):
+ print_success(f"SPF DNS entries on {options.host} available")
else:
error = True
userhost = options.username.split('@')[1]
if test_email_dns(userhost, options.verbose):
- print(f"=> DNS entries on {userhost} available")
+ print_success(f"User DNS entries on {userhost} available")
+ else:
+ error = True
+
+ if options.dkim:
+ if test_dkim_dns(options.host, options.dkim, options.verbose):
+ print_success(f"DKIM DNS entries on {options.host} available")
else:
error = True
if options.certificates:
if test_certificates(options.host, options.dav, options.imap, options.verbose):
print_success("All certificates are valid")
else:
error = True
if options.imap:
if test_imap(options.imap, options.username, options.password, options.verbose):
print_success("IMAP is available")
else:
error = True
if options.smtp:
if test_smtp(options.smtp, options.username, options.password, options.verbose):
print_success("SMTP is available")
else:
error = True
if options.meet:
if test_meet(options.meet, options.verbose):
print_success("Meet is available")
else:
error = True
+ # Push result to prometheus
+
if error:
print_error("At least one check failed")
sys.exit(1)
if __name__ == "__main__":
main()
diff --git a/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py b/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py
index 841dc899..f57a458a 100755
--- a/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py
+++ b/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py
@@ -1,135 +1,184 @@
#!/bin/env python3
"""
Send an email via SMTP and then look for it via IMAP.
- ./mailtransporttest.py --sender-username test1@kolab.org --sender-password foobar --sender-host smtp.kolabnow.com --recipient-username test2@kolab.org --recipient-password foobar --recipient-host imap.kolabnow.com
+ ./mailtransporttest.py --sender-username test1@kolab.org --sender-password foobar --sender-host smtp.kolabnow.com --recipient-username test2@kolab.org --recipient-password foobar --recipient-host imap.kolabnow.com --validate
"""
from datetime import datetime
import argparse
import sys
import imaplib
import smtplib
import uuid
import time
mailtemplate = '''
MIME-Version: 1.0
Date: {date}
From: {sender}
To: {to}
Subject: {subject}
Message-ID: {messageid}
Content-Transfer-Encoding: 7bit
Content-Type: text/plain; charset=US-ASCII
{body}
'''.strip()
class SendTest:
def __init__(self, options):
self.recipient_host = options.recipient_host
self.recipient_port = options.recipient_port
self.recipient_username = options.recipient_username
self.recipient_password = options.recipient_password
self.sender_host = options.sender_host
self.sender_port = options.sender_port
self.sender_username = options.sender_username
self.sender_password = options.sender_password
self.target_address = options.target_address
self.body = options.body
self.verbose = options.verbose
+ self.validate = options.validate
self.uuid = str(uuid.uuid4())
self.subject = f"Delivery Check {self.uuid}"
+ def validate_message(self, message):
+ import email.parser
+ import email.policy
+ msg = email.parser.BytesParser(policy=email.policy.default).parsebytes(message)
+ # print(msg)
+
+ if msg['DKIM-Signature']:
+ print("There is a DKIM-Signature.")
+
+ # DKIM validation status
+ # Authentication-Results: kolab.klab.cc (amavis); dkim=pass (2048-bit key)
+ # reason="pass (just generated, assumed good)" header.d=kolab.klab.cc
+ if "dkim=pass" not in (msg['Authentication-Results'] or ""):
+ print("Failed to validate Authentication-Results header:")
+ print(msg['Authentication-Results'])
+ return False
+
+ if "NO" not in (msg['X-Spam-Flag'] or ""):
+ print("Test email is flagged as spam or header is missing")
+ print("Existing header: " + str(msg['X-Spam-Flag']))
+ return False
+
+ if "NO" not in (msg['X-Virus-Scanned'] or ""):
+ print("Message was virus scanned: " + msg['X-Virus-Scanned'])
+
+ # Ensure SPF record matches a received line?
+ # Suggest SPF record ip (sender ip)
+ # Validate DKIM-Signature according to DNS entry
+ # Calculate delay using Date header?
+ # These could all be statistics for prometheus
+
+ return True
+
def check_for_mail(self):
print(f"Checking for uuid {self.uuid}")
imap = imaplib.IMAP4_SSL(host=self.recipient_host, port=self.recipient_port)
if self.verbose:
imap.debug = 4
imap.login(self.recipient_username, self.recipient_password)
imap.select("INBOX")
# FIXME This seems to find emails that are not there
if self.body:
typ, data = imap.search(None, 'BODY', self.uuid)
else:
typ, data = imap.search(None, 'SUBJECT', self.uuid)
+
+
for num in data[0].split():
print(f"Found the mail with uid {num}")
+
+ if self.validate:
+ typ, data = imap.fetch(num, "(RFC822)")
+ message = data[0][1]
+ if not self.validate_message(message):
+ print("Failed to validate the message.")
+ print(message.decode())
+ sys.exit(1)
+
+
imap.store(num, '+FLAGS', '\\Deleted')
imap.expunge()
return True
return False
def send_mail(self, starttls):
dtstamp = datetime.utcnow()
if self.target_address:
to = self.target_address
else:
to = self.recipient_username
print(f"Sending email to {to}")
msg = mailtemplate.format(
messageid="<{}@deliverycheck.org>".format(self.uuid),
subject=self.subject,
sender=self.sender_username,
to=to,
date=dtstamp.strftime("%a, %d %b %Y %H:%M:%S %z"),
body=self.body,
)
if starttls:
with smtplib.SMTP(host=self.sender_host, port=self.sender_port or 587) as smtp:
smtp.starttls()
smtp.ehlo()
smtp.login(self.sender_username, self.sender_password)
smtp.noop()
smtp.sendmail(self.sender_username, to, msg)
print(f"Email with uuid {self.uuid} sent")
else:
with smtplib.SMTP_SSL(host=self.sender_host, port=self.sender_port or 465) as smtp:
smtp.login(self.sender_username, self.sender_password)
smtp.noop()
smtp.sendmail(self.sender_username, to, msg)
print(f"Email with uuid {self.uuid} sent")
parser = argparse.ArgumentParser(description='Mail transport tests.')
parser.add_argument('--sender-username', help='The SMTP sender username')
parser.add_argument('--sender-password', help='The SMTP sender password')
parser.add_argument('--sender-host', help='The SMTP sender host')
-parser.add_argument('--sender-port', help='The SMTP sender port', default=993)
+parser.add_argument('--sender-port', help='The SMTP sender port (defaults to 465/587)')
parser.add_argument('--recipient-username', help='The IMAP recipient username')
parser.add_argument('--recipient-password', help='The IMAP recipient password')
parser.add_argument('--recipient-host', help='The IMAP recipient host')
-parser.add_argument('--recipient-port', help='The IMAP recipient port (defaults to 465/587)')
+parser.add_argument('--recipient-port', help='The IMAP recipient port', default=993)
parser.add_argument('--timeout', help='Timeout in minutes', type=int, default=10)
-parser.add_argument("--starttls", action='store_true', help="Use starttls over 587")
-parser.add_argument("--verbose", action='store_true', help="Use starttls over 587")
+parser.add_argument("--starttls", action='store_true', help="Use SMTP starttls over 587")
+parser.add_argument("--verbose", action='store_true', help="Verbose mode")
parser.add_argument("--target-address", help="Target address instead of the recipient username")
parser.add_argument("--body", help="Body text to include")
+parser.add_argument("--validate", action='store_true', help="Validate the received message")
args = parser.parse_args()
obj = SendTest(args)
obj.send_mail(args.starttls)
timeout = 10
for i in range(1, round(args.timeout * 60 / timeout) + 1):
if obj.check_for_mail():
print("Success!")
+ # TODO print statistics? Push statistics directly someplace?
sys.exit(0)
print(f"waiting for {timeout}")
time.sleep(timeout)
+# TODO print statistics? Push statistics directly someplace?
print("Failed to find the mail")
sys.exit(1)