aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorIgor Makunin <igor.makunin@gmail.com>2022-02-16 18:52:02 +0300
committerIgor Makunin <igor.makunin@gmail.com>2022-02-16 18:52:02 +0300
commit0c4c91540e1b2b874a87135cf87d22a4325ac102 (patch)
treee693cec140cc5e80f48025e2d0749f03fb205482
parente1e97356e553c2f4a13eda00a625caa975f4ac61 (diff)
downloadydb-0c4c91540e1b2b874a87135cf87d22a4325ac102.tar.gz
KIKIMR-14356: rework reads and support cdc
ref:67221a54ace9765874e54698d3ad82baacdf26a5
-rw-r--r--ydb/core/engine/minikql/minikql_engine_host.h1
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.cpp2
-rw-r--r--ydb/core/tx/datashard/datashard__engine_host.h2
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.cpp317
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_compute.h66
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp152
-rw-r--r--ydb/core/tx/datashard/datashard_kqp_read_table.cpp238
7 files changed, 422 insertions, 356 deletions
diff --git a/ydb/core/engine/minikql/minikql_engine_host.h b/ydb/core/engine/minikql/minikql_engine_host.h
index 012ee6891b..9c6e39a364 100644
--- a/ydb/core/engine/minikql/minikql_engine_host.h
+++ b/ydb/core/engine/minikql/minikql_engine_host.h
@@ -123,6 +123,7 @@ public:
void ExecPeriodicCallback() { if (PeriodicCallback) { PeriodicCallback();} }
TEngineHostCounters& GetCounters() const { return Counters; }
+ const TEngineHostSettings& GetSettings() const { return Settings; }
virtual TRowVersion GetWriteVersion(const TTableId& tableId) const = 0;
virtual TRowVersion GetReadVersion(const TTableId& tableId) const = 0;
diff --git a/ydb/core/tx/datashard/datashard__engine_host.cpp b/ydb/core/tx/datashard/datashard__engine_host.cpp
index 876c337093..5df538f438 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.cpp
+++ b/ydb/core/tx/datashard/datashard__engine_host.cpp
@@ -541,7 +541,7 @@ TEngineBay::TEngineBay(TDataShard * self, TTransactionContext& txc, const TActor
<< ": " << message);
};
- ComputeCtx = MakeHolder<TKqpDatashardComputeContext>(self, EngineHostCounters, now);
+ ComputeCtx = MakeHolder<TKqpDatashardComputeContext>(self, *EngineHost, now);
ComputeCtx->Database = &txc.DB;
auto kqpApplyCtx = MakeHolder<TKqpDatashardApplyContext>();
diff --git a/ydb/core/tx/datashard/datashard__engine_host.h b/ydb/core/tx/datashard/datashard__engine_host.h
index 8e3064c83d..90a298cfa8 100644
--- a/ydb/core/tx/datashard/datashard__engine_host.h
+++ b/ydb/core/tx/datashard/datashard__engine_host.h
@@ -104,7 +104,7 @@ public:
private:
std::pair<ui64, ui64> StepTxId;
- THolder<NMiniKQL::IEngineFlatHost> EngineHost;
+ THolder<NMiniKQL::TEngineHost> EngineHost;
THolder<NMiniKQL::TEngineFlatSettings> EngineSettings;
THolder<NMiniKQL::IEngineFlat> Engine;
TValidationInfo Info;
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.cpp b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
index 885c633780..1cefe8f239 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.cpp
@@ -148,20 +148,17 @@ const NDataShard::TUserTable* TKqpDatashardComputeContext::GetTable(const TTable
return ptr->Get();
}
-void TKqpDatashardComputeContext::ReadTable(const TTableId& tableId, const TTableRange& range) const {
- MKQL_ENSURE_S(Shard);
+void TKqpDatashardComputeContext::TouchTableRange(const TTableId& tableId, const TTableRange& range) const {
Shard->SysLocksTable().SetLock(tableId, range, LockTxId);
Shard->SetTableAccessTime(tableId, Now);
}
-void TKqpDatashardComputeContext::ReadTable(const TTableId& tableId, const TArrayRef<const TCell>& key) const {
- MKQL_ENSURE_S(Shard);
+void TKqpDatashardComputeContext::TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const {
Shard->SysLocksTable().SetLock(tableId, key, LockTxId);
Shard->SetTableAccessTime(tableId, Now);
}
void TKqpDatashardComputeContext::BreakSetLocks() const {
- MKQL_ENSURE_S(Shard);
Shard->SysLocksTable().BreakSetLocks(LockTxId);
}
@@ -169,10 +166,6 @@ void TKqpDatashardComputeContext::SetLockTxId(ui64 lockTxId) {
LockTxId = lockTxId;
}
-ui64 TKqpDatashardComputeContext::GetShardId() const {
- return Shard->TabletID();
-}
-
void TKqpDatashardComputeContext::SetReadVersion(TRowVersion readVersion) {
ReadVersion = readVersion;
}
@@ -228,7 +221,25 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
continue;
}
- if (key.RowOperation != TKeyDesc::ERowOperation::Read) {
+ TSet<TKeyDesc::EColumnOperation> columnOpFilter;
+ switch (key.RowOperation) {
+ case TKeyDesc::ERowOperation::Read:
+ columnOpFilter.insert(TKeyDesc::EColumnOperation::Read);
+ break;
+ case TKeyDesc::ERowOperation::Update:
+ case TKeyDesc::ERowOperation::Erase: {
+ const auto collector = EngineHost.GetChangeCollector(key.TableId);
+ if (collector && collector->NeedToReadKeys()) {
+ columnOpFilter.insert(TKeyDesc::EColumnOperation::Set);
+ columnOpFilter.insert(TKeyDesc::EColumnOperation::InplaceUpdate);
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+ if (columnOpFilter.empty()) {
continue;
}
@@ -245,7 +256,7 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
TSmallVec<NTable::TTag> columnTags;
for (const auto& column : key.Columns) {
- if (Y_LIKELY(column.Operation == TKeyDesc::EColumnOperation::Read)) {
+ if (columnOpFilter.contains(column.Operation)) {
columnTags.push_back(column.Column);
}
}
@@ -254,7 +265,7 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
from,
key.Range.Point ? from : to,
columnTags,
- 0 /* readFlags */,
+ EngineHost.GetSettings().DisableByKeyFilter ? (ui64)NTable::NoByKey : 0,
adjustLimit(key.RangeLimits.ItemsLimit),
adjustLimit(key.RangeLimits.BytesLimit),
key.Reverse ? NTable::EDirection::Reverse : NTable::EDirection::Forward,
@@ -274,8 +285,288 @@ bool TKqpDatashardComputeContext::PinPages(const TVector<IEngineFlat::TValidated
return ret;
}
-void TKqpDatashardComputeContext::AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key) {
+static void BuildRowImpl(const TDbTupleRef& dbTuple, const THolderFactory& holderFactory,
+ const TSmallVec<TTag>& systemColumnTags, ui64 shardId, NUdf::TUnboxedValue& result, size_t& rowSize)
+{
+ size_t columnsCount = dbTuple.ColumnCount + systemColumnTags.size();
+
+ TUnboxedValue* rowItems = nullptr;
+ result = holderFactory.CreateDirectArrayHolder(columnsCount, rowItems);
+
+ rowSize = 0;
+ for (ui32 i = 0; i < dbTuple.ColumnCount; ++i) {
+ const auto& cell = dbTuple.Cells()[i];
+ rowSize += cell.IsNull() ? 1 : cell.Size();
+ rowItems[i] = GetCellValue(cell, dbTuple.Types[i]);
+ }
+
+ // Some per-row overhead to deal with the case when no columns were requested
+ rowSize = std::max(rowSize, (size_t) 8);
+
+ for (ui32 i = dbTuple.ColumnCount, j = 0; i < columnsCount; ++i, ++j) {
+ switch (systemColumnTags[j]) {
+ case TKeyDesc::EColumnIdDataShard:
+ rowItems[i] = TUnboxedValue(TUnboxedValuePod(shardId));
+ break;
+ default:
+ throw TSchemeErrorTabletException();
+ }
+ }
+}
+
+static void BuildRowWideImpl(const TDbTupleRef& dbTuple, const TSmallVec<TTag>& systemColumnTags, ui64 shardId,
+ NUdf::TUnboxedValue* const* result, size_t& rowSize)
+{
+ size_t columnsCount = dbTuple.ColumnCount + systemColumnTags.size();
+
+ rowSize = 0;
+ for (ui32 i = 0; i < dbTuple.ColumnCount; ++i) {
+ const auto& cell = dbTuple.Cells()[i];
+ rowSize += cell.IsNull() ? 1 : cell.Size();
+ if (auto out = *result++) {
+ *out = GetCellValue(cell, dbTuple.Types[i]);
+ }
+ }
+
+ // Some per-row overhead to deal with the case when no columns were requested
+ rowSize = std::max(rowSize, (size_t) 8);
+
+ for (ui32 i = dbTuple.ColumnCount, j = 0; i < columnsCount; ++i, ++j) {
+ auto out = *result++;
+ if (!out) {
+ continue;
+ }
+
+ switch (systemColumnTags[j]) {
+ case TKeyDesc::EColumnIdDataShard:
+ *out = TUnboxedValue(TUnboxedValuePod(shardId));
+ break;
+ default:
+ throw TSchemeErrorTabletException();
+ }
+ }
+}
+
+bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, TArrayRef<const TCell> key,
+ const TSmallVec<NTable::TTag>& columnTags, const TSmallVec<NTable::TTag>& systemColumnTags,
+ const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& kqpStats)
+{
+ MKQL_ENSURE_S(Shard);
+
+ auto localTid = Shard->GetLocalTableId(tableId);
+ auto tableInfo = Database->GetScheme().GetTableInfo(localTid);
+ MKQL_ENSURE_S(tableInfo, "Can not resolve table " << tableId);
+
+ TSmallVec<TRawTypeValue> keyValues;
+ ConvertTableKeys(Database->GetScheme(), tableInfo, key, keyValues, /* keyDataBytes */ nullptr);
+
+ if (Y_UNLIKELY(keyValues.size() != tableInfo->KeyColumns.size())) {
+ throw TSchemeErrorTabletException();
+ }
+
+ TouchTablePoint(tableId, key);
Shard->GetKeyAccessSampler()->AddSample(tableId, key);
+
+ NTable::TRowState dbRow;
+ NTable::TSelectStats stats;
+ ui64 flags = EngineHost.GetSettings().DisableByKeyFilter ? (ui64) NTable::NoByKey : 0;
+ auto ready = Database->Select(localTid, keyValues, columnTags, dbRow, stats, flags, GetReadVersion());
+
+ kqpStats.NSelectRow = 1;
+ kqpStats.InvisibleRowSkips = stats.Invisible;
+
+ switch (ready) {
+ case EReady::Page:
+ if (auto collector = EngineHost.GetChangeCollector(tableId)) {
+ collector->Reset();
+ }
+ SetTabletNotReady();
+ return false;
+ case EReady::Gone:
+ return false;
+ case EReady::Data:
+ break;
+ };
+
+ MKQL_ENSURE_S(columnTags.size() == dbRow.Size(), "Invalid local db row size.");
+
+ TVector<NScheme::TTypeId> types(columnTags.size());
+ for (size_t i = 0; i < columnTags.size(); ++i) {
+ types[i] = tableInfo->Columns.at(columnTags[i]).PType;
+ }
+ auto dbTuple = TDbTupleRef(types.data(), (*dbRow).data(), dbRow.Size());
+
+ size_t rowSize = 0;
+ BuildRowImpl(dbTuple, holderFactory, systemColumnTags, Shard->TabletID(), result, rowSize);
+
+ kqpStats.SelectRowRows = 1;
+ kqpStats.SelectRowBytes += rowSize;
+
+ return true;
+}
+
+TAutoPtr<NTable::TTableIt> TKqpDatashardComputeContext::CreateIterator(const TTableId& tableId, const TTableRange& range,
+ const TSmallVec<NTable::TTag>& columnTags)
+{
+ auto localTid = Shard->GetLocalTableId(tableId);
+ auto tableInfo = Database->GetScheme().GetTableInfo(localTid);
+ MKQL_ENSURE_S(tableInfo, "Can not resolve table " << tableId);
+
+ TSmallVec<TRawTypeValue> from, to;
+ ConvertTableKeys(Database->GetScheme(), tableInfo, range.From, from, /* keyDataBytes */ nullptr);
+ ConvertTableKeys(Database->GetScheme(), tableInfo, range.To, to, /* keyDataBytes */ nullptr);
+
+ NTable::TKeyRange keyRange;
+ keyRange.MinKey = from;
+ keyRange.MaxKey = to;
+ keyRange.MinInclusive = range.InclusiveFrom;
+ keyRange.MaxInclusive = range.InclusiveTo;
+
+ TouchTableRange(tableId, range);
+ return Database->IterateRange(localTid, keyRange, columnTags, GetReadVersion());
+}
+
+TAutoPtr<NTable::TTableReverseIt> TKqpDatashardComputeContext::CreateReverseIterator(const TTableId& tableId,
+ const TTableRange& range, const TSmallVec<NTable::TTag>& columnTags)
+{
+ auto localTid = Shard->GetLocalTableId(tableId);
+ auto tableInfo = Database->GetScheme().GetTableInfo(localTid);
+ MKQL_ENSURE_S(tableInfo, "Can not resolve table " << tableId);
+
+ TSmallVec<TRawTypeValue> from, to;
+ ConvertTableKeys(Database->GetScheme(), tableInfo, range.From, from, /* keyDataBytes */ nullptr);
+ ConvertTableKeys(Database->GetScheme(), tableInfo, range.To, to, /* keyDataBytes */ nullptr);
+
+ NTable::TKeyRange keyRange;
+ keyRange.MinKey = from;
+ keyRange.MaxKey = to;
+ keyRange.MinInclusive = range.InclusiveFrom;
+ keyRange.MaxInclusive = range.InclusiveTo;
+
+ TouchTableRange(tableId, range);
+ return Database->IterateRangeReverse(localTid, keyRange, columnTags, GetReadVersion());
+}
+
+template <typename TReadTableIterator>
+bool TKqpDatashardComputeContext::ReadRowImpl(const TTableId& tableId, TReadTableIterator& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats)
+{
+ while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) {
+ TDbTupleRef rowKey = iterator.GetKey();
+ MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount);
+
+ Shard->GetKeyAccessSampler()->AddSample(tableId, rowKey.Cells());
+
+ bool skipRow = false;
+ for (ui32 i = 0; i < skipNullKeys.size(); ++i) {
+ if (skipNullKeys[i] && rowKey.Columns[i].IsNull()) {
+ skipRow = true;
+ break;
+ }
+ }
+
+ if (skipRow) {
+ continue;
+ }
+
+ TDbTupleRef rowValues = iterator.GetValues();
+ size_t rowSize = 0;
+
+ BuildRowImpl(rowValues, holderFactory, systemColumnTags, Shard->TabletID(), result, rowSize);
+
+ stats.SelectRangeRows = 1;
+ stats.SelectRangeBytes = rowSize;
+
+ stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0);
+ stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0);
+
+ return true;
+ }
+
+ if (iterator.Last() == NTable::EReady::Page) {
+ if (auto collector = EngineHost.GetChangeCollector(tableId)) {
+ collector->Reset();
+ }
+ SetTabletNotReady();
+ }
+
+ return false;
+}
+
+template <typename TReadTableIterator>
+bool TKqpDatashardComputeContext::ReadRowWideImpl(const TTableId& tableId, TReadTableIterator& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ NUdf::TUnboxedValue* const* result, TKqpTableStats& stats)
+{
+ while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) {
+ TDbTupleRef rowKey = iterator.GetKey();
+ MKQL_ENSURE_S(skipNullKeys.size() <= rowKey.ColumnCount);
+
+ Shard->GetKeyAccessSampler()->AddSample(tableId, rowKey.Cells());
+
+ bool skipRow = false;
+ for (ui32 i = 0; i < skipNullKeys.size(); ++i) {
+ if (skipNullKeys[i] && rowKey.Columns[i].IsNull()) {
+ skipRow = true;
+ break;
+ }
+ }
+
+ if (skipRow) {
+ continue;
+ }
+
+ TDbTupleRef rowValues = iterator.GetValues();
+ size_t rowSize = 0;
+
+ BuildRowWideImpl(rowValues, systemColumnTags, Shard->TabletID(), result, rowSize);
+
+ stats.SelectRangeRows = 1;
+ stats.SelectRangeBytes = rowSize;
+
+ stats.InvisibleRowSkips = std::exchange(iterator.Stats.InvisibleRowSkips, 0);
+ stats.SelectRangeDeletedRowSkips = std::exchange(iterator.Stats.DeletedRowSkips, 0);
+
+ return true;
+ }
+
+ if (iterator.Last() == NTable::EReady::Page) {
+ if (auto collector = EngineHost.GetChangeCollector(tableId)) {
+ collector->Reset();
+ }
+ SetTabletNotReady();
+ }
+
+ return false;
+}
+
+bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, NTable::TTableIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats)
+{
+ return ReadRowImpl(tableId, iterator, systemColumnTags, skipNullKeys, holderFactory, result, stats);
+}
+
+bool TKqpDatashardComputeContext::ReadRow(const TTableId& tableId, NTable::TTableReverseIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats)
+{
+ return ReadRowImpl(tableId, iterator, systemColumnTags, skipNullKeys, holderFactory, result, stats);
+}
+
+bool TKqpDatashardComputeContext::ReadRowWide(const TTableId& tableId, NTable::TTableIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ NUdf::TUnboxedValue* const* result, TKqpTableStats& stats)
+{
+ return ReadRowWideImpl(tableId, iterator, systemColumnTags, skipNullKeys,result, stats);
+}
+
+bool TKqpDatashardComputeContext::ReadRowWide(const TTableId& tableId, NTable::TTableReverseIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ NUdf::TUnboxedValue* const* result, TKqpTableStats& stats)
+{
+ return ReadRowWideImpl(tableId, iterator, systemColumnTags, skipNullKeys, result, stats);
}
} // namespace NMiniKQL
diff --git a/ydb/core/tx/datashard/datashard_kqp_compute.h b/ydb/core/tx/datashard/datashard_kqp_compute.h
index 1862763081..35346b67fa 100644
--- a/ydb/core/tx/datashard/datashard_kqp_compute.h
+++ b/ydb/core/tx/datashard/datashard_kqp_compute.h
@@ -22,19 +22,16 @@ using TKqpTableStats = TEngineHostCounters;
class TKqpDatashardComputeContext : public TKqpComputeContextBase {
public:
- TKqpDatashardComputeContext(NDataShard::TDataShard* shard, TEngineHostCounters& counters, TInstant now)
+ TKqpDatashardComputeContext(NDataShard::TDataShard* shard, TEngineHost& engineHost, TInstant now)
: Shard(shard)
- , DatashardCounters(counters)
+ , EngineHost(engineHost)
, Now(now) {}
ui64 GetLocalTableId(const TTableId& tableId) const;
TString GetTablePath(const TTableId& tableId) const;
const NDataShard::TUserTable* GetTable(const TTableId& tableId) const;
- void ReadTable(const TTableId& tableId, const TTableRange& range) const;
- void ReadTable(const TTableId& tableId, const TArrayRef<const TCell>& key) const;
void BreakSetLocks() const;
void SetLockTxId(ui64 lockTxId);
- ui64 GetShardId() const;
TVector<std::pair<NScheme::TTypeId, TString>> GetKeyColumnsInfo(const TTableId &tableId) const;
THashMap<TString, NScheme::TTypeId> GetKeyColumnsMap(const TTableId &tableId) const;
@@ -53,12 +50,51 @@ public:
TRowVersion GetReadVersion() const;
TEngineHostCounters& GetTaskCounters(ui64 taskId) { return TaskCounters[taskId]; }
- TEngineHostCounters& GetDatashardCounters() { return DatashardCounters; }
+ TEngineHostCounters& GetDatashardCounters() { return EngineHost.GetCounters(); }
- void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; };
bool IsTabletNotReady() const { return TabletNotReady; }
- void AddKeyAccessSample(const TTableId& tableId, const TArrayRef<const TCell>& key);
+ bool ReadRow(const TTableId& tableId, TArrayRef<const TCell> key, const TSmallVec<NTable::TTag>& columnTags,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const THolderFactory& holderFactory,
+ NUdf::TUnboxedValue& result, TKqpTableStats& stats);
+
+ TAutoPtr<NTable::TTableIt> CreateIterator(const TTableId& tableId, const TTableRange& range,
+ const TSmallVec<NTable::TTag>& columnTags);
+
+ TAutoPtr<NTable::TTableReverseIt> CreateReverseIterator(const TTableId& tableId, const TTableRange& range,
+ const TSmallVec<NTable::TTag>& columnTags);
+
+ bool ReadRow(const TTableId& tableId, NTable::TTableIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats);
+
+ bool ReadRow(const TTableId& tableId, NTable::TTableReverseIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats);
+
+ bool ReadRowWide(const TTableId& tableId, NTable::TTableIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ NUdf::TUnboxedValue* const* result, TKqpTableStats& stats);
+
+ bool ReadRowWide(const TTableId& tableId, NTable::TTableReverseIt& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ NUdf::TUnboxedValue* const* result, TKqpTableStats& stats);
+
+private:
+ void TouchTableRange(const TTableId& tableId, const TTableRange& range) const;
+ void TouchTablePoint(const TTableId& tableId, const TArrayRef<const TCell>& key) const;
+
+ template <typename TReadTableIterator>
+ bool ReadRowImpl(const TTableId& tableId, TReadTableIterator& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ const THolderFactory& holderFactory, NUdf::TUnboxedValue& result, TKqpTableStats& stats);
+
+ template <typename TReadTableIterator>
+ bool ReadRowWideImpl(const TTableId& tableId, TReadTableIterator& iterator,
+ const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys,
+ NUdf::TUnboxedValue* const* result, TKqpTableStats& stats);
+
+ void SetTabletNotReady() { Y_VERIFY_DEBUG(!TabletNotReady); TabletNotReady = true; };
public:
NTable::TDatabase* Database = nullptr;
@@ -66,7 +102,7 @@ public:
private:
NDataShard::TDataShard* Shard;
std::unordered_map<ui64, TEngineHostCounters> TaskCounters;
- TEngineHostCounters& DatashardCounters;
+ TEngineHost& EngineHost;
TInstant Now;
ui64 LockTxId = 0;
bool PersistentChannels = false;
@@ -82,18 +118,6 @@ public:
TSmallVec<NTable::TTag> ExtractTags(const TSmallVec<TKqpComputeContextBase::TColumn>& columns);
-bool TryFetchRow(const TTableId& tableId, NTable::TTableIt& iterator, NYql::NUdf::TUnboxedValue& row,
- TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx,
- const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys);
-
-bool TryFetchRow(const TTableId& tableId, NTable::TTableReverseIt& iterator, NYql::NUdf::TUnboxedValue& row,
- TComputationContext& ctx, TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx,
- const TSmallVec<NTable::TTag>& systemColumnTags, const TSmallVec<bool>& skipNullKeys);
-
-void FetchRow(const TDbTupleRef& dbTuple, NYql::NUdf::TUnboxedValue& row, TComputationContext& ctx,
- TKqpTableStats& tableStats, const TKqpDatashardComputeContext& computeCtx,
- const TSmallVec<NTable::TTag>& systemColumnTags);
-
IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,
TKqpDatashardComputeContext& computeCtx);
IComputationNode* WrapKqpLookupTable(TCallable& callable, const TComputationNodeFactoryContext& ctx,
diff --git a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
index 7d6ed38420..ee179cd91c 100644
--- a/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_lookup_table.cpp
@@ -88,94 +88,62 @@ public:
, TypeEnv(typeEnv)
, ParseResult(parseResult)
, LookupKeysNode(lookupKeysNode)
- , LocalTid(ComputeCtx.GetLocalTableId(ParseResult.TableId))
, ColumnTags(ExtractTags(ParseResult.Columns))
, SystemColumnTags(ExtractTags(ParseResult.SystemColumns))
, ShardTableStats(ComputeCtx.GetDatashardCounters())
, TaskTableStats(ComputeCtx.GetTaskCounters(ComputeCtx.GetCurrentTaskId()))
- , TableInfo(ComputeCtx.Database->GetScheme().GetTableInfo(LocalTid))
{
- MKQL_ENSURE_S(TableInfo);
-
- MKQL_ENSURE_S(TableInfo->KeyColumns.size() == ParseResult.KeyIndices.size(),
+ auto localTid = ComputeCtx.GetLocalTableId(ParseResult.TableId);
+ auto tableInfo = ComputeCtx.Database->GetScheme().GetTableInfo(localTid);
+ MKQL_ENSURE_S(tableInfo, "Unknown table " << ParseResult.TableId);
+ MKQL_ENSURE_S(tableInfo->KeyColumns.size() == ParseResult.KeyIndices.size(),
"Incomplete row key in LookupRows.");
-
- CellTypes.reserve(ColumnTags.size());
- for (size_t i = 0; i < ColumnTags.size(); ++i) {
- CellTypes.emplace_back(TableInfo->Columns.at(ColumnTags[i]).PType);
- }
}
TUnboxedValue DoCalculate(TComputationContext& ctx) const {
auto keysValues = LookupKeysNode->GetValue(ctx);
- while (!Finished) {
+ while (true) {
NUdf::TUnboxedValue key;
- auto status = keysValues.Fetch(key);
- switch (status) {
+
+ switch (keysValues.Fetch(key)) {
case NUdf::EFetchStatus::Ok: {
- TVector<TCell> keyCells(TableInfo->KeyColumns.size());
+ TVector<TCell> keyCells(ParseResult.KeyIndices.size());
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, keyCells, TypeEnv);
- TSmallVec<TRawTypeValue> keyValues;
- ConvertTableKeys(ComputeCtx.Database->GetScheme(), TableInfo, keyCells, keyValues, nullptr);
+ NUdf::TUnboxedValue result;
+ TKqpTableStats stats;
+ bool fetched = ComputeCtx.ReadRow(ParseResult.TableId, keyCells, ColumnTags, SystemColumnTags,
+ ctx.HolderFactory, result, stats);
- if (keyValues.size() != TableInfo->KeyColumns.size()) {
- throw TSchemeErrorTabletException();
+ if (stats.InvisibleRowSkips) {
+ ComputeCtx.BreakSetLocks();
}
- ComputeCtx.ReadTable(ParseResult.TableId, keyCells);
- ComputeCtx.AddKeyAccessSample(ParseResult.TableId, keyCells);
+ ShardTableStats += stats;
+ TaskTableStats += stats;
- NTable::TRowState dbRow;
- NTable::TSelectStats stats;
- ui64 flags = 0; // TODO: Check DisableByKeyFilter
- auto ready = ComputeCtx.Database->Select(LocalTid, keyValues, ColumnTags, dbRow, stats, flags, ComputeCtx.GetReadVersion());
- if (stats.Invisible)
- ComputeCtx.BreakSetLocks();
+ if (fetched) {
+ return std::move(result);
+ }
- switch (ready) {
- case EReady::Page:
- ComputeCtx.SetTabletNotReady();
- return TUnboxedValue::MakeYield();
- case EReady::Gone:
- continue;
- case EReady::Data:
- break;
- default:
- MKQL_ENSURE_S(false, "Unexpected local db select status: " << (ui32)ready);
- };
-
- MKQL_ENSURE_S(CellTypes.size() == dbRow.Size(), "Invalid local db row size.");
-
- TDbTupleRef dbTuple(CellTypes.data(), (*dbRow).data(), dbRow.Size());
- TUnboxedValue result;
- TKqpTableStats tableStats;
- FetchRow(dbTuple, result, ctx, tableStats, ComputeCtx, SystemColumnTags);
-
- ShardTableStats.NSelectRow++;
- ShardTableStats.SelectRowRows++;
- ShardTableStats.SelectRowBytes += tableStats.SelectRowBytes;
-
- TaskTableStats.NSelectRow++;
- TaskTableStats.SelectRowRows++;
- TaskTableStats.SelectRowBytes += tableStats.SelectRowBytes;
-
- return result;
+ if (ComputeCtx.IsTabletNotReady()) {
+ return NUdf::TUnboxedValue::MakeYield();
+ }
+
+ continue;
}
case NUdf::EFetchStatus::Finish:
- Finished = true;
return TUnboxedValue::MakeFinish();
case NUdf::EFetchStatus::Yield:
+ MKQL_ENSURE_S(false);
return TUnboxedValue::MakeYield();
}
-
- MKQL_ENSURE_S(false, "Unexpected key fetch status: " << (ui32)status);
}
- return TUnboxedValue::MakeFinish();
+ Y_UNREACHABLE();
}
private:
@@ -188,14 +156,10 @@ private:
const TTypeEnvironment& TypeEnv;
TParseLookupTableResult ParseResult;
IComputationNode* LookupKeysNode;
- ui64 LocalTid;
TSmallVec<TTag> ColumnTags;
TSmallVec<TTag> SystemColumnTags;
TKqpTableStats& ShardTableStats;
TKqpTableStats& TaskTableStats;
- const NTable::TScheme::TTableInfo* TableInfo;
- TSmallVec<NScheme::TTypeId> CellTypes;
- mutable bool Finished = false;
};
class TKqpLookupTableWrapper : public TStatelessFlowComputationNode<TKqpLookupTableWrapper> {
@@ -209,18 +173,13 @@ public:
, TypeEnv(typeEnv)
, ParseResult(parseResult)
, LookupKeysNode(lookupKeysNode)
- , LocalTid(ComputeCtx.GetLocalTableId(ParseResult.TableId))
, ColumnTags(ExtractTags(ParseResult.Columns))
, SystemColumnTags(ExtractTags(ParseResult.SystemColumns))
, ShardTableStats(ComputeCtx.GetDatashardCounters())
- , TaskTableStats(ComputeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId()))
- , TableInfo(ComputeCtx.Database->GetScheme().GetTableInfo(LocalTid))
- {
- MKQL_ENSURE_S(TableInfo);
- }
+ , TaskTableStats(ComputeCtx.GetTaskCounters(computeCtx.GetCurrentTaskId())) {}
TUnboxedValue DoCalculate(TComputationContext& ctx) const {
- while (!Finished) {
+ while (true) {
if (!Iterator) {
auto keysValues = LookupKeysNode->GetValue(ctx);
@@ -229,7 +188,11 @@ public:
switch (status) {
case NUdf::EFetchStatus::Ok: {
- TVector<TCell> fromCells(TableInfo->KeyColumns.size());
+ auto localTid = ComputeCtx.GetLocalTableId(ParseResult.TableId);
+ auto tableInfo = ComputeCtx.Database->GetScheme().GetTableInfo(localTid);
+ MKQL_ENSURE_S(tableInfo);
+
+ TVector<TCell> fromCells(tableInfo->KeyColumns.size());
FillKeyTupleValue(key, ParseResult.KeyIndices, ParseResult.KeyTypes, fromCells, TypeEnv);
TVector<TCell> toCells(ParseResult.KeyIndices.size());
@@ -237,64 +200,48 @@ public:
auto range = TTableRange(fromCells, true, toCells, true);
- TSmallVec<TRawTypeValue> from, to;
- ConvertTableKeys(ComputeCtx.Database->GetScheme(), TableInfo, range.From, from, nullptr);
- ConvertTableKeys(ComputeCtx.Database->GetScheme(), TableInfo, range.To, to, nullptr);
+ Iterator = ComputeCtx.CreateIterator(ParseResult.TableId, range, ColumnTags);
- NTable::TKeyRange keyRange;
- keyRange.MinKey = from;
- keyRange.MaxKey = to;
- keyRange.MinInclusive = range.InclusiveFrom;
- keyRange.MaxInclusive = range.InclusiveTo;
-
- ComputeCtx.ReadTable(ParseResult.TableId, range);
- Iterator = ComputeCtx.Database->IterateRange(LocalTid, keyRange, ColumnTags, ComputeCtx.GetReadVersion());
+ ShardTableStats.NSelectRange++;
+ TaskTableStats.NSelectRange++;
break;
}
case NUdf::EFetchStatus::Finish:
- Finished = true;
return TUnboxedValue::MakeFinish();
case NUdf::EFetchStatus::Yield:
+ MKQL_ENSURE_S(false);
return TUnboxedValue::MakeYield();
}
}
TUnboxedValue result;
- TKqpTableStats tableStats;
- auto fetched = TryFetchRow(ParseResult.TableId, *Iterator, result, ctx, tableStats, ComputeCtx,
- SystemColumnTags, ParseResult.SkipNullKeys);
+ TKqpTableStats stats;
- if (Iterator->Stats.InvisibleRowSkips) {
- ComputeCtx.BreakSetLocks();
- }
-
- ShardTableStats += tableStats;
- TaskTableStats += tableStats;
+ bool fetched = ComputeCtx.ReadRow(ParseResult.TableId, *Iterator, SystemColumnTags,
+ ParseResult.SkipNullKeys, ctx.HolderFactory, result, stats);
- ui64 deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0);
- ui64 invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0);
- ShardTableStats.SelectRangeDeletedRowSkips += deletedRowSkips;
- ShardTableStats.InvisibleRowSkips += invisibleRowSkips;
+ if (stats.InvisibleRowSkips) {
+ ComputeCtx.BreakSetLocks();
+ }
- TaskTableStats.SelectRangeDeletedRowSkips += deletedRowSkips;
- TaskTableStats.InvisibleRowSkips += invisibleRowSkips;
+ ShardTableStats += stats;
+ TaskTableStats += stats;
if (fetched) {
return result;
}
- if (Iterator->Last() == NTable::EReady::Page) {
- ComputeCtx.SetTabletNotReady();
- return TUnboxedValue::MakeYield();
+ if (ComputeCtx.IsTabletNotReady()) {
+ return NUdf::TUnboxedValue::MakeYield();
}
Iterator = nullptr;
}
- return TUnboxedValue::MakeFinish();
+ Y_UNREACHABLE();
}
private:
@@ -307,14 +254,11 @@ private:
const TTypeEnvironment& TypeEnv;
TParseLookupTableResult ParseResult;
IComputationNode* LookupKeysNode;
- ui64 LocalTid;
TSmallVec<TTag> ColumnTags;
TSmallVec<TTag> SystemColumnTags;
TKqpTableStats& ShardTableStats;
TKqpTableStats& TaskTableStats;
- const NTable::TScheme::TTableInfo* TableInfo;
mutable TAutoPtr<NTable::TTableIt> Iterator;
- mutable bool Finished = false;
};
IComputationNode* WrapKqpLookupTableInternal(TCallable& callable, const TComputationNodeFactoryContext& ctx,
diff --git a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
index ded2e109aa..f3d5848c4c 100644
--- a/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
+++ b/ydb/core/tx/datashard/datashard_kqp_read_table.cpp
@@ -74,18 +74,6 @@ TMaybe<TKeyRangesType> ParseKeyRangesType(const TTupleType* rangeTupleType) {
};
}
-TSerializedTableRange CreateTableRange(const TParseReadTableResult& parseResult, const IComputationNode* fromNode,
- const IComputationNode* toNode, const TTypeEnvironment& typeEnv, TComputationContext& ctx)
-{
- TVector<TCell> fromCells;
- BuildKeyTupleCells(parseResult.FromTuple->GetType(), fromNode->GetValue(ctx), fromCells, typeEnv);
-
- TVector<TCell> toCells;
- BuildKeyTupleCells(parseResult.ToTuple->GetType(), toNode->GetValue(ctx), toCells, typeEnv);
-
- return TSerializedTableRange(fromCells, parseResult.FromInclusive, toCells, parseResult.ToInclusive);
-}
-
TSerializedTableRange BuildFullRange(ui32 keyColumnsSize) {
/* Build range from NULL, ... NULL to +inf, ... +inf */
TVector<TCell> fromKeyValues(keyColumnsSize);
@@ -183,15 +171,6 @@ TVector<TSerializedTableRange> CreateTableRanges(const TParseReadTableRangesResu
return ranges;
}
-void CreateRangePoints(ui64 localTid, const TSerializedTableRange& serializedTableRange, TSmallVec<TRawTypeValue>& from,
- TSmallVec<TRawTypeValue>& to, TKqpDatashardComputeContext& computeCtx)
-{
- const auto* tableInfo = computeCtx.Database->GetScheme().GetTableInfo(localTid);
- auto tableRange = serializedTableRange.ToTableRange();
- ConvertTableKeys(computeCtx.Database->GetScheme(), tableInfo, tableRange.From, from, nullptr);
- ConvertTableKeys(computeCtx.Database->GetScheme(), tableInfo, tableRange.To, to, nullptr);
-}
-
template <bool IsReverse>
class TKqpWideReadTableWrapperBase : public TStatelessWideFlowCodegeneratorNode<TKqpWideReadTableWrapperBase<IsReverse>> {
public:
@@ -222,94 +201,25 @@ protected:
virtual EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const = 0;
EFetchResult ReadNext(NUdf::TUnboxedValue* const* output) const {
- bool breakLocks = false;
- while (Iterator->Next(NTable::ENext::Data) == NTable::EReady::Data) {
- if (!breakLocks && (breakLocks = bool(Iterator->Stats.InvisibleRowSkips))) {
- ComputeCtx.BreakSetLocks();
- }
- TDbTupleRef rowKey = Iterator->GetKey();
-
- ComputeCtx.AddKeyAccessSample(TableId, rowKey.Cells());
-
- ui64 deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0);
- ui64 invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0);
-
- ShardTableStats.SelectRangeDeletedRowSkips += deletedRowSkips;
- ShardTableStats.InvisibleRowSkips += invisibleRowSkips;
-
- TaskTableStats.SelectRangeDeletedRowSkips += deletedRowSkips;
- TaskTableStats.InvisibleRowSkips += invisibleRowSkips;
-
- Y_VERIFY(SkipNullKeys.size() <= rowKey.ColumnCount);
- bool skipRow = false;
- for (ui32 i = 0; i < SkipNullKeys.size(); ++i) {
- if (SkipNullKeys[i] && rowKey.Columns[i].IsNull()) {
- skipRow = true;
- break;
- }
- }
- if (skipRow) {
- continue;
- }
+ TKqpTableStats stats;
+ bool fetched = ComputeCtx.ReadRowWide(TableId, *Iterator, SystemColumnTags, SkipNullKeys, output, stats);
- TDbTupleRef rowValues = Iterator->GetValues();
-
- size_t columnsCount = rowValues.ColumnCount + SystemColumnTags.size();
-
- ui64 rowSize = 0;
- for (ui32 i = 0; i < rowValues.ColumnCount; ++i) {
- rowSize += rowValues.Columns[i].IsNull() ? 1 : rowValues.Columns[i].Size();
- if (auto out = *output++) {
- *out = GetCellValue(rowValues.Cells()[i], rowValues.Types[i]);
- }
- }
-
- // Some per-row overhead to deal with the case when no columns were requested
- rowSize = std::max(rowSize, (ui64) 8);
+ if (stats.InvisibleRowSkips) {
+ ComputeCtx.BreakSetLocks();
+ }
- for (ui32 i = rowValues.ColumnCount, j = 0; i < columnsCount; ++i, ++j) {
- auto out = *output++;
- if (!out) {
- continue;
- }
-
- switch (SystemColumnTags[j]) {
- case TKeyDesc::EColumnIdDataShard:
- *out = TUnboxedValue(TUnboxedValuePod(ComputeCtx.GetShardId()));
- break;
- default:
- throw TSchemeErrorTabletException();
- }
- }
+ ShardTableStats += stats;
+ TaskTableStats += stats;
+ if (fetched) {
if (Remains) {
Remains = *Remains - 1;
}
- ShardTableStats.SelectRangeRows++;
- ShardTableStats.SelectRangeBytes += rowSize;
-
- TaskTableStats.SelectRangeRows++;
- TaskTableStats.SelectRangeBytes += rowSize;
-
return EFetchResult::One;
}
- if (!breakLocks && bool(Iterator->Stats.InvisibleRowSkips)) {
- ComputeCtx.BreakSetLocks();
- }
-
- auto deletedRowSkips = std::exchange(Iterator->Stats.DeletedRowSkips, 0);
- auto invisibleRowSkips = std::exchange(Iterator->Stats.InvisibleRowSkips, 0);
-
- ShardTableStats.SelectRangeDeletedRowSkips += deletedRowSkips;
- ShardTableStats.InvisibleRowSkips += invisibleRowSkips;
-
- TaskTableStats.SelectRangeDeletedRowSkips += deletedRowSkips;
- TaskTableStats.InvisibleRowSkips += invisibleRowSkips;
-
- if (Iterator->Last() == NTable::EReady::Page) {
- ComputeCtx.SetTabletNotReady();
+ if (ComputeCtx.IsTabletNotReady()) {
return EFetchResult::Yield;
}
@@ -341,7 +251,6 @@ public:
, FromNode(fromNode)
, ToNode(toNode)
, ItemsLimit(itemsLimit)
- , LocalTid(computeCtx.GetLocalTableId(parseResult.TableId))
, ColumnTags(ExtractTags(parseResult.Columns))
{
this->ShardTableStats.NSelectRange++;
@@ -351,27 +260,22 @@ public:
private:
EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const final {
if (!this->Iterator) {
- auto serializedTableRange = CreateTableRange(ParseResult, FromNode, ToNode, this->TypeEnv, ctx);
- auto tableRange = serializedTableRange.ToTableRange();
- this->ComputeCtx.ReadTable(ParseResult.TableId, tableRange);
+ TVector<TCell> fromCells;
+ BuildKeyTupleCells(ParseResult.FromTuple->GetType(), FromNode->GetValue(ctx), fromCells, this->TypeEnv);
- TSmallVec<TRawTypeValue> from, to;
- CreateRangePoints(LocalTid, serializedTableRange, from, to, this->ComputeCtx);
+ TVector<TCell> toCells;
+ BuildKeyTupleCells(ParseResult.ToTuple->GetType(), ToNode->GetValue(ctx), toCells, this->TypeEnv);
- NTable::TKeyRange keyRange;
- keyRange.MinKey = from;
- keyRange.MaxKey = to;
- keyRange.MinInclusive = tableRange.InclusiveFrom;
- keyRange.MaxInclusive = tableRange.InclusiveTo;
+ auto range = TTableRange(fromCells, ParseResult.FromInclusive, toCells, ParseResult.ToInclusive);
if (ItemsLimit) {
this->Remains = ItemsLimit->GetValue(ctx).Get<ui64>();
}
if constexpr (IsReverse) {
- this->Iterator = this->ComputeCtx.Database->IterateRangeReverse(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion());
+ this->Iterator = this->ComputeCtx.CreateReverseIterator(ParseResult.TableId, range, ColumnTags);
} else {
- this->Iterator = this->ComputeCtx.Database->IterateRange(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion());
+ this->Iterator = this->ComputeCtx.CreateIterator(ParseResult.TableId, range, ColumnTags);
}
}
@@ -396,9 +300,7 @@ private:
IComputationNode* FromNode;
IComputationNode* ToNode;
IComputationNode* ItemsLimit;
- ui64 LocalTid;
TSmallVec<TTag> ColumnTags;
- ui64 TaskId;
};
template <bool IsReverse>
@@ -411,14 +313,13 @@ public:
, ParseResult(parseResult)
, RangesNode(rangesNode)
, ItemsLimit(itemsLimit)
- , LocalTid(computeCtx.GetLocalTableId(parseResult.TableId))
- , ColumnTags(ExtractTags(parseResult.Columns)) {
- }
+ , ColumnTags(ExtractTags(parseResult.Columns)) {}
private:
EFetchResult ReadValue(TComputationContext& ctx, NUdf::TUnboxedValue* const* output) const final {
if (!RangeId) {
- const auto* tableInfo = this->ComputeCtx.Database->GetScheme().GetTableInfo(LocalTid);
+ const auto localTid = this->ComputeCtx.GetLocalTableId(ParseResult.TableId);
+ const auto* tableInfo = this->ComputeCtx.Database->GetScheme().GetTableInfo(localTid);
Ranges = CreateTableRanges<IsReverse>(ParseResult, RangesNode, this->TypeEnv, ctx, tableInfo->KeyColumns.size());
RangeId = 0;
@@ -433,23 +334,12 @@ private:
this->TaskTableStats.NSelectRange++;
if (!this->Iterator) {
- auto& range = Ranges[*RangeId];
- auto tableRange = range.ToTableRange();
- this->ComputeCtx.ReadTable(ParseResult.TableId, tableRange);
-
- TSmallVec<TRawTypeValue> from, to;
- CreateRangePoints(LocalTid, range, from, to, this->ComputeCtx);
-
- NTable::TKeyRange keyRange;
- keyRange.MinKey = from;
- keyRange.MaxKey = to;
- keyRange.MinInclusive = tableRange.InclusiveFrom;
- keyRange.MaxInclusive = tableRange.InclusiveTo;
+ auto range = Ranges[*RangeId].ToTableRange();
if constexpr (IsReverse) {
- this->Iterator = this->ComputeCtx.Database->IterateRangeReverse(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion());
+ this->Iterator = this->ComputeCtx.CreateReverseIterator(ParseResult.TableId, range, ColumnTags);
} else {
- this->Iterator = this->ComputeCtx.Database->IterateRange(LocalTid, keyRange, ColumnTags, this->ComputeCtx.GetReadVersion());
+ this->Iterator = this->ComputeCtx.CreateIterator(ParseResult.TableId, range, ColumnTags);
}
}
@@ -479,95 +369,11 @@ private:
TParseReadTableRangesResult ParseResult;
IComputationNode* RangesNode;
IComputationNode* ItemsLimit;
- ui64 LocalTid;
TSmallVec<TTag> ColumnTags;
- ui64 TaskId;
mutable TVector<TSerializedTableRange> Ranges;
mutable std::optional<ui32> RangeId;
};
-void FetchRowImpl(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats,
- const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags)
-{
- size_t columnsCount = dbTuple.ColumnCount + systemColumnTags.size();
-
- TUnboxedValue* rowItems = nullptr;
- row = ctx.HolderFactory.CreateDirectArrayHolder(columnsCount, rowItems);
-
- ui64 rowSize = 0;
- for (ui32 i = 0; i < dbTuple.ColumnCount; ++i) {
- rowSize += dbTuple.Columns[i].IsNull() ? 1 : dbTuple.Columns[i].Size();
- rowItems[i] = GetCellValue(dbTuple.Cells()[i], dbTuple.Types[i]);
- }
-
- // Some per-row overhead to deal with the case when no columns were requested
- rowSize = std::max(rowSize, (ui64)8);
-
- for (ui32 i = dbTuple.ColumnCount, j = 0; i < columnsCount; ++i, ++j) {
- switch (systemColumnTags[j]) {
- case TKeyDesc::EColumnIdDataShard:
- rowItems[i] = TUnboxedValue(TUnboxedValuePod(computeCtx.GetShardId()));
- break;
- default:
- throw TSchemeErrorTabletException();
- }
- }
-
- tableStats.NSelectRow++;
- tableStats.SelectRowRows++;
- tableStats.SelectRowBytes += rowSize;
-}
-
-template <typename TTableIterator>
-bool TryFetchRowImpl(const TTableId& tableId, TTableIterator& iterator, TUnboxedValue& row, TComputationContext& ctx,
- TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
- const TSmallVec<bool>& skipNullKeys)
-{
- while (iterator.Next(NTable::ENext::Data) == NTable::EReady::Data) {
- TDbTupleRef rowKey = iterator.GetKey();
- computeCtx.AddKeyAccessSample(tableId, rowKey.Cells());
-
- Y_VERIFY(skipNullKeys.size() <= rowKey.ColumnCount);
- bool skipRow = false;
- for (ui32 i = 0; i < skipNullKeys.size(); ++i) {
- if (skipNullKeys[i] && rowKey.Columns[i].IsNull()) {
- skipRow = true;
- break;
- }
- }
-
- if (skipRow) {
- continue;
- }
-
- TDbTupleRef rowValues = iterator.GetValues();
- FetchRowImpl(rowValues, row, ctx, tableStats, computeCtx, systemColumnTags);
- return true;
- }
-
- return false;
-}
-
-} // namespace
-
-bool TryFetchRow(const TTableId& tableId, TTableIt& iterator, TUnboxedValue& row, TComputationContext& ctx,
- TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
- const TSmallVec<bool>& skipNullKeys)
-{
- return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys);
-}
-
-bool TryFetchRow(const TTableId& tableId, TTableReverseIt& iterator, TUnboxedValue& row, TComputationContext& ctx,
- TKqpTableStats& tableStats, TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags,
- const TSmallVec<bool>& skipNullKeys)
-{
- return TryFetchRowImpl(tableId, iterator, row, ctx, tableStats, computeCtx, systemColumnTags, skipNullKeys);
-}
-
-void FetchRow(const TDbTupleRef& dbTuple, TUnboxedValue& row, TComputationContext& ctx, TKqpTableStats& tableStats,
- const TKqpDatashardComputeContext& computeCtx, const TSmallVec<TTag>& systemColumnTags)
-{
- return FetchRowImpl(dbTuple, row, ctx, tableStats, computeCtx, systemColumnTags);
}
IComputationNode* WrapKqpWideReadTableRanges(TCallable& callable, const TComputationNodeFactoryContext& ctx,