Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117725730
D785.1774892231.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
13 KB
Referenced Files
None
Subscribers
None
D785.1774892231.diff
View Options
diff --git a/pykolab/itip/__init__.py b/pykolab/itip/__init__.py
--- a/pykolab/itip/__init__.py
+++ b/pykolab/itip/__init__.py
@@ -1,26 +1,33 @@
-import icalendar
-import pykolab
+import re
import traceback
+
+import icalendar
import kolabformat
-import re
+import pykolab
+from pykolab.translate import _
from pykolab.xml import to_dt
from pykolab.xml import event_from_ical
from pykolab.xml import todo_from_ical
from pykolab.xml import participant_status_label
-from pykolab.translate import _
+
from tzlocal import windows_tz
+# pylint: disable=invalid-name
log = pykolab.getLogger('pykolab.wallace')
def events_from_message(message, methods=None):
return objects_from_message(message, ["VEVENT"], methods)
+
def todos_from_message(message, methods=None):
return objects_from_message(message, ["VTODO"], methods)
-def objects_from_message(message, objnames, methods=None):
+
+# pylint: disable=too-many-branches
+# pylint: disable=too-many-statements
+def objects_from_message(message, objnames, methods=None): # noqa: C901
"""
Obtain the iTip payload from email.message <message>
"""
@@ -30,7 +37,7 @@
# iTip methods we are actually interested in. Other methods will be ignored.
if methods is None:
- methods = [ "REQUEST", "CANCEL" ]
+ methods = ["REQUEST", "CANCEL"]
# Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A
# MIME body part containing content information that conforms to this
@@ -38,19 +45,23 @@
# therefore also be multipart.
# Check each part
+ # pylint: disable=too-many-nested-blocks
for part in message.walk():
# The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4)
# But in real word, other mime-types are used as well
- if part.get_content_type() in [ "text/calendar", "text/x-vcalendar", "application/ics" ]:
- if not str(part.get_param('method')).upper() in methods:
- log.info(_("Method %r not really interesting for us.") % (part.get_param('method')))
+ if part.get_content_type() in ["text/calendar", "text/x-vcalendar", "application/ics"]:
+ if str(part.get_param('method')).upper() not in methods:
+ log.info("Method %r not really interesting for us." % (part.get_param('method')))
continue
# Get the itip_payload
itip_payload = part.get_payload(decode=True)
- log.debug(_("Raw iTip payload (%r): %r") % (part.get_param('charset'), itip_payload), level=8)
+ log.debug(
+ "Raw iTip payload (%r): %r" % (part.get_param('charset'), itip_payload),
+ level=8
+ )
# Convert unsupported timezones, etc.
itip_payload = _convert_itip_payload(itip_payload)
@@ -90,33 +101,37 @@
itip['type'] = 'task' if c.name == 'VTODO' else 'event'
itip['uid'] = str(c['uid'])
itip['method'] = str(cal['method']).upper()
- itip['sequence'] = int(c['sequence']) if c.has_key('sequence') else 0
- itip['recurrence-id'] = c['recurrence-id'].dt if c.has_key('recurrence-id') and hasattr(c['recurrence-id'], 'dt') else None
+ itip['sequence'] = int(c['sequence']) if 'sequence' in c else 0
- if c.has_key('dtstart'):
+ itip['recurrence-id'] = None
+ if 'recurrence-id' in c:
+ if hasattr(c['recurrence-id'], 'dt'):
+ itip['recurrence-id'] = c['recurrence-id'].dt
+
+ if 'dtstart' in c:
itip['start'] = c['dtstart'].dt
elif itip['type'] == 'event':
log.error(_("iTip event without a start"))
continue
- if c.has_key('dtend'):
+ if 'dtend' in c:
itip['end'] = c['dtend'].dt
- if c.has_key('duration'):
+ if 'duration' in c:
itip['duration'] = c['duration'].dt
itip['end'] = itip['start'] + c['duration'].dt
# Outlook can send itip replies with no organizer property
- if c.has_key('organizer'):
+ if 'organizer' in c:
itip['organizer'] = c['organizer']
- if c.has_key('attendee'):
+ if 'attendee' in c:
itip['attendees'] = c['attendee']
- if itip.has_key('attendees') and not isinstance(itip['attendees'], list):
+ if 'attendees' in itip and not isinstance(itip['attendees'], list):
itip['attendees'] = [c['attendee']]
- if c.has_key('resources'):
+ if 'resources' in c:
itip['resources'] = c['resources']
itip['raw'] = itip_payload
@@ -127,8 +142,13 @@
itip['xml'] = todo_from_ical(c, itip_payload)
else:
itip['xml'] = event_from_ical(c, itip_payload)
- except Exception, e:
- log.error("event|todo_from_ical() exception: %r; iCal: %s" % (e, itip_payload))
+
+ # pylint: disable=broad-except
+ except Exception as e:
+ log.error(
+ "event|todo_from_ical() exception: %r; iCal: %s" % (e, itip_payload)
+ )
+
continue
itip_objects.append(itip)
@@ -148,6 +168,7 @@
return itip_objects
+
def check_event_conflict(kolab_event, itip_event):
"""
Determine whether the given kolab event conflicts with the given itip event
@@ -166,14 +187,11 @@
return conflict
_es = to_dt(kolab_event.get_start())
- _ee = to_dt(kolab_event.get_ical_dtend()) # use iCal style end date: next day for all-day events
- _ev = kolab_event
- _ei = 0
+ # use iCal style end date: next day for all-day events
+ _ee = to_dt(kolab_event.get_ical_dtend())
_is = to_dt(itip_event['start'])
_ie = to_dt(itip_event['end'])
- _iv = itip_event['xml']
- _ii = 0
# Escape looping through anything if neither of the events is recurring.
if not itip_event['xml'].is_recurring() and not kolab_event.is_recurring():
@@ -193,10 +211,17 @@
# the older one.
if _ee < _is:
while _ee < _is and _es is not None and kolab_event.is_recurring():
- log.debug("Attempt to move forward kolab event recurrence from %s closer to %s" % (_ee, _is), level=8)
+ log.debug(
+ "Attempt to move forward kolab event recurrence from {} closer to {}".format(
+ _ee,
+ _is
+ ),
+ level=8
+ )
+
__es = to_dt(kolab_event.get_next_occurence(_es))
- if not __es is None:
+ if __es is not None and not __es == _es:
_es = __es
_ee = to_dt(kolab_event.get_occurence_end_date(_es))
else:
@@ -207,10 +232,17 @@
# prime spot, this time with the iTip event.
elif _ie < _es:
while _ie < _es and _is is not None and itip_event['xml'].is_recurring():
- log.debug("Attempt to move forward itip event recurrence from %s closer to %s" % (_ie, _es), level=8)
+ log.debug(
+ "Attempt to move forward itip event recurrence from {} closer to {}".format(
+ _ie,
+ _es
+ ),
+ level=8
+ )
+
__is = to_dt(itip_event['xml'].get_next_occurence(_is))
- if not __is is None:
+ if __is is not None and not _is == __is:
_is = __is
_ie = to_dt(itip_event['xml'].get_occurence_end_date(_is))
else:
@@ -219,7 +251,12 @@
# Now that we have some events somewhere in the same neighborhood...
conflict = check_date_conflict(_es, _ee, _is, _ie)
- log.debug("* Comparing itip at %s/%s with kolab at %s/%s: %r (%d)" % (_is, _ie, _es, _ee, conflict, loop), level=8)
+ log.debug(
+ "* Comparing itip at %s/%s with kolab at %s/%s: conflict - %r (occurence - %d)" % (
+ _is, _ie, _es, _ee, conflict, loop
+ ),
+ level=8
+ )
if not conflict:
if kolab_event.is_recurring() and itip_event['xml'].is_recurring():
@@ -237,9 +274,11 @@
return conflict
+
def _is_transparent(event):
return event.get_transparency() or event.get_status() == kolabformat.StatusCancelled
+
def _convert_itip_payload(itip):
matchlist = re.findall("^((DTSTART|DTEND|DUE|EXDATE|COMPLETED)[:;][^\n]+)$", itip, re.MULTILINE)
@@ -267,6 +306,7 @@
return itip
+
def check_date_conflict(_es, _ee, _is, _ie):
"""
Check the given event start/end dates for conflicts
@@ -285,7 +325,7 @@
conflict = True
elif _es == _is:
conflict = True
- else: # _es > _is
+ else: # _es > _is
if _es < _ie:
conflict = True
else:
@@ -304,29 +344,47 @@
smtp = None
if isinstance(itip_events, dict):
- itip_events = [ itip_events ]
+ itip_events = [itip_events]
for itip_event in itip_events:
attendee = itip_event['xml'].get_attendee_by_email(from_address)
participant_status = itip_event['xml'].get_ical_attendee_participant_status(attendee)
- log.debug(_("Send iTip reply %s for %s %r") % (participant_status, itip_event['xml'].type, itip_event['xml'].uid), level=8)
+ log.debug(
+ "Send iTip reply {} for {} {}".format(
+ participant_status,
+ itip_event['xml'].type,
+ itip_event['xml'].uid
+ ),
+ level=8
+ )
event_summary = itip_event['xml'].get_summary()
- message_text = response_text % { 'summary':event_summary, 'status':participant_status_label(participant_status), 'name':attendee.get_name() }
+ message_text = response_text % {
+ 'summary': event_summary,
+ 'status': participant_status_label(participant_status),
+ 'name': attendee.get_name()
+ }
if subject is not None:
- subject = subject % { 'summary':event_summary, 'status':participant_status_label(participant_status), 'name':attendee.get_name() }
+ subject = subject % {
+ 'summary': event_summary,
+ 'status': participant_status_label(participant_status),
+ 'name': attendee.get_name()
+ }
try:
- message = itip_event['xml'].to_message_itip(from_address,
+ message = itip_event['xml'].to_message_itip(
+ from_address,
method="REPLY",
participant_status=participant_status,
message_text=message_text,
subject=subject
)
- except Exception, e:
- log.error(_("Failed to compose iTip reply message: %r: %s") % (e, traceback.format_exc()))
+
+ # pylint: disable=broad-except
+ except Exception as e:
+ log.error("Failed to compose iTip reply message: %r: %s" % (e, traceback.format_exc()))
return
smtp = smtplib.SMTP("localhost", 10026) # replies go through wallace again
@@ -336,7 +394,9 @@
try:
smtp.sendmail(message['From'], message['To'], message.as_string())
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception as e:
log.error(_("SMTP sendmail error: %r") % (e))
if smtp:
@@ -353,22 +413,25 @@
smtp = None
if isinstance(itip_events, dict):
- itip_events = [ itip_events ]
+ itip_events = [itip_events]
for itip_event in itip_events:
event_summary = itip_event['xml'].get_summary()
- message_text = request_text % { 'summary':event_summary }
+ message_text = request_text % {'summary': event_summary}
if subject is not None:
- subject = subject % { 'summary':event_summary }
+ subject = subject % {'summary': event_summary}
try:
- message = itip_event['xml'].to_message_itip(None,
+ message = itip_event['xml'].to_message_itip(
+ None,
method="REQUEST",
message_text=message_text,
subject=subject
)
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception as e:
log.error(_("Failed to compose iTip request message: %r") % (e))
return
@@ -380,7 +443,9 @@
try:
smtp.sendmail(message['From'], to_address, message.as_string())
- except Exception, e:
+
+ # pylint: disable=broad-except
+ except Exception as e:
log.error(_("SMTP sendmail error: %r") % (e))
if smtp:
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Mon, Mar 30, 5:37 PM (2 d, 18 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18818043
Default Alt Text
D785.1774892231.diff (13 KB)
Attached To
Mode
D785: Prevent event conflict check jumping into infinite loop
Attached
Detach File
Event Timeline