feat(taskprocessing): add support for webhooks (http or AppAPI) in the task processing API

Signed-off-by: Julien Veyssier <julien-nc@posteo.net>
pull/46579/head
Julien Veyssier 2 years ago
parent fadef0c7ba
commit fffc784769
No known key found for this signature in database
GPG Key ID: 4141FEE162030638
  1. 10
      core/Controller/TaskProcessingApiController.php
  2. 51
      core/Migrations/Version30000Date20240717111406.php
  3. 10
      core/openapi-full.json
  4. 10
      core/openapi.json
  5. 1
      lib/composer/composer/autoload_classmap.php
  6. 1
      lib/composer/composer/autoload_static.php
  7. 16
      lib/private/TaskProcessing/Db/Task.php
  8. 79
      lib/private/TaskProcessing/Manager.php
  9. 37
      lib/public/TaskProcessing/Task.php
  10. 4
      tests/lib/TaskProcessing/TaskProcessingTest.php
  11. 2
      version.php

@ -97,7 +97,8 @@ class TaskProcessingApiController extends \OCP\AppFramework\OCSController {
* @param string $type Type of the task
* @param string $appId ID of the app that will execute the task
* @param string $customId An arbitrary identifier for the task
*
* @param string|null $webhookUri URI to be requested when the task finishes
* @param string|null $webhookMethod Method used for the webhook request (HTTP:GET, HTTP:POST, HTTP:PUT, HTTP:DELETE or AppAPI:APP_ID:GET, AppAPI:APP_ID:POST...)
* @return DataResponse<Http::STATUS_OK, array{task: CoreTaskProcessingTask}, array{}>|DataResponse<Http::STATUS_INTERNAL_SERVER_ERROR|Http::STATUS_BAD_REQUEST|Http::STATUS_PRECONDITION_FAILED|Http::STATUS_UNAUTHORIZED, array{message: string}, array{}>
*
* 200: Task scheduled successfully
@ -109,8 +110,13 @@ class TaskProcessingApiController extends \OCP\AppFramework\OCSController {
#[UserRateLimit(limit: 20, period: 120)]
#[AnonRateLimit(limit: 5, period: 120)]
#[ApiRoute(verb: 'POST', url: '/schedule', root: '/taskprocessing')]
public function schedule(array $input, string $type, string $appId, string $customId = ''): DataResponse {
public function schedule(
array $input, string $type, string $appId, string $customId = '',
?string $webhookUri = null, ?string $webhookMethod = null
): DataResponse {
$task = new Task($type, $input, $appId, $this->userId, $customId);
$task->setWebhookUri($webhookUri);
$task->setWebhookMethod($webhookMethod);
try {
$this->taskProcessingManager->scheduleTask($task);

@ -0,0 +1,51 @@
<?php
declare(strict_types=1);
/**
* SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
namespace OC\Core\Migrations;
use Closure;
use OCP\DB\ISchemaWrapper;
use OCP\DB\Types;
use OCP\Migration\IOutput;
use OCP\Migration\SimpleMigrationStep;
/**
*
*/
class Version30000Date20240717111406 extends SimpleMigrationStep {
/**
* @param IOutput $output
* @param Closure $schemaClosure The `\Closure` returns a `ISchemaWrapper`
* @param array $options
* @return null|ISchemaWrapper
*/
public function changeSchema(IOutput $output, Closure $schemaClosure, array $options): ?ISchemaWrapper {
/** @var ISchemaWrapper $schema */
$schema = $schemaClosure();
if ($schema->hasTable('taskprocessing_tasks')) {
$table = $schema->getTable('taskprocessing_tasks');
$table->addColumn('webhook_uri', Types::STRING, [
'notnull' => false,
'default' => null,
'length' => 4000,
]);
$table->addColumn('webhook_method', Types::STRING, [
'notnull' => false,
'default' => null,
'length' => 64,
]);
return $schema;
}
return null;
}
}

@ -3842,6 +3842,16 @@
"type": "string",
"default": "",
"description": "An arbitrary identifier for the task"
},
"webhookUri": {
"type": "string",
"nullable": true,
"description": "URI to be requested when the task finishes"
},
"webhookMethod": {
"type": "string",
"nullable": true,
"description": "Method used for the webhook request (HTTP:GET, HTTP:POST, HTTP:PUT, HTTP:DELETE or AppAPI:APP_ID:GET, AppAPI:APP_ID:POST...)"
}
}
}

@ -3842,6 +3842,16 @@
"type": "string",
"default": "",
"description": "An arbitrary identifier for the task"
},
"webhookUri": {
"type": "string",
"nullable": true,
"description": "URI to be requested when the task finishes"
},
"webhookMethod": {
"type": "string",
"nullable": true,
"description": "Method used for the webhook request (HTTP:GET, HTTP:POST, HTTP:PUT, HTTP:DELETE or AppAPI:APP_ID:GET, AppAPI:APP_ID:POST...)"
}
}
}

@ -1326,6 +1326,7 @@ return array(
'OC\\Core\\Migrations\\Version29000Date20240124132202' => $baseDir . '/core/Migrations/Version29000Date20240124132202.php',
'OC\\Core\\Migrations\\Version29000Date20240131122720' => $baseDir . '/core/Migrations/Version29000Date20240131122720.php',
'OC\\Core\\Migrations\\Version30000Date20240429122720' => $baseDir . '/core/Migrations/Version30000Date20240429122720.php',
'OC\\Core\\Migrations\\Version30000Date20240717111406' => $baseDir . '/core/Migrations/Version30000Date20240717111406.php',
'OC\\Core\\Notification\\CoreNotifier' => $baseDir . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => $baseDir . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\LoginFlowV2Service' => $baseDir . '/core/Service/LoginFlowV2Service.php',

@ -1359,6 +1359,7 @@ class ComposerStaticInit749170dad3f5e7f9ca158f5a9f04f6a2
'OC\\Core\\Migrations\\Version29000Date20240124132202' => __DIR__ . '/../../..' . '/core/Migrations/Version29000Date20240124132202.php',
'OC\\Core\\Migrations\\Version29000Date20240131122720' => __DIR__ . '/../../..' . '/core/Migrations/Version29000Date20240131122720.php',
'OC\\Core\\Migrations\\Version30000Date20240429122720' => __DIR__ . '/../../..' . '/core/Migrations/Version30000Date20240429122720.php',
'OC\\Core\\Migrations\\Version30000Date20240717111406' => __DIR__ . '/../../..' . '/core/Migrations/Version30000Date20240717111406.php',
'OC\\Core\\Notification\\CoreNotifier' => __DIR__ . '/../../..' . '/core/Notification/CoreNotifier.php',
'OC\\Core\\ResponseDefinitions' => __DIR__ . '/../../..' . '/core/ResponseDefinitions.php',
'OC\\Core\\Service\\LoginFlowV2Service' => __DIR__ . '/../../..' . '/core/Service/LoginFlowV2Service.php',

@ -35,6 +35,10 @@ use OCP\TaskProcessing\Task as OCPTask;
* @method null|string getErrorMessage()
* @method setProgress(null|float $progress)
* @method null|float getProgress()
* @method setWebhookUri(string $webhookUri)
* @method string getWebhookUri()
* @method setWebhookMethod(string $webhookMethod)
* @method string getWebhookMethod()
*/
class Task extends Entity {
protected $lastUpdated;
@ -48,16 +52,18 @@ class Task extends Entity {
protected $completionExpectedAt;
protected $errorMessage;
protected $progress;
protected $webhookUri;
protected $webhookMethod;
/**
* @var string[]
*/
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress'];
public static array $columns = ['id', 'last_updated', 'type', 'input', 'output', 'status', 'user_id', 'app_id', 'custom_id', 'completion_expected_at', 'error_message', 'progress', 'webhook_uri', 'webhook_method'];
/**
* @var string[]
*/
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress'];
public static array $fields = ['id', 'lastUpdated', 'type', 'input', 'output', 'status', 'userId', 'appId', 'customId', 'completionExpectedAt', 'errorMessage', 'progress', 'webhookUri', 'webhookMethod'];
public function __construct() {
@ -74,6 +80,8 @@ class Task extends Entity {
$this->addType('completionExpectedAt', 'datetime');
$this->addType('errorMessage', 'string');
$this->addType('progress', 'float');
$this->addType('webhookUri', 'string');
$this->addType('webhookMethod', 'string');
}
public function toRow(): array {
@ -97,6 +105,8 @@ class Task extends Entity {
'customId' => $task->getCustomId(),
'completionExpectedAt' => $task->getCompletionExpectedAt(),
'progress' => $task->getProgress(),
'webhookUri' => $task->getWebhookUri(),
'webhookMethod' => $task->getWebhookMethod(),
]);
return $taskEntity;
}
@ -114,6 +124,8 @@ class Task extends Entity {
$task->setCompletionExpectedAt($this->getCompletionExpectedAt());
$task->setErrorMessage($this->getErrorMessage());
$task->setProgress($this->getProgress());
$task->setWebhookUri($this->getWebhookUri());
$task->setWebhookMethod($this->getWebhookMethod());
return $task;
}
}

@ -9,9 +9,12 @@ declare(strict_types=1);
namespace OC\TaskProcessing;
use GuzzleHttp\Exception\ClientException;
use GuzzleHttp\Exception\ServerException;
use OC\AppFramework\Bootstrap\Coordinator;
use OC\Files\SimpleFS\SimpleFile;
use OC\TaskProcessing\Db\TaskMapper;
use OCP\App\IAppManager;
use OCP\AppFramework\Db\DoesNotExistException;
use OCP\AppFramework\Db\MultipleObjectsReturnedException;
use OCP\BackgroundJob\IJobList;
@ -27,6 +30,7 @@ use OCP\Files\IRootFolder;
use OCP\Files\Node;
use OCP\Files\NotPermittedException;
use OCP\Files\SimpleFS\ISimpleFile;
use OCP\Http\Client\IClientService;
use OCP\IConfig;
use OCP\IL10N;
use OCP\IServerContainer;
@ -53,6 +57,8 @@ use OCP\TaskProcessing\TaskTypes\TextToText;
use OCP\TaskProcessing\TaskTypes\TextToTextHeadline;
use OCP\TaskProcessing\TaskTypes\TextToTextSummary;
use OCP\TaskProcessing\TaskTypes\TextToTextTopics;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
class Manager implements IManager {
@ -83,6 +89,8 @@ class Manager implements IManager {
private \OCP\TextToImage\IManager $textToImageManager,
private \OCP\SpeechToText\ISpeechToTextManager $speechToTextManager,
private IUserMountCache $userMountCache,
private IClientService $clientService,
private IAppManager $appManager,
) {
$this->appData = $appDataFactory->get('core');
}
@ -651,6 +659,7 @@ class Manager implements IManager {
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
$this->runWebhook($task);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
}
@ -739,6 +748,7 @@ class Manager implements IManager {
$taskEntity = \OC\TaskProcessing\Db\Task::fromPublicTask($task);
try {
$this->taskMapper->update($taskEntity);
$this->runWebhook($task);
} catch (\OCP\DB\Exception $e) {
throw new \OCP\TaskProcessing\Exception\Exception('There was a problem finding the task', 0, $e);
}
@ -975,7 +985,7 @@ class Manager implements IManager {
/**
* @param mixed $fileId
* @param string $userId
* @param string|null $userId
* @return void
* @throws UnauthorizedException
*/
@ -989,4 +999,71 @@ class Manager implements IManager {
throw new UnauthorizedException('User ' . $userId . ' does not have access to file ' . $fileId);
}
}
/**
* Make a request to the task's webhookUri if necessary
*
* @param Task $task
*/
private function runWebhook(Task $task): void {
$uri = $task->getWebhookUri();
$method = $task->getWebhookMethod();
if (!$uri || !$method) {
return;
}
if (in_array($method, ['HTTP:GET', 'HTTP:POST', 'HTTP:PUT', 'HTTP:DELETE'], true)) {
$client = $this->clientService->newClient();
$httpMethod = preg_replace('/^HTTP:/', '', $method);
$options = [
'timeout' => 30,
'body' => json_encode([
'task' => $task->jsonSerialize(),
]),
'headers' => ['Content-Type' => 'application/json'],
];
try {
$client->request($httpMethod, $uri, $options);
} catch (ClientException | ServerException $e) {
$this->logger->warning('Task processing HTTP webhook failed for task ' . $task->getId() . '. Request failed', ['exception' => $e]);
} catch (\Exception | \Throwable $e) {
$this->logger->warning('Task processing HTTP webhook failed for task ' . $task->getId() . '. Unknown error', ['exception' => $e]);
}
} elseif (str_starts_with($method, 'AppAPI:') && str_starts_with($uri, '/')) {
$parsedMethod = explode(':', $method, 4);
if (count($parsedMethod) < 3) {
$this->logger->warning('Task processing AppAPI webhook failed for task ' . $task->getId() . '. Invalid method: ' . $method);
}
[, $exAppId, $httpMethod] = $parsedMethod;
if (!$this->appManager->isInstalled('app_api')) {
$this->logger->warning('Task processing AppAPI webhook failed for task ' . $task->getId() . '. AppAPI is disabled or not installed.');
return;
}
try {
$appApiFunctions = \OCP\Server::get(\OCA\AppAPI\PublicFunctions::class);
} catch (ContainerExceptionInterface|NotFoundExceptionInterface) {
$this->logger->warning('Task processing AppAPI webhook failed for task ' . $task->getId() . '. Could not get AppAPI public functions.');
return;
}
$exApp = $appApiFunctions->getExApp($exAppId);
if ($exApp === null) {
$this->logger->warning('Task processing AppAPI webhook failed for task ' . $task->getId() . '. ExApp ' . $exAppId . ' is missing.');
return;
} elseif (!$exApp['enabled']) {
$this->logger->warning('Task processing AppAPI webhook failed for task ' . $task->getId() . '. ExApp ' . $exAppId . ' is disabled.');
return;
}
$requestParams = [
'task' => $task->jsonSerialize(),
];
$requestOptions = [
'timeout' => 30,
];
$response = $appApiFunctions->exAppRequest($exAppId, $uri, $task->getUserId(), $httpMethod, $requestParams, $requestOptions);
if (is_array($response) && isset($response['error'])) {
$this->logger->warning('Task processing AppAPI webhook failed for task ' . $task->getId() . '. Error during request to ExApp(' . $exAppId . '): ', $response['error']);
}
}
}
}

@ -30,6 +30,9 @@ final class Task implements \JsonSerializable {
protected int $lastUpdated;
protected ?string $webhookUri = null;
protected ?string $webhookMethod = null;
/**
* @since 30.0.0
*/
@ -264,6 +267,40 @@ final class Task implements \JsonSerializable {
return $this->progress;
}
/**
* @return null|string
* @since 30.0.0
*/
final public function getWebhookUri(): ?string {
return $this->webhookUri;
}
/**
* @param string|null $webhookUri
* @return void
* @since 30.0.0
*/
final public function setWebhookUri(?string $webhookUri): void {
$this->webhookUri = $webhookUri;
}
/**
* @return null|string
* @since 30.0.0
*/
final public function getWebhookMethod(): ?string {
return $this->webhookMethod;
}
/**
* @param string|null $webhookMethod
* @return void
* @since 30.0.0
*/
final public function setWebhookMethod(?string $webhookMethod): void {
$this->webhookMethod = $webhookMethod;
}
/**
* @param int $status
* @return 'STATUS_CANCELLED'|'STATUS_FAILED'|'STATUS_SUCCESSFUL'|'STATUS_RUNNING'|'STATUS_SCHEDULED'|'STATUS_UNKNOWN'

@ -12,6 +12,7 @@ use OC\EventDispatcher\EventDispatcher;
use OC\TaskProcessing\Db\TaskMapper;
use OC\TaskProcessing\Manager;
use OC\TaskProcessing\RemoveOldTasksBackgroundJob;
use OCP\App\IAppManager;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\BackgroundJob\IJobList;
use OCP\EventDispatcher\IEventDispatcher;
@ -19,6 +20,7 @@ use OCP\Files\AppData\IAppDataFactory;
use OCP\Files\Config\ICachedMountInfo;
use OCP\Files\Config\IUserMountCache;
use OCP\Files\IRootFolder;
use OCP\Http\Client\IClientService;
use OCP\IConfig;
use OCP\IDBConnection;
use OCP\IServerContainer;
@ -387,6 +389,8 @@ class TaskProcessingTest extends \Test\TestCase {
$text2imageManager,
\OC::$server->get(ISpeechToTextManager::class),
$this->userMountCache,
\OC::$server->get(IClientService::class),
\OC::$server->get(IAppManager::class),
);
}

@ -9,7 +9,7 @@
// between betas, final and RCs. This is _not_ the public version number. Reset minor/patch level
// when updating major/minor version number.
$OC_Version = [30, 0, 0, 1];
$OC_Version = [30, 0, 0, 2];
// The human-readable string
$OC_VersionString = '30.0.0 dev';

Loading…
Cancel
Save