diff --git a/utils/activesynccli.py b/utils/activesynccli.py
new file mode 100755
--- /dev/null
+++ b/utils/activesynccli.py
@@ -0,0 +1,344 @@
+#!/bin/env python3
+
+"""
+activesynccli.py
+ --host apps.kolabnow.com
+ --devicetype WindowsOutlook15
+ --deviceid windowsascli
+ --user user@kolab.org --password Secret
+ --verbose
+ list --folder INBOX
+
+# Dependencies
+
+ dnf install libwbxml-devel
+ pip install --global-option=build_ext --global-option="-I/usr/include/libwbxml-1.0/wbxml/" git+https://github.com/Apheleia-IT/python-wbxml#egg=wbxml
+
+"""
+
+import argparse
+import base64
+import http.client
+import urllib.parse
+import struct
+import xml.etree.ElementTree as ET
+import ssl
+import wbxml
+
+
+def decode_timezone(tz):
+ decoded = base64.b64decode(tz)
+ bias, standardName, standardDate, standardBias, daylightName, daylightDate, daylightBias = struct.unpack('i64s16si64s16si', decoded)
+ print(f" TimeZone bias: {bias}min")
+ print(f" Standard Name: {standardName.decode()}")
+ year, month, day, week, hour, minute, second, millis = struct.unpack('hhhhhhhh', standardDate)
+ print(f" Standard Date: Year: {year} Month: {month} Day: {day} Week: {week} Hour: {hour} Minute: {minute} Second: {second} Millisecond: {millis}")
+ print(f" Daylight Name: {daylightName.decode()}")
+ year, month, day, week, hour, minute, second, millis = struct.unpack('hhhhhhhh', daylightDate)
+ print(f" Daylight Date: Year: {year} Month: {month} Day: {day} Week: {week} Hour: {hour} Minute: {minute} Second: {second} Millisecond: {millis}")
+ print(f" Daylight Bias: {daylightBias}min")
+ print()
+
+
+def http_request(url, method, params=None, headers=None, body=None):
+ """
+ Perform an HTTP request.
+ """
+
+ # print(url)
+ parsed_url = urllib.parse.urlparse(url)
+ # print("Connecting to ", parsed_url.netloc)
+ if url.startswith('https://'):
+ conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = ssl._create_unverified_context())
+ else:
+ conn = http.client.HTTPConnection(parsed_url.netloc, 80)
+
+ if params is None:
+ params = {}
+
+ if headers is None:
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
+ }
+
+ if body is None:
+ body = urllib.parse.urlencode(params)
+
+ # print("Requesting", parsed_url.geturl(), "From", parsed_url.netloc)
+ conn.request(method, parsed_url.geturl(), body, headers)
+ response = conn.getresponse()
+
+ # Handle redirects
+ if response.status in (301, 302,):
+ # print("Following redirect ", response.getheader('location', ''))
+ return http_request(
+ urllib.parse.urljoin(url, response.getheader('location', '')),
+ method,
+ params,
+ headers,
+ body)
+
+ if not response.status == 200:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return response
+
+
+def basic_auth_headers(username, password):
+ user_and_pass = base64.b64encode(
+ f"{username}:{password}".encode("ascii")
+ ).decode("ascii")
+
+ return {
+ "Authorization": "Basic {}".format(user_and_pass)
+ }
+
+
+def try_get(name, url, verbose, headers = None, body = None):
+ response = http_request(
+ url,
+ "GET",
+ None,
+ headers,
+ body
+ )
+ success = response.status == 200
+ if not success:
+ print(f"=> Error: {name} is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return success
+
+
+class ActiveSync:
+ def __init__(self, options):
+ self.host = options.host
+ self.username = options.username
+ self.password = options.password
+ self.verbose = options.verbose
+
+ if options.deviceid:
+ self.deviceid = options.deviceid
+ else:
+ self.deviceid = 'v140Device'
+
+ if options.devicetype:
+ self.devicetype = options.devicetype
+ else:
+ self.devicetype = 'iphone'
+
+ if hasattr(options, 'folder') and options.folder:
+ self.folder = options.folder
+ else:
+ self.folder = None
+
+
+ def send_request(self, command, request, extra_args = None):
+ body = wbxml.xml_to_wbxml(request)
+
+ headers = {
+ "Host": self.host,
+ **basic_auth_headers(self.username, self.password)
+ }
+
+ headers.update(
+ {
+ "Content-Type": "application/vnd.ms-sync.wbxml",
+ 'MS-ASProtocolVersion': "14.0",
+ }
+ )
+
+ if extra_args is None:
+ extra_args = ""
+
+ return http_request(
+ f"https://{self.host}/Microsoft-Server-ActiveSync?Cmd={command}&User={self.username}&DeviceId={self.deviceid}&DeviceType={self.devicetype}{extra_args}",
+ "POST",
+ None,
+ headers,
+ body
+ )
+
+
+ def check(self):
+ headers = {
+ "Host": self.host,
+ **basic_auth_headers(self.username, self.password)
+ }
+
+ response = http_request(
+ f"https://{self.host}/Microsoft-Server-ActiveSync",
+ "OPTIONS",
+ None,
+ headers,
+ None
+ )
+
+ success = response.status == 200
+ data = response.read().decode()
+ if not success:
+ print("=> Error: Activesync is not available")
+ else:
+ # Sanity check of the data
+ assert response.getheader('MS-Server-ActiveSync', '')
+ assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
+ assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
+
+ if self.verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", data)
+
+ return success
+
+
+ def fetch(self, collection_id, sync_key = 0):
+ request = """
+
+
+
+
+
+ {sync_key}
+ {collection_id}
+ 0
+ 0
+ 512
+
+ 0
+ 2
+ 8
+
+ 4
+ 1
+
+
+
+
+ 512
+
+ """.replace(' ', '').replace('\n', '')
+
+ response = self.send_request('Sync', request.format(collection_id=collection_id, sync_key=sync_key))
+
+ assert response.status == 200
+
+ result = wbxml.wbxml_to_xml(response.read())
+
+ if self.verbose:
+ print(result)
+
+ root = ET.fromstring(result)
+ xmlns = "http://synce.org/formats/airsync_wm5/airsync"
+ sync_key = root.find(f".//{{{xmlns}}}SyncKey").text
+ more_available = (len(root.findall(f".//{{{xmlns}}}MoreAvailable")) == 1)
+ if self.verbose:
+ print("Current SyncKey:", sync_key)
+
+ for add in root.findall(f".//{{{xmlns}}}Add"):
+ serverId = add.find(f"{{{xmlns}}}ServerId").text
+ print(" ServerId", serverId)
+ applicationData = add.find(f"{{{xmlns}}}ApplicationData")
+
+ calxmlns = "http://synce.org/formats/airsync_wm5/calendar"
+ subject = applicationData.find(f"{{{calxmlns}}}Subject")
+ if subject is not None:
+ print(" Subject", subject.text)
+ startTime = applicationData.find(f"{{{calxmlns}}}StartTime")
+ if startTime is not None:
+ print(" StartTime", startTime.text)
+ timeZone = applicationData.find(f"{{{calxmlns}}}TimeZone")
+ if timeZone is not None:
+ decode_timezone(timeZone.text)
+ #the dates are encoded like so: vstdyear/vstdmonth/vstdday/vstdweek/vstdhour/vstdminute/vstdsecond/vstdmillis
+ decoded = base64.b64decode(timeZone.text)
+ bias, standardName, standardDate, standardBias, daylightName, daylightDate, daylightBias = struct.unpack('i64s16si64s16si', decoded)
+ print(f" TimeZone bias: {bias}min")
+ print("")
+
+
+ print("\n")
+
+ # Fetch after the initial sync
+ if sync_key == "1":
+ print("after initial sync")
+ self.fetch(collection_id, sync_key)
+
+ # Fetch more
+ if more_available:
+ print("more available")
+ print(root.findall(f".//{{{xmlns}}}MoreAvailable"))
+ self.fetch(collection_id, sync_key)
+
+
+
+ def list(self):
+ request = """
+
+
+
+ 0
+
+ """.replace(' ', '').replace('\n', '')
+
+ response = self.send_request('FolderSync', request)
+
+ assert response.status == 200
+
+ result = wbxml.wbxml_to_xml(response.read())
+
+ if self.verbose:
+ print(result)
+
+ root = ET.fromstring(result)
+ xmlns = "http://synce.org/formats/airsync_wm5/folderhierarchy"
+ sync_key = root.find(f".//{{{xmlns}}}SyncKey").text
+ if self.verbose:
+ print("Current SyncKey:", sync_key)
+
+ for add in root.findall(f".//{{{xmlns}}}Add"):
+ displayName = add.find(f"{{{xmlns}}}DisplayName").text
+ serverId = add.find(f"{{{xmlns}}}ServerId").text
+ print("ServerId", serverId)
+ print("DisplayName", displayName)
+
+ if self.folder and displayName == self.folder:
+ self.fetch(serverId)
+
+
+
+def main():
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument("--host", help="Host")
+ parser.add_argument("--user", help="Username")
+ parser.add_argument("--password", help="User password")
+ parser.add_argument("--verbose", action='store_true', help="Verbose output")
+ parser.add_argument("--deviceid", help="Device identifier ")
+ parser.add_argument("--devicetype", help="devicetype (WindowsOutlook15, iphone)")
+
+ subparsers = parser.add_subparsers()
+
+ parser_list = subparsers.add_parser('decode_timezone')
+ parser_list.add_argument("timezone", help="Base64 encoded timezone string ('Lv///0lyYW....///w==') ")
+ parser_list.set_defaults(func=lambda args: decode_timezone(args.timezone))
+
+ parser_list = subparsers.add_parser('list')
+ parser_list.add_argument("--folder", help="Folder")
+ parser_list.set_defaults(func=lambda args: ActiveSync(args).list())
+
+ parser_check = subparsers.add_parser('check')
+ parser_check.set_defaults(func=lambda args: ActiveSync(args).check())
+
+ options = parser.parse_args()
+
+ if 'func' in options:
+ options.func(options)
+
+
+
+if __name__ == "__main__":
+ main()
diff --git a/utils/generatemail.py b/utils/generatemail.py
new file mode 100755
--- /dev/null
+++ b/utils/generatemail.py
@@ -0,0 +1,97 @@
+#!/bin/env python3
+"""
+ Generate a bunch of dummy messages in a folder.
+"""
+# import glob
+# import os
+from datetime import datetime, timedelta
+import random
+import argparse
+
+
+mailtemplate = '''
+Return-Path:
+Received: from imapb010.mykolab.com ([unix socket])
+ by imapb010.mykolab.com (Cyrus 2.5.10-49-g2e214b4-Kolab-2.5.10-8.1.el7.kolab_14) with LMTPA;
+ Wed, 09 Aug 2017 18:37:01 +0200
+X-Sieve: CMU Sieve 2.4
+Received: from int-mx002.mykolab.com (unknown [10.9.13.2])
+ by imapb010.mykolab.com (Postfix) with ESMTPS id 0A93910A25047
+ for ; Wed, 9 Aug 2017 18:37:01 +0200 (CEST)
+Received: from int-subm002.mykolab.com (unknown [10.9.37.2])
+ by int-mx002.mykolab.com (Postfix) with ESMTPS id EC06AF6E
+ for ; Wed, 9 Aug 2017 18:37:00 +0200 (CEST)
+MIME-Version: 1.0
+Content-Type: multipart/mixed;
+ boundary="=_291b8e96564265636432c6d494e02322"
+Date: {date}
+From: "Mollekopf, Christian"
+To: christian@example.ch
+Subject: {subject}
+Message-ID: {messageid}
+
+--=_291b8e96564265636432c6d494e02322
+Content-Type: multipart/alternative;
+ boundary="=_ceff0fd19756f45ed1295ee2069ff8e0"
+
+--=_ceff0fd19756f45ed1295ee2069ff8e0
+Content-Transfer-Encoding: 7bit
+Content-Type: text/plain; charset=US-ASCII
+
+sdlkjsdjf
+--=_ceff0fd19756f45ed1295ee2069ff8e0
+Content-Transfer-Encoding: quoted-printable
+Content-Type: text/html; charset=UTF-8
+
+
+sdlkjsdjf
+
+
+
+--=_ceff0fd19756f45ed1295ee2069ff8e0--
+
+--=_291b8e96564265636432c6d494e02322
+Content-Transfer-Encoding: base64
+Content-Type: text/plain;
+ name=xorg.conf
+Content-Disposition: attachment;
+ filename=xorg.conf;
+ size=211
+
+U2VjdGlvbiAiRGV2aWNlIgogICAgSWRlbnRpZmllciAgICAgIkRldmljZTAiCiAgICBEcml2ZXIg
+{attachment}ICAgIEJvYXJkTmFtZSAgICAgICJOVlMgNDIwME0iCiAgICBPcHRpb24gIk5vTG9nbyIgInRydWUi
+CiAgICBPcHRpb24gIlVzZUVESUQiICJ0cnVlIgpFbmRTZWN0aW9uCg==
+--=_291b8e96564265636432c6d494e02322--
+'''.strip()
+
+
+def populatemailbox(target_directory, count):
+ dtstamp = datetime.utcnow()
+
+ # Reproducible results
+ random.seed(30)
+
+ for i in range(1, count + 1):
+ dtstamp = dtstamp - timedelta(seconds=600)
+
+ attachmentMultiplier = 50000 * random.randint(0, 10) # Approx 20 MB
+ result = mailtemplate.format(
+ messageid="".format(i),
+ subject="Foobar {}".format(i),
+ date=dtstamp.strftime("%a, %d %b %Y %H:%M:%S %z"),
+ attachment='ICAgIEJvYXJkTmFtZSAgICAgICJOVlMgNDIwME0iCiAgICBPcHRpb24gIk5vTG9nbyIgInRydWUi\n' * attachmentMultiplier
+ )
+ fname = "{}/{}.".format(target_directory, i)
+ with open(fname, 'wb') as f:
+ f.write(result.encode())
+
+
+parser = argparse.ArgumentParser(description='Generate some mail.')
+parser.add_argument('target_directory', help='the target directory')
+parser.add_argument('--count', help='Number of emails to generate', type=int)
+
+args = parser.parse_args()
+
+populatemailbox(args.target_directory, args.count)
diff --git a/utils/kolabendpointtester.py b/utils/kolabendpointtester.py
new file mode 100755
--- /dev/null
+++ b/utils/kolabendpointtester.py
@@ -0,0 +1,452 @@
+#!/bin/env python3
+
+"""
+kolabendpointtester.py
+ --host apps.kolabnow.com
+ --user user@kolab.org
+ --password Secret
+ --dav https://apps.kolabnow.com
+ --fb https://apps.kolabnow.com/calendars/user@kolab.org/6f552d35-95c4-41f6-a7d2-cfd02dd867db
+"""
+
+import sys
+import traceback
+import socket
+import ssl
+import argparse
+from base64 import b64encode
+import http.client
+import urllib.parse
+import dns.resolver
+
+SSLNOVERIFY = False
+
+def print_assertion_failure():
+ """
+ Print an error message about a failed assertion
+ """
+ _, _, trace = sys.exc_info()
+ tb_info = traceback.extract_tb(trace)
+ _filename, line, _func, text = tb_info[-1]
+ print(f" ERROR assertion on line {line} failed on {text}")
+
+
+def http_request(url, method, params=None, headers=None, body=None):
+ """
+ Perform an HTTP request.
+ """
+
+ parsed_url = urllib.parse.urlparse(url)
+ # print("Connecting to ", parsed_url.netloc)
+ if url.startswith('https://'):
+ conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = (ssl._create_unverified_context() if SSLNOVERIFY else None))
+ else:
+ conn = http.client.HTTPConnection(parsed_url.netloc, 80)
+
+ if params is None:
+ params = {}
+
+ if headers is None:
+ headers = {
+ "Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
+ }
+
+ if body is None:
+ body = urllib.parse.urlencode(params)
+
+ print("Requesting", parsed_url.path, "From", parsed_url.netloc)
+ conn.request(method, parsed_url.geturl(), body, headers)
+ response = conn.getresponse()
+
+ # Handle redirects
+ if response.status in (301, 302,):
+ print("Following redirect ", response.getheader('location', ''))
+ return http_request(
+ urllib.parse.urljoin(url, response.getheader('location', '')),
+ method,
+ params,
+ headers,
+ body)
+
+ return response
+
+
+def basic_auth_headers(username, password):
+ user_and_pass = b64encode(
+ f"{username}:{password}".encode("ascii")
+ ).decode("ascii")
+
+ return {
+ "Authorization": "Basic {}".format(user_and_pass)
+ }
+
+
+def try_get(name, url, verbose, headers = None, body = None):
+ response = http_request(
+ url,
+ "GET",
+ None,
+ headers,
+ body
+ )
+ success = response.status == 200
+ if not success:
+ print(f"=> Error: {name} is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return success
+
+
+def discover_principal(url, username, password, verbose = False):
+ body = ''
+
+ headers = {
+ "Content-Type": "application/xml; charset=utf-8",
+ "Depth": "infinity",
+ **basic_auth_headers(username, password)
+ }
+
+ response = http_request(
+ f"{url}/principals/{username}/",
+ "PROPFIND",
+ None,
+ headers,
+ body
+ )
+
+ success = response.status == 207
+ if not success:
+ print("=> Error: Caldav is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", response.read().decode())
+
+ return success
+
+
+def test_freebusy_authenticated(url, username, password, verbose = False):
+ # Request our own freebusy authenticated
+ return try_get("Authenticated Freebusy", f"{url}/{username}.ifb", verbose, headers = basic_auth_headers(username, password))
+
+
+def test_freebusy_unauthenticated(url, username, password, verbose = False):
+ return try_get("Unauthenticated Freebusy", f"{url}/{username}.ifb", verbose)
+
+
+def test_autoconfig(host, username, password, verbose = False):
+ if not try_get("Autoconf .well-known", f"https://{host}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={username}", verbose):
+ return False
+ if not try_get("Autoconf /mail", f"https://{host}/mail/config-v1.1.xml?emailaddress={username}", verbose):
+ return False
+
+# TODO
+# def test_007_well_known_outlook():
+# body = '''
+#
+# admin@example.local
+#
+# http://schemas.microsoft.com/exchange/autodiscover/outlook/responseschema/2006a
+#
+#
+# '''
+
+# headers = {
+# "Content-Type": "text/xml; charset=utf-8"
+# }
+# response = http_post(
+# "https://kolab-vanilla.{}.local/autodiscover/autodiscover.xml".format(hostname),
+# None,
+# headers,
+# body
+# )
+# assert response.status == 200
+# data = response.read()
+# decoded = codecs.decode(data)
+# # Sanity check of the data
+# assert 'example.local' in decoded
+# assert "admin@example.local" in decoded
+
+# # Ensure the alternative urls also work
+# assert http_post(
+# "https://kolab-vanilla.{}.local/Autodiscover/Autodiscover.xml".format(hostname),
+# None,
+# headers,
+# body
+# ).status == 200
+
+# assert http_post(
+# "https://kolab-vanilla.{}.local/AutoDiscover/AutoDiscover.xml".format(hostname),
+# None,
+# headers,
+# body
+# ).status == 200
+
+
+def test_autodiscover_activesync(host, username, password, verbose = False):
+ """
+ We expect something along the lines of
+
+
+
+
+
+ User Name
+ user@example.com
+
+
+
+
+ MobileSync
+ https://kolab.example.com/Microsoft-Server-ActiveSync
+ https://kolab.example.com/Microsoft-Server-ActiveSync
+
+
+
+
+
+ """
+
+ body = f'''
+
+ {username}
+
+ http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006
+
+
+'''
+
+ headers = {
+ "Content-Type": "text/xml; charset=utf-8",
+ **basic_auth_headers(username, password)
+ }
+
+ response = http_request(
+ f"https://{host}/autodiscover/autodiscover.xml",
+ "POST",
+ None,
+ headers,
+ body
+ )
+
+ success = response.status == 200
+ data = response.read().decode()
+ if success:
+ try:
+ # Sanity check of the data
+ assert "MobileSync" in data
+ assert f"https://{host}/Microsoft-Server-ActiveSync" in data
+ assert username in data
+ except AssertionError:
+ print_assertion_failure()
+ success = False
+
+ if not success:
+ print("=> Error: Activesync autodiscover is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", data)
+
+ return success
+
+
+def test_activesync(host, username, password, verbose = False):
+ headers = {
+ "Host": host,
+ **basic_auth_headers(username, password)
+ }
+
+ response = http_request(
+ f"https://{host}/Microsoft-Server-ActiveSync",
+ "OPTIONS",
+ None,
+ headers,
+ None
+ )
+
+ success = response.status == 200
+ data = response.read().decode()
+ if success:
+ try:
+ assert response.getheader('MS-Server-ActiveSync', '')
+ assert '14.1' in response.getheader('MS-ASProtocolVersions', '')
+ assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '')
+ except AssertionError:
+ print_assertion_failure()
+ success = False
+
+ if not success:
+ print("=> Error: Activesync is not available")
+
+ if verbose or not success:
+ print(" ", "Status", response.status)
+ print(" ", data)
+
+ return success
+
+
+def test_dns(host, verbose = False):
+ success = True
+ try:
+ answers = dns.resolver.resolve(host, 'MX')
+ for rdata in answers:
+ print(' MX Host', rdata.exchange, 'has preference', rdata.preference)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on MX record")
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on MX record")
+
+ try:
+ answers = dns.resolver.resolve(f"autodiscover.{host}", 'CNAME')
+ for rdata in answers:
+ print(' autodiscover CNAME', rdata.target)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on autodiscover. CNAME entry")
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on autodiscover. CNAME entry")
+
+ srv_records = [
+ f"_autodiscover._tcp.{host}",
+ f"_caldav._tcp.{host}",
+ f"_caldavs._tcp.{host}",
+ f"_carddav._tcp.{host}",
+ f"_carddavs._tcp.{host}",
+ f"_imap._tcp.{host}",
+ f"_imaps._tcp.{host}",
+ f"_sieve._tcp.{host}",
+ f"_submission._tcp.{host}",
+ f"_webdav._tcp.{host}",
+ f"_webdavs._tcp.{host}",
+ ]
+
+ for record in srv_records:
+ try:
+ answers = dns.resolver.resolve(record, 'SRV')
+ for rdata in answers:
+ print(" ", record, rdata.target)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on record", record)
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on record", record)
+
+ if not success:
+ print(f"=> Error: Dns entires on {host} not available")
+
+ return success
+
+
+def test_email_dns(host, verbose = False):
+ success = True
+
+ srv_records = [
+ f"_autodiscover._tcp.{host}"
+ ]
+
+ for record in srv_records:
+ try:
+ answers = dns.resolver.resolve(record, 'SRV')
+ for rdata in answers:
+ print(" ", record, rdata.target)
+ except dns.resolver.NXDOMAIN:
+ success = False
+ print(" ERROR on record", record)
+ except dns.resolver.NoAnswer:
+ success = False
+ print(" ERROR on record", record)
+
+ if not success:
+ print(f"=> Error: Dns entires on {host} not available")
+
+ return success
+
+def test_certificates(host, davhost, imaphost, verbose):
+ success = True
+ hosts = [
+ (host, 443),
+ ]
+
+ if davhost:
+ hosts.append((urllib.parse.urlparse(davhost).netloc, 443))
+ if imaphost:
+ hosts.append((imaphost, 993))
+ hosts.append((imaphost, 587))
+
+ context = ssl.create_default_context()
+
+ for hosttuple in hosts:
+ hostname, _port = hosttuple
+ try:
+ conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=hostname)
+ conn.connect(hosttuple)
+ cert = conn.getpeercert()
+ if verbose:
+ print(cert)
+ except OSError as err:
+ print(" ERROR on peer", hosttuple, err)
+ success = False
+
+ if not success:
+ print("=> Error: Not all certificates are valid")
+
+ return success
+
+
+def main():
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--host", help="Host")
+ parser.add_argument("--username", help="Username")
+ parser.add_argument("--password", help="User password")
+ parser.add_argument("--imap", help="IMAP URI")
+ parser.add_argument("--dav", help="DAV URI")
+ parser.add_argument("--fb", help="Freebusy url as displayed in roundcube")
+ parser.add_argument("--verbose", action='store_true', help="Verbose output")
+ options = parser.parse_args()
+
+ if options.dav:
+ if discover_principal(options.dav, options.username, options.password, options.verbose):
+ print("=> Caldav is available")
+
+ if discover_principal("https://" + options.host + "/.well-known/caldav", options.username, options.password, options.verbose):
+ print("=> Caldav on .well-known/caldav is available")
+
+ if test_autoconfig(options.host, options.username, options.password, options.verbose):
+ print("=> Autoconf available")
+
+ if test_autodiscover_activesync(options.host, options.username, options.password, options.verbose):
+ print("=> Activesync Autodsicovery available")
+
+ if test_activesync(options.host, options.username, options.password, options.verbose):
+ print("=> Activesync available")
+
+ if options.fb and test_freebusy_authenticated(options.fb, options.username, options.password, options.verbose):
+ print("=> Authenticated Freebusy is available")
+
+ # We rely on the activesync test to have generated the token for unauthenticated access.
+ if options.fb and test_freebusy_unauthenticated(options.fb, options.username, options.password, options.verbose):
+ print("=> Unauthenticated Freebusy is available")
+
+ if test_dns(options.host, options.verbose):
+ print(f"=> DNS entries on {options.host} available")
+
+ userhost = options.username.split('@')[1]
+ if test_email_dns(userhost, options.verbose):
+ print(f"=> DNS entries on {userhost} available")
+
+ if test_certificates(options.host, options.dav, options.imap, options.verbose):
+ print("=> All certificates are valid")
+
+
+if __name__ == "__main__":
+ main()