Page MenuHomePhorge

utils.py
No OneTemporary

Authored By
Unknown
Size
17 KB
Referenced Files
None
Subscribers
None

utils.py

# -*- coding: utf-8 -*-
# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import base64
import getpass
import grp
import os
import pwd
import struct
import sys
import pykolab
from pykolab import constants
from pykolab.translate import _
log = pykolab.getLogger('pykolab.utils')
conf = pykolab.getConf()
def ask_question(question, default="", password=False, confirm=False):
"""
Ask a question on stderr.
Since the answer to the question may actually be a password, cover that
case with a getpass.getpass() prompt.
Accepts a default value, but ignores defaults for password prompts.
Usage: pykolab.utils.ask_question("What is the server?", default="localhost")
"""
if not default == "" and not default == None and conf.cli_keywords.answer_default:
if not conf.cli_keywords.quiet:
print ("%s [%s]: " % (question, default))
return default
if password:
if default == "" or default == None:
answer = getpass.getpass("%s: " % (question))
else:
answer = getpass.getpass("%s [%s]: " % (question, default))
else:
if default == "" or default == None:
answer = raw_input("%s: " % (question))
else:
answer = raw_input("%s [%s]: " % (question, default))
if not answer == "":
if confirm:
answer_confirm = None
answer_confirmed = False
while not answer_confirmed:
if password:
answer_confirm = getpass.getpass(_("Confirm %s: ") % (question))
else:
answer_confirm = raw_input(_("Confirm %s: ") % (question))
if not answer_confirm == answer:
print >> sys.stderr, _("Incorrect confirmation. " + \
"Please try again.")
if password:
if default == "" or default == None:
answer = getpass.getpass(_("%s: ") % (question))
else:
answer = getpass.getpass(_("%s [%s]: ") % (question, default))
else:
if default == "" or default == None:
answer = raw_input(_("%s: ") % (question))
else:
answer = raw_input(_("%s [%s]: ") % (question, default))
else:
answer_confirmed = True
if answer == "":
return default
else:
return answer
def ask_confirmation(question, default="y", all_inclusive_no=True):
"""
Create a confirmation dialog, including a default option (capitalized),
and a "yes" or "no" parsing that can either require an explicit, full
"yes" or "no", or take the default or any YyNn answer.
"""
default_answer = None
if default in [ "y", "Y" ]:
default_answer = True
default_no = "n"
default_yes = "Y"
elif default in [ "n", "N" ]:
default_answer = False
default_no = "N"
default_yes = "y"
else:
# This is a 'yes' or 'no' question the user
# needs to provide the full yes or no for.
default_no = "'no'"
default_yes = "Please type 'yes'"
if conf.cli_keywords.answer_yes or (conf.cli_keywords.answer_default and default_answer is not None):
if not conf.cli_keywords.quiet:
print ("%s [%s/%s]: " % (question,default_yes,default_no))
if conf.cli_keywords.answer_yes:
return True
if conf.cli_keywords.answer_default:
return default_answer
answer = False
while answer == False:
answer = raw_input("%s [%s/%s]: " % (question,default_yes,default_no))
# Parse answer and set back to False if not appropriate
if all_inclusive_no:
if answer == "" and not default_answer == None:
return default_answer
elif answer in [ "y", "Y", "yes" ]:
return True
elif answer in [ "n", "N", "no" ]:
return False
else:
answer = False
print >> sys.stderr, _("Please answer 'yes' or 'no'.")
else:
if not answer in [ "y", "Y", "yes" ]:
return False
else:
return True
def ask_menu(question, options={}, default=''):
if not default == '' and conf.cli_keywords.answer_default:
if not conf.cli_keywords.quiet:
print question + " [" + default + "]:"
return default
if not default == '':
print question + " [" + default + "]:"
else:
print question
answer_correct = False
max_key_length = 0
if isinstance(options, list):
_options = options
options = {}
for key in _options:
options[key] = key
keys = options.keys()
keys.sort()
while not answer_correct:
for key in keys:
key_length = len("%s" % key)
if key_length > max_key_length:
max_key_length = key_length
str_format = "%%%ds" % max_key_length
if default == '' or not default in options.keys():
for key in keys:
if options[key] == key:
print " - " + key
else:
print " - " + eval("str_format % key") + ": " + options[key]
answer = raw_input(_("Choice") + ": ")
else:
answer = raw_input(_("Choice (type '?' for options)") + ": ")
if answer == '?':
for key in keys:
if options[key] == key:
print " - " + key
else:
print " - " + eval("str_format % key") + ": " + options[key]
continue
if answer == '' and default in options.keys():
answer = default
if answer in [str(x) for x in options.keys()]:
answer_correct = True
return answer
def decode(key, enc):
if key == None:
return enc
dec = []
enc = base64.urlsafe_b64decode(enc)
for i in range(len(enc)):
key_c = key[i % len(key)]
dec_c = chr((256 + ord(enc[i]) - ord(key_c)) % 256)
dec.append(dec_c)
return "".join(dec)
def encode(key, clear):
if key == None:
return clear
enc = []
for i in range(len(clear)):
key_c = key[i % len(key)]
enc_c = chr((ord(clear[i]) + ord(key_c)) % 256)
enc.append(enc_c)
return base64.urlsafe_b64encode("".join(enc))
def ensure_directory(_dir, _user='root', _group='root'):
if not os.path.isdir(_dir):
os.makedirs(_dir)
try:
try:
(ruid, euid, suid) = os.getresuid()
(rgid, egid, sgid) = os.getresgid()
except AttributeError, errmsg:
ruid = os.getuid()
rgid = os.getgid()
if ruid == 0:
# Means we can setreuid() / setregid() / setgroups()
if rgid == 0:
# Get group entry details
try:
(
group_name,
group_password,
group_gid,
group_members
) = grp.getgrnam(_group)
except KeyError:
print >> sys.stderr, _("Group %s does not exist") % (
_group
)
sys.exit(1)
# Set real and effective group if not the same as current.
if not group_gid == rgid:
os.chown(_dir, -1, group_gid)
if ruid == 0:
# Means we haven't switched yet.
try:
(
user_name,
user_password,
user_uid,
user_gid,
user_gecos,
user_homedir,
user_shell
) = pwd.getpwnam(_user)
except KeyError:
print >> sys.stderr, _("User %s does not exist") % (_user)
sys.exit(1)
# Set real and effective user if not the same as current.
if not user_uid == ruid:
os.chown(_dir, user_uid, -1)
except:
print >> sys.stderr, _("Could not change the permissions on %s") % (_dir)
def generate_password():
import subprocess
p1 = subprocess.Popen(['head', '-c', '200', '/dev/urandom'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['tr', '-dc', '_A-Z-a-z-0-9'], stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(['head', '-c', '15'], stdin=p2.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
p2.stdout.close()
output = p3.communicate()[0]
return output
def multiline_message(message):
if conf.cli_keywords.quiet:
return ""
column_width = 80
# First, replace all occurences of "\n"
message = message.replace(" ", "")
message = message.replace("\n", " ")
lines = []
line = ""
for word in message.split():
if (len(line) + len(word)) > column_width:
lines.append(line)
line = word
else:
if line == "":
line = word
else:
line += " %s" % (word)
lines.append(line)
return "\n%s\n" % ("\n".join(lines))
def stripped_message(message):
return "\n" + message.strip() + "\n"
def str2unicode(s, encoding='utf-8'):
if isinstance(s, unicode):
return s
try:
return unicode(s, encoding)
except:
pass
return s
def normalize(_object):
if type(_object) == list:
result = []
elif type(_object) == dict:
result = {}
else:
return _object
if type(_object) == list:
for item in _object:
result.append(item.lower())
result = list(set(result))
return result
elif type(_object) == dict:
for key in _object.keys():
if type(_object[key]) == list:
if _object[key] == None:
continue
if len(_object[key]) == 1:
result[key.lower()] = ''.join(_object[key])
else:
result[key.lower()] = _object[key]
else:
if _object[key] == None:
continue
# What the heck?
result[key.lower()] = _object[key]
if result.has_key('objectsid') and not result['objectsid'][0] == "S":
result['objectsid'] = sid_to_string(result['objectsid'])
if result.has_key('sn'):
result['surname'] = result['sn'].replace(' ', '')
if result.has_key('mail'):
if isinstance(result['mail'], list):
result['mail'] = result['mail'][0]
if len(result['mail']) > 0:
if len(result['mail'].split('@')) > 1:
result['domain'] = result['mail'].split('@')[1]
if not result.has_key('domain') and result.has_key('standard_domain'):
result['domain'] = result['standard_domain']
return result
def parse_input(_input, splitchars= [ ' ' ]):
"""
Split the input string using the split characters defined
in splitchars, and remove the empty list items, then unique the
list items.
Takes a string as input, and a list of characters the string should be
split with (list of delimiter characters).
"""
_parse_list = _input.split(splitchars.pop())
_output_list = []
for splitchar in splitchars:
__parse_list = []
for item in _parse_list:
__parse_list.extend(item.split(splitchar))
_parse_list = __parse_list
for item in _parse_list:
if not item == '':
if _output_list.count(item) < 1:
_output_list.append(item)
return _output_list
def parse_ldap_uri(uri):
"""
Parse an LDAP URI and return it's components.
Returns a tuple containing;
- protocol (ldap, ldaps),
- server (address or None),
- base_dn,
- attrs (list of attributes length 1, or empty),
- scope,
- filter
or None on failure
"""
_protocol = uri.split(':')[0]
try:
try:
_ldap_uri, _attr, _scope, _filter = uri.split('?')
_server = _ldap_uri.split('//')[1].split('/')[0]
_base_dn = _ldap_uri.split('//')[1].split('/')[1]
except:
_server = uri.split('//')[1].split('/')[0]
_attr = None
_scope = None
_filter = None
_base_dn = None
if len(_server.split(':')) > 1:
_port = _server.split(':')[1]
_server = _server.split(':')[0]
else:
if _protocol == 'ldaps':
_port = "636"
else:
_port = "389"
if _server == '':
_server = None
if _attr == '':
_attrs = []
else:
_attrs = [ _attr ]
if _scope == '':
_scope = 'sub'
if _filter == '':
_filter = "(objectclass=*)"
return (_protocol, _server, _port, _base_dn, _attr, _scope, _filter)
except:
return None
def pop_empty_from_list(_input_list):
_output_list = []
for item in _input_list:
if not item == '':
_output_list.append(item)
def sid_to_string(sid):
srl = ord(sid[0])
number_sub_id = ord(sid[1])
iav = struct.unpack('!Q', '\x00\x00' + sid[2:8])[0]
sub_ids = []
for i in range(number_sub_id):
sub_ids.append(struct.unpack('<I',sid[8+4*i:12+4*i])[0])
result = 'S-%d-%d-%s' % (
srl,
iav,
'-'.join([str(s) for s in sub_ids]),
)
return result
def standard_root_dn(domain):
return 'dc=%s' % (',dc='.join(domain.split('.')))
def translate(mystring, locale_name='en_US'):
import locale
import subprocess
log.debug(_("Transliterating string %r with locale %r") % (mystring, locale_name), level=8)
if len(locale.normalize(locale_name).split('.')) > 1:
(locale_name,locale_charset) = locale.normalize(locale_name).split('.')
else:
locale_charset = 'utf-8'
try:
log.debug(_("Attempting to set locale"), level=8)
locale.setlocale(locale.LC_ALL, (locale_name,locale_charset))
log.debug(_("Success setting locale"), level=8)
except:
log.debug(_("Failure to set locale"), level=8)
pass
command = [ '/usr/bin/iconv',
'-f', 'UTF-8',
'-t', 'ASCII//TRANSLIT',
'-s' ]
log.debug(_("Executing '%s | %s'") % (r"%s" % (mystring), ' '.join(command)), level=8)
process = subprocess.Popen(command, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE, env={'LANG': locale.normalize(locale_name)})
try:
print >> process.stdin, r"%s" % mystring
except UnicodeEncodeError, errmsg:
pass
result = process.communicate()[0].strip()
if '?' in result or (result == '' and not mystring == ''):
log.warning(_("Could not translate %s using locale %s") % (mystring, locale_name))
from pykolab import translit
result = translit.transliterate(mystring, locale_name)
return result
def true_or_false(val):
if val == None:
return False
if isinstance(val, bool):
return val
if isinstance(val, basestring) or isinstance(val, str):
val = val.lower()
if val in [ "true", "yes", "y", "1" ]:
return True
else:
return False
if isinstance(val, int) or isinstance(val, float):
if val >= 1:
return True
else:
return False
def is_service(services):
"""
Checks each item in list services to see if it has a RC script in
pykolab.constants.RC_DIR to see if it's a service, and returns
the name of the service for the first service it can find. However,
it also checks whether the other services exist and issues a warning if
more then one service exists.
Usage: utils.is_service(['dirsrv', 'ldap'])
"""
_service = None
_other_services = []
for service in services:
if os.path.isfile(os.path.join(constants.RC_DIR, service)):
if _service == '':
_service = service
else:
_other_services.append(service)
return (_service,_other_services)

File Metadata

Mime Type
text/x-script.python
Expires
Mon, Apr 6, 12:58 AM (5 d, 2 h ago)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
b9/57/592173397f8540ae3aabf575b5b2
Default Alt Text
utils.py (17 KB)

Event Timeline