Page MenuHomePhorge

D3779.1775240100.diff
No OneTemporary

Authored By
Unknown
Size
16 KB
Referenced Files
None
Subscribers
None

D3779.1775240100.diff

diff --git a/pykolab/wap_client/__init__.py b/pykolab/wap_client/__init__.py
--- a/pykolab/wap_client/__init__.py
+++ b/pykolab/wap_client/__init__.py
@@ -30,7 +30,7 @@
kolab_wap_url = conf.get('kolab_wap', 'api_url')
-if not kolab_wap_url == None:
+if kolab_wap_url is not None:
result = urlparse(kolab_wap_url)
else:
result = None
@@ -52,15 +52,16 @@
conn = None
+
def authenticate(username=None, password=None, domain=None):
global session_id
- if username == None:
+ if username is None:
username = conf.get('ldap', 'bind_dn')
- if password == None:
+ if password is None:
password = conf.get('ldap', 'bind_pw')
- if domain == None:
+ if domain is None:
domain = conf.get('kolab', 'primary_domain')
post = json.dumps(
@@ -80,10 +81,11 @@
session_id = response['session_token']
return True
+
def connect(uri=None):
global conn, API_SSL, API_PORT, API_HOSTNAME, API_BASE
- if not uri == None:
+ if uri is not None:
result = urlparse(uri)
if hasattr(result, 'scheme') and result.scheme == 'https':
@@ -99,7 +101,7 @@
if hasattr(result, 'path'):
API_BASE = result.path
- if conn == None:
+ if conn is None:
if API_SSL:
conn = httplib.HTTPSConnection(API_HOSTNAME, API_PORT)
else:
@@ -109,6 +111,7 @@
return conn
+
def disconnect(quit=False):
global conn, session_id
@@ -120,15 +123,17 @@
conn.close()
conn = None
+
def domain_add(domain, aliases=[]):
dna = conf.get('ldap', 'domain_name_attribute')
post = json.dumps({
- dna: [ domain ] + aliases
+ dna: [domain] + aliases
})
return request('POST', 'domain.add', post=post)
+
def domain_delete(domain, force=False):
domain_id, domain_attrs = domain_find(domain).popitem()
@@ -141,34 +146,41 @@
return request('POST', 'domain.delete', post=post)
+
def domain_find(domain):
dna = conf.get('ldap', 'domain_name_attribute')
- get = { dna: domain }
+ get = {dna: domain}
return request('GET', 'domain.find', get=get)
+
def domain_info(domain):
domain_id, domain_attrs = domain_find(domain)
- get = { 'id': domain_id }
+ get = {'id': domain_id}
return request('GET', 'domain.info', get=get)
+
def domains_capabilities():
return request('GET', 'domains.capabilities')
+
def domains_list():
return request('GET', 'domains.list')
+
def form_value_generate(params):
post = json.dumps(params)
return request('POST', 'form_value.generate', post=post)
+
def form_value_generate_password(*args, **kw):
return request('GET', 'form_value.generate_password')
+
def form_value_list_options(object_type, object_type_id, attribute):
post = json.dumps(
{
@@ -180,24 +192,26 @@
return request('POST', 'form_value.list_options', post=post)
+
def form_value_select_options(object_type, object_type_id, attribute):
post = json.dumps(
{
'object_type': object_type,
'type_id': object_type_id,
- 'attributes': [ attribute ]
+ 'attributes': [attribute]
}
)
return request('POST', 'form_value.select_options', post=post)
+
def get_group_input():
group_types = group_types_list()
if len(group_types) > 1:
for key in group_types:
if not key == "status":
- print("%s) %s" % (key,group_types[key]['name']))
+ print("%s) %s" % (key, group_types[key]['name']))
group_type_id = utils.ask_question("Please select the group type")
@@ -223,11 +237,12 @@
params[attribute] = utils.ask_question(attribute)
for attribute in group_type_info['auto_form_fields']:
- retval = eval("group_form_value_generate_%s(params)" % (attribute))
+ retval = eval("group_form_value_generate_%s(params)" % attribute)
params[attribute] = retval[attribute]
return params
+
def get_user_input():
user_types = user_types_list()
@@ -235,7 +250,7 @@
print("")
for key in user_types['list']:
if not key == "status":
- print("%s) %s" % (key,user_types['list'][key]['name']))
+ print("%s) %s" % (key, user_types['list'][key]['name']))
print("")
user_type_id = utils.ask_question("Please select the user type")
@@ -264,7 +279,8 @@
for attribute in user_type_info['form_fields']:
if isinstance(user_type_info['form_fields'][attribute], dict):
- if 'optional' in user_type_info['form_fields'][attribute] and user_type_info['form_fields'][attribute]['optional']:
+ if ('optional' in user_type_info['form_fields'][attribute]) and \
+ user_type_info['form_fields'][attribute]['optional']:
may_attrs.append(attribute)
else:
must_attrs.append(attribute)
@@ -284,7 +300,7 @@
default = attribute_values[attribute]['default']
params[attribute] = utils.ask_menu(
- "Choose the %s value" % (attribute),
+ "Choose the %s value" % attribute,
attribute_values[attribute]['list'],
default=default
)
@@ -295,7 +311,7 @@
default = user_type_info['form_fields'][attribute]['default']
params[attribute] = utils.ask_menu(
- "Choose the %s value" % (attribute),
+ "Choose the %s value" % attribute,
user_type_info['form_fields'][attribute]['values'],
default=default
)
@@ -314,16 +330,18 @@
return params
+
def group_add(params=None):
- if params == None:
+ if params is None:
params = get_group_input()
post = json.dumps(params)
return request('POST', 'group.add', post=post)
+
def group_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Name of group to delete", "group")
}
@@ -332,66 +350,79 @@
return request('POST', 'group.delete', post=post)
+
def group_form_value_generate_mail(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
params = json.dumps(params)
return request('POST', 'group_form_value.generate_mail', params)
+
def group_find(params=None):
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in params.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in params.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
return request('POST', 'group.find', post=json.dumps(post))
+
def group_info(group=None):
- if group == None:
+ if group is None:
group = utils.ask_question("group DN")
- return request('GET', 'group.info', get={ 'id': group })
+ return request('GET', 'group.info', get={'id': group})
+
def group_members_list(group=None):
- if group == None:
+ if group is None:
group = utils.ask_question("Group email address")
- group = request('GET', 'group.members_list?group=%s' % (group))
+ group = request('GET', 'group.members_list?group=%s' % group)
return group
+
def group_types_list():
return request('GET', 'group_types.list')
+
def groups_list(params={}):
return request('POST', 'groups.list', post=json.dumps(params))
+
def ou_add(params={}):
return request('POST', 'ou.add', post=json.dumps(params))
+
def ou_delete(params={}):
return request('POST', 'ou.delete', post=json.dumps(params))
+
def ou_edit(params={}):
return request('POST', 'ou.edit', post=json.dumps(params))
+
def ou_find(params=None):
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in params.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in params.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
return request('POST', 'ou.find', post=json.dumps(post))
+
def ou_info(ou):
- _params = { 'id': ou }
+ _params = {'id': ou}
ou = request('GET', 'ou.info', get=_params)
return ou
+
def ous_list(params={}):
return request('POST', 'ous.list', post=json.dumps(params))
+
def request(method, api_uri, get=None, post=None, headers={}):
response_data = request_raw(method, api_uri, get, post, headers)
@@ -402,10 +433,11 @@
print("%s: %s (code %s)" % (response_data['status'], response_data['reason'], response_data['code']))
return False
+
def request_raw(method, api_uri, get=None, post=None, headers={}, isretry=False):
global session_id
- if not session_id == None:
+ if session_id is not None:
headers["X-Session-Token"] = session_id
reconnect = False
@@ -414,12 +446,12 @@
if conf.debuglevel > 8:
conn.set_debuglevel(9)
- if not get == None:
+ if get is not None:
_get = "?%s" % (urllib.urlencode(get))
else:
_get = ""
- log.debug(_("Requesting %r with params %r") % ("%s/%s" % (API_BASE,api_uri), (get, post)), level=8)
+ log.debug(_("Requesting %r with params %r") % ("%s/%s" % (API_BASE, api_uri), (get, post)), level=8)
try:
conn.request(method.upper(), "%s/%s%s" % (API_BASE, api_uri, _get), post, headers)
@@ -427,7 +459,7 @@
response = conn.getresponse()
data = response.read()
- log.debug(_("Got response: %r") % (data), level=8)
+ log.debug(_("Got response: %r") % data, level=8)
except (httplib.BadStatusLine, httplib.CannotSendRequest) as e:
if isretry:
@@ -448,41 +480,48 @@
return response_data
+
def resource_add(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
return request('POST', 'resource.add', post=json.dumps(params))
+
def resource_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Resource DN to delete", "resource")
}
return request('POST', 'resource.delete', post=json.dumps(params))
+
def resource_find(params=None):
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in params.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in params.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
return request('POST', 'resource.find', post=json.dumps(post))
+
def resource_info(resource=None):
- if resource == None:
+ if resource is None:
resource = utils.ask_question("Resource DN")
- return request('GET', 'resource.info', get={ 'id': resource })
+ return request('GET', 'resource.info', get={'id': resource})
+
def resource_types_list():
return request('GET', 'resource_types.list')
+
def resources_list(params={}):
return request('POST', 'resources.list', post=json.dumps(params))
+
def role_add(params=None):
- if params == None:
+ if params is None:
role_name = utils.ask_question("Role name")
params = {
'cn': role_name
@@ -492,11 +531,13 @@
return request('POST', 'role.add', params)
+
def role_capabilities():
return request('GET', 'role.capabilities')
+
def role_delete(params=None):
- if params == None:
+ if params is None:
role_name = utils.ask_question("Role name")
role = role_find_by_attribute({'cn': role_name})
params = {
@@ -513,70 +554,81 @@
return request('POST', 'role.delete', post=post)
+
def role_find_by_attribute(params=None):
- if params == None:
+ if params is None:
role_name = utils.ask_question("Role name")
else:
role_name = params['cn']
- get = { 'cn': role_name }
+ get = {'cn': role_name}
role = request('GET', 'role.find_by_attribute', get=get)
return role
+
def role_info(role_name):
role = role_find_by_attribute({'cn': role_name})
- get = { 'role': role['id'] }
+ get = {'role': role['id']}
role = request('GET', 'role.info', get=get)
return role
+
def roles_list():
return request('GET', 'roles.list')
+
def sharedfolder_add(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
return request('POST', 'sharedfolder.add', post=json.dumps(params))
+
def sharedfolder_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Shared Folder DN to delete", "sharedfolder")
}
return request('POST', 'sharedfolder.delete', post=json.dumps(params))
+
def sharedfolders_list(params={}):
return request('POST', 'sharedfolders.list', post=json.dumps(params))
+
def system_capabilities(domain=None):
- return request('GET', 'system.capabilities', get={'domain':domain})
+ return request('GET', 'system.capabilities', get={'domain': domain})
+
def system_get_domain():
return request('GET', 'system.get_domain')
+
def system_select_domain(domain=None):
- if domain == None:
+ if domain is None:
domain = utils.ask_question("Domain name")
- get = { 'domain': domain }
+ get = {'domain': domain}
return request('GET', 'system.select_domain', get=get)
+
def user_add(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
params = json.dumps(params)
return request('POST', 'user.add', post=params)
+
def user_delete(params=None):
- if params == None:
+ if params is None:
params = {
'id': utils.ask_question("Username for user to delete", "user")
}
@@ -585,8 +637,9 @@
return request('POST', 'user.delete', post=post)
-def user_edit(user = None, attributes={}):
- if user == None:
+
+def user_edit(user=None, attributes={}):
+ if user is None:
get = {
'id': utils.ask_question("Username for user to edit", "user")
}
@@ -606,12 +659,13 @@
return user_edit
+
def user_find(attribs=None):
- if attribs == None:
+ if attribs is None:
post = {
'search': {
'params': {
- utils.ask_question("Attribute") : {
+ utils.ask_question("Attribute"): {
'value': utils.ask_question("value"),
'type': 'exact'
}
@@ -619,10 +673,10 @@
}
}
else:
- post = { 'search': { 'params': {} } }
+ post = {'search': {'params': {}}}
- for (k,v) in attribs.items():
- post['search']['params'][k] = { 'value': v, 'type': 'exact' }
+ for (k, v) in attribs.items():
+ post['search']['params'][k] = {'value': v, 'type': 'exact'}
post = json.dumps(post)
@@ -630,38 +684,44 @@
return user
+
def user_form_value_generate(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
post = json.dumps(params)
return request('POST', 'form_value.generate', post=post)
+
def user_form_value_generate_uid(params=None):
- if params == None:
+ if params is None:
params = get_user_input()
params = json.dumps(params)
return request('POST', 'form_value.generate_uid', params)
+
def user_form_value_generate_userpassword(*args, **kw):
result = form_value_generate_password()
- return { 'userpassword': result['password'] }
+ return {'userpassword': result['password']}
+
def user_info(user=None):
- if user == None:
+ if user is None:
user = utils.ask_question("User email address")
- _params = { 'id': user }
+ _params = {'id': user}
user = request('GET', 'user.info', get=_params)
return user
+
def users_list(params={}):
return request('POST', 'users.list', post=json.dumps(params))
+
def user_types_list():
return request('GET', 'user_types.list')

File Metadata

Mime Type
text/plain
Expires
Fri, Apr 3, 6:15 PM (12 h, 42 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
18825005
Default Alt Text
D3779.1775240100.diff (16 KB)

Event Timeline