aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorulya-sidorina <yulia@ydb.tech>2023-11-15 18:17:30 +0300
committerulya-sidorina <yulia@ydb.tech>2023-11-15 20:14:54 +0300
commit750bbdd3edcc13eea77487accc7c8cc78b03dee3 (patch)
tree2dfc32e15ef377b9a35b99948e75a34f79a70361
parent7e1bf1fbfe0570fabfdc744d4dbb40e563c2ea71 (diff)
downloadydb-750bbdd3edcc13eea77487accc7c8cc78b03dee3.tar.gz
KIKIMR-20025: fix stats collecting for stream lookup join
fix(kqp): collect stats
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp19
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_actor.h4
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp2
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp103
-rw-r--r--ydb/core/kqp/runtime/kqp_stream_lookup_worker.h28
-rw-r--r--ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp50
6 files changed, 165 insertions, 41 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
index 3ff0d16d39..86e13ce478 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
@@ -28,7 +28,8 @@ public:
TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input,
const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv,
const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc,
- NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters)
+ const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
+ TIntrusivePtr<TKqpCounters> counters)
: LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId)
, InputIndex(inputIndex)
, Input(input)
@@ -38,7 +39,7 @@ public:
, Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId())
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
- , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory))
+ , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc))
, Counters(counters)
{
IngressStats.Level = statsLevel;
@@ -179,8 +180,8 @@ private:
YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported");
auto replyResultStats = StreamLookupWorker->ReplyResult(batch, freeSpace);
- ReadRowsCount += replyResultStats.RowsCount;
- ReadBytesCount += replyResultStats.BytesCount;
+ ReadRowsCount += replyResultStats.ReadRowsCount;
+ ReadBytesCount += replyResultStats.ReadBytesCount;
auto status = FetchInputRows();
@@ -192,10 +193,10 @@ private:
&& AllReadsFinished()
&& StreamLookupWorker->AllRowsProcessed();
- CA_LOG_D("Returned " << replyResultStats.BytesCount << " bytes, " << replyResultStats.RowsCount
+ CA_LOG_D("Returned " << replyResultStats.ResultBytesCount << " bytes, " << replyResultStats.ResultRowsCount
<< " rows, finished: " << finished);
- return replyResultStats.BytesCount;
+ return replyResultStats.ResultBytesCount;
}
TMaybe<google::protobuf::Any> ExtraData() override {
@@ -501,10 +502,10 @@ private:
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
- std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
- TIntrusivePtr<TKqpCounters> counters) {
+ std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
+ NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) {
auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory,
- alloc, std::move(settings), counters);
+ alloc, inputDesc, std::move(settings), counters);
return {actor, actor};
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
index 6e4b7bad4a..7e8e266dbc 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h
@@ -10,8 +10,8 @@ namespace NKqp {
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex,
NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId,
const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
- std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings,
- TIntrusivePtr<TKqpCounters>);
+ std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc,
+ NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
index 98661a47b3..ddcd068a20 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp
@@ -8,7 +8,7 @@ void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIn
factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings,
NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) {
return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv,
- args.HolderFactory, args.Alloc, std::move(settings), counters);
+ args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters);
});
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
index b9947b04a6..ee07f67d16 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
@@ -4,6 +4,8 @@
#include <ydb/core/kqp/runtime/kqp_scan_data.h>
#include <ydb/core/tx/datashard/range_ops.h>
+#include <ydb/library/yql/minikql/mkql_node_serialization.h>
+
namespace NKikimr {
namespace NKqp {
@@ -51,6 +53,20 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt
return rangePartition;
}
+NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType* type) {
+ YQL_ENSURE(type);
+
+ if (type->GetKind() == NMiniKQL::TType::EKind::Pg) {
+ auto pgType = static_cast<NMiniKQL::TPgType*>(type);
+ auto pgTypeId = pgType->GetTypeId();
+ return NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeId(pgTypeId));
+ } else {
+ bool isOptional = false;
+ auto dataType = NMiniKQL::UnpackOptionalData(type, isOptional);
+ return NScheme::TTypeInfo(dataType->GetSchemeType());
+ }
+}
+
struct THashableKey {
TConstArrayRef<TCell> Cells;
@@ -118,9 +134,11 @@ struct TKeyEq {
} // !namespace
TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory)
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+ const NYql::NDqProto::TTaskInput& inputDesc)
: TypeEnv(typeEnv)
, HolderFactory(holderFactory)
+ , InputDesc(inputDesc)
, TablePath(settings.GetTable().GetPath())
, TableId(MakeTableId(settings.GetTable())) {
@@ -180,7 +198,8 @@ std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() cons
class TKqpLookupRows : public TKqpStreamLookupWorker {
public:
TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
- const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) {
+ const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc)
+ : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) {
}
virtual ~TKqpLookupRows() {}
@@ -289,7 +308,7 @@ public:
}
}
- if (rowSize > freeSpace - (i64)resultStats.BytesCount) {
+ if (rowSize > freeSpace - (i64)resultStats.ResultBytesCount) {
row.DeleteUnreferenced();
sizeLimitExceeded = true;
break;
@@ -297,8 +316,10 @@ public:
batch.push_back(std::move(row));
- resultStats.RowsCount += 1;
- resultStats.BytesCount += rowSize;
+ resultStats.ReadRowsCount += 1;
+ resultStats.ReadBytesCount += rowSize;
+ resultStats.ResultRowsCount += 1;
+ resultStats.ResultBytesCount += rowSize;
}
if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
@@ -389,7 +410,8 @@ private:
class TKqpJoinRows : public TKqpStreamLookupWorker {
public:
TKqpJoinRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
- const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) {
+ const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc)
+ : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) {
// read columns should contain join key and result columns
for (auto joinKey : LookupKeyColumns) {
@@ -574,9 +596,9 @@ public:
auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells);
YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end());
- i64 resultRowSize = 0;
- i64 availableSpace = freeSpace - (i64)resultStats.BytesCount;
- auto resultRow = TryBuildResultRow(leftRowIt->second, row, resultRowSize, availableSpace, result.ShardId);
+ TReadResultStats rowStats;
+ i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
+ auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId);
if (!resultRow.HasValue()) {
sizeLimitExceeded = true;
@@ -584,9 +606,7 @@ public:
}
batch.push_back(std::move(resultRow));
-
- resultStats.RowsCount += 1;
- resultStats.BytesCount += resultRowSize;
+ resultStats.Add(rowStats);
}
if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
@@ -620,15 +640,16 @@ public:
&& !leftRowIt->second.RightRowExist;
if (leftRowCanBeSent) {
- i64 resultRowSize = 0;
- i64 availableSpace = freeSpace - (i64) resultStats.BytesCount;
- auto resultRow = TryBuildResultRow(leftRowIt->second, {}, resultRowSize, availableSpace);
+ TReadResultStats rowStats;
+ i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount;
+ auto resultRow = TryBuildResultRow(leftRowIt->second, {}, rowStats, availableSpace);
if (!resultRow.HasValue()) {
break;
}
batch.push_back(std::move(resultRow));
+ resultStats.Add(rowStats);
PendingLeftRowsByKey.erase(leftRowIt++);
} else {
++leftRowIt;
@@ -697,17 +718,44 @@ private:
return range.From.subspan(0, LookupKeyColumns.size());
}
- NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow, i64& resultRowSize,
- i64 freeSpace, TMaybe<ui64> shardId = {}) {
+ NMiniKQL::TStructType* GetLeftRowType() const {
+ YQL_ENSURE(InputDesc.HasTransform());
+
+ auto outputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{InputDesc.GetTransform().GetOutputType()}, TypeEnv);
+ YQL_ENSURE(outputTypeNode, "Failed to deserialize stream lookup transform output type");
+
+ auto outputType = static_cast<NMiniKQL::TType*>(outputTypeNode);
+ YQL_ENSURE(outputType->GetKind() == NMiniKQL::TType::EKind::Tuple, "Unexpected stream lookup output type");
+
+ const auto outputTupleType = AS_TYPE(NMiniKQL::TTupleType, outputType);
+ YQL_ENSURE(outputTupleType->GetElementsCount() == 2);
+
+ const auto outputLeftRowType = outputTupleType->GetElementType(0);
+ YQL_ENSURE(outputLeftRowType->GetKind() == NMiniKQL::TType::EKind::Struct);
+
+ return AS_TYPE(NMiniKQL::TStructType, outputLeftRowType);
+ }
+
+ NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow,
+ TReadResultStats& rowStats, i64 freeSpace, TMaybe<ui64> shardId = {}) {
NUdf::TUnboxedValue* resultRowItems = nullptr;
auto resultRow = HolderFactory.CreateDirectArrayHolder(2, resultRowItems);
+ ui64 leftRowSize = 0;
+ ui64 rightRowSize = 0;
+
resultRowItems[0] = leftRowInfo.Row;
+ auto leftRowType = GetLeftRowType();
+ YQL_ENSURE(leftRowType);
+
+ for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
+ auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
+ leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
+ }
if (!rightRow.empty()) {
leftRowInfo.RightRowExist = true;
- // TODO: get size for left row
NUdf::TUnboxedValue* rightRowItems = nullptr;
resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Columns.size(), rightRowItems);
@@ -720,19 +768,25 @@ private:
if (IsSystemColumn(column.Name)) {
YQL_ENSURE(shardId);
NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
- resultRowSize += sizeof(NUdf::TUnboxedValue);
+ rightRowSize += sizeof(NUdf::TUnboxedValue);
} else {
rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
column.PType);
- resultRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
+ rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
}
}
} else {
resultRowItems[1] = NUdf::TUnboxedValuePod();
}
- if (resultRowSize > freeSpace) {
+ rowStats.ReadRowsCount += (rightRowSize > 0 ? 1 : 0);
+ rowStats.ReadBytesCount += rightRowSize;
+ rowStats.ResultRowsCount += 1;
+ rowStats.ResultBytesCount += leftRowSize + rightRowSize;
+
+ if (rowStats.ResultBytesCount > (ui64)freeSpace) {
resultRow.DeleteUnreferenced();
+ rowStats.Clear();
}
return resultRow;
@@ -748,13 +802,14 @@ private:
};
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) {
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+ const NYql::NDqProto::TTaskInput& inputDesc) {
switch (settings.GetLookupStrategy()) {
case NKqpProto::EStreamLookupStrategy::LOOKUP:
- return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory);
+ return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
case NKqpProto::EStreamLookupStrategy::JOIN:
- return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory);
+ return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc);
default:
return {};
}
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
index 56ac40019d..b4bf0f47ef 100644
--- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
+++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h
@@ -22,13 +22,29 @@ public:
};
struct TReadResultStats {
- ui64 RowsCount = 0;
- ui64 BytesCount = 0;
+ ui64 ReadRowsCount = 0;
+ ui64 ReadBytesCount = 0;
+ ui64 ResultRowsCount = 0;
+ ui64 ResultBytesCount = 0;
+
+ void Add(const TReadResultStats& other) {
+ ReadRowsCount += other.ReadRowsCount;
+ ReadBytesCount += other.ReadBytesCount;
+ ResultRowsCount += other.ResultRowsCount;
+ ResultBytesCount += other.ResultBytesCount;
+ }
+
+ void Clear() {
+ ReadRowsCount = 0;
+ ReadBytesCount = 0;
+ ResultRowsCount = 0;
+ ResultBytesCount = 0;
+ }
};
public:
- TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory);
+ TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv,
+ const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc);
virtual ~TKqpStreamLookupWorker();
@@ -46,6 +62,7 @@ public:
protected:
const NMiniKQL::TTypeEnvironment& TypeEnv;
const NMiniKQL::THolderFactory& HolderFactory;
+ const NYql::NDqProto::TTaskInput& InputDesc;
const TString TablePath;
const TTableId TableId;
std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns;
@@ -54,7 +71,8 @@ protected:
};
std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings,
- const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory);
+ const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory,
+ const NYql::NDqProto::TTaskInput& inputDesc);
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
index 7c09a845e2..2a243f8c84 100644
--- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
+++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp
@@ -275,6 +275,56 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) {
UNIT_ASSERT_VALUES_EQUAL(totalTasks, EnableSourceRead ? 2 : 3);
}
+ Y_UNIT_TEST_TWIN(IndexLookupJoin, EnableStreamLookup) {
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup);
+ auto settings = TKikimrSettings()
+ .SetAppConfig(appConfig);
+ TKikimrRunner kikimr{settings};
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+
+ NYdb::NTable::TExecDataQuerySettings execSettings;
+ execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile);
+
+ auto result = session.ExecuteDataQuery(Q1_(R"(
+ SELECT l.Key, r.Key1, r.Key2
+ FROM `/Root/Join1` AS l
+ INNER JOIN `/Root/Join2` AS r
+ ON l.Fk21 = r.Key1
+ ORDER BY l.Key, r.Key1, r.Key2;
+ )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
+ CompareYson(R"([
+ [[1];[101u];["One"]];
+ [[1];[101u];["Three"]];
+ [[1];[101u];["Two"]];
+ [[2];[102u];["One"]];
+ [[3];[103u];["One"]];
+ [[4];[104u];["One"]];
+ [[5];[105u];["One"]];
+ [[5];[105u];["Two"]];
+ [[6];[106u];["One"]];
+ [[8];[108u];["One"]];
+ [[9];[101u];["One"]];
+ [[9];[101u];["Three"]];
+ [[9];[101u];["Two"]]
+ ])", FormatResultSetYson(result.GetResultSet(0)));
+
+ auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());
+
+ AssertTableStats(result, "/Root/Join1", {
+ .ExpectedReads = 9,
+ });
+
+ AssertTableStats(result, "/Root/Join2", {
+ .ExpectedReads = EnableStreamLookup ? 13 : 10,
+ });
+
+ UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), EnableStreamLookup ? 1 : 3);
+ }
+
Y_UNIT_TEST(Upsert) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetTableClient();