aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2024-01-23 16:16:26 +0300
committerGitHub <noreply@github.com>2024-01-23 16:16:26 +0300
commit4c2eccd0761715d4362271a6e7976e0928677f6b (patch)
tree5732fe4149f6704990a55b549aaa5edf50163886
parent7844a7d4619059cc2e3d726c99036370f66c27c3 (diff)
downloadydb-4c2eccd0761715d4362271a6e7976e0928677f6b.tar.gz
Move ReValidateKeys to TKeyValidator (#1217)
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp19
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.cpp17
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.h2
-rw-r--r--ydb/core/tx/datashard/key_validator.cpp67
-rw-r--r--ydb/core/tx/datashard/key_validator.h12
-rw-r--r--ydb/core/tx/datashard/write_unit.cpp16
6 files changed, 108 insertions, 25 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 21f65bb67f..deaed0d1e2 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -585,16 +585,7 @@ public:
ui64 GetTableSchemaVersion(const TTableId& tableId) const override {
if (TSysTables::IsSystemTable(tableId))
return 0;
- const auto& userTables = Self->GetUserTables();
- auto it = userTables.find(tableId.PathId.LocalPathId);
- if (it == userTables.end()) {
- Y_FAIL_S("DatshardEngineHost (tablet id: " << Self->TabletID()
- << " state: " << Self->GetState()
- << ") unables to find given table with id: " << tableId);
- return 0;
- } else {
- return it->second->GetTableSchemaVersion();
- }
+ return GetKeyValidator().GetTableSchemaVersion(tableId);
}
ui64 GetWriteTxId(const TTableId& tableId) const override {
@@ -997,6 +988,13 @@ private:
return static_cast<const TDataShardSysTables *>(Self->GetDataShardSysTables())->Get(tableId);
}
+ TKeyValidator& GetKeyValidator() {
+ return EngineBay.GetKeyValidator();
+ }
+ const TKeyValidator& GetKeyValidator() const {
+ return EngineBay.GetKeyValidator();
+ }
+
TDataShard* Self;
TEngineBay& EngineBay;
NTable::TDatabase& DB;
@@ -1023,6 +1021,7 @@ private:
TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx,
std::pair<ui64, ui64> stepTxId)
: StepTxId(stepTxId)
+ , KeyValidator(*self, txc.DB)
, LockTxId(0)
, LockNodeId(0)
{
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index 671240c8dd..1f60c13e77 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -192,18 +192,11 @@ void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NPro
ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors)
{
- using EResult = NMiniKQL::IEngineFlat::EResult;
+ SetTxKeys(RecordOperation().GetColumnIds());
+
+ bool isValid = ReValidateKeys();
+ Y_ABORT_UNLESS(allowErrors || isValid, "Validation errors: %s", ErrStr.data());
- EResult result = EngineBay.Validate();
- if (allowErrors) {
- if (result != EResult::Ok) {
- ErrStr = EngineBay.GetEngine()->GetErrors();
- ErrCode = ConvertErrCode(result);
- return 0;
- }
- } else {
- Y_ABORT_UNLESS(result == EResult::Ok, "Engine errors: %s", EngineBay.GetEngine()->GetErrors().data());
- }
return KeysCount();
}
@@ -212,7 +205,7 @@ bool TValidatedWriteTx::ReValidateKeys()
using EResult = NMiniKQL::IEngineFlat::EResult;
- auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo());
+ auto [result, error] = GetKeyValidator().ValidateKeys();
if (result != EResult::Ok) {
ErrStr = std::move(error);
ErrCode = ConvertErrCode(result);
diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h
index 9cf371f5a8..e2db234723 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.h
+++ b/ydb/core/tx/datashard/datashard_write_operation.h
@@ -148,7 +148,7 @@ public:
}
bool ParseRecord(const TDataShard::TTableInfos& tableInfos);
- void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);
+ void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds);
ui32 ExtractKeys(bool allowErrors);
bool ReValidateKeys();
diff --git a/ydb/core/tx/datashard/key_validator.cpp b/ydb/core/tx/datashard/key_validator.cpp
index fbbf16fd6a..24efb64e74 100644
--- a/ydb/core/tx/datashard/key_validator.cpp
+++ b/ydb/core/tx/datashard/key_validator.cpp
@@ -1,15 +1,24 @@
#include "key_validator.h"
-#include "ydb/core/base/appdata_fwd.h"
+#include "datashard_impl.h"
+#include "range_ops.h"
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/services/services.pb.h>
-#include <ydb/core/tx/datashard/range_ops.h>
+
+
using namespace NKikimr;
using namespace NKikimr::NDataShard;
+TKeyValidator::TKeyValidator(const TDataShard& self, const NTable::TDatabase& db)
+ : Self(self)
+ , Db(db)
+{
+
+}
+
void TKeyValidator::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse)
{
TVector<TKeyDesc::TColumnOp> columnOps;
@@ -56,6 +65,60 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra
Info.SetLoaded();
}
+bool TKeyValidator::IsValidKey(TKeyDesc& key) const {
+ ui64 localTableId = Self.GetLocalTableId(key.TableId);
+ return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key);
+}
+
+ui64 TKeyValidator::GetTableSchemaVersion(const TTableId& tableId) const {
+ if (TSysTables::IsSystemTable(tableId))
+ return 0;
+
+ const auto& userTables = Self.GetUserTables();
+ auto it = userTables.find(tableId.PathId.LocalPathId);
+ if (it == userTables.end()) {
+ Y_FAIL_S("TKeyValidator (tablet id: " << Self.TabletID() << " state: " << Self.GetState() << ") unable to find given table with id: " << tableId);
+ return 0;
+ } else {
+ return it->second->GetTableSchemaVersion();
+ }
+}
+
+std::tuple<NMiniKQL::IEngineFlat::EResult, TString> TKeyValidator::ValidateKeys() const {
+ using EResult = NMiniKQL::IEngineFlat::EResult;
+
+ for (const auto& validKey : Info.Keys) {
+ TKeyDesc* key = validKey.Key.get();
+
+ bool valid = IsValidKey(*key);
+
+ if (valid) {
+ auto curSchemaVersion = GetTableSchemaVersion(key->TableId);
+ if (key->TableId.SchemaVersion && curSchemaVersion && curSchemaVersion != key->TableId.SchemaVersion) {
+ auto error = TStringBuilder()
+ << "Schema version mismatch for table id: " << key->TableId
+ << " key table version: " << key->TableId.SchemaVersion
+ << " current table version: " << curSchemaVersion;
+ return {EResult::SchemeChanged, std::move(error)};
+ }
+ } else {
+ switch (key->Status) {
+ case TKeyDesc::EStatus::SnapshotNotExist:
+ return {EResult::SnapshotNotExist, ""};
+ case TKeyDesc::EStatus::SnapshotNotReady:
+ key->Status = TKeyDesc::EStatus::Ok;
+ return {EResult::SnapshotNotReady, ""};
+ default:
+ auto error = TStringBuilder()
+ << "Validate (" << __LINE__ << "): Key validation status: " << (ui32)key->Status;
+ return {EResult::KeyError, std::move(error)};
+ }
+ }
+ }
+
+ return {EResult::Ok, ""};
+}
+
NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() {
return Info;
}
diff --git a/ydb/core/tx/datashard/key_validator.h b/ydb/core/tx/datashard/key_validator.h
index bee502fd13..b732f1ce11 100644
--- a/ydb/core/tx/datashard/key_validator.h
+++ b/ydb/core/tx/datashard/key_validator.h
@@ -1,5 +1,6 @@
#pragma once
+#include <ydb/core/tablet_flat/flat_database.h>
#include <ydb/core/engine/mkql_engine_flat.h>
#include <ydb/core/scheme_types/scheme_type_registry.h>
@@ -7,8 +8,11 @@
namespace NKikimr::NDataShard {
+class TDataShard;
+
class TKeyValidator {
public:
+ TKeyValidator(const TDataShard& self, const NTable::TDatabase& db);
struct TColumnWriteMeta {
NTable::TColumn Column;
@@ -17,10 +21,18 @@ public:
void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false);
void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp);
+
+ bool IsValidKey(TKeyDesc& key) const;
+ std::tuple<NMiniKQL::IEngineFlat::EResult, TString> ValidateKeys() const;
+
+ ui64 GetTableSchemaVersion(const TTableId& tableId) const;
NMiniKQL::IEngineFlat::TValidationInfo& GetInfo();
const NMiniKQL::IEngineFlat::TValidationInfo& GetInfo() const;
private:
+ const TDataShard& Self;
+ const NTable::TDatabase& Db;
+
NMiniKQL::IEngineFlat::TValidationInfo Info;
};
diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp
index 4c70820b28..4434420753 100644
--- a/ydb/core/tx/datashard/write_unit.cpp
+++ b/ydb/core/tx/datashard/write_unit.cpp
@@ -123,6 +123,22 @@ public:
TDataShardLocksDb locksDb(DataShard, txc);
TSetupSysLocks guardLocks(op, DataShard, &locksDb);
+ const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx();
+
+ if (op->IsImmediate() && !writeOp->ReValidateKeys()) {
+ // Immediate transactions may be reordered with schema changes and become invalid
+ Y_ABORT_UNLESS(!writeTx->Ready());
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, writeTx->GetErrStr());
+ return EExecutionStatus::Executed;
+ }
+
+ if (writeTx->CheckCancelled()) {
+ writeOp->ReleaseTxData(txc, ctx);
+ writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Tx was cancelled");
+ DataShard.IncCounter(COUNTER_WRITE_CANCELLED);
+ return EExecutionStatus::Executed;
+ }
+
try {
DoExecute(&DataShard, writeOp, txc, ctx);
} catch (const TNeedGlobalTxId&) {