XLog a key rotation event rather than the result

Before this commit, we Xlogged the binary result of the _map file
content during key rotation. This led to issues:
1. Replicas would rewrite their own WAL keys with the primary's ones.
And WAL keys are different on replicas. The same would have happened
with SMGR keys since we're also planning to have them different across
replicas.
2. The crash recovery would rewrite the latest WAL key as it's being
created before redo.

This commit switches to rather Xlogging the event of rotation (to which
key should rotate) and lets redo/replicas perform the actual rotation.

Fixes PG-1468, PG-1541
pull/220/head
Andrew Pogrebnoy 5 months ago committed by Andrew Pogrebnoi
parent f3719a73b4
commit 0d86245ccd
  1. 52
      contrib/pg_tde/src/access/pg_tde_tdemap.c
  2. 55
      contrib/pg_tde/src/catalog/tde_principal_key.c
  3. 2
      contrib/pg_tde/src/include/access/pg_tde_tdemap.h
  4. 4
      contrib/pg_tde/src/include/catalog/tde_principal_key.h

@ -618,7 +618,7 @@ finalize_key_rotation(const char *path_old, const char *path_new)
* Rotate keys and generates the WAL record for it.
*/
void
pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_principal_key)
pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_principal_key, bool write_xlog)
{
TDESignedPrincipalKeyInfo new_signed_key_info;
off_t old_curr_pos,
@ -627,8 +627,6 @@ pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_p
new_fd;
char old_path[MAXPGPATH],
new_path[MAXPGPATH];
XLogPrincipalKeyRotate *xlrec;
off_t xlrec_size;
pg_tde_sign_principal_key_info(&new_signed_key_info, new_principal_key);
@ -668,30 +666,38 @@ pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_p
}
close(old_fd);
/* Build WAL record containing the new file */
xlrec_size = SizeoOfXLogPrincipalKeyRotate + new_curr_pos;
xlrec = (XLogPrincipalKeyRotate *) palloc(xlrec_size);
xlrec->databaseId = principal_key->keyInfo.databaseId;
xlrec->file_size = new_curr_pos;
if (pg_pread(new_fd, xlrec->buff, xlrec->file_size, 0) == -1)
ereport(ERROR,
errcode_for_file_access(),
errmsg("could not write WAL for key rotation: %m"));
close(new_fd);
/* Insert the XLog record */
XLogBeginInsert();
XLogRegisterData((char *) xlrec, xlrec_size);
XLogInsert(RM_TDERMGR_ID, XLOG_TDE_ROTATE_PRINCIPAL_KEY);
/*
* Do the final steps - replace the current _map with the file with new
* data
*/
finalize_key_rotation(old_path, new_path);
pfree(xlrec);
/*
* We do WAL writes past the event ("the write behind logging") rather
* than before ("the write ahead") because we need logging here only for
* replication purposes. The rotation results in data written and fsynced
* to disk. Which in most cases would happen way before it's written to
* the WAL disk file. As WAL will be flushed at the end of the
* transaction, on its commit, hence after this function returns (there is
* also a bg writer, but the commit is what is guaranteed). And it makes
* sense to replicate the event only after its effect has been
* successfully applied to the source.
*/
if (write_xlog)
{
XLogPrincipalKeyRotate xlrec;
/* Do the final steps */
finalize_key_rotation(old_path, new_path);
xlrec.databaseId = principal_key->keyInfo.databaseId;
xlrec.keyringId = principal_key->keyInfo.keyringId;
memcpy(xlrec.keyName, new_principal_key->keyInfo.name, sizeof(new_principal_key->keyInfo.name));
/* Insert the XLog record */
XLogBeginInsert();
XLogRegisterData((char *) &xlrec, sizeof(XLogPrincipalKeyRotate));
XLogInsert(RM_TDERMGR_ID, XLOG_TDE_ROTATE_PRINCIPAL_KEY);
}
}
/*

@ -349,7 +349,7 @@ set_principal_key_with_keyring(const char *key_name, const char *provider_name,
else
{
/* key rotation */
pg_tde_perform_rotate_key(curr_principal_key, new_principal_key);
pg_tde_perform_rotate_key(curr_principal_key, new_principal_key, true);
if (!TDEisInGlobalSpace(curr_principal_key->keyInfo.databaseId))
{
@ -370,12 +370,59 @@ set_principal_key_with_keyring(const char *key_name, const char *provider_name,
void
xl_tde_perform_rotate_key(XLogPrincipalKeyRotate *xlrec)
{
TDEPrincipalKey *curr_principal_key;
TDEPrincipalKey *new_principal_key;
GenericKeyring *new_keyring;
KeyInfo *keyInfo;
KeyringReturnCodes kr_ret;
LWLockAcquire(tde_lwlock_enc_keys(), LW_EXCLUSIVE);
pg_tde_write_map_keydata_file(xlrec->file_size, xlrec->buff);
clear_principal_key_cache(xlrec->databaseId);
curr_principal_key = GetPrincipalKeyNoDefault(xlrec->databaseId, LW_EXCLUSIVE);
/* Should not happen */
if (curr_principal_key == NULL)
{
ereport(ERROR, errmsg("failed to retrieve current principal key for database %u.", xlrec->databaseId));
}
new_keyring = GetKeyProviderByID(xlrec->keyringId, xlrec->databaseId);
keyInfo = KeyringGetKey(new_keyring, xlrec->keyName, &kr_ret);
if (kr_ret != KEYRING_CODE_SUCCESS && kr_ret != KEYRING_CODE_RESOURCE_NOT_AVAILABLE)
{
ereport(ERROR,
errmsg("failed to retrieve principal key from keyring provider: \"%s\"", new_keyring->provider_name),
errdetail("Error code: %d", kr_ret));
}
/* The new key should be on keyring by this time */
if (keyInfo == NULL)
{
ereport(ERROR, errmsg("failed to retrieve principal key from keyring for database %u.", xlrec->databaseId));
}
new_principal_key = palloc_object(TDEPrincipalKey);
new_principal_key->keyInfo.databaseId = xlrec->databaseId;
new_principal_key->keyInfo.keyringId = new_keyring->keyring_id;
memcpy(new_principal_key->keyInfo.name, keyInfo->name, TDE_KEY_NAME_LEN);
gettimeofday(&new_principal_key->keyInfo.creationTime, NULL);
new_principal_key->keyLength = keyInfo->data.len;
memcpy(new_principal_key->keyData, keyInfo->data.data, keyInfo->data.len);
pg_tde_perform_rotate_key(curr_principal_key, new_principal_key, false);
if (!TDEisInGlobalSpace(curr_principal_key->keyInfo.databaseId))
{
clear_principal_key_cache(curr_principal_key->keyInfo.databaseId);
push_principal_key_to_cache(new_principal_key);
}
LWLockRelease(tde_lwlock_enc_keys());
pfree(new_keyring);
pfree(new_principal_key);
}
/*
@ -997,7 +1044,7 @@ pg_tde_rotate_default_key_for_database(TDEPrincipalKey *oldKey, TDEPrincipalKey
newKey->keyInfo.databaseId = oldKey->keyInfo.databaseId;
/* key rotation */
pg_tde_perform_rotate_key(oldKey, newKey);
pg_tde_perform_rotate_key(oldKey, newKey, true);
if (!TDEisInGlobalSpace(newKey->keyInfo.databaseId))
{

@ -119,7 +119,7 @@ extern TDESignedPrincipalKeyInfo *pg_tde_get_principal_key_info(Oid dbOid);
extern bool pg_tde_verify_principal_key_info(TDESignedPrincipalKeyInfo *signed_key_info, const TDEPrincipalKey *principal_key);
extern void pg_tde_save_principal_key(const TDEPrincipalKey *principal_key, bool write_xlog);
extern void pg_tde_save_principal_key_redo(const TDESignedPrincipalKeyInfo *signed_key_info);
extern void pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_principal_key);
extern void pg_tde_perform_rotate_key(TDEPrincipalKey *principal_key, TDEPrincipalKey *new_principal_key, bool write_xlog);
extern void pg_tde_write_map_keydata_file(off_t size, char *file_data);
const char *tde_sprint_key(InternalKey *k);

@ -37,8 +37,8 @@ typedef struct TDEPrincipalKey
typedef struct XLogPrincipalKeyRotate
{
Oid databaseId;
off_t file_size;
char buff[FLEXIBLE_ARRAY_MEMBER];
Oid keyringId;
char keyName[PRINCIPAL_KEY_NAME_LEN];
} XLogPrincipalKeyRotate;
#define SizeoOfXLogPrincipalKeyRotate offsetof(XLogPrincipalKeyRotate, buff)

Loading…
Cancel
Save