diff --git a/src/app/Console/Commands/Data/MigrateCommand.php b/src/app/Console/Commands/Data/MigrateCommand.php index 245bb99a..e3238c01 100644 --- a/src/app/Console/Commands/Data/MigrateCommand.php +++ b/src/app/Console/Commands/Data/MigrateCommand.php @@ -1,63 +1,72 @@ argument('src')); $dst = new DataMigrator\Account($this->argument('dst')); + + $folderMapping = []; + foreach ($this->option('folder-mapping') as $mapping) { + $arr = explode(":", $mapping); + $folderMapping[$arr[0]] = $arr[1]; + } + $options = [ 'type' => $this->option('type'), 'force' => $this->option('force'), 'sync' => $this->option('sync'), + 'folderMapping' => $folderMapping, 'stdout' => true, ]; $migrator = new DataMigrator\Engine(); $migrator->migrate($src, $dst, $options); } } diff --git a/src/app/DataMigrator/Engine.php b/src/app/DataMigrator/Engine.php index b9f606fa..4405e47a 100644 --- a/src/app/DataMigrator/Engine.php +++ b/src/app/DataMigrator/Engine.php @@ -1,361 +1,368 @@ source = $source; $this->destination = $destination; $this->options = $options; // Create a unique identifier for the migration request $queue_id = md5(strval($source) . strval($destination) . ($options['type'] ?? '')); // TODO: When running in 'sync' mode we shouldn't create a queue at all // If queue exists, we'll display the progress only if ($queue = Queue::find($queue_id)) { // If queue contains no jobs, assume invalid // TODO: An better API to manage (reset) queues if (!$queue->jobs_started || !empty($options['force'])) { $queue->delete(); } else { while (true) { $this->debug(sprintf("Progress [%d of %d]\n", $queue->jobs_finished, $queue->jobs_started)); if ($queue->jobs_started == $queue->jobs_finished) { break; } sleep(1); $queue->refresh(); } return; } } // Initialize the source $this->exporter = $this->initDriver($source, ExporterInterface::class); $this->exporter->authenticate(); // Initialize the destination $this->importer = $this->initDriver($destination, ImporterInterface::class); $this->importer->authenticate(); // Create a queue $this->createQueue($queue_id); // We'll store temp files in storage/ tree $location = storage_path('export/') . $source->email; if (!file_exists($location)) { mkdir($location, 0740, true); } $types = empty($options['type']) ? [] : preg_split('/\s*,\s*/', strtolower($options['type'])); $this->debug("Fetching folders hierarchy..."); $folders = $this->exporter->getFolders($types); $count = 0; $async = empty($options['sync']); + $folderMapping = $this->options['folderMapping']; foreach ($folders as $folder) { $this->debug("Processing folder {$folder->fullname}..."); $folder->queueId = $queue_id; $folder->location = $location; + if (array_key_exists($folder->fullname, $folderMapping)) { + $folder->targetname = $folderMapping[$folder->fullname]; + } else { + $folder->targetname = $folder->fullname; + } + if ($async) { // Dispatch the job (for async execution) Jobs\FolderJob::dispatch($folder); $count++; } else { $this->processFolder($folder); } } if ($count) { $this->queue->bumpJobsStarted($count); } if ($async) { $this->debug(sprintf('Done. %d %s created in queue: %s.', $count, Str::plural('job', $count), $queue_id)); } else { $this->debug(sprintf('Done (queue: %s).', $queue_id)); } } /** * Processing of a folder synchronization */ public function processFolder(Folder $folder): void { // Job processing - initialize environment if (!$this->queue) { $this->envFromQueue($folder->queueId); } // Create the folder on the destination server $this->importer->createFolder($folder); $count = 0; $async = empty($this->options['sync']); // Fetch items from the source $this->exporter->fetchItemList( $folder, function ($item_or_set) use (&$count, $async) { if ($async) { // Dispatch the job (for async execution) if ($item_or_set instanceof ItemSet) { Jobs\ItemSetJob::dispatch($item_or_set); } else { Jobs\ItemJob::dispatch($item_or_set); } $count++; } else { if ($item_or_set instanceof ItemSet) { $this->processItemSet($item_or_set); } else { $this->processItem($item_or_set); } } }, $this->importer ); if ($count) { $this->queue->bumpJobsStarted($count); } if ($async) { $this->queue->bumpJobsFinished(); } } /** * Processing of item synchronization */ public function processItem(Item $item): void { // Job processing - initialize environment if (!$this->queue) { $this->envFromQueue($item->folder->queueId); } $this->exporter->fetchItem($item); $this->importer->createItem($item); if (!empty($item->filename) && str_starts_with($item->filename, storage_path('export/'))) { @unlink($item->filename); } if (empty($this->options['sync'])) { $this->queue->bumpJobsFinished(); } } /** * Processing of item-set synchronization */ public function processItemSet(ItemSet $set): void { // Job processing - initialize environment if (!$this->queue) { $this->envFromQueue($set->items[0]->folder->queueId); } $importItem = function (Item $item) { $this->importer->createItem($item); if (!empty($item->filename) && str_starts_with($item->filename, storage_path('export/'))) { @unlink($item->filename); } }; // Some exporters, e.g. DAV, might optimize fetching multiple items in one go if ($this->exporter instanceof FetchItemSetInterface) { $this->exporter->fetchItemSet($set, $importItem); } else { foreach ($set->items as $item) { $this->exporter->fetchItem($item); $importItem($item); } } // TODO: We should probably also track number of items migrated if (empty($this->options['sync'])) { $this->queue->bumpJobsFinished(); } } /** * Print progress/debug information */ public function debug($line) { if (!empty($this->options['stdout'])) { $output = new \Symfony\Component\Console\Output\ConsoleOutput(); $output->writeln("$line"); } else { \Log::debug("[DataMigrator] $line"); } } /** * Get migration option value. */ public function getOption(string $name) { return $this->options[$name] ?? null; } /** * Set migration queue option. Use this if you need to pass * some data between queue processes. */ public function setOption(string $name, $value): void { $this->options[$name] = $value; if ($this->queue) { $this->queue->data = $this->queueData(); $this->queue->save(); } } /** * Create a queue for the request * * @param string $queue_id Unique queue identifier */ protected function createQueue(string $queue_id): void { $this->queue = new Queue(); $this->queue->id = $queue_id; $this->queue->data = $this->queueData(); $this->queue->save(); } /** * Prepare queue data */ protected function queueData() { $options = $this->options; unset($options['stdout']); // jobs aren't in stdout anymore // TODO: data should be encrypted return [ 'source' => (string) $this->source, 'destination' => (string) $this->destination, 'options' => $options, ]; } /** * Initialize environment for job execution * * @param string $queueId Queue identifier */ protected function envFromQueue(string $queueId): void { $this->queue = Queue::findOrFail($queueId); $this->source = new Account($this->queue->data['source']); $this->destination = new Account($this->queue->data['destination']); $this->options = $this->queue->data['options']; $this->importer = $this->initDriver($this->destination, ImporterInterface::class); $this->exporter = $this->initDriver($this->source, ExporterInterface::class); } /** * Initialize (and select) migration driver */ protected function initDriver(Account $account, string $interface) { switch ($account->scheme) { case 'ews': $driver = new EWS($account, $this); break; case 'dav': case 'davs': $driver = new DAV($account, $this); break; case 'imap': case 'imaps': case 'tls': case 'ssl': $driver = new IMAP($account, $this); break; case 'test': $driver = new Test($account, $this); break; default: throw new \Exception("Failed to init driver for '{$account->scheme}'"); } // Make sure driver is used in the direction it supports if (!is_a($driver, $interface)) { throw new \Exception(sprintf( "'%s' driver does not implement %s", class_basename($driver), class_basename($interface) )); } return $driver; } } diff --git a/src/app/DataMigrator/IMAP.php b/src/app/DataMigrator/IMAP.php index fe59a416..9fb580be 100644 --- a/src/app/DataMigrator/IMAP.php +++ b/src/app/DataMigrator/IMAP.php @@ -1,464 +1,464 @@ account = $account; $this->engine = $engine; // TODO: Move this to self::authenticate()? $config = self::getConfig($account); $this->imap = self::initIMAP($config); } /** * Object destructor */ public function __destruct() { try { $this->imap->closeConnection(); } catch (\Throwable $e) { // Ignore. It may throw when destructing the object in tests // We also don't really care abount an error on this operation } } /** * Authenticate */ public function authenticate(): void { } /** * Create a folder. * * @param Folder $folder Folder data * * @throws \Exception on error */ public function createFolder(Folder $folder): void { if ($folder->type != 'mail') { throw new \Exception("IMAP does not support folder of type {$folder->type}"); } - if ($folder->fullname == 'INBOX') { + if ($folder->targetname == 'INBOX') { // INBOX always exists return; } - if (!$this->imap->createFolder($folder->fullname)) { + if (!$this->imap->createFolder($folder->targetname)) { \Log::warning("Failed to create the folder: {$this->imap->error}"); if (str_contains($this->imap->error, "Mailbox already exists")) { // Not an error } else { - throw new \Exception("Failed to create an IMAP folder {$folder->fullname}"); + throw new \Exception("Failed to create an IMAP folder {$folder->targetname}"); } } // TODO: Migrate folder subscription state } /** * Create an item in a folder. * * @param Item $item Item to import * * @throws \Exception */ public function createItem(Item $item): void { - $mailbox = $item->folder->fullname; + $mailbox = $item->folder->targetname; if (strlen($item->content)) { $result = $this->imap->append( $mailbox, $item->content, $item->data['flags'], $item->data['internaldate'], true ); if ($result === false) { throw new \Exception("Failed to append IMAP message into {$mailbox}"); } } elseif ($item->filename) { $result = $this->imap->appendFromFile( $mailbox, $item->filename, null, $item->data['flags'], $item->data['internaldate'], true ); if ($result === false) { throw new \Exception("Failed to append IMAP message into {$mailbox}"); } } // When updating an existing email message we have to... if ($item->existing) { if (!empty($result)) { // Remove the old one $this->imap->flag($mailbox, $item->existing['uid'], 'DELETED'); $this->imap->expunge($mailbox, $item->existing['uid']); } else { // Update flags foreach ($item->existing['flags'] as $flag) { if (!in_array($flag, $item->data['flags'])) { $this->imap->unflag($mailbox, $item->existing['uid'], $flag); } } foreach ($item->data['flags'] as $flag) { if (!in_array($flag, $item->existing['flags'])) { $this->imap->flag($mailbox, $item->existing['uid'], $flag); } } } } } /** * Fetching an item */ public function fetchItem(Item $item): void { [$uid, $messageId] = explode(':', $item->id, 2); $mailbox = $item->folder->fullname; // Get message flags $header = $this->imap->fetchHeader($mailbox, (int) $uid, true, false, ['FLAGS']); if ($header === false) { throw new \Exception("Failed to get IMAP message headers for {$mailbox}/{$uid}"); } // Remove flags that we can't append (e.g. RECENT) $flags = $this->filterImapFlags(array_keys($header->flags)); // If message already exists in the destination account we should update only flags // and be done with it. On the other hand for Drafts it's not unusual to get completely // different body for the same Message-ID. Same can happen not only in Drafts, I suppose. // So, we compare size and INTERNALDATE timestamp. if ( !$item->existing || $header->timestamp != $item->existing['timestamp'] || $header->size != $item->existing['size'] ) { // Handle message content in memory (up to 20MB), bigger messages will use a temp file if ($header->size > Engine::MAX_ITEM_SIZE) { // Save the message content to a file $location = $item->folder->tempFileLocation($uid . '.eml'); $fp = fopen($location, 'w'); if (!$fp) { throw new \Exception("Failed to open 'php://temp' stream"); } $result = $this->imap->handlePartBody($mailbox, $uid, true, '', null, null, $fp); } else { $result = $this->imap->handlePartBody($mailbox, $uid, true); } if ($result === false) { if (!empty($fp)) { fclose($fp); } throw new \Exception("Failed to fetch IMAP message for {$mailbox}/{$uid}"); } if (!empty($fp) && !empty($location)) { $item->filename = $location; fclose($fp); } else { $item->content = $result; } } $item->data = [ 'flags' => $flags, 'internaldate' => $header->internaldate, ]; } /** * Fetch a list of folder items */ public function fetchItemList(Folder $folder, $callback, ImporterInterface $importer): void { // Get existing messages' headers from the destination mailbox $existing = $importer->getItems($folder); $mailbox = $folder->fullname; // TODO: We should probably first use SEARCH/SORT to skip messages marked as \Deleted // It would also allow us to get headers in chunks 200 messages at a time, or so. // TODO: fetchHeaders() fetches too many headers, we should slim-down, here we need // only UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (DATE FROM MESSAGE-ID)] $messages = $this->imap->fetchHeaders($mailbox, '1:*', true, false, ['Message-Id']); if ($messages === false) { throw new \Exception("Failed to get all IMAP message headers for {$mailbox}"); } if (empty($messages)) { \Log::debug("Nothing to migrate for {$mailbox}"); return; } $set = new ItemSet(); foreach ($messages as $message) { // If Message-Id header does not exist create it based on internaldate/From/Date $id = $this->getMessageId($message, $mailbox); // Skip message that exists and did not change $exists = null; if (isset($existing[$id])) { $flags = $this->filterImapFlags(array_keys($message->flags)); if ( $flags == $existing[$id]['flags'] && $message->timestamp == $existing[$id]['timestamp'] && $message->size == $existing[$id]['size'] ) { continue; } $exists = $existing[$id]; } $set->items[] = Item::fromArray([ 'id' => $message->uid . ':' . $id, 'folder' => $folder, 'existing' => $exists, ]); if (count($set->items) == self::CHUNK_SIZE) { $callback($set); $set = new ItemSet(); } } if (count($set->items)) { $callback($set); } // TODO: Delete messages that do not exist anymore? } /** * Get folders hierarchy */ public function getFolders($types = []): array { $folders = $this->imap->listMailboxes('', ''); if ($folders === false) { throw new \Exception("Failed to get list of IMAP folders"); } // TODO: Migrate folder subscription state $result = []; foreach ($folders as $folder) { if ($this->shouldSkip($folder)) { \Log::debug("Skipping folder {$folder}."); continue; } $result[] = Folder::fromArray([ 'fullname' => $folder, 'type' => 'mail' ]); } return $result; } /** * Get a list of folder items, limited to their essential propeties * used in incremental migration to skip unchanged items. */ public function getItems(Folder $folder): array { $mailbox = $folder->fullname; // TODO: We should probably first use SEARCH/SORT to skip messages marked as \Deleted // TODO: fetchHeaders() fetches too many headers, we should slim-down, here we need // only UID FLAGS INTERNALDATE BODY.PEEK[HEADER.FIELDS (DATE FROM MESSAGE-ID)] $messages = $this->imap->fetchHeaders($mailbox, '1:*', true, false, ['Message-Id']); if ($messages === false) { throw new \Exception("Failed to get IMAP message headers in {$mailbox}"); } $result = []; foreach ($messages as $message) { // Remove flags that we can't append (e.g. RECENT) $flags = $this->filterImapFlags(array_keys($message->flags)); // Generate message ID if the header does not exist $id = $this->getMessageId($message, $mailbox); $result[$id] = [ 'uid' => $message->uid, 'flags' => $flags, 'size' => $message->size, 'timestamp' => $message->timestamp, ]; } return $result; } /** * Initialize IMAP connection and authenticate the user */ private static function initIMAP(array $config): \rcube_imap_generic { $imap = new \rcube_imap_generic(); if (\config('app.debug')) { $imap->setDebug(true, 'App\Backends\IMAP::logDebug'); } $imap->connect($config['host'], $config['user'], $config['password'], $config['options']); if (!$imap->connected()) { $message = sprintf("Login failed for %s against %s. %s", $config['user'], $config['host'], $imap->error); \Log::error($message); throw new \Exception("Connection to IMAP failed"); } return $imap; } /** * Get IMAP configuration */ private static function getConfig(Account $account): array { $uri = \parse_url($account->uri); $default_port = 143; $ssl_mode = null; if (isset($uri['scheme'])) { if (preg_match('/^(ssl|imaps)/', $uri['scheme'])) { $default_port = 993; $ssl_mode = 'ssl'; } elseif ($uri['scheme'] === 'tls') { $ssl_mode = 'tls'; } } $config = [ 'host' => $uri['host'], 'user' => $account->username, 'password' => $account->password, 'options' => [ 'port' => !empty($uri['port']) ? $uri['port'] : $default_port, 'ssl_mode' => $ssl_mode, 'socket_options' => [ 'ssl' => [ // TODO: These configuration options make sense for "local" Kolab IMAP, // but when connecting to external one we might want to just disable // cert validation, or make it optional via Account URI parameters 'verify_peer' => \config('services.imap.verify_peer'), 'verify_peer_name' => \config('services.imap.verify_peer'), 'verify_host' => \config('services.imap.verify_host') ], ], ], ]; // User impersonation. Example URI: imap://admin:password@hostname:143?user=user%40domain.tld if ($account->loginas) { $config['options']['auth_cid'] = $config['user']; $config['options']['auth_pw'] = $config['password']; $config['options']['auth_type'] = 'PLAIN'; $config['user'] = $account->loginas; } return $config; } /** * Limit IMAP flags to these that can be migrated */ private function filterImapFlags($flags) { // TODO: Support custom flags migration return array_filter( $flags, function ($flag) { return isset($this->imap->flags[$flag]); } ); } /** * Check if the folder should not be migrated */ private function shouldSkip($folder): bool { // TODO: This should probably use NAMESPACE information if (preg_match('~(Shared Folders|Other Users)/.*~', $folder)) { return true; } return false; } /** * Return Message-Id, generate unique identifier if Message-Id does not exist */ private function getMessageId($message, $folder): string { if (!empty($message->messageID)) { return $message->messageID; } return md5($folder . $message->from . ($message->date ?: $message->timestamp)); } } diff --git a/src/app/DataMigrator/Interface/Folder.php b/src/app/DataMigrator/Interface/Folder.php index 861cac00..71279a97 100644 --- a/src/app/DataMigrator/Interface/Folder.php +++ b/src/app/DataMigrator/Interface/Folder.php @@ -1,69 +1,72 @@ $value) { $obj->{$key} = $value; } return $obj; } /** * Returns location of a temp file for an Item content */ public function tempFileLocation(string $filename): string { $filename = preg_replace('/[^a-zA-Z0-9_:@.-]/', '', $filename); $location = $this->location; // TODO: What if parent folder not yet exists? if (!file_exists($location)) { mkdir($location, 0740, true); } $location .= '/' . $filename; return $location; } }