diff options
author | azevaykin <145343289+azevaykin@users.noreply.github.com> | 2024-01-22 11:46:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-01-22 11:46:49 +0300 |
commit | 013f39aacfac22cc1982157bdb26fe1ce746fc95 (patch) | |
tree | 04832fe6eea378555d88c7fb1225194038b63ae2 | |
parent | 6fe76108802fcf3f811a5aa0b941d424b298a1f1 (diff) | |
download | ydb-013f39aacfac22cc1982157bdb26fe1ce746fc95.tar.gz |
Move first key validation logic out of engines (#1161)
-rw-r--r-- | ydb/core/engine/minikql/minikql_engine_host.cpp | 153 | ||||
-rw-r--r-- | ydb/core/engine/minikql/minikql_engine_host.h | 5 | ||||
-rw-r--r-- | ydb/core/engine/mkql_engine_flat.cpp | 5 | ||||
-rw-r--r-- | ydb/core/engine/mkql_engine_flat.h | 4 | ||||
-rw-r--r-- | ydb/core/engine/mkql_engine_flat_host.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 64 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 28 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__read_iterator.cpp | 4 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_active_transaction.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp.cpp | 28 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 3 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_write_operation.cpp | 8 | ||||
-rw-r--r-- | ydb/core/tx/datashard/key_validator.cpp | 65 | ||||
-rw-r--r-- | ydb/core/tx/datashard/key_validator.h | 27 | ||||
-rw-r--r-- | ydb/core/tx/datashard/ya.make | 1 |
15 files changed, 224 insertions, 175 deletions
diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp index b00a3f8b42..bbddd3310c 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.cpp +++ b/ydb/core/engine/minikql/minikql_engine_host.cpp @@ -36,6 +36,20 @@ void ConvertTableKeys(const TScheme& scheme, const TScheme::TTableInfo* tableInf *keyDataBytes = bytes; } +void ConvertTableValues(const TScheme& scheme, const TScheme::TTableInfo* tableInfo, const TArrayRef<const IEngineFlatHost::TUpdateCommand>& commands, TSmallVec<NTable::TUpdateOp>& ops, ui64* valueBytes) { + ui64 bytes = 0; + ops.reserve(commands.size()); + for (size_t i = 0; i < commands.size(); i++) { + const IEngineFlatHost::TUpdateCommand& upd = commands[i]; + Y_ABORT_UNLESS(upd.Operation == TKeyDesc::EColumnOperation::Set); + auto vtypeinfo = scheme.GetColumnInfo(tableInfo, upd.Column)->PType; + ops.emplace_back(upd.Column, NTable::ECellOp::Set, upd.Value.IsNull() ? TRawTypeValue() : TRawTypeValue(upd.Value.Data(), upd.Value.Size(), vtypeinfo)); + bytes += upd.Value.IsNull() ? 1 : upd.Value.Size(); + } + if (valueBytes) + *valueBytes = bytes; +} + TEngineHost::TEngineHost(NTable::TDatabase& db, TEngineHostCounters& counters, const TEngineHostSettings& settings) : Db(db) , Scheme(db.GetScheme()) @@ -54,72 +68,10 @@ const TScheme::TTableInfo* TEngineHost::GetTableInfo(const TTableId& tableId) co bool TEngineHost::IsReadonly() const { return Settings.IsReadonly; } - - -bool TEngineHost::IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const { - Y_UNUSED(maxSnapshotTime); - - auto* tableInfo = Scheme.GetTableInfo(LocalTableId(key.TableId)); - -#define EH_VALIDATE(cond, err_status) \ - do { \ - if (!(cond)) { \ - key.Status = TKeyDesc::EStatus::err_status; \ - return false; \ - } \ - } while(false) \ - /**/ - - EH_VALIDATE(tableInfo, NotExists); // Table does not exist - EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed); - - // Specified keys types should be valid for any operation - for (size_t keyIdx = 0; keyIdx < key.KeyColumnTypes.size(); keyIdx++) { - ui32 keyCol = tableInfo->KeyColumns[keyIdx]; - auto vtype = Scheme.GetColumnInfo(tableInfo, keyCol)->PType; - EH_VALIDATE(key.KeyColumnTypes[keyIdx] == vtype, TypeCheckFailed); - } - - if (key.RowOperation == TKeyDesc::ERowOperation::Read) { - if (key.Range.Point) { - EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); - } else { - EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed); - } - - for (size_t i = 0; i < key.Columns.size(); i++) { - const TKeyDesc::TColumnOp& cop = key.Columns[i]; - if (IsSystemColumn(cop.Column)) { - continue; - } - auto* cinfo = Scheme.GetColumnInfo(tableInfo, cop.Column); - EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column - auto vtype = cinfo->PType; - EH_VALIDATE(cop.ExpectedType == vtype, TypeCheckFailed); - EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Read, OperationNotSupported); - } - } else if (key.RowOperation == TKeyDesc::ERowOperation::Update) { - EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); // Key must be full for updates - for (size_t i = 0; i < key.Columns.size(); i++) { - const TKeyDesc::TColumnOp& cop = key.Columns[i]; - auto* cinfo = Scheme.GetColumnInfo(tableInfo, cop.Column); - EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column - auto vtype = cinfo->PType; - EH_VALIDATE(cop.ExpectedType.GetTypeId() == 0 || cop.ExpectedType == vtype, TypeCheckFailed); - EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Set, OperationNotSupported); // TODO[serxa]: support inplace operations in IsValidKey - } - } else if (key.RowOperation == TKeyDesc::ERowOperation::Erase) { - EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); - } else { - EH_VALIDATE(false, OperationNotSupported); - } - -#undef EH_VALIDATE - - key.Status = TKeyDesc::EStatus::Ok; - return true; +bool TEngineHost::IsValidKey(TKeyDesc& key) const { + ui64 localTableId = LocalTableId(key.TableId); + return NMiniKQL::IsValidKey(Scheme, localTableId, key); } - ui64 TEngineHost::CalculateReadSize(const TVector<const TKeyDesc*>& keys) const { NTable::TSizeEnv env; @@ -880,14 +832,7 @@ void TEngineHost::UpdateRow(const TTableId& tableId, const TArrayRef<const TCell ui64 valueBytes = 0; TSmallVec<NTable::TUpdateOp> ops; - for (size_t i = 0; i < commands.size(); i++) { - const TUpdateCommand& upd = commands[i]; - Y_ABORT_UNLESS(upd.Operation == TKeyDesc::EColumnOperation::Set); // TODO[serxa]: support inplace update in update row - auto vtypeinfo = Scheme.GetColumnInfo(tableInfo, upd.Column)->PType; - ops.emplace_back(upd.Column, NTable::ECellOp::Set, - upd.Value.IsNull() ? TRawTypeValue() : TRawTypeValue(upd.Value.Data(), upd.Value.Size(), vtypeinfo)); - valueBytes += upd.Value.IsNull() ? 1 : upd.Value.Size(); - } + ConvertTableValues(Scheme, tableInfo, commands, ops, &valueBytes); auto* collector = GetChangeCollector(tableId); @@ -977,6 +922,68 @@ void TEngineHost::SetPeriodicCallback(TPeriodicCallback&& callback) { PeriodicCallback = std::move(callback); } +bool IsValidKey(const TScheme& scheme, ui64 localTableId, TKeyDesc& key) { + auto* tableInfo = scheme.GetTableInfo(localTableId); + Y_ABORT_UNLESS(tableInfo); + +#define EH_VALIDATE(cond, err_status) \ + do { \ + if (!(cond)) { \ + key.Status = TKeyDesc::EStatus::err_status; \ + return false; \ + } \ + } while (false) /**/ + + EH_VALIDATE(tableInfo, NotExists); // Table does not exist + EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed); + + // Specified keys types should be valid for any operation + for (size_t keyIdx = 0; keyIdx < key.KeyColumnTypes.size(); keyIdx++) { + ui32 keyCol = tableInfo->KeyColumns[keyIdx]; + auto vtype = scheme.GetColumnInfo(tableInfo, keyCol)->PType; + EH_VALIDATE(key.KeyColumnTypes[keyIdx] == vtype, TypeCheckFailed); + } + + if (key.RowOperation == TKeyDesc::ERowOperation::Read) { + if (key.Range.Point) { + EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); + } else { + EH_VALIDATE(key.KeyColumnTypes.size() <= tableInfo->KeyColumns.size(), TypeCheckFailed); + } + + for (size_t i = 0; i < key.Columns.size(); i++) { + const TKeyDesc::TColumnOp& cop = key.Columns[i]; + if (IsSystemColumn(cop.Column)) { + continue; + } + auto* cinfo = scheme.GetColumnInfo(tableInfo, cop.Column); + EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column + auto vtype = cinfo->PType; + EH_VALIDATE(cop.ExpectedType == vtype, TypeCheckFailed); + EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Read, OperationNotSupported); + } + } else if (key.RowOperation == TKeyDesc::ERowOperation::Update) { + EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); // Key must be full for updates + for (size_t i = 0; i < key.Columns.size(); i++) { + const TKeyDesc::TColumnOp& cop = key.Columns[i]; + auto* cinfo = scheme.GetColumnInfo(tableInfo, cop.Column); + EH_VALIDATE(cinfo, TypeCheckFailed); // Unknown column + auto vtype = cinfo->PType; + EH_VALIDATE(cop.ExpectedType.GetTypeId() == 0 || cop.ExpectedType == vtype, TypeCheckFailed); + EH_VALIDATE(cop.Operation == TKeyDesc::EColumnOperation::Set, OperationNotSupported); // TODO[serxa]: support inplace operations in IsValidKey + } + } else if (key.RowOperation == TKeyDesc::ERowOperation::Erase) { + EH_VALIDATE(key.KeyColumnTypes.size() == tableInfo->KeyColumns.size(), TypeCheckFailed); + } else { + EH_VALIDATE(false, OperationNotSupported); + } + +#undef EH_VALIDATE + + key.Status = TKeyDesc::EStatus::Ok; + return true; +} + void AnalyzeRowType(TStructLiteral* columnIds, TSmallVec<NTable::TTag>& tags, TSmallVec<NTable::TTag>& systemColumnTags) { // Find out tags that should be read in Select*() functions tags.reserve(columnIds->GetValuesCount()); diff --git a/ydb/core/engine/minikql/minikql_engine_host.h b/ydb/core/engine/minikql/minikql_engine_host.h index ab769e703c..8aaf692153 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.h +++ b/ydb/core/engine/minikql/minikql_engine_host.h @@ -98,7 +98,7 @@ public: ui64 GetShardId() const override; const TScheme::TTableInfo* GetTableInfo(const TTableId& tableId) const override; bool IsReadonly() const override; - bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override; + bool IsValidKey(TKeyDesc& key) const override; ui64 CalculateReadSize(const TVector<const TKeyDesc*>& keys) const override; ui64 CalculateResultSize(const TKeyDesc& key) const override; void PinPages(const TVector<THolder<TKeyDesc>>& keys, ui64 pageFaultCount) override; @@ -193,6 +193,7 @@ public: } }; +bool IsValidKey(const TEngineHost::TScheme& scheme, ui64 localTableId, TKeyDesc& key); void AnalyzeRowType(TStructLiteral* columnIds, TSmallVec<NTable::TTag>& tags, TSmallVec<NTable::TTag>& systemColumnTags); NUdf::TUnboxedValue GetCellValue(const TCell& cell, NScheme::TTypeInfo type); NUdf::TUnboxedValue CreateSelectRangeLazyRowsList(NTable::TDatabase& db, const NTable::TScheme& scheme, @@ -203,4 +204,6 @@ NUdf::TUnboxedValue CreateSelectRangeLazyRowsList(NTable::TDatabase& db, const N void ConvertTableKeys(const NTable::TScheme& scheme, const NTable::TScheme::TTableInfo* tableInfo, const TArrayRef<const TCell>& row, TSmallVec<TRawTypeValue>& key, ui64* keyDataBytes); +void ConvertTableValues(const NTable::TScheme& scheme, const NTable::TScheme::TTableInfo* tableInfo, + const TArrayRef<const IEngineFlatHost::TUpdateCommand>& commands, TSmallVec<NTable::TUpdateOp>& ops, ui64* valueBytes); }} diff --git a/ydb/core/engine/mkql_engine_flat.cpp b/ydb/core/engine/mkql_engine_flat.cpp index 6a2efed115..2f94b2f260 100644 --- a/ydb/core/engine/mkql_engine_flat.cpp +++ b/ydb/core/engine/mkql_engine_flat.cpp @@ -760,11 +760,10 @@ public: EResult ValidateKeys(TValidationInfo& validationInfo) override { EResult result = EResult::Ok; - std::pair<ui64, ui64> maxSnapshotTime = {0,0}; // unused for now for (auto& validKey : validationInfo.Keys) { TKeyDesc * key = validKey.Key.get(); - bool valid = Settings.Host->IsValidKey(*key, maxSnapshotTime); + bool valid = Settings.Host->IsValidKey(*key); if (valid) { auto curSchemaVersion = Settings.Host->GetTableSchemaVersion(key->TableId); @@ -1042,7 +1041,7 @@ public: } if (Y_LIKELY(result == EResult::Ok)) { - validationInfo.Loaded = true; + validationInfo.SetLoaded(); } IsProgramValidated = true; diff --git a/ydb/core/engine/mkql_engine_flat.h b/ydb/core/engine/mkql_engine_flat.h index d4ec11b032..7c2102feee 100644 --- a/ydb/core/engine/mkql_engine_flat.h +++ b/ydb/core/engine/mkql_engine_flat.h @@ -185,6 +185,10 @@ public: HasInReadsets = false; Loaded = false; } + + void SetLoaded() { + Loaded = true; + } }; //-- error reporting diff --git a/ydb/core/engine/mkql_engine_flat_host.h b/ydb/core/engine/mkql_engine_flat_host.h index 0593fb3a9b..6a78fe6b3e 100644 --- a/ydb/core/engine/mkql_engine_flat_host.h +++ b/ydb/core/engine/mkql_engine_flat_host.h @@ -30,7 +30,7 @@ public: virtual bool IsReadonly() const = 0; // Validate key and fill status into it. - virtual bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const = 0; + virtual bool IsValidKey(TKeyDesc& key) const = 0; // Calculate the whole size of data that needs to be read into memory virtual ui64 CalculateReadSize(const TVector<const TKeyDesc*>& keys) const = 0; diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 89d76611a1..21f65bb67f 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -406,10 +406,13 @@ public: return VolatileCommitOrdered; } - bool IsValidKey(TKeyDesc& key, std::pair<ui64, ui64>& maxSnapshotTime) const override { + bool IsValidKey(TKeyDesc& key) const override { if (TSysTables::IsSystemTable(key.TableId)) return DataShardSysTable(key.TableId).IsValidKey(key); + ui64 localTableId = Self->GetLocalTableId(key.TableId); + Y_ABORT_UNLESS(localTableId != 0, "Unexpected IsValidKey for an unknown table"); + if (LockTxId) { // Prevent updates/erases with LockTxId set, unless it's allowed for immediate mvcc txs if (key.RowOperation != TKeyDesc::ERowOperation::Read && @@ -426,7 +429,7 @@ public: } } - return TEngineHost::IsValidKey(key, maxSnapshotTime); + return NMiniKQL::IsValidKey(Db.GetScheme(), localTableId, key); } NUdf::TUnboxedValue SelectRow(const TTableId& tableId, const TArrayRef<const TCell>& row, @@ -1088,64 +1091,15 @@ TEngineBay::~TEngineBay() { } } -void TEngineBay::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, - const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse) -{ - TVector<TKeyDesc::TColumnOp> columnOps; - columnOps.reserve(columns.size()); - for (auto& column : columns) { - TKeyDesc::TColumnOp op; - op.Column = column.Id; - op.Operation = TKeyDesc::EColumnOperation::Read; - op.ExpectedType = column.PType; - columnOps.emplace_back(std::move(op)); - } - - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, - "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId); - - auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Read, keyTypes, columnOps, itemsLimit, - 0 /* bytesLimit */, reverse); - Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ false)); - // Info.Keys.back().IsResultPart = not a lock key? // TODO: KIKIMR-11134 - ++Info.ReadsCount; - Info.Loaded = true; -} - -void TEngineBay::AddWriteRange(const TTableId& tableId, const TTableRange& range, - const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, - bool isPureEraseOp) -{ - TVector<TKeyDesc::TColumnOp> columnOps; - for (const auto& writeColumn : columns) { - TKeyDesc::TColumnOp op; - op.Column = writeColumn.Column.Id; - op.Operation = TKeyDesc::EColumnOperation::Set; - op.ExpectedType = writeColumn.Column.PType; - op.ImmediateUpdateSize = writeColumn.MaxValueSizeBytes; - columnOps.emplace_back(std::move(op)); - } - - LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, - "-- AddWriteRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId); - - auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update; - auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps); - Info.Keys.emplace_back(TValidatedKey(std::move(desc), /* isWrite */ true)); - ++Info.WritesCount; - if (!range.Point) { - ++Info.DynKeysCount; - } - Info.Loaded = true; -} - TEngineBay::TSizes TEngineBay::CalcSizes(bool needsTotalKeysSize) const { Y_ABORT_UNLESS(EngineHost); + const auto& info = KeyValidator.GetInfo(); + TSizes outSizes; TVector<const TKeyDesc*> readKeys; - readKeys.reserve(Info.ReadsCount); - for (const TValidatedKey& validKey : Info.Keys) { + readKeys.reserve(info.ReadsCount); + for (const TValidatedKey& validKey : info.Keys) { if (validKey.IsWrite) continue; diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 660ed7ad5e..6ec942aaeb 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -2,6 +2,7 @@ #include "defs.h" #include "change_collector.h" +#include "key_validator.h" #include <ydb/core/kqp/runtime/kqp_tasks_runner.h> #include <ydb/core/tablet_flat/tablet_flat_executor.h> @@ -40,11 +41,6 @@ public: ui64 TotalKeysSize = 0; }; - struct TColumnWriteMeta { - NTable::TColumn Column; - ui32 MaxValueSizeBytes = 0; - }; - TEngineBay(TDataShard * self, TTransactionContext& txc, const TActorContext& ctx, std::pair<ui64, ui64> stepTxId); @@ -57,26 +53,20 @@ public: void SetUseLlvmRuntime(bool llvmRuntime) { EngineSettings->LlvmRuntime = llvmRuntime; } EResult Validate() { - if (Info.Loaded) + if (KeyValidator.GetInfo().Loaded) return EResult::Ok; Y_ABORT_UNLESS(Engine); - return Engine->Validate(Info); + return Engine->Validate(KeyValidator.GetInfo()); } EResult ReValidateKeys() { - Y_ABORT_UNLESS(Info.Loaded); + Y_ABORT_UNLESS(KeyValidator.GetInfo().Loaded); Y_ABORT_UNLESS(Engine); - return Engine->ValidateKeys(Info); + return Engine->ValidateKeys(KeyValidator.GetInfo()); } - void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, - const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false); - - void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, - const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp); - void MarkTxLoaded() { - Info.Loaded = true; + KeyValidator.GetInfo().SetLoaded(); } /// @note it expects TValidationInfo keys are materialized outsize of engine's allocs @@ -99,7 +89,9 @@ public: ui64 GetStep() const { return StepTxId.first; } ui64 GetTxId() const { return StepTxId.second; } - const TValidationInfo& TxInfo() const { return Info; } + TKeyValidator& GetKeyValidator() { return KeyValidator; } + const TKeyValidator& GetKeyValidator() const { return KeyValidator; } + const TValidationInfo& TxInfo() const { return KeyValidator.GetInfo(); } TEngineBay::TSizes CalcSizes(bool needsTotalKeysSize) const; void SetWriteVersion(TRowVersion writeVersion); @@ -129,7 +121,7 @@ private: THolder<NMiniKQL::TEngineHost> EngineHost; THolder<NMiniKQL::TEngineFlatSettings> EngineSettings; THolder<NMiniKQL::IEngineFlat> Engine; - TValidationInfo Info; + TKeyValidator KeyValidator; TEngineHostCounters EngineHostCounters; ui64 LockTxId; ui32 LockNodeId; diff --git a/ydb/core/tx/datashard/datashard__read_iterator.cpp b/ydb/core/tx/datashard/datashard__read_iterator.cpp index 80d3143ef8..da99bb4cdc 100644 --- a/ydb/core/tx/datashard/datashard__read_iterator.cpp +++ b/ydb/core/tx/datashard/datashard__read_iterator.cpp @@ -1498,7 +1498,7 @@ public: PrepareValidationInfo(ctx, state); } else { // There should be no keys when reading sysm tables - ValidationInfo.Loaded = true; + ValidationInfo.SetLoaded(); } } @@ -1797,7 +1797,7 @@ private: } } - ValidationInfo.Loaded = true; + ValidationInfo.SetLoaded(); } void AcquireLock(TReadIteratorState& state, const TActorContext& ctx) { diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 26e3017052..dff9387950 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -62,7 +62,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, auto* info = self->TableInfos[tx.GetTableId().GetTableId()].Get(); Y_ABORT_UNLESS(info, "Unexpected missing table info"); TSerializedTableRange range(tx.GetRange()); - EngineBay.AddReadRange(TTableId(tx.GetTableId().GetOwnerId(), + EngineBay.GetKeyValidator().AddReadRange(TTableId(tx.GetTableId().GetOwnerId(), tx.GetTableId().GetTableId()), {}, range.ToTableRange(), info->KeyColumnTypes); } else { diff --git a/ydb/core/tx/datashard/datashard_kqp.cpp b/ydb/core/tx/datashard/datashard_kqp.cpp index 9b3871ca82..b803537568 100644 --- a/ydb/core/tx/datashard/datashard_kqp.cpp +++ b/ydb/core/tx/datashard/datashard_kqp.cpp @@ -344,11 +344,11 @@ TVector<NTable::TColumn> GetColumns(const TReadOpMeta& readMeta) { return columns; } -TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const TWriteOpMeta& writeMeta) { - TVector<TEngineBay::TColumnWriteMeta> writeColumns; +TVector<TKeyValidator::TColumnWriteMeta> GetColumnWrites(const TWriteOpMeta& writeMeta) { + TVector<TKeyValidator::TColumnWriteMeta> writeColumns; writeColumns.reserve(writeMeta.ColumnsSize()); for (const auto& columnMeta : writeMeta.GetColumns()) { - TEngineBay::TColumnWriteMeta writeColumn; + TKeyValidator::TColumnWriteMeta writeColumn; writeColumn.Column = GetColumn(columnMeta.GetColumn()); writeColumn.MaxValueSizeBytes = columnMeta.GetMaxValueSizeBytes(); @@ -387,10 +387,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const Y_DEBUG_ABORT_UNLESS(!(tableRange.To.GetCells().empty() && tableRange.ToInclusive)); if constexpr (Read) { - engineBay.AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), + engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, + engineBay.GetKeyValidator().AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } } @@ -405,11 +405,9 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const << DebugPrintPoint(tableInfo->KeyColumnTypes, tablePoint.From.GetCells(), typeRegistry)); if constexpr (Read) { - engineBay.AddReadRange(tableId, GetColumns(*readMeta), tablePoint.ToTableRange(), - tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); + engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, - GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); + engineBay.GetKeyValidator().AddWriteRange(tableId, tablePoint.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } } @@ -426,10 +424,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const << DebugPrintRange(tableInfo->KeyColumnTypes, tableRange.ToTableRange(), typeRegistry)); if constexpr (Read) { - engineBay.AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), + engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableRange.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, + engineBay.GetKeyValidator().AddWriteRange(tableId, tableRange.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } @@ -442,10 +440,10 @@ void KqpSetTxKeysImpl(ui64 tabletId, ui64 taskId, const TTableId& tableId, const << ", task: " << taskId << ", " << (Read ? "read range: UNSPECIFIED" : "write range: UNSPECIFIED")); if constexpr (Read) { - engineBay.AddReadRange(tableId, GetColumns(*readMeta), tableInfo->Range.ToTableRange(), + engineBay.GetKeyValidator().AddReadRange(tableId, GetColumns(*readMeta), tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes, readMeta->GetItemsLimit(), readMeta->GetReverse()); } else { - engineBay.AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes, + engineBay.GetKeyValidator().AddWriteRange(tableId, tableInfo->Range.ToTableRange(), tableInfo->KeyColumnTypes, GetColumnWrites(*writeMeta), writeMeta->GetIsPureEraseOp()); } @@ -493,10 +491,10 @@ void KqpSetTxLocksKeys(const NKikimrDataEvents::TKqpLocks& locks, const TSysLock if (sysLocks.IsMyKey(lockKey)) { auto point = TTableRange(lockKey, true, {}, true, /* point */ true); if (NeedValidateLocks(locks.GetOp())) { - engineBay.AddReadRange(sysLocksTableId, {}, point, lockRowType); + engineBay.GetKeyValidator().AddReadRange(sysLocksTableId, {}, point, lockRowType); } if (NeedEraseLocks(locks.GetOp())) { - engineBay.AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true); + engineBay.GetKeyValidator().AddWriteRange(sysLocksTableId, point, lockRowType, {}, /* isPureEraseOp */ true); } } } diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index b3d3dc6f2a..c7da246b02 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -284,11 +284,10 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated std::pair<IEngineFlat::EResult, TString> TKqpDatashardComputeContext::ValidateKeys( const IEngineFlat::TValidationInfo& validationInfo) { - std::pair<ui64, ui64> maxSnapshotTime = {0,0}; // unused for now for (auto& validKey : validationInfo.Keys) { TKeyDesc * key = validKey.Key.get(); - bool valid = EngineHost.IsValidKey(*key, maxSnapshotTime); + bool valid = EngineHost.IsValidKey(*key); if (valid) { auto curSchemaVersion = EngineHost.GetTableSchemaVersion(key->TableId); diff --git a/ydb/core/tx/datashard/datashard_write_operation.cpp b/ydb/core/tx/datashard/datashard_write_operation.cpp index f602e544aa..fc459ae97e 100644 --- a/ydb/core/tx/datashard/datashard_write_operation.cpp +++ b/ydb/core/tx/datashard/datashard_write_operation.cpp @@ -163,11 +163,11 @@ bool TValidatedWriteTx::ParseRecord(const TDataShard::TTableInfos& tableInfos) { return true; } -TVector<TEngineBay::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags) { - TVector<TEngineBay::TColumnWriteMeta> writeColumns; +TVector<TKeyValidator::TColumnWriteMeta> GetColumnWrites(const ::google::protobuf::RepeatedField<::NProtoBuf::uint32>& columnTags) { + TVector<TKeyValidator::TColumnWriteMeta> writeColumns; writeColumns.reserve(columnTags.size()); for (ui32 columnTag : columnTags) { - TEngineBay::TColumnWriteMeta writeColumn; + TKeyValidator::TColumnWriteMeta writeColumn; writeColumn.Column = NTable::TColumn("", columnTag, {}, {}); writeColumns.push_back(std::move(writeColumn)); @@ -186,7 +186,7 @@ void TValidatedWriteTx::SetTxKeys(const ::google::protobuf::RepeatedField<::NPro LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Table " << TableInfo->Path << ", shard: " << tabletId << ", " << "write point " << DebugPrintPoint(TableInfo->KeyColumnTypes, keyCells, typeRegistry)); TTableRange tableRange(keyCells); - EngineBay.AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false); + EngineBay.GetKeyValidator().AddWriteRange(TableId, tableRange, TableInfo->KeyColumnTypes, GetColumnWrites(columnTags), false); } } diff --git a/ydb/core/tx/datashard/key_validator.cpp b/ydb/core/tx/datashard/key_validator.cpp new file mode 100644 index 0000000000..fbbf16fd6a --- /dev/null +++ b/ydb/core/tx/datashard/key_validator.cpp @@ -0,0 +1,65 @@ +#include "key_validator.h" +#include "ydb/core/base/appdata_fwd.h" + + +#include <ydb/library/actors/core/actor.h> +#include <ydb/library/actors/core/log.h> +#include <ydb/library/services/services.pb.h> +#include <ydb/core/tx/datashard/range_ops.h> + +using namespace NKikimr; +using namespace NKikimr::NDataShard; + +void TKeyValidator::AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit, bool reverse) +{ + TVector<TKeyDesc::TColumnOp> columnOps; + columnOps.reserve(columns.size()); + for (auto& column : columns) { + TKeyDesc::TColumnOp op; + op.Column = column.Id; + op.Operation = TKeyDesc::EColumnOperation::Read; + op.ExpectedType = column.PType; + columnOps.emplace_back(std::move(op)); + } + + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::TX_DATASHARD, "-- AddReadRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId); + + auto desc = MakeHolder<TKeyDesc>(tableId, range, TKeyDesc::ERowOperation::Read, keyTypes, columnOps, itemsLimit, 0 /* bytesLimit */, reverse); + + Info.Keys.emplace_back(NMiniKQL::IEngineFlat::TValidatedKey(std::move(desc), /* isWrite */ false)); + ++Info.ReadsCount; + Info.SetLoaded(); +} + +void TKeyValidator::AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp) +{ + TVector<TKeyDesc::TColumnOp> columnOps; + for (const auto& writeColumn : columns) { + TKeyDesc::TColumnOp op; + op.Column = writeColumn.Column.Id; + op.Operation = TKeyDesc::EColumnOperation::Set; + op.ExpectedType = writeColumn.Column.PType; + op.ImmediateUpdateSize = writeColumn.MaxValueSizeBytes; + columnOps.emplace_back(std::move(op)); + } + + LOG_TRACE_S(*NActors::TlsActivationContext, NKikimrServices::TX_DATASHARD, "-- AddWriteRange: " << DebugPrintRange(keyTypes, range, *AppData()->TypeRegistry) << " table: " << tableId); + + auto rowOp = isPureEraseOp ? TKeyDesc::ERowOperation::Erase : TKeyDesc::ERowOperation::Update; + auto desc = MakeHolder<TKeyDesc>(tableId, range, rowOp, keyTypes, columnOps); + + Info.Keys.emplace_back(NMiniKQL::IEngineFlat::TValidatedKey(std::move(desc), /* isWrite */ true)); + ++Info.WritesCount; + if (!range.Point) { + ++Info.DynKeysCount; + } + Info.SetLoaded(); +} + +NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() { + return Info; +} + +const NMiniKQL::IEngineFlat::TValidationInfo& TKeyValidator::GetInfo() const { + return Info; +} diff --git a/ydb/core/tx/datashard/key_validator.h b/ydb/core/tx/datashard/key_validator.h new file mode 100644 index 0000000000..bee502fd13 --- /dev/null +++ b/ydb/core/tx/datashard/key_validator.h @@ -0,0 +1,27 @@ +#pragma once + + +#include <ydb/core/engine/mkql_engine_flat.h> +#include <ydb/core/scheme_types/scheme_type_registry.h> +#include <ydb/core/tablet_flat/flat_table_column.h> + +namespace NKikimr::NDataShard { + +class TKeyValidator { +public: + + struct TColumnWriteMeta { + NTable::TColumn Column; + ui32 MaxValueSizeBytes = 0; + }; + + void AddReadRange(const TTableId& tableId, const TVector<NTable::TColumn>& columns, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, ui64 itemsLimit = 0, bool reverse = false); + void AddWriteRange(const TTableId& tableId, const TTableRange& range, const TVector<NScheme::TTypeInfo>& keyTypes, const TVector<TColumnWriteMeta>& columns, bool isPureEraseOp); + + NMiniKQL::IEngineFlat::TValidationInfo& GetInfo(); + const NMiniKQL::IEngineFlat::TValidationInfo& GetInfo() const; +private: + NMiniKQL::IEngineFlat::TValidationInfo Info; +}; + +} diff --git a/ydb/core/tx/datashard/ya.make b/ydb/core/tx/datashard/ya.make index e32991e52d..cefaa714b0 100644 --- a/ydb/core/tx/datashard/ya.make +++ b/ydb/core/tx/datashard/ya.make @@ -161,6 +161,7 @@ SRCS( initiate_build_index_unit.cpp key_conflicts.cpp key_conflicts.h + key_validator.cpp load_and_wait_in_rs_unit.cpp load_tx_details_unit.cpp make_scan_snapshot_unit.cpp |