summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <[email protected]>2022-03-21 17:02:12 +0300
committerIgor Makunin <[email protected]>2022-03-21 17:02:12 +0300
commitad30bd0d2db46b1527ccad96eed69c1da2dcd368 (patch)
treec05467c8fe9425de993068ed056e57adffe9c297
parent09c16353e13ff2477100e5a738cf94b0cc370d49 (diff)
KIKIMR-14541: validate keys at the execution unit
ref:aec4fc2c05ed7cf2dfb5b8ccabb67642bcbe3bd6
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h2
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp19
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp6
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp36
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.h2
-rw-r--r--ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp9
7 files changed, 69 insertions, 12 deletions
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 5df538f4385..71329019d41 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -571,15 +571,16 @@ TEngineBay::~TEngineBay() {
}
}
-void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<ui32>& columns, const TTableRange& range,
+void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range,
const TVector<NScheme::TTypeId>& keyTypes, ui64 itemsLimit, bool reverse)
{
TVector<TKeyDesc::TColumnOp> columnOps;
columnOps.reserve(columns.size());
- for (ui32 column : columns) {
+ for (auto& column : columns) {
TKeyDesc::TColumnOp op;
- op.Column = column;
+ op.Column = column.Id;
op.Operation = TKeyDesc::EColumnOperation::Read;
+ op.ExpectedType = column.PType;
columnOps.emplace_back(std::move(op));
}
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 90a298cfa81..12adf35243c 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -60,7 +60,7 @@ public:
return Engine->ValidateKeys(Info);
}
- void AddReadRange(const TTableId& tableId, const TVector<ui32>& columns, const TTableRange& range,
+ void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range,
const TVector<NScheme::TTypeId>& keyTypes, ui64 itemsLimit = 0, bool reverse = false);
void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeId>& keyTypes);
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 0226bd42e25..7a0ae81275c 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -225,11 +225,20 @@ bool TValidatedDataTx::ReValidateKeys()
{
using EResult = NMiniKQL::IEngineFlat::EResult;
- EResult result = EngineBay.ReValidateKeys();
- if (result != EResult::Ok) {
- ErrStr = EngineBay.GetEngine()->GetErrors();
- ErrCode = ConvertErrCode(result);
- return false;
+ if (IsKqpTx()) {
+ auto [result, error] = EngineBay.GetKqpComputeCtx().ValidateKeys(EngineBay.TxInfo());
+ if (result != EResult::Ok) {
+ ErrStr = std::move(error);
+ ErrCode = ConvertErrCode(result);
+ return false;
+ }
+ } else {
+ EResult result = EngineBay.ReValidateKeys();
+ if (result != EResult::Ok) {
+ ErrStr = EngineBay.GetEngine()->GetErrors();
+ ErrCode = ConvertErrCode(result);
+ return false;
+ }
}
return true;
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 5a374b8ac1d..1a5542cee2e 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -281,7 +281,7 @@ namespace {
template <bool Read>
void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const TUserTable* tableInfo,
- const NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange& rangeKind, const TVector<ui32>& columns,
+ const NKikimrTxDataShard::TKqpTransaction_TDataTaskMeta_TKeyRange& rangeKind, const TVector<NTable::TColumn>& columns,
ui64 itemsLimit, bool reverse, const NScheme::TTypeRegistry& typeRegistry, const TActorContext& ctx,
TEngineBay& engineBay)
{
@@ -368,10 +368,10 @@ void KqpSetTxKeys(ui64 tabletId, ui64 taskId, const TUserTable* tableInfo,
tableMeta.GetSchemaVersion());
for (auto& read : meta.GetReads()) {
- TVector<ui32> columns;
+ TVector<NTable::TColumn> columns;
columns.reserve(read.GetColumns().size());
for (auto& c : read.GetColumns()) {
- columns.push_back(c.GetId());
+ columns.emplace_back(NTable::TColumn(c.GetName(), c.GetId(), c.GetType()));
}
KqpSetTxKeysImpl<true>(tabletId, taskId, tableId, tableInfo, read.GetRange(), columns, read.GetItemsLimit(),
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index 1cefe8f2399..fd56a57609c 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -285,6 +285,42 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
return ret;
}
+std::pair<IEngineFlat::EResult, TString> TKqpDatashardComputeContext::ValidateKeys(
+ const IEngineFlat::TValidationInfo& validationInfo)
+{
+ std::pair<ui64, ui64> maxSnapshotTime = {0,0}; // unused for now
+ for (auto& validKey : validationInfo.Keys) {
+ TKeyDesc * key = validKey.Key.get();
+
+ bool valid = EngineHost.IsValidKey(*key, maxSnapshotTime);
+
+ if (valid) {
+ auto curSchemaVersion = EngineHost.GetTableSchemaVersion(key->TableId);
+ if (key->TableId.SchemaVersion && curSchemaVersion && curSchemaVersion != key->TableId.SchemaVersion) {
+ auto error = TStringBuilder()
+ << "Schema version missmatch for table id: " << key->TableId
+ << " mkql compiled on: " << key->TableId.SchemaVersion
+ << " current version: " << curSchemaVersion;
+ return {IEngineFlat::EResult::SchemeChanged, std::move(error)};
+ }
+ } else {
+ switch (key->Status) {
+ case TKeyDesc::EStatus::SnapshotNotExist:
+ return {IEngineFlat::EResult::SnapshotNotExist, ""};
+ case TKeyDesc::EStatus::SnapshotNotReady:
+ key->Status = TKeyDesc::EStatus::Ok;
+ return {IEngineFlat::EResult::SnapshotNotReady, ""};
+ default:
+ auto error = TStringBuilder()
+ << "Validate (" << __LINE__ << "): Key validation status: " << (ui32)key->Status;
+ return {IEngineFlat::EResult::KeyError, std::move(error)};
+ }
+ }
+ }
+
+ return {IEngineFlat::EResult::Ok, ""};
+}
+
static void BuildRowImpl(const TDbTupleRef& dbTuple, const THolderFactory& holderFactory,
const TSmallVec<TTag>& systemColumnTags, ui64 shardId, NUdf::TUnboxedValue& result, size_t& rowSize)
{
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h
index 35346b67fa6..55a07817fc2 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.h
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.h
@@ -52,6 +52,8 @@ public:
TEngineHostCounters& GetTaskCounters(ui64 taskId) { return TaskCounters[taskId]; }
TEngineHostCounters& GetDatashardCounters() { return EngineHost.GetCounters(); }
+ std::pair<IEngineFlat::EResult, TString> ValidateKeys(const IEngineFlat::TValidationInfo& validationInfo);
+
bool IsTabletNotReady() const { return TabletNotReady; }
bool ReadRow(const TTableId& tableId, TArrayRef<const TCell> key, const TSmallVec<NTable::TTag>& columnTags,
diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
index c49ee35beec..afe08f99a7b 100644
--- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
+++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
@@ -100,6 +100,15 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
ui64 tabletId = DataShard.TabletID();
const TValidatedDataTx::TPtr& dataTx = tx->GetDataTx();
+ if (op->IsImmediate() && !dataTx->ReValidateKeys()) {
+ // Immediate transactions may be reordered with schema changes and become invalid
+ Y_VERIFY(!dataTx->Ready());
+ op->SetAbortedFlag();
+ BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR);
+ op->Result()->SetProcessError(dataTx->Code(), dataTx->GetErrors());
+ return EExecutionStatus::Executed;
+ }
+
if (dataTx->CheckCancelled()) {
tx->ReleaseTxData(txc, ctx);
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)