diff options
author | Igor Makunin <igor.makunin@gmail.com> | 2022-02-16 18:52:02 +0300 |
---|---|---|
committer | Igor Makunin <igor.makunin@gmail.com> | 2022-02-16 18:52:02 +0300 |
commit | 0c4c91540e1b2b874a87135cf87d22a4325ac102 (patch) | |
tree | e693cec140cc5e80f48025e2d0749f03fb205482 | |
parent | e1e97356e553c2f4a13eda00a625caa975f4ac61 (diff) | |
download | ydb-0c4c91540e1b2b874a87135cf87d22a4325ac102.tar.gz |
KIKIMR-14356: rework reads and support cdc
ref:67221a54ace9765874e54698d3ad82baacdf26a5
-rw-r--r-- | ydb/core/engine/minikql/minikql_engine_host.h | 1 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.cpp | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard__engine_host.h | 2 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.cpp | 317 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_compute.h | 66 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp | 152 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_kqp_read_table.cpp | 238 |
7 files changed, 422 insertions, 356 deletions
diff --git a/ydb/core/engine/minikql/minikql_engine_host.h b/ydb/core/engine/minikql/minikql_engine_host.h index 012ee6891b..9c6e39a364 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.h +++ b/ydb/core/engine/minikql/minikql_engine_host.h @@ -123,6 +123,7 @@ public: void ExecPeriodicCallback() { if (PeriodicCallback) { PeriodicCallback();} } TEngineHostCounters& GetCounters() const { return Counters; } + const TEngineHostSettings& GetSettings() const { return Settings; } virtual TRowVersion GetWriteVersion(const TTableId& tableId) const = 0; virtual TRowVersion GetReadVersion(const TTableId& tableId) const = 0; diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp index 876c337093..5df538f438 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.cpp +++ b/ydb/core/tx/datashard/datashard__engine_host.cpp @@ -541,7 +541,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor << ": " << message); }; - ComputeCtx = MakeHolder<TKqpDatashardComputeContext>(self, EngineHostCounters, now); + ComputeCtx = MakeHolder<TKqpDatashardComputeContext>(self, *EngineHost, now); ComputeCtx->Database = &txc.DB; auto kqpApplyCtx = MakeHolder<TKqpDatashardApplyContext>(); diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h index 8e3064c83d..90a298cfa8 100644 --- a/ydb/core/tx/datashard/datashard__engine_host.h +++ b/ydb/core/tx/datashard/datashard__engine_host.h @@ -104,7 +104,7 @@ public: private: std::pair<ui64, ui64> StepTxId; - THolder<NMiniKQL::IEngineFlatHost> EngineHost; + THolder<NMiniKQL::TEngineHost> EngineHost; THolder<NMiniKQL::TEngineFlatSettings> EngineSettings; THolder<NMiniKQL::IEngineFlat> Engine; TValidationInfo Info; diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp index 885c633780..1cefe8f239 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp @@ -148,20 +148,17 @@ const NDataShard::TUserTable* TKqpDatashardComputeContext::GetTable(const TTable return ptr->Get(); } -void TKqpDatashardComputeContext::ReadTable(const TTableId& tableId, const TTableRange& range) const { - MKQL_ENSURE_S(Shard); +void TKqpDatashardComputeContext::TouchTableRange(const TTableId& tableId, const TTableRange& range) const { Shard->SysLocksTable().SetLock(tableId, range, LockTxId); Shard->SetTableAccessTime(tableId, Now); } -void TKqpDatashardComputeContext::ReadTable(const TTableId& tableId, const TArrayRef<const TCell>& key) const { - MKQL_ENSURE_S(Shard); +void TKqpDatashardComputeContext::TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const { Shard->SysLocksTable().SetLock(tableId, key, LockTxId); Shard->SetTableAccessTime(tableId, Now); } void TKqpDatashardComputeContext::BreakSetLocks() const { - MKQL_ENSURE_S(Shard); Shard->SysLocksTable().BreakSetLocks(LockTxId); } @@ -169,10 +166,6 @@ void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId) { LockTxId = lockTxId; } -ui64 TKqpDatashardComputeContext::GetShardId() const { - return Shard->TabletID(); -} - void TKqpDatashardComputeContext::SetReadVersion(TRowVersion readVersion) { ReadVersion = readVersion; } @@ -228,7 +221,25 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated continue; } - if (key.RowOperation != TKeyDesc::ERowOperation::Read) { + TSet<TKeyDesc::EColumnOperation> columnOpFilter; + switch (key.RowOperation) { + case TKeyDesc::ERowOperation::Read: + columnOpFilter.insert(TKeyDesc::EColumnOperation::Read); + break; + case TKeyDesc::ERowOperation::Update: + case TKeyDesc::ERowOperation::Erase: { + const auto collector = EngineHost.GetChangeCollector(key.TableId); + if (collector && collector->NeedToReadKeys()) { + columnOpFilter.insert(TKeyDesc::EColumnOperation::Set); + columnOpFilter.insert(TKeyDesc::EColumnOperation::InplaceUpdate); + } + break; + } + default: + break; + } + + if (columnOpFilter.empty()) { continue; } @@ -245,7 +256,7 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated TSmallVec<NTable::TTag> columnTags; for (const auto& column : key.Columns) { - if (Y_LIKELY(column.Operation == TKeyDesc::EColumnOperation::Read)) { + if (columnOpFilter.contains(column.Operation)) { columnTags.push_back(column.Column); } } @@ -254,7 +265,7 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated from, key.Range.Point ? from : to, columnTags, - 0 /* readFlags */, + EngineHost.GetSettings().DisableByKeyFilter ? (ui64)NTable::NoByKey : 0, adjustLimit(key.RangeLimits.ItemsLimit), adjustLimit(key.RangeLimits.BytesLimit), key.Reverse ? NTable::EDirection::Reverse : NTable::EDirection::Forward, @@ -274,8 +285,288 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated return ret; } -void TKqpDatashardComputeContext::AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key) { +static void BuildRowImpl(const TDbTupleRef& dbTuple, const THolderFactory& holderFactory, + const TSmallVec<TTag>& systemColumnTags, ui64 shardId, NUdf::TUnboxedValue& result, size_t& rowSize) +{ + size_t columnsCount = dbTuple.ColumnCount + systemColumnTags.size(); + + TUnboxedValue* rowItems = nullptr; + result = holderFactory.CreateDirectArrayHolder(columnsCount, rowItems); + + rowSize = 0; + for (ui32 i = 0; i < dbTuple.ColumnCount; ++i) { + const auto& cell = dbTuple.Cells()[i]; + rowSize += cell.IsNull() ? 1 : cell.Size(); + rowItems[i] = GetCellValue(cell, dbTuple.Types[i]); + } + + // Some per-row overhead to deal with the case when no columns were requested + rowSize = std::max(rowSize, (size_t) 8); + + for (ui32 i = dbTuple.ColumnCount, j = 0; i < columnsCount; ++i, ++j) { + switch (systemColumnTags[j]) { + case TKeyDesc::EColumnIdDataShard: + rowItems[i] = TUnboxedValue(TUnboxedValuePod(shardId)); + break; + default: + throw TSchemeErrorTabletException(); + } + } +} + +static void BuildRowWideImpl(const TDbTupleRef& dbTuple, const TSmallVec<TTag>& systemColumnTags, ui64 shardId, + NUdf::TUnboxedValue* const* result, size_t& rowSize) +{ + size_t columnsCount = dbTuple.ColumnCount + systemColumnTags.size(); + + rowSize = 0; + for (ui32 i = 0; i < dbTuple.ColumnCount; ++i) { + const auto& cell = dbTuple.Cells()[i]; + rowSize += cell.IsNull() ? 1 : cell.Size(); + if (auto out = *result++) { + *out = GetCellValue(cell, dbTuple.Types[i]); + } + } + + // Some per-row overhead to deal with the case when no columns were requested + rowSize = std::max(rowSize, (size_t) 8); + + for (ui32 i = dbTuple.ColumnCount, j = 0; i < columnsCount; ++i, ++j) { + auto out = *result++; + if (!out) { + continue; + } + + switch (systemColumnTags[j]) { + case TKeyDesc::EColumnIdDataShard: + *out = TUnboxedValue(TUnboxedValuePod(shardId)); + break; + default: + throw TSchemeErrorTabletException(); + } + } +} + +bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<const TCell> key, + const TSmallVec<NTable::TTag>& columnTags, const TSmallVec<NTable::TTag>& systemColumnTags, + const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& kqpStats) +{ + MKQL_ENSURE_S(Shard); + + auto localTid = Shard->GetLocalTableId(tableId); + auto tableInfo = Database->GetScheme().GetTableInfo(localTid); + MKQL_ENSURE_S(tableInfo, "Can not resolve table " << tableId); + + TSmallVec<TRawTypeValue> keyValues; + ConvertTableKeys(Database->GetScheme(), tableInfo, key, keyValues, /* keyDataBytes */ nullptr); + + if (Y_UNLIKELY(keyValues.size() != tableInfo->KeyColumns.size())) { + throw TSchemeErrorTabletException(); + } + + TouchTablePoint(tableId, key); Shard->GetKeyAccessSampler()->AddSample(tableId, key); + + NTable::TRowState dbRow; + NTable::TSelectStats stats; + ui64 flags = EngineHost.GetSettings().DisableByKeyFilter ? (ui64) NTable::NoByKey : 0; + auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion()); + + kqpStats.NSelectRow = 1; + kqpStats.InvisibleRowSkips = stats.Invisible; + + switch (ready) { + case EReady::Page: + if (auto collector = EngineHost.GetChangeCollector(tableId)) { + collector->Reset(); + } + SetTabletNotReady(); + return false; + case EReady::Gone: + return false; + case EReady::Data: + break; + }; + + MKQL_ENSURE_S(columnTags.size() == dbRow.Size(), "Invalid local db row size."); + + TVector<NScheme::TTypeId> types(columnTags.size()); + for (size_t i = 0; i < columnTags.size(); ++i) { + types[i] = tableInfo->Columns.at(columnTags[i]).PType; + } + auto dbTuple = TDbTupleRef(types.data(), (*dbRow).data(), dbRow.Size()); + + size_t rowSize = 0; + BuildRowImpl(dbTuple, holderFactory, systemColumnTags, Shard->TabletID(), result, rowSize); + + kqpStats.SelectRowRows = 1; + kqpStats.SelectRowBytes += rowSize; + + return true; +} + +TAutoPtr<NTable::TTableIt> TKqpDatashardComputeContext::CreateIterator(const TTableId& tableId, const TTableRange& range, + const TSmallVec<NTable::TTag>& columnTags) +{ + auto localTid = Shard->GetLocalTableId(tableId); + auto tableInfo = Database->GetScheme().GetTableInfo(localTid); + MKQL_ENSURE_S(tableInfo, "Can not resolve table " << tableId); + + TSmallVec<TRawTypeValue> from, to; + ConvertTableKeys(Database->GetScheme(), tableInfo, range.From, from, /* keyDataBytes */ nullptr); + ConvertTableKeys(Database->GetScheme(), tableInfo, range.To, to, /* keyDataBytes */ nullptr); + + NTable::TKeyRange keyRange; + keyRange.MinKey = from; + keyRange.MaxKey = to; + keyRange.MinInclusive = range.InclusiveFrom; + keyRange.MaxInclusive = range.InclusiveTo; + + TouchTableRange(tableId, range); + return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion()); +} + +TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIterator(const TTableId& tableId, + const TTableRange& range, const TSmallVec<NTable::TTag>& columnTags) +{ + auto localTid = Shard->GetLocalTableId(tableId); + auto tableInfo = Database->GetScheme().GetTableInfo(localTid); + MKQL_ENSURE_S(tableInfo, "Can not resolve table " << tableId); + + TSmallVec<TRawTypeValue> from, to; + ConvertTableKeys(Database->GetScheme(), tableInfo, range.From, from, /* keyDataBytes */ nullptr); + ConvertTableKeys(Database->GetScheme(), tableInfo, range.To, to, /* keyDataBytes */ nullptr); + + NTable::TKeyRange keyRange; + keyRange.MinKey = from; + keyRange.MaxKey = to; + keyRange.MinInclusive = range.InclusiveFrom; + keyRange.MaxInclusive = range.InclusiveTo; + + TouchTableRange(tableId, range); + return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion()); +} + +template <typename TReadTableIterator> +bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTableIterator& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats) +{ + while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) { + TDbTupleRef rowKey = iterator.GetKey(); + MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount); + + Shard->GetKeyAccessSampler()->AddSample(tableId, rowKey.Cells()); + + bool skipRow = false; + for (ui32 i = 0; i < skipNullKeys.size(); ++i) { + if (skipNullKeys[i] && rowKey.Columns[i].IsNull()) { + skipRow = true; + break; + } + } + + if (skipRow) { + continue; + } + + TDbTupleRef rowValues = iterator.GetValues(); + size_t rowSize = 0; + + BuildRowImpl(rowValues, holderFactory, systemColumnTags, Shard->TabletID(), result, rowSize); + + stats.SelectRangeRows = 1; + stats.SelectRangeBytes = rowSize; + + stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0); + stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0); + + return true; + } + + if (iterator.Last() == NTable::EReady::Page) { + if (auto collector = EngineHost.GetChangeCollector(tableId)) { + collector->Reset(); + } + SetTabletNotReady(); + } + + return false; +} + +template <typename TReadTableIterator> +bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TReadTableIterator& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + NUdf::TUnboxedValue* const* result, TKqpTableStats& stats) +{ + while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) { + TDbTupleRef rowKey = iterator.GetKey(); + MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount); + + Shard->GetKeyAccessSampler()->AddSample(tableId, rowKey.Cells()); + + bool skipRow = false; + for (ui32 i = 0; i < skipNullKeys.size(); ++i) { + if (skipNullKeys[i] && rowKey.Columns[i].IsNull()) { + skipRow = true; + break; + } + } + + if (skipRow) { + continue; + } + + TDbTupleRef rowValues = iterator.GetValues(); + size_t rowSize = 0; + + BuildRowWideImpl(rowValues, systemColumnTags, Shard->TabletID(), result, rowSize); + + stats.SelectRangeRows = 1; + stats.SelectRangeBytes = rowSize; + + stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0); + stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0); + + return true; + } + + if (iterator.Last() == NTable::EReady::Page) { + if (auto collector = EngineHost.GetChangeCollector(tableId)) { + collector->Reset(); + } + SetTabletNotReady(); + } + + return false; +} + +bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, NTable::TTableIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats) +{ + return ReadRowImpl(tableId, iterator, systemColumnTags, skipNullKeys, holderFactory, result, stats); +} + +bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, NTable::TTableReverseIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats) +{ + return ReadRowImpl(tableId, iterator, systemColumnTags, skipNullKeys, holderFactory, result, stats); +} + +bool TKqpDatashardComputeContext::ReadRowWide(const TTableId& tableId, NTable::TTableIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + NUdf::TUnboxedValue* const* result, TKqpTableStats& stats) +{ + return ReadRowWideImpl(tableId, iterator, systemColumnTags, skipNullKeys,result, stats); +} + +bool TKqpDatashardComputeContext::ReadRowWide(const TTableId& tableId, NTable::TTableReverseIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + NUdf::TUnboxedValue* const* result, TKqpTableStats& stats) +{ + return ReadRowWideImpl(tableId, iterator, systemColumnTags, skipNullKeys, result, stats); } } // namespace NMiniKQL diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h index 1862763081..35346b67fa 100644 --- a/ydb/core/tx/datashard/datashard_kqp_compute.h +++ b/ydb/core/tx/datashard/datashard_kqp_compute.h @@ -22,19 +22,16 @@ using TKqpTableStats = TEngineHostCounters; class TKqpDatashardComputeContext : public TKqpComputeContextBase { public: - TKqpDatashardComputeContext(NDataShard::TDataShard* shard, TEngineHostCounters& counters, TInstant now) + TKqpDatashardComputeContext(NDataShard::TDataShard* shard, TEngineHost& engineHost, TInstant now) : Shard(shard) - , DatashardCounters(counters) + , EngineHost(engineHost) , Now(now) {} ui64 GetLocalTableId(const TTableId& tableId) const; TString GetTablePath(const TTableId& tableId) const; const NDataShard::TUserTable* GetTable(const TTableId& tableId) const; - void ReadTable(const TTableId& tableId, const TTableRange& range) const; - void ReadTable(const TTableId& tableId, const TArrayRef<const TCell>& key) const; void BreakSetLocks() const; void SetLockTxId(ui64 lockTxId); - ui64 GetShardId() const; TVector<std::pair<NScheme::TTypeId, TString>> GetKeyColumnsInfo(const TTableId &tableId) const; THashMap<TString, NScheme::TTypeId> GetKeyColumnsMap(const TTableId &tableId) const; @@ -53,12 +50,51 @@ public: TRowVersion GetReadVersion() const; TEngineHostCounters& GetTaskCounters(ui64 taskId) { return TaskCounters[taskId]; } - TEngineHostCounters& GetDatashardCounters() { return DatashardCounters; } + TEngineHostCounters& GetDatashardCounters() { return EngineHost.GetCounters(); } - void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; }; bool IsTabletNotReady() const { return TabletNotReady; } - void AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key); + bool ReadRow(const TTableId& tableId, TArrayRef<const TCell> key, const TSmallVec<NTable::TTag>& columnTags, + const TSmallVec<NTable::TTag>& systemColumnTags, const THolderFactory& holderFactory, + NUdf::TUnboxedValue& result, TKqpTableStats& stats); + + TAutoPtr<NTable::TTableIt> CreateIterator(const TTableId& tableId, const TTableRange& range, + const TSmallVec<NTable::TTag>& columnTags); + + TAutoPtr<NTable::TTableReverseIt> CreateReverseIterator(const TTableId& tableId, const TTableRange& range, + const TSmallVec<NTable::TTag>& columnTags); + + bool ReadRow(const TTableId& tableId, NTable::TTableIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats); + + bool ReadRow(const TTableId& tableId, NTable::TTableReverseIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats); + + bool ReadRowWide(const TTableId& tableId, NTable::TTableIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + NUdf::TUnboxedValue* const* result, TKqpTableStats& stats); + + bool ReadRowWide(const TTableId& tableId, NTable::TTableReverseIt& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + NUdf::TUnboxedValue* const* result, TKqpTableStats& stats); + +private: + void TouchTableRange(const TTableId& tableId, const TTableRange& range) const; + void TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const; + + template <typename TReadTableIterator> + bool ReadRowImpl(const TTableId& tableId, TReadTableIterator& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats); + + template <typename TReadTableIterator> + bool ReadRowWideImpl(const TTableId& tableId, TReadTableIterator& iterator, + const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys, + NUdf::TUnboxedValue* const* result, TKqpTableStats& stats); + + void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; }; public: NTable::TDatabase* Database = nullptr; @@ -66,7 +102,7 @@ public: private: NDataShard::TDataShard* Shard; std::unordered_map<ui64, TEngineHostCounters> TaskCounters; - TEngineHostCounters& DatashardCounters; + TEngineHost& EngineHost; TInstant Now; ui64 LockTxId = 0; bool PersistentChannels = false; @@ -82,18 +118,6 @@ public: TSmallVec<NTable::TTag> ExtractTags(const TSmallVec<TKqpComputeContextBase::TColumn>& columns); -bool TryFetchRow(const TTableId& tableId, NTable::TTableIt& iterator, NYql::NUdf::TUnboxedValue& row, - TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, - const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys); - -bool TryFetchRow(const TTableId& tableId, NTable::TTableReverseIt& iterator, NYql::NUdf::TUnboxedValue& row, - TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, - const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys); - -void FetchRow(const TDbTupleRef& dbTuple, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx, - TKqpTableStats& tableStats, const TKqpDatashardComputeContext& computeCtx, - const TSmallVec<NTable::TTag>& systemColumnTags); - IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, TKqpDatashardComputeContext& computeCtx); IComputationNode* WrapKqpLookupTable(TCallable& callable, const TComputationNodeFactoryContext& ctx, diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp index 7d6ed38420..ee179cd91c 100644 --- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp @@ -88,94 +88,62 @@ public: , TypeEnv(typeEnv) , ParseResult(parseResult) , LookupKeysNode(lookupKeysNode) - , LocalTid(ComputeCtx.GetLocalTableId(ParseResult.TableId)) , ColumnTags(ExtractTags(ParseResult.Columns)) , SystemColumnTags(ExtractTags(ParseResult.SystemColumns)) , ShardTableStats(ComputeCtx.GetDatashardCounters()) , TaskTableStats(ComputeCtx.GetTaskCounters(ComputeCtx.GetCurrentTaskId())) - , TableInfo(ComputeCtx.Database->GetScheme().GetTableInfo(LocalTid)) { - MKQL_ENSURE_S(TableInfo); - - MKQL_ENSURE_S(TableInfo->KeyColumns.size() == ParseResult.KeyIndices.size(), + auto localTid = ComputeCtx.GetLocalTableId(ParseResult.TableId); + auto tableInfo = ComputeCtx.Database->GetScheme().GetTableInfo(localTid); + MKQL_ENSURE_S(tableInfo, "Unknown table " << ParseResult.TableId); + MKQL_ENSURE_S(tableInfo->KeyColumns.size() == ParseResult.KeyIndices.size(), "Incomplete row key in LookupRows."); - - CellTypes.reserve(ColumnTags.size()); - for (size_t i = 0; i < ColumnTags.size(); ++i) { - CellTypes.emplace_back(TableInfo->Columns.at(ColumnTags[i]).PType); - } } TUnboxedValue DoCalculate(TComputationContext& ctx) const { auto keysValues = LookupKeysNode->GetValue(ctx); - while (!Finished) { + while (true) { NUdf::TUnboxedValue key; - auto status = keysValues.Fetch(key); - switch (status) { + + switch (keysValues.Fetch(key)) { case NUdf::EFetchStatus::Ok: { - TVector<TCell> keyCells(TableInfo->KeyColumns.size()); + TVector<TCell> keyCells(ParseResult.KeyIndices.size()); FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, keyCells, TypeEnv); - TSmallVec<TRawTypeValue> keyValues; - ConvertTableKeys(ComputeCtx.Database->GetScheme(), TableInfo, keyCells, keyValues, nullptr); + NUdf::TUnboxedValue result; + TKqpTableStats stats; + bool fetched = ComputeCtx.ReadRow(ParseResult.TableId, keyCells, ColumnTags, SystemColumnTags, + ctx.HolderFactory, result, stats); - if (keyValues.size() != TableInfo->KeyColumns.size()) { - throw TSchemeErrorTabletException(); + if (stats.InvisibleRowSkips) { + ComputeCtx.BreakSetLocks(); } - ComputeCtx.ReadTable(ParseResult.TableId, keyCells); - ComputeCtx.AddKeyAccessSample(ParseResult.TableId, keyCells); + ShardTableStats += stats; + TaskTableStats += stats; - NTable::TRowState dbRow; - NTable::TSelectStats stats; - ui64 flags = 0; // TODO: Check DisableByKeyFilter - auto ready = ComputeCtx.Database->Select(LocalTid, keyValues, ColumnTags, dbRow, stats, flags, ComputeCtx.GetReadVersion()); - if (stats.Invisible) - ComputeCtx.BreakSetLocks(); + if (fetched) { + return std::move(result); + } - switch (ready) { - case EReady::Page: - ComputeCtx.SetTabletNotReady(); - return TUnboxedValue::MakeYield(); - case EReady::Gone: - continue; - case EReady::Data: - break; - default: - MKQL_ENSURE_S(false, "Unexpected local db select status: " << (ui32)ready); - }; - - MKQL_ENSURE_S(CellTypes.size() == dbRow.Size(), "Invalid local db row size."); - - TDbTupleRef dbTuple(CellTypes.data(), (*dbRow).data(), dbRow.Size()); - TUnboxedValue result; - TKqpTableStats tableStats; - FetchRow(dbTuple, result, ctx, tableStats, ComputeCtx, SystemColumnTags); - - ShardTableStats.NSelectRow++; - ShardTableStats.SelectRowRows++; - ShardTableStats.SelectRowBytes += tableStats.SelectRowBytes; - - TaskTableStats.NSelectRow++; - TaskTableStats.SelectRowRows++; - TaskTableStats.SelectRowBytes += tableStats.SelectRowBytes; - - return result; + if (ComputeCtx.IsTabletNotReady()) { + return NUdf::TUnboxedValue::MakeYield(); + } + + continue; } case NUdf::EFetchStatus::Finish: - Finished = true; return TUnboxedValue::MakeFinish(); case NUdf::EFetchStatus::Yield: + MKQL_ENSURE_S(false); return TUnboxedValue::MakeYield(); } - - MKQL_ENSURE_S(false, "Unexpected key fetch status: " << (ui32)status); } - return TUnboxedValue::MakeFinish(); + Y_UNREACHABLE(); } private: @@ -188,14 +156,10 @@ private: const TTypeEnvironment& TypeEnv; TParseLookupTableResult ParseResult; IComputationNode* LookupKeysNode; - ui64 LocalTid; TSmallVec<TTag> ColumnTags; TSmallVec<TTag> SystemColumnTags; TKqpTableStats& ShardTableStats; TKqpTableStats& TaskTableStats; - const NTable::TScheme::TTableInfo* TableInfo; - TSmallVec<NScheme::TTypeId> CellTypes; - mutable bool Finished = false; }; class TKqpLookupTableWrapper : public TStatelessFlowComputationNode<TKqpLookupTableWrapper> { @@ -209,18 +173,13 @@ public: , TypeEnv(typeEnv) , ParseResult(parseResult) , LookupKeysNode(lookupKeysNode) - , LocalTid(ComputeCtx.GetLocalTableId(ParseResult.TableId)) , ColumnTags(ExtractTags(ParseResult.Columns)) , SystemColumnTags(ExtractTags(ParseResult.SystemColumns)) , ShardTableStats(ComputeCtx.GetDatashardCounters()) - , TaskTableStats(ComputeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId())) - , TableInfo(ComputeCtx.Database->GetScheme().GetTableInfo(LocalTid)) - { - MKQL_ENSURE_S(TableInfo); - } + , TaskTableStats(ComputeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId())) {} TUnboxedValue DoCalculate(TComputationContext& ctx) const { - while (!Finished) { + while (true) { if (!Iterator) { auto keysValues = LookupKeysNode->GetValue(ctx); @@ -229,7 +188,11 @@ public: switch (status) { case NUdf::EFetchStatus::Ok: { - TVector<TCell> fromCells(TableInfo->KeyColumns.size()); + auto localTid = ComputeCtx.GetLocalTableId(ParseResult.TableId); + auto tableInfo = ComputeCtx.Database->GetScheme().GetTableInfo(localTid); + MKQL_ENSURE_S(tableInfo); + + TVector<TCell> fromCells(tableInfo->KeyColumns.size()); FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, fromCells, TypeEnv); TVector<TCell> toCells(ParseResult.KeyIndices.size()); @@ -237,64 +200,48 @@ public: auto range = TTableRange(fromCells, true, toCells, true); - TSmallVec<TRawTypeValue> from, to; - ConvertTableKeys(ComputeCtx.Database->GetScheme(), TableInfo, range.From, from, nullptr); - ConvertTableKeys(ComputeCtx.Database->GetScheme(), TableInfo, range.To, to, nullptr); + Iterator = ComputeCtx.CreateIterator(ParseResult.TableId, range, ColumnTags); - NTable::TKeyRange keyRange; - keyRange.MinKey = from; - keyRange.MaxKey = to; - keyRange.MinInclusive = range.InclusiveFrom; - keyRange.MaxInclusive = range.InclusiveTo; - - ComputeCtx.ReadTable(ParseResult.TableId, range); - Iterator = ComputeCtx.Database->IterateRange(LocalTid, keyRange, ColumnTags, ComputeCtx.GetReadVersion()); + ShardTableStats.NSelectRange++; + TaskTableStats.NSelectRange++; break; } case NUdf::EFetchStatus::Finish: - Finished = true; return TUnboxedValue::MakeFinish(); case NUdf::EFetchStatus::Yield: + MKQL_ENSURE_S(false); return TUnboxedValue::MakeYield(); } } TUnboxedValue result; - TKqpTableStats tableStats; - auto fetched = TryFetchRow(ParseResult.TableId, *Iterator, result, ctx, tableStats, ComputeCtx, - SystemColumnTags, ParseResult.SkipNullKeys); + TKqpTableStats stats; - if (Iterator->Stats.InvisibleRowSkips) { - ComputeCtx.BreakSetLocks(); - } - - ShardTableStats += tableStats; - TaskTableStats += tableStats; + bool fetched = ComputeCtx.ReadRow(ParseResult.TableId, *Iterator, SystemColumnTags, + ParseResult.SkipNullKeys, ctx.HolderFactory, result, stats); - ui64 deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0); - ui64 invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0); - ShardTableStats.SelectRangeDeletedRowSkips += deletedRowSkips; - ShardTableStats.InvisibleRowSkips += invisibleRowSkips; + if (stats.InvisibleRowSkips) { + ComputeCtx.BreakSetLocks(); + } - TaskTableStats.SelectRangeDeletedRowSkips += deletedRowSkips; - TaskTableStats.InvisibleRowSkips += invisibleRowSkips; + ShardTableStats += stats; + TaskTableStats += stats; if (fetched) { return result; } - if (Iterator->Last() == NTable::EReady::Page) { - ComputeCtx.SetTabletNotReady(); - return TUnboxedValue::MakeYield(); + if (ComputeCtx.IsTabletNotReady()) { + return NUdf::TUnboxedValue::MakeYield(); } Iterator = nullptr; } - return TUnboxedValue::MakeFinish(); + Y_UNREACHABLE(); } private: @@ -307,14 +254,11 @@ private: const TTypeEnvironment& TypeEnv; TParseLookupTableResult ParseResult; IComputationNode* LookupKeysNode; - ui64 LocalTid; TSmallVec<TTag> ColumnTags; TSmallVec<TTag> SystemColumnTags; TKqpTableStats& ShardTableStats; TKqpTableStats& TaskTableStats; - const NTable::TScheme::TTableInfo* TableInfo; mutable TAutoPtr<NTable::TTableIt> Iterator; - mutable bool Finished = false; }; IComputationNode* WrapKqpLookupTableInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx, diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp index ded2e109aa..f3d5848c4c 100644 --- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp +++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp @@ -74,18 +74,6 @@ TMaybe<TKeyRangesType> ParseKeyRangesType(const TTupleType* rangeTupleType) { }; } -TSerializedTableRange CreateTableRange(const TParseReadTableResult& parseResult, const IComputationNode* fromNode, - const IComputationNode* toNode, const TTypeEnvironment& typeEnv, TComputationContext& ctx) -{ - TVector<TCell> fromCells; - BuildKeyTupleCells(parseResult.FromTuple->GetType(), fromNode->GetValue(ctx), fromCells, typeEnv); - - TVector<TCell> toCells; - BuildKeyTupleCells(parseResult.ToTuple->GetType(), toNode->GetValue(ctx), toCells, typeEnv); - - return TSerializedTableRange(fromCells, parseResult.FromInclusive, toCells, parseResult.ToInclusive); -} - TSerializedTableRange BuildFullRange(ui32 keyColumnsSize) { /* Build range from NULL, ... NULL to +inf, ... +inf */ TVector<TCell> fromKeyValues(keyColumnsSize); @@ -183,15 +171,6 @@ TVector<TSerializedTableRange> CreateTableRanges(const TParseReadTableRangesResu return ranges; } -void CreateRangePoints(ui64 localTid, const TSerializedTableRange& serializedTableRange, TSmallVec<TRawTypeValue>& from, - TSmallVec<TRawTypeValue>& to, TKqpDatashardComputeContext& computeCtx) -{ - const auto* tableInfo = computeCtx.Database->GetScheme().GetTableInfo(localTid); - auto tableRange = serializedTableRange.ToTableRange(); - ConvertTableKeys(computeCtx.Database->GetScheme(), tableInfo, tableRange.From, from, nullptr); - ConvertTableKeys(computeCtx.Database->GetScheme(), tableInfo, tableRange.To, to, nullptr); -} - template <bool IsReverse> class TKqpWideReadTableWrapperBase : public TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>> { public: @@ -222,94 +201,25 @@ protected: virtual EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const = 0; EFetchResult ReadNext(NUdf::TUnboxedValue* const* output) const { - bool breakLocks = false; - while (Iterator->Next(NTable::ENext::Data) == NTable::EReady::Data) { - if (!breakLocks && (breakLocks = bool(Iterator->Stats.InvisibleRowSkips))) { - ComputeCtx.BreakSetLocks(); - } - TDbTupleRef rowKey = Iterator->GetKey(); - - ComputeCtx.AddKeyAccessSample(TableId, rowKey.Cells()); - - ui64 deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0); - ui64 invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0); - - ShardTableStats.SelectRangeDeletedRowSkips += deletedRowSkips; - ShardTableStats.InvisibleRowSkips += invisibleRowSkips; - - TaskTableStats.SelectRangeDeletedRowSkips += deletedRowSkips; - TaskTableStats.InvisibleRowSkips += invisibleRowSkips; - - Y_VERIFY(SkipNullKeys.size() <= rowKey.ColumnCount); - bool skipRow = false; - for (ui32 i = 0; i < SkipNullKeys.size(); ++i) { - if (SkipNullKeys[i] && rowKey.Columns[i].IsNull()) { - skipRow = true; - break; - } - } - if (skipRow) { - continue; - } + TKqpTableStats stats; + bool fetched = ComputeCtx.ReadRowWide(TableId, *Iterator, SystemColumnTags, SkipNullKeys, output, stats); - TDbTupleRef rowValues = Iterator->GetValues(); - - size_t columnsCount = rowValues.ColumnCount + SystemColumnTags.size(); - - ui64 rowSize = 0; - for (ui32 i = 0; i < rowValues.ColumnCount; ++i) { - rowSize += rowValues.Columns[i].IsNull() ? 1 : rowValues.Columns[i].Size(); - if (auto out = *output++) { - *out = GetCellValue(rowValues.Cells()[i], rowValues.Types[i]); - } - } - - // Some per-row overhead to deal with the case when no columns were requested - rowSize = std::max(rowSize, (ui64) 8); + if (stats.InvisibleRowSkips) { + ComputeCtx.BreakSetLocks(); + } - for (ui32 i = rowValues.ColumnCount, j = 0; i < columnsCount; ++i, ++j) { - auto out = *output++; - if (!out) { - continue; - } - - switch (SystemColumnTags[j]) { - case TKeyDesc::EColumnIdDataShard: - *out = TUnboxedValue(TUnboxedValuePod(ComputeCtx.GetShardId())); - break; - default: - throw TSchemeErrorTabletException(); - } - } + ShardTableStats += stats; + TaskTableStats += stats; + if (fetched) { if (Remains) { Remains = *Remains - 1; } - ShardTableStats.SelectRangeRows++; - ShardTableStats.SelectRangeBytes += rowSize; - - TaskTableStats.SelectRangeRows++; - TaskTableStats.SelectRangeBytes += rowSize; - return EFetchResult::One; } - if (!breakLocks && bool(Iterator->Stats.InvisibleRowSkips)) { - ComputeCtx.BreakSetLocks(); - } - - auto deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0); - auto invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0); - - ShardTableStats.SelectRangeDeletedRowSkips += deletedRowSkips; - ShardTableStats.InvisibleRowSkips += invisibleRowSkips; - - TaskTableStats.SelectRangeDeletedRowSkips += deletedRowSkips; - TaskTableStats.InvisibleRowSkips += invisibleRowSkips; - - if (Iterator->Last() == NTable::EReady::Page) { - ComputeCtx.SetTabletNotReady(); + if (ComputeCtx.IsTabletNotReady()) { return EFetchResult::Yield; } @@ -341,7 +251,6 @@ public: , FromNode(fromNode) , ToNode(toNode) , ItemsLimit(itemsLimit) - , LocalTid(computeCtx.GetLocalTableId(parseResult.TableId)) , ColumnTags(ExtractTags(parseResult.Columns)) { this->ShardTableStats.NSelectRange++; @@ -351,27 +260,22 @@ public: private: EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const final { if (!this->Iterator) { - auto serializedTableRange = CreateTableRange(ParseResult, FromNode, ToNode, this->TypeEnv, ctx); - auto tableRange = serializedTableRange.ToTableRange(); - this->ComputeCtx.ReadTable(ParseResult.TableId, tableRange); + TVector<TCell> fromCells; + BuildKeyTupleCells(ParseResult.FromTuple->GetType(), FromNode->GetValue(ctx), fromCells, this->TypeEnv); - TSmallVec<TRawTypeValue> from, to; - CreateRangePoints(LocalTid, serializedTableRange, from, to, this->ComputeCtx); + TVector<TCell> toCells; + BuildKeyTupleCells(ParseResult.ToTuple->GetType(), ToNode->GetValue(ctx), toCells, this->TypeEnv); - NTable::TKeyRange keyRange; - keyRange.MinKey = from; - keyRange.MaxKey = to; - keyRange.MinInclusive = tableRange.InclusiveFrom; - keyRange.MaxInclusive = tableRange.InclusiveTo; + auto range = TTableRange(fromCells, ParseResult.FromInclusive, toCells, ParseResult.ToInclusive); if (ItemsLimit) { this->Remains = ItemsLimit->GetValue(ctx).Get<ui64>(); } if constexpr (IsReverse) { - this->Iterator = this->ComputeCtx.Database->IterateRangeReverse(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion()); + this->Iterator = this->ComputeCtx.CreateReverseIterator(ParseResult.TableId, range, ColumnTags); } else { - this->Iterator = this->ComputeCtx.Database->IterateRange(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion()); + this->Iterator = this->ComputeCtx.CreateIterator(ParseResult.TableId, range, ColumnTags); } } @@ -396,9 +300,7 @@ private: IComputationNode* FromNode; IComputationNode* ToNode; IComputationNode* ItemsLimit; - ui64 LocalTid; TSmallVec<TTag> ColumnTags; - ui64 TaskId; }; template <bool IsReverse> @@ -411,14 +313,13 @@ public: , ParseResult(parseResult) , RangesNode(rangesNode) , ItemsLimit(itemsLimit) - , LocalTid(computeCtx.GetLocalTableId(parseResult.TableId)) - , ColumnTags(ExtractTags(parseResult.Columns)) { - } + , ColumnTags(ExtractTags(parseResult.Columns)) {} private: EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const final { if (!RangeId) { - const auto* tableInfo = this->ComputeCtx.Database->GetScheme().GetTableInfo(LocalTid); + const auto localTid = this->ComputeCtx.GetLocalTableId(ParseResult.TableId); + const auto* tableInfo = this->ComputeCtx.Database->GetScheme().GetTableInfo(localTid); Ranges = CreateTableRanges<IsReverse>(ParseResult, RangesNode, this->TypeEnv, ctx, tableInfo->KeyColumns.size()); RangeId = 0; @@ -433,23 +334,12 @@ private: this->TaskTableStats.NSelectRange++; if (!this->Iterator) { - auto& range = Ranges[*RangeId]; - auto tableRange = range.ToTableRange(); - this->ComputeCtx.ReadTable(ParseResult.TableId, tableRange); - - TSmallVec<TRawTypeValue> from, to; - CreateRangePoints(LocalTid, range, from, to, this->ComputeCtx); - - NTable::TKeyRange keyRange; - keyRange.MinKey = from; - keyRange.MaxKey = to; - keyRange.MinInclusive = tableRange.InclusiveFrom; - keyRange.MaxInclusive = tableRange.InclusiveTo; + auto range = Ranges[*RangeId].ToTableRange(); if constexpr (IsReverse) { - this->Iterator = this->ComputeCtx.Database->IterateRangeReverse(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion()); + this->Iterator = this->ComputeCtx.CreateReverseIterator(ParseResult.TableId, range, ColumnTags); } else { - this->Iterator = this->ComputeCtx.Database->IterateRange(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion()); + this->Iterator = this->ComputeCtx.CreateIterator(ParseResult.TableId, range, ColumnTags); } } @@ -479,95 +369,11 @@ private: TParseReadTableRangesResult ParseResult; IComputationNode* RangesNode; IComputationNode* ItemsLimit; - ui64 LocalTid; TSmallVec<TTag> ColumnTags; - ui64 TaskId; mutable TVector<TSerializedTableRange> Ranges; mutable std::optional<ui32> RangeId; }; -void FetchRowImpl(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats, - const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags) -{ - size_t columnsCount = dbTuple.ColumnCount + systemColumnTags.size(); - - TUnboxedValue* rowItems = nullptr; - row = ctx.HolderFactory.CreateDirectArrayHolder(columnsCount, rowItems); - - ui64 rowSize = 0; - for (ui32 i = 0; i < dbTuple.ColumnCount; ++i) { - rowSize += dbTuple.Columns[i].IsNull() ? 1 : dbTuple.Columns[i].Size(); - rowItems[i] = GetCellValue(dbTuple.Cells()[i], dbTuple.Types[i]); - } - - // Some per-row overhead to deal with the case when no columns were requested - rowSize = std::max(rowSize, (ui64)8); - - for (ui32 i = dbTuple.ColumnCount, j = 0; i < columnsCount; ++i, ++j) { - switch (systemColumnTags[j]) { - case TKeyDesc::EColumnIdDataShard: - rowItems[i] = TUnboxedValue(TUnboxedValuePod(computeCtx.GetShardId())); - break; - default: - throw TSchemeErrorTabletException(); - } - } - - tableStats.NSelectRow++; - tableStats.SelectRowRows++; - tableStats.SelectRowBytes += rowSize; -} - -template <typename TTableIterator> -bool TryFetchRowImpl(const TTableId& tableId, TTableIterator& iterator, TUnboxedValue& row, TComputationContext& ctx, - TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, - const TSmallVec<bool>& skipNullKeys) -{ - while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) { - TDbTupleRef rowKey = iterator.GetKey(); - computeCtx.AddKeyAccessSample(tableId, rowKey.Cells()); - - Y_VERIFY(skipNullKeys.size() <= rowKey.ColumnCount); - bool skipRow = false; - for (ui32 i = 0; i < skipNullKeys.size(); ++i) { - if (skipNullKeys[i] && rowKey.Columns[i].IsNull()) { - skipRow = true; - break; - } - } - - if (skipRow) { - continue; - } - - TDbTupleRef rowValues = iterator.GetValues(); - FetchRowImpl(rowValues, row, ctx, tableStats, computeCtx, systemColumnTags); - return true; - } - - return false; -} - -} // namespace - -bool TryFetchRow(const TTableId& tableId, TTableIt& iterator, TUnboxedValue& row, TComputationContext& ctx, - TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, - const TSmallVec<bool>& skipNullKeys) -{ - return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys); -} - -bool TryFetchRow(const TTableId& tableId, TTableReverseIt& iterator, TUnboxedValue& row, TComputationContext& ctx, - TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags, - const TSmallVec<bool>& skipNullKeys) -{ - return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys); -} - -void FetchRow(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats, - const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags) -{ - return FetchRowImpl(dbTuple, row, ctx, tableStats, computeCtx, systemColumnTags); } IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx, |