diff --git a/akonadi/resourcebase.cpp b/akonadi/resourcebase.cpp index 2d8e4f2ac..31359a387 100644 --- a/akonadi/resourcebase.cpp +++ b/akonadi/resourcebase.cpp @@ -1,662 +1,664 @@ /* Copyright (c) 2006 Till Adam Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "resourcebase.h" #include "agentbase_p.h" #include "resourceadaptor.h" #include "collectiondeletejob.h" #include "collectionsync_p.h" #include "itemsync.h" #include "resourcescheduler_p.h" #include "tracerinterface.h" #include "xdgbasedirs_p.h" #include "changerecorder.h" #include "collectionfetchjob.h" #include "collectionfetchscope.h" #include "collectionmodifyjob.h" #include "itemfetchjob.h" #include "itemfetchscope.h" #include "itemmodifyjob.h" #include "itemmodifyjob_p.h" #include "session.h" #include "resourceselectjob_p.h" #include "monitor_p.h" #include #include #include #include #include #include #include #include #include #include #include using namespace Akonadi; class Akonadi::ResourceBasePrivate : public AgentBasePrivate { public: ResourceBasePrivate( ResourceBase *parent ) : AgentBasePrivate( parent ), scheduler( 0 ), mItemSyncer( 0 ), mCollectionSyncer( 0 ), mHierarchicalRid( false ) { mStatusMessage = defaultReadyMessage(); } Q_DECLARE_PUBLIC( ResourceBase ) void delayedInit() { if ( !QDBusConnection::sessionBus().registerService( QLatin1String( "org.freedesktop.Akonadi.Resource." ) + mId ) ) kFatal() << "Unable to register service at D-Bus: " << QDBusConnection::sessionBus().lastError().message(); AgentBasePrivate::delayedInit(); } virtual void changeProcessed() { mMonitor->changeProcessed(); if ( !mMonitor->isEmpty() ) scheduler->scheduleChangeReplay(); scheduler->taskDone(); } void slotDeliveryDone( KJob* job ); void slotCollectionSyncDone( KJob *job ); void slotLocalListDone( KJob *job ); void slotSynchronizeCollection( const Collection &col ); void slotCollectionListDone( KJob *job ); void slotItemSyncDone( KJob *job ); void slotPercent( KJob* job, unsigned long percent ); void slotDeleteResourceCollection(); void slotDeleteResourceCollectionDone( KJob *job ); void slotCollectionDeletionDone( KJob *job ); void slotPrepareItemRetrieval( const Akonadi::Item &item ); void slotPrepareItemRetrievalResult( KJob* job ); void changeCommittedResult( KJob* job ); // synchronize states Collection currentCollection; ResourceScheduler *scheduler; ItemSync *mItemSyncer; CollectionSync *mCollectionSyncer; bool mHierarchicalRid; }; ResourceBase::ResourceBase( const QString & id ) : AgentBase( new ResourceBasePrivate( this ), id ) { Q_D( ResourceBase ); new ResourceAdaptor( this ); d->scheduler = new ResourceScheduler( this ); d->mMonitor->setChangeRecordingEnabled( true ); connect( d->mMonitor, SIGNAL( changesAdded() ), d->scheduler, SLOT( scheduleChangeReplay() ) ); d->mMonitor->setResourceMonitored( d->mId.toLatin1() ); connect( d->scheduler, SIGNAL( executeFullSync() ), SLOT( retrieveCollections() ) ); connect( d->scheduler, SIGNAL( executeCollectionTreeSync() ), SLOT( retrieveCollections() ) ); connect( d->scheduler, SIGNAL( executeCollectionSync( const Akonadi::Collection& ) ), SLOT( slotSynchronizeCollection( const Akonadi::Collection& ) ) ); connect( d->scheduler, SIGNAL( executeItemFetch( const Akonadi::Item&, const QSet& ) ), SLOT( slotPrepareItemRetrieval(Akonadi::Item)) ); connect( d->scheduler, SIGNAL( executeResourceCollectionDeletion() ), SLOT( slotDeleteResourceCollection() ) ); connect( d->scheduler, SIGNAL( status( int, const QString& ) ), SIGNAL( status( int, const QString& ) ) ); connect( d->scheduler, SIGNAL( executeChangeReplay() ), d->mMonitor, SLOT( replayNext() ) ); connect( d->scheduler, SIGNAL( fullSyncComplete() ), SIGNAL( synchronized() ) ); connect( d->mMonitor, SIGNAL( nothingToReplay() ), d->scheduler, SLOT( taskDone() ) ); + connect( d->mMonitor, SIGNAL(collectionRemoved(Akonadi::Collection)), + d->scheduler, SLOT(collectionRemoved(Akonadi::Collection)) ); connect( this, SIGNAL( synchronized() ), d->scheduler, SLOT( taskDone() ) ); connect( this, SIGNAL( agentNameChanged( const QString& ) ), this, SIGNAL( nameChanged( const QString& ) ) ); d->scheduler->setOnline( d->mOnline ); if ( !d->mMonitor->isEmpty() ) d->scheduler->scheduleChangeReplay(); new ResourceSelectJob( identifier() ); } ResourceBase::~ResourceBase() { } void ResourceBase::synchronize() { d_func()->scheduler->scheduleFullSync(); } void ResourceBase::setName( const QString &name ) { AgentBase::setAgentName( name ); } QString ResourceBase::name() const { return AgentBase::agentName(); } QString ResourceBase::parseArguments( int argc, char **argv ) { QString identifier; if ( argc < 3 ) { kDebug() << "Not enough arguments passed..."; exit( 1 ); } for ( int i = 1; i < argc - 1; ++i ) { if ( QLatin1String( argv[ i ] ) == QLatin1String( "--identifier" ) ) identifier = QLatin1String( argv[ i + 1 ] ); } if ( identifier.isEmpty() ) { kDebug() << "Identifier argument missing"; exit( 1 ); } QByteArray catalog; char *p = strrchr( argv[0], '/' ); if ( p ) catalog = QByteArray( p + 1 ); else catalog = QByteArray( argv[0] ); KCmdLineArgs::init( argc, argv, identifier.toLatin1(), catalog, ki18nc("@title, application name", "Akonadi Resource"), "0.1", ki18nc("@title, application description", "Akonadi Resource") ); KCmdLineOptions options; options.add( "identifier ", ki18nc("@label, commandline option", "Resource identifier") ); KCmdLineArgs::addCmdLineOptions( options ); return identifier; } int ResourceBase::init( ResourceBase *r ) { QApplication::setQuitOnLastWindowClosed( false ); int rv = kapp->exec(); delete r; return rv; } void ResourceBase::itemRetrieved( const Item &item ) { Q_D( ResourceBase ); Q_ASSERT( d->scheduler->currentTask().type == ResourceScheduler::FetchItem ); if ( !item.isValid() ) { QDBusMessage reply( d->scheduler->currentTask().dbusMsg ); reply << false; QDBusConnection::sessionBus().send( reply ); d->scheduler->taskDone(); return; } Item i( item ); QSet requestedParts = d->scheduler->currentTask().itemParts; foreach ( const QByteArray &part, requestedParts ) { if ( !item.loadedPayloadParts().contains( part ) ) { kWarning() << "Item does not provide part" << part; } } ItemModifyJob *job = new ItemModifyJob( i ); // FIXME: remove once the item with which we call retrieveItem() has a revision number job->disableRevisionCheck(); connect( job, SIGNAL( result( KJob* ) ), SLOT( slotDeliveryDone( KJob* ) ) ); } void ResourceBasePrivate::slotDeliveryDone(KJob * job) { Q_Q( ResourceBase ); Q_ASSERT( scheduler->currentTask().type == ResourceScheduler::FetchItem ); QDBusMessage reply( scheduler->currentTask().dbusMsg ); if ( job->error() ) { emit q->error( QLatin1String( "Error while creating item: " ) + job->errorString() ); reply << false; } else { reply << true; } QDBusConnection::sessionBus().send( reply ); scheduler->taskDone(); } void ResourceBasePrivate::slotDeleteResourceCollection() { Q_Q( ResourceBase ); CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::FirstLevel ); job->fetchScope().setResource( q->identifier() ); connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotDeleteResourceCollectionDone( KJob* ) ) ); } void ResourceBasePrivate::slotDeleteResourceCollectionDone( KJob *job ) { Q_Q( ResourceBase ); if ( job->error() ) { emit q->error( job->errorString() ); scheduler->taskDone(); } else { const CollectionFetchJob *fetchJob = static_cast( job ); if ( !fetchJob->collections().isEmpty() ) { CollectionDeleteJob *job = new CollectionDeleteJob( fetchJob->collections().first() ); connect( job, SIGNAL( result( KJob* ) ), q, SLOT( slotCollectionDeletionDone( KJob* ) ) ); } else { // there is no resource collection, so just ignore the request scheduler->taskDone(); } } } void ResourceBasePrivate::slotCollectionDeletionDone( KJob *job ) { Q_Q( ResourceBase ); if ( job->error() ) { emit q->error( job->errorString() ); } scheduler->taskDone(); } void ResourceBase::changeCommitted( const Item& item ) { Q_D( ResourceBase ); ItemModifyJob *job = new ItemModifyJob( item ); job->d_func()->setClean(); job->disableRevisionCheck(); // TODO: remove, but where/how do we handle the error? job->ignorePayload(); // we only want to reset the dirty flag and update the remote id d->changeProcessed(); } void ResourceBase::changeCommitted( const Collection &collection ) { CollectionModifyJob *job = new CollectionModifyJob( collection ); connect( job, SIGNAL(result(KJob*)), SLOT(changeCommittedResult(KJob*)) ); } void ResourceBasePrivate::changeCommittedResult( KJob *job ) { Q_Q( ResourceBase ); if ( job->error() ) emit q->error( i18n( "Updating local collection failed: %1.", job->errorText() ) ); mMonitor->d_ptr->invalidateCache( static_cast( job )->collection() ); changeProcessed(); } bool ResourceBase::requestItemDelivery( qint64 uid, const QString & remoteId, const QString &mimeType, const QStringList &_parts ) { Q_D( ResourceBase ); if ( !isOnline() ) { emit error( i18nc( "@info", "Cannot fetch item in offline mode." ) ); return false; } setDelayedReply( true ); // FIXME: we need at least the revision number too Item item( uid ); item.setMimeType( mimeType ); item.setRemoteId( remoteId ); QSet parts; Q_FOREACH( const QString &str, _parts ) parts.insert( str.toLatin1() ); d->scheduler->scheduleItemFetch( item, parts, message().createReply() ); return true; } void ResourceBase::collectionsRetrieved( const Collection::List & collections ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll, "ResourceBase::collectionsRetrieved()", "Calling collectionsRetrieved() although no collection retrieval is in progress" ); if ( !d->mCollectionSyncer ) { d->mCollectionSyncer = new CollectionSync( identifier() ); d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); } d->mCollectionSyncer->setRemoteCollections( collections ); } void ResourceBase::collectionsRetrievedIncremental( const Collection::List & changedCollections, const Collection::List & removedCollections ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll, "ResourceBase::collectionsRetrievedIncremental()", "Calling collectionsRetrievedIncremental() although no collection retrieval is in progress" ); if ( !d->mCollectionSyncer ) { d->mCollectionSyncer = new CollectionSync( identifier() ); d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); } d->mCollectionSyncer->setRemoteCollections( changedCollections, removedCollections ); } void ResourceBase::setCollectionStreamingEnabled( bool enable ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll, "ResourceBase::setCollectionStreamingEnabled()", "Calling setCollectionStreamingEnabled() although no collection retrieval is in progress" ); if ( !d->mCollectionSyncer ) { d->mCollectionSyncer = new CollectionSync( identifier() ); d->mCollectionSyncer->setHierarchicalRemoteIds( d->mHierarchicalRid ); connect( d->mCollectionSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); connect( d->mCollectionSyncer, SIGNAL( result( KJob* ) ), SLOT( slotCollectionSyncDone( KJob* ) ) ); } d->mCollectionSyncer->setStreamingEnabled( enable ); } void ResourceBase::collectionsRetrievalDone() { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollectionTree || d->scheduler->currentTask().type == ResourceScheduler::SyncAll, "ResourceBase::collectionsRetrievalDone()", "Calling collectionsRetrievalDone() although no collection retrieval is in progress" ); // streaming enabled, so finalize the sync if ( d->mCollectionSyncer ) { d->mCollectionSyncer->retrievalDone(); } // user did the sync himself, we are done now else { // FIXME: we need the same special case for SyncAll as in slotCollectionSyncDone here! d->scheduler->taskDone(); } } void ResourceBasePrivate::slotCollectionSyncDone( KJob * job ) { Q_Q( ResourceBase ); mCollectionSyncer = 0; if ( job->error() ) { emit q->error( job->errorString() ); } else { if ( scheduler->currentTask().type == ResourceScheduler::SyncAll ) { CollectionFetchJob *list = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive ); list->fetchScope().setResource( mId ); list->fetchScope().setAncestorRetrieval( q->changeRecorder()->collectionFetchScope().ancestorRetrieval() ); q->connect( list, SIGNAL( result( KJob* ) ), q, SLOT( slotLocalListDone( KJob* ) ) ); return; } } scheduler->taskDone(); } void ResourceBasePrivate::slotLocalListDone( KJob * job ) { Q_Q( ResourceBase ); if ( job->error() ) { emit q->error( job->errorString() ); } else { Collection::List cols = static_cast( job )->collections(); foreach ( const Collection &col, cols ) { scheduler->scheduleSync( col ); } scheduler->scheduleFullSyncCompletion(); } scheduler->taskDone(); } void ResourceBasePrivate::slotSynchronizeCollection( const Collection &col ) { Q_Q( ResourceBase ); currentCollection = col; // check if this collection actually can contain anything QStringList contentTypes = currentCollection.contentMimeTypes(); contentTypes.removeAll( Collection::mimeType() ); if ( !contentTypes.isEmpty() ) { emit q->status( AgentBase::Running, i18nc( "@info:status", "Syncing collection '%1'", currentCollection.name() ) ); q->retrieveItems( currentCollection ); return; } scheduler->taskDone(); } void ResourceBasePrivate::slotPrepareItemRetrieval( const Akonadi::Item &item ) { Q_Q( ResourceBase ); ItemFetchJob *fetch = new ItemFetchJob( item, this ); fetch->fetchScope().setAncestorRetrieval( q->changeRecorder()->itemFetchScope().ancestorRetrieval() ); q->connect( fetch, SIGNAL(result(KJob*)), SLOT(slotPrepareItemRetrievalResult(KJob*)) ); } void ResourceBasePrivate::slotPrepareItemRetrievalResult( KJob* job ) { Q_Q( ResourceBase ); Q_ASSERT_X( scheduler->currentTask().type == ResourceScheduler::FetchItem, "ResourceBasePrivate::slotPrepareItemRetrievalResult()", "Preparing item retrieval although no item retrieval is in progress" ); if ( job->error() ) { q->cancelTask( job->errorText() ); return; } ItemFetchJob *fetch = qobject_cast( job ); if ( fetch->items().count() != 1 ) { q->cancelTask( QLatin1String("The requested item does no longer exist") ); return; } const Item item = fetch->items().first(); const QSet parts = scheduler->currentTask().itemParts; if ( !q->retrieveItem( item, parts ) ) q->cancelTask(); } void ResourceBase::itemsRetrievalDone() { Q_D( ResourceBase ); // streaming enabled, so finalize the sync if ( d->mItemSyncer ) { d->mItemSyncer->deliveryDone(); } // user did the sync himself, we are done now else { d->scheduler->taskDone(); } } void ResourceBase::clearCache() { Q_D( ResourceBase ); d->scheduler->scheduleResourceCollectionDeletion(); } Collection ResourceBase::currentCollection() const { Q_D( const ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection , "ResourceBase::currentCollection()", "Trying to access current collection although no item retrieval is in progress" ); return d->currentCollection; } Item ResourceBase::currentItem() const { Q_D( const ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::FetchItem , "ResourceBase::currentItem()", "Trying to access current item although no item retrieval is in progress" ); return d->scheduler->currentTask().item; } void ResourceBase::synchronizeCollectionTree() { d_func()->scheduler->scheduleCollectionTreeSync(); } void ResourceBase::cancelTask() { Q_D( ResourceBase ); switch ( d->scheduler->currentTask().type ) { case ResourceScheduler::FetchItem: itemRetrieved( Item() ); // sends the error reply and break; case ResourceScheduler::ChangeReplay: d->changeProcessed(); break; default: d->scheduler->taskDone(); } } void ResourceBase::cancelTask( const QString &msg ) { cancelTask(); emit error( msg ); } void ResourceBase::deferTask() { Q_D( ResourceBase ); d->scheduler->deferTask(); } void ResourceBase::doSetOnline( bool state ) { d_func()->scheduler->setOnline( state ); } void ResourceBase::synchronizeCollection( qint64 collectionId ) { CollectionFetchJob* job = new CollectionFetchJob( Collection( collectionId ), CollectionFetchJob::Base ); job->fetchScope().setResource( identifier() ); job->fetchScope().setAncestorRetrieval( changeRecorder()->collectionFetchScope().ancestorRetrieval() ); connect( job, SIGNAL( result( KJob* ) ), SLOT( slotCollectionListDone( KJob* ) ) ); } void ResourceBasePrivate::slotCollectionListDone( KJob *job ) { if ( !job->error() ) { Collection::List list = static_cast( job )->collections(); if ( !list.isEmpty() ) { Collection col = list.first(); scheduler->scheduleSync( col ); } } // TODO: error handling } void ResourceBase::setTotalItems( int amount ) { kDebug() << amount; Q_D( ResourceBase ); setItemStreamingEnabled( true ); d->mItemSyncer->setTotalItems( amount ); } void ResourceBase::setItemStreamingEnabled( bool enable ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection, "ResourceBase::setItemStreamingEnabled()", "Calling setItemStreamingEnabled() although no item retrieval is in progress" ); if ( !d->mItemSyncer ) { d->mItemSyncer = new ItemSync( currentCollection() ); connect( d->mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); connect( d->mItemSyncer, SIGNAL( result( KJob* ) ), SLOT( slotItemSyncDone( KJob* ) ) ); } d->mItemSyncer->setStreamingEnabled( enable ); } void ResourceBase::itemsRetrieved( const Item::List &items ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection, "ResourceBase::itemsRetrieved()", "Calling itemsRetrieved() although no item retrieval is in progress" ); if ( !d->mItemSyncer ) { d->mItemSyncer = new ItemSync( currentCollection() ); connect( d->mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); connect( d->mItemSyncer, SIGNAL( result( KJob* ) ), SLOT( slotItemSyncDone( KJob* ) ) ); } d->mItemSyncer->setFullSyncItems( items ); } void ResourceBase::itemsRetrievedIncremental( const Item::List &changedItems, const Item::List &removedItems ) { Q_D( ResourceBase ); Q_ASSERT_X( d->scheduler->currentTask().type == ResourceScheduler::SyncCollection, "ResourceBase::itemsRetrievedIncremental()", "Calling itemsRetrievedIncremental() although no item retrieval is in progress" ); if ( !d->mItemSyncer ) { d->mItemSyncer = new ItemSync( currentCollection() ); connect( d->mItemSyncer, SIGNAL( percent( KJob*, unsigned long ) ), SLOT( slotPercent( KJob*, unsigned long ) ) ); connect( d->mItemSyncer, SIGNAL( result( KJob* ) ), SLOT( slotItemSyncDone( KJob* ) ) ); } d->mItemSyncer->setIncrementalSyncItems( changedItems, removedItems ); } void ResourceBasePrivate::slotItemSyncDone( KJob *job ) { mItemSyncer = 0; Q_Q( ResourceBase ); if ( job->error() ) { emit q->error( job->errorString() ); } scheduler->taskDone(); } void ResourceBasePrivate::slotPercent( KJob *job, unsigned long percent ) { Q_Q( ResourceBase ); Q_UNUSED( job ); emit q->percent( percent ); } void ResourceBase::setHierarchicalRemoteIdentifiersEnabled( bool enable ) { Q_D( ResourceBase ); d->mHierarchicalRid = enable; } #include "resourcebase.moc" diff --git a/akonadi/resourcescheduler.cpp b/akonadi/resourcescheduler.cpp index 43ef31ef1..fc585e219 100644 --- a/akonadi/resourcescheduler.cpp +++ b/akonadi/resourcescheduler.cpp @@ -1,246 +1,259 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "resourcescheduler_p.h" #include #include #include #include using namespace Akonadi; qint64 ResourceScheduler::Task::latestSerial = 0; static QDBusAbstractInterface *s_resourcetracker = 0; //@cond PRIVATE ResourceScheduler::ResourceScheduler( QObject *parent ) : QObject( parent ), mOnline( false ) { } void ResourceScheduler::scheduleFullSync() { Task t; t.type = SyncAll; if ( !mTaskList.isEmpty() && ( mTaskList.last() == t || mCurrentTask == t ) ) return; mTaskList << t; signalTaskToTracker( t, "SyncAll" ); scheduleNext(); } void ResourceScheduler::scheduleCollectionTreeSync() { Task t; t.type = SyncCollectionTree; if ( !mTaskList.isEmpty() && ( mTaskList.last() == t || mCurrentTask == t ) ) return; mTaskList << t; signalTaskToTracker( t, "SyncCollectionTree" ); scheduleNext(); } void ResourceScheduler::scheduleSync(const Collection & col) { Task t; t.type = SyncCollection; t.collection = col; if ( !mTaskList.isEmpty() && ( mTaskList.last() == t || mCurrentTask == t ) ) return; mTaskList << t; signalTaskToTracker( t, "SyncCollection" ); scheduleNext(); } void ResourceScheduler::scheduleItemFetch(const Item & item, const QSet &parts, const QDBusMessage & msg) { Task t; t.type = FetchItem; t.item = item; t.itemParts = parts; t.dbusMsg = msg; if ( !mTaskList.isEmpty() && ( mTaskList.last() == t || mCurrentTask == t ) ) return; mTaskList << t; signalTaskToTracker( t, "FetchItem" ); scheduleNext(); } void ResourceScheduler::scheduleResourceCollectionDeletion() { Task t; t.type = DeleteResourceCollection; if ( !mTaskList.isEmpty() && ( mTaskList.last() == t || mCurrentTask == t ) ) return; mTaskList << t; signalTaskToTracker( t, "DeleteResourceCollection" ); scheduleNext(); } void ResourceScheduler::scheduleChangeReplay() { Task t; t.type = ChangeReplay; if ( mTaskList.contains( t ) ) return; mTaskList << t; signalTaskToTracker( t, "ChangeReplay" ); scheduleNext(); } void Akonadi::ResourceScheduler::scheduleFullSyncCompletion() { Task t; t.type = SyncAllDone; mTaskList << t; signalTaskToTracker( t, "SyncAllDone" ); scheduleNext(); } void ResourceScheduler::taskDone() { if ( isEmpty() ) emit status( AgentBase::Idle ); if ( s_resourcetracker ) { QList argumentList; argumentList << QString::number( mCurrentTask.serial ) << QString(); s_resourcetracker->asyncCallWithArgumentList(QLatin1String("jobEnded"), argumentList); } mCurrentTask = Task(); scheduleNext(); } void ResourceScheduler::deferTask() { if ( s_resourcetracker ) { QList argumentList; argumentList << QString::number( mCurrentTask.serial ) << QString(); s_resourcetracker->asyncCallWithArgumentList(QLatin1String("jobEnded"), argumentList); } Task t = mCurrentTask; mCurrentTask = Task(); mTaskList << t; signalTaskToTracker( t, "DeferedTask" ); scheduleNext(); } bool ResourceScheduler::isEmpty() { return mTaskList.isEmpty(); } void ResourceScheduler::scheduleNext() { if ( mCurrentTask.type != Invalid || mTaskList.isEmpty() || !mOnline ) return; QTimer::singleShot( 0, this, SLOT(executeNext()) ); } void ResourceScheduler::executeNext() { if( mCurrentTask.type != Invalid || mTaskList.isEmpty() ) return; mCurrentTask = mTaskList.takeFirst(); if ( s_resourcetracker ) { QList argumentList; argumentList << QString::number( mCurrentTask.serial ); s_resourcetracker->asyncCallWithArgumentList(QLatin1String("jobStarted"), argumentList); } switch ( mCurrentTask.type ) { case SyncAll: emit executeFullSync(); break; case SyncCollectionTree: emit executeCollectionTreeSync(); break; case SyncCollection: emit executeCollectionSync( mCurrentTask.collection ); break; case FetchItem: emit executeItemFetch( mCurrentTask.item, mCurrentTask.itemParts ); break; case DeleteResourceCollection: emit executeResourceCollectionDeletion(); break; case ChangeReplay: emit executeChangeReplay(); break; case SyncAllDone: emit fullSyncComplete(); break; default: Q_ASSERT( false ); } } ResourceScheduler::Task ResourceScheduler::currentTask() const { return mCurrentTask; } void ResourceScheduler::setOnline(bool state) { if ( mOnline == state ) return; mOnline = state; if ( mOnline ) { scheduleNext(); } else if ( mCurrentTask.type != Invalid ) { // abort running task mTaskList.prepend( mCurrentTask ); mCurrentTask = Task(); } } void ResourceScheduler::signalTaskToTracker( const Task &task, const QByteArray &taskType ) { // if there's a job tracer running, tell it about the new job if ( !s_resourcetracker && QDBusConnection::sessionBus().interface()->isServiceRegistered(QLatin1String("org.kde.akonadiconsole") ) ) { s_resourcetracker = new QDBusInterface( QLatin1String("org.kde.akonadiconsole"), QLatin1String("/resourcesJobtracker"), QLatin1String("org.freedesktop.Akonadi.JobTracker"), QDBusConnection::sessionBus(), 0 ); } if ( s_resourcetracker ) { QList argumentList; argumentList << static_cast( parent() )->identifier() << QString::number( task.serial ) << QString() << QString::fromLatin1( taskType ); s_resourcetracker->asyncCallWithArgumentList(QLatin1String("jobCreated"), argumentList); } } +void ResourceScheduler::collectionRemoved( const Akonadi::Collection &collection ) +{ + if ( !collection.isValid() ) // should not happen, but you never know... + return; + for ( QList::iterator it = mTaskList.begin(); it != mTaskList.end(); ) { + if ( (*it).type == SyncCollection && (*it).collection == collection ) { + it = mTaskList.erase( it ); + kDebug() << " erasing"; + } else + ++it; + } +} + //@endcond #include "resourcescheduler_p.moc" diff --git a/akonadi/resourcescheduler_p.h b/akonadi/resourcescheduler_p.h index f870a6587..a10339a6f 100644 --- a/akonadi/resourcescheduler_p.h +++ b/akonadi/resourcescheduler_p.h @@ -1,173 +1,178 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_RESOURCESCHEDULER_P_H #define AKONADI_RESOURCESCHEDULER_P_H #include #include #include #include #include #include namespace Akonadi { //@cond PRIVATE /** @internal Manages synchronization and fetch requests for a resource. @todo Attach to the ResourceBase Monitor, */ class ResourceScheduler : public QObject { Q_OBJECT public: enum TaskType { Invalid, SyncAll, SyncCollectionTree, SyncCollection, FetchItem, ChangeReplay, DeleteResourceCollection, SyncAllDone }; class Task { static qint64 latestSerial; public: Task() : serial( ++latestSerial ), type( Invalid ) {} qint64 serial; TaskType type; Collection collection; Item item; QSet itemParts; QDBusMessage dbusMsg; bool operator==( const Task &other ) const { return type == other.type && (collection == other.collection || (!collection.isValid() && !other.collection.isValid())) && (item == other.item || (!item.isValid() && !other.item.isValid())) && itemParts == other.itemParts; } }; ResourceScheduler( QObject *parent = 0 ); /** Schedules a full synchronization. */ void scheduleFullSync(); /** Schedules a collection tree sync. */ void scheduleCollectionTreeSync(); /** Schedules the synchronization of a single collection. @param col The collection to synchronize. */ void scheduleSync( const Collection &col ); /** Schedules fetching of a single PIM item. @param item The item to fetch. @param parts List of names of the parts of the item to fetch. @param msg The associated D-Bus message. */ void scheduleItemFetch( const Item &item, const QSet &parts, const QDBusMessage &msg ); /** Schedules deletion of the resource collection. This method is used to implement the ResourceBase::clearCache() functionality. */ void scheduleResourceCollectionDeletion(); /** Insert synchronization completetion marker into the task queue. */ void scheduleFullSyncCompletion(); /** Returns true if no tasks are running or in the queue. */ bool isEmpty(); /** Returns the current task. */ Task currentTask() const; /** Sets the online state. */ void setOnline( bool state ); public Q_SLOTS: /** Schedules replaying changes. */ void scheduleChangeReplay(); /** The current task has been finished */ void taskDone(); /** The current task can't be finished now and will be rescheduled later */ void deferTask(); + /** + Remove tasks that affect @p collection. + */ + void collectionRemoved( const Akonadi::Collection &collection ); + Q_SIGNALS: void executeFullSync(); void executeCollectionSync( const Akonadi::Collection &col ); void executeCollectionTreeSync(); void executeItemFetch( const Akonadi::Item &item, const QSet &parts ); void executeResourceCollectionDeletion(); void executeChangeReplay(); void fullSyncComplete(); void status( int status, const QString &message = QString() ); private slots: void scheduleNext(); void executeNext(); private: void signalTaskToTracker( const Task &task, const QByteArray &taskType ); QList mTaskList; Task mCurrentTask; bool mOnline; }; //@endcond } #endif