feat(taskprocessing): move cleanup method to private taskprocessing manager, use it in the cleanup bg job and implement a cleanup command

Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
pull/54272/head
Julien Veyssier 2 months ago
parent e6adbd921e
commit 19801f7ec4
No known key found for this signature in database
GPG Key ID: 4141FEE162030638
  1. 50
      core/Command/TaskProcessing/Cleanup.php
  2. 1
      core/register_command.php
  3. 3
      lib/private/TaskProcessing/Db/TaskMapper.php
  4. 88
      lib/private/TaskProcessing/Manager.php
  5. 85
      lib/private/TaskProcessing/RemoveOldTasksBackgroundJob.php
  6. 4
      tests/lib/TaskProcessing/TaskProcessingTest.php

@ -0,0 +1,50 @@
<?php
/**
* SPDX-FileCopyrightText: 2025 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\Core\Command\TaskProcessing;
use OC\Core\Command\Base;
use OC\TaskProcessing\Manager;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
/**
 * occ command `taskprocessing:task:cleanup`.
 *
 * Asks the task processing manager to delete old tasks and the files related to them,
 * and prints one line for every item the manager reports as deleted.
 */
class Cleanup extends Base {
	public function __construct(
		protected Manager $taskProcessingManager,
	) {
		parent::__construct();
	}

	/**
	 * Declare the command name, its description and the optional `maxAgeSeconds` argument.
	 */
	protected function configure() {
		$this
			->setName('taskprocessing:task:cleanup')
			->setDescription('cleanup old tasks')
			->addArgument(
				'maxAgeSeconds',
				InputArgument::OPTIONAL,
				// default is not defined as an argument default value because we want to show a nice "4 months" value
				'delete tasks that are older than this number of seconds, defaults to ' . Manager::MAX_TASK_AGE_SECONDS . ' (4 months)',
			);
		parent::configure();
	}

	/**
	 * Run the cleanup and report what was deleted.
	 *
	 * @param InputInterface $input provides the optional `maxAgeSeconds` argument
	 * @param OutputInterface $output receives the progress and error messages
	 * @return int 0 on success, 1 when `maxAgeSeconds` is not a non-negative integer
	 */
	protected function execute(InputInterface $input, OutputInterface $output): int {
		$rawMaxAge = $input->getArgument('maxAgeSeconds');
		if ($rawMaxAge === null) {
			$maxAgeSeconds = Manager::MAX_TASK_AGE_SECONDS;
		} else {
			// Console arguments arrive as strings. Validate before handing the value to
			// Manager::cleanupOldTasks(int), otherwise a value like "abc" ends in a TypeError.
			$maxAgeSeconds = filter_var($rawMaxAge, FILTER_VALIDATE_INT, ['options' => ['min_range' => 0]]);
			if ($maxAgeSeconds === false) {
				$output->writeln('<error>maxAgeSeconds must be a non-negative integer number of seconds</error>');
				return 1;
			}
		}

		$output->writeln('Cleaning up tasks older than ' . $maxAgeSeconds . ' seconds and the related output files');

		// cleanupOldTasks() is a generator: the deletions happen while we iterate over it.
		$cleanupResult = $this->taskProcessingManager->cleanupOldTasks($maxAgeSeconds);
		foreach ($cleanupResult as $entry) {
			if (isset($entry['task_id'], $entry['file_id'], $entry['file_name'])) {
				// output file of a task
				$output->writeln("\t - " . 'Deleted appData/core/TaskProcessing/' . $entry['file_name'] . ' (fileId: ' . $entry['file_id'] . ', taskId: ' . $entry['task_id'] . ')');
			} elseif (isset($entry['directory_name'], $entry['file_name'])) {
				// old file found in one of the other app data folders
				$output->writeln("\t - " . 'Deleted appData/core/' . $entry['directory_name'] . '/' . $entry['file_name']);
			} elseif (isset($entry['deleted_task_count'])) {
				// number of task rows removed from the database
				$output->writeln("\t - " . 'Deleted ' . $entry['deleted_task_count'] . ' tasks from the database');
			}
		}
		return 0;
	}
}

@ -253,6 +253,7 @@ if ($config->getSystemValueBool('installed', false)) {
$application->add(Server::get(EnabledCommand::class));
$application->add(Server::get(Command\TaskProcessing\ListCommand::class));
$application->add(Server::get(Statistics::class));
$application->add(Server::get(Command\TaskProcessing\Cleanup::class));
$application->add(Server::get(RedisCommand::class));
$application->add(Server::get(DistributedClear::class));

@ -205,7 +205,8 @@ class TaskMapper extends QBMapper {
*/
public function getTasksToCleanup(int $timeout, bool $force = false): \Generator {
$qb = $this->db->getQueryBuilder();
$qb->select($this->tableName)
$qb->select(Task::$columns)
->from($this->tableName)
->where($qb->expr()->lt('last_updated', $qb->createPositionalParameter($this->timeFactory->getDateTime()->getTimestamp() - $timeout)));
if (!$force) {
$qb->andWhere($qb->expr()->eq('cleanup', $qb->createPositionalParameter(1, IQueryBuilder::PARAM_INT)));

@ -30,6 +30,7 @@ use OCP\Files\IRootFolder;
use OCP\Files\Node;
use OCP\Files\NotPermittedException;
use OCP\Files\SimpleFS\ISimpleFile;
use OCP\Files\SimpleFS\ISimpleFolder;
use OCP\Http\Client\IClientService;
use OCP\IAppConfig;
use OCP\ICache;
@ -78,6 +79,8 @@ class Manager implements IManager {
'ai.taskprocessing_provider_preferences',
];
public const MAX_TASK_AGE_SECONDS = 60 * 60 * 24 * 30 * 4; // 4 months
/** @var list<IProvider>|null */
private ?array $providers = null;
@ -1487,6 +1490,91 @@ class Manager implements IManager {
return $ids;
}
/**
 * Delete old task processing data and report every deletion.
 *
 * Steps, in this order:
 *  1. delete the files found by cleanupTaskProcessingTaskFiles(); any exception is logged as a warning
 *  2. delete the tasks older than $ageInSeconds through the task mapper; a DB exception is logged as a warning
 *  3. delete old files in the "text2image" and "audio2text" app data folders; a missing folder is ignored
 *
 * This method is a generator, so nothing is deleted until the result is iterated.
 *
 * @param int $ageInSeconds
 * @return \Generator yields arrays shaped like ['task_id', 'file_id', 'file_name'],
 *                    ['deleted_task_count'] or ['directory_name', 'file_name']
 */
public function cleanupOldTasks(int $ageInSeconds = self::MAX_TASK_AGE_SECONDS): \Generator {
	// Entries are re-yielded one by one (instead of "yield from") so the keys of this
	// generator keep counting up across all steps.
	try {
		$removedTaskFiles = $this->cleanupTaskProcessingTaskFiles($ageInSeconds);
		foreach ($removedTaskFiles as $removedTaskFile) {
			yield $removedTaskFile;
		}
	} catch (\Exception $e) {
		$this->logger->warning('Failed to delete stale task processing tasks files', ['exception' => $e]);
	}

	try {
		yield ['deleted_task_count' => $this->taskMapper->deleteOlderThan($ageInSeconds)];
	} catch (\OCP\DB\Exception $e) {
		$this->logger->warning('Failed to delete stale task processing tasks', ['exception' => $e]);
	}

	foreach (['text2image', 'audio2text'] as $folderName) {
		try {
			$folder = $this->appData->getFolder($folderName);
			foreach ($this->clearFilesOlderThan($folder, $ageInSeconds) as $removedFile) {
				yield $removedFile;
			}
		} catch (\OCP\Files\NotFoundException $e) {
			// noop
		}
	}
}
/**
 * Delete the files of $folder whose modification time is more than $ageInSeconds in the past.
 *
 * A file that cannot be deleted because of a NotPermittedException is logged as a warning
 * and skipped.
 *
 * @param ISimpleFolder $folder
 * @param int $ageInSeconds
 * @return \Generator yields ['directory_name' => string, 'file_name' => string] for each deleted file
 */
private function clearFilesOlderThan(ISimpleFolder $folder, int $ageInSeconds): \Generator {
	foreach ($folder->getDirectoryListing() as $candidate) {
		$isOldEnough = $candidate->getMTime() < time() - $ageInSeconds;
		if (!$isOldEnough) {
			continue;
		}

		try {
			// read the name before deleting, it is needed for the report
			$deletedName = $candidate->getName();
			$candidate->delete();
			yield ['directory_name' => $folder->getName(), 'file_name' => $deletedName];
		} catch (NotPermittedException $e) {
			$this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
		}
	}
}
/**
 * Delete the files referenced by the tasks that the task mapper returns for cleanup.
 *
 * Only files located under <appdata>/core/TaskProcessing/ are deleted; file ids that do not
 * resolve to a file in that path are left alone. A file that cannot be deleted because of a
 * NotPermittedException is logged as a warning and skipped.
 *
 * @param int $ageInSeconds
 * @return \Generator yields ['task_id' => ..., 'file_id' => ..., 'file_name' => ...] for each deleted file
 * @throws Exception
 * @throws InvalidPathException
 * @throws NotFoundException
 * @throws \JsonException
 * @throws \OCP\Files\NotFoundException
 */
private function cleanupTaskProcessingTaskFiles(int $ageInSeconds): \Generator {
	foreach ($this->taskMapper->getTasksToCleanup($ageInSeconds) as $task) {
		$referencedFileIds = $this->extractFileIdsFromTask($task->toPublicTask());
		foreach ($referencedFileIds as $referencedFileId) {
			// restrict the lookup to the task processing folder of the core app data
			$searchPath = '/' . $this->rootFolder->getAppDataDirectoryName() . '/core/TaskProcessing/';
			$node = $this->rootFolder->getFirstNodeByIdInPath($referencedFileId, $searchPath);
			if (!($node instanceof File)) {
				continue;
			}

			try {
				// read id and name before deleting, they are needed for the report
				$deletedFileId = $node->getId();
				$deletedFileName = $node->getName();
				$node->delete();
				yield ['task_id' => $task->getId(), 'file_id' => $deletedFileId, 'file_name' => $deletedFileName];
			} catch (NotPermittedException $e) {
				$this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
			}
		}
	}
}
/**
* Make a request to the task's webhookUri if necessary
*

@ -6,106 +6,25 @@
*/
namespace OC\TaskProcessing;
use OC\TaskProcessing\Db\TaskMapper;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\TimedJob;
use OCP\DB\Exception;
use OCP\Files\AppData\IAppDataFactory;
use OCP\Files\File;
use OCP\Files\InvalidPathException;
use OCP\Files\IRootFolder;
use OCP\Files\NotFoundException;
use OCP\Files\NotPermittedException;
use OCP\Files\SimpleFS\ISimpleFolder;
use OCP\TaskProcessing\IManager;
use Psr\Log\LoggerInterface;
class RemoveOldTasksBackgroundJob extends TimedJob {
public const MAX_TASK_AGE_SECONDS = 60 * 60 * 24 * 30 * 4; // 4 months
private \OCP\Files\IAppData $appData;
public function __construct(
ITimeFactory $timeFactory,
private TaskMapper $taskMapper,
private IManager $taskProcessingManager,
private IRootFolder $rootFolder,
private LoggerInterface $logger,
IAppDataFactory $appDataFactory,
private Manager $taskProcessingManager,
) {
parent::__construct($timeFactory);
$this->setInterval(60 * 60 * 24);
// can be deferred to maintenance window
$this->setTimeSensitivity(self::TIME_INSENSITIVE);
$this->appData = $appDataFactory->get('core');
}
/**
* @inheritDoc
*/
protected function run($argument): void {
try {
$this->cleanupTaskProcessingTaskFiles();
} catch (\Exception $e) {
$this->logger->warning('Failed to delete stale task processing tasks files', ['exception' => $e]);
}
try {
$this->taskMapper->deleteOlderThan(self::MAX_TASK_AGE_SECONDS);
} catch (\OCP\DB\Exception $e) {
$this->logger->warning('Failed to delete stale task processing tasks', ['exception' => $e]);
}
try {
$this->clearFilesOlderThan($this->appData->getFolder('text2image'), self::MAX_TASK_AGE_SECONDS);
} catch (NotFoundException $e) {
// noop
}
try {
$this->clearFilesOlderThan($this->appData->getFolder('audio2text'), self::MAX_TASK_AGE_SECONDS);
} catch (NotFoundException $e) {
// noop
}
}
/**
* @param ISimpleFolder $folder
* @param int $ageInSeconds
* @return void
*/
private function clearFilesOlderThan(ISimpleFolder $folder, int $ageInSeconds): void {
foreach ($folder->getDirectoryListing() as $file) {
if ($file->getMTime() < time() - $ageInSeconds) {
try {
$file->delete();
} catch (NotPermittedException $e) {
$this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
}
}
}
}
/**
* @return void
* @throws InvalidPathException
* @throws NotFoundException
* @throws \JsonException
* @throws Exception
* @throws \OCP\TaskProcessing\Exception\NotFoundException
*/
private function cleanupTaskProcessingTaskFiles(): void {
foreach ($this->taskMapper->getTasksToCleanup(self::MAX_TASK_AGE_SECONDS) as $task) {
$ocpTask = $task->toPublicTask();
$fileIds = $this->taskProcessingManager->extractFileIdsFromTask($ocpTask);
foreach ($fileIds as $fileId) {
// only look for output files stored in appData/TaskProcessing/
$file = $this->rootFolder->getFirstNodeByIdInPath($fileId, '/' . $this->rootFolder->getAppDataDirectoryName() . '/TaskProcessing/');
if ($file instanceof File) {
try {
$file->delete();
} catch (NotPermittedException $e) {
$this->logger->warning('Failed to delete a stale task processing file', ['exception' => $e]);
}
}
}
}
iterator_to_array($this->taskProcessingManager->cleanupOldTasks());
}
}

@ -972,11 +972,7 @@ class TaskProcessingTest extends \Test\TestCase {
// run background job
$bgJob = new RemoveOldTasksBackgroundJob(
$timeFactory,
$this->taskMapper,
$this->manager,
Server::get(IRootFolder::class),
Server::get(LoggerInterface::class),
Server::get(IAppDataFactory::class),
);
$bgJob->setArgument([]);
$bgJob->start($this->jobList);

Loading…
Cancel
Save