diff --git a/akonadi/collectionsync.cpp b/akonadi/collectionsync.cpp index ce5ab9702..4c6618212 100644 --- a/akonadi/collectionsync.cpp +++ b/akonadi/collectionsync.cpp @@ -1,221 +1,236 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "collectionsync_p.h" #include "collection.h" #include "collectioncreatejob.h" #include "collectiondeletejob.h" #include "collectionfetchjob.h" #include "collectionmodifyjob.h" #include using namespace Akonadi; /** * @internal */ class CollectionSync::Private { public: Private() : pendingJobs( 0 ), - incremental( false ) + incremental( false ), + streaming( false ) { } QString resourceId; // local: mapped remote id -> collection, id -> collection QHash localCollections; QSet unprocessedLocalCollections; // remote: mapped id -> collection QHash remoteCollections; // remote collections waiting for a parent QList orphanRemoteCollections; // removed remote collections Collection::List removedRemoteCollections; // create counter int pendingJobs; bool incremental; + bool streaming; }; CollectionSync::CollectionSync( const QString &resourceId, QObject *parent ) : TransactionSequence( parent ), d( new Private ) { d->resourceId = resourceId; } CollectionSync::~CollectionSync() { delete d; } void CollectionSync::setRemoteCollections(const Collection::List & remoteCollections) { foreach ( const Collection &c, remoteCollections ) { d->remoteCollections.insert( c.id(), c ); } + if ( !d->streaming ) + retrievalDone(); } void CollectionSync::setRemoteCollections(const Collection::List & changedCollections, const Collection::List & removedCollections) { d->incremental = true; foreach ( const Collection &c, changedCollections ) { d->remoteCollections.insert( c.id(), c ); } - d->removedRemoteCollections = removedCollections; + d->removedRemoteCollections += removedCollections; + if ( !d->streaming ) + retrievalDone(); } void CollectionSync::doStart() { - CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive, this ); - job->setResource( d->resourceId ); - connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) ); } void CollectionSync::slotLocalListDone(KJob * job) { if ( job->error() ) return; Collection::List list = static_cast( job )->collections(); foreach ( const Collection &c, list ) { d->localCollections.insert( c.remoteId(), c ); d->unprocessedLocalCollections.insert( c ); } // added / updated foreach ( const Collection &c, d->remoteCollections ) { if ( c.remoteId().isEmpty() ) { kWarning( 5250 ) << "Collection '" << c.name() <<"' does not have a remote identifier - skipping"; continue; } Collection local = d->localCollections.value( c.remoteId() ); d->unprocessedLocalCollections.remove( local ); // missing locally if ( !local.isValid() ) { // determine local parent Collection localParent; if ( c.parent() >= 0 ) localParent = Collection( c.parent() ); if ( c.parentRemoteId().isEmpty() ) localParent = Collection::root(); else localParent = d->localCollections.value( c.parentRemoteId() ); // no local parent found, create later if ( !localParent.isValid() ) { d->orphanRemoteCollections << c; continue; } createLocalCollection( c, localParent ); continue; } // update local collection d->pendingJobs++; Collection upd( c ); upd.setId( local.id() ); CollectionModifyJob *mod = new CollectionModifyJob( upd, this ); connect( mod, SIGNAL(result(KJob*)), SLOT(slotLocalChangeDone(KJob*)) ); } // removed if ( !d->incremental ) d->removedRemoteCollections = d->unprocessedLocalCollections.toList(); foreach ( const Collection &c, d->removedRemoteCollections ) { d->pendingJobs++; CollectionDeleteJob *job = new CollectionDeleteJob( c, this ); connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalChangeDone(KJob*)) ); } d->localCollections.clear(); checkDone(); } void CollectionSync::slotLocalCreateDone(KJob * job) { d->pendingJobs--; if ( job->error() ) return; Collection newLocal = static_cast( job )->collection(); // d->localCollections.insert( newLocal.remoteId(), newLocal ); // search for children we can create now Collection::List stillOrphans; foreach ( const Collection &orphan, d->orphanRemoteCollections ) { if ( orphan.parentRemoteId() == newLocal.remoteId() ) { createLocalCollection( orphan, newLocal ); } else { stillOrphans << orphan; } } d->orphanRemoteCollections = stillOrphans; checkDone(); } void CollectionSync::createLocalCollection(const Collection & c, const Collection & parent) { d->pendingJobs++; Collection col( c ); col.setParent( parent ); CollectionCreateJob *create = new CollectionCreateJob( col, this ); connect( create, SIGNAL(result(KJob*)), SLOT(slotLocalCreateDone(KJob*)) ); } void CollectionSync::checkDone() { // still running jobs if ( d->pendingJobs > 0 ) return; // still orphan collections if ( !d->orphanRemoteCollections.isEmpty() ) { setError( Unknown ); setErrorText( QLatin1String( "Found unresolved orphan collections" ) ); foreach ( const Collection &col, d->orphanRemoteCollections ) kDebug() << "found orphan collection:" << col.remoteId() << "parent:" << col.parentRemoteId(); } commit(); } void CollectionSync::slotLocalChangeDone(KJob * job) { if ( job->error() ) return; d->pendingJobs--; checkDone(); } +void CollectionSync::setStreamingEnabled( bool streaming ) +{ + d->streaming = streaming; +} + +void CollectionSync::retrievalDone() +{ + CollectionFetchJob *job = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive, this ); + job->setResource( d->resourceId ); + connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) ); +} + #include "collectionsync_p.moc" diff --git a/akonadi/collectionsync_p.h b/akonadi/collectionsync_p.h index 7b5251ca8..db3885c08 100644 --- a/akonadi/collectionsync_p.h +++ b/akonadi/collectionsync_p.h @@ -1,85 +1,101 @@ /* Copyright (c) 2007 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #ifndef AKONADI_COLLECTIONSYNC_P_H #define AKONADI_COLLECTIONSYNC_P_H #include #include namespace Akonadi { /** @internal Syncs remote and local collections. + + @todo Optimize the streaming case, so far only the interface supports streaming, + not the actual sync algorithm. */ class CollectionSync : public TransactionSequence { Q_OBJECT public: /** Creates a new collection synchronzier. @param resourceId The identifier of the resource we are syncing. @param parent The parent object. */ explicit CollectionSync( const QString &resourceId, QObject *parent = 0 ); /** Destroys this job. */ ~CollectionSync(); /** Sets the result of a full remote collection listing. @param remoteCollections A list of collections. Important: All of these need a unique remote identifier and parent remote identifier. */ void setRemoteCollections( const Collection::List &remoteCollections ); /** Sets the result of an incremental remote collection listing. @param changedCollections A list of remotely added or changed collections. @param removedCollections A lost of remotely deleted collections. */ void setRemoteCollections( const Collection::List &changedCollections, const Collection::List &removedCollections ); + /** + Enables streaming, that is not all collections are delivered at once. + Use setRemoteCollections() multiple times when streaming is enabled and call + retrievalDone() when all collections have been retrieved. + Must be called before the first call to setRemoteCollections(). + */ + void setStreamingEnabled( bool streaming ); + + /** + Indicate that all collections have been retrieved in streaming mode. + */ + void retrievalDone(); + protected: void doStart(); private: void createLocalCollection( const Collection &c, const Collection &parent ); void checkDone(); private Q_SLOTS: void slotLocalListDone( KJob *job ); void slotLocalCreateDone( KJob *job ); void slotLocalChangeDone( KJob *job ); private: class Private; Private* const d; }; } #endif diff --git a/akonadi/tests/collectionsynctest.cpp b/akonadi/tests/collectionsynctest.cpp index ec86fb6e2..f30523013 100644 --- a/akonadi/tests/collectionsynctest.cpp +++ b/akonadi/tests/collectionsynctest.cpp @@ -1,197 +1,196 @@ /* Copyright (c) 2009 Volker Krause This library is free software; you can redistribute it and/or modify it under the terms of the GNU Library General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Library General Public License for more details. You should have received a copy of the GNU Library General Public License along with this library; see the file COPYING.LIB. If not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. */ #include "test_utils.h" #include #include #include #include #include #include "../akonadi/collectionsync.cpp" #include #include #include #include using namespace Akonadi; Q_DECLARE_METATYPE( KJob* ) class CollectionSyncTest : public QObject { Q_OBJECT private: Collection::List fetchCollections( const QString &res ) { CollectionFetchJob *fetch = new CollectionFetchJob( Collection::root(), CollectionFetchJob::Recursive, this ); fetch->setResource( res ); Q_ASSERT( fetch->exec() ); Q_ASSERT( !fetch->collections().isEmpty() ); return fetch->collections(); } private slots: void initTestCase() { Control::start(); qRegisterMetaType(); // switch all resources offline to reduce interference from them foreach ( Akonadi::AgentInstance agent, Akonadi::AgentManager::self()->instances() ) agent.setIsOnline( false ); } void testFullSync() { Collection::List origCols = fetchCollections( "akonadi_knut_resource_0" ); CollectionSync* syncer = new CollectionSync( "akonadi_knut_resource_0" ); syncer->setRemoteCollections( origCols ); AKVERIFYEXEC( syncer ); Collection::List resultCols = fetchCollections( "akonadi_knut_resource_0" ); QCOMPARE( resultCols.count(), origCols.count() ); } void testFullStreamingSync() { Collection::List origCols = fetchCollections( "akonadi_knut_resource_0" ); CollectionSync* syncer = new CollectionSync( "akonadi_knut_resource_0" ); syncer->setAutoDelete( false ); QSignalSpy spy( syncer, SIGNAL(result(KJob*)) ); QVERIFY( spy.isValid() ); - // ### streaming not implemented yet -// syncer->setTotalItems( origCols.count() ); + syncer->setStreamingEnabled( true ); QTest::qWait( 10 ); QCOMPARE( spy.count(), 0 ); -// + for ( int i = 0; i < origCols.count(); ++i ) { -// Item::List l; -// l << origCols[i]; -// syncer->setFullSyncItems( l ); -// if ( i < origCols.count() - 1 ) -// QTest::qWait( 10 ); // enter the event loop so itemsync actually can do something -// QCOMPARE( spy.count(), 0 ); + Collection::List l; + l << origCols[i]; + syncer->setRemoteCollections( l ); + if ( i < origCols.count() - 1 ) + QTest::qWait( 10 ); // enter the event loop so itemsync actually can do something + QCOMPARE( spy.count(), 0 ); } -// QTest::qWait( 1000 ); // let it finish its job -// QCOMPARE( spy.count(), 1 ); -// KJob *job = spy.at( 0 ).at( 0 ).value(); -// QCOMPARE( job, syncer ); -// QCOMPARE( job->error(), 0 ); -// + syncer->retrievalDone(); + QTest::qWait( 1000 ); // let it finish its job + QCOMPARE( spy.count(), 1 ); + KJob *job = spy.at( 0 ).at( 0 ).value(); + QCOMPARE( job, syncer ); + QCOMPARE( job->error(), 0 ); + Collection::List resultCols = fetchCollections( "akonadi_knut_resource_0" ); QCOMPARE( resultCols.count(), origCols.count() ); delete syncer; } void testIncrementalSync() { Collection::List origCols = fetchCollections( "akonadi_knut_resource_0" ); CollectionSync* syncer = new CollectionSync( "akonadi_knut_resource_0" ); syncer->setRemoteCollections( origCols, Collection::List() ); AKVERIFYEXEC( syncer ); Collection::List resultCols = fetchCollections( "akonadi_knut_resource_0" ); QCOMPARE( resultCols.count(), origCols.count() ); Collection::List delCols; delCols << resultCols.front(); resultCols.pop_front(); // ### not implemented yet I guess #if 0 Collection colWithOnlyRemoteId; colWithOnlyRemoteId.setRemoteId( resultCols.front().remoteId() ); delCols << colWithOnlyRemoteId; resultCols.pop_front(); #endif #if 0 // ### should this work? Collection colWithRandomRemoteId; colWithRandomRemoteId.setRemoteId( KRandom::randomString( 100 ) ); delCols << colWithRandomRemoteId; #endif syncer = new CollectionSync( "akonadi_knut_resource_0" ); syncer->setRemoteCollections( resultCols, delCols ); AKVERIFYEXEC( syncer ); Collection::List resultCols2 = fetchCollections( "akonadi_knut_resource_0" ); QCOMPARE( resultCols2.count(), resultCols.count() ); } void testIncrementalStreamingSync() { Collection::List origCols = fetchCollections( "akonadi_knut_resource_0" ); CollectionSync* syncer = new CollectionSync( "akonadi_knut_resource_0" ); syncer->setAutoDelete( false ); QSignalSpy spy( syncer, SIGNAL(result(KJob*)) ); QVERIFY( spy.isValid() ); - // ### not implemented yet -// syncer->setStreamingEnabled( true ); + syncer->setStreamingEnabled( true ); QTest::qWait( 10 ); QCOMPARE( spy.count(), 0 ); for ( int i = 0; i < origCols.count(); ++i ) { -/* Item::List l; + Collection::List l; l << origCols[i]; - syncer->setIncrementalSyncItems( l, Item::List() ); + syncer->setRemoteCollections( l, Collection::List() ); if ( i < origCols.count() - 1 ) QTest::qWait( 10 ); // enter the event loop so itemsync actually can do something - QCOMPARE( spy.count(), 0 );*/ + QCOMPARE( spy.count(), 0 ); } -// syncer->deliveryDone(); -// QTest::qWait( 1000 ); // let it finish its job -// QCOMPARE( spy.count(), 1 ); -// KJob *job = spy.at( 0 ).at( 0 ).value(); -// QCOMPARE( job, syncer ); -// QCOMPARE( job->error(), 0 ); + syncer->retrievalDone(); + QTest::qWait( 1000 ); // let it finish its job + QCOMPARE( spy.count(), 1 ); + KJob *job = spy.at( 0 ).at( 0 ).value(); + QCOMPARE( job, syncer ); + QCOMPARE( job->error(), 0 ); Collection::List resultCols = fetchCollections( "akonadi_knut_resource_0" ); QCOMPARE( resultCols.count(), origCols.count() ); delete syncer; } void testEmptyIncrementalSync() { Collection::List origCols = fetchCollections( "akonadi_knut_resource_0" ); CollectionSync* syncer = new CollectionSync( "akonadi_knut_resource_0" ); syncer->setRemoteCollections( Collection::List(), Collection::List() ); AKVERIFYEXEC( syncer ); Collection::List resultCols = fetchCollections( "akonadi_knut_resource_0" ); QCOMPARE( resultCols.count(), origCols.count() ); } }; QTEST_AKONADIMAIN( CollectionSyncTest, NoGUI ) #include "collectionsynctest.moc"