From 80eb3dd0d0339bfccaeaf892bb57d8d353e56c73 Mon Sep 17 00:00:00 2001 From: Marcel Klehr Date: Tue, 14 Oct 2025 09:30:42 +0200 Subject: [PATCH] feat(TaskProcessingApiController): Add new next_batch endpoint Signed-off-by: Marcel Klehr --- .../TaskProcessingApiController.php | 123 ++++++++++++++---- 1 file changed, 100 insertions(+), 23 deletions(-) diff --git a/core/Controller/TaskProcessingApiController.php b/core/Controller/TaskProcessingApiController.php index 1759b3b017c..a241427cadc 100644 --- a/core/Controller/TaskProcessingApiController.php +++ b/core/Controller/TaskProcessingApiController.php @@ -532,29 +532,7 @@ class TaskProcessingApiController extends OCSController { #[ApiRoute(verb: 'GET', url: '/tasks_provider/next', root: '/taskprocessing')] public function getNextScheduledTask(array $providerIds, array $taskTypeIds): DataResponse { try { - $providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) { - try { - return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId(); - } catch (Exception) { - return null; - } - }, $taskTypeIds)); - - $providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null); - - // restrict $providerIds to providers that are configured as preferred for the passed task types - $possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds)); - - // restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers - $possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) { - try { - $providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId(); - } catch (Exception) { - // no provider found for task type - return false; - } - return in_array($providerForTaskType, $possibleProviderIds, true); - })); + [$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds); if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) { throw new NotFoundException(); @@ -596,6 +574,73 @@ class TaskProcessingApiController extends OCSController { } } + /** + * Returns the next n scheduled tasks for the specified set of taskTypes and providers + * + * @param list $providerIds The ids of the providers + * @param list $taskTypeIds The ids of the task types + * @param int $numberOfTasks The number of tasks to return + * @return DataResponse, has_more: bool}, array{}>|DataResponse|DataResponse + * + * 200: Tasks returned + */ + #[ExAppRequired] + #[ApiRoute(verb: 'GET', url: '/tasks_provider/next_batch', root: '/taskprocessing')] + public function getNextScheduledTaskBatch(array $providerIds, array $taskTypeIds, int $numberOfTasks = 1): DataResponse { + try { + [$possibleProviderIds, $possibleTaskTypeIds] = $this->intersectTaskTypesAndProviders($taskTypeIds, $providerIds); + + if (count($possibleProviderIds) === 0 || count($possibleTaskTypeIds) === 0) { + return new DataResponse([ + 'tasks' => [], + 'has_more' => false, + ]); + } + + /** @var list $tasks */ + $tasks = []; + $taskIdsToIgnore = []; + // Stop when $numberOfTasks is reached or the json payload is larger than 50MiB + while (count($tasks) < $numberOfTasks && strlen(json_encode($tasks)) < 50 * 1024 * 1024) { + // Until we find a task whose task type is set to be provided by the providers requested with this request + // Or no scheduled task is found anymore (given the taskIds to ignore) + try { + $task = $this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore); + } catch (NotFoundException) { + break; + } + try { + $provider = $this->taskProcessingManager->getPreferredProvider($task->getTaskTypeId()); + if (in_array($provider->getId(), $possibleProviderIds, true)) { + if ($this->taskProcessingManager->lockTask($task)) { + $tasks[] = ['task' => $task->jsonSerialize(), 'provider' => $provider->getId()]; + continue; + } + } + } catch (Exception) { + // There is no provider set for the task type of this task + // proceed to ignore this task + } + + $taskIdsToIgnore[] = (int)$task->getId(); + } + + try { + $this->taskProcessingManager->getNextScheduledTask($possibleTaskTypeIds, $taskIdsToIgnore); + $hasMore = true; + } catch (\Throwable) { + $hasMore = false; + } + + return new DataResponse([ + 'tasks' => $tasks, + 'has_more' => $hasMore, + ]); + } catch (Exception) { + return new DataResponse(['message' => $this->l->t('Internal error')], Http::STATUS_INTERNAL_SERVER_ERROR); + } + } + /** * @param resource $data * @return int @@ -611,4 +656,36 @@ class TaskProcessingApiController extends OCSController { $file = $folder->newFile(time() . '-' . rand(1, 100000), $data); return $file->getId(); } + + /** + * @param array $taskTypeIds + * @param array $providerIds + * @return array + */ + public function intersectTaskTypesAndProviders(array $taskTypeIds, array $providerIds): array { + $providerIdsBasedOnTaskTypesWithNull = array_unique(array_map(function ($taskTypeId) { + try { + return $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId(); + } catch (Exception) { + return null; + } + }, $taskTypeIds)); + + $providerIdsBasedOnTaskTypes = array_filter($providerIdsBasedOnTaskTypesWithNull, fn ($providerId) => $providerId !== null); + + // restrict $providerIds to providers that are configured as preferred for the passed task types + $possibleProviderIds = array_values(array_intersect($providerIdsBasedOnTaskTypes, $providerIds)); + + // restrict $taskTypeIds to task types that can actually be run by one of the now restricted providers + $possibleTaskTypeIds = array_values(array_filter($taskTypeIds, function ($taskTypeId) use ($possibleProviderIds) { + try { + $providerForTaskType = $this->taskProcessingManager->getPreferredProvider($taskTypeId)->getId(); + } catch (Exception) { + // no provider found for task type + return false; + } + return in_array($providerForTaskType, $possibleProviderIds, true); + })); + return [$possibleProviderIds, $possibleTaskTypeIds]; + } }