Open Data Catalog v2.0.0
Import.php
Go to the documentation of this file.
1 <?php
2 
4 
use Drupal\Core\Config\ConfigFactoryInterface;
use Drupal\Core\Logger\LoggerChannelFactoryInterface;
use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Database\Connection;

use Drupal\common\LoggerTrait;
use Drupal\datastore\Service as DatastoreService;
use Drupal\metastore\Reference\ReferenceLookup;
use Procrastinator\Result;
use Symfony\Component\DependencyInjection\ContainerInterface;
16 
/**
 * Queue worker that imports a resource into the datastore.
 *
 * Each queue item names a resource by identifier and version. The worker asks
 * the datastore service to import it, logs the outcome of every step, puts the
 * item back on the queue when a step stopped before finishing, and optionally
 * removes the local copy of the resource file afterwards.
 *
 * NOTE(review): the constructor reads $plugin_definition['cron']['lease_time'],
 * so this class presumably carries a QueueWorker plugin annotation that is not
 * visible in this listing. Confirm the annotation is preserved in this
 * docblock, since plugin discovery reads the docblock directly above the class.
 */
class Import extends QueueWorkerBase implements ContainerFactoryPluginInterface {
  use LoggerTrait;

  /**
   * Queue that stopped imports are put back on.
   *
   * Obtained from the datastore service's queue factory using the plugin ID.
   * NOTE(review): the type below is assumed from how the queue is used
   * (createItem()); confirm against the datastore service.
   *
   * @var \Drupal\Core\Queue\QueueInterface
   */
  protected $databaseQueue;

  /**
   * Datastore service that performs the import.
   *
   * @var \Drupal\datastore\Service
   */
  protected $datastore;

  /**
   * Reference lookup service used to invalidate referencer cache tags.
   *
   * @var \Drupal\metastore\Reference\ReferenceLookup
   */
  protected $referenceLookup;

  /**
   * The 'datastore.settings' configuration object.
   *
   * @var \Drupal\Core\Config\ImmutableConfig
   */
  protected $datastoreConfig;

  /**
   * File system service taken from the datastore's resource localizer.
   *
   * NOTE(review): the type below is assumed from the deleteRecursive() call;
   * confirm against the resource localizer.
   *
   * @var \Drupal\Core\File\FileSystemInterface
   */
  protected $fileSystem;

  /**
   * Builds the import queue worker.
   *
   * The parameter order matches the argument order used by create().
   *
   * @param array $configuration
   *   Plugin configuration.
   * @param string $plugin_id
   *   Plugin ID; also used as the name of the queue to requeue items on.
   * @param mixed $plugin_definition
   *   Plugin definition. Must provide ['cron']['lease_time'], which is used
   *   as the database connection timeout.
   * @param \Drupal\Core\Config\ConfigFactoryInterface $configFactory
   *   Config factory, used to load 'datastore.settings'.
   * @param \Drupal\datastore\Service $datastore
   *   Datastore service.
   * @param \Drupal\Core\Logger\LoggerChannelFactoryInterface $loggerFactory
   *   Logger channel factory.
   * @param \Drupal\metastore\Reference\ReferenceLookup $referenceLookup
   *   Reference lookup service.
   * @param \Drupal\Core\Database\Connection $defaultConnection
   *   Default database connection.
   * @param \Drupal\Core\Database\Connection $datastoreConnection
   *   Datastore database connection.
   */
  public function __construct(
    array $configuration,
    $plugin_id,
    $plugin_definition,
    ConfigFactoryInterface $configFactory,
    DatastoreService $datastore,
    LoggerChannelFactoryInterface $loggerFactory,
    ReferenceLookup $referenceLookup,
    Connection $defaultConnection,
    Connection $datastoreConnection
  ) {
    parent::__construct($configuration, $plugin_id, $plugin_definition);
    $this->datastore = $datastore;
    $this->referenceLookup = $referenceLookup;
    $this->datastoreConfig = $configFactory->get('datastore.settings');
    $this->databaseQueue = $datastore->getQueueFactory()->get($plugin_id);
    $this->fileSystem = $datastore->getResourceLocalizer()->getFileSystem();
    $this->setLoggerFactory($loggerFactory, 'datastore');
    // Set the timeout for database connections to the queue lease time.
    // This ensures that database connections will remain open for the
    // duration of the time the queue is being processed.
    $timeout = (int) $plugin_definition['cron']['lease_time'];
    $this->setConnectionTimeout($datastoreConnection, $timeout);
    $this->setConnectionTimeout($defaultConnection, $timeout);
  }

  /**
   * Instantiates the worker with its services pulled from the container.
   *
   * @param \Symfony\Component\DependencyInjection\ContainerInterface $container
   *   Service container.
   * @param array $configuration
   *   Plugin configuration.
   * @param string $plugin_id
   *   Plugin ID.
   * @param mixed $plugin_definition
   *   Plugin definition.
   *
   * @return static
   *   A new worker instance.
   */
  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
    return new static(
      $configuration,
      $plugin_id,
      $plugin_definition,
      $container->get('config.factory'),
      $container->get('dkan.datastore.service'),
      $container->get('logger.factory'),
      $container->get('dkan.metastore.reference_lookup'),
      $container->get('database'),
      $container->get('dkan.datastore.database')
    );
  }

  /**
   * Sets the session wait timeout on a database connection.
   *
   * Issues "SET SESSION wait_timeout", which is MySQL/MariaDB syntax.
   *
   * @param \Drupal\Core\Database\Connection $connection
   *   Connection to configure.
   * @param int $timeout
   *   Timeout value. The int type guarantees the concatenated statement only
   *   ever contains a number.
   */
  protected function setConnectionTimeout(Connection $connection, int $timeout): void {
    $command = 'SET SESSION wait_timeout = ' . $timeout;
    $connection->query($command);
  }

  /**
   * Processes one queue item by importing the resource it describes.
   *
   * Exceptions thrown by the import are logged rather than rethrown.
   *
   * @param mixed $data
   *   Either the import data array (with 'identifier' and 'version' keys), or
   *   an object wrapping that array in its 'data' property.
   */
  public function processItem($data) {
    // Unwrap items that arrive as an object holding the payload in ->data.
    if (is_object($data) && isset($data->data)) {
      $data = $data->data;
    }

    try {
      $this->importData($data);
    }
    catch (\Exception $e) {
      $this->error("Import for {$data['identifier']} returned an error: {$e->getMessage()}");
    }
  }

  /**
   * Runs the import for a resource and handles each returned result.
   *
   * @param array $data
   *   Import data with 'identifier' and 'version' keys.
   */
  protected function importData(array $data) {
    $identifier = $data['identifier'];
    $version = $data['version'];
    $results = $this->datastore->import($identifier, FALSE, $version);

    $queued = FALSE;
    foreach ($results as $result) {
      // Skip missing results without resetting $queued: forgetting that the
      // item was already requeued would let a later STOPPED result requeue
      // it a second time.
      if (isset($result)) {
        $queued = $this->processResult($result, $data, $queued);
      }
    }

    // Delete local resource file if enabled in datastore settings config.
    if ($this->datastoreConfig->get('delete_local_resource')) {
      $this->fileSystem->deleteRecursive("public://resources/{$identifier}_{$version}");
    }
  }

  /**
   * Logs the outcome of a single import result and requeues if it stopped.
   *
   * @param \Procrastinator\Result $result
   *   Result to inspect.
   * @param mixed $data
   *   Import data with 'identifier' and 'version' keys; requeued unchanged
   *   when the result is STOPPED.
   * @param bool $queued
   *   Whether the item has already been requeued during this run.
   *
   * @return bool
   *   TRUE if the item has been requeued (now or earlier), otherwise the
   *   value of $queued that was passed in.
   */
  protected function processResult(Result $result, $data, $queued = FALSE) {
    $uid = "{$data['identifier']}__{$data['version']}";
    $status = $result->getStatus();
    switch ($status) {
      case Result::STOPPED:
        // Requeue at most once per run, however many results are STOPPED.
        if (!$queued) {
          $newQueueItemId = $this->requeue($data);
          $this->notice("Import for {$uid} is requeueing. (ID:{$newQueueItemId}).");
          $queued = TRUE;
        }
        break;

      // A result still IN_PROGRESS once import() has returned is reported
      // through the same branch as ERROR.
      case Result::IN_PROGRESS:
      case Result::ERROR:
        $this->error("Import for {$uid} returned an error: {$result->getError()}");
        break;

      case Result::DONE:
        $this->notice("Import for {$uid} completed.");
        $this->invalidateCacheTags("{$uid}__source");
        break;
    }

    return $queued;
  }

  /**
   * Invalidates cache tags of distributions referencing a resource.
   *
   * Delegates to the reference lookup service for the 'distribution' schema
   * and its 'downloadURL' property.
   *
   * @param mixed $resourceId
   *   Resource identifier passed on to the reference lookup service.
   */
  protected function invalidateCacheTags($resourceId) {
    $this->referenceLookup->invalidateReferencerCacheTags('distribution', $resourceId, 'downloadURL');
  }

  /**
   * Puts the import data back on the queue as a new item.
   *
   * @param array $data
   *   Import data to queue.
   *
   * @return mixed
   *   Whatever the queue's createItem() returns; the caller logs it as the
   *   new queue item ID.
   */
  protected function requeue(array $data) {
    return $this->databaseQueue->createItem($data);
  }

}
Drupal\common\LoggerTrait
trait LoggerTrait
Definition: LoggerTrait.php:11
Drupal\datastore\Plugin\QueueWorker
Definition: Import.php:3
Drupal\datastore\Plugin\QueueWorker\Import\importData
importData(array $data)
Definition: Import.php:167
Drupal\datastore\Plugin\QueueWorker\Import\$fileSystem
$fileSystem
Definition: Import.php:65
Drupal\datastore\Plugin\QueueWorker\Import\$datastoreConfig
$datastoreConfig
Definition: Import.php:58
Drupal\datastore\Plugin\QueueWorker\Import\processResult
processResult(Result $result, $data, $queued=FALSE)
Definition: Import.php:196
Drupal\datastore\Plugin\QueueWorker\Import\create
static create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition)
Definition: Import.php:118
Drupal\datastore\Plugin\QueueWorker\Import\$referenceLookup
$referenceLookup
Definition: Import.php:51
Drupal\datastore\Plugin\QueueWorker\Import\$databaseQueue
$databaseQueue
Definition: Import.php:37
Drupal\datastore\Plugin\QueueWorker\Import\processItem
processItem($data)
Definition: Import.php:148
Drupal\datastore\Plugin\QueueWorker\Import\setConnectionTimeout
setConnectionTimeout(Connection $connection, int $timeout)
Definition: Import.php:140
Drupal\metastore\Reference\ReferenceLookup
Definition: ReferenceLookup.php:18
Drupal\datastore\Service
Definition: DatastoreQuery.php:3
Drupal\datastore\Plugin\QueueWorker\Import\invalidateCacheTags
invalidateCacheTags($resourceId)
Definition: Import.php:228
Drupal\common\setLoggerFactory
setLoggerFactory(LoggerChannelFactory $loggerService)
Definition: LoggerTrait.php:48
Drupal\datastore\Plugin\QueueWorker\Import\$datastore
$datastore
Definition: Import.php:44
Drupal\datastore\Plugin\QueueWorker\Import\__construct
__construct(array $configuration, $plugin_id, $plugin_definition, ConfigFactoryInterface $configFactory, DatastoreService $datastore, LoggerChannelFactoryInterface $loggerFactory, ReferenceLookup $referenceLookup, Connection $defaultConnection, Connection $datastoreConnection)
Definition: Import.php:89
Drupal\datastore\Plugin\QueueWorker\Import
Definition: Import.php:29
Drupal\datastore\Plugin\QueueWorker\Import\requeue
requeue(array $data)
Definition: Import.php:243