diff --git a/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py b/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
index de5e24b4..ec6d8c1d 100755
--- a/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
+++ b/docker/utils/rootfs/opt/app-root/src/kolabendpointtester.py
@@ -1,653 +1,652 @@
#!/bin/env python3
"""
kolabendpointtester.py
--host apps.kolabnow.com
--user user@kolab.org
--password Secret
--dav https://apps.kolabnow.com
--fb https://apps.kolabnow.com/calendars/user@kolab.org/6f552d35-95c4-41f6-a7d2-cfd02dd867db
"""
import sys
import traceback
import socket
import ssl
import argparse
from base64 import b64encode
import http.client
import urllib.parse
from imaplib import IMAP4
from imaplib import IMAP4_SSL
from smtplib import SMTP
from smtplib import SMTP_SSL
import dns.resolver
# print('\033[31m' + 'some red text')
RED='\033[31m'
GREEN='\033[32m'
RESET='\033[39m'
SSLNOVERIFY = True
def print_error(msg):
print(RED + f"=> ERROR: {msg}")
print(RESET) # and reset to default color
def print_success(msg):
print(GREEN + f"=> {msg}")
print(RESET) # and reset to default color
def print_assertion_failure():
"""
Print an error message about a failed assertion
"""
_, _, trace = sys.exc_info()
tb_info = traceback.extract_tb(trace)
_filename, line, _func, text = tb_info[-1]
print(f" ERROR assertion on line {line} failed on {text}")
def http_request(url, method, params=None, headers=None, body=None, verbose=False):
"""
Perform an HTTP request.
"""
parsed_url = urllib.parse.urlparse(url)
# print("Connecting to ", parsed_url.netloc)
if url.startswith('https://'):
conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = (ssl._create_unverified_context() if SSLNOVERIFY else None))
else:
conn = http.client.HTTPConnection(parsed_url.netloc, 80)
if verbose:
conn.set_debuglevel(9)
conn.connect()
if params is None:
params = {}
if headers is None:
headers = {
"Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
}
if body is None:
body = urllib.parse.urlencode(params)
# Assemble a relative url
url = urllib.parse.urlunsplit(["", "", parsed_url.path, parsed_url.query, parsed_url.fragment])
print(f"Requesting {url} From {parsed_url.netloc} Using {method}")
conn.request(method, url, body, headers)
response = conn.getresponse()
# Handle redirects
if response.status in (301, 302,):
print("Following redirect ", response.getheader('location', ''))
return http_request(
urllib.parse.urljoin(url, response.getheader('location', '')),
method,
params,
headers,
body,
verbose)
return response
def basic_auth_headers(username, password):
user_and_pass = b64encode(
f"{username}:{password}".encode("ascii")
).decode("ascii")
return {
"Authorization": "Basic {}".format(user_and_pass)
}
def try_get(name, url, verbose, headers = None, body = None):
try:
response = http_request(
url,
"GET",
None,
headers,
body,
verbose
)
success = response.status == 200
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error(f"{name} is not available")
return False
if not success:
print_error(f"{name} is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_caldav_redirect(host, username, password, verbose):
return discover_principal("https://" + host + "/.well-known/caldav", username, password, verbose)
def discover_principal(url, username, password, verbose):
body = ''
headers = {
"Content-Type": "application/xml; charset=utf-8",
"Depth": "0",
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"{url}",
"PROPFIND",
None,
headers,
body,
verbose
)
except http.client.RemoteDisconnected:
- print("Remote disconnected")
- print_error("Caldav is not available")
+ print_error(f"Caldav is not available at {url} (Remote disconnected)")
return False
success = response.status == 207
if not success:
- print_error("Caldav is not available")
+ print_error(f"Caldav is not available at {url} (status != 207)")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", response.read().decode())
return success
def test_freebusy_authenticated(url, username, password, verbose = False):
# Request our own freebusy authenticated
return try_get("Authenticated Freebusy", f"{url}/{username}.ifb", verbose, headers = basic_auth_headers(username, password))
def test_freebusy_unauthenticated(url, username, password, verbose = False):
return try_get("Unauthenticated Freebusy", f"{url}/{username}.ifb", verbose)
def test_autoconfig(host, username, password, verbose = False):
if not try_get("Autoconf .well-known", f"https://{host}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
if not try_get("Autoconf /mail", f"https://{host}/mail/config-v1.1.xml?emailaddress={username}", verbose):
return False
return True
# TODO
# def test_007_well_known_outlook():
# body = '''
#
# admin@example.local
#
# http://schemas.microsoft.com/exchange/autodiscover/outlook/responseschema/2006a
#
#
# '''
# headers = {
# "Content-Type": "text/xml; charset=utf-8"
# }
# response = http_post(
# "https://kolab-vanilla.{}.local/autodiscover/autodiscover.xml".format(hostname),
# None,
# headers,
# body
# )
# assert response.status == 200
# data = response.read()
# decoded = codecs.decode(data)
# # Sanity check of the data
# assert 'example.local' in decoded
# assert "admin@example.local" in decoded
# # Ensure the alternative urls also work
# assert http_post(
# "https://kolab-vanilla.{}.local/Autodiscover/Autodiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
# assert http_post(
# "https://kolab-vanilla.{}.local/AutoDiscover/AutoDiscover.xml".format(hostname),
# None,
# headers,
# body
# ).status == 200
def test_autodiscover_activesync(host, activesynchost, username, password, verbose = False):
"""
We expect something along the lines of
User Name
user@example.com
MobileSync
https://kolab.example.com/Microsoft-Server-ActiveSync
https://kolab.example.com/Microsoft-Server-ActiveSync
"""
body = f'''
{username}
http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006
'''
headers = {
"Content-Type": "text/xml; charset=utf-8",
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"https://{host}/autodiscover/autodiscover.xml",
"POST",
None,
headers,
body,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync autodiscover is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
# Sanity check of the data
assert "MobileSync" in data
assert f"https://{activesynchost}/Microsoft-Server-ActiveSync" in data
assert username in data
except AssertionError:
print(data)
print_assertion_failure()
success = False
if not success:
print_error("Activesync autodiscover is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_activesync(host, username, password, verbose = False):
headers = {
"Host": host,
**basic_auth_headers(username, password)
}
try:
response = http_request(
f"https://{host}/Microsoft-Server-ActiveSync",
"OPTIONS",
None,
headers,
None,
verbose
)
except http.client.RemoteDisconnected:
print("Remote disconnected")
print_error("Activesync is not available")
return False
success = response.status == 200
data = response.read().decode()
if success:
try:
assert response.getheader('MS-Server-ActiveSync', '')
assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
except AssertionError:
print_assertion_failure()
success = False
if not success:
print_error("Activesync is not available")
if verbose or not success:
print(" ", "Status", response.status)
print(" ", data)
return success
def test_dns(host, verbose = False):
success = True
try:
answers = dns.resolver.resolve(host, 'MX')
for rdata in answers:
print(' MX Host', rdata.exchange, 'has preference', rdata.preference)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on MX record")
except dns.resolver.NoAnswer:
success = False
print(" ERROR on MX record")
try:
answers = dns.resolver.resolve(f"autodiscover.{host}", 'CNAME')
for rdata in answers:
print(' autodiscover CNAME', rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
except dns.resolver.NoAnswer:
success = False
print(f" ERROR on autodiscover.{host} CNAME entry")
srv_records = [
f"_autodiscover._tcp.{host}",
f"_caldav._tcp.{host}",
f"_caldavs._tcp.{host}",
f"_carddav._tcp.{host}",
f"_carddavs._tcp.{host}",
f"_imap._tcp.{host}",
f"_imaps._tcp.{host}",
f"_sieve._tcp.{host}",
f"_submission._tcp.{host}",
f"_webdav._tcp.{host}",
f"_webdavs._tcp.{host}",
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
print_error(f"Dns entires on {host} not available")
return success
def test_email_dns(host, verbose = False):
success = True
srv_records = [
f"_autodiscover._tcp.{host}"
]
for record in srv_records:
try:
answers = dns.resolver.resolve(record, 'SRV')
for rdata in answers:
print(" ", record, rdata.target)
except dns.resolver.NXDOMAIN:
success = False
print(" ERROR on record", record)
except dns.resolver.NoAnswer:
success = False
print(" ERROR on record", record)
if not success:
print_error(f"Dns entires on {host} not available")
return success
def test_imap(host, user, password, verbose):
success = True
hosts = [
(host, 993, True, False),
# (host, 143, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
imap = IMAP4_SSL(host=host, port=port)
else:
imap = IMAP4(host=host, port=port)
if starttls:
imap.starttls()
imap.login(user, password)
status, list_response = imap.list()
assert status == 'OK'
for folder in list_response:
if 'INBOX' in folder.decode('utf-8'):
inbox_found = True
assert inbox_found
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("IMAP failed")
return success
def test_smtp(host, user, password, verbose):
success = True
hosts = [
(host, 465, True, False),
(host, 587, False, True),
]
for hosttuple in hosts:
if verbose:
print("Connecting to ", hosttuple)
host, port, usessl, starttls = hosttuple
try:
if usessl:
smtp = SMTP_SSL(host=host, port=port)
else:
smtp = SMTP(host=host, port=port)
if starttls:
smtp.starttls()
# check we have an open socket
assert smtp.sock
# run a no-operation, which is basically a server-side pass-through
status, _response = smtp.noop()
assert status == 250
status, _response = smtp.login(user, password)
assert status == 235
status, _response = smtp.quit()
assert status == 221
except AssertionError as err:
print(" ERROR on peer", hosttuple, err)
success = False
except Exception as err: # pylint: disable=broad-except
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("SMTP failed")
return success
def test_certificates(host, davhost, imaphost, verbose):
success = True
hosts = [
(host, 443),
]
if davhost:
hosts.append((urllib.parse.urlparse(davhost).netloc, 443))
if imaphost:
hosts.append((imaphost, 993))
hosts.append((imaphost, 465))
context = ssl.create_default_context()
for hosttuple in hosts:
hostname, _port = hosttuple
try:
conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=hostname)
conn.connect(hosttuple)
cert = conn.getpeercert()
if verbose:
print(f"Certificate for {hosttuple}: {cert}")
except OSError as err:
print(" ERROR on peer", hosttuple, err)
success = False
if not success:
print_error("Not all certificates are valid")
return success
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--host", help="Host")
parser.add_argument("--username", help="Username")
parser.add_argument("--password", help="User password")
parser.add_argument("--imap", help="IMAP URI")
parser.add_argument("--smtp", help="SMTP URI")
parser.add_argument("--dav", help="DAV URI")
parser.add_argument("--autoconfig", help="Check autoconfig")
parser.add_argument("--dns", action='store_true', help="Check dns")
parser.add_argument("--activesync", help="ActiveSync URI")
parser.add_argument("--certificates", action='store_true', help="Check Certificates")
parser.add_argument("--fb", help="Freebusy url as displayed in roundcube")
parser.add_argument("--verbose", action='store_true', help="Verbose output")
options = parser.parse_args()
error = False
if options.dav:
if discover_principal(options.dav, options.username, options.password, options.verbose):
print_success("Caldav is available")
else:
error = True
if options.host:
if test_caldav_redirect(options.host, options.username, options.password, options.verbose):
print_success("Caldav on .well-known/caldav is available")
# Kolabnow doesn't support this atm (it offers the redirect on apps.kolabnow.com),
# so we ignore the error for now
if options.autoconfig:
if test_autoconfig(options.host, options.username, options.password, options.verbose):
print_success("Autoconf available")
else:
error = True
if options.activesync:
if options.autoconfig:
if test_autodiscover_activesync(options.host, options.activesync, options.username, options.password, options.verbose):
print_success("Activesync Autodsicovery available")
# Kolabnow doesn't support this, so we ignore the error for now
if test_activesync(options.activesync, options.username, options.password, options.verbose):
print_success("Activesync available")
else:
error = True
if options.fb:
if test_freebusy_authenticated(options.fb, options.username, options.password, options.verbose):
print_success("Authenticated Freebusy is available")
else:
error = True
# We rely on the activesync test to have generated the token for unauthenticated access.
if test_freebusy_unauthenticated(options.fb, options.username, options.password, options.verbose):
print_success("Unauthenticated Freebusy is available")
else:
error = True
if options.dns:
if test_dns(options.host, options.verbose):
print(f"=> DNS entries on {options.host} available")
else:
error = True
userhost = options.username.split('@')[1]
if test_email_dns(userhost, options.verbose):
print(f"=> DNS entries on {userhost} available")
else:
error = True
if options.certificates:
if test_certificates(options.host, options.dav, options.imap, options.verbose):
print_success("All certificates are valid")
else:
error = True
if options.imap:
if test_imap(options.imap, options.username, options.password, options.verbose):
print_success("IMAP is available")
else:
error = True
if options.smtp:
if test_smtp(options.smtp, options.username, options.password, options.verbose):
print_success("SMTP is available")
else:
error = True
if error:
print_error("At least one check failed")
sys.exit(1)
if __name__ == "__main__":
main()