diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py index eda4f04..4999fad 100644 --- a/pykolab/itip/__init__.py +++ b/pykolab/itip/__init__.py @@ -1,335 +1,363 @@ import icalendar import pykolab import traceback import kolabformat +import re from pykolab.xml import to_dt from pykolab.xml import event_from_ical from pykolab.xml import todo_from_ical from pykolab.xml import participant_status_label from pykolab.translate import _ +from tzlocal import windows_tz log = pykolab.getLogger('pykolab.wallace') def events_from_message(message, methods=None): return objects_from_message(message, ["VEVENT"], methods) def todos_from_message(message, methods=None): return objects_from_message(message, ["VTODO"], methods) - def objects_from_message(message, objnames, methods=None): """ Obtain the iTip payload from email.message """ # Placeholder for any itip_objects found in the message. itip_objects = [] seen_uids = [] # iTip methods we are actually interested in. Other methods will be ignored. if methods is None: methods = [ "REQUEST", "CANCEL" ] # Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A # MIME body part containing content information that conforms to this # document MUST have (...)" but does not state whether an iTip message must # therefore also be multipart. # Check each part for part in message.walk(): # The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4) # But in real word, other mime-types are used as well if part.get_content_type() in [ "text/calendar", "text/x-vcalendar", "application/ics" ]: if not str(part.get_param('method')).upper() in methods: log.info(_("Method %r not really interesting for us.") % (part.get_param('method'))) continue # Get the itip_payload itip_payload = part.get_payload(decode=True) log.debug(_("Raw iTip payload (%r): %r") % (part.get_param('charset'), itip_payload), level=9) + # Convert unsupported timezones, etc. + itip_payload = _convert_itip_payload(itip_payload) + # Python iCalendar prior to 3.0 uses "from_string". if hasattr(icalendar.Calendar, 'from_ical'): cal = icalendar.Calendar.from_ical(itip_payload) elif hasattr(icalendar.Calendar, 'from_string'): cal = icalendar.Calendar.from_string(itip_payload) # If we can't read it, we're out else: log.error(_("Could not read iTip from message.")) return [] for c in cal.walk(): if c.name in objnames: itip = {} if c['uid'] in seen_uids: log.debug(_("Duplicate iTip object: %s") % (c['uid']), level=9) continue # From the event, take the following properties: # # - method # - uid # - sequence # - start # - end (if any) # - duration (if any) # - organizer # - attendees (if any) # - resources (if any) # itip['type'] = 'task' if c.name == 'VTODO' else 'event' itip['uid'] = str(c['uid']) itip['method'] = str(cal['method']).upper() itip['sequence'] = int(c['sequence']) if c.has_key('sequence') else 0 itip['recurrence-id'] = c['recurrence-id'].dt if c.has_key('recurrence-id') and hasattr(c['recurrence-id'], 'dt') else None if c.has_key('dtstart'): itip['start'] = c['dtstart'].dt elif itip['type'] == 'event': log.error(_("iTip event without a start")) continue if c.has_key('dtend'): itip['end'] = c['dtend'].dt if c.has_key('duration'): itip['duration'] = c['duration'].dt itip['end'] = itip['start'] + c['duration'].dt itip['organizer'] = c['organizer'] itip['attendees'] = c['attendee'] if itip.has_key('attendees') and not isinstance(itip['attendees'], list): itip['attendees'] = [c['attendee']] if c.has_key('resources'): itip['resources'] = c['resources'] itip['raw'] = itip_payload try: # distinguish event and todo here if itip['type'] == 'task': itip['xml'] = todo_from_ical(c, itip_payload) else: itip['xml'] = event_from_ical(c, itip_payload) except Exception, e: log.error("event|todo_from_ical() exception: %r; iCal: %s" % (e, itip_payload)) continue itip_objects.append(itip) seen_uids.append(c['uid']) # end if c.name in objnames # end for c in cal.walk() # end if part.get_content_type() == "text/calendar" # end for part in message.walk() if not len(itip_objects) and not message.is_multipart(): log.debug(_("Message is not an iTip message (non-multipart message)"), level=5) return itip_objects - def check_event_conflict(kolab_event, itip_event): """ Determine whether the given kolab event conflicts with the given itip event """ conflict = False # don't consider conflict with myself if kolab_event.uid == itip_event['uid']: return conflict # don't consider conflict if event has TRANSP:TRANSPARENT if _is_transparent(kolab_event): return conflict _es = to_dt(kolab_event.get_start()) _ee = to_dt(kolab_event.get_ical_dtend()) # use iCal style end date: next day for all-day events _ev = kolab_event _ei = 0 # naive loops to check for collisions in (recurring) events # TODO: compare recurrence rules directly (e.g. matching time slot or weekday or monthday) while not conflict and _es is not None: _is = to_dt(itip_event['start']) _ie = to_dt(itip_event['end']) _iv = itip_event['xml'] _ii = 0 while not conflict and _is is not None: # log.debug("* Comparing event dates at %s/%s with %s/%s" % (_es, _ee, _is, _ie), level=9) conflict = not _is_transparent(_ev) and not _is_transparent(_iv) and check_date_conflict(_es, _ee, _is, _ie) _is = to_dt(itip_event['xml'].get_next_occurence(_is)) if itip_event['xml'].is_recurring() else None _ie = to_dt(itip_event['xml'].get_occurence_end_date(_is)) # get full occurrence to compare the dates from a possible exception if _is is not None and itip_event['xml'].has_exceptions(): _ix = itip_event['xml'].get_instance(_is) if _ix is not None: _is = to_dt(_ix.get_start()) _ie = to_dt(_ix.get_end()) _iv = _ix # iterate through all exceptions (non-recurring) elif _is is None and not itip_event['xml'].is_recurring() and len(itip_event['xml'].get_exceptions()) > _ii: _iv = itip_event['xml'].get_exceptions()[_ii] _is = to_dt(_iv.get_start()) _ie = to_dt(_iv.get_end()) _ii += 1 _es = to_dt(kolab_event.get_next_occurence(_es)) if kolab_event.is_recurring() else None _ee = to_dt(kolab_event.get_occurence_end_date(_es)) # get full instance to compare the dates from a possible exception if _es is not None and kolab_event.has_exceptions(): _ex = kolab_event.get_instance(_es) if _ex is not None: _es = to_dt(_ex.get_start()) _ee = to_dt(_ex.get_end()) _ev = _ex # iterate through all exceptions (non-recurring) elif _es is None and not kolab_event.is_recurring() and len(kolab_event.get_exceptions()) > _ei: _ev = kolab_event.get_exceptions()[_ei] _es = to_dt(_ev.get_start()) _ee = to_dt(_ev.get_end()) _ei += 1 return conflict - def _is_transparent(event): return event.get_transparency() or event.get_status() == kolabformat.StatusCancelled +def _convert_itip_payload(itip): + matchlist = re.findall("^((DTSTART|DTEND|DUE|EXDATE|COMPLETED)[:;][^\n]+)$", itip, re.MULTILINE) + + for match in matchlist: + match = match[0] + search = re.search(";TZID=([^:;]+)", match) + + if search: + tzorig = tzdest = search.group(1).replace('"', '') + + # timezone in Olson-database format, nothing to convert + if re.match("[a-zA-Z]+/[a-zA-Z0-9_+-]+", tzorig): + continue + + # convert timezone from windows format to Olson + if tzorig in windows_tz.win_tz: + tzdest = windows_tz.win_tz[tzorig] + + # @TODO: Should be prefer server time if it has the same offset? + + # replace old with new timezone name + if tzorig != tzdest: + replace = match.replace(search.group(0), ";TZID=" + tzdest) + itip = itip.replace("\n" + match, "\n" + replace) + + return itip def check_date_conflict(_es, _ee, _is, _ie): """ Check the given event start/end dates for conflicts """ conflict = False # TODO: add margin for all-day dates (+13h; -12h) if _es < _is: if _es <= _ie: if _ee <= _is: conflict = False else: conflict = True else: conflict = True elif _es == _is: conflict = True else: # _es > _is if _es < _ie: conflict = True else: conflict = False - + return conflict def send_reply(from_address, itip_events, response_text, subject=None): """ Send the given iCal events as a valid iTip REPLY to the organizer. """ import smtplib conf = pykolab.getConf() smtp = None if isinstance(itip_events, dict): itip_events = [ itip_events ] for itip_event in itip_events: attendee = itip_event['xml'].get_attendee_by_email(from_address) participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee) log.debug(_("Send iTip reply %s for %s %r") % (participant_status, itip_event['xml'].type, itip_event['xml'].uid), level=8) event_summary = itip_event['xml'].get_summary() message_text = response_text % { 'summary':event_summary, 'status':participant_status_label(participant_status), 'name':attendee.get_name() } if subject is not None: subject = subject % { 'summary':event_summary, 'status':participant_status_label(participant_status), 'name':attendee.get_name() } try: message = itip_event['xml'].to_message_itip(from_address, method="REPLY", participant_status=participant_status, message_text=message_text, subject=subject ) except Exception, e: log.error(_("Failed to compose iTip reply message: %r: %s") % (e, traceback.format_exc())) return smtp = smtplib.SMTP("localhost", 10026) # replies go through wallace again if conf.debuglevel > 8: smtp.set_debuglevel(True) try: smtp.sendmail(message['From'], message['To'], message.as_string()) except Exception, e: log.error(_("SMTP sendmail error: %r") % (e)) if smtp: smtp.quit() def send_request(to_address, itip_events, request_text, subject=None, direct=False): """ Send an iTip REQUEST message from the given iCal events """ import smtplib conf = pykolab.getConf() smtp = None if isinstance(itip_events, dict): itip_events = [ itip_events ] for itip_event in itip_events: event_summary = itip_event['xml'].get_summary() message_text = request_text % { 'summary':event_summary } if subject is not None: subject = subject % { 'summary':event_summary } try: message = itip_event['xml'].to_message_itip(None, method="REQUEST", message_text=message_text, subject=subject ) except Exception, e: log.error(_("Failed to compose iTip request message: %r") % (e)) return port = 10027 if direct else 10026 smtp = smtplib.SMTP("localhost", port) if conf.debuglevel > 8: smtp.set_debuglevel(True) try: smtp.sendmail(message['From'], to_address, message.as_string()) except Exception, e: log.error(_("SMTP sendmail error: %r") % (e)) if smtp: smtp.quit() diff --git a/tests/unit/test-011-itip.py b/tests/unit/test-011-itip.py index 38e00b8..173a26e 100644 --- a/tests/unit/test-011-itip.py +++ b/tests/unit/test-011-itip.py @@ -1,541 +1,546 @@ # -*- coding: utf-8 -*- import pykolab import datetime import pytz import kolabformat from pykolab import itip from pykolab.xml import Event from pykolab.xml import participant_status_label from pykolab.translate import _ from icalendar import Calendar from email import message from email import message_from_string from wallace import module_resources from twisted.trial import unittest # define some iTip MIME messages itip_multipart = """MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="=_c8894dbdb8baeedacae836230e3436fd" From: "Doe, John" Date: Fri, 13 Jul 2012 13:54:14 +0100 Message-ID: <240fe7ae7e139129e9eb95213c1016d7@example.org> User-Agent: Roundcube Webmail/0.9-0.3.el6.kolab_3.0 To: resource-collection-car@example.org Subject: "test" has been updated --=_c8894dbdb8baeedacae836230e3436fd Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: quoted-printable *test* --=_c8894dbdb8baeedacae836230e3436fd Content-Type: text/calendar; charset=UTF-8; method=REQUEST; name=event.ics Content-Disposition: attachment; filename=event.ics Content-Transfer-Encoding: quoted-printable BEGIN:VCALENDAR VERSION:2.0 PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN CALSCALE:GREGORIAN METHOD:REQUEST BEGIN:VEVENT UID:626421779C777FBE9C9B85A80D04DDFA-A4BF5BBB9FEAA271 DTSTAMP:20120713T1254140 DTSTART;TZID=3DEurope/London:20120713T100000 DTEND;TZID=3DEurope/London:20120713T110000 SUMMARY:test DESCRIPTION:test ORGANIZER;CN=3D"Doe, John":mailto:john.doe@example.org ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailt= o:resource-collection-car@example.org ATTENDEE;ROLE=3DOPT-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailto:anoth= er-resource@example.org TRANSP:OPAQUE END:VEVENT END:VCALENDAR --=_c8894dbdb8baeedacae836230e3436fd-- """ itip_non_multipart = """Return-Path: Sender: john.doe@example.org Content-Type: text/calendar; method=REQUEST; charset=UTF-8 Content-Transfer-Encoding: quoted-printable To: resource-collection-car@example.org From: john.doe@example.org Date: Mon, 24 Feb 2014 11:27:28 +0100 Message-ID: <1a3aa8995e83dd24cf9247e538ac913a@example.org> Subject: test BEGIN:VCALENDAR VERSION:2.0 PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN CALSCALE:GREGORIAN METHOD:REQUEST BEGIN:VEVENT UID:626421779C777FBE9C9B85A80D04DDFA-A4BF5BBB9FEAA271 DTSTAMP:20120713T125414Z DTSTART;TZID=3DEurope/London:20120713T100000 DTEND;TZID=3DEurope/London:20120713T110000 SUMMARY:test DESCRIPTION:test ORGANIZER;CN=3D"Doe, John":mailto:john.doe@example.org ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DACCEPTED;RSVP=3DTRUE:mailt= o:resource-collection-car@example.org TRANSP:OPAQUE END:VEVENT END:VCALENDAR """ itip_google_multipart = """MIME-Version: 1.0 Message-ID: <001a11c2ad84243e0604f3246bae@google.com> Date: Mon, 24 Feb 2014 10:27:28 +0000 Subject: =?ISO-8859-1?Q?Invitation=3A_iTip_from_Apple_=40_Mon_Feb_24=2C_2014_12pm_?= =?ISO-8859-1?Q?=2D_1pm_=28Tom_=26_T=E4m=29?= From: "john.doe" To: Content-Type: multipart/mixed; boundary=001a11c2ad84243df004f3246bad --001a11c2ad84243df004f3246bad Content-Type: multipart/alternative; boundary=001a11c2ad84243dec04f3246bab --001a11c2ad84243dec04f3246bab Content-Type: text/plain; charset=ISO-8859-1; format=flowed; delsp=yes --001a11c2ad84243dec04f3246bab Content-Type: text/html; charset=ISO-8859-1 Content-Transfer-Encoding: quoted-printable
--001a11c2ad84243dec04f3246bab Content-Type: text/calendar; charset=UTF-8; method=REQUEST Content-Transfer-Encoding: 7bit BEGIN:VCALENDAR PRODID:-//Google Inc//Google Calendar 70.9054//EN VERSION:2.0 CALSCALE:GREGORIAN METHOD:REQUEST BEGIN:VEVENT DTSTART:20140224T110000Z DTEND:20140224T120000Z DTSTAMP:20140224T102728Z ORGANIZER:mailto:kepjllr6mcq7d0959u4cdc7000@group.calendar.google.com UID:0BE2F640-5814-47C9-ABAE-E7E959204E76 ATTENDEE;CUTYPE=INDIVIDUAL;ROLE=REQ-PARTICIPANT;PARTSTAT=ACCEPTED;RSVP=TRUE ;X-NUM-GUESTS=0:mailto:kepjllr6mcq7d0959u4cdc7000@group.calendar.google.com ATTENDEE;CUTYPE=INDIVIDUAL;ROLE=REQ-PARTICIPANT;PARTSTAT=NEEDS-ACTION;RSVP= TRUE;CN=John Sample;X-NUM-GUESTS=0:mailto:john.sample@example.org CREATED:20140224T102728Z DESCRIPTION:Testing Multipart structure\\nView your event at http://www.goog le.com/calendar/event?action=VIEW&eid=XzYxMTRhY2k2Nm9xMzBiOWw3MG9qOGI5azZ0M WppYmExODkwa2FiYTU2dDJqaWQ5cDY4bzM4aDluNm8gdGhvbWFzQGJyb3RoZXJsaS5jaA&tok=N TIja2VwamxscjZtY3E3ZDA5NTl1NGNkYzcwMDBAZ3JvdXAuY2FsZW5kYXIuZ29vZ2xlLmNvbTkz NTcyYTU2YmUwNWMxNjY0Zjc3OTU0MzhmMDcwY2FhN2NjZjIzYWM&ctz=Europe/Zurich&hl=en . LAST-MODIFIED:20140224T102728Z LOCATION: SEQUENCE:5 STATUS:CONFIRMED SUMMARY:iTip from Apple TRANSP:OPAQUE END:VEVENT END:VCALENDAR --001a11c2ad84243dec04f3246bab-- --001a11c2ad84243df004f3246bad Content-Type: application/ics; name="invite.ics" Content-Disposition: attachment; filename="invite.ics" Content-Transfer-Encoding: base64 QkVHSU46VkNBTEVOREFSDQpQUk9ESUQ6LS8vR29vZ2xlIEluYy8vR29vZ2xlIENhbGVuZGFyIDcw LjkwNTQvL0VODQpWRVJTSU9OOjIuMA0KQ0FMU0NBTEU6R1JFR09SSUFODQpNRVRIT0Q6UkVRVUVT VA0KQkVHSU46VkVWRU5UDQpEVFNUQVJUOjIwMTQwMjI0VDExMDAwMFoNCkRURU5EOjIwMTQwMjI0 VDEyMDAwMFoNCkRUU1RBTVA6MjAxNDAyMjRUMTAyNzI4Wg0KT1JHQU5JWkVSOm1haWx0bzprZXBq bGxyNm1jcTdkMDk1OXU0Y2RjNzAwMEBncm91cC5jYWxlbmRhci5nb29nbGUuY29tDQpVSUQ6MEJF MkY2NDAtNTgxNC00N0M5LUFCQUUtRTdFOTU5MjA0RTc2DQpBVFRFTkRFRTtDVVRZUEU9SU5ESVZJ RFVBTDtST0xFPVJFUS1QQVJUSUNJUEFOVDtQQVJUU1RBVD1BQ0NFUFRFRDtSU1ZQPVRSVUUNCiA7 WC1OVU0tR1VFU1RTPTA6bWFpbHRvOmtlcGpsbHI2bWNxN2QwOTU5dTRjZGM3MDAwQGdyb3VwLmNh bGVuZGFyLmdvb2dsZS5jb20NCkFUVEVOREVFO0NVVFlQRT1JTkRJVklEVUFMO1JPTEU9UkVRLVBB UlRJQ0lQQU5UO1BBUlRTVEFUPU5FRURTLUFDVElPTjtSU1ZQPQ0KIFRSVUU7WC1OVU0tR1VFU1RT PTA6bWFpbHRvOnRob21hc0Bicm90aGVybGkuY2gNCkFUVEVOREVFO0NVVFlQRT1JTkRJVklEVUFM O1JPTEU9UkVRLVBBUlRJQ0lQQU5UO1BBUlRTVEFUPU5FRURTLUFDVElPTjtSU1ZQPQ0KIFRSVUU7 Q049VGhvbWFzIEJydWVkZXJsaTtYLU5VTS1HVUVTVFM9MDptYWlsdG86cm91bmRjdWJlQGdtYWls LmNvbQ0KQ1JFQVRFRDoyMDE0MDIyNFQxMDI3MjhaDQpERVNDUklQVElPTjpUZXN0aW5nIE11bHRp cGFydCBzdHJ1Y3R1cmVcblZpZXcgeW91ciBldmVudCBhdCBodHRwOi8vd3d3Lmdvb2cNCiBsZS5j b20vY2FsZW5kYXIvZXZlbnQ/YWN0aW9uPVZJRVcmZWlkPVh6WXhNVFJoWTJrMk5tOXhNekJpT1d3 M01HOXFPR0k1YXpaME0NCiBXcHBZbUV4T0Rrd2EyRmlZVFUyZERKcWFXUTVjRFk0YnpNNGFEbHVO bThnZEdodmJXRnpRR0p5YjNSb1pYSnNhUzVqYUEmdG9rPU4NCiBUSWphMlZ3YW14c2NqWnRZM0Uz WkRBNU5UbDFOR05rWXpjd01EQkFaM0p2ZFhBdVkyRnNaVzVrWVhJdVoyOXZaMnhsTG1OdmJUa3oN CiBOVGN5WVRVMlltVXdOV014TmpZMFpqYzNPVFUwTXpobU1EY3dZMkZoTjJOalpqSXpZV00mY3R6 PUV1cm9wZS9adXJpY2gmaGw9ZW4NCiAuDQpMQVNULU1PRElGSUVEOjIwMTQwMjI0VDEwMjcyOFoN CkxPQ0FUSU9OOg0KU0VRVUVOQ0U6NQ0KU1RBVFVTOkNPTkZJUk1FRA0KU1VNTUFSWTppVGlwIGZy b20gQXBwbGUNClRSQU5TUDpPUEFRVUUNCkVORDpWRVZFTlQNCkVORDpWQ0FMRU5EQVINCg== --001a11c2ad84243df004f3246bad-- """ itip_application_ics = """MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="=_c8894dbdb8baeedacae836230e3436fd" From: "Doe, John" Date: Fri, 13 Jul 2012 13:54:14 +0100 Message-ID: <240fe7ae7e139129e9eb95213c101622@example.org> User-Agent: Roundcube Webmail/0.9-0.3.el6.kolab_3.0 To: resource-collection-car@example.org Subject: "test" has been updated --=_c8894dbdb8baeedacae836230e3436fd Content-Transfer-Encoding: quoted-printable Content-Type: text/plain; charset=UTF-8; format=flowed --=_c8894dbdb8baeedacae836230e3436fd Content-Type: application/ics; charset=UTF-8; method=REQUEST; name=event.ics Content-Transfer-Encoding: quoted-printable BEGIN:VCALENDAR VERSION:2.0 PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN CALSCALE:GREGORIAN METHOD:REQUEST BEGIN:VEVENT UID:626421779C777FBE9C9B85A80D04DDFA-A4BF5BBB9FEAA271 DTSTAMP:20120713T1254140 DTSTART;TZID=3DEurope/London:20120713T100000 DTEND;TZID=3DEurope/London:20120713T110000 SUMMARY:test DESCRIPTION:test ORGANIZER;CN=3D"Doe, John":mailto:john.doe@example.org ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailt= o:resource-collection-car@example.org TRANSP:OPAQUE END:VEVENT END:VCALENDAR --=_c8894dbdb8baeedacae836230e3436fd-- """ itip_recurring = """Return-Path: Sender: john.doe@example.org Content-Type: text/calendar; method=REQUEST; charset=UTF-8 Content-Transfer-Encoding: 8bit From: john.doe@example.org Date: Mon, 24 Feb 2014 11:27:28 +0100 Message-ID: <1a3aa8995e83dd24cf9247e538ac913a@example.org> Subject: Recurring BEGIN:VCALENDAR VERSION:2.0 PRODID:-//Apple Inc.//Mac OS X 10.9.2//EN CALSCALE:GREGORIAN METHOD:REQUEST BEGIN:VEVENT UID:dbdb8baeedacae836230e3436fd-5e83dd24cf92 DTSTAMP:20140213T1254140 DTSTART;TZID=Europe/London:20120709T100000 DTEND;TZID=Europe/London:20120709T120000 RRULE:FREQ=DAILY;INTERVAL=1;COUNT=5 SUMMARY:Recurring ORGANIZER;CN="Doe, John":mailto:john.doe@example.org ATTENDEE;ROLE=REQ-PARTICIPANT;CUTYPE=RESOURCE;PARTSTAT=NEEDS-ACTION;RSVP=TRUE:mailto:jane@example.com TRANSP:OPAQUE END:VEVENT END:VCALENDAR """ itip_unicode = """MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="=_c8894dbdb8baeedacae836230e3436fd" From: "Doe, John" Date: Tue, 25 Feb 2014 13:54:14 +0100 Message-ID: <240fe7ae7e139129e9eb95213c1016d7@example.org> User-Agent: Roundcube Webmail/0.9-0.3.el6.kolab_3.0 To: resource-car-audia4@example.org Subject: "test" --=_c8894dbdb8baeedacae836230e3436fd Content-Type: text/plain; charset=UTF-8; format=flowed Content-Transfer-Encoding: quoted-printable *test* --=_c8894dbdb8baeedacae836230e3436fd Content-Type: text/calendar; charset=UTF-8; method=REQUEST; name=event.ics Content-Disposition: attachment; filename=event.ics Content-Transfer-Encoding: quoted-printable BEGIN:VCALENDAR VERSION:2.0 PRODID:-//Roundcube=20Webmail=200.9-0.3.el6.kolab_3.0//NONSGML=20Calendar//= EN CALSCALE:GREGORIAN METHOD:REQUEST BEGIN:VEVENT UID:eea25142-fb1c-4831-a02d-ac9fb4c16b70 DTSTAMP:20140213T125414Z -DTSTART;TZID=3DEurope/London:20140713T100000 +DTSTART;TZID=3D"W. Europe Standard Time":20140713T100000 DTEND;TZID=3DEurope/London:20140713T140000 SUMMARY:Testing =C3=9Cmlauts DESCRIPTION:Testing =C3=9Cmlauts LOCATION:Rue the Gen=C3=A8ve ORGANIZER;CN=3D"D=C3=BE,=20John":mailto:john.doe@example.org ATTENDEE;ROLE=3DREQ-PARTICIPANT;CUTYPE=3DRESOURCE;PARTSTAT=3DNEEDS-ACTION;R= SVP=3DTRUE:mailto:resource-car-audia4@example.org ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DTENTATIVE;CN=3DSomebody=20Else:m= ailto:somebody@else.com TRANSP:OPAQUE END:VEVENT END:VCALENDAR --=_c8894dbdb8baeedacae836230e3436fd-- """ itip_empty = """MIME-Version: 1.0 Date: Fri, 17 Jan 2014 13:51:50 +0100 From: User-Agent: Roundcube Webmail/0.9.5 To: john.sample@example.org Subject: "test" has been sent Message-ID: <52D92766.5040508@somedomain.com> Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 7bit Message plain text goes here... """ conf = pykolab.getConf() if not hasattr(conf, 'defaults'): conf.finalize_conf() class TestITip(unittest.TestCase): def setUp(self): # intercept calls to smtplib.SMTP.sendmail() import smtplib self.patch(smtplib.SMTP, "__init__", self._mock_smtp_init) self.patch(smtplib.SMTP, "quit", self._mock_nop) self.patch(smtplib.SMTP, "sendmail", self._mock_smtp_sendmail) self.smtplog = [] def _mock_nop(self, domain=None): pass def _mock_smtp_init(self, host=None, port=None, local_hostname=None, timeout=0): pass def _mock_smtp_sendmail(self, from_addr, to_addr, message, mail_options=None, rcpt_options=None): self.smtplog.append((from_addr, to_addr, message)) def test_001_itip_events_from_message(self): itips1 = itip.events_from_message(message_from_string(itip_multipart)) self.assertEqual(len(itips1), 1, "Multipart iTip message with text/calendar") self.assertEqual(itips1[0]['method'], "REQUEST", "iTip request method property") itips2 = itip.events_from_message(message_from_string(itip_non_multipart)) self.assertEqual(len(itips2), 1, "Detect non-multipart iTip messages") itips3 = itip.events_from_message(message_from_string(itip_application_ics)) self.assertEqual(len(itips3), 1, "Multipart iTip message with application/ics attachment") itips4 = itip.events_from_message(message_from_string(itip_google_multipart)) self.assertEqual(len(itips4), 1, "Multipart iTip message from Google") itips5 = itip.events_from_message(message_from_string(itip_empty)) self.assertEqual(len(itips5), 0, "Simple plain text message") # invalid itip blocks self.assertRaises(Exception, itip.events_from_message, message_from_string(itip_multipart.replace("BEGIN:VEVENT", ""))) itips6 = itip.events_from_message(message_from_string(itip_multipart.replace("DTSTART;", "X-DTSTART;"))) self.assertEqual(len(itips6), 0, "Event with not DTSTART") itips7 = itip.events_from_message(message_from_string(itip_non_multipart.replace("METHOD:REQUEST", "METHOD:PUBLISH").replace("method=REQUEST", "method=PUBLISH"))) self.assertEqual(len(itips7), 0, "Invalid METHOD") # iTips with unicode data itips8 = itip.events_from_message(message_from_string(itip_unicode)) self.assertEqual(len(itips8), 1) xml = itips8[0]['xml'] self.assertEqual(xml.get_summary(), "Testing Ümlauts") self.assertEqual(xml.get_location(), "Rue the Genève") + # Timezone conversion + itips = itip.events_from_message(message_from_string(itip_unicode)) + xml = itips[0]['xml'] + self.assertEqual(xml.get_start().tzinfo.__str__(), "Europe/Berlin") + def test_002_check_date_conflict(self): astart = datetime.datetime(2014, 7, 13, 10, 0, 0) aend = astart + datetime.timedelta(hours=2) bstart = datetime.datetime(2014, 7, 13, 10, 0, 0) bend = astart + datetime.timedelta(hours=1) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) bstart = datetime.datetime(2014, 7, 13, 11, 0, 0) bend = astart + datetime.timedelta(minutes=30) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) bend = astart + datetime.timedelta(hours=2) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) bstart = datetime.datetime(2014, 7, 13, 12, 0, 0) bend = astart + datetime.timedelta(hours=1) self.assertFalse(itip.check_date_conflict(astart, aend, bstart, bend)) self.assertFalse(itip.check_date_conflict(bstart, bend, astart, aend)) bstart = datetime.datetime(2014, 6, 13, 10, 0, 0) bend = datetime.datetime(2014, 6, 14, 12, 0, 0) self.assertFalse(itip.check_date_conflict(astart, aend, bstart, bend)) bstart = datetime.datetime(2014, 7, 10, 12, 0, 0) bend = datetime.datetime(2014, 7, 14, 14, 0, 0) self.assertTrue(itip.check_date_conflict(astart, aend, bstart, bend)) def test_002_check_event_conflict(self): itip_event = itip.events_from_message(message_from_string(itip_non_multipart))[0] event = Event() event.set_start(datetime.datetime(2012, 7, 13, 9, 30, 0, tzinfo=itip_event['start'].tzinfo)) event.set_end(datetime.datetime(2012, 7, 13, 10, 30, 0, tzinfo=itip_event['start'].tzinfo)) self.assertTrue(itip.check_event_conflict(event, itip_event), "Conflicting dates") event.set_uid(itip_event['uid']) self.assertFalse(itip.check_event_conflict(event, itip_event), "No conflict for same UID") allday = Event() allday.set_start(datetime.date(2012, 7, 13)) allday.set_end(datetime.date(2012, 7, 13)) self.assertTrue(itip.check_event_conflict(allday, itip_event), "Conflicting allday event") allday.set_transparency(True) self.assertFalse(itip.check_event_conflict(allday, itip_event), "No conflict if event is set to transparent") event2 = Event() event2.set_start(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("US/Central"))) event2.set_end(datetime.datetime(2012, 7, 13, 11, 0, 0, tzinfo=pytz.timezone("US/Central"))) self.assertFalse(itip.check_event_conflict(event, itip_event), "No conflict with timezone shift") rrule = kolabformat.RecurrenceRule() rrule.setFrequency(kolabformat.RecurrenceRule.Weekly) rrule.setCount(10) event3 = Event() event3.set_recurrence(rrule) event3.set_start(datetime.datetime(2012, 6, 29, 9, 30, 0, tzinfo=pytz.utc)) event3.set_end(datetime.datetime(2012, 6, 29, 10, 30, 0, tzinfo=pytz.utc)) self.assertTrue(itip.check_event_conflict(event3, itip_event), "Conflict in (3rd) recurring event instance") itip_event = itip.events_from_message(message_from_string(itip_recurring))[0] self.assertTrue(itip.check_event_conflict(event3, itip_event), "Conflict in two recurring events") event4 = Event() event4.set_recurrence(rrule) event4.set_start(datetime.datetime(2012, 7, 1, 9, 30, 0, tzinfo=pytz.utc)) event4.set_end(datetime.datetime(2012, 7, 1, 10, 30, 0, tzinfo=pytz.utc)) self.assertFalse(itip.check_event_conflict(event4, itip_event), "No conflict in two recurring events") itip_event = itip.events_from_message(message_from_string(itip_non_multipart))[0] rrule.setFrequency(kolabformat.RecurrenceRule.Daily) rrule.setCount(10) event5 = Event() event5.set_recurrence(rrule) event5.set_start(datetime.datetime(2012, 7, 9, 10, 0, 0, tzinfo=pytz.timezone("Europe/London"))) event5.set_end(datetime.datetime(2012, 7, 9, 11, 0, 0, tzinfo=pytz.timezone("Europe/London"))) event_xml = str(event5) exception = Event(from_string=event_xml) exception.set_start(datetime.datetime(2012, 7, 13, 14, 0, 0, tzinfo=pytz.timezone("Europe/London"))) exception.set_end(datetime.datetime(2012, 7, 13, 16, 0, 0, tzinfo=pytz.timezone("Europe/London"))) exception.set_recurrence_id(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("Europe/London")), False) event5.add_exception(exception) self.assertFalse(itip.check_event_conflict(event5, itip_event), "No conflict with exception date") exception = Event(from_string=event_xml) exception.set_start(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("Europe/London"))) exception.set_end(datetime.datetime(2012, 7, 13, 11, 0, 0, tzinfo=pytz.timezone("Europe/London"))) exception.set_status('CANCELLED') exception.set_recurrence_id(datetime.datetime(2012, 7, 13, 10, 0, 0, tzinfo=pytz.timezone("Europe/London")), False) event5.add_exception(exception) self.assertFalse(itip.check_event_conflict(event5, itip_event), "No conflict with cancelled exception") def test_002_check_event_conflict_single(self): itip_event = itip.events_from_message(message_from_string(itip_non_multipart))[0] event = Event() event.set_start(datetime.datetime(2012, 7, 10, 9, 30, 0, tzinfo=itip_event['start'].tzinfo)) event.set_end(datetime.datetime(2012, 7, 10, 10, 30, 0, tzinfo=itip_event['start'].tzinfo)) event.set_recurrence_id(event.get_start()) dtstart = datetime.datetime(2012, 7, 13, 9, 30, 0, tzinfo=itip_event['start'].tzinfo) second = Event(from_string=str(event)) second.set_start(dtstart) second.set_end(dtstart + datetime.timedelta(hours=1)) second.set_recurrence_id(dtstart) event.add_exception(second) self.assertTrue(itip.check_event_conflict(event, itip_event), "Conflicting dates (exception)") itip_event = itip.events_from_message(message_from_string(itip_non_multipart))[0] dtstart = datetime.datetime(2012, 7, 15, 10, 0, 0, tzinfo=itip_event['start'].tzinfo) second = Event(from_string=str(itip_event['xml'])) second.set_start(dtstart + datetime.timedelta(hours=1)) second.set_end(dtstart + datetime.timedelta(hours=2)) second.set_recurrence_id(dtstart) second.set_transparency(True) itip_event['xml'].add_exception(second) self.assertEqual(len(itip_event['xml'].get_exceptions()), 1) event = Event() event.set_start(datetime.datetime(2012, 7, 11, 9, 30, 0, tzinfo=itip_event['start'].tzinfo)) event.set_end(datetime.datetime(2012, 7, 11, 10, 30, 0, tzinfo=itip_event['start'].tzinfo)) self.assertFalse(itip.check_event_conflict(event, itip_event), "Conflicting dates (no)") event = Event() event.set_start(datetime.datetime(2012, 7, 15, 11, 0, 0, tzinfo=itip_event['start'].tzinfo)) event.set_end(datetime.datetime(2012, 7, 15, 11, 30, 0, tzinfo=itip_event['start'].tzinfo)) self.assertFalse(itip.check_event_conflict(event, itip_event), "Conflicting dates (exception)") def test_003_send_reply(self): itip_events = itip.events_from_message(message_from_string(itip_non_multipart)) itip.send_reply("resource-collection-car@example.org", itip_events, "SUMMARY=%(summary)s; STATUS=%(status)s; NAME=%(name)s;") self.assertEqual(len(self.smtplog), 1) self.assertEqual(self.smtplog[0][0], 'resource-collection-car@example.org', "From attendee") self.assertEqual(self.smtplog[0][1], 'john.doe@example.org', "To organizer") _accepted = participant_status_label('ACCEPTED') message = message_from_string(self.smtplog[0][2]) self.assertEqual(message.get('Subject'), _("Invitation for %(summary)s was %(status)s") % {'summary': 'test', 'status': _accepted}) text = str(message.get_payload(0)) self.assertIn('SUMMARY=3Dtest', text) self.assertIn('STATUS=3D' + _accepted, text) def test_004_send_reply_unicode(self): itip_events = itip.events_from_message(message_from_string(itip_non_multipart.replace('SUMMARY:test', "SUMMARY:With äöü"))) itip.send_reply("resource-collection-car@example.org", itip_events, "SUMMARY=%(summary)s; STATUS=%(status)s; NAME=%(name)s;") self.assertEqual(len(self.smtplog), 1) self.assertIn("Subject: =?utf-8?q?Invitation_for_With_=C3=A4=C3=B6=C3=BC_was_Accepted?=", self.smtplog[0][2]) self.assertIn('SUMMARY=3DWith =C3=A4=C3=B6=C3=BC', self.smtplog[0][2])