|
|
|
|
@ -55,7 +55,9 @@ use OCP\Diagnostics\IEventLogger; |
|
|
|
|
use OCP\IRequestId; |
|
|
|
|
use OCP\PreConditionNotMetException; |
|
|
|
|
use OCP\Profiler\IProfiler; |
|
|
|
|
use Psr\Clock\ClockInterface; |
|
|
|
|
use Psr\Log\LoggerInterface; |
|
|
|
|
use function in_array; |
|
|
|
|
|
|
|
|
|
class Connection extends PrimaryReadReplicaConnection { |
|
|
|
|
/** @var string */ |
|
|
|
|
@ -67,6 +69,8 @@ class Connection extends PrimaryReadReplicaConnection { |
|
|
|
|
/** @var SystemConfig */ |
|
|
|
|
private $systemConfig; |
|
|
|
|
|
|
|
|
|
private ClockInterface $clock; |
|
|
|
|
|
|
|
|
|
private LoggerInterface $logger; |
|
|
|
|
|
|
|
|
|
protected $lockedTable = null; |
|
|
|
|
@ -83,6 +87,7 @@ class Connection extends PrimaryReadReplicaConnection { |
|
|
|
|
|
|
|
|
|
protected ?float $transactionActiveSince = null; |
|
|
|
|
|
|
|
|
|
/** @var array<string, int> */ |
|
|
|
|
protected $tableDirtyWrites = []; |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
@ -110,6 +115,7 @@ class Connection extends PrimaryReadReplicaConnection { |
|
|
|
|
$this->tablePrefix = $params['tablePrefix']; |
|
|
|
|
|
|
|
|
|
$this->systemConfig = \OC::$server->getSystemConfig(); |
|
|
|
|
$this->clock = \OCP\Server::get(ClockInterface::class); |
|
|
|
|
$this->logger = \OC::$server->get(LoggerInterface::class); |
|
|
|
|
|
|
|
|
|
/** @var \OCP\Profiler\IProfiler */ |
|
|
|
|
@ -265,10 +271,19 @@ class Connection extends PrimaryReadReplicaConnection { |
|
|
|
|
*/ |
|
|
|
|
public function executeQuery(string $sql, array $params = [], $types = [], QueryCacheProfile $qcp = null): Result { |
|
|
|
|
$tables = $this->getQueriedTables($sql); |
|
|
|
|
$now = $this->clock->now()->getTimestamp(); |
|
|
|
|
$dirtyTableWrites = []; |
|
|
|
|
foreach ($tables as $table) { |
|
|
|
|
$lastAccess = $this->tableDirtyWrites[$table] ?? 0; |
|
|
|
|
// Only very recent writes are considered dirty |
|
|
|
|
if ($lastAccess >= ($now - 3)) { |
|
|
|
|
$dirtyTableWrites[] = $table; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
if ($this->isTransactionActive()) { |
|
|
|
|
// Transacted queries go to the primary. The consistency of the primary guarantees that we can not run |
|
|
|
|
// into a dirty read. |
|
|
|
|
} elseif (count(array_intersect($this->tableDirtyWrites, $tables)) === 0) { |
|
|
|
|
} elseif (count($dirtyTableWrites) === 0) { |
|
|
|
|
// No tables read that could have been written already in the same request and no transaction active |
|
|
|
|
// so we can switch back to the replica for reading as long as no writes happen that switch back to the primary |
|
|
|
|
// We cannot log here as this would log too early in the server boot process |
|
|
|
|
@ -280,7 +295,7 @@ class Connection extends PrimaryReadReplicaConnection { |
|
|
|
|
(int) ($this->systemConfig->getValue('loglevel_dirty_database_queries', null) ?? 0), |
|
|
|
|
'dirty table reads: ' . $sql, |
|
|
|
|
[ |
|
|
|
|
'tables' => $this->tableDirtyWrites, |
|
|
|
|
'tables' => array_keys($this->tableDirtyWrites), |
|
|
|
|
'reads' => $tables, |
|
|
|
|
'exception' => new \Exception('dirty table reads: ' . $sql), |
|
|
|
|
], |
|
|
|
|
@ -335,7 +350,9 @@ class Connection extends PrimaryReadReplicaConnection { |
|
|
|
|
*/ |
|
|
|
|
public function executeStatement($sql, array $params = [], array $types = []): int { |
|
|
|
|
$tables = $this->getQueriedTables($sql); |
|
|
|
|
$this->tableDirtyWrites = array_unique(array_merge($this->tableDirtyWrites, $tables)); |
|
|
|
|
foreach ($tables as $table) { |
|
|
|
|
$this->tableDirtyWrites[$table] = $this->clock->now()->getTimestamp(); |
|
|
|
|
} |
|
|
|
|
$sql = $this->replaceTablePrefix($sql); |
|
|
|
|
$sql = $this->adapter->fixupStatement($sql); |
|
|
|
|
$this->queriesExecuted++; |
|
|
|
|
|