Open Data Catalog v
Import.php
Go to the documentation of this file.
1 <?php
2 
4 
5 use Drupal\Core\Config\ConfigFactoryInterface;
6 use Drupal\Core\Logger\LoggerChannelFactoryInterface;
7 use Drupal\Core\Plugin\ContainerFactoryPluginInterface;
8 use Drupal\Core\Queue\QueueWorkerBase;
9 
14 
15 use Procrastinator\Result;
16 use Symfony\Component\DependencyInjection\ContainerInterface;
17 
30 class Import extends QueueWorkerBase implements ContainerFactoryPluginInterface {
31  use LoggerTrait;
32 
38  protected $databaseQueue;
39 
45  protected $datastore;
46 
52  protected $referenceLookup;
53 
59  protected $datastoreConfig;
60 
66  protected $fileSystem;
67 
90  public function __construct(
91  array $configuration,
92  $plugin_id,
93  $plugin_definition,
94  ConfigFactoryInterface $configFactory,
96  LoggerChannelFactoryInterface $loggerFactory,
98  DatabaseConnectionFactoryInterface $defaultConnectionFactory,
99  DatabaseConnectionFactoryInterface $datastoreConnectionFactory
100  ) {
101  parent::__construct($configuration, $plugin_id, $plugin_definition);
102  $this->datastore = $datastore;
103  $this->referenceLookup = $referenceLookup;
104  $this->datastoreConfig = $configFactory->get('datastore.settings');
105  $this->databaseQueue = $datastore->getQueueFactory()->get($plugin_id);
106  $this->fileSystem = $datastore->getResourceLocalizer()->getFileSystem();
107  $this->setLoggerFactory($loggerFactory, 'datastore');
108  // Set the timeout for database connections to the queue lease time.
109  // This ensures that database connections will remain open for the
110  // duration of the time the queue is being processed.
111  $timeout = (int) $plugin_definition['cron']['lease_time'];
112  $defaultConnectionFactory->setConnectionTimeout($timeout);
113  $datastoreConnectionFactory->setConnectionTimeout($timeout);
114  }
115 
119  public static function create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition) {
120  return new static(
121  $configuration,
122  $plugin_id,
123  $plugin_definition,
124  $container->get('config.factory'),
125  $container->get('dkan.datastore.service'),
126  $container->get('logger.factory'),
127  $container->get('dkan.metastore.reference_lookup'),
128  $container->get('dkan.common.database_connection_factory'),
129  $container->get('dkan.datastore.database_connection_factory')
130  );
131  }
132 
136  public function processItem($data) {
137  if (is_object($data) && isset($data->data)) {
138  $data = $data->data;
139  }
140 
141  try {
142  $this->importData($data);
143  }
144  catch (\Exception $e) {
145  $this->error("Import for {$data['identifier']} returned an error: {$e->getMessage()}");
146  }
147  }
148 
155  protected function importData(array $data) {
156  $identifier = $data['identifier'];
157  $version = $data['version'];
158  $results = $this->datastore->import($identifier, FALSE, $version);
159 
160  $queued = FALSE;
161  foreach ($results as $label => $result) {
162  $queued = isset($result) ? $this->processResult($result, $data, $queued, $label) : FALSE;
163  }
164 
165  // Delete local resource file if enabled in datastore settings config.
166  if ($this->datastoreConfig->get('delete_local_resource')) {
167  $this->fileSystem->deleteRecursive("public://resources/{$identifier}_{$version}");
168  }
169  }
170 
186  protected function processResult(Result $result, $data, bool $queued = FALSE, string $label = 'Import') {
187  $uid = "{$data['identifier']}__{$data['version']}";
188  $status = $result->getStatus();
189  switch ($status) {
190  case Result::STOPPED:
191  if (!$queued) {
192  $newQueueItemId = $this->requeue($data);
193  $this->notice("$label for {$uid} is requeueing. (ID:{$newQueueItemId}).");
194  $queued = TRUE;
195  }
196  break;
197 
198  case Result::IN_PROGRESS:
199  case Result::ERROR:
200  $this->error("$label for {$uid} returned an error: {$result->getError()}");
201  break;
202 
203  case Result::DONE:
204  $this->notice("$label for {$uid} completed.");
205  $this->invalidateCacheTags("{$uid}__source");
206  break;
207  }
208 
209  return $queued;
210  }
211 
218  protected function invalidateCacheTags($resourceId) {
219  $this->referenceLookup->invalidateReferencerCacheTags('distribution', $resourceId, 'downloadURL');
220  }
221 
233  protected function requeue(array $data) {
234  return $this->databaseQueue->createItem($data);
235  }
236 
237 }
Drupal\common\LoggerTrait
trait LoggerTrait
Definition: LoggerTrait.php:11
Drupal\datastore\Plugin\QueueWorker
Definition: DictionaryEnforcer.php:3
Drupal\datastore\Plugin\QueueWorker\Import\processResult
processResult(Result $result, $data, bool $queued=FALSE, string $label='Import')
Definition: Import.php:186
Drupal\datastore\Plugin\QueueWorker\Import\importData
importData(array $data)
Definition: Import.php:155
Drupal\datastore\Plugin\QueueWorker\Import\$fileSystem
$fileSystem
Definition: Import.php:66
Drupal\datastore\Plugin\QueueWorker\Import\$datastoreConfig
$datastoreConfig
Definition: Import.php:59
Drupal\datastore\Plugin\QueueWorker\Import\create
static create(ContainerInterface $container, array $configuration, $plugin_id, $plugin_definition)
Definition: Import.php:119
Drupal\datastore\Plugin\QueueWorker\Import\$referenceLookup
$referenceLookup
Definition: Import.php:52
Drupal\datastore\Plugin\QueueWorker\Import\$databaseQueue
$databaseQueue
Definition: Import.php:38
Drupal\datastore\Plugin\QueueWorker\Import\processItem
processItem($data)
Definition: Import.php:136
Drupal\common\Storage\DatabaseConnectionFactoryInterface
Definition: DatabaseConnectionFactoryInterface.php:10
Drupal\metastore\Reference\ReferenceLookup
Definition: ReferenceLookup.php:18
Drupal\datastore\Service
Definition: DatastoreQuery.php:3
Drupal\datastore\Plugin\QueueWorker\Import\invalidateCacheTags
invalidateCacheTags($resourceId)
Definition: Import.php:218
Drupal\common\setLoggerFactory
setLoggerFactory(LoggerChannelFactory $loggerService)
Definition: LoggerTrait.php:53
Drupal\datastore\Plugin\QueueWorker\Import\$datastore
$datastore
Definition: Import.php:45
Drupal\datastore\Plugin\QueueWorker\Import\__construct
__construct(array $configuration, $plugin_id, $plugin_definition, ConfigFactoryInterface $configFactory, DatastoreService $datastore, LoggerChannelFactoryInterface $loggerFactory, ReferenceLookup $referenceLookup, DatabaseConnectionFactoryInterface $defaultConnectionFactory, DatabaseConnectionFactoryInterface $datastoreConnectionFactory)
Definition: Import.php:90
Drupal\datastore\Plugin\QueueWorker\Import
Definition: Import.php:30
Drupal\datastore\Plugin\QueueWorker\Import\requeue
requeue(array $data)
Definition: Import.php:233
Drupal\common\Storage\DatabaseConnectionFactoryInterface\setConnectionTimeout
setConnectionTimeout(int $timeout)