diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-01-23 16:16:26 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-23 16:16:26 +0300 |
commit | 4c2eccd0761715d4362271a6e7976e0928677f6b (patch) | |
tree | 5732fe4149f6704990a55b549aaa5edf50163886 | |
parent | 7844a7d4619059cc2e3d726c99036370f66c27c3 (diff) | |
download | ydb-4c2eccd0761715d4362271a6e7976e0928677f6b.tar.gz |
Move ReValidateKeys to TKeyValidator (#1217)
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.cpp | 17 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/key_validator.cpp | 67 | ||||
-rw-r--r-- | ydb/core/tx/datashard/key_validator.h | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/write_unit.cpp | 16 |
6 files changed, 108 insertions, 25 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 21f65bb67f..deaed0d1e2 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -585,16 +585,7 @@ public: ui64 GetTableSchemaVersion(const TTableId& tableId) const override { if (TSysTables::IsSystemTable(tableId)) return 0; - const auto& userTables = Self->GetUserTables(); - auto it = userTables.find(tableId.PathId.LocalPathId); - if (it == userTables.end()) { - Y_FAIL_S("DatshardEngineHost (tablet id: " << Self->TabletID() - << " state: " << Self->GetState() - << ") unables to find given table with id: " << tableId); - return 0; - } else { - return it->second->GetTableSchemaVersion(); - } + return GetKeyValidator().GetTableSchemaVersion(tableId); } ui64 GetWriteTxId(const TTableId& tableId) const override { @@ -997,6 +988,13 @@ private: return static_cast<const TDataShardSysTables *>(Self->GetDataShardSysTables())->Get(tableId); } + TKeyValidator& GetKeyValidator() { + return EngineBay.GetKeyValidator(); + } + const TKeyValidator& GetKeyValidator() const { + return EngineBay.GetKeyValidator(); + } + TDataShard* Self; TEngineBay& EngineBay; NTable::TDatabase& DB; @@ -1023,6 +1021,7 @@ private: TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx, std::pair<ui64, ui64> stepTxId) : StepTxId(stepTxId) + , KeyValidator(*self, txc.DB) , LockTxId(0) , LockNodeId(0) { diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index 671240c8dd..1f60c13e77 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -192,18 +192,11 @@ void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NPro ui32 TValidatedWriteTx::ExtractKeys(bool allowErrors) { - using EResult = NMiniKQL::IEngineFlat::EResult; + SetTxKeys(RecordOperation().GetColumnIds()); + + bool isValid = ReValidateKeys(); + Y_ABORT_UNLESS(allowErrors || isValid, "Validation errors: %s", ErrStr.data()); - EResult result = EngineBay.Validate(); - if (allowErrors) { - if (result != EResult::Ok) { - ErrStr = EngineBay.GetEngine()->GetErrors(); - ErrCode = ConvertErrCode(result); - return 0; - } - } else { - Y_ABORT_UNLESS(result == EResult::Ok, "Engine errors: %s", EngineBay.GetEngine()->GetErrors().data()); - } return KeysCount(); } @@ -212,7 +205,7 @@ bool TValidatedWriteTx::ReValidateKeys() using EResult = NMiniKQL::IEngineFlat::EResult; - auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo()); + auto [result, error] = GetKeyValidator().ValidateKeys(); if (result != EResult::Ok) { ErrStr = std::move(error); ErrCode = ConvertErrCode(result); diff --git a/ydb/core/tx/datashard/datashard_write_operation.h b/ydb/core/tx/datashard/datashard_write_operation.h index 9cf371f5a8..e2db234723 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.h +++ b/ydb/core/tx/datashard/datashard_write_operation.h @@ -148,7 +148,7 @@ public: } bool ParseRecord(const TDataShard::TTableInfos& tableInfos); - void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds); + void SetTxKeys(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnIds); ui32 ExtractKeys(bool allowErrors); bool ReValidateKeys(); diff --git a/ydb/core/tx/datashard/key_validator.cpp b/ydb/core/tx/datashard/key_validator.cpp index fbbf16fd6a..24efb64e74 100644 --- a/ydb/core/tx/datashard/key_validator.cpp +++ b/ydb/core/tx/datashard/key_validator.cpp @@ -1,15 +1,24 @@ #include "key_validator.h" -#include "ydb/core/base/appdata_fwd.h" +#include "datashard_impl.h" +#include "range_ops.h" #include <ydb/library/actors/core/actor.h> #include <ydb/library/actors/core/log.h> #include <ydb/library/services/services.pb.h> -#include <ydb/core/tx/datashard/range_ops.h> + + using namespace NKikimr; using namespace NKikimr::NDataShard; +TKeyValidator::TKeyValidator(const TDataShard& self, const NTable::TDatabase& db) + : Self(self) + , Db(db) +{ + +} + void TKeyValidator::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse) { TVector<TKeyDesc::TColumnOp> columnOps; @@ -56,6 +65,60 @@ void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& ra Info.SetLoaded(); } +bool TKeyValidator::IsValidKey(TKeyDesc& key) const { + ui64 localTableId = Self.GetLocalTableId(key.TableId); + return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key); +} + +ui64 TKeyValidator::GetTableSchemaVersion(const TTableId& tableId) const { + if (TSysTables::IsSystemTable(tableId)) + return 0; + + const auto& userTables = Self.GetUserTables(); + auto it = userTables.find(tableId.PathId.LocalPathId); + if (it == userTables.end()) { + Y_FAIL_S("TKeyValidator (tablet id: " << Self.TabletID() << " state: " << Self.GetState() << ") unable to find given table with id: " << tableId); + return 0; + } else { + return it->second->GetTableSchemaVersion(); + } +} + +std::tuple<NMiniKQL::IEngineFlat::EResult, TString> TKeyValidator::ValidateKeys() const { + using EResult = NMiniKQL::IEngineFlat::EResult; + + for (const auto& validKey : Info.Keys) { + TKeyDesc* key = validKey.Key.get(); + + bool valid = IsValidKey(*key); + + if (valid) { + auto curSchemaVersion = GetTableSchemaVersion(key->TableId); + if (key->TableId.SchemaVersion && curSchemaVersion && curSchemaVersion != key->TableId.SchemaVersion) { + auto error = TStringBuilder() + << "Schema version mismatch for table id: " << key->TableId + << " key table version: " << key->TableId.SchemaVersion + << " current table version: " << curSchemaVersion; + return {EResult::SchemeChanged, std::move(error)}; + } + } else { + switch (key->Status) { + case TKeyDesc::EStatus::SnapshotNotExist: + return {EResult::SnapshotNotExist, ""}; + case TKeyDesc::EStatus::SnapshotNotReady: + key->Status = TKeyDesc::EStatus::Ok; + return {EResult::SnapshotNotReady, ""}; + default: + auto error = TStringBuilder() + << "Validate (" << __LINE__ << "): Key validation status: " << (ui32)key->Status; + return {EResult::KeyError, std::move(error)}; + } + } + } + + return {EResult::Ok, ""}; +} + NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() { return Info; } diff --git a/ydb/core/tx/datashard/key_validator.h b/ydb/core/tx/datashard/key_validator.h index bee502fd13..b732f1ce11 100644 --- a/ydb/core/tx/datashard/key_validator.h +++ b/ydb/core/tx/datashard/key_validator.h @@ -1,5 +1,6 @@ #pragma once +#include <ydb/core/tablet_flat/flat_database.h> #include <ydb/core/engine/mkql_engine_flat.h> #include <ydb/core/scheme_types/scheme_type_registry.h> @@ -7,8 +8,11 @@ namespace NKikimr::NDataShard { +class TDataShard; + class TKeyValidator { public: + TKeyValidator(const TDataShard& self, const NTable::TDatabase& db); struct TColumnWriteMeta { NTable::TColumn Column; @@ -17,10 +21,18 @@ public: void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false); void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp); + + bool IsValidKey(TKeyDesc& key) const; + std::tuple<NMiniKQL::IEngineFlat::EResult, TString> ValidateKeys() const; + + ui64 GetTableSchemaVersion(const TTableId& tableId) const; NMiniKQL::IEngineFlat::TValidationInfo& GetInfo(); const NMiniKQL::IEngineFlat::TValidationInfo& GetInfo() const; private: + const TDataShard& Self; + const NTable::TDatabase& Db; + NMiniKQL::IEngineFlat::TValidationInfo Info; }; diff --git a/ydb/core/tx/datashard/write_unit.cpp b/ydb/core/tx/datashard/write_unit.cpp index 4c70820b28..4434420753 100644 --- a/ydb/core/tx/datashard/write_unit.cpp +++ b/ydb/core/tx/datashard/write_unit.cpp @@ -123,6 +123,22 @@ public: TDataShardLocksDb locksDb(DataShard, txc); TSetupSysLocks guardLocks(op, DataShard, &locksDb); + const TValidatedWriteTx::TPtr& writeTx = writeOp->GetWriteTx(); + + if (op->IsImmediate() && !writeOp->ReValidateKeys()) { + // Immediate transactions may be reordered with schema changes and become invalid + Y_ABORT_UNLESS(!writeTx->Ready()); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, writeTx->GetErrStr()); + return EExecutionStatus::Executed; + } + + if (writeTx->CheckCancelled()) { + writeOp->ReleaseTxData(txc, ctx); + writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Tx was cancelled"); + DataShard.IncCounter(COUNTER_WRITE_CANCELLED); + return EExecutionStatus::Executed; + } + try { DoExecute(&DataShard, writeOp, txc, ctx); } catch (const TNeedGlobalTxId&) { |