Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F117883261
utils.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Flag For Later
Award Token
Authored By
Unknown
Size
17 KB
Referenced Files
None
Subscribers
None
utils.py
View Options
# -*- coding: utf-8 -*-
# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import
base64
import
getpass
import
grp
import
os
import
pwd
import
struct
import
sys
import
pykolab
from
pykolab
import
constants
from
pykolab.translate
import
_
log
=
pykolab
.
getLogger
(
'pykolab.utils'
)
conf
=
pykolab
.
getConf
()
def
ask_question
(
question
,
default
=
""
,
password
=
False
,
confirm
=
False
):
"""
Ask a question on stderr.
Since the answer to the question may actually be a password, cover that
case with a getpass.getpass() prompt.
Accepts a default value, but ignores defaults for password prompts.
Usage: pykolab.utils.ask_question("What is the server?", default="localhost")
"""
if
not
default
==
""
and
not
default
==
None
and
conf
.
cli_keywords
.
answer_default
:
if
not
conf
.
cli_keywords
.
quiet
:
print
(
"
%s
[
%s
]: "
%
(
question
,
default
))
return
default
if
password
:
if
default
==
""
or
default
==
None
:
answer
=
getpass
.
getpass
(
"
%s
: "
%
(
question
))
else
:
answer
=
getpass
.
getpass
(
"
%s
[
%s
]: "
%
(
question
,
default
))
else
:
if
default
==
""
or
default
==
None
:
answer
=
raw_input
(
"
%s
: "
%
(
question
))
else
:
answer
=
raw_input
(
"
%s
[
%s
]: "
%
(
question
,
default
))
if
not
answer
==
""
:
if
confirm
:
answer_confirm
=
None
answer_confirmed
=
False
while
not
answer_confirmed
:
if
password
:
answer_confirm
=
getpass
.
getpass
(
_
(
"Confirm
%s
: "
)
%
(
question
))
else
:
answer_confirm
=
raw_input
(
_
(
"Confirm
%s
: "
)
%
(
question
))
if
not
answer_confirm
==
answer
:
print
>>
sys
.
stderr
,
_
(
"Incorrect confirmation. "
+
\
"Please try again."
)
if
password
:
if
default
==
""
or
default
==
None
:
answer
=
getpass
.
getpass
(
_
(
"
%s
: "
)
%
(
question
))
else
:
answer
=
getpass
.
getpass
(
_
(
"
%s
[
%s
]: "
)
%
(
question
,
default
))
else
:
if
default
==
""
or
default
==
None
:
answer
=
raw_input
(
_
(
"
%s
: "
)
%
(
question
))
else
:
answer
=
raw_input
(
_
(
"
%s
[
%s
]: "
)
%
(
question
,
default
))
else
:
answer_confirmed
=
True
if
answer
==
""
:
return
default
else
:
return
answer
def
ask_confirmation
(
question
,
default
=
"y"
,
all_inclusive_no
=
True
):
"""
Create a confirmation dialog, including a default option (capitalized),
and a "yes" or "no" parsing that can either require an explicit, full
"yes" or "no", or take the default or any YyNn answer.
"""
default_answer
=
None
if
default
in
[
"y"
,
"Y"
]:
default_answer
=
True
default_no
=
"n"
default_yes
=
"Y"
elif
default
in
[
"n"
,
"N"
]:
default_answer
=
False
default_no
=
"N"
default_yes
=
"y"
else
:
# This is a 'yes' or 'no' question the user
# needs to provide the full yes or no for.
default_no
=
"'no'"
default_yes
=
"Please type 'yes'"
if
conf
.
cli_keywords
.
answer_yes
or
(
conf
.
cli_keywords
.
answer_default
and
default_answer
is
not
None
):
if
not
conf
.
cli_keywords
.
quiet
:
print
(
"
%s
[
%s
/
%s
]: "
%
(
question
,
default_yes
,
default_no
))
if
conf
.
cli_keywords
.
answer_yes
:
return
True
if
conf
.
cli_keywords
.
answer_default
:
return
default_answer
answer
=
False
while
answer
==
False
:
answer
=
raw_input
(
"
%s
[
%s
/
%s
]: "
%
(
question
,
default_yes
,
default_no
))
# Parse answer and set back to False if not appropriate
if
all_inclusive_no
:
if
answer
==
""
and
not
default_answer
==
None
:
return
default_answer
elif
answer
in
[
"y"
,
"Y"
,
"yes"
]:
return
True
elif
answer
in
[
"n"
,
"N"
,
"no"
]:
return
False
else
:
answer
=
False
print
>>
sys
.
stderr
,
_
(
"Please answer 'yes' or 'no'."
)
else
:
if
not
answer
in
[
"y"
,
"Y"
,
"yes"
]:
return
False
else
:
return
True
def
ask_menu
(
question
,
options
=
{},
default
=
''
):
if
not
default
==
''
and
conf
.
cli_keywords
.
answer_default
:
if
not
conf
.
cli_keywords
.
quiet
:
print
question
+
" ["
+
default
+
"]:"
return
default
if
not
default
==
''
:
print
question
+
" ["
+
default
+
"]:"
else
:
print
question
answer_correct
=
False
max_key_length
=
0
if
isinstance
(
options
,
list
):
_options
=
options
options
=
{}
for
key
in
_options
:
options
[
key
]
=
key
keys
=
options
.
keys
()
keys
.
sort
()
while
not
answer_correct
:
for
key
in
keys
:
key_length
=
len
(
"
%s
"
%
key
)
if
key_length
>
max_key_length
:
max_key_length
=
key_length
str_format
=
"
%%%d
s"
%
max_key_length
if
default
==
''
or
not
default
in
options
.
keys
():
for
key
in
keys
:
if
options
[
key
]
==
key
:
print
" - "
+
key
else
:
print
" - "
+
eval
(
"str_format % key"
)
+
": "
+
options
[
key
]
answer
=
raw_input
(
_
(
"Choice"
)
+
": "
)
else
:
answer
=
raw_input
(
_
(
"Choice (type '?' for options)"
)
+
": "
)
if
answer
==
'?'
:
for
key
in
keys
:
if
options
[
key
]
==
key
:
print
" - "
+
key
else
:
print
" - "
+
eval
(
"str_format % key"
)
+
": "
+
options
[
key
]
continue
if
answer
==
''
and
default
in
options
.
keys
():
answer
=
default
if
answer
in
[
str
(
x
)
for
x
in
options
.
keys
()]:
answer_correct
=
True
return
answer
def
decode
(
key
,
enc
):
if
key
==
None
:
return
enc
dec
=
[]
enc
=
base64
.
urlsafe_b64decode
(
enc
)
for
i
in
range
(
len
(
enc
)):
key_c
=
key
[
i
%
len
(
key
)]
dec_c
=
chr
((
256
+
ord
(
enc
[
i
])
-
ord
(
key_c
))
%
256
)
dec
.
append
(
dec_c
)
return
""
.
join
(
dec
)
def
encode
(
key
,
clear
):
if
key
==
None
:
return
clear
enc
=
[]
for
i
in
range
(
len
(
clear
)):
key_c
=
key
[
i
%
len
(
key
)]
enc_c
=
chr
((
ord
(
clear
[
i
])
+
ord
(
key_c
))
%
256
)
enc
.
append
(
enc_c
)
return
base64
.
urlsafe_b64encode
(
""
.
join
(
enc
))
def
ensure_directory
(
_dir
,
_user
=
'root'
,
_group
=
'root'
):
if
not
os
.
path
.
isdir
(
_dir
):
os
.
makedirs
(
_dir
)
try
:
try
:
(
ruid
,
euid
,
suid
)
=
os
.
getresuid
()
(
rgid
,
egid
,
sgid
)
=
os
.
getresgid
()
except
AttributeError
,
errmsg
:
ruid
=
os
.
getuid
()
rgid
=
os
.
getgid
()
if
ruid
==
0
:
# Means we can setreuid() / setregid() / setgroups()
if
rgid
==
0
:
# Get group entry details
try
:
(
group_name
,
group_password
,
group_gid
,
group_members
)
=
grp
.
getgrnam
(
_group
)
except
KeyError
:
print
>>
sys
.
stderr
,
_
(
"Group
%s
does not exist"
)
%
(
_group
)
sys
.
exit
(
1
)
# Set real and effective group if not the same as current.
if
not
group_gid
==
rgid
:
os
.
chown
(
_dir
,
-
1
,
group_gid
)
if
ruid
==
0
:
# Means we haven't switched yet.
try
:
(
user_name
,
user_password
,
user_uid
,
user_gid
,
user_gecos
,
user_homedir
,
user_shell
)
=
pwd
.
getpwnam
(
_user
)
except
KeyError
:
print
>>
sys
.
stderr
,
_
(
"User
%s
does not exist"
)
%
(
_user
)
sys
.
exit
(
1
)
# Set real and effective user if not the same as current.
if
not
user_uid
==
ruid
:
os
.
chown
(
_dir
,
user_uid
,
-
1
)
except
:
print
>>
sys
.
stderr
,
_
(
"Could not change the permissions on
%s
"
)
%
(
_dir
)
def
generate_password
():
import
subprocess
p1
=
subprocess
.
Popen
([
'head'
,
'-c'
,
'200'
,
'/dev/urandom'
],
stdout
=
subprocess
.
PIPE
)
p2
=
subprocess
.
Popen
([
'tr'
,
'-dc'
,
'_A-Z-a-z-0-9'
],
stdin
=
p1
.
stdout
,
stdout
=
subprocess
.
PIPE
)
p3
=
subprocess
.
Popen
([
'head'
,
'-c'
,
'15'
],
stdin
=
p2
.
stdout
,
stdout
=
subprocess
.
PIPE
)
p1
.
stdout
.
close
()
p2
.
stdout
.
close
()
output
=
p3
.
communicate
()[
0
]
return
output
def
multiline_message
(
message
):
if
conf
.
cli_keywords
.
quiet
:
return
""
column_width
=
80
# First, replace all occurences of "\n"
message
=
message
.
replace
(
" "
,
""
)
message
=
message
.
replace
(
"
\n
"
,
" "
)
lines
=
[]
line
=
""
for
word
in
message
.
split
():
if
(
len
(
line
)
+
len
(
word
))
>
column_width
:
lines
.
append
(
line
)
line
=
word
else
:
if
line
==
""
:
line
=
word
else
:
line
+=
"
%s
"
%
(
word
)
lines
.
append
(
line
)
return
"
\n
%s
\n
"
%
(
"
\n
"
.
join
(
lines
))
def
stripped_message
(
message
):
return
"
\n
"
+
message
.
strip
()
+
"
\n
"
def
str2unicode
(
s
,
encoding
=
'utf-8'
):
if
isinstance
(
s
,
unicode
):
return
s
try
:
return
unicode
(
s
,
encoding
)
except
:
pass
return
s
def
normalize
(
_object
):
if
type
(
_object
)
==
list
:
result
=
[]
elif
type
(
_object
)
==
dict
:
result
=
{}
else
:
return
_object
if
type
(
_object
)
==
list
:
for
item
in
_object
:
result
.
append
(
item
.
lower
())
result
=
list
(
set
(
result
))
return
result
elif
type
(
_object
)
==
dict
:
for
key
in
_object
.
keys
():
if
type
(
_object
[
key
])
==
list
:
if
_object
[
key
]
==
None
:
continue
if
len
(
_object
[
key
])
==
1
:
result
[
key
.
lower
()]
=
''
.
join
(
_object
[
key
])
else
:
result
[
key
.
lower
()]
=
_object
[
key
]
else
:
if
_object
[
key
]
==
None
:
continue
# What the heck?
result
[
key
.
lower
()]
=
_object
[
key
]
if
result
.
has_key
(
'objectsid'
)
and
not
result
[
'objectsid'
][
0
]
==
"S"
:
result
[
'objectsid'
]
=
sid_to_string
(
result
[
'objectsid'
])
if
result
.
has_key
(
'sn'
):
result
[
'surname'
]
=
result
[
'sn'
]
.
replace
(
' '
,
''
)
if
result
.
has_key
(
'mail'
):
if
isinstance
(
result
[
'mail'
],
list
):
result
[
'mail'
]
=
result
[
'mail'
][
0
]
if
len
(
result
[
'mail'
])
>
0
:
if
len
(
result
[
'mail'
]
.
split
(
'@'
))
>
1
:
result
[
'domain'
]
=
result
[
'mail'
]
.
split
(
'@'
)[
1
]
if
not
result
.
has_key
(
'domain'
)
and
result
.
has_key
(
'standard_domain'
):
result
[
'domain'
]
=
result
[
'standard_domain'
]
return
result
def
parse_input
(
_input
,
splitchars
=
[
' '
]):
"""
Split the input string using the split characters defined
in splitchars, and remove the empty list items, then unique the
list items.
Takes a string as input, and a list of characters the string should be
split with (list of delimiter characters).
"""
_parse_list
=
_input
.
split
(
splitchars
.
pop
())
_output_list
=
[]
for
splitchar
in
splitchars
:
__parse_list
=
[]
for
item
in
_parse_list
:
__parse_list
.
extend
(
item
.
split
(
splitchar
))
_parse_list
=
__parse_list
for
item
in
_parse_list
:
if
not
item
==
''
:
if
_output_list
.
count
(
item
)
<
1
:
_output_list
.
append
(
item
)
return
_output_list
def
parse_ldap_uri
(
uri
):
"""
Parse an LDAP URI and return it's components.
Returns a tuple containing;
- protocol (ldap, ldaps),
- server (address or None),
- base_dn,
- attrs (list of attributes length 1, or empty),
- scope,
- filter
or None on failure
"""
_protocol
=
uri
.
split
(
':'
)[
0
]
try
:
try
:
_ldap_uri
,
_attr
,
_scope
,
_filter
=
uri
.
split
(
'?'
)
_server
=
_ldap_uri
.
split
(
'//'
)[
1
]
.
split
(
'/'
)[
0
]
_base_dn
=
_ldap_uri
.
split
(
'//'
)[
1
]
.
split
(
'/'
)[
1
]
except
:
_server
=
uri
.
split
(
'//'
)[
1
]
.
split
(
'/'
)[
0
]
_attr
=
None
_scope
=
None
_filter
=
None
_base_dn
=
None
if
len
(
_server
.
split
(
':'
))
>
1
:
_port
=
_server
.
split
(
':'
)[
1
]
_server
=
_server
.
split
(
':'
)[
0
]
else
:
if
_protocol
==
'ldaps'
:
_port
=
"636"
else
:
_port
=
"389"
if
_server
==
''
:
_server
=
None
if
_attr
==
''
:
_attrs
=
[]
else
:
_attrs
=
[
_attr
]
if
_scope
==
''
:
_scope
=
'sub'
if
_filter
==
''
:
_filter
=
"(objectclass=*)"
return
(
_protocol
,
_server
,
_port
,
_base_dn
,
_attr
,
_scope
,
_filter
)
except
:
return
None
def
pop_empty_from_list
(
_input_list
):
_output_list
=
[]
for
item
in
_input_list
:
if
not
item
==
''
:
_output_list
.
append
(
item
)
def
sid_to_string
(
sid
):
srl
=
ord
(
sid
[
0
])
number_sub_id
=
ord
(
sid
[
1
])
iav
=
struct
.
unpack
(
'!Q'
,
'
\x00\x00
'
+
sid
[
2
:
8
])[
0
]
sub_ids
=
[]
for
i
in
range
(
number_sub_id
):
sub_ids
.
append
(
struct
.
unpack
(
'<I'
,
sid
[
8
+
4
*
i
:
12
+
4
*
i
])[
0
])
result
=
'S-
%d
-
%d
-
%s
'
%
(
srl
,
iav
,
'-'
.
join
([
str
(
s
)
for
s
in
sub_ids
]),
)
return
result
def
standard_root_dn
(
domain
):
return
'dc=
%s
'
%
(
',dc='
.
join
(
domain
.
split
(
'.'
)))
def
translate
(
mystring
,
locale_name
=
'en_US'
):
import
locale
import
subprocess
log
.
debug
(
_
(
"Transliterating string
%r
with locale
%r
"
)
%
(
mystring
,
locale_name
),
level
=
8
)
if
len
(
locale
.
normalize
(
locale_name
)
.
split
(
'.'
))
>
1
:
(
locale_name
,
locale_charset
)
=
locale
.
normalize
(
locale_name
)
.
split
(
'.'
)
else
:
locale_charset
=
'utf-8'
try
:
log
.
debug
(
_
(
"Attempting to set locale"
),
level
=
8
)
locale
.
setlocale
(
locale
.
LC_ALL
,
(
locale_name
,
locale_charset
))
log
.
debug
(
_
(
"Success setting locale"
),
level
=
8
)
except
:
log
.
debug
(
_
(
"Failure to set locale"
),
level
=
8
)
pass
command
=
[
'/usr/bin/iconv'
,
'-f'
,
'UTF-8'
,
'-t'
,
'ASCII//TRANSLIT'
,
'-s'
]
log
.
debug
(
_
(
"Executing '
%s
|
%s
'"
)
%
(
r"
%s
"
%
(
mystring
),
' '
.
join
(
command
)),
level
=
8
)
process
=
subprocess
.
Popen
(
command
,
stdout
=
subprocess
.
PIPE
,
stdin
=
subprocess
.
PIPE
,
stderr
=
subprocess
.
PIPE
,
env
=
{
'LANG'
:
locale
.
normalize
(
locale_name
)})
try
:
print
>>
process
.
stdin
,
r"
%s
"
%
mystring
except
UnicodeEncodeError
,
errmsg
:
pass
result
=
process
.
communicate
()[
0
]
.
strip
()
if
'?'
in
result
or
(
result
==
''
and
not
mystring
==
''
):
log
.
warning
(
_
(
"Could not translate
%s
using locale
%s
"
)
%
(
mystring
,
locale_name
))
from
pykolab
import
translit
result
=
translit
.
transliterate
(
mystring
,
locale_name
)
return
result
def
true_or_false
(
val
):
if
val
==
None
:
return
False
if
isinstance
(
val
,
bool
):
return
val
if
isinstance
(
val
,
basestring
)
or
isinstance
(
val
,
str
):
val
=
val
.
lower
()
if
val
in
[
"true"
,
"yes"
,
"y"
,
"1"
]:
return
True
else
:
return
False
if
isinstance
(
val
,
int
)
or
isinstance
(
val
,
float
):
if
val
>=
1
:
return
True
else
:
return
False
def
is_service
(
services
):
"""
Checks each item in list services to see if it has a RC script in
pykolab.constants.RC_DIR to see if it's a service, and returns
the name of the service for the first service it can find. However,
it also checks whether the other services exist and issues a warning if
more then one service exists.
Usage: utils.is_service(['dirsrv', 'ldap'])
"""
_service
=
None
_other_services
=
[]
for
service
in
services
:
if
os
.
path
.
isfile
(
os
.
path
.
join
(
constants
.
RC_DIR
,
service
)):
if
_service
==
''
:
_service
=
service
else
:
_other_services
.
append
(
service
)
return
(
_service
,
_other_services
)
File Metadata
Details
Attached
Mime Type
text/x-script.python
Expires
Mon, Apr 6, 12:58 AM (5 d, 2 h ago)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
b9/57/592173397f8540ae3aabf575b5b2
Default Alt Text
utils.py (17 KB)
Attached To
Mode
rP pykolab
Attached
Detach File
Event Timeline