Page Menu
Home
Phorge
Search
Configure Global Search
Log In
Files
F16569279
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Flag For Later
Award Token
Size
5 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/pykolab/auth/ldap/auth_cache.py b/pykolab/auth/ldap/auth_cache.py
index dcf8c23..41ef438 100644
--- a/pykolab/auth/ldap/auth_cache.py
+++ b/pykolab/auth/ldap/auth_cache.py
@@ -1,208 +1,216 @@
# Copyright 2010-2013 Kolab Systems AG (http://www.kolabsys.com)
#
# Jeroen van Meeuwen (Kolab Systems) <vanmeeuwen a kolabsys.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import datetime
import os
import sqlalchemy
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Text
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import pykolab
from pykolab.constants import KOLAB_LIB_PATH
# pylint: disable=invalid-name
conf = pykolab.getConf()
log = pykolab.getLogger('pykolab.auth_cache')
metadata = MetaData()
db = None
try:
unicode('')
except NameError:
unicode = str
#
# Classes
#
DeclarativeBase = declarative_base()
# pylint: disable=too-few-public-methods
class Entry(DeclarativeBase):
__tablename__ = 'entries'
id = Column(Integer, primary_key=True)
domain = Column(String(256), index=True, nullable=True)
key = Column(Text, index=True, nullable=False)
value = Column(Text, nullable=False)
last_change = Column(DateTime, nullable=False, default=datetime.datetime.now())
def __init__(self, key, value):
self.key = key
if not isinstance(value, unicode):
self.value = unicode(value, 'utf-8')
else:
self.value = value
#
# Functions
#
def del_entry(key):
# pylint: disable=global-statement
global db
db = init_db()
if db is None:
return
purge_entries()
try:
db.query(Entry).filter_by(key=key).delete()
except sqlalchemy.exc.OperationalError:
db = init_db(reinit=True)
except sqlalchemy.exc.InvalidRequest:
db = init_db(reinit=True)
finally:
db.query(Entry).filter_by(key=key).delete()
db.commit()
def get_entry(key):
# pylint: disable=global-statement
global db
db = init_db()
if db is None:
return None
try:
_entries = db.query(Entry).filter_by(key=key).all()
except sqlalchemy.exc.OperationalError:
db = init_db(reinit=True)
except sqlalchemy.exc.InvalidRequest:
db = init_db(reinit=True)
finally:
_entries = db.query(Entry).filter_by(key=key).all()
- if len(_entries) != 1:
+ __entries = []
+
+ for __entry in _entries:
+ if __entry.last_change <= datetime.datetime.now() - datetime.timedelta(0, 7200):
+ continue
+
+ __entries.append(__entry)
+
+ if len(__entries) != 1:
return None
- log.debug("Entry found: %r" % (_entries[0].__dict__))
- log.debug("Returning: %r" % (_entries[0].value))
+ log.debug("Entry found: %r" % (__entries[0].__dict__))
+ log.debug("Returning: %r" % (__entries[0].value))
- return _entries[0].value.encode('utf-8', 'latin1')
+ return __entries[0].value.encode('utf-8', 'latin1')
def set_entry(key, value):
# pylint: disable=global-statement
global db
db = init_db()
if db is None:
return
try:
_entries = db.query(Entry).filter_by(key=key).all()
except sqlalchemy.exc.OperationalError:
db = init_db(reinit=True)
except sqlalchemy.exc.InvalidRequest:
db = init_db(reinit=True)
finally:
_entries = db.query(Entry).filter_by(key=key).all()
if not _entries:
db.add(Entry(key, value))
db.commit()
elif len(_entries) == 1:
if not isinstance(value, unicode):
value = unicode(value, 'utf-8')
if not _entries[0].value == value:
_entries[0].value = value
_entries[0].last_change = datetime.datetime.now()
db.commit()
def purge_entries():
# pylint: disable=global-statement
global db
try:
db.query(Entry).filter(
- Entry.last_change <= (datetime.datetime.now() - datetime.timedelta(1))
+ Entry.last_change <= (datetime.datetime.now() - datetime.timedelta(0, 7200))
).delete()
db.commit()
except Exception:
return
def init_db(reinit=False):
"""
Returns a SQLAlchemy Session() instance.
"""
# pylint: disable=global-statement
global db
if db is not None and not reinit:
return db
db_uri = conf.get('ldap', 'auth_cache_uri')
if db_uri == "null":
return None
if db_uri is None:
db_uri = 'sqlite:///%s/auth_cache.db' % (KOLAB_LIB_PATH)
if reinit:
if os.path.isfile('%s/auth_cache.db' % (KOLAB_LIB_PATH)):
os.unlink('%s/auth_cache.db' % (KOLAB_LIB_PATH))
echo = conf.debuglevel > 8
engine = create_engine(db_uri, echo=echo)
DeclarativeBase.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
db = Session()
return db
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Fri, Nov 1, 8:20 AM (1 d, 7 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
10074915
Default Alt Text
(5 KB)
Attached To
Mode
rP pykolab
Attached
Detach File
Event Timeline
Log In to Comment