diff --git a/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py b/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
index f55ac1ff..43b9a3b9 100755
--- a/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
+++ b/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
@@ -1,810 +1,816 @@
#!/bin/env python3
"""
kolabendpointtester.py
--host apps.kolabnow.com
--user user@kolab.org
--password Secret
--dav https://apps.kolabnow.com
--fb https://apps.kolabnow.com/calendars/user@kolab.org/6f552d35-95c4-41f6-a7d2-cfd02dd867db
"""
import sys
import traceback
import socket
import ssl
import argparse
from base64 import b64encode
import http.client
import urllib.parse
from imaplib import IMAP4
from imaplib import IMAP4_SSL
from smtplib import SMTP
from smtplib import SMTP_SSL
import dns.resolver
import re
# print('\033[31m' + 'some red text')
RED='\033[31m'
GREEN='\033[32m'
RESET='\033[39m'
SSLNOVERIFY = True
def print_error(msg):
print(RED + f"=> ERROR: {msg}")
print(RESET) # and reset to default color
def print_success(msg):
print(GREEN + f"=> {msg}")
print(RESET) # and reset to default color
def print_assertion_failure():
"""
Print an error message about a failed assertion
"""
_, _, trace = sys.exc_info()
tb_info = traceback.extract_tb(trace)
_filename, line, _func, text = tb_info[-1]
print(f" ERROR assertion on line {line} failed on {text}")
def http_request(url, method, params=None, headers=None, body=None, verbose=False):
"""
Perform an HTTP request.
"""
parsed_url = urllib.parse.urlparse(url)
# print("Connecting to ", parsed_url.netloc)
if url.startswith('https://'):
conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = (ssl._create_unverified_context() if SSLNOVERIFY else None))
else:
conn = http.client.HTTPConnection(parsed_url.netloc, 80)
if verbose:
conn.set_debuglevel(9)
conn.connect()
if params is None:
params = {}
if headers is None:
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
}
if body is None:
body = urllib.parse.urlencode(params)
# Assemble a relative url
url = urllib.parse.urlunsplit(["", "", parsed_url.path, parsed_url.query, parsed_url.fragment])
print(f"Requesting {url} From {parsed_url.netloc} Using {method}")
conn.request(method, url, body, headers)
response = conn.getresponse()
# Handle redirects
if response.status in (301, 302,):
print("Following redirect ", response.getheader('location', ''))
return http_request(
urllib.parse.urljoin(url, response.getheader('location', '')),
method,
params,
headers,
body,
verbose)
return response
def basic_auth_headers(username, password):
user_and_pass = b64encode(
f"{username}:{password}".encode("ascii")
).decode("ascii")
return {
"Authorization": "Basic {}".format(user_and_pass)
}
def try_get(name, url, verbose, headers = None, body = None):
try:
response = http_request(
url,
"GET",
None,
headers,
body,
verbose
)
success = response.status == 200
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error(f"{name} is not available")
return False
if not success:
print_error(f"{name} is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_caldav_redirect(host, username, password, verbose):
return discover_principal("https://" + host + "/.well-known/caldav", username, password, verbose)
def discover_principal(url, username, password, verbose):
body = ''
headers = {
"Content-Type": "application/xml; charset=utf-8",
"Depth": "0",
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"{url}",
"PROPFIND",
None,
headers,
body,
verbose
)
except http.client.RemoteDisconnected:
print_error(f"Caldav is not available at {url} (Remote disconnected)")
return False
success = response.status == 207
if not success:
print_error(f"Caldav is not available at {url} (status != 207)")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_freebusy_authenticated(url, username, password, verbose = False):
# Request our own freebusy authenticated
return try_get("Authenticated Freebusy", f"{url}/{username}.ifb", verbose, headers = basic_auth_headers(username, password))
def test_freebusy_unauthenticated(url, username, password, verbose = False):
return try_get("Unauthenticated Freebusy", f"{url}/{username}.ifb", verbose)
def test_autoconfig(host, username, password, verbose = False):
if not try_get("Autoconf .well-known", f"https://{host}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
if not try_get("Autoconf /mail", f"https://{host}/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
return True
# TODO
# def test_007_well_known_outlook():
# body = '''
#
# admin@example.local
#
# http://schemas.microsoft.com/exchange/autodiscover/outlook/responseschema/2006a
#
#
# '''
# headers = {
# "Content-Type": "text/xml; charset=utf-8"
# }
# response = http_post(
# "https://kolab-vanilla.{}.local/autodiscover/autodiscover.xml".format(hostname),
# None,
# headers,
# body
# )
# assert response.status == 200
# data = response.read()
# decoded = codecs.decode(data)
# # Sanity check of the data
# assert 'example.local' in decoded
# assert "admin@example.local" in decoded
# # Ensure the alternative urls also work
# assert http_post(
# "https://kolab-vanilla.{}.local/Autodiscover/Autodiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
# assert http_post(
# "https://kolab-vanilla.{}.local/AutoDiscover/AutoDiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
def test_autodiscover_activesync(host, activesynchost, username, password, verbose = False):
"""
We expect something along the lines of
User Name
user@example.com
MobileSync
https://kolab.example.com/Microsoft-Server-ActiveSync
https://kolab.example.com/Microsoft-Server-ActiveSync
"""
body = f'''
{username}
http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006
'''
headers = {
"Content-Type": "text/xml; charset=utf-8",
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"https://{host}/autodiscover/autodiscover.xml",
"POST",
None,
headers,
body,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync autodiscover is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
# Sanity check of the data
assert "MobileSync" in data
assert f"https://{activesynchost}/Microsoft-Server-ActiveSync" in data
assert username in data
except AssertionError:
print(data)
print_assertion_failure()
success = False
if not success:
print_error("Activesync autodiscover is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_activesync(host, username, password, verbose = False):
headers = {
"Host": host,
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"https://{host}/Microsoft-Server-ActiveSync",
"OPTIONS",
None,
headers,
None,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
assert response.getheader('MS-Server-ActiveSync', '')
assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
except AssertionError:
print_assertion_failure()
success = False
if not success:
print_error("Activesync is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_dns(host, verbose = False):
success = True
try:
answers = dns.resolver.resolve(host, 'MX')
for rdata in answers:
print(' MX Host', rdata.exchange, 'has preference', rdata.preference)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on MX record")
except dns.resolver.NoAnswer:
success = False
print(" ERROR on MX record")
try:
answers = dns.resolver.resolve(f"autodiscover.{host}", 'CNAME')
for rdata in answers:
print(' autodiscover CNAME', rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
except dns.resolver.NoAnswer:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
srv_records = [
f"_autodiscover._tcp.{host}",
f"_caldav._tcp.{host}",
f"_caldavs._tcp.{host}",
f"_carddav._tcp.{host}",
f"_carddavs._tcp.{host}",
f"_imap._tcp.{host}",
f"_imaps._tcp.{host}",
f"_sieve._tcp.{host}",
f"_submission._tcp.{host}",
f"_webdav._tcp.{host}",
f"_webdavs._tcp.{host}",
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
print_error(f"Dns entries on {host} not available")
return success
def test_email_dns(host, verbose = False):
success = True
srv_records = [
f"_autodiscover._tcp.{host}"
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
print_error(f"Dns entries on {host} not available")
return success
def test_dmarc_dns(host, verbose = False):
success = True
try:
answers = dns.resolver.resolve(f"_dmarc.{host}", 'TXT')
for rdata in answers:
print(" _dmarc TXT", rdata)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on _dmarc TXT")
except dns.resolver.NoAnswer:
success = False
print(" ERROR on _dmarc TXT")
if not success:
print_error(f"DMARC dns entries on {host} not available")
return success
def validate_spf_record(rdata):
# Something like "v=spf1 mx a:kolab.klab.cc mx:kolab.klab.cc -all"
data = str(rdata)
if not data.startswith('"v=spf1'):
return False
if not data.endswith('-all"'):
return False
return True
def test_spf_dns(host, verbose = False):
success = False
try:
answers = dns.resolver.resolve(f"{host}", 'TXT')
for rdata in answers:
if validate_spf_record(rdata):
print(" SPF TXT", rdata)
success = True
break
else:
print(f" ERROR while validating spf record {data}")
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on SPF TXT")
except dns.resolver.NoAnswer:
success = False
print(" ERROR on SPF TXT")
if not success:
print_error(f"SPF dns entry on {host} not available")
return success
def validate_dkim_record(rdata):
data = str(rdata)
# Re-assemble the quoted string
data = "".join(re.findall('"([^"]*)"', data))
print(data)
if not data.startswith("v=DKIM1;"):
return False
return True
def test_dkim_dns(host, selector, verbose = False):
success = False
try:
answers = dns.resolver.resolve(f"{selector}._domainkey.{host}", 'TXT')
for rdata in answers:
print(f" DKIM {selector}", rdata)
if validate_dkim_record(rdata):
success = True
else:
print(f" ERROR while validating dkim key {data}")
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on DKIM TXT")
except dns.resolver.NoAnswer:
success = False
print(" ERROR on DKIM TXT")
if not success:
print_error(f"DKIM dns entries on {host} not available")
return success
def test_imap(host, user, password, verbose):
success = True
hosts = [
(host, 993, True, False),
# (host, 143, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
imap = IMAP4_SSL(host=host, port=port)
else:
imap = IMAP4(host=host, port=port)
if starttls:
imap.starttls()
imap.login(user, password)
status, list_response = imap.list()
assert status == 'OK'
for folder in list_response:
if 'INBOX' in folder.decode('utf-8'):
inbox_found = True
assert inbox_found
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("IMAP failed")
return success
def test_smtp(host, user, password, verbose):
success = True
hosts = [
(host, 465, True, False),
(host, 587, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
smtp = SMTP_SSL(host=host, port=port)
else:
smtp = SMTP(host=host, port=port)
if starttls:
smtp.starttls()
# check we have an open socket
assert smtp.sock
# run a no-operation, which is basically a server-side pass-through
status, _response = smtp.noop()
assert status == 250
status, _response = smtp.login(user, password)
assert status == 235
status, _response = smtp.quit()
assert status == 221
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("SMTP failed")
return success
def test_certificates(host, davhost, imaphost, verbose):
success = True
hosts = [
(host, 443),
]
if davhost:
hosts.append((urllib.parse.urlparse(davhost).netloc, 443))
if imaphost:
hosts.append((imaphost, 993))
hosts.append((imaphost, 465))
context = ssl.create_default_context()
for hosttuple in hosts:
hostname, _port = hosttuple
try:
conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=hostname)
conn.connect(hosttuple)
cert = conn.getpeercert()
if verbose:
print(f"Certificate for {hosttuple}: {cert}")
except OSError as err:
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("Not all certificates are valid")
return success
def test_meet(host, verbose):
headers = {
"Host": host
}
try:
response = http_request(
f"https://{host}/meetmedia/signaling",
"OPTIONS",
None,
headers,
None,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
assert "Transport unknown" in data
except AssertionError:
print_assertion_failure()
success = False
if not success:
print_error("Meet signaling is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", help="Host")
parser.add_argument("--username", help="Username")
parser.add_argument("--password", help="User password")
parser.add_argument("--imap", help="IMAP URI")
parser.add_argument("--smtp", help="SMTP URI")
parser.add_argument("--dav", help="DAV URI")
parser.add_argument("--meet", help="MEET URI")
parser.add_argument("--autoconfig", help="Check autoconfig")
parser.add_argument("--dns", action='store_true', help="Check dns")
parser.add_argument("--dkim", help="Check DKIM dns record")
parser.add_argument("--activesync", help="ActiveSync URI")
parser.add_argument("--certificates", action='store_true', help="Check Certificates")
parser.add_argument("--fb", help="Freebusy url as displayed in roundcube")
parser.add_argument("--verbose", action='store_true', help="Verbose output")
parser.add_argument("--default", action='store_true', help="Standard checks with only username and password")
options = parser.parse_args()
error = False
+ # Maybe structure it like this.
+ # Then print report, and upload results to prometheus
+ # [testName, testEnabled, testLambda] => [testName, testResult]
+
if options.default:
options.host = options.username.split('@')[1]
options.dav = "https://" + options.host + "/.well-known/caldav"
options.imap = options.host
options.smtp = options.host
options.activesync = options.host
options.certificates = True
if options.dav:
if discover_principal(options.dav, options.username, options.password, options.verbose):
print_success("Caldav is available")
else:
error = True
if options.host:
if test_caldav_redirect(options.host, options.username, options.password, options.verbose):
print_success("Caldav on .well-known/caldav is available")
# Kolabnow doesn't support this atm (it offers the redirect on apps.kolabnow.com),
# so we ignore the error for now
if options.autoconfig:
if test_autoconfig(options.host, options.username, options.password, options.verbose):
print_success("Autoconf available")
else:
error = True
if options.activesync:
if options.autoconfig:
if test_autodiscover_activesync(options.host, options.activesync, options.username, options.password, options.verbose):
print_success("Activesync Autodsicovery available")
# Kolabnow doesn't support this, so we ignore the error for now
if test_activesync(options.activesync, options.username, options.password, options.verbose):
print_success("Activesync available")
else:
error = True
if options.fb:
if test_freebusy_authenticated(options.fb, options.username, options.password, options.verbose):
print_success("Authenticated Freebusy is available")
else:
error = True
# We rely on the activesync test to have generated the token for unauthenticated access.
if test_freebusy_unauthenticated(options.fb, options.username, options.password, options.verbose):
print_success("Unauthenticated Freebusy is available")
else:
error = True
if options.dns:
if test_dns(options.host, options.verbose):
print_success(f"DNS entries on {options.host} available")
else:
error = True
if test_dmarc_dns(options.host, options.verbose):
print_success(f"DMARC DNS entries on {options.host} available")
else:
error = True
if test_spf_dns(options.host, options.verbose):
print_success(f"SPF DNS entries on {options.host} available")
else:
error = True
userhost = options.username.split('@')[1]
if test_email_dns(userhost, options.verbose):
print_success(f"User DNS entries on {userhost} available")
else:
error = True
if options.dkim:
if test_dkim_dns(options.host, options.dkim, options.verbose):
print_success(f"DKIM DNS entries on {options.host} available")
else:
error = True
if options.certificates:
if test_certificates(options.host, options.dav, options.imap, options.verbose):
print_success("All certificates are valid")
else:
error = True
if options.imap:
if test_imap(options.imap, options.username, options.password, options.verbose):
print_success("IMAP is available")
else:
error = True
if options.smtp:
if test_smtp(options.smtp, options.username, options.password, options.verbose):
print_success("SMTP is available")
else:
error = True
if options.meet:
if test_meet(options.meet, options.verbose):
print_success("Meet is available")
else:
error = True
+ # Push result to prometheus
+
if error:
print_error("At least one check failed")
sys.exit(1)
if __name__ == "__main__":
main()
diff --git a/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py b/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py
index 841dc899..249b3b66 100755
--- a/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py
+++ b/docker/utils/rootfs/opt/app-root/src/mailtransporttest.py
@@ -1,135 +1,190 @@
#!/bin/env python3
"""
Send an email via SMTP and then look for it via IMAP.
- ./mailtransporttest.py --sender-username test1@kolab.org --sender-password foobar --sender-host smtp.kolabnow.com --recipient-username test2@kolab.org --recipient-password foobar --recipient-host imap.kolabnow.com
+ ./mailtransporttest.py --sender-username test1@kolab.org --sender-password foobar --sender-host smtp.kolabnow.com --recipient-username test2@kolab.org --recipient-password foobar --recipient-host imap.kolabnow.com --validate
"""
from datetime import datetime
import argparse
import sys
import imaplib
import smtplib
import uuid
import time
mailtemplate = '''
MIME-Version: 1.0
Date: {date}
From: {sender}
To: {to}
Subject: {subject}
Message-ID: {messageid}
Content-Transfer-Encoding: 7bit
Content-Type: text/plain; charset=US-ASCII
{body}
'''.strip()
class SendTest:
def __init__(self, options):
self.recipient_host = options.recipient_host
self.recipient_port = options.recipient_port
self.recipient_username = options.recipient_username
self.recipient_password = options.recipient_password
self.sender_host = options.sender_host
self.sender_port = options.sender_port
self.sender_username = options.sender_username
self.sender_password = options.sender_password
self.target_address = options.target_address
self.body = options.body
self.verbose = options.verbose
+ self.validate = options.validate
self.uuid = str(uuid.uuid4())
self.subject = f"Delivery Check {self.uuid}"
+
+ def validate_message(message):
+ # Ensure SPF record matches a received line?
+ # Suggest SPF record ip (sender ip)
+ # Validate DKIM-Signature according to DNS entry
+ # Ensure Authentication-Results contains dkim=pass
+ # Ensure X-Virus-Scanned is set (amavis is available
+ # X-Spam-Flag is NO
+ # Calculate delay using Date header?
+ # These could all be statistics for prometheus
+# Return-Path:
+# Received: from postfix.kolab.klab.cc (172-16-8-22.postfix.kolab-klab-cc.svc.cluster.local [172.16.8.22])
+# by imap-backend (Cyrus unknown) with LMTPA;
+# Sun, 13 Oct 2024 21:15:32 +0000
+# X-Cyrus-Session-Id: cyrus-imapd-1728854132-26-2-5894465078812508901
+# X-Sieve: CMU Sieve 3.0
+# Authentication-Results: kolab.klab.cc (amavis); dkim=pass (2048-bit key)
+# reason="pass (just generated, assumed good)" header.d=kolab.klab.cc
+# DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=kolab.klab.cc; h=
+# content-transfer-encoding:content-type:content-type:message-id
+# :subject:subject:from:from:date:date:mime-version; s=
+# dkim20240320; t=1728854126; x=1729718127; bh=4pDpXBY8rCbX8+Mfrkl
+# ZzpQxaUsa3vSPUYjcDR3KAnU=; b=FZHjO3xOqj0g3dQm+QAxABzGL1eDh5CPYO3
+# x5H5baVVGbVhPQ06UnHjKR8gOk9is0BBDGwuHy01pO3lp1pirNzzgKXq/3oGxGmf
+# SXj3ifkN7x87k4flA9K1c8fJbcC46LbsiXZPSFMf3zgO1DJ7zmSxW8uJjJga2O8d
+# lmOlr40p5a97uFqalBgCwGgnzvzIcV92082pLG0HqUzUq7H0dNlqanC9dxYYDZrR
+# 3XZcH91QkZ6wRYK/VJ7Ysle98onTjbEJ2HpgMFT4Z+d07vmCQM5vZvXawWJmKdEq
+# VB7qQ6HhDRP/kvaZuzklNwaJp3668ANOhgPkn1z/VlaEDHsc6+g==
+# X-Virus-Scanned: amavis at kolab.klab.cc
+# X-Spam-Flag: NO
+# X-Spam-Score: -0.002
+# X-Spam-Level:
+# X-Spam-Status: No, score=-0.002 tagged_above=-999 required=6.2
+# tests=[NO_RECEIVED=-0.001, NO_RELAYS=-0.001] autolearn=ham autolearn_force=no
+# MIME-Version: 1.0
+# Date: Sun, 13 Oct 2024 23:15:14 +0200
+# From: admin@kolab.klab.cc
+# To: john@kolab.klab.cc
+# Subject: sdf
+# Message-ID: <0b22600293ff12241dc9fab794c414ff@kolab.klab.cc>
+# Content-Type: text/plain; charset=US-ASCII;
+# format=flowed
+# Content-Transfer-Encoding: 7bit
+
def check_for_mail(self):
print(f"Checking for uuid {self.uuid}")
imap = imaplib.IMAP4_SSL(host=self.recipient_host, port=self.recipient_port)
if self.verbose:
imap.debug = 4
imap.login(self.recipient_username, self.recipient_password)
imap.select("INBOX")
# FIXME This seems to find emails that are not there
if self.body:
typ, data = imap.search(None, 'BODY', self.uuid)
else:
typ, data = imap.search(None, 'SUBJECT', self.uuid)
+
+
+ if self.validateMessage:
+ # We should probably only get the headers that we need
+ typ, data = imap.fetch(num)
+ #FIXME fetch
+ validate_message()
+
for num in data[0].split():
print(f"Found the mail with uid {num}")
imap.store(num, '+FLAGS', '\\Deleted')
imap.expunge()
return True
return False
def send_mail(self, starttls):
dtstamp = datetime.utcnow()
if self.target_address:
to = self.target_address
else:
to = self.recipient_username
print(f"Sending email to {to}")
msg = mailtemplate.format(
messageid="<{}@deliverycheck.org>".format(self.uuid),
subject=self.subject,
sender=self.sender_username,
to=to,
date=dtstamp.strftime("%a, %d %b %Y %H:%M:%S %z"),
body=self.body,
)
if starttls:
with smtplib.SMTP(host=self.sender_host, port=self.sender_port or 587) as smtp:
smtp.starttls()
smtp.ehlo()
smtp.login(self.sender_username, self.sender_password)
smtp.noop()
smtp.sendmail(self.sender_username, to, msg)
print(f"Email with uuid {self.uuid} sent")
else:
with smtplib.SMTP_SSL(host=self.sender_host, port=self.sender_port or 465) as smtp:
smtp.login(self.sender_username, self.sender_password)
smtp.noop()
smtp.sendmail(self.sender_username, to, msg)
print(f"Email with uuid {self.uuid} sent")
parser = argparse.ArgumentParser(description='Mail transport tests.')
parser.add_argument('--sender-username', help='The SMTP sender username')
parser.add_argument('--sender-password', help='The SMTP sender password')
parser.add_argument('--sender-host', help='The SMTP sender host')
parser.add_argument('--sender-port', help='The SMTP sender port', default=993)
parser.add_argument('--recipient-username', help='The IMAP recipient username')
parser.add_argument('--recipient-password', help='The IMAP recipient password')
parser.add_argument('--recipient-host', help='The IMAP recipient host')
parser.add_argument('--recipient-port', help='The IMAP recipient port (defaults to 465/587)')
parser.add_argument('--timeout', help='Timeout in minutes', type=int, default=10)
parser.add_argument("--starttls", action='store_true', help="Use starttls over 587")
parser.add_argument("--verbose", action='store_true', help="Use starttls over 587")
parser.add_argument("--target-address", help="Target address instead of the recipient username")
parser.add_argument("--body", help="Body text to include")
args = parser.parse_args()
obj = SendTest(args)
obj.send_mail(args.starttls)
timeout = 10
for i in range(1, round(args.timeout * 60 / timeout) + 1):
if obj.check_for_mail():
print("Success!")
+ # TODO print statistics? Push statistics directly someplace?
sys.exit(0)
print(f"waiting for {timeout}")
time.sleep(timeout)
+# TODO print statistics? Push statistics directly someplace?
print("Failed to find the mail")
sys.exit(1)