diff --git a/akonadi/changerecorder.cpp b/akonadi/changerecorder.cpp index 84cb9f8ef..3c6b8af6f 100644 --- a/akonadi/changerecorder.cpp +++ b/akonadi/changerecorder.cpp @@ -1,194 +1,188 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "changerecorder.h" #include "monitor_p.h" #include #include using namespace Akonadi; class Akonadi::ChangeRecorderPrivate : public MonitorPrivate { public: ChangeRecorderPrivate( ChangeRecorder* parent ) : MonitorPrivate( parent ), settings( 0 ), enableChangeRecording( true ) { } Q_DECLARE_PUBLIC( ChangeRecorder ) - NotificationMessage::List pendingNotifications; QSettings *settings; bool enableChangeRecording; - virtual void slotNotify( const NotificationMessage::List &msgs ) + virtual int pipelineSize() const { - if ( !enableChangeRecording ) { - foreach( const NotificationMessage &msg, msgs ) - processNotification( msg ); - return; - } + if ( enableChangeRecording ) + return 0; // we fill the pipeline ourselves when using change recording + return MonitorPrivate::pipelineSize(); + } + virtual void slotNotify( const NotificationMessage::List &msgs ) + { Q_Q( ChangeRecorder ); - int oldChanges = pendingNotifications.count(); - foreach ( const NotificationMessage &msg, msgs ) { - if ( acceptNotification( msg ) ) - NotificationMessage::appendAndCompress( pendingNotifications, msg ); - } - if ( pendingNotifications.count() != oldChanges ) { + const int oldChanges = pendingNotifications.size(); + MonitorPrivate::slotNotify( msgs ); // with change recording disabled this will automatically take care of dispatching notification messages + if ( enableChangeRecording && pendingNotifications.size() != oldChanges ) { saveNotifications(); emit q->changesAdded(); } } void loadNotifications() { pendingNotifications.clear(); QStringList list; settings->beginGroup( QLatin1String( "ChangeRecorder" ) ); int size = settings->beginReadArray( QLatin1String( "change" ) ); for ( int i = 0; i < size; ++i ) { settings->setArrayIndex( i ); NotificationMessage msg; msg.setSessionId( settings->value( QLatin1String( "sessionId" ) ).toByteArray() ); msg.setType( (NotificationMessage::Type)settings->value( QLatin1String( "type" ) ).toInt() ); msg.setOperation( (NotificationMessage::Operation)settings->value( QLatin1String( "op" ) ).toInt() ); msg.setUid( settings->value( QLatin1String( "uid" ) ).toLongLong() ); msg.setRemoteId( settings->value( QLatin1String( "rid" ) ).toString() ); msg.setResource( settings->value( QLatin1String( "resource" ) ).toByteArray() ); msg.setParentCollection( settings->value( QLatin1String( "parentCol" ) ).toLongLong() ); msg.setParentDestCollection( settings->value( QLatin1String( "parentDestCol" ) ).toLongLong() ); msg.setMimeType( settings->value( QLatin1String( "mimeType" ) ).toString() ); list = settings->value( QLatin1String( "itemParts" ) ).toStringList(); QSet itemParts; Q_FOREACH( const QString &entry, list ) itemParts.insert( entry.toLatin1() ); msg.setItemParts( itemParts ); pendingNotifications << msg; } settings->endArray(); settings->endGroup(); } void saveNotifications() { if ( !settings ) return; settings->beginGroup( QLatin1String( "ChangeRecorder" ) ); settings->beginWriteArray( QLatin1String( "change" ), pendingNotifications.count() ); for ( int i = 0; i < pendingNotifications.count(); ++i ) { settings->setArrayIndex( i ); NotificationMessage msg = pendingNotifications.at( i ); settings->setValue( QLatin1String( "sessionId" ), msg.sessionId() ); settings->setValue( QLatin1String( "type" ), msg.type() ); settings->setValue( QLatin1String( "op" ), msg.operation() ); settings->setValue( QLatin1String( "uid" ), msg.uid() ); settings->setValue( QLatin1String( "rid" ), msg.remoteId() ); settings->setValue( QLatin1String( "resource" ), msg.resource() ); settings->setValue( QLatin1String( "parentCol" ), msg.parentCollection() ); settings->setValue( QLatin1String( "parentDestCol" ), msg.parentDestCollection() ); settings->setValue( QLatin1String( "mimeType" ), msg.mimeType() ); QStringList list; const QSet itemParts = msg.itemParts(); QSetIterator it( itemParts ); while ( it.hasNext() ) list.append( QString::fromLatin1( it.next() ) ); settings->setValue( QLatin1String( "itemParts" ), list ); } settings->endArray(); settings->endGroup(); } }; ChangeRecorder::ChangeRecorder(QObject * parent) : Monitor( new ChangeRecorderPrivate( this ), parent ) { Q_D( ChangeRecorder ); d->init(); d->connectToNotificationManager(); } ChangeRecorder::~ ChangeRecorder() { Q_D( ChangeRecorder ); d->saveNotifications(); } void ChangeRecorder::setConfig(QSettings * settings) { Q_D( ChangeRecorder ); if ( settings ) { d->settings = settings; Q_ASSERT( d->pendingNotifications.isEmpty() ); d->loadNotifications(); } else if ( d->settings ) { d->saveNotifications(); d->settings = settings; } } void ChangeRecorder::replayNext() { - bool nothing = true; Q_D( ChangeRecorder ); - while( !d->pendingNotifications.isEmpty() ) { + if ( !d->pendingNotifications.isEmpty() ) { const NotificationMessage msg = d->pendingNotifications.first(); - if ( d->processNotification( msg ) ) { - nothing = false; - break; - } - d->pendingNotifications.takeFirst(); - } - if( nothing ) { + if ( d->ensureDataAvailable( msg ) ) + d->emitNotification( msg ); + else + d->pipeline.enqueue( msg ); + } else { // This is necessary when none of the notifications were accepted / processed // above, and so there is no one to call changeProcessed() and the ChangeReplay task // will be stuck forever in the ResourceScheduler. emit nothingToReplay(); } d->saveNotifications(); } bool ChangeRecorder::isEmpty() const { Q_D( const ChangeRecorder ); return d->pendingNotifications.isEmpty(); } void ChangeRecorder::changeProcessed() { Q_D( ChangeRecorder ); if ( !d->pendingNotifications.isEmpty() ) d->pendingNotifications.removeFirst(); d->saveNotifications(); } void ChangeRecorder::setChangeRecordingEnabled( bool enable ) { Q_D( ChangeRecorder ); d->enableChangeRecording = enable; Q_ASSERT( enable || d->pendingNotifications.isEmpty() ); } #include "changerecorder.moc" diff --git a/akonadi/monitor.h b/akonadi/monitor.h index 7d0cc7f67..db13fb5b1 100644 --- a/akonadi/monitor.h +++ b/akonadi/monitor.h @@ -1,378 +1,376 @@ /* Copyright (c) 2006 - 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_MONITOR_H #define AKONADI_MONITOR_H #include #include #include namespace Akonadi { class CollectionFetchScope; class CollectionStatistics; class Item; class ItemFetchScope; class MonitorPrivate; class Session; /** * @short Monitors an item or collection for changes. * * The Monitor emits signals if some of these objects are changed or * removed or new ones are added to the Akonadi storage. * * Optionally, the changed objects can be fetched automatically from the server. * To enable this, see itemFetchScope() and collectionFetchScope(). * * @todo: distinguish between monitoring collection properties and collection content. * @todo: special case for collection content counts changed * * @author Volker Krause */ class AKONADI_EXPORT Monitor : public QObject { Q_OBJECT public: /** * Creates a new monitor. * * @param parent The parent object. */ explicit Monitor( QObject *parent = 0 ); /** * Destroys the monitor. */ virtual ~Monitor(); /** * Sets whether the specified collection shall be monitored for changes. * * @param collection The collection to monitor. * If this collection is Collection::root(), all collections * in the Akonadi storage will be monitored. */ void setCollectionMonitored( const Collection &collection, bool monitored = true ); /** * Sets whether the specified item shall be monitored for changes. * * @param item The item to monitor. */ void setItemMonitored( const Item &item, bool monitored = true ); /** * Sets whether the specified resource shall be monitored for changes. * * @param resource The resource identifier. */ void setResourceMonitored( const QByteArray &resource, bool monitored = true ); /** * Sets whether objects of the specified mime type shall be monitored for changes. * * @param mimetype The mime type to monitor. */ void setMimeTypeMonitored( const QString &mimetype, bool monitored = true ); /** * Sets whether all items shall be monitored. */ void setAllMonitored( bool monitored = true ); /** * Ignores all change notifications caused by the given session. * * @param session The session you want to ignore. */ void ignoreSession( Session *session ); /** * Enables automatic fetching of changed collections from the Akonadi storage. * * @param enable @c true enables automatic fetching, @c false disables automatic fetching. */ void fetchCollection( bool enable ); /** * Enables automatic fetching of changed collection statistics information from * the Akonadi storage. * * @param enable @c true to enables automatic fetching, @c false disables automatic fetching. */ void fetchCollectionStatistics( bool enable ); /** * Sets the item fetch scope. * * Controls how much of an item's data is fetched from the server, e.g. * whether to fetch the full item payload or only meta data. * * @param fetchScope The new scope for item fetch operations. * * @see itemFetchScope() */ void setItemFetchScope( const ItemFetchScope &fetchScope ); /** * Returns the item fetch scope. * * Since this returns a reference it can be used to conveniently modify the * current scope in-place, i.e. by calling a method on the returned reference * without storing it in a local variable. See the ItemFetchScope documentation * for an example. * * @return a reference to the current item fetch scope * * @see setItemFetchScope() for replacing the current item fetch scope */ ItemFetchScope &itemFetchScope(); /** * Sets the collection fetch scope. * * Controls which collections are monitored and how much of a collection's data * is fetched from the server. * * @param fetchScope The new scope for collection fetch operations. * * @see collectionFetchScope() * @since 4.4 */ void setCollectionFetchScope( const CollectionFetchScope &fetchScope ); /** * Returns the collection fetch scope. * * Since this returns a reference it can be used to conveniently modify the * current scope in-place, i.e. by calling a method on the returned reference * without storing it in a local variable. See the CollectionFetchScope documentation * for an example. * * @return a reference to the current collection fetch scope * * @see setCollectionFetchScope() for replacing the current collection fetch scope * @since 4.4 */ CollectionFetchScope &collectionFetchScope(); /** * Returns the list of collections being monitored. * * @since 4.3 */ Collection::List collectionsMonitored() const; /** * Returns the set of items being monitored. * * @since 4.3 */ QList itemsMonitored() const; /** * Returns the set of mimetypes being monitored. * * @since 4.3 */ QStringList mimeTypesMonitored() const; /** * Returns the set of identifiers for resources being monitored. * * @since 4.3 */ QList resourcesMonitored() const; /** * Returns true if everything is being monitored. * * @since 4.3 */ bool isAllMonitored() const; Q_SIGNALS: /** * This signal is emitted if a monitored item has changed, e.g. item parts have been modified. * * @param item The changed item. * @param partIdentifiers The identifiers of the item parts that has been changed. */ void itemChanged( const Akonadi::Item &item, const QSet &partIdentifiers ); /** * This signal is emitted if a monitored item has been moved between two collections * * @param item The moved item. * @param collectionSource The collection the item has been moved from. * @param collectionDestination The collection the item has been moved to. */ void itemMoved( const Akonadi::Item &item, const Akonadi::Collection &collectionSource, const Akonadi::Collection &collectionDestination ); /** * This signal is emitted if an item has been added to a monitored collection in the Akonadi storage. * * @param item The new item. * @param collection The collection the item has been added to. */ void itemAdded( const Akonadi::Item &item, const Akonadi::Collection &collection ); /** * This signal is emitted if * - a monitored item has been removed from the Akonadi storage * or * - a item has been removed from a monitored collection. * * @param item The removed item. */ void itemRemoved( const Akonadi::Item &item ); /** * This signal is emitted if a reference to an item is added to a virtual collection. * @param item The linked item. * @param collection The collection the item is linked to. * * @since 4.2 */ void itemLinked( const Akonadi::Item &item, const Akonadi::Collection &collection ); /** * This signal is emitted if a reference to an item is removed from a virtual collection. * @param item The unlinked item. * @param collection The collection the item is unlinked from. * * @since 4.2 */ void itemUnlinked( const Akonadi::Item &item, const Akonadi::Collection &collection ); /** * This signal is emitted if a new collection has been added to a monitored collection in the Akonadi storage. * * @param collection The new collection. * @param parent The parent collection. */ void collectionAdded( const Akonadi::Collection &collection, const Akonadi::Collection &parent ); /** * This signal is emitted if a monitored collection has been changed (properties or content). * * @param collection The changed collection. */ void collectionChanged( const Akonadi::Collection &collection ); /** * This signals is emitted if a monitored collection has been moved. * * @param collection The moved collection. * @param source The previous parent collection. * @param distination The new parent collection. */ void collectionMoved( const Akonadi::Collection &collection, const Akonadi::Collection &source, const Akonadi::Collection &destination ); /** * This signal is emitted if a monitored collection has been removed from the Akonadi storage. * * @param collection The removed collection. */ void collectionRemoved( const Akonadi::Collection &collection ); /** * This signal is emitted if the statistics information of a monitored collection * has changed. * * @param id The collection identifier of the changed collection. * @param statistics The updated collection statistics, invalid if automatic * fetching of statistics changes is disabled. */ void collectionStatisticsChanged( Akonadi::Collection::Id id, const Akonadi::CollectionStatistics &statistics ); /** * This signal is emitted if the Monitor starts or stops monitoring @p collection explicitly. * @param collection The collection * @param monitored Whether the collection is now being monitored or not. * * @since 4.3 */ void collectionMonitored( const Akonadi::Collection &collection, bool monitored ); /** * This signal is emitted if the Monitor starts or stops monitoring @p item explicitly. * @param item The item * @param monitored Whether the item is now being monitored or not. * * @since 4.3 */ void itemMonitored( const Akonadi::Item &item, bool monitored ); /** * This signal is emitted if the Monitor starts or stops monitoring the resource with the identifier @p identifier explicitly. * @param identifier The identifier of the resource. * @param monitored Whether the resource is now being monitored or not. * * @since 4.3 */ void resourceMonitored( const QByteArray &identifier, bool monitored ); /** * This signal is emitted if the Monitor starts or stops monitoring @p mimeType explicitly. * @param mimeType The mimeType. * @param monitored Whether the mimeType is now being monitored or not. * * @since 4.3 */ void mimeTypeMonitored( const QString &mimeType, bool monitored ); /** * This signal is emitted if the Monitor starts or stops monitoring everything. * @param monitored Whether everything is now being monitored or not. * * @since 4.3 */ void allMonitored( bool monitored ); protected: //@cond PRIVATE MonitorPrivate *d_ptr; explicit Monitor( MonitorPrivate *d, QObject *parent = 0 ); //@endcond private: Q_DECLARE_PRIVATE( Monitor ) //@cond PRIVATE Q_PRIVATE_SLOT( d_ptr, void slotSessionDestroyed( QObject* ) ) Q_PRIVATE_SLOT( d_ptr, void slotStatisticsChangedFinished( KJob* ) ) Q_PRIVATE_SLOT( d_ptr, void slotFlushRecentlyChangedCollections() ) Q_PRIVATE_SLOT( d_ptr, void slotNotify( const Akonadi::NotificationMessage::List& ) ) - Q_PRIVATE_SLOT( d_ptr, void slotItemJobFinished( KJob* ) ) - Q_PRIVATE_SLOT( d_ptr, void slotCollectionJobFinished( KJob* ) ) Q_PRIVATE_SLOT( d_ptr, void dataAvailable() ) //@endcond }; } #endif diff --git a/akonadi/monitor_p.cpp b/akonadi/monitor_p.cpp index 825e47354..94ff902d5 100644 --- a/akonadi/monitor_p.cpp +++ b/akonadi/monitor_p.cpp @@ -1,504 +1,318 @@ /* Copyright (c) 2007 Tobias Koenig This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ // @cond PRIVATE #include "monitor_p.h" #include "collectionfetchjob.h" #include "collectionstatistics.h" #include "itemfetchjob.h" #include "notificationmessage_p.h" #include "session.h" #include using namespace Akonadi; static const int PipelineSize = 5; MonitorPrivate::MonitorPrivate(Monitor * parent) : q_ptr( parent ), nm( 0 ), monitorAll( false ), collectionCache( 3*PipelineSize ), // needs to be at least 3x pipeline size for the collection move case itemCache( PipelineSize ), // needs to be at least 1x pipeline size fetchCollection( false ), fetchCollectionStatistics( false ) { } void MonitorPrivate::init() { QObject::connect( &collectionCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable()) ); QObject::connect( &itemCache, SIGNAL(dataAvailable()), q_ptr, SLOT(dataAvailable()) ); } bool MonitorPrivate::connectToNotificationManager() { NotificationMessage::registerDBusTypes(); if ( !nm ) nm = new org::freedesktop::Akonadi::NotificationManager( QLatin1String( "org.freedesktop.Akonadi" ), QLatin1String( "/notifications" ), QDBusConnection::sessionBus(), q_ptr ); else return true; if ( !nm ) { kWarning() << "Unable to connect to notification manager"; } else { QObject::connect( nm, SIGNAL(notify(Akonadi::NotificationMessage::List)), q_ptr, SLOT(slotNotify(Akonadi::NotificationMessage::List)) ); return true; } return false; } +int MonitorPrivate::pipelineSize() const +{ + return PipelineSize; +} + bool MonitorPrivate::acceptNotification(const NotificationMessage & msg) { if ( isSessionIgnored( msg.sessionId() ) ) return false; switch ( msg.type() ) { case NotificationMessage::InvalidType: kWarning() << "Received invalid change notification!"; return false; case NotificationMessage::Item: return isItemMonitored( msg.uid(), msg.parentCollection(), msg.parentDestCollection(), msg.mimeType(), msg.resource() ) || isCollectionMonitored( msg.parentCollection(), msg.resource() ) || isCollectionMonitored( msg.parentDestCollection(), msg.resource() ); case NotificationMessage::Collection: return isCollectionMonitored( msg.uid(), msg.resource() ) || isCollectionMonitored( msg.parentCollection(), msg.resource() ) || isCollectionMonitored( msg.parentDestCollection(), msg.resource() ); } Q_ASSERT( false ); return false; } void MonitorPrivate::dispatchNotifications() { - while ( pipeline.size() < PipelineSize && !pendingNotifications.isEmpty() ) { + while ( pipeline.size() < pipelineSize() && !pendingNotifications.isEmpty() ) { const NotificationMessage msg = pendingNotifications.dequeue(); if ( ensureDataAvailable( msg ) && pipeline.isEmpty() ) emitNotification( msg ); else pipeline.enqueue( msg ); } } bool MonitorPrivate::ensureDataAvailable( const NotificationMessage &msg ) { bool allCached = true; if ( fetchCollection ) { if ( !collectionCache.ensureCached( msg.parentCollection(), mCollectionFetchScope ) ) allCached = false; if ( msg.operation() == NotificationMessage::Move && !collectionCache.ensureCached( msg.parentDestCollection(), mCollectionFetchScope ) ) allCached = false; } if ( msg.operation() == NotificationMessage::Remove ) return allCached; // the actual object is gone already, nothing to fetch there if ( msg.type() == NotificationMessage::Item && !mItemFetchScope.isEmpty() ) { if ( !itemCache.ensureCached( msg.uid(), mItemFetchScope ) ) allCached = false; } else if ( msg.type() == NotificationMessage::Collection && fetchCollection ) { if ( !collectionCache.ensureCached( msg.uid(), mCollectionFetchScope ) ) allCached = false; } return allCached; } void MonitorPrivate::emitNotification( const NotificationMessage &msg ) { const Collection parent = collectionCache.retrieve( msg.parentCollection() ); Collection destParent; if ( msg.operation() == NotificationMessage::Move ) destParent = collectionCache.retrieve( msg.parentDestCollection() ); if ( msg.type() == NotificationMessage::Collection ) { const Collection col = collectionCache.retrieve( msg.uid() ); emitCollectionNotification( msg, col, parent, destParent ); } else if ( msg.type() == NotificationMessage::Item ) { const Item item = itemCache.retrieve( msg.uid() ); emitItemNotification( msg, item, parent, destParent ); } } void MonitorPrivate::dataAvailable() { while ( !pipeline.isEmpty() ) { const NotificationMessage msg = pipeline.head(); if ( ensureDataAvailable( msg ) ) { emitNotification( msg ); pipeline.dequeue(); } else { break; } } dispatchNotifications(); } void MonitorPrivate::updatePendingStatistics( const NotificationMessage &msg ) { if ( msg.type() == NotificationMessage::Item ) { notifyCollectionStatisticsWatchers( msg.parentCollection(), msg.resource() ); } else if ( msg.type() == NotificationMessage::Collection && msg.operation() == NotificationMessage::Remove ) { // no need for statistics updates anymore recentlyChangedCollections.remove( msg.uid() ); } } -bool MonitorPrivate::processNotification(const NotificationMessage & msg) -{ - if ( !acceptNotification( msg ) ) - return false; - - if ( msg.type() == NotificationMessage::Item ) { - notifyCollectionStatisticsWatchers( msg.parentCollection(), msg.resource() ); - if ( !mItemFetchScope.isEmpty() && - ( msg.operation() == NotificationMessage::Add || msg.operation() == NotificationMessage::Move || - msg.operation() == NotificationMessage::Link ) ) { - Item item( msg.uid() ); - item.setRemoteId( msg.remoteId() ); - - ItemCollectionFetchJob *job = new ItemCollectionFetchJob( item, msg.parentCollection(), msg.parentDestCollection(), q_ptr ); - job->setFetchScope( mItemFetchScope ); - pendingJobs.insert( job, msg ); - QObject::connect( job, SIGNAL(result(KJob*)), q_ptr, SLOT(slotItemJobFinished(KJob*)) ); - return true; - } - if ( !mItemFetchScope.isEmpty() && msg.operation() == NotificationMessage::Modify ) { - Item item( msg.uid() ); - item.setRemoteId( msg.remoteId() ); - ItemFetchJob *job = new ItemFetchJob( item, q_ptr ); - job->setFetchScope( mItemFetchScope ); - pendingJobs.insert( job, msg ); - QObject::connect( job, SIGNAL(result(KJob*)), q_ptr, SLOT(slotItemJobFinished(KJob*)) ); - return true; - } - emitItemNotification( msg ); - return true; - - } else if ( msg.type() == NotificationMessage::Collection ) { - if ( msg.operation() != NotificationMessage::Remove && fetchCollection ) { - Collection::List list; - list << Collection( msg.uid() ); - if ( msg.operation() == NotificationMessage::Add || msg.operation() == NotificationMessage::Move ) - list << Collection( msg.parentCollection() ); - if ( msg.operation() == NotificationMessage::Move ) - list << Collection( msg.parentDestCollection() ); - CollectionFetchJob *job = new CollectionFetchJob( list, q_ptr ); - pendingJobs.insert( job, msg ); - QObject::connect( job, SIGNAL(result(KJob*)), q_ptr, SLOT(slotCollectionJobFinished(KJob*)) ); - return true; - } - if ( msg.operation() == NotificationMessage::Remove ) { - // no need for statistics updates anymore - recentlyChangedCollections.remove( msg.uid() ); - } - emitCollectionNotification( msg ); - return true; - - } else { - kWarning() << "Received unknown change notification!"; - } - return false; -} - void MonitorPrivate::slotSessionDestroyed( QObject * object ) { Session* session = qobject_cast( object ); if ( session ) sessions.removeAll( session->sessionId() ); } void MonitorPrivate::slotStatisticsChangedFinished( KJob* job ) { if ( job->error() ) { kWarning() << "Error on fetching collection statistics: " << job->errorText(); } else { CollectionStatisticsJob *statisticsJob = static_cast( job ); emit q_ptr->collectionStatisticsChanged( statisticsJob->collection().id(), statisticsJob->statistics() ); } } void MonitorPrivate::slotFlushRecentlyChangedCollections() { foreach( Collection::Id collection, recentlyChangedCollections ) { if ( fetchCollectionStatistics ) { fetchStatistics( collection ); } else { static const CollectionStatistics dummyStatistics; emit q_ptr->collectionStatisticsChanged( collection, dummyStatistics ); } } recentlyChangedCollections.clear(); } void MonitorPrivate::slotNotify( const NotificationMessage::List &msgs ) { foreach ( const NotificationMessage &msg, msgs ) { invalidateCaches( msg ); if ( acceptNotification( msg ) ) { updatePendingStatistics( msg ); NotificationMessage::appendAndCompress( pendingNotifications, msg ); } } dispatchNotifications(); } void MonitorPrivate::emitItemNotification( const NotificationMessage &msg, const Item &item, const Collection &collection, const Collection &collectionDest ) { Q_ASSERT( msg.type() == NotificationMessage::Item ); Collection col = collection; Collection colDest = collectionDest; if ( !col.isValid() ) { col = Collection( msg.parentCollection() ); col.setResource( QString::fromUtf8( msg.resource() ) ); } if ( !colDest.isValid() ) { colDest = Collection( msg.parentDestCollection() ); // FIXME setResource here required ? } Item it = item; if ( !it.isValid() ) { it = Item( msg.uid() ); it.setRemoteId( msg.remoteId() ); it.setMimeType( msg.mimeType() ); } switch ( msg.operation() ) { case NotificationMessage::Add: emit q_ptr->itemAdded( it, col ); break; case NotificationMessage::Modify: emit q_ptr->itemChanged( it, msg.itemParts() ); break; case NotificationMessage::Move: emit q_ptr->itemMoved( it, col, colDest ); break; case NotificationMessage::Remove: emit q_ptr->itemRemoved( it ); break; case NotificationMessage::Link: emit q_ptr->itemLinked( it, col ); break; case NotificationMessage::Unlink: emit q_ptr->itemUnlinked( it, col ); break; default: kDebug() << "Unknown operation type" << msg.operation() << "in item change notification"; break; } } void MonitorPrivate::emitCollectionNotification( const NotificationMessage &msg, const Collection &col, const Collection &par, const Collection &dest ) { Q_ASSERT( msg.type() == NotificationMessage::Collection ); Collection collection = col; if ( !collection.isValid() ) { collection = Collection( msg.uid() ); collection.setParentCollection( Collection( msg.parentCollection() ) ); collection.setResource( QString::fromUtf8( msg.resource() ) ); collection.setRemoteId( msg.remoteId() ); } Collection parent = par; if ( !parent.isValid() ) parent = Collection( msg.parentCollection() ); Collection destination = dest; if ( !destination.isValid() ) destination = Collection( msg.parentDestCollection() ); switch ( msg.operation() ) { case NotificationMessage::Add: emit q_ptr->collectionAdded( collection, parent ); break; case NotificationMessage::Modify: emit q_ptr->collectionChanged( collection ); break; case NotificationMessage::Move: emit q_ptr->collectionMoved( collection, parent, destination ); break; case NotificationMessage::Remove: emit q_ptr->collectionRemoved( collection ); break; default: kDebug() << "Unknown operation type" << msg.operation() << "in collection change notification"; } } -void MonitorPrivate::slotItemJobFinished( KJob* job ) -{ - if ( !pendingJobs.contains( job ) ) { - kWarning() << "Unknown job - wtf is going on here?"; - return; - } - NotificationMessage msg = pendingJobs.take( job ); - Item item; - Collection col; - Collection destCol; - if ( job->error() ) { - kWarning() << "Error on fetching item:" << job->errorText(); - } else { - ItemFetchJob *fetchJob = qobject_cast( job ); - if ( fetchJob && fetchJob->items().count() > 0 ) - item = fetchJob->items().first(); - ItemCollectionFetchJob *cfjob = qobject_cast( job ); - if ( cfjob ) { - item = cfjob->item(); - col = cfjob->collection(); - destCol = cfjob->destCollection(); - } - } - emitItemNotification( msg, item, col, destCol ); -} - -void MonitorPrivate::slotCollectionJobFinished( KJob* job ) -{ - if ( !pendingJobs.contains( job ) ) { - kWarning() << "Unknown job - wtf is going on here?"; - return; - } - NotificationMessage msg = pendingJobs.take( job ); - if ( job->error() ) { - kWarning() << "Error on fetching collection:" << job->errorText(); - } else { - CollectionFetchJob *listJob = qobject_cast( job ); - QHash cols; - foreach ( const Collection &c, listJob->collections() ) - cols.insert( c.id(), c ); - emitCollectionNotification( msg, cols.value( msg.uid() ), cols.value( msg.parentCollection() ), cols.value( msg.parentDestCollection() ) ); - } -} - void MonitorPrivate::invalidateCaches( const NotificationMessage &msg ) { // remove invalidates if ( msg.operation() == NotificationMessage::Remove ) { if ( msg.type() == NotificationMessage::Collection ) { collectionCache.invalidate( msg.uid() ); } else if ( msg.type() == NotificationMessage::Item ) { itemCache.invalidate( msg.uid() ); } } // modify removes the cache entry, as we need to re-fetch if ( msg.operation() == NotificationMessage::Modify ) { if ( msg.type() == NotificationMessage::Collection ) { collectionCache.update( msg.uid(), mCollectionFetchScope ); } else if ( msg.type() == NotificationMessage::Item ) { itemCache.update( msg.uid(), mItemFetchScope ); } } } - -ItemCollectionFetchJob::ItemCollectionFetchJob( const Item &item, Collection::Id collectionId, Collection::Id destCollectionId, QObject *parent ) - : Job( parent ), - mReferenceItem( item ), mCollectionId( collectionId ), mDestCollectionId( destCollectionId ) -{ -} - -ItemCollectionFetchJob::~ItemCollectionFetchJob() -{ -} - -Item ItemCollectionFetchJob::item() const -{ - return mItem; -} - -Collection ItemCollectionFetchJob::collection() const -{ - return mCollection; -} - -Collection ItemCollectionFetchJob::destCollection() const -{ - return mDestCollection; -} - -void ItemCollectionFetchJob::setFetchScope( const ItemFetchScope &fetchScope ) -{ - mFetchScope = fetchScope; -} - -void ItemCollectionFetchJob::doStart() -{ - CollectionFetchJob *listJob = new CollectionFetchJob( Collection( mCollectionId ), CollectionFetchJob::Base, this ); - connect( listJob, SIGNAL( result( KJob* ) ), SLOT( collectionJobDone( KJob* ) ) ); - addSubjob( listJob ); - - if ( mDestCollectionId > 0 ) { - CollectionFetchJob *destListJob = new CollectionFetchJob( Collection( mDestCollectionId ), CollectionFetchJob::Base, this ); - connect( destListJob, SIGNAL( result( KJob* ) ), SLOT( destCollectionJobDone( KJob* ) ) ); - addSubjob( destListJob ); - } - - - ItemFetchJob *fetchJob = new ItemFetchJob( mReferenceItem, this ); - fetchJob->setFetchScope( mFetchScope ); - connect( fetchJob, SIGNAL( result( KJob* ) ), SLOT( itemJobDone( KJob* ) ) ); - addSubjob( fetchJob ); -} - -void ItemCollectionFetchJob::collectionJobDone( KJob* job ) -{ - if ( !job->error() ) { - CollectionFetchJob *listJob = qobject_cast( job ); - if ( listJob->collections().isEmpty() ) { - setError( 1 ); - setErrorText( QLatin1String( "No collection found" ) ); - } else - mCollection = listJob->collections().first(); - } -} - -void ItemCollectionFetchJob::destCollectionJobDone( KJob* job ) -{ - if ( !job->error() ) { - CollectionFetchJob *listJob = qobject_cast( job ); - if ( listJob->collections().isEmpty() ) { - setError( 1 ); - setErrorText( QLatin1String( "No collection found" ) ); - } else - mDestCollection = listJob->collections().first(); - } -} - -void ItemCollectionFetchJob::itemJobDone( KJob* job ) -{ - if ( !job->error() ) { - ItemFetchJob *fetchJob = qobject_cast( job ); - if ( fetchJob->items().isEmpty() ) { - setError( 2 ); - setErrorText( QLatin1String( "No item found" ) ); - } else - mItem = fetchJob->items().first(); - - emitResult(); - } -} - // @endcond - -#include "monitor_p.moc" diff --git a/akonadi/monitor_p.h b/akonadi/monitor_p.h index 039dd361a..586d688fe 100644 --- a/akonadi/monitor_p.h +++ b/akonadi/monitor_p.h @@ -1,206 +1,164 @@ /* Copyright (c) 2007 Tobias Koenig This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_MONITOR_P_H #define AKONADI_MONITOR_P_H #include "monitor.h" #include "collection.h" #include "collectionstatisticsjob.h" #include "collectionfetchscope.h" #include "item.h" #include "itemfetchscope.h" #include "job.h" #include #include "notificationmanagerinterface.h" #include "entitycache_p.h" #include #include #include namespace Akonadi { class Monitor; /** * @internal */ class MonitorPrivate { public: MonitorPrivate( Monitor *parent ); virtual ~MonitorPrivate() {} void init(); Monitor *q_ptr; Q_DECLARE_PUBLIC( Monitor ) org::freedesktop::Akonadi::NotificationManager *nm; Collection::List collections; QSet resources; QSet items; QSet mimetypes; bool monitorAll; QList sessions; ItemFetchScope mItemFetchScope; CollectionFetchScope mCollectionFetchScope; - QHash pendingJobs; CollectionCache collectionCache; ItemCache itemCache; QQueue pendingNotifications; QQueue pipeline; bool fetchCollection; bool fetchCollectionStatistics; bool isCollectionMonitored( Collection::Id collection, const QByteArray &resource ) const { if ( monitorAll || isCollectionMonitored( collection ) || resources.contains( resource ) ) return true; return false; } bool isItemMonitored( Item::Id item, Collection::Id collection, Collection::Id collectionDest, const QString &mimetype, const QByteArray &resource ) const { if ( monitorAll || isCollectionMonitored( collection ) || isCollectionMonitored( collectionDest ) ||items.contains( item ) || resources.contains( resource ) || isMimeTypeMonitored( mimetype ) ) return true; return false; } bool isSessionIgnored( const QByteArray &sessionId ) const { return sessions.contains( sessionId ); } bool connectToNotificationManager(); bool acceptNotification( const NotificationMessage &msg ); void dispatchNotifications(); bool ensureDataAvailable( const NotificationMessage &msg ); void emitNotification( const NotificationMessage &msg ); void updatePendingStatistics( const NotificationMessage &msg ); - bool processNotification( const NotificationMessage &msg ); - void invalidateCaches( const NotificationMessage &msg ); + virtual int pipelineSize() const; + // private slots void dataAvailable(); void slotSessionDestroyed( QObject* ); void slotStatisticsChangedFinished( KJob* ); void slotFlushRecentlyChangedCollections(); virtual void slotNotify( const NotificationMessage::List &msgs ); - void slotItemJobFinished( KJob* job ); - void slotCollectionJobFinished( KJob *job ); void emitItemNotification( const NotificationMessage &msg, const Item &item = Item(), const Collection &collection = Collection(), const Collection &collectionDest = Collection() ); void emitCollectionNotification( const NotificationMessage &msg, const Collection &col = Collection(), const Collection &par = Collection(), const Collection &dest = Collection() ); private: // collections that need a statistics update QSet recentlyChangedCollections; bool isCollectionMonitored( Collection::Id collection ) const { if ( collections.contains( Collection( collection ) ) ) return true; if ( collections.contains( Collection::root() ) ) return true; return false; } bool isMimeTypeMonitored( const QString& mimetype ) const { if ( mimetypes.contains( mimetype ) ) return true; KMimeType::Ptr mimeType = KMimeType::mimeType( mimetype, KMimeType::ResolveAliases ); if ( mimeType.isNull() ) return false; foreach ( const QString &mt, mimetypes ) { if ( mimeType->is( mt ) ) return true; } return false; } void fetchStatistics( Collection::Id colId ) { CollectionStatisticsJob *job = new CollectionStatisticsJob( Collection( colId ), q_ptr ); QObject::connect( job, SIGNAL(result(KJob*)), q_ptr, SLOT(slotStatisticsChangedFinished(KJob*)) ); } void notifyCollectionStatisticsWatchers( Collection::Id collection, const QByteArray &resource ) { if ( isCollectionMonitored( collection, resource ) ) { if (recentlyChangedCollections.empty() ) QTimer::singleShot( 500, q_ptr, SLOT(slotFlushRecentlyChangedCollections()) ); recentlyChangedCollections.insert( collection ); } } }; - -/** - * @internal - * - * A job which fetches an item and a collection. - */ -class AKONADI_EXPORT ItemCollectionFetchJob : public Job -{ - Q_OBJECT - - public: - explicit ItemCollectionFetchJob( const Item &item, Collection::Id collectionId, Collection::Id destCollectionId, QObject *parent = 0 ); - ~ItemCollectionFetchJob(); - - Item item() const; - Collection collection() const; - Collection destCollection() const; - - void setFetchScope( const ItemFetchScope &fetchScope ); - - protected: - virtual void doStart(); - - private Q_SLOTS: - void collectionJobDone( KJob* job ); - void destCollectionJobDone( KJob* job ); - void itemJobDone( KJob* job ); - - private: - Item mReferenceItem; - Collection::Id mCollectionId; - Collection::Id mDestCollectionId; - - Item mItem; - Collection mCollection; - Collection mDestCollection; - ItemFetchScope mFetchScope; -}; - } #endif