Changeset View
Changeset View
Standalone View
Standalone View
pykolab/auth/ldap/auth_cache.py
Show First 20 Lines • Show All 51 Lines • ▼ Show 20 Lines | |||||
conf = pykolab.getConf() | conf = pykolab.getConf() | ||||
log = pykolab.getLogger('pykolab.auth_cache') | log = pykolab.getLogger('pykolab.auth_cache') | ||||
metadata = MetaData() | metadata = MetaData() | ||||
db = None | db = None | ||||
## | ## | ||||
## Classes | ## Classes | ||||
Lint: PEP8 E266: too many leading '#' for block comment | |||||
## | ## | ||||
DeclarativeBase = declarative_base() | DeclarativeBase = declarative_base() | ||||
class Entry(DeclarativeBase): | class Entry(DeclarativeBase): | ||||
Lint: PEP8 E302 expected 2 blank lines, found 1 Lint: PEP8 E302: expected 2 blank lines, found 1 | |||||
__tablename__ = 'entries' | __tablename__ = 'entries' | ||||
id = Column(Integer, primary_key=True) | id = Column(Integer, primary_key=True) | ||||
domain = Column(String(256), index=True, nullable=True) | domain = Column(String(256), index=True, nullable=True) | ||||
key = Column(Text, index=True, nullable=False) | key = Column(Text, index=True, nullable=False) | ||||
value = Column(Text, nullable=False) | value = Column(Text, nullable=False) | ||||
last_change = Column(DateTime, nullable=False, default=datetime.datetime.now()) | last_change = Column(DateTime, nullable=False, default=datetime.datetime.now()) | ||||
Lint: PEP8 E501 line too long (83 > 79 characters) Lint: PEP8 E501: line too long (83 > 79 characters) | |||||
def __init__(self, key, value): | def __init__(self, key, value): | ||||
self.key = key | self.key = key | ||||
if not isinstance(value, unicode): | if not isinstance(value, unicode): | ||||
self.value = unicode(value, 'utf-8') | self.value = unicode(value, 'utf-8') | ||||
else: | else: | ||||
self.value = value | self.value = value | ||||
## | ## | ||||
## Functions | ## Functions | ||||
Lint: PEP8 E266 too many leading '#' for block comment Lint: PEP8 E266: too many leading '#' for block comment | |||||
## | ## | ||||
def del_entry(key): | def del_entry(key): | ||||
Lint: PEP8 E302 expected 2 blank lines, found 1 Lint: PEP8 E302: expected 2 blank lines, found 1 | |||||
db = init_db() | db = init_db() | ||||
try: | try: | ||||
_entries = db.query(Entry).filter_by(key=key).delete() | _entries = db.query(Entry).filter_by(key=key).delete() | ||||
except sqlalchemy.exc.OperationalError, errmsg: | except sqlalchemy.exc.OperationalError, errmsg: | ||||
db = init_db(reinit=True) | db = init_db(reinit=True) | ||||
except sqlalchemy.exc.InvalidRequest, errmsg: | except sqlalchemy.exc.InvalidRequest, errmsg: | ||||
db = init_db(reinit=True) | db = init_db(reinit=True) | ||||
finally: | finally: | ||||
_entries = db.query(Entry).filter_by(key=key).delete() | _entries = db.query(Entry).filter_by(key=key).delete() | ||||
db.commit() | db.commit() | ||||
def get_entry(key): | def get_entry(key): | ||||
Lint: PEP8 E302 expected 2 blank lines, found 1 Lint: PEP8 E302: expected 2 blank lines, found 1 | |||||
db = init_db() | db = init_db() | ||||
try: | try: | ||||
_entries = db.query(Entry).filter_by(key=key).all() | _entries = db.query(Entry).filter_by(key=key).all() | ||||
except sqlalchemy.exc.OperationalError, errmsg: | except sqlalchemy.exc.OperationalError, errmsg: | ||||
db = init_db(reinit=True) | db = init_db(reinit=True) | ||||
except sqlalchemy.exc.InvalidRequest, errmsg: | except sqlalchemy.exc.InvalidRequest, errmsg: | ||||
db = init_db(reinit=True) | db = init_db(reinit=True) | ||||
finally: | finally: | ||||
_entries = db.query(Entry).filter_by(key=key).all() | _entries = db.query(Entry).filter_by(key=key).all() | ||||
if len(_entries) == 0: | if len(_entries) == 0: | ||||
return None | return None | ||||
if len(_entries) > 1: | if len(_entries) > 1: | ||||
return None | return None | ||||
log.debug("Entry found: %r" % (_entries[0].__dict__)) | log.debug("Entry found: %r" % (_entries[0].__dict__)) | ||||
log.debug("Returning: %r" % (_entries[0].value)) | log.debug("Returning: %r" % (_entries[0].value)) | ||||
return _entries[0].value.encode('utf-8', 'latin1') | return _entries[0].value.encode('utf-8', 'latin1') | ||||
def set_entry(key, value): | def set_entry(key, value): | ||||
Lint: PEP8 E302 expected 2 blank lines, found 1 Lint: PEP8 E302: expected 2 blank lines, found 1 | |||||
db = init_db() | db = init_db() | ||||
try: | try: | ||||
_entries = db.query(Entry).filter_by(key=key).all() | _entries = db.query(Entry).filter_by(key=key).all() | ||||
except sqlalchemy.exc.OperationalError, errmsg: | except sqlalchemy.exc.OperationalError, errmsg: | ||||
db = init_db(reinit=True) | db = init_db(reinit=True) | ||||
except sqlalchemy.exc.InvalidRequest, errmsg: | except sqlalchemy.exc.InvalidRequest, errmsg: | ||||
db = init_db(reinit=True) | db = init_db(reinit=True) | ||||
finally: | finally: | ||||
_entries = db.query(Entry).filter_by(key=key).all() | _entries = db.query(Entry).filter_by(key=key).all() | ||||
if len(_entries) == 0: | if len(_entries) == 0: | ||||
db.add( | db.add( | ||||
Entry( | Entry( | ||||
key, | key, | ||||
value | value | ||||
) | ) | ||||
) | ) | ||||
db.commit() | db.commit() | ||||
elif len(_entries) == 1: | elif len(_entries) == 1: | ||||
if not isinstance(value, unicode): | |||||
value = unicode(value, 'utf-8') | |||||
if not _entries[0].value == value: | if not _entries[0].value == value: | ||||
_entries[0].value = value | _entries[0].value = value | ||||
_entries[0].last_change = datetime.datetime.now() | _entries[0].last_change = datetime.datetime.now() | ||||
db.commit() | db.commit() | ||||
def purge_entries(db): | def purge_entries(db): | ||||
Lint: PEP8 E302 expected 2 blank lines, found 1 Lint: PEP8 E302: expected 2 blank lines, found 1 | |||||
db.query(Entry).filter(Entry.last_change <= (datetime.datetime.now() - datetime.timedelta(1))).delete() | db.query(Entry).filter(Entry.last_change <= (datetime.datetime.now() - datetime.timedelta(1))).delete() | ||||
Lint: PEP8 E501 line too long (107 > 79 characters) Lint: PEP8 E501: line too long (107 > 79 characters) | |||||
db.commit() | db.commit() | ||||
def init_db(reinit=False): | def init_db(reinit=False): | ||||
Lint: PEP8 E302 expected 2 blank lines, found 1 Lint: PEP8 E302: expected 2 blank lines, found 1 | |||||
""" | """ | ||||
Returns a SQLAlchemy Session() instance. | Returns a SQLAlchemy Session() instance. | ||||
""" | """ | ||||
global db | global db | ||||
if not db == None and not reinit: | if not db == None and not reinit: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
return db | return db | ||||
db_uri = conf.get('ldap', 'auth_cache_uri') | db_uri = conf.get('ldap', 'auth_cache_uri') | ||||
if db_uri == None: | if db_uri == None: | ||||
Lint: PEP8 E711 comparison to None should be 'if cond is None:' Lint: PEP8 E711: comparison to None should be 'if cond is None:' | |||||
db_uri = 'sqlite:///%s/auth_cache.db' % (KOLAB_LIB_PATH) | db_uri = 'sqlite:///%s/auth_cache.db' % (KOLAB_LIB_PATH) | ||||
if reinit: | if reinit: | ||||
import os | import os | ||||
if os.path.isfile('%s/auth_cache.db' % (KOLAB_LIB_PATH)): | if os.path.isfile('%s/auth_cache.db' % (KOLAB_LIB_PATH)): | ||||
os.unlink('%s/auth_cache.db' % (KOLAB_LIB_PATH)) | os.unlink('%s/auth_cache.db' % (KOLAB_LIB_PATH)) | ||||
echo = conf.debuglevel > 8 | echo = conf.debuglevel > 8 | ||||
engine = create_engine(db_uri, echo=echo) | engine = create_engine(db_uri, echo=echo) | ||||
DeclarativeBase.metadata.create_all(engine) | DeclarativeBase.metadata.create_all(engine) | ||||
Session = sessionmaker(bind=engine) | Session = sessionmaker(bind=engine) | ||||
db = Session() | db = Session() | ||||
purge_entries(db) | purge_entries(db) | ||||
return db | return db |
too many leading '#' for block comment