Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117885963
__init__.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
13 KB
Referenced Files
None
Subscribers
None
__init__.py
View Options
import
icalendar
import
pykolab
import
traceback
import
kolabformat
import
re
from
pykolab.xml
import
to_dt
from
pykolab.xml
import
event_from_ical
from
pykolab.xml
import
todo_from_ical
from
pykolab.xml
import
participant_status_label
from
pykolab.translate
import
_
from
tzlocal
import
windows_tz
# Module-level logger shared by every function in this file; it is obtained
# from pykolab's own logging wrapper under the name 'pykolab.wallace'.
log = pykolab.getLogger('pykolab.wallace')
def events_from_message(message, methods=None):
    """
    Collect the iTip event objects (VEVENT components) found in <message>.

    Convenience wrapper: forwards to objects_from_message() with the
    component filter fixed to "VEVENT". <methods> is handed through
    unchanged, so None selects that function's default method list.
    """
    wanted_components = ["VEVENT"]
    return objects_from_message(message, wanted_components, methods)
def todos_from_message(message, methods=None):
    """
    Collect the iTip task objects (VTODO components) found in <message>.

    Convenience wrapper: forwards to objects_from_message() with the
    component filter fixed to "VTODO". <methods> is handed through
    unchanged, so None selects that function's default method list.
    """
    wanted_components = ["VTODO"]
    return objects_from_message(message, wanted_components, methods)
def _itip_from_component(cal, component, itip_payload):
    """
    Build the iTip dictionary for one calendar component.

    <cal> is the parsed calendar the component belongs to (its METHOD
    property is copied into the result), <component> is the VEVENT/VTODO
    itself and <itip_payload> the raw calendar text it was parsed from.

    Returns the dictionary, or None when the component must be skipped: an
    event without DTSTART, or a component for which event_from_ical() /
    todo_from_ical() raised.
    """
    itip = {}

    # From the component, take the following properties:
    #
    #  - method
    #  - uid
    #  - sequence
    #  - start
    #  - end (if any)
    #  - duration (if any)
    #  - organizer
    #  - attendees (if any)
    #  - resources (if any)
    #
    itip['type'] = 'task' if component.name == 'VTODO' else 'event'
    itip['uid'] = str(component['uid'])
    itip['method'] = str(cal['method']).upper()
    itip['sequence'] = int(component['sequence']) if 'sequence' in component else 0

    recurrence_id = component['recurrence-id'] if 'recurrence-id' in component else None
    itip['recurrence-id'] = recurrence_id.dt if hasattr(recurrence_id, 'dt') else None

    if 'dtstart' in component:
        itip['start'] = component['dtstart'].dt
    elif itip['type'] == 'event':
        log.error(_("iTip event without a start"))
        return None

    if 'dtend' in component:
        itip['end'] = component['dtend'].dt

    if 'duration' in component:
        itip['duration'] = component['duration'].dt
        # A task is allowed past the DTSTART check above without a start;
        # only derive the end when there is a start to add the duration to.
        if 'start' in itip:
            itip['end'] = itip['start'] + itip['duration']

    itip['organizer'] = component['organizer']

    # Attendees are optional ("if any"); always expose them as a list.
    if 'attendee' in component:
        attendees = component['attendee']
        itip['attendees'] = attendees if isinstance(attendees, list) else [attendees]

    if 'resources' in component:
        itip['resources'] = component['resources']

    itip['raw'] = itip_payload

    try:
        # distinguish event and todo here
        if itip['type'] == 'task':
            itip['xml'] = todo_from_ical(component, itip_payload)
        else:
            itip['xml'] = event_from_ical(component, itip_payload)
    except Exception as e:
        log.error("event|todo_from_ical() exception: %r; iCal: %s" % (e, itip_payload))
        return None

    return itip


def objects_from_message(message, objnames, methods=None):
    """
    Obtain the iTip payload from email.message <message>

    Every MIME part of <message> (via message.walk()) whose content type is
    text/calendar, text/x-vcalendar or application/ics and whose 'method'
    Content-Type parameter is listed in <methods> is parsed with icalendar.
    Each component whose name is in <objnames> (e.g. ["VEVENT"]) becomes one
    dictionary with the keys 'type', 'uid', 'method', 'sequence',
    'recurrence-id', 'organizer', 'raw' and 'xml', plus 'start', 'end',
    'duration', 'attendees' and 'resources' when the component provides them.

    <methods> defaults to ["REQUEST", "CANCEL"].

    Returns the list of dictionaries (possibly empty). A UID is only reported
    once per message. Returns [] right away if the installed icalendar offers
    neither Calendar.from_ical nor Calendar.from_string.
    """
    # Placeholder for any itip_objects found in the message.
    itip_objects = []
    seen_uids = []

    # iTip methods we are actually interested in. Other methods will be ignored.
    if methods is None:
        methods = ["REQUEST", "CANCEL"]

    # Are all iTip messages multipart? No! RFC 6047, section 2.4 states "A
    # MIME body part containing content information that conforms to this
    # document MUST have (...)" but does not state whether an iTip message must
    # therefore also be multipart.

    # Check each part
    for part in message.walk():
        # The iTip part MUST be Content-Type: text/calendar (RFC 6047, section 2.4)
        # But in the real world, other mime-types are used as well
        if part.get_content_type() not in ["text/calendar", "text/x-vcalendar", "application/ics"]:
            continue

        method = part.get_param('method')
        if str(method).upper() not in methods:
            # Wrap in a 1-tuple: get_param() may itself return a tuple for
            # RFC 2231 encoded parameters, which would break '%' formatting.
            log.info(_("Method %r not really interesting for us.") % (method,))
            continue

        # Get the itip_payload
        itip_payload = part.get_payload(decode=True)

        log.debug(_("Raw iTip payload (%r): %r") % (part.get_param('charset'), itip_payload), level=9)

        # Convert unsupported timezones, etc.
        itip_payload = _convert_itip_payload(itip_payload)

        # Python iCalendar prior to 3.0 uses "from_string".
        if hasattr(icalendar.Calendar, 'from_ical'):
            cal = icalendar.Calendar.from_ical(itip_payload)
        elif hasattr(icalendar.Calendar, 'from_string'):
            cal = icalendar.Calendar.from_string(itip_payload)
        # If we can't read it, we're out
        else:
            log.error(_("Could not read iTip from message."))
            return []

        for c in cal.walk():
            if c.name not in objnames:
                continue

            if c['uid'] in seen_uids:
                log.debug(_("Duplicate iTip object: %s") % (c['uid'],), level=9)
                continue

            itip = _itip_from_component(cal, c, itip_payload)
            if itip is None:
                continue

            itip_objects.append(itip)
            seen_uids.append(c['uid'])

    if not itip_objects and not message.is_multipart():
        log.debug(_("Message is not an iTip message (non-multipart message)"), level=5)

    return itip_objects
def check_event_conflict(kolab_event, itip_event):
    """
    Determine whether the given kolab event conflicts with the given itip event

    <kolab_event> is an event object exposing uid, get_start(),
    get_ical_dtend(), is_recurring(), has_exceptions(), get_next_occurence(),
    get_occurence_end_date(), get_transparency() and get_status().
    <itip_event> is a dictionary with at least the keys 'uid', 'start', 'end'
    and 'xml' (the latter offering the same event interface).

    Returns False when both refer to the same UID or either one is
    transparent/cancelled (see _is_transparent()); otherwise the outcome of
    check_date_conflict() for the first pair of occurrences found to overlap,
    or False if the occurrences run out first.
    """
    conflict = False

    # don't consider conflict with myself
    if kolab_event.uid == itip_event['uid']:
        return conflict

    # don't consider conflict if event has TRANSP:TRANSPARENT
    if _is_transparent(kolab_event):
        return conflict

    if _is_transparent(itip_event['xml']):
        return conflict

    # Current occurrence window of the kolab event (start / end).
    _es = to_dt(kolab_event.get_start())
    # use iCal style end date: next day for all-day events
    _ee = to_dt(kolab_event.get_ical_dtend())
    # NOTE(review): _ev and _ei are assigned but never read in this function.
    _ev = kolab_event
    _ei = 0

    # Current occurrence window of the iTip event (start / end).
    _is = to_dt(itip_event['start'])
    _ie = to_dt(itip_event['end'])
    # NOTE(review): _iv and _ii are assigned but never read in this function.
    _iv = itip_event['xml']
    _ii = 0

    # Escape looping through anything if neither of the events is recurring.
    if not itip_event['xml'].is_recurring() and not kolab_event.is_recurring():
        return check_date_conflict(_es, _ee, _is, _ie)

    # Iteration counter; only used in the debug output below.
    loop = 0

    done = False

    # naive loops to check for collisions in (recurring) events
    # TODO: compare recurrence rules directly (e.g. matching time slot or weekday or monthday)
    while not conflict and not done:
        loop += 1

        # Scroll forward the kolab event recurrence until we're in the prime
        # spot. We choose to start with the Kolab event because that is likely
        # the older one.
        if _ee < _is:
            while _ee < _is and _es is not None and kolab_event.is_recurring():
                log.debug("Attempt to move forward kolab event recurrence from %s closer to %s" % (_ee, _is), level=8)
                # presumably to_dt() passes None through when there is no
                # further occurrence -- the check below relies on that; verify.
                __es = to_dt(kolab_event.get_next_occurence(_es))

                if not __es is None:
                    _es = __es
                    _ee = to_dt(kolab_event.get_occurence_end_date(_es))
                else:
                    # No more kolab occurrences: stop the outer loop as well.
                    done = True
                    break

        # Scroll forward the itip event recurrence until we're in the
        # prime spot, this time with the iTip event.
        elif _ie < _es:
            while _ie < _es and _is is not None and itip_event['xml'].is_recurring():
                log.debug("Attempt to move forward itip event recurrence from %s closer to %s" % (_ie, _es), level=8)
                __is = to_dt(itip_event['xml'].get_next_occurence(_is))

                if not __is is None:
                    _is = __is
                    _ie = to_dt(itip_event['xml'].get_occurence_end_date(_is))
                else:
                    # No more iTip occurrences: stop the outer loop as well.
                    done = True
                    break

        # Now that we have some events somewhere in the same neighborhood...
        conflict = check_date_conflict(_es, _ee, _is, _ie)
        log.debug("* Comparing itip at %s/%s with kolab at %s/%s: %r (%d)" % (_is, _ie, _es, _ee, conflict, loop), level=8)

        if not conflict:
            if kolab_event.is_recurring() and itip_event['xml'].is_recurring():
                if not kolab_event.has_exceptions() and not itip_event['xml'].has_exceptions():
                    # Both recur and neither has exceptions: give up after
                    # this single aligned comparison instead of iterating on.
                    log.debug("No conflict, both recurring, but neither with exceptions", level=8)
                    done = True
                    break

            # Advance the iTip event by one occurrence for the next round.
            _is = to_dt(itip_event['xml'].get_next_occurence(_is))

            if _is is not None:
                _ie = to_dt(itip_event['xml'].get_occurence_end_date(_is))
            else:
                done = True

    return conflict
def
_is_transparent
(
event
):
return
event
.
get_transparency
()
or
event
.
get_status
()
==
kolabformat
.
StatusCancelled
def
_convert_itip_payload
(
itip
):
matchlist
=
re
.
findall
(
"^((DTSTART|DTEND|DUE|EXDATE|COMPLETED)[:;][^
\n
]+)$"
,
itip
,
re
.
MULTILINE
)
for
match
in
matchlist
:
match
=
match
[
0
]
search
=
re
.
search
(
";TZID=([^:;]+)"
,
match
)
if
search
:
tzorig
=
tzdest
=
search
.
group
(
1
)
.
replace
(
'"'
,
''
)
# timezone in Olson-database format, nothing to convert
if
re
.
match
(
"[a-zA-Z]+/[a-zA-Z0-9_+-]+"
,
tzorig
):
continue
# convert timezone from windows format to Olson
if
tzorig
in
windows_tz
.
win_tz
:
tzdest
=
windows_tz
.
win_tz
[
tzorig
]
# @TODO: Should be prefer server time if it has the same offset?
# replace old with new timezone name
if
tzorig
!=
tzdest
:
replace
=
match
.
replace
(
search
.
group
(
0
),
";TZID="
+
tzdest
)
itip
=
itip
.
replace
(
"
\n
"
+
match
,
"
\n
"
+
replace
)
return
itip
def check_date_conflict(_es, _ee, _is, _ie):
    """
    Check the given event start/end dates for conflicts

    <_es>/<_ee> are the start and end of the first event, <_is>/<_ie> the
    start and end of the second one. Returns True when the two ranges are
    considered to collide, False otherwise.
    """
    # TODO: add margin for all-day dates (+13h; -12h)

    if _es < _is:
        # First event starts earlier ...
        if not _es <= _ie:
            # ... yet after the second one ends: treated as a conflict.
            return True
        # ... so it collides unless it is over by the time the second starts.
        return not _ee <= _is

    if _es == _is:
        # Identical start always collides.
        return True

    # _es > _is: collides only if the second event is still running.
    return True if _es < _ie else False
def send_reply(from_address, itip_events, response_text, subject=None):
    """
    Send the given iCal events as a valid iTip REPLY to the organizer.

    <from_address> is the attendee's email address; it is used to look up
    the attendee (and thereby the participant status) in each event.
    <itip_events> is one iTip dictionary or a list of them; each must carry
    the event object under the 'xml' key.
    <response_text> and the optional <subject> are '%'-templates that may
    use the placeholders %(summary)s, %(status)s and %(name)s. They are
    expanded separately for every event.

    One message per event is submitted via SMTP to localhost:10026. Returns
    None. If composing a message fails, the error is logged and the
    remaining events are not processed; a failing sendmail is logged and the
    next event is attempted.
    """
    import smtplib

    conf = pykolab.getConf()

    if isinstance(itip_events, dict):
        itip_events = [itip_events]

    for itip_event in itip_events:
        event = itip_event['xml']
        attendee = event.get_attendee_by_email(from_address)
        participant_status = event.get_ical_attendee_participant_status(attendee)

        log.debug(_("Send iTip reply %s for %s %r") % (participant_status, event.type, event.uid), level=8)

        placeholders = {
            'summary': event.get_summary(),
            'status': participant_status_label(participant_status),
            'name': attendee.get_name(),
        }

        message_text = response_text % placeholders

        # Expand into a per-event variable: overwriting <subject> itself would
        # destroy the template and make later events reuse the first subject.
        event_subject = None
        if subject is not None:
            event_subject = subject % placeholders

        try:
            message = event.to_message_itip(
                from_address,
                method="REPLY",
                participant_status=participant_status,
                message_text=message_text,
                subject=event_subject
            )
        except Exception as e:
            log.error(_("Failed to compose iTip reply message: %r: %s") % (e, traceback.format_exc()))
            return

        # replies go through wallace again
        smtp = smtplib.SMTP("localhost", 10026)

        try:
            if conf.debuglevel > 8:
                smtp.set_debuglevel(True)

            try:
                smtp.sendmail(message['From'], message['To'], message.as_string())
            except Exception as e:
                log.error(_("SMTP sendmail error: %r") % (e,))
        finally:
            # Close every connection we open, not just the last one. quit()
            # can fail on an already broken connection; fall back to close()
            # so one bad session does not abort the remaining replies.
            try:
                smtp.quit()
            except Exception:
                smtp.close()
def send_request(to_address, itip_events, request_text, subject=None, direct=False):
    """
    Send an iTip REQUEST message from the given iCal events

    <to_address> is the SMTP envelope recipient.
    <itip_events> is one iTip dictionary or a list of them; each must carry
    the event object under the 'xml' key.
    <request_text> and the optional <subject> are '%'-templates that may use
    the placeholder %(summary)s. They are expanded separately for every
    event.
    <direct> selects the local SMTP port: 10027 when true, 10026 otherwise.

    One message per event is submitted via SMTP to localhost. Returns None.
    If composing a message fails, the error is logged and the remaining
    events are not processed; a failing sendmail is logged and the next
    event is attempted.
    """
    import smtplib

    conf = pykolab.getConf()

    if isinstance(itip_events, dict):
        itip_events = [itip_events]

    port = 10027 if direct else 10026

    for itip_event in itip_events:
        event = itip_event['xml']
        placeholders = {'summary': event.get_summary()}

        message_text = request_text % placeholders

        # Expand into a per-event variable: overwriting <subject> itself would
        # destroy the template and make later events reuse the first subject.
        event_subject = None
        if subject is not None:
            event_subject = subject % placeholders

        try:
            message = event.to_message_itip(
                None,
                method="REQUEST",
                message_text=message_text,
                subject=event_subject
            )
        except Exception as e:
            log.error(_("Failed to compose iTip request message: %r") % (e,))
            return

        smtp = smtplib.SMTP("localhost", port)

        try:
            if conf.debuglevel > 8:
                smtp.set_debuglevel(True)

            try:
                smtp.sendmail(message['From'], to_address, message.as_string())
            except Exception as e:
                log.error(_("SMTP sendmail error: %r") % (e,))
        finally:
            # Close every connection we open, not just the last one. quit()
            # can fail on an already broken connection; fall back to close()
            # so one bad session does not abort the remaining requests.
            try:
                smtp.quit()
            except Exception:
                smtp.close()
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Mon, Apr 6, 2:18 AM (1 w, 3 d ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18790872
Default Alt Text
__init__.py (13 KB)
Attached To
Mode
rP pykolab
Attached
Detach File
Event Timeline