Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117773532
D3779.1775240100.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
16 KB
Referenced Files
None
Subscribers
None
D3779.1775240100.diff
View Options
diff --git a/pykolab/wap_client/__init__.py b/pykolab/wap_client/__init__.py
--- a/pykolab/wap_client/__init__.py
+++ b/pykolab/wap_client/__init__.py
@@ -30,7 +30,7 @@
kolab_wap_url = conf.get('kolab_wap', 'api_url')
-if not kolab_wap_url == None:
+if kolab_wap_url is not None:
result = urlparse(kolab_wap_url)
else:
result = None
@@ -52,15 +52,16 @@
conn = None
+
def authenticate(username=None, password=None, domain=None):
global session_id
- if username == None:
+ if username is None:
username = conf.get('ldap', 'bind_dn')
- if password == None:
+ if password is None:
password = conf.get('ldap', 'bind_pw')
- if domain == None:
+ if domain is None:
domain = conf.get('kolab', 'primary_domain')
post = json.dumps(
@@ -80,10 +81,11 @@
session_id = response['session_token']
return True
+
def connect(uri=None):
global conn, API_SSL, API_PORT, API_HOSTNAME, API_BASE
- if not uri == None:
+ if uri is not None:
result = urlparse(uri)
if hasattr(result, 'scheme') and result.scheme == 'https':
@@ -99,7 +101,7 @@
if hasattr(result, 'path'):
API_BASE = result.path
- if conn == None:
+ if conn is None:
if API_SSL:
conn = httplib.HTTPSConnection(API_HOSTNAME, API_PORT)
else:
@@ -109,6 +111,7 @@
return conn
+
def disconnect(quit=False):
global conn, session_id
@@ -120,15 +123,17 @@
conn.close()
conn = None
+
def domain_add(domain, aliases=[]):
dna = conf.get('ldap', 'domain_name_attribute')
post = json.dumps({
- dna: [ domain ] + aliases
+ dna: [domain] + aliases
})
return request('POST', 'domain.add', post=post)
+
def domain_delete(domain, force=False):
domain_id, domain_attrs = domain_find(domain).popitem()
@@ -141,34 +146,41 @@
return request('POST', 'domain.delete', post=post)
+
def domain_find(domain):
dna = conf.get('ldap', 'domain_name_attribute')
- get = { dna: domain }
+ get = {dna: domain}
return request('GET', 'domain.find', get=get)
+
def domain_info(domain):
domain_id, domain_attrs = domain_find(domain)
- get = { 'id': domain_id }
+ get = {'id': domain_id}
return request('GET', 'domain.info', get=get)
+
def domains_capabilities():
return request('GET', 'domains.capabilities')
+
def domains_list():
return request('GET', 'domains.list')
+
def form_value_generate(params):
post = json.dumps(params)
return request('POST', 'form_value.generate', post=post)
+
def form_value_generate_password(*args, **kw):
return request('GET', 'form_value.generate_password')
+
def form_value_list_options(object_type, object_type_id, attribute):
post = json.dumps(
{
@@ -180,24 +192,26 @@
return request('POST', 'form_value.list_options', post=post)
+
def form_value_select_options(object_type, object_type_id, attribute):
post = json.dumps(
{
'object_type': object_type,
'type_id': object_type_id,
- 'attributes': [ attribute ]
+ 'attributes': [attribute]
}
)
return request('POST', 'form_value.select_options', post=post)
+
def get_group_input():
group_types = group_types_list()
if len(group_types) > 1:
for key in group_types:
if not key == "status":
- print("%s) %s" % (key,group_types[key]['name']))
+ print("%s) %s" % (key, group_types[key]['name']))
group_type_id = utils.ask_question("Please select the group type")
@@ -223,11 +237,12 @@
params[attribute] = utils.ask_question(attribute)
for attribute in group_type_info['auto_form_fields']:
- retval = eval("group_form_value_generate_%s(params)" % (attribute))
+ retval = eval("group_form_value_generate_%s(params)" % attribute)
params[attribute] = retval[attribute]
return params
+
def get_user_input():
user_types = user_types_list()
@@ -235,7 +250,7 @@
print("")
for key in user_types['list']:
if not key == "status":
- print("%s) %s" % (key,user_types['list'][key]['name']))
+ print("%s) %s" % (key, user_types['list'][key]['name']))
print("")
user_type_id = utils.ask_question("Please select the user type")
@@ -264,7 +279,8 @@
for attribute in user_type_info['form_fields']:
if isinstance(user_type_info['form_fields'][attribute], dict):
- if 'optional' in user_type_info['form_fields'][attribute] and user_type_info['form_fields'][attribute]['optional']:
+ if ('optional' in user_type_info['form_fields'][attribute]) and \
+ user_type_info['form_fields'][attribute]['optional']:
may_attrs.append(attribute)
else:
must_attrs.append(attribute)
@@ -284,7 +300,7 @@
default = attribute_values[attribute]['default']
params[attribute] = utils.ask_menu(
- "Choose the %s value" % (attribute),
+ "Choose the %s value" % attribute,
attribute_values[attribute]['list'],
default=default
)
@@ -295,7 +311,7 @@
default = user_type_info['form_fields'][attribute]['default']
params[attribute] = utils.ask_menu(
- "Choose the %s value" % (attribute),
+ "Choose the %s value" % attribute,
user_type_info['form_fields'][attribute]['values'],
default=default
)
@@ -314,16 +330,18 @@
return params
+
def group_add(params=None):
- if params == None:
+ if params is None:
params = get_group_input()
post = json.dumps(params)
return request('POST', 'group.add', post=post)
+
def group_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Name of group to delete", "group")
}
@@ -332,66 +350,79 @@
return request('POST', 'group.delete', post=post)
+
def group_form_value_generate_mail(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
params = json.dumps(params)
return request('POST', 'group_form_value.generate_mail', params)
+
def group_find(params=None):
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in params.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in params.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
return request('POST', 'group.find', post=json.dumps(post))
+
def group_info(group=None):
- if group == None:
+ if group is None:
group = utils.ask_question("group DN")
- return request('GET', 'group.info', get={ 'id': group })
+ return request('GET', 'group.info', get={'id': group})
+
def group_members_list(group=None):
- if group == None:
+ if group is None:
group = utils.ask_question("Group email address")
- group = request('GET', 'group.members_list?group=%s' % (group))
+ group = request('GET', 'group.members_list?group=%s' % group)
return group
+
def group_types_list():
return request('GET', 'group_types.list')
+
def groups_list(params={}):
return request('POST', 'groups.list', post=json.dumps(params))
+
def ou_add(params={}):
return request('POST', 'ou.add', post=json.dumps(params))
+
def ou_delete(params={}):
return request('POST', 'ou.delete', post=json.dumps(params))
+
def ou_edit(params={}):
return request('POST', 'ou.edit', post=json.dumps(params))
+
def ou_find(params=None):
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in params.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in params.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
return request('POST', 'ou.find', post=json.dumps(post))
+
def ou_info(ou):
- _params = { 'id': ou }
+ _params = {'id': ou}
ou = request('GET', 'ou.info', get=_params)
return ou
+
def ous_list(params={}):
return request('POST', 'ous.list', post=json.dumps(params))
+
def request(method, api_uri, get=None, post=None, headers={}):
response_data = request_raw(method, api_uri, get, post, headers)
@@ -402,10 +433,11 @@
print("%s: %s (code %s)" % (response_data['status'], response_data['reason'], response_data['code']))
return False
+
def request_raw(method, api_uri, get=None, post=None, headers={}, isretry=False):
global session_id
- if not session_id == None:
+ if session_id is not None:
headers["X-Session-Token"] = session_id
reconnect = False
@@ -414,12 +446,12 @@
if conf.debuglevel > 8:
conn.set_debuglevel(9)
- if not get == None:
+ if get is not None:
_get = "?%s" % (urllib.urlencode(get))
else:
_get = ""
- log.debug(_("Requesting %r with params %r") % ("%s/%s" % (API_BASE,api_uri), (get, post)), level=8)
+ log.debug(_("Requesting %r with params %r") % ("%s/%s" % (API_BASE, api_uri), (get, post)), level=8)
try:
conn.request(method.upper(), "%s/%s%s" % (API_BASE, api_uri, _get), post, headers)
@@ -427,7 +459,7 @@
response = conn.getresponse()
data = response.read()
- log.debug(_("Got response: %r") % (data), level=8)
+ log.debug(_("Got response: %r") % data, level=8)
except (httplib.BadStatusLine, httplib.CannotSendRequest) as e:
if isretry:
@@ -448,41 +480,48 @@
return response_data
+
def resource_add(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
return request('POST', 'resource.add', post=json.dumps(params))
+
def resource_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Resource DN to delete", "resource")
}
return request('POST', 'resource.delete', post=json.dumps(params))
+
def resource_find(params=None):
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in params.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in params.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
return request('POST', 'resource.find', post=json.dumps(post))
+
def resource_info(resource=None):
- if resource == None:
+ if resource is None:
resource = utils.ask_question("Resource DN")
- return request('GET', 'resource.info', get={ 'id': resource })
+ return request('GET', 'resource.info', get={'id': resource})
+
def resource_types_list():
return request('GET', 'resource_types.list')
+
def resources_list(params={}):
return request('POST', 'resources.list', post=json.dumps(params))
+
def role_add(params=None):
- if params == None:
+ if params is None:
role_name = utils.ask_question("Role name")
params = {
'cn': role_name
@@ -492,11 +531,13 @@
return request('POST', 'role.add', params)
+
def role_capabilities():
return request('GET', 'role.capabilities')
+
def role_delete(params=None):
- if params == None:
+ if params is None:
role_name = utils.ask_question("Role name")
role = role_find_by_attribute({'cn': role_name})
params = {
@@ -513,70 +554,81 @@
return request('POST', 'role.delete', post=post)
+
def role_find_by_attribute(params=None):
- if params == None:
+ if params is None:
role_name = utils.ask_question("Role name")
else:
role_name = params['cn']
- get = { 'cn': role_name }
+ get = {'cn': role_name}
role = request('GET', 'role.find_by_attribute', get=get)
return role
+
def role_info(role_name):
role = role_find_by_attribute({'cn': role_name})
- get = { 'role': role['id'] }
+ get = {'role': role['id']}
role = request('GET', 'role.info', get=get)
return role
+
def roles_list():
return request('GET', 'roles.list')
+
def sharedfolder_add(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
return request('POST', 'sharedfolder.add', post=json.dumps(params))
+
def sharedfolder_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Shared Folder DN to delete", "sharedfolder")
}
return request('POST', 'sharedfolder.delete', post=json.dumps(params))
+
def sharedfolders_list(params={}):
return request('POST', 'sharedfolders.list', post=json.dumps(params))
+
def system_capabilities(domain=None):
- return request('GET', 'system.capabilities', get={'domain':domain})
+ return request('GET', 'system.capabilities', get={'domain': domain})
+
def system_get_domain():
return request('GET', 'system.get_domain')
+
def system_select_domain(domain=None):
- if domain == None:
+ if domain is None:
domain = utils.ask_question("Domain name")
- get = { 'domain': domain }
+ get = {'domain': domain}
return request('GET', 'system.select_domain', get=get)
+
def user_add(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
params = json.dumps(params)
return request('POST', 'user.add', post=params)
+
def user_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Username for user to delete", "user")
}
@@ -585,8 +637,9 @@
return request('POST', 'user.delete', post=post)
-def user_edit(user = None, attributes={}):
- if user == None:
+
+def user_edit(user=None, attributes={}):
+ if user is None:
get = {
'id': utils.ask_question("Username for user to edit", "user")
}
@@ -606,12 +659,13 @@
return user_edit
+
def user_find(attribs=None):
- if attribs == None:
+ if attribs is None:
post = {
'search': {
'params': {
- utils.ask_question("Attribute") : {
+ utils.ask_question("Attribute"): {
'value': utils.ask_question("value"),
'type': 'exact'
}
@@ -619,10 +673,10 @@
}
}
else:
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in attribs.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in attribs.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
post = json.dumps(post)
@@ -630,38 +684,44 @@
return user
+
def user_form_value_generate(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
post = json.dumps(params)
return request('POST', 'form_value.generate', post=post)
+
def user_form_value_generate_uid(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
params = json.dumps(params)
return request('POST', 'form_value.generate_uid', params)
+
def user_form_value_generate_userpassword(*args, **kw):
result = form_value_generate_password()
- return { 'userpassword': result['password'] }
+ return {'userpassword': result['password']}
+
def user_info(user=None):
- if user == None:
+ if user is None:
user = utils.ask_question("User email address")
- _params = { 'id': user }
+ _params = {'id': user}
user = request('GET', 'user.info', get=_params)
return user
+
def users_list(params={}):
return request('POST', 'users.list', post=json.dumps(params))
+
def user_types_list():
return request('GET', 'user_types.list')
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Fri, Apr 3, 6:15 PM (12 h, 42 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18825005
Default Alt Text
D3779.1775240100.diff (16 KB)
Attached To
Mode
D3779: refurbish code PEP8 & Co./wap_client
Attached
Detach File
Event Timeline