Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117885025
test-011-wallace_resources.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
9 KB
Referenced Files
None
Subscribers
None
test-011-wallace_resources.py
View Options
import
pykolab
import
logging
import
datetime
from
pykolab
import
itip
from
icalendar
import
Calendar
from
email
import
message
from
email
import
message_from_string
from
wallace
import
module_resources
from
twisted.trial
import
unittest
# define some iTip MIME messages
itip_multipart
=
"""MIME-Version: 1.0
Content-Type: multipart/mixed;
boundary="=_c8894dbdb8baeedacae836230e3436fd"
From: "Doe, John" <doe@example.org>
Date: Fri, 13 Jul 2012 13:54:14 +0100
Message-ID: <240fe7ae7e139129e9eb95213c1016d7@example.org>
User-Agent: Roundcube Webmail/0.9-0.3.el6.kolab_3.0
To: resource-collection-car@example.org
Subject: "test" has been updated
--=_c8894dbdb8baeedacae836230e3436fd
Content-Type: text/plain; charset=UTF-8; format=flowed
Content-Transfer-Encoding: quoted-printable
*test*
--=_c8894dbdb8baeedacae836230e3436fd
Content-Type: text/calendar; charset=UTF-8; method=REQUEST;
name=event.ics
Content-Disposition: attachment;
filename=event.ics
Content-Transfer-Encoding: quoted-printable
BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN
CALSCALE:GREGORIAN
METHOD:REQUEST
BEGIN:VEVENT
UID:626421779C777FBE9C9B85A80D04DDFA-A4BF5BBB9FEAA271
DTSTAMP:20120713T1254140
DTSTART;TZID=3DEurope/London:20120713T100000
DTEND;TZID=3DEurope/London:20120713T110000
SUMMARY:test
DESCRIPTION:test
ORGANIZER;CN=3D"Doe, John":mailto:doe@example.org
ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailt=
o:resource-collection-car@example.org
ATTENDEE;ROLE=3DOPT-PARTICIPANT;PARTSTAT=3DNEEDS-ACTION;RSVP=3DTRUE:mailto:anoth=
er-resource@example.org
TRANSP:OPAQUE
END:VEVENT
END:VCALENDAR
--=_c8894dbdb8baeedacae836230e3436fd--
"""
itip_non_multipart
=
"""Return-Path: <doe@example.org>
Sender: doe@example.org
Content-Type: text/calendar; method=REQUEST; charset=UTF-8
Content-Transfer-Encoding: quoted-printable
To: resource-collection-car@example.org
From: doe@example.org
Date: Mon, 24 Feb 2014 11:27:28 +0100
Message-ID: <1a3aa8995e83dd24cf9247e538ac913a@example.org>
Subject: test
BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//Roundcube Webmail 0.9-0.3.el6.kolab_3.0//NONSGML Calendar//EN
CALSCALE:GREGORIAN
METHOD:REQUEST
BEGIN:VEVENT
UID:626421779C777FBE9C9B85A80D04DDFA-A4BF5BBB9FEAA271
DTSTAMP:20120713T1254140
DTSTART;TZID=3DEurope/London:20120713T100000
DTEND;TZID=3DEurope/London:20120713T110000
SUMMARY:test
DESCRIPTION:test
ORGANIZER;CN=3D"Doe, John":mailto:doe@example.org
ATTENDEE;ROLE=3DREQ-PARTICIPANT;PARTSTAT=3DACCEPTED;RSVP=3DTRUE:mailt=
o:resource-collection-car@example.org
TRANSP:OPAQUE
END:VEVENT
END:VCALENDAR
"""
conf
=
pykolab
.
getConf
()
if
not
hasattr
(
conf
,
'defaults'
):
conf
.
finalize_conf
()
class
TestWallaceResources
(
unittest
.
TestCase
):
def
setUp
(
self
):
# monkey-patch the pykolab.auth module to check API calls
# without actually connecting to LDAP
self
.
patch
(
pykolab
.
auth
.
Auth
,
"connect"
,
self
.
_mock_nop
)
self
.
patch
(
pykolab
.
auth
.
Auth
,
"disconnect"
,
self
.
_mock_nop
)
self
.
patch
(
pykolab
.
auth
.
Auth
,
"find_resource"
,
self
.
_mock_find_resource
)
self
.
patch
(
pykolab
.
auth
.
Auth
,
"get_entry_attributes"
,
self
.
_mock_get_entry_attributes
)
self
.
patch
(
pykolab
.
auth
.
Auth
,
"search_entry_by_attribute"
,
self
.
_mock_search_entry_by_attribute
)
# intercept calls to smtplib.SMTP.sendmail()
import
smtplib
self
.
patch
(
smtplib
.
SMTP
,
"__init__"
,
self
.
_mock_smtp_init
)
self
.
patch
(
smtplib
.
SMTP
,
"quit"
,
self
.
_mock_nop
)
self
.
patch
(
smtplib
.
SMTP
,
"sendmail"
,
self
.
_mock_smtp_sendmail
)
self
.
smtplog
=
[];
def
_mock_nop
(
self
,
domain
=
None
):
pass
def
_mock_find_resource
(
self
,
address
):
if
not
'resource'
in
address
:
return
[];
(
prefix
,
domain
)
=
address
.
split
(
'@'
)
entry_dn
=
"cn="
+
prefix
+
",ou=Resources,dc="
+
",dc="
.
join
(
domain
.
split
(
'.'
))
return
[
entry_dn
];
def
_mock_get_entry_attributes
(
self
,
domain
,
entry
,
attributes
):
(
_
,
uid
)
=
entry
.
split
(
','
)[
0
]
.
split
(
'='
)
return
{
'cn'
:
uid
,
'mail'
:
uid
+
"@example.org"
,
'_attrib'
:
attributes
}
def
_mock_search_entry_by_attribute
(
self
,
attr
,
value
,
**
kw
):
results
=
[]
if
value
==
"cn=Room 101,ou=Resources,dc=example,dc=org"
:
results
.
append
((
'cn=Rooms,ou=Resources,dc=example,dc=org'
,
{
attr
:
value
,
'owner'
:
'uid=doe,ou=People,dc=example,dc=org'
}))
return
results
def
_mock_smtp_init
(
self
,
host
=
None
,
port
=
None
,
local_hostname
=
None
,
timeout
=
0
):
pass
def
_mock_smtp_sendmail
(
self
,
from_addr
,
to_addr
,
message
,
mail_options
=
None
,
rcpt_options
=
None
):
self
.
smtplog
.
append
((
from_addr
,
to_addr
,
message
))
def
_get_ics_part
(
self
,
message
):
ics_part
=
None
for
part
in
message
.
walk
():
if
part
.
get_content_type
()
==
'text/calendar'
:
ics_part
=
part
return
ics_part
def
_get_ical
(
self
,
ics
):
if
hasattr
(
Calendar
,
'from_ical'
):
cal
=
Calendar
.
from_ical
(
ics
)
elif
hasattr
(
Calendar
,
'from_string'
):
cal
=
Calendar
.
from_string
(
ics
)
for
e
in
cal
.
walk
():
if
e
.
name
==
"VEVENT"
:
return
e
return
None
def
test_002_resource_record_from_email_address
(
self
):
res
=
module_resources
.
resource_record_from_email_address
(
"doe@example.org"
)
self
.
assertEqual
(
len
(
res
),
0
);
def
test_003_resource_records_from_itip_events
(
self
):
message
=
message_from_string
(
itip_multipart
)
itips
=
itip
.
events_from_message
(
message
)
res
=
module_resources
.
resource_records_from_itip_events
(
itips
)
self
.
assertEqual
(
len
(
res
),
2
,
"Return resources:
%r
"
%
(
res
));
res
=
module_resources
.
resource_records_from_itip_events
(
itips
,
message
[
'To'
])
self
.
assertEqual
(
len
(
res
),
1
,
"Return target resource:
%r
"
%
(
res
));
self
.
assertEqual
(
"cn=resource-collection-car,ou=Resources,dc=example,dc=org"
,
res
[
0
]);
def
test_004_get_resource_owner
(
self
):
owner1
=
module_resources
.
get_resource_owner
({
'owner'
:
"uid=foo,ou=People,cd=example,dc=org"
})
self
.
assertIsInstance
(
owner1
,
dict
)
self
.
assertEqual
(
"foo@example.org"
,
owner1
[
'mail'
])
self
.
assertIn
(
"telephoneNumber"
,
owner1
[
'_attrib'
])
owner2
=
module_resources
.
get_resource_owner
({
'owner'
:
[
"uid=john,ou=People,cd=example,dc=org"
,
"uid=jane,ou=People,cd=example,dc=org"
]
})
self
.
assertIsInstance
(
owner2
,
dict
)
self
.
assertEqual
(
"john@example.org"
,
owner2
[
'mail'
])
owner3
=
module_resources
.
get_resource_owner
({
'dn'
:
"cn=cars,ou=Resources,cd=example,dc=org"
})
self
.
assertEqual
(
owner3
,
None
)
owner4
=
module_resources
.
get_resource_owner
({
'dn'
:
"cn=Room 101,ou=Resources,dc=example,dc=org"
})
self
.
assertEqual
(
"doe@example.org"
,
owner4
[
'mail'
])
def
test_005_send_response_accept
(
self
):
itip_event
=
itip
.
events_from_message
(
message_from_string
(
itip_non_multipart
))
module_resources
.
send_response
(
"resource-collection-car@example.org"
,
itip_event
)
self
.
assertEqual
(
len
(
self
.
smtplog
),
1
);
self
.
assertEqual
(
"resource-collection-car@example.org"
,
self
.
smtplog
[
0
][
0
])
self
.
assertEqual
(
"doe@example.org"
,
self
.
smtplog
[
0
][
1
])
response
=
message_from_string
(
self
.
smtplog
[
0
][
2
])
self
.
assertIn
(
"ACCEPTED"
.
lower
(),
response
[
'subject'
]
.
lower
(),
"Participant status in message subject:
%r
"
%
(
response
[
'subject'
]))
self
.
assertTrue
(
response
.
is_multipart
())
# find ics part of the response
ics_part
=
self
.
_get_ics_part
(
response
)
self
.
assertIsInstance
(
ics_part
,
message
.
Message
)
self
.
assertEqual
(
ics_part
.
get_param
(
'method'
),
"REPLY"
)
def
test_006_send_response_delegate
(
self
):
# delegate resource-collection-car@example.org => resource-car-audi-a4@example.org
itip_event
=
itip
.
events_from_message
(
message_from_string
(
itip_non_multipart
))[
0
]
itip_event
[
'xml'
]
.
delegate
(
'resource-collection-car@example.org'
,
'resource-car-audi-a4@example.org'
)
itip_event
[
'xml'
]
.
set_attendee_participant_status
(
itip_event
[
'xml'
]
.
get_attendee
(
'resource-car-audi-a4@example.org'
),
"ACCEPTED"
)
module_resources
.
send_response
(
"resource-collection-car@example.org"
,
itip_event
)
self
.
assertEqual
(
len
(
self
.
smtplog
),
2
);
self
.
assertEqual
(
"resource-collection-car@example.org"
,
self
.
smtplog
[
0
][
0
])
self
.
assertEqual
(
"resource-car-audi-a4@example.org"
,
self
.
smtplog
[
1
][
0
])
# delegated resource responds ACCEPTED
response1
=
message_from_string
(
self
.
smtplog
[
0
][
2
])
ical1
=
self
.
_get_ical
(
self
.
_get_ics_part
(
response1
)
.
get_payload
(
decode
=
True
))
self
.
assertIn
(
"DELEGATED"
.
lower
(),
response1
[
'subject'
]
.
lower
(),
"Participant status in message subject:
%r
"
%
(
response1
[
'subject'
]))
self
.
assertEqual
(
ical1
[
'attendee'
][
1
]
.
__str__
(),
"MAILTO:resource-car-audi-a4@example.org"
)
# resource collection responds DELEGATED
response2
=
message_from_string
(
self
.
smtplog
[
1
][
2
])
ical2
=
self
.
_get_ical
(
self
.
_get_ics_part
(
response2
)
.
get_payload
(
decode
=
True
))
self
.
assertIn
(
"ACCEPTED"
.
lower
(),
response2
[
'subject'
]
.
lower
(),
"Delegation message subject:
%r
"
%
(
response2
[
'subject'
]))
self
.
assertEqual
(
ical2
[
'attendee'
]
.
__str__
(),
"MAILTO:resource-car-audi-a4@example.org"
)
self
.
assertEqual
(
ical2
[
'attendee'
]
.
params
[
'PARTSTAT'
],
u"ACCEPTED"
)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Mon, Apr 6, 1:48 AM (6 d, 17 h ago)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18831878
Default Alt Text
test-011-wallace_resources.py (9 KB)
Attached To
Mode
rP pykolab
Attached
Detach File
Event Timeline