feat(TaskProcessingApiController): Add new next_batch endpoint

Signed-off-by: Marcel Klehr <mklehr@gmx.net>
pull/55735/head
Marcel Klehr 3 months ago
parent ea8ab8e192
commit 80eb3dd0d0
  1. 123
      core/Controller/TaskProcessingApiController.php

@ -532,29 +532,7 @@ class TaskProcessingApiController extends OCSController {
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next', root: '/taskprocessing')]
public function getNextScheduledTask(array $providerIds, array $taskTypeIds): DataResponse {
try {
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
try {
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
return null;
}
}, $taskTypeIds));
$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);
// restrict $providerIds to providers that are configured as preferred for the passed task types
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));
// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
try {
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
// no provider found for task type
return false;
}
return in_array($providerForTaskType, $possibleProviderIds, true);
}));
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);
if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
throw new NotFoundException();
@ -596,6 +574,73 @@ class TaskProcessingApiController extends OCSController {
}
}
/**
* Returns the next n scheduled tasks for the specified set of taskTypes and providers
*
* @param list<string> $providerIds The ids of the providers
* @param list<string> $taskTypeIds The ids of the task types
* @param int $numberOfTasks The number of tasks to return
* @return DataResponse<Http::STATUS_OK, array{tasks: list<array{task: CoreTaskProcessingTask, provider: string}>, has_more: bool}, array{}>|DataResponse<Http::STATUS_NO_CONTENT, null, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR, array{message: string}, array{}>
*
* 200: Tasks returned
*/
#[ExAppRequired]
#[ApiRoute(verb: 'GET', url: '/tasks_provider/next_batch', root: '/taskprocessing')]
public function getNextScheduledTaskBatch(array $providerIds, array $taskTypeIds, int $numberOfTasks = 1): DataResponse {
try {
[$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds);
if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) {
return new DataResponse([
'tasks' => [],
'has_more' => false,
]);
}
/** @var list<array{task:CoreTaskProcessingTask, provider:string}> $tasks */
$tasks = [];
$taskIdsToIgnore = [];
// Stop when $numberOfTasks is reached or the json payload is larger than 50MiB
while (count($tasks) < $numberOfTasks && strlen(json_encode($tasks)) < 50 * 1024 * 1024) {
// Until we find a task whose task type is set to be provided by the providers requested with this request
// Or no scheduled task is found anymore (given the taskIds to ignore)
try {
$task = $this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore);
} catch (NotFoundException) {
break;
}
try {
$provider = $this->taskProcessingManager->getPreferredProvider($task->getTaskTypeId());
if (in_array($provider->getId(), $possibleProviderIds, true)) {
if ($this->taskProcessingManager->lockTask($task)) {
$tasks[] = ['task' => $task->jsonSerialize(), 'provider' => $provider->getId()];
continue;
}
}
} catch (Exception) {
// There is no provider set for the task type of this task
// proceed to ignore this task
}
$taskIdsToIgnore[] = (int)$task->getId();
}
try {
$this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore);
$hasMore = true;
} catch (\Throwable) {
$hasMore = false;
}
return new DataResponse([
'tasks' => $tasks,
'has_more' => $hasMore,
]);
} catch (Exception) {
return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR);
}
}
/**
* @param resource $data
* @return int
@ -611,4 +656,36 @@ class TaskProcessingApiController extends OCSController {
$file = $folder->newFile(time() . '-' . rand(1, 100000), $data);
return $file->getId();
}
/**
* @param array $taskTypeIds
* @param array $providerIds
* @return array
*/
public function intersectTaskTypesAndProviders(array $taskTypeIds, array $providerIds): array {
$providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) {
try {
return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
return null;
}
}, $taskTypeIds));
$providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null);
// restrict $providerIds to providers that are configured as preferred for the passed task types
$possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds));
// restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers
$possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) {
try {
$providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId();
} catch (Exception) {
// no provider found for task type
return false;
}
return in_array($providerForTaskType, $possibleProviderIds, true);
}));
return [$possibleProviderIds, $possibleTaskTypeIds];
}
}

Loading…
Cancel
Save