Open Data Catalog v2.0.0
Service.php
Go to the documentation of this file.
1 <?php
2 
3 namespace Drupal\datastore;
4 
8 use Procrastinator\Result;
9 use RootedData\RootedJsonData;
10 use Symfony\Component\DependencyInjection\ContainerInterface;
11 use Drupal\Core\DependencyInjection\ContainerInjectionInterface;
12 use Drupal\Core\Queue\QueueFactory;
16 
20 class Service implements ContainerInjectionInterface {
21 
27  private $resourceLocalizer;
28 
34  private $importServiceFactory;
35 
41  private $queue;
42 
48  private $jobStoreFactory;
49 
53  public static function create(ContainerInterface $container) {
54  return new Service(
55  $container->get('dkan.datastore.service.resource_localizer'),
56  $container->get('dkan.datastore.service.factory.import'),
57  $container->get('queue'),
58  $container->get('dkan.common.job_store')
59  );
60  }
61 
65  public function __construct(ResourceLocalizer $resourceLocalizer, Import $importServiceFactory, QueueFactory $queue, JobStoreFactory $jobStoreFactory) {
66  $this->queue = $queue;
67  $this->resourceLocalizer = $resourceLocalizer;
68  $this->importServiceFactory = $importServiceFactory;
69  $this->jobStoreFactory = $jobStoreFactory;
70  }
71 
85  public function import(string $identifier, bool $deferred = FALSE, $version = NULL): array {
86 
87  // If we passed $deferred, immediately add to the queue for later.
88  if ($deferred == TRUE) {
89  $this->queueImport($identifier, $version);
90  return [
91  'message' => "Resource {$identifier}:{$version} has been queued to be imported.",
92  ];
93  }
94 
95  $resource = NULL;
96  $result = NULL;
97  list($resource, $result) = $this->getResource($identifier, $version);
98 
99  if (!$resource) {
100  return $result;
101  }
102 
103  $result2 = $this->doImport($resource);
104 
105  return array_merge($result, $result2);
106  }
107 
111  private function doImport($resource) {
112  $importService = $this->getImportService($resource);
113  $importService->import();
114  return [$this->getLabelFromObject($importService) => $importService->getResult()];
115  }
116 
120  private function getLabelFromObject($object) {
121  return substr(strrchr(get_class($object), "\\"), 1);
122  }
123 
127  private function getResource($identifier, $version) {
128  $label = $this->getLabelFromObject($this->resourceLocalizer);
129  $resource = $this->resourceLocalizer->get($identifier, $version);
130 
131  if ($resource) {
132  $result = [
133  $label => $this->resourceLocalizer->getResult($identifier, $version),
134  ];
135  return [$resource, $result];
136  }
137 
138  // @todo we should not do this, we need a filefetcher queue worker.
139  $result = [
140  $label => $this->resourceLocalizer->localize($identifier, $version),
141  ];
142 
143  if ($result[$label]->getStatus() == Result::DONE) {
144  $resource = $this->resourceLocalizer->get($identifier, $version);
145  }
146 
147  return [$resource, $result];
148  }
149 
153  public function getImportService(Resource $resource) {
154  return $this->importServiceFactory->getInstance($resource->getUniqueIdentifier(), ['resource' => $resource]);
155  }
156 
165  public function drop(string $identifier, $version = NULL) {
166  $storage = $this->getStorage($identifier, $version);
167 
168  if ($storage) {
169  $storage->destroy();
170  }
171 
172  $this->resourceLocalizer->remove($identifier, $version);
173  }
174 
186  private function queueImport(string $identifier, string $version) {
187  // Attempt to fetch the file in a queue so as to not block user.
188  $queueId = $this->queue->get('datastore_import')
189  ->createItem(['identifier' => $identifier, 'version' => $version]);
190 
191  if ($queueId === FALSE) {
192  throw new \RuntimeException("Failed to create file fetcher queue for {$identifier}:{$version}");
193  }
194 
195  return $queueId;
196  }
197 
204  public function list() {
205  /* @var \Drupal\datastore\Service\Info\ImportInfoList $service */
206  $service = \Drupal::service('dkan.datastore.import_info_list');
207  return $service->buildList();
208  }
209 
213  public function summary($identifier) {
214  $id = NULL; $version = NULL;
215  [$id, $version] = Resource::getIdentifierAndVersion($identifier);
216  $storage = $this->getStorage($id, $version);
217 
218  if ($storage) {
219  $data = $storage->getSummary();
220  return $data;
221  }
222  throw new \Exception("no storage");
223  }
224 
238  public function getStorage(string $identifier, $version = NULL) {
239  $resource = $this->resourceLocalizer->get($identifier, $version);
240  if ($resource) {
241  $importService = $this->getImportService($resource);
242  return $importService->getStorage();
243  }
244  throw new \InvalidArgumentException("No datastore storage found for {$identifier}:{$version}.");
245  }
246 
256  public function runQuery(DatastoreQuery $datastoreQuery) {
257  $return = (object) [];
258  if ($datastoreQuery->{"$.results"} !== FALSE) {
259  $return->results = $this->runResultsQuery($datastoreQuery);
260  }
261 
262  if ($datastoreQuery->{"$.count"} !== FALSE) {
263  $return->count = $this->runCountQuery($datastoreQuery);
264  }
265 
266  if ($datastoreQuery->{"$.schema"} !== FALSE) {
267  $return->schema = $this->getSchema($datastoreQuery);
268  }
269 
270  $return->query = $datastoreQuery->{"$"};
271  return new RootedJsonData(json_encode($return));
272  }
273 
283  private function getSchema(DatastoreQuery $datastoreQuery) {
284  $storageMap = $this->getQueryStorageMap($datastoreQuery);
285  $schema = [];
286  foreach ($datastoreQuery->{"$.resources"} as $resource) {
287  $storage = $storageMap[$resource["alias"]];
288  $schema[$resource["id"]] = $storage->getSchema();
289  }
290  return $schema;
291  }
292 
302  public function getQueryStorageMap(DatastoreQuery $datastoreQuery) {
303  $storageMap = [];
304  foreach ($datastoreQuery->{"$.resources"} as $resource) {
305  list($identifier, $version) = Resource::getIdentifierAndVersion($resource["id"]);
306  $storage = $this->getStorage($identifier, $version);
307  $storageMap[$resource["alias"]] = $storage;
308  }
309  return $storageMap;
310  }
311 
321  private function runResultsQuery(DatastoreQuery $datastoreQuery) {
322  $primaryAlias = $datastoreQuery->{"$.resources[0].alias"};
323  if (!$primaryAlias) {
324  return [];
325  }
326 
327  $storageMap = $this->getQueryStorageMap($datastoreQuery);
328  $query = QueryFactory::create($datastoreQuery, $storageMap);
329 
330  $result = $storageMap[$primaryAlias]->query($query, $primaryAlias);
331 
332  if ($datastoreQuery->{"$.keys"} === FALSE) {
333  $result = array_map([$this, 'stripRowKeys'], $result);
334  }
335  return $result;
336 
337  }
338 
348  private function stripRowKeys($row) {
349  $arrayRow = (array) $row;
350  $newRow = [];
351  foreach ($arrayRow as $value) {
352  $newRow[] = $value;
353  }
354  return $newRow;
355  }
356 
363  private function runCountQuery(DatastoreQuery $datastoreQuery) {
364  $primaryAlias = $datastoreQuery->{"$.resources[0].alias"};
365  if (!$primaryAlias) {
366  return 0;
367  }
368 
369  $storageMap = $this->getQueryStorageMap($datastoreQuery);
370  $query = QueryFactory::create($datastoreQuery, $storageMap);
371 
372  unset($query->limit, $query->offset);
373  $query->count();
374  return $storageMap[$primaryAlias]->query($query, $primaryAlias)[0]->expression;
375  }
376 
384  return $this->resourceLocalizer;
385  }
386 
393  public function getQueueFactory(): QueueFactory {
394  return $this->queue;
395  }
396 
397 }
Drupal\datastore\Service\getQueryStorageMap
getQueryStorageMap(DatastoreQuery $datastoreQuery)
Definition: Service.php:302
Drupal\datastore\Service
Definition: Service.php:20
Drupal\datastore\Service\list
list()
Definition: Service.php:204
Drupal\datastore\Service\summary
summary($identifier)
Definition: Service.php:213
Drupal\common\Resource
Definition: Resource.php:28
Drupal\datastore\Service\DatastoreQuery
Definition: DatastoreQuery.php:10
Drupal\datastore\Service\ResourceLocalizer
Definition: ResourceLocalizer.php:23
Drupal\datastore
Drupal\datastore\Service\create
static create(ContainerInterface $container)
Definition: Service.php:53
Drupal\common\Storage\JobStoreFactory
Definition: JobStoreFactory.php:11
Drupal\datastore\Service\getQueueFactory
getQueueFactory()
Definition: Service.php:393
Drupal\datastore\Service\drop
drop(string $identifier, $version=NULL)
Definition: Service.php:165
Drupal\datastore\Storage\QueryFactory
Definition: QueryFactory.php:11
Drupal\datastore\Service\getStorage
getStorage(string $identifier, $version=NULL)
Definition: Service.php:238
Drupal\common\Resource\getIdentifierAndVersion
static getIdentifierAndVersion($string)
Definition: Resource.php:197
Drupal\datastore\Service\getImportService
getImportService(Resource $resource)
Definition: Service.php:153
Drupal\datastore\Service\__construct
__construct(ResourceLocalizer $resourceLocalizer, Import $importServiceFactory, QueueFactory $queue, JobStoreFactory $jobStoreFactory)
Definition: Service.php:65
Drupal\common\Resource\getUniqueIdentifier
getUniqueIdentifier()
Definition: Resource.php:153
Drupal\datastore\Service\getResourceLocalizer
getResourceLocalizer()
Definition: Service.php:383
Drupal\datastore\Service\runQuery
runQuery(DatastoreQuery $datastoreQuery)
Definition: Service.php:256
Drupal\datastore\Service\Factory\Import
Definition: Import.php:15