aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorazevaykin <145343289+azevaykin@users.noreply.github.com>2024-01-22 11:46:49 +0300
committerGitHub <noreply@github.com>2024-01-22 11:46:49 +0300
commit013f39aacfac22cc1982157bdb26fe1ce746fc95 (patch)
tree04832fe6eea378555d88c7fb1225194038b63ae2
parent6fe76108802fcf3f811a5aa0b941d424b298a1f1 (diff)
downloadydb-013f39aacfac22cc1982157bdb26fe1ce746fc95.tar.gz
Move first key validation logic out of engines (#1161)
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.cpp153
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.h5
-rw-r--r--ydb/core/engine/mkql_engine_flat.cpp5
-rw-r--r--ydb/core/engine/mkql_engine_flat.h4
-rw-r--r--ydb/core/engine/mkql_engine_flat_host.h2
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp64
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h28
-rw-r--r--ydb/core/tx/datashard/datashard__read_iterator.cpp4
-rw-r--r--ydb/core/tx/datashard/datashard_active_transaction.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp.cpp28
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp3
-rw-r--r--ydb/core/tx/datashard/datashard_write_operation.cpp8
-rw-r--r--ydb/core/tx/datashard/key_validator.cpp65
-rw-r--r--ydb/core/tx/datashard/key_validator.h27
-rw-r--r--ydb/core/tx/datashard/ya.make1
15 files changed, 224 insertions, 175 deletions
diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp
index b00a3f8b42..bbddd3310c 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.cpp
+++ b/ydb/core/engine/minikql/minikql_engine_host.cpp
@@ -36,6 +36,20 @@ void ConvertTableKeys(const TScheme& scheme, const TScheme::TTableInfo* tableInf
*keyDataBytes = bytes;
}
+void ConvertTableValues(const TScheme& scheme, const TScheme::TTableInfo* tableInfo, const TArrayRef<const IEngineFlatHost::TUpdateCommand>& commands, TSmallVec<NTable::TUpdateOp>& ops, ui64* valueBytes) {
+ ui64 bytes = 0;
+ ops.reserve(commands.size());
+ for (size_t i = 0; i < commands.size(); i++) {
+ const IEngineFlatHost::TUpdateCommand& upd = commands[i];
+ Y_ABORT_UNLESS(upd.Operation == TKeyDesc::EColumnOperation::Set);
+ auto vtypeinfo = scheme.GetColumnInfo(tableInfo, upd.Column)->PType;
+ ops.emplace_back(upd.Column, NTable::ECellOp::Set, upd.Value.IsNull() ? TRawTypeValue() : TRawTypeValue(upd.Value.Data(), upd.Value.Size(), vtypeinfo));
+ bytes += upd.Value.IsNull() ? 1 : upd.Value.Size();
+ }
+ if (valueBytes)
+ *valueBytes = bytes;
+}
+
TEngineHost::TEngineHost(NTable::TDatabase& db, TEngineHostCounters& counters, const TEngineHostSettings& settings)
: Db(db)
, Scheme(db.GetScheme())
@@ -54,72 +68,10 @@ const TScheme::TTableInfo* TEngineHost::GetTableInfo(const TTableId& tableId) co
bool TEngineHost::IsReadonly() const {
return Settings.IsReadonly;
}
-
-
-bool TEngineHost::IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const {
- Y_UNUSED(maxSnapshotTime);
-
- auto* tableInfo = Scheme.GetTableInfo(LocalTableId(key.TableId));
-
-#define EH_VALIDATE(cond, err_status) \
- do { \
- if (!(cond)) { \
- key.Status = TKeyDesc::EStatus::err_status; \
- return false; \
- } \
- } while(false) \
- /**/
-
- EH_VALIDATE(tableInfo, NotExists); // Table does not exist
- EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed);
-
- // Specified keys types should be valid for any operation
- for (size_t keyIdx = 0; keyIdx < key.KeyColumnTypes.size(); keyIdx++) {
- ui32 keyCol = tableInfo->KeyColumns[keyIdx];
- auto vtype = Scheme.GetColumnInfo(tableInfo, keyCol)->PType;
- EH_VALIDATE(key.KeyColumnTypes[keyIdx] == vtype, TypeCheckFailed);
- }
-
- if (key.RowOperation == TKeyDesc::ERowOperation::Read) {
- if (key.Range.Point) {
- EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed);
- } else {
- EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed);
- }
-
- for (size_t i = 0; i < key.Columns.size(); i++) {
- const TKeyDesc::TColumnOp& cop = key.Columns[i];
- if (IsSystemColumn(cop.Column)) {
- continue;
- }
- auto* cinfo = Scheme.GetColumnInfo(tableInfo, cop.Column);
- EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column
- auto vtype = cinfo->PType;
- EH_VALIDATE(cop.ExpectedType == vtype, TypeCheckFailed);
- EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Read, OperationNotSupported);
- }
- } else if (key.RowOperation == TKeyDesc::ERowOperation::Update) {
- EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); // Key must be full for updates
- for (size_t i = 0; i < key.Columns.size(); i++) {
- const TKeyDesc::TColumnOp& cop = key.Columns[i];
- auto* cinfo = Scheme.GetColumnInfo(tableInfo, cop.Column);
- EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column
- auto vtype = cinfo->PType;
- EH_VALIDATE(cop.ExpectedType.GetTypeId() == 0 || cop.ExpectedType == vtype, TypeCheckFailed);
- EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Set, OperationNotSupported); // TODO[serxa]: support inplace operations in IsValidKey
- }
- } else if (key.RowOperation == TKeyDesc::ERowOperation::Erase) {
- EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed);
- } else {
- EH_VALIDATE(false, OperationNotSupported);
- }
-
-#undef EH_VALIDATE
-
- key.Status = TKeyDesc::EStatus::Ok;
- return true;
+bool TEngineHost::IsValidKey(TKeyDesc& key) const {
+ ui64 localTableId = LocalTableId(key.TableId);
+ return NMiniKQL::IsValidKey(Scheme, localTableId, key);
}
-
ui64 TEngineHost::CalculateReadSize(const TVector<const TKeyDesc*>& keys) const {
NTable::TSizeEnv env;
@@ -880,14 +832,7 @@ void TEngineHost::UpdateRow(const TTableId& tableId, const TArrayRef<const TCell
ui64 valueBytes = 0;
TSmallVec<NTable::TUpdateOp> ops;
- for (size_t i = 0; i < commands.size(); i++) {
- const TUpdateCommand& upd = commands[i];
- Y_ABORT_UNLESS(upd.Operation == TKeyDesc::EColumnOperation::Set); // TODO[serxa]: support inplace update in update row
- auto vtypeinfo = Scheme.GetColumnInfo(tableInfo, upd.Column)->PType;
- ops.emplace_back(upd.Column, NTable::ECellOp::Set,
- upd.Value.IsNull() ? TRawTypeValue() : TRawTypeValue(upd.Value.Data(), upd.Value.Size(), vtypeinfo));
- valueBytes += upd.Value.IsNull() ? 1 : upd.Value.Size();
- }
+ ConvertTableValues(Scheme, tableInfo, commands, ops, &valueBytes);
auto* collector = GetChangeCollector(tableId);
@@ -977,6 +922,68 @@ void TEngineHost::SetPeriodicCallback(TPeriodicCallback&& callback) {
PeriodicCallback = std::move(callback);
}
+bool IsValidKey(const TScheme& scheme, ui64 localTableId, TKeyDesc& key) {
+ auto* tableInfo = scheme.GetTableInfo(localTableId);
+ Y_ABORT_UNLESS(tableInfo);
+
+#define EH_VALIDATE(cond, err_status) \
+ do { \
+ if (!(cond)) { \
+ key.Status = TKeyDesc::EStatus::err_status; \
+ return false; \
+ } \
+ } while (false) /**/
+
+ EH_VALIDATE(tableInfo, NotExists); // Table does not exist
+ EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed);
+
+ // Specified keys types should be valid for any operation
+ for (size_t keyIdx = 0; keyIdx < key.KeyColumnTypes.size(); keyIdx++) {
+ ui32 keyCol = tableInfo->KeyColumns[keyIdx];
+ auto vtype = scheme.GetColumnInfo(tableInfo, keyCol)->PType;
+ EH_VALIDATE(key.KeyColumnTypes[keyIdx] == vtype, TypeCheckFailed);
+ }
+
+ if (key.RowOperation == TKeyDesc::ERowOperation::Read) {
+ if (key.Range.Point) {
+ EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed);
+ } else {
+ EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed);
+ }
+
+ for (size_t i = 0; i < key.Columns.size(); i++) {
+ const TKeyDesc::TColumnOp& cop = key.Columns[i];
+ if (IsSystemColumn(cop.Column)) {
+ continue;
+ }
+ auto* cinfo = scheme.GetColumnInfo(tableInfo, cop.Column);
+ EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column
+ auto vtype = cinfo->PType;
+ EH_VALIDATE(cop.ExpectedType == vtype, TypeCheckFailed);
+ EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Read, OperationNotSupported);
+ }
+ } else if (key.RowOperation == TKeyDesc::ERowOperation::Update) {
+ EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); // Key must be full for updates
+ for (size_t i = 0; i < key.Columns.size(); i++) {
+ const TKeyDesc::TColumnOp& cop = key.Columns[i];
+ auto* cinfo = scheme.GetColumnInfo(tableInfo, cop.Column);
+ EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column
+ auto vtype = cinfo->PType;
+ EH_VALIDATE(cop.ExpectedType.GetTypeId() == 0 || cop.ExpectedType == vtype, TypeCheckFailed);
+ EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Set, OperationNotSupported); // TODO[serxa]: support inplace operations in IsValidKey
+ }
+ } else if (key.RowOperation == TKeyDesc::ERowOperation::Erase) {
+ EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed);
+ } else {
+ EH_VALIDATE(false, OperationNotSupported);
+ }
+
+#undef EH_VALIDATE
+
+ key.Status = TKeyDesc::EStatus::Ok;
+ return true;
+}
+
void AnalyzeRowType(TStructLiteral* columnIds, TSmallVec<NTable::TTag>& tags, TSmallVec<NTable::TTag>& systemColumnTags) {
// Find out tags that should be read in Select*() functions
tags.reserve(columnIds->GetValuesCount());
diff --git a/ydb/core/engine/minikql/minikql_engine_host.h b/ydb/core/engine/minikql/minikql_engine_host.h
index ab769e703c..8aaf692153 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.h
+++ b/ydb/core/engine/minikql/minikql_engine_host.h
@@ -98,7 +98,7 @@ public:
ui64 GetShardId() const override;
const TScheme::TTableInfo* GetTableInfo(const TTableId& tableId) const override;
bool IsReadonly() const override;
- bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override;
+ bool IsValidKey(TKeyDesc& key) const override;
ui64 CalculateReadSize(const TVector<const TKeyDesc*>& keys) const override;
ui64 CalculateResultSize(const TKeyDesc& key) const override;
void PinPages(const TVector<THolder<TKeyDesc>>& keys, ui64 pageFaultCount) override;
@@ -193,6 +193,7 @@ public:
}
};
+bool IsValidKey(const TEngineHost::TScheme& scheme, ui64 localTableId, TKeyDesc& key);
void AnalyzeRowType(TStructLiteral* columnIds, TSmallVec<NTable::TTag>& tags, TSmallVec<NTable::TTag>& systemColumnTags);
NUdf::TUnboxedValue GetCellValue(const TCell& cell, NScheme::TTypeInfo type);
NUdf::TUnboxedValue CreateSelectRangeLazyRowsList(NTable::TDatabase& db, const NTable::TScheme& scheme,
@@ -203,4 +204,6 @@ NUdf::TUnboxedValue CreateSelectRangeLazyRowsList(NTable::TDatabase& db, const N
void ConvertTableKeys(const NTable::TScheme& scheme, const NTable::TScheme::TTableInfo* tableInfo,
const TArrayRef<const TCell>& row, TSmallVec<TRawTypeValue>& key, ui64* keyDataBytes);
+void ConvertTableValues(const NTable::TScheme& scheme, const NTable::TScheme::TTableInfo* tableInfo,
+ const TArrayRef<const IEngineFlatHost::TUpdateCommand>& commands, TSmallVec<NTable::TUpdateOp>& ops, ui64* valueBytes);
}}
diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp
index 6a2efed115..2f94b2f260 100644
--- a/ydb/core/engine/mkql_engine_flat.cpp
+++ b/ydb/core/engine/mkql_engine_flat.cpp
@@ -760,11 +760,10 @@ public:
EResult ValidateKeys(TValidationInfo& validationInfo) override {
EResult result = EResult::Ok;
- std::pair<ui64, ui64> maxSnapshotTime = {0,0}; // unused for now
for (auto& validKey : validationInfo.Keys) {
TKeyDesc * key = validKey.Key.get();
- bool valid = Settings.Host->IsValidKey(*key, maxSnapshotTime);
+ bool valid = Settings.Host->IsValidKey(*key);
if (valid) {
auto curSchemaVersion = Settings.Host->GetTableSchemaVersion(key->TableId);
@@ -1042,7 +1041,7 @@ public:
}
if (Y_LIKELY(result == EResult::Ok)) {
- validationInfo.Loaded = true;
+ validationInfo.SetLoaded();
}
IsProgramValidated = true;
diff --git a/ydb/core/engine/mkql_engine_flat.h b/ydb/core/engine/mkql_engine_flat.h
index d4ec11b032..7c2102feee 100644
--- a/ydb/core/engine/mkql_engine_flat.h
+++ b/ydb/core/engine/mkql_engine_flat.h
@@ -185,6 +185,10 @@ public:
HasInReadsets = false;
Loaded = false;
}
+
+ void SetLoaded() {
+ Loaded = true;
+ }
};
//-- error reporting
diff --git a/ydb/core/engine/mkql_engine_flat_host.h b/ydb/core/engine/mkql_engine_flat_host.h
index 0593fb3a9b..6a78fe6b3e 100644
--- a/ydb/core/engine/mkql_engine_flat_host.h
+++ b/ydb/core/engine/mkql_engine_flat_host.h
@@ -30,7 +30,7 @@ public:
virtual bool IsReadonly() const = 0;
// Validate key and fill status into it.
- virtual bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const = 0;
+ virtual bool IsValidKey(TKeyDesc& key) const = 0;
// Calculate the whole size of data that needs to be read into memory
virtual ui64 CalculateReadSize(const TVector<const TKeyDesc*>& keys) const = 0;
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 89d76611a1..21f65bb67f 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -406,10 +406,13 @@ public:
return VolatileCommitOrdered;
}
- bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override {
+ bool IsValidKey(TKeyDesc& key) const override {
if (TSysTables::IsSystemTable(key.TableId))
return DataShardSysTable(key.TableId).IsValidKey(key);
+ ui64 localTableId = Self->GetLocalTableId(key.TableId);
+ Y_ABORT_UNLESS(localTableId != 0, "Unexpected IsValidKey for an unknown table");
+
if (LockTxId) {
// Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs
if (key.RowOperation != TKeyDesc::ERowOperation::Read &&
@@ -426,7 +429,7 @@ public:
}
}
- return TEngineHost::IsValidKey(key, maxSnapshotTime);
+ return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key);
}
NUdf::TUnboxedValue SelectRow(const TTableId& tableId, const TArrayRef<const TCell>& row,
@@ -1088,64 +1091,15 @@ TEngineBay::~TEngineBay() {
}
}
-void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns,
- const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse)
-{
- TVector<TKeyDesc::TColumnOp> columnOps;
- columnOps.reserve(columns.size());
- for (auto& column : columns) {
- TKeyDesc::TColumnOp op;
- op.Column = column.Id;
- op.Operation = TKeyDesc::EColumnOperation::Read;
- op.ExpectedType = column.PType;
- columnOps.emplace_back(std::move(op));
- }
-
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
- "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId);
-
- auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Read, keyTypes, columnOps, itemsLimit,
- 0 /* bytesLimit */, reverse);
- Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ false));
- // Info.Keys.back().IsResultPart = not a lock key? // TODO: KIKIMR-11134
- ++Info.ReadsCount;
- Info.Loaded = true;
-}
-
-void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range,
- const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns,
- bool isPureEraseOp)
-{
- TVector<TKeyDesc::TColumnOp> columnOps;
- for (const auto& writeColumn : columns) {
- TKeyDesc::TColumnOp op;
- op.Column = writeColumn.Column.Id;
- op.Operation = TKeyDesc::EColumnOperation::Set;
- op.ExpectedType = writeColumn.Column.PType;
- op.ImmediateUpdateSize = writeColumn.MaxValueSizeBytes;
- columnOps.emplace_back(std::move(op));
- }
-
- LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
- "-- AddWriteRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId);
-
- auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update;
- auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps);
- Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ true));
- ++Info.WritesCount;
- if (!range.Point) {
- ++Info.DynKeysCount;
- }
- Info.Loaded = true;
-}
-
TEngineBay::TSizes TEngineBay::CalcSizes(bool needsTotalKeysSize) const {
Y_ABORT_UNLESS(EngineHost);
+ const auto& info = KeyValidator.GetInfo();
+
TSizes outSizes;
TVector<const TKeyDesc*> readKeys;
- readKeys.reserve(Info.ReadsCount);
- for (const TValidatedKey& validKey : Info.Keys) {
+ readKeys.reserve(info.ReadsCount);
+ for (const TValidatedKey& validKey : info.Keys) {
if (validKey.IsWrite)
continue;
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 660ed7ad5e..6ec942aaeb 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -2,6 +2,7 @@
#include "defs.h"
#include "change_collector.h"
+#include "key_validator.h"
#include <ydb/core/kqp/runtime/kqp_tasks_runner.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
@@ -40,11 +41,6 @@ public:
ui64 TotalKeysSize = 0;
};
- struct TColumnWriteMeta {
- NTable::TColumn Column;
- ui32 MaxValueSizeBytes = 0;
- };
-
TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx,
std::pair<ui64, ui64> stepTxId);
@@ -57,26 +53,20 @@ public:
void SetUseLlvmRuntime(bool llvmRuntime) { EngineSettings->LlvmRuntime = llvmRuntime; }
EResult Validate() {
- if (Info.Loaded)
+ if (KeyValidator.GetInfo().Loaded)
return EResult::Ok;
Y_ABORT_UNLESS(Engine);
- return Engine->Validate(Info);
+ return Engine->Validate(KeyValidator.GetInfo());
}
EResult ReValidateKeys() {
- Y_ABORT_UNLESS(Info.Loaded);
+ Y_ABORT_UNLESS(KeyValidator.GetInfo().Loaded);
Y_ABORT_UNLESS(Engine);
- return Engine->ValidateKeys(Info);
+ return Engine->ValidateKeys(KeyValidator.GetInfo());
}
- void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range,
- const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false);
-
- void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes,
- const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp);
-
void MarkTxLoaded() {
- Info.Loaded = true;
+ KeyValidator.GetInfo().SetLoaded();
}
/// @note it expects TValidationInfo keys are materialized outsize of engine's allocs
@@ -99,7 +89,9 @@ public:
ui64 GetStep() const { return StepTxId.first; }
ui64 GetTxId() const { return StepTxId.second; }
- const TValidationInfo& TxInfo() const { return Info; }
+ TKeyValidator& GetKeyValidator() { return KeyValidator; }
+ const TKeyValidator& GetKeyValidator() const { return KeyValidator; }
+ const TValidationInfo& TxInfo() const { return KeyValidator.GetInfo(); }
TEngineBay::TSizes CalcSizes(bool needsTotalKeysSize) const;
void SetWriteVersion(TRowVersion writeVersion);
@@ -129,7 +121,7 @@ private:
THolder<NMiniKQL::TEngineHost> EngineHost;
THolder<NMiniKQL::TEngineFlatSettings> EngineSettings;
THolder<NMiniKQL::IEngineFlat> Engine;
- TValidationInfo Info;
+ TKeyValidator KeyValidator;
TEngineHostCounters EngineHostCounters;
ui64 LockTxId;
ui32 LockNodeId;
diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp
index 80d3143ef8..da99bb4cdc 100644
--- a/ydb/core/tx/datashard/datashard__read_iterator.cpp
+++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp
@@ -1498,7 +1498,7 @@ public:
PrepareValidationInfo(ctx, state);
} else {
// There should be no keys when reading sysm tables
- ValidationInfo.Loaded = true;
+ ValidationInfo.SetLoaded();
}
}
@@ -1797,7 +1797,7 @@ private:
}
}
- ValidationInfo.Loaded = true;
+ ValidationInfo.SetLoaded();
}
void AcquireLock(TReadIteratorState& state, const TActorContext& ctx) {
diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp
index 26e3017052..dff9387950 100644
--- a/ydb/core/tx/datashard/datashard_active_transaction.cpp
+++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp
@@ -62,7 +62,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
auto* info = self->TableInfos[tx.GetTableId().GetTableId()].Get();
Y_ABORT_UNLESS(info, "Unexpected missing table info");
TSerializedTableRange range(tx.GetRange());
- EngineBay.AddReadRange(TTableId(tx.GetTableId().GetOwnerId(),
+ EngineBay.GetKeyValidator().AddReadRange(TTableId(tx.GetTableId().GetOwnerId(),
tx.GetTableId().GetTableId()),
{}, range.ToTableRange(), info->KeyColumnTypes);
} else {
diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp
index 9b3871ca82..b803537568 100644
--- a/ydb/core/tx/datashard/datashard_kqp.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp.cpp
@@ -344,11 +344,11 @@ TVector<NTable::TColumn> GetColumns(const TReadOpMeta& readMeta) {
return columns;
}
-TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const TWriteOpMeta& writeMeta) {
- TVector<TEngineBay::TColumnWriteMeta> writeColumns;
+TVector<TKeyValidator::TColumnWriteMeta> GetColumnWrites(const TWriteOpMeta& writeMeta) {
+ TVector<TKeyValidator::TColumnWriteMeta> writeColumns;
writeColumns.reserve(writeMeta.ColumnsSize());
for (const auto& columnMeta : writeMeta.GetColumns()) {
- TEngineBay::TColumnWriteMeta writeColumn;
+ TKeyValidator::TColumnWriteMeta writeColumn;
writeColumn.Column = GetColumn(columnMeta.GetColumn());
writeColumn.MaxValueSizeBytes = columnMeta.GetMaxValueSizeBytes();
@@ -387,10 +387,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
Y_DEBUG_ABORT_UNLESS(!(tableRange.To.GetCells().empty() && tableRange.ToInclusive));
if constexpr (Read) {
- engineBay.AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(),
+ engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(),
tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
- engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes,
+ engineBay.GetKeyValidator().AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes,
GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
}
@@ -405,11 +405,9 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
<< DebugPrintPoint(tableInfo->KeyColumnTypes, tablePoint.From.GetCells(), typeRegistry));
if constexpr (Read) {
- engineBay.AddReadRange(tableId, GetColumns(*readMeta), tablePoint.ToTableRange(),
- tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
+ engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
- engineBay.AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes,
- GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
+ engineBay.GetKeyValidator().AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
}
@@ -426,10 +424,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
<< DebugPrintRange(tableInfo->KeyColumnTypes, tableRange.ToTableRange(), typeRegistry));
if constexpr (Read) {
- engineBay.AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(),
+ engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(),
tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
- engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes,
+ engineBay.GetKeyValidator().AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes,
GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
@@ -442,10 +440,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const
<< ", task: " << taskId << ", " << (Read ? "read range: UNSPECIFIED" : "write range: UNSPECIFIED"));
if constexpr (Read) {
- engineBay.AddReadRange(tableId, GetColumns(*readMeta), tableInfo->Range.ToTableRange(),
+ engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableInfo->Range.ToTableRange(),
tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse());
} else {
- engineBay.AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes,
+ engineBay.GetKeyValidator().AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes,
GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp());
}
@@ -493,10 +491,10 @@ void KqpSetTxLocksKeys(const NKikimrDataEvents::TKqpLocks& locks, const TSysLock
if (sysLocks.IsMyKey(lockKey)) {
auto point = TTableRange(lockKey, true, {}, true, /* point */ true);
if (NeedValidateLocks(locks.GetOp())) {
- engineBay.AddReadRange(sysLocksTableId, {}, point, lockRowType);
+ engineBay.GetKeyValidator().AddReadRange(sysLocksTableId, {}, point, lockRowType);
}
if (NeedEraseLocks(locks.GetOp())) {
- engineBay.AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true);
+ engineBay.GetKeyValidator().AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true);
}
}
}
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index b3d3dc6f2a..c7da246b02 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -284,11 +284,10 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
std::pair<IEngineFlat::EResult, TString> TKqpDatashardComputeContext::ValidateKeys(
const IEngineFlat::TValidationInfo& validationInfo)
{
- std::pair<ui64, ui64> maxSnapshotTime = {0,0}; // unused for now
for (auto& validKey : validationInfo.Keys) {
TKeyDesc * key = validKey.Key.get();
- bool valid = EngineHost.IsValidKey(*key, maxSnapshotTime);
+ bool valid = EngineHost.IsValidKey(*key);
if (valid) {
auto curSchemaVersion = EngineHost.GetTableSchemaVersion(key->TableId);
diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp
index f602e544aa..fc459ae97e 100644
--- a/ydb/core/tx/datashard/datashard_write_operation.cpp
+++ b/ydb/core/tx/datashard/datashard_write_operation.cpp
@@ -163,11 +163,11 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) {
return true;
}
-TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags) {
- TVector<TEngineBay::TColumnWriteMeta> writeColumns;
+TVector<TKeyValidator::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags) {
+ TVector<TKeyValidator::TColumnWriteMeta> writeColumns;
writeColumns.reserve(columnTags.size());
for (ui32 columnTag : columnTags) {
- TEngineBay::TColumnWriteMeta writeColumn;
+ TKeyValidator::TColumnWriteMeta writeColumn;
writeColumn.Column = NTable::TColumn("", columnTag, {}, {});
writeColumns.push_back(std::move(writeColumn));
@@ -186,7 +186,7 @@ void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NPro
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << tabletId << ", "
<< "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry));
TTableRange tableRange(keyCells);
- EngineBay.AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false);
+ EngineBay.GetKeyValidator().AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false);
}
}
diff --git a/ydb/core/tx/datashard/key_validator.cpp b/ydb/core/tx/datashard/key_validator.cpp
new file mode 100644
index 0000000000..fbbf16fd6a
--- /dev/null
+++ b/ydb/core/tx/datashard/key_validator.cpp
@@ -0,0 +1,65 @@
+#include "key_validator.h"
+#include "ydb/core/base/appdata_fwd.h"
+
+
+#include <ydb/library/actors/core/actor.h>
+#include <ydb/library/actors/core/log.h>
+#include <ydb/library/services/services.pb.h>
+#include <ydb/core/tx/datashard/range_ops.h>
+
+using namespace NKikimr;
+using namespace NKikimr::NDataShard;
+
+void TKeyValidator::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse)
+{
+ TVector<TKeyDesc::TColumnOp> columnOps;
+ columnOps.reserve(columns.size());
+ for (auto& column : columns) {
+ TKeyDesc::TColumnOp op;
+ op.Column = column.Id;
+ op.Operation = TKeyDesc::EColumnOperation::Read;
+ op.ExpectedType = column.PType;
+ columnOps.emplace_back(std::move(op));
+ }
+
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::TX_DATASHARD, "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId);
+
+ auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Read, keyTypes, columnOps, itemsLimit, 0 /* bytesLimit */, reverse);
+
+ Info.Keys.emplace_back(NMiniKQL::IEngineFlat::TValidatedKey(std::move(desc), /* isWrite */ false));
+ ++Info.ReadsCount;
+ Info.SetLoaded();
+}
+
+void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp)
+{
+ TVector<TKeyDesc::TColumnOp> columnOps;
+ for (const auto& writeColumn : columns) {
+ TKeyDesc::TColumnOp op;
+ op.Column = writeColumn.Column.Id;
+ op.Operation = TKeyDesc::EColumnOperation::Set;
+ op.ExpectedType = writeColumn.Column.PType;
+ op.ImmediateUpdateSize = writeColumn.MaxValueSizeBytes;
+ columnOps.emplace_back(std::move(op));
+ }
+
+ LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::TX_DATASHARD, "-- AddWriteRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId);
+
+ auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update;
+ auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps);
+
+ Info.Keys.emplace_back(NMiniKQL::IEngineFlat::TValidatedKey(std::move(desc), /* isWrite */ true));
+ ++Info.WritesCount;
+ if (!range.Point) {
+ ++Info.DynKeysCount;
+ }
+ Info.SetLoaded();
+}
+
+NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() {
+ return Info;
+}
+
+const NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() const {
+ return Info;
+}
diff --git a/ydb/core/tx/datashard/key_validator.h b/ydb/core/tx/datashard/key_validator.h
new file mode 100644
index 0000000000..bee502fd13
--- /dev/null
+++ b/ydb/core/tx/datashard/key_validator.h
@@ -0,0 +1,27 @@
+#pragma once
+
+
+#include <ydb/core/engine/mkql_engine_flat.h>
+#include <ydb/core/scheme_types/scheme_type_registry.h>
+#include <ydb/core/tablet_flat/flat_table_column.h>
+
+namespace NKikimr::NDataShard {
+
+class TKeyValidator {
+public:
+
+ struct TColumnWriteMeta {
+ NTable::TColumn Column;
+ ui32 MaxValueSizeBytes = 0;
+ };
+
+ void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false);
+ void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp);
+
+ NMiniKQL::IEngineFlat::TValidationInfo& GetInfo();
+ const NMiniKQL::IEngineFlat::TValidationInfo& GetInfo() const;
+private:
+ NMiniKQL::IEngineFlat::TValidationInfo Info;
+};
+
+}
diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make
index e32991e52d..cefaa714b0 100644
--- a/ydb/core/tx/datashard/ya.make
+++ b/ydb/core/tx/datashard/ya.make
@@ -161,6 +161,7 @@ SRCS(
initiate_build_index_unit.cpp
key_conflicts.cpp
key_conflicts.h
+ key_validator.cpp
load_and_wait_in_rs_unit.cpp
load_tx_details_unit.cpp
make_scan_snapshot_unit.cpp