diff --git a/utils/activesynccli.py b/utils/activesynccli.py new file mode 100755 index 00000000..3173ec09 --- /dev/null +++ b/utils/activesynccli.py @@ -0,0 +1,344 @@ +#!/bin/env python3 + +""" +activesynccli.py + --host apps.kolabnow.com + --devicetype WindowsOutlook15 + --deviceid windowsascli + --user user@kolab.org --password Secret + --verbose + list --folder INBOX + +# Dependencies + + dnf install libwbxml-devel + pip install --global-option=build_ext --global-option="-I/usr/include/libwbxml-1.0/wbxml/" git+https://github.com/Apheleia-IT/python-wbxml#egg=wbxml + +""" + +import argparse +import base64 +import http.client +import urllib.parse +import struct +import xml.etree.ElementTree as ET +import ssl +import wbxml + + +def decode_timezone(tz): + decoded = base64.b64decode(tz) + bias, standardName, standardDate, standardBias, daylightName, daylightDate, daylightBias = struct.unpack('i64s16si64s16si', decoded) + print(f" TimeZone bias: {bias}min") + print(f" Standard Name: {standardName.decode()}") + year, month, day, week, hour, minute, second, millis = struct.unpack('hhhhhhhh', standardDate) + print(f" Standard Date: Year: {year} Month: {month} Day: {day} Week: {week} Hour: {hour} Minute: {minute} Second: {second} Millisecond: {millis}") + print(f" Daylight Name: {daylightName.decode()}") + year, month, day, week, hour, minute, second, millis = struct.unpack('hhhhhhhh', daylightDate) + print(f" Daylight Date: Year: {year} Month: {month} Day: {day} Week: {week} Hour: {hour} Minute: {minute} Second: {second} Millisecond: {millis}") + print(f" Daylight Bias: {daylightBias}min") + print() + + +def http_request(url, method, params=None, headers=None, body=None): + """ + Perform an HTTP request. + """ + + # print(url) + parsed_url = urllib.parse.urlparse(url) + # print("Connecting to ", parsed_url.netloc) + if url.startswith('https://'): + conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = ssl._create_unverified_context()) + else: + conn = http.client.HTTPConnection(parsed_url.netloc, 80) + + if params is None: + params = {} + + if headers is None: + headers = { + "Content-Type": "application/x-www-form-urlencoded; charset=utf-8" + } + + if body is None: + body = urllib.parse.urlencode(params) + + # print("Requesting", parsed_url.geturl(), "From", parsed_url.netloc) + conn.request(method, parsed_url.geturl(), body, headers) + response = conn.getresponse() + + # Handle redirects + if response.status in (301, 302,): + # print("Following redirect ", response.getheader('location', '')) + return http_request( + urllib.parse.urljoin(url, response.getheader('location', '')), + method, + params, + headers, + body) + + if not response.status == 200: + print(" ", "Status", response.status) + print(" ", response.read().decode()) + + return response + + +def basic_auth_headers(username, password): + user_and_pass = base64.b64encode( + f"{username}:{password}".encode("ascii") + ).decode("ascii") + + return { + "Authorization": "Basic {}".format(user_and_pass) + } + + +def try_get(name, url, verbose, headers = None, body = None): + response = http_request( + url, + "GET", + None, + headers, + body + ) + success = response.status == 200 + if not success: + print(f"=> Error: {name} is not available") + + if verbose or not success: + print(" ", "Status", response.status) + print(" ", response.read().decode()) + + return success + + +class ActiveSync: + def __init__(self, options): + self.host = options.host + self.username = options.username + self.password = options.password + self.verbose = options.verbose + + if options.deviceid: + self.deviceid = options.deviceid + else: + self.deviceid = 'v140Device' + + if options.devicetype: + self.devicetype = options.devicetype + else: + self.devicetype = 'iphone' + + if hasattr(options, 'folder') and options.folder: + self.folder = options.folder + else: + self.folder = None + + + def send_request(self, command, request, extra_args = None): + body = wbxml.xml_to_wbxml(request) + + headers = { + "Host": self.host, + **basic_auth_headers(self.username, self.password) + } + + headers.update( + { + "Content-Type": "application/vnd.ms-sync.wbxml", + 'MS-ASProtocolVersion': "14.0", + } + ) + + if extra_args is None: + extra_args = "" + + return http_request( + f"https://{self.host}/Microsoft-Server-ActiveSync?Cmd={command}&User={self.username}&DeviceId={self.deviceid}&DeviceType={self.devicetype}{extra_args}", + "POST", + None, + headers, + body + ) + + + def check(self): + headers = { + "Host": self.host, + **basic_auth_headers(self.username, self.password) + } + + response = http_request( + f"https://{self.host}/Microsoft-Server-ActiveSync", + "OPTIONS", + None, + headers, + None + ) + + success = response.status == 200 + data = response.read().decode() + if not success: + print("=> Error: Activesync is not available") + else: + # Sanity check of the data + assert response.getheader('MS-Server-ActiveSync', '') + assert '14.1' in response.getheader('MS-ASProtocolVersions', '') + assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '') + + if self.verbose or not success: + print(" ", "Status", response.status) + print(" ", data) + + return success + + + def fetch(self, collection_id, sync_key = 0): + request = """ + + + + + + {sync_key} + {collection_id} + 0 + 0 + 512 + + 0 + 2 + 8 + + 4 + 1 + + + + + 512 + + """.replace(' ', '').replace('\n', '') + + response = self.send_request('Sync', request.format(collection_id=collection_id, sync_key=sync_key)) + + assert response.status == 200 + + result = wbxml.wbxml_to_xml(response.read()) + + if self.verbose: + print(result) + + root = ET.fromstring(result) + xmlns = "http://synce.org/formats/airsync_wm5/airsync" + sync_key = root.find(f".//{{{xmlns}}}SyncKey").text + more_available = (len(root.findall(f".//{{{xmlns}}}MoreAvailable")) == 1) + if self.verbose: + print("Current SyncKey:", sync_key) + + for add in root.findall(f".//{{{xmlns}}}Add"): + serverId = add.find(f"{{{xmlns}}}ServerId").text + print(" ServerId", serverId) + applicationData = add.find(f"{{{xmlns}}}ApplicationData") + + calxmlns = "http://synce.org/formats/airsync_wm5/calendar" + subject = applicationData.find(f"{{{calxmlns}}}Subject") + if subject is not None: + print(" Subject", subject.text) + startTime = applicationData.find(f"{{{calxmlns}}}StartTime") + if startTime is not None: + print(" StartTime", startTime.text) + timeZone = applicationData.find(f"{{{calxmlns}}}TimeZone") + if timeZone is not None: + decode_timezone(timeZone.text) + #the dates are encoded like so: vstdyear/vstdmonth/vstdday/vstdweek/vstdhour/vstdminute/vstdsecond/vstdmillis + decoded = base64.b64decode(timeZone.text) + bias, standardName, standardDate, standardBias, daylightName, daylightDate, daylightBias = struct.unpack('i64s16si64s16si', decoded) + print(f" TimeZone bias: {bias}min") + print("") + + + print("\n") + + # Fetch after the initial sync + if sync_key == "1": + print("after initial sync") + self.fetch(collection_id, sync_key) + + # Fetch more + if more_available: + print("more available") + print(root.findall(f".//{{{xmlns}}}MoreAvailable")) + self.fetch(collection_id, sync_key) + + + + def list(self): + request = """ + + + + 0 + + """.replace(' ', '').replace('\n', '') + + response = self.send_request('FolderSync', request) + + assert response.status == 200 + + result = wbxml.wbxml_to_xml(response.read()) + + if self.verbose: + print(result) + + root = ET.fromstring(result) + xmlns = "http://synce.org/formats/airsync_wm5/folderhierarchy" + sync_key = root.find(f".//{{{xmlns}}}SyncKey").text + if self.verbose: + print("Current SyncKey:", sync_key) + + for add in root.findall(f".//{{{xmlns}}}Add"): + displayName = add.find(f"{{{xmlns}}}DisplayName").text + serverId = add.find(f"{{{xmlns}}}ServerId").text + print("ServerId", serverId) + print("DisplayName", displayName) + + if self.folder and displayName == self.folder: + self.fetch(serverId) + + + +def main(): + parser = argparse.ArgumentParser() + + parser.add_argument("--host", help="Host") + parser.add_argument("--user", help="Username") + parser.add_argument("--password", help="User password") + parser.add_argument("--verbose", action='store_true', help="Verbose output") + parser.add_argument("--deviceid", help="Device identifier ") + parser.add_argument("--devicetype", help="devicetype (WindowsOutlook15, iphone)") + + subparsers = parser.add_subparsers() + + parser_list = subparsers.add_parser('decode_timezone') + parser_list.add_argument("timezone", help="Base64 encoded timezone string ('Lv///0lyYW....///w==') ") + parser_list.set_defaults(func=lambda args: decode_timezone(args.timezone)) + + parser_list = subparsers.add_parser('list') + parser_list.add_argument("--folder", help="Folder") + parser_list.set_defaults(func=lambda args: ActiveSync(args).list()) + + parser_check = subparsers.add_parser('check') + parser_check.set_defaults(func=lambda args: ActiveSync(args).check()) + + options = parser.parse_args() + + if 'func' in options: + options.func(options) + + + +if __name__ == "__main__": + main() diff --git a/utils/generatemail.py b/utils/generatemail.py new file mode 100755 index 00000000..23945817 --- /dev/null +++ b/utils/generatemail.py @@ -0,0 +1,97 @@ +#!/bin/env python3 +""" + Generate a bunch of dummy messages in a folder. +""" +# import glob +# import os +from datetime import datetime, timedelta +import random +import argparse + + +mailtemplate = ''' +Return-Path: +Received: from imapb010.mykolab.com ([unix socket]) + by imapb010.mykolab.com (Cyrus 2.5.10-49-g2e214b4-Kolab-2.5.10-8.1.el7.kolab_14) with LMTPA; + Wed, 09 Aug 2017 18:37:01 +0200 +X-Sieve: CMU Sieve 2.4 +Received: from int-mx002.mykolab.com (unknown [10.9.13.2]) + by imapb010.mykolab.com (Postfix) with ESMTPS id 0A93910A25047 + for ; Wed, 9 Aug 2017 18:37:01 +0200 (CEST) +Received: from int-subm002.mykolab.com (unknown [10.9.37.2]) + by int-mx002.mykolab.com (Postfix) with ESMTPS id EC06AF6E + for ; Wed, 9 Aug 2017 18:37:00 +0200 (CEST) +MIME-Version: 1.0 +Content-Type: multipart/mixed; + boundary="=_291b8e96564265636432c6d494e02322" +Date: {date} +From: "Mollekopf, Christian" +To: christian@example.ch +Subject: {subject} +Message-ID: {messageid} + +--=_291b8e96564265636432c6d494e02322 +Content-Type: multipart/alternative; + boundary="=_ceff0fd19756f45ed1295ee2069ff8e0" + +--=_ceff0fd19756f45ed1295ee2069ff8e0 +Content-Transfer-Encoding: 7bit +Content-Type: text/plain; charset=US-ASCII + +sdlkjsdjf +--=_ceff0fd19756f45ed1295ee2069ff8e0 +Content-Transfer-Encoding: quoted-printable +Content-Type: text/html; charset=UTF-8 + + +

sdlkjsdjf

+ + + +--=_ceff0fd19756f45ed1295ee2069ff8e0-- + +--=_291b8e96564265636432c6d494e02322 +Content-Transfer-Encoding: base64 +Content-Type: text/plain; + name=xorg.conf +Content-Disposition: attachment; + filename=xorg.conf; + size=211 + +U2VjdGlvbiAiRGV2aWNlIgogICAgSWRlbnRpZmllciAgICAgIkRldmljZTAiCiAgICBEcml2ZXIg +{attachment}ICAgIEJvYXJkTmFtZSAgICAgICJOVlMgNDIwME0iCiAgICBPcHRpb24gIk5vTG9nbyIgInRydWUi +CiAgICBPcHRpb24gIlVzZUVESUQiICJ0cnVlIgpFbmRTZWN0aW9uCg== +--=_291b8e96564265636432c6d494e02322-- +'''.strip() + + +def populatemailbox(target_directory, count): + dtstamp = datetime.utcnow() + + # Reproducible results + random.seed(30) + + for i in range(1, count + 1): + dtstamp = dtstamp - timedelta(seconds=600) + + attachmentMultiplier = 50000 * random.randint(0, 10) # Approx 20 MB + result = mailtemplate.format( + messageid="".format(i), + subject="Foobar {}".format(i), + date=dtstamp.strftime("%a, %d %b %Y %H:%M:%S %z"), + attachment='ICAgIEJvYXJkTmFtZSAgICAgICJOVlMgNDIwME0iCiAgICBPcHRpb24gIk5vTG9nbyIgInRydWUi\n' * attachmentMultiplier + ) + fname = "{}/{}.".format(target_directory, i) + with open(fname, 'wb') as f: + f.write(result.encode()) + + +parser = argparse.ArgumentParser(description='Generate some mail.') +parser.add_argument('target_directory', help='the target directory') +parser.add_argument('--count', help='Number of emails to generate', type=int) + +args = parser.parse_args() + +populatemailbox(args.target_directory, args.count) diff --git a/utils/kolabendpointtester.py b/utils/kolabendpointtester.py new file mode 100755 index 00000000..28075a9b --- /dev/null +++ b/utils/kolabendpointtester.py @@ -0,0 +1,452 @@ +#!/bin/env python3 + +""" +kolabendpointtester.py + --host apps.kolabnow.com + --user user@kolab.org + --password Secret + --dav https://apps.kolabnow.com + --fb https://apps.kolabnow.com/calendars/user@kolab.org/6f552d35-95c4-41f6-a7d2-cfd02dd867db +""" + +import sys +import traceback +import socket +import ssl +import argparse +from base64 import b64encode +import http.client +import urllib.parse +import dns.resolver + +SSLNOVERIFY = False + +def print_assertion_failure(): + """ + Print an error message about a failed assertion + """ + _, _, trace = sys.exc_info() + tb_info = traceback.extract_tb(trace) + _filename, line, _func, text = tb_info[-1] + print(f" ERROR assertion on line {line} failed on {text}") + + +def http_request(url, method, params=None, headers=None, body=None): + """ + Perform an HTTP request. + """ + + parsed_url = urllib.parse.urlparse(url) + # print("Connecting to ", parsed_url.netloc) + if url.startswith('https://'): + conn = http.client.HTTPSConnection(parsed_url.netloc, 443, context = (ssl._create_unverified_context() if SSLNOVERIFY else None)) + else: + conn = http.client.HTTPConnection(parsed_url.netloc, 80) + + if params is None: + params = {} + + if headers is None: + headers = { + "Content-Type": "application/x-www-form-urlencoded; charset=utf-8" + } + + if body is None: + body = urllib.parse.urlencode(params) + + print("Requesting", parsed_url.path, "From", parsed_url.netloc) + conn.request(method, parsed_url.geturl(), body, headers) + response = conn.getresponse() + + # Handle redirects + if response.status in (301, 302,): + print("Following redirect ", response.getheader('location', '')) + return http_request( + urllib.parse.urljoin(url, response.getheader('location', '')), + method, + params, + headers, + body) + + return response + + +def basic_auth_headers(username, password): + user_and_pass = b64encode( + f"{username}:{password}".encode("ascii") + ).decode("ascii") + + return { + "Authorization": "Basic {}".format(user_and_pass) + } + + +def try_get(name, url, verbose, headers = None, body = None): + response = http_request( + url, + "GET", + None, + headers, + body + ) + success = response.status == 200 + if not success: + print(f"=> Error: {name} is not available") + + if verbose or not success: + print(" ", "Status", response.status) + print(" ", response.read().decode()) + + return success + + +def discover_principal(url, username, password, verbose = False): + body = '' + + headers = { + "Content-Type": "application/xml; charset=utf-8", + "Depth": "infinity", + **basic_auth_headers(username, password) + } + + response = http_request( + f"{url}/principals/{username}/", + "PROPFIND", + None, + headers, + body + ) + + success = response.status == 207 + if not success: + print("=> Error: Caldav is not available") + + if verbose or not success: + print(" ", "Status", response.status) + print(" ", response.read().decode()) + + return success + + +def test_freebusy_authenticated(url, username, password, verbose = False): + # Request our own freebusy authenticated + return try_get("Authenticated Freebusy", f"{url}/{username}.ifb", verbose, headers = basic_auth_headers(username, password)) + + +def test_freebusy_unauthenticated(url, username, password, verbose = False): + return try_get("Unauthenticated Freebusy", f"{url}/{username}.ifb", verbose) + + +def test_autoconfig(host, username, password, verbose = False): + if not try_get("Autoconf .well-known", f"https://{host}/.well-known/autoconfig/mail/config-v1.1.xml?emailaddress={username}", verbose): + return False + if not try_get("Autoconf /mail", f"https://{host}/mail/config-v1.1.xml?emailaddress={username}", verbose): + return False + +# TODO +# def test_007_well_known_outlook(): +# body = ''' +# +# admin@example.local +# +# http://schemas.microsoft.com/exchange/autodiscover/outlook/responseschema/2006a +# +# +# ''' + +# headers = { +# "Content-Type": "text/xml; charset=utf-8" +# } +# response = http_post( +# "https://kolab-vanilla.{}.local/autodiscover/autodiscover.xml".format(hostname), +# None, +# headers, +# body +# ) +# assert response.status == 200 +# data = response.read() +# decoded = codecs.decode(data) +# # Sanity check of the data +# assert 'example.local' in decoded +# assert "admin@example.local" in decoded + +# # Ensure the alternative urls also work +# assert http_post( +# "https://kolab-vanilla.{}.local/Autodiscover/Autodiscover.xml".format(hostname), +# None, +# headers, +# body +# ).status == 200 + +# assert http_post( +# "https://kolab-vanilla.{}.local/AutoDiscover/AutoDiscover.xml".format(hostname), +# None, +# headers, +# body +# ).status == 200 + + +def test_autodiscover_activesync(host, username, password, verbose = False): + """ + We expect something along the lines of + + + + + + User Name + user@example.com + + + + + MobileSync + https://kolab.example.com/Microsoft-Server-ActiveSync + https://kolab.example.com/Microsoft-Server-ActiveSync + + + + + + """ + + body = f''' + + {username} + + http://schemas.microsoft.com/exchange/autodiscover/mobilesync/responseschema/2006 + + +''' + + headers = { + "Content-Type": "text/xml; charset=utf-8", + **basic_auth_headers(username, password) + } + + response = http_request( + f"https://{host}/autodiscover/autodiscover.xml", + "POST", + None, + headers, + body + ) + + success = response.status == 200 + data = response.read().decode() + if success: + try: + # Sanity check of the data + assert "MobileSync" in data + assert f"https://{host}/Microsoft-Server-ActiveSync" in data + assert username in data + except AssertionError: + print_assertion_failure() + success = False + + if not success: + print("=> Error: Activesync autodiscover is not available") + + if verbose or not success: + print(" ", "Status", response.status) + print(" ", data) + + return success + + +def test_activesync(host, username, password, verbose = False): + headers = { + "Host": host, + **basic_auth_headers(username, password) + } + + response = http_request( + f"https://{host}/Microsoft-Server-ActiveSync", + "OPTIONS", + None, + headers, + None + ) + + success = response.status == 200 + data = response.read().decode() + if success: + try: + assert response.getheader('MS-Server-ActiveSync', '') + assert '14.1' in response.getheader('MS-ASProtocolVersions', '') + assert 'FolderSync' in response.getheader('MS-ASProtocolCommands', '') + except AssertionError: + print_assertion_failure() + success = False + + if not success: + print("=> Error: Activesync is not available") + + if verbose or not success: + print(" ", "Status", response.status) + print(" ", data) + + return success + + +def test_dns(host, verbose = False): + success = True + try: + answers = dns.resolver.resolve(host, 'MX') + for rdata in answers: + print(' MX Host', rdata.exchange, 'has preference', rdata.preference) + except dns.resolver.NXDOMAIN: + success = False + print(" ERROR on MX record") + except dns.resolver.NoAnswer: + success = False + print(" ERROR on MX record") + + try: + answers = dns.resolver.resolve(f"autodiscover.{host}", 'CNAME') + for rdata in answers: + print(' autodiscover CNAME', rdata.target) + except dns.resolver.NXDOMAIN: + success = False + print(" ERROR on autodiscover. CNAME entry") + except dns.resolver.NoAnswer: + success = False + print(" ERROR on autodiscover. CNAME entry") + + srv_records = [ + f"_autodiscover._tcp.{host}", + f"_caldav._tcp.{host}", + f"_caldavs._tcp.{host}", + f"_carddav._tcp.{host}", + f"_carddavs._tcp.{host}", + f"_imap._tcp.{host}", + f"_imaps._tcp.{host}", + f"_sieve._tcp.{host}", + f"_submission._tcp.{host}", + f"_webdav._tcp.{host}", + f"_webdavs._tcp.{host}", + ] + + for record in srv_records: + try: + answers = dns.resolver.resolve(record, 'SRV') + for rdata in answers: + print(" ", record, rdata.target) + except dns.resolver.NXDOMAIN: + success = False + print(" ERROR on record", record) + except dns.resolver.NoAnswer: + success = False + print(" ERROR on record", record) + + if not success: + print(f"=> Error: Dns entires on {host} not available") + + return success + + +def test_email_dns(host, verbose = False): + success = True + + srv_records = [ + f"_autodiscover._tcp.{host}" + ] + + for record in srv_records: + try: + answers = dns.resolver.resolve(record, 'SRV') + for rdata in answers: + print(" ", record, rdata.target) + except dns.resolver.NXDOMAIN: + success = False + print(" ERROR on record", record) + except dns.resolver.NoAnswer: + success = False + print(" ERROR on record", record) + + if not success: + print(f"=> Error: Dns entires on {host} not available") + + return success + +def test_certificates(host, davhost, imaphost, verbose): + success = True + hosts = [ + (host, 443), + ] + + if davhost: + hosts.append((urllib.parse.urlparse(davhost).netloc, 443)) + if imaphost: + hosts.append((imaphost, 993)) + hosts.append((imaphost, 587)) + + context = ssl.create_default_context() + + for hosttuple in hosts: + hostname, _port = hosttuple + try: + conn = context.wrap_socket(socket.socket(socket.AF_INET), server_hostname=hostname) + conn.connect(hosttuple) + cert = conn.getpeercert() + if verbose: + print(cert) + except OSError as err: + print(" ERROR on peer", hosttuple, err) + success = False + + if not success: + print("=> Error: Not all certificates are valid") + + return success + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--host", help="Host") + parser.add_argument("--username", help="Username") + parser.add_argument("--password", help="User password") + parser.add_argument("--imap", help="IMAP URI") + parser.add_argument("--dav", help="DAV URI") + parser.add_argument("--fb", help="Freebusy url as displayed in roundcube") + parser.add_argument("--verbose", action='store_true', help="Verbose output") + options = parser.parse_args() + + if options.dav: + if discover_principal(options.dav, options.username, options.password, options.verbose): + print("=> Caldav is available") + + if discover_principal("https://" + options.host + "/.well-known/caldav", options.username, options.password, options.verbose): + print("=> Caldav on .well-known/caldav is available") + + if test_autoconfig(options.host, options.username, options.password, options.verbose): + print("=> Autoconf available") + + if test_autodiscover_activesync(options.host, options.username, options.password, options.verbose): + print("=> Activesync Autodsicovery available") + + if test_activesync(options.host, options.username, options.password, options.verbose): + print("=> Activesync available") + + if options.fb and test_freebusy_authenticated(options.fb, options.username, options.password, options.verbose): + print("=> Authenticated Freebusy is available") + + # We rely on the activesync test to have generated the token for unauthenticated access. + if options.fb and test_freebusy_unauthenticated(options.fb, options.username, options.password, options.verbose): + print("=> Unauthenticated Freebusy is available") + + if test_dns(options.host, options.verbose): + print(f"=> DNS entries on {options.host} available") + + userhost = options.username.split('@')[1] + if test_email_dns(userhost, options.verbose): + print(f"=> DNS entries on {userhost} available") + + if test_certificates(options.host, options.dav, options.imap, options.verbose): + print("=> All certificates are valid") + + +if __name__ == "__main__": + main()