diff options
author | Igor Makunin <[email protected]> | 2022-03-21 17:02:12 +0300 |
---|---|---|
committer | Igor Makunin <[email protected]> | 2022-03-21 17:02:12 +0300 |
commit | ad30bd0d2db46b1527ccad96eed69c1da2dcd368 (patch) | |
tree | c05467c8fe9425de993068ed056e57adffe9c297 | |
parent | 09c16353e13ff2477100e5a738cf94b0cc370d49 (diff) |
KIKIMR-14541: validate keys at the execution unit
ref:aec4fc2c05ed7cf2dfb5b8ccabb67642bcbe3bd6
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.cpp | 19 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 6 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 36 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp | 9 |
7 files changed, 69 insertions, 12 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 5df538f4385..71329019d41 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -571,15 +571,16 @@ TEngineBay::~TEngineBay() { } } -void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<ui32>& columns, const TTableRange& range, +void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeId>& keyTypes, ui64 itemsLimit, bool reverse) { TVector<TKeyDesc::TColumnOp> columnOps; columnOps.reserve(columns.size()); - for (ui32 column : columns) { + for (auto& column : columns) { TKeyDesc::TColumnOp op; - op.Column = column; + op.Column = column.Id; op.Operation = TKeyDesc::EColumnOperation::Read; + op.ExpectedType = column.PType; columnOps.emplace_back(std::move(op)); } diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 90a298cfa81..12adf35243c 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -60,7 +60,7 @@ public: return Engine->ValidateKeys(Info); } - void AddReadRange(const TTableId& tableId, const TVector<ui32>& columns, const TTableRange& range, + void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeId>& keyTypes, ui64 itemsLimit = 0, bool reverse = false); void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeId>& keyTypes); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 0226bd42e25..7a0ae81275c 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -225,11 +225,20 @@ bool TValidatedDataTx::ReValidateKeys() { using EResult = NMiniKQL::IEngineFlat::EResult; - EResult result = EngineBay.ReValidateKeys(); - if (result != EResult::Ok) { - ErrStr = EngineBay.GetEngine()->GetErrors(); - ErrCode = ConvertErrCode(result); - return false; + if (IsKqpTx()) { + auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo()); + if (result != EResult::Ok) { + ErrStr = std::move(error); + ErrCode = ConvertErrCode(result); + return false; + } + } else { + EResult result = EngineBay.ReValidateKeys(); + if (result != EResult::Ok) { + ErrStr = EngineBay.GetEngine()->GetErrors(); + ErrCode = ConvertErrCode(result); + return false; + } } return true; diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 5a374b8ac1d..1a5542cee2e 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -281,7 +281,7 @@ namespace { template <bool Read> void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const TUserTable* tableInfo, - const NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange& rangeKind, const TVector<ui32>& columns, + const NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange& rangeKind, const TVector<NTable::TColumn>& columns, ui64 itemsLimit, bool reverse, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx, TEngineBay& engineBay) { @@ -368,10 +368,10 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo, tableMeta.GetSchemaVersion()); for (auto& read : meta.GetReads()) { - TVector<ui32> columns; + TVector<NTable::TColumn> columns; columns.reserve(read.GetColumns().size()); for (auto& c : read.GetColumns()) { - columns.push_back(c.GetId()); + columns.emplace_back(NTable::TColumn(c.GetName(), c.GetId(), c.GetType())); } KqpSetTxKeysImpl<true>(tabletId, taskId, tableId, tableInfo, read.GetRange(), columns, read.GetItemsLimit(), diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index 1cefe8f2399..fd56a57609c 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -285,6 +285,42 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated return ret; } +std::pair<IEngineFlat::EResult, TString> TKqpDatashardComputeContext::ValidateKeys( + const IEngineFlat::TValidationInfo& validationInfo) +{ + std::pair<ui64, ui64> maxSnapshotTime = {0,0}; // unused for now + for (auto& validKey : validationInfo.Keys) { + TKeyDesc * key = validKey.Key.get(); + + bool valid = EngineHost.IsValidKey(*key, maxSnapshotTime); + + if (valid) { + auto curSchemaVersion = EngineHost.GetTableSchemaVersion(key->TableId); + if (key->TableId.SchemaVersion && curSchemaVersion && curSchemaVersion != key->TableId.SchemaVersion) { + auto error = TStringBuilder() + << "Schema version missmatch for table id: " << key->TableId + << " mkql compiled on: " << key->TableId.SchemaVersion + << " current version: " << curSchemaVersion; + return {IEngineFlat::EResult::SchemeChanged, std::move(error)}; + } + } else { + switch (key->Status) { + case TKeyDesc::EStatus::SnapshotNotExist: + return {IEngineFlat::EResult::SnapshotNotExist, ""}; + case TKeyDesc::EStatus::SnapshotNotReady: + key->Status = TKeyDesc::EStatus::Ok; + return {IEngineFlat::EResult::SnapshotNotReady, ""}; + default: + auto error = TStringBuilder() + << "Validate (" << __LINE__ << "): Key validation status: " << (ui32)key->Status; + return {IEngineFlat::EResult::KeyError, std::move(error)}; + } + } + } + + return {IEngineFlat::EResult::Ok, ""}; +} + static void BuildRowImpl(const TDbTupleRef& dbTuple, const THolderFactory& holderFactory, const TSmallVec<TTag>& systemColumnTags, ui64 shardId, NUdf::TUnboxedValue& result, size_t& rowSize) { diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index 35346b67fa6..55a07817fc2 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -52,6 +52,8 @@ public: TEngineHostCounters& GetTaskCounters(ui64 taskId) { return TaskCounters[taskId]; } TEngineHostCounters& GetDatashardCounters() { return EngineHost.GetCounters(); } + std::pair<IEngineFlat::EResult, TString> ValidateKeys(const IEngineFlat::TValidationInfo& validationInfo); + bool IsTabletNotReady() const { return TabletNotReady; } bool ReadRow(const TTableId& tableId, TArrayRef<const TCell> key, const TSmallVec<NTable::TTag>& columnTags, diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index c49ee35beec..afe08f99a7b 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -100,6 +100,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio ui64 tabletId = DataShard.TabletID(); const TValidatedDataTx::TPtr& dataTx = tx->GetDataTx(); + if (op->IsImmediate() && !dataTx->ReValidateKeys()) { + // Immediate transactions may be reordered with schema changes and become invalid + Y_VERIFY(!dataTx->Ready()); + op->SetAbortedFlag(); + BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR); + op->Result()->SetProcessError(dataTx->Code(), dataTx->GetErrors()); + return EExecutionStatus::Executed; + } + if (dataTx->CheckCancelled()) { tx->ReleaseTxData(txc, ctx); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED) |