diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-11-15 18:17:30 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-11-15 20:14:54 +0300 |
commit | 750bbdd3edcc13eea77487accc7c8cc78b03dee3 (patch) | |
tree | 2dfc32e15ef377b9a35b99948e75a34f79a70361 | |
parent | 7e1bf1fbfe0570fabfdc744d4dbb40e563c2ea71 (diff) | |
download | ydb-750bbdd3edcc13eea77487accc7c8cc78b03dee3.tar.gz |
KIKIMR-20025: fix stats collecting for stream lookup join
fix(kqp): collect stats
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 19 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.h | 4 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp | 103 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_worker.h | 28 | ||||
-rw-r--r-- | ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp | 50 |
6 files changed, 165 insertions, 41 deletions
diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 3ff0d16d39..86e13ce478 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -28,7 +28,8 @@ public: TKqpStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, - NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) + const NYql::NDqProto::TTaskInput& inputDesc, NKikimrKqp::TKqpStreamLookupSettings&& settings, + TIntrusivePtr<TKqpCounters> counters) : LogPrefix(TStringBuilder() << "StreamLookupActor, inputIndex: " << inputIndex << ", CA Id " << computeActorId) , InputIndex(inputIndex) , Input(input) @@ -38,7 +39,7 @@ public: , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) - , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory)) + , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc)) , Counters(counters) { IngressStats.Level = statsLevel; @@ -179,8 +180,8 @@ private: YQL_ENSURE(!batch.IsWide(), "Wide stream is not supported"); auto replyResultStats = StreamLookupWorker->ReplyResult(batch, freeSpace); - ReadRowsCount += replyResultStats.RowsCount; - ReadBytesCount += replyResultStats.BytesCount; + ReadRowsCount += replyResultStats.ReadRowsCount; + ReadBytesCount += replyResultStats.ReadBytesCount; auto status = FetchInputRows(); @@ -192,10 +193,10 @@ private: && AllReadsFinished() && StreamLookupWorker->AllRowsProcessed(); - CA_LOG_D("Returned " << replyResultStats.BytesCount << " bytes, " << replyResultStats.RowsCount + CA_LOG_D("Returned " << replyResultStats.ResultBytesCount << " bytes, " << replyResultStats.ResultRowsCount << " rows, finished: " << finished); - return replyResultStats.BytesCount; + return replyResultStats.ResultBytesCount; } TMaybe<google::protobuf::Any> ExtraData() override { @@ -501,10 +502,10 @@ private: std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings, - TIntrusivePtr<TKqpCounters> counters) { + std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc, + NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters> counters) { auto actor = new TKqpStreamLookupActor(inputIndex, statsLevel, input, computeActorId, typeEnv, holderFactory, - alloc, std::move(settings), counters); + alloc, inputDesc, std::move(settings), counters); return {actor, actor}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h index 6e4b7bad4a..7e8e266dbc 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.h @@ -10,8 +10,8 @@ namespace NKqp { std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateStreamLookupActor(ui64 inputIndex, NYql::NDq::TCollectStatsLevel statsLevel, const NUdf::TUnboxedValue& input, const NActors::TActorId& computeActorId, const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, - std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings, - TIntrusivePtr<TKqpCounters>); + std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, const NYql::NDqProto::TTaskInput& inputDesc, + NKikimrKqp::TKqpStreamLookupSettings&& settings, TIntrusivePtr<TKqpCounters>); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp index 98661a47b3..ddcd068a20 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_factory.cpp @@ -8,7 +8,7 @@ void RegisterStreamLookupActorFactory(NYql::NDq::TDqAsyncIoFactory& factory, TIn factory.RegisterInputTransform<NKikimrKqp::TKqpStreamLookupSettings>("StreamLookupInputTransformer", [counters](NKikimrKqp::TKqpStreamLookupSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TInputTransformArguments&& args) { return CreateStreamLookupActor(args.InputIndex, args.StatsLevel, args.TransformInput, args.ComputeActorId, args.TypeEnv, - args.HolderFactory, args.Alloc, std::move(settings), counters); + args.HolderFactory, args.Alloc, args.InputDesc, std::move(settings), counters); }); } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index b9947b04a6..ee07f67d16 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -4,6 +4,8 @@ #include <ydb/core/kqp/runtime/kqp_scan_data.h> #include <ydb/core/tx/datashard/range_ops.h> +#include <ydb/library/yql/minikql/mkql_node_serialization.h> + namespace NKikimr { namespace NKqp { @@ -51,6 +53,20 @@ std::vector<std::pair<ui64, TOwnedTableRange>> GetRangePartitioning(const TKqpSt return rangePartition; } +NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType* type) { + YQL_ENSURE(type); + + if (type->GetKind() == NMiniKQL::TType::EKind::Pg) { + auto pgType = static_cast<NMiniKQL::TPgType*>(type); + auto pgTypeId = pgType->GetTypeId(); + return NScheme::TTypeInfo(NScheme::NTypeIds::Pg, NPg::TypeDescFromPgTypeId(pgTypeId)); + } else { + bool isOptional = false; + auto dataType = NMiniKQL::UnpackOptionalData(type, isOptional); + return NScheme::TTypeInfo(dataType->GetSchemeType()); + } +} + struct THashableKey { TConstArrayRef<TCell> Cells; @@ -118,9 +134,11 @@ struct TKeyEq { } // !namespace TKqpStreamLookupWorker::TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, + const NYql::NDqProto::TTaskInput& inputDesc) : TypeEnv(typeEnv) , HolderFactory(holderFactory) + , InputDesc(inputDesc) , TablePath(settings.GetTable().GetPath()) , TableId(MakeTableId(settings.GetTable())) { @@ -180,7 +198,8 @@ std::vector<NScheme::TTypeInfo> TKqpStreamLookupWorker::GetKeyColumnTypes() cons class TKqpLookupRows : public TKqpStreamLookupWorker { public: TKqpLookupRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, - const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) { + const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) + : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) { } virtual ~TKqpLookupRows() {} @@ -289,7 +308,7 @@ public: } } - if (rowSize > freeSpace - (i64)resultStats.BytesCount) { + if (rowSize > freeSpace - (i64)resultStats.ResultBytesCount) { row.DeleteUnreferenced(); sizeLimitExceeded = true; break; @@ -297,8 +316,10 @@ public: batch.push_back(std::move(row)); - resultStats.RowsCount += 1; - resultStats.BytesCount += rowSize; + resultStats.ReadRowsCount += 1; + resultStats.ReadBytesCount += rowSize; + resultStats.ResultRowsCount += 1; + resultStats.ResultBytesCount += rowSize; } if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { @@ -389,7 +410,8 @@ private: class TKqpJoinRows : public TKqpStreamLookupWorker { public: TKqpJoinRows(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, - const NMiniKQL::THolderFactory& holderFactory) : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory) { + const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc) + : TKqpStreamLookupWorker(std::move(settings), typeEnv, holderFactory, inputDesc) { // read columns should contain join key and result columns for (auto joinKey : LookupKeyColumns) { @@ -574,9 +596,9 @@ public: auto leftRowIt = PendingLeftRowsByKey.find(joinKeyCells); YQL_ENSURE(leftRowIt != PendingLeftRowsByKey.end()); - i64 resultRowSize = 0; - i64 availableSpace = freeSpace - (i64)resultStats.BytesCount; - auto resultRow = TryBuildResultRow(leftRowIt->second, row, resultRowSize, availableSpace, result.ShardId); + TReadResultStats rowStats; + i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount; + auto resultRow = TryBuildResultRow(leftRowIt->second, row, rowStats, availableSpace, result.ShardId); if (!resultRow.HasValue()) { sizeLimitExceeded = true; @@ -584,9 +606,7 @@ public: } batch.push_back(std::move(resultRow)); - - resultStats.RowsCount += 1; - resultStats.BytesCount += resultRowSize; + resultStats.Add(rowStats); } if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { @@ -620,15 +640,16 @@ public: && !leftRowIt->second.RightRowExist; if (leftRowCanBeSent) { - i64 resultRowSize = 0; - i64 availableSpace = freeSpace - (i64) resultStats.BytesCount; - auto resultRow = TryBuildResultRow(leftRowIt->second, {}, resultRowSize, availableSpace); + TReadResultStats rowStats; + i64 availableSpace = freeSpace - (i64)resultStats.ResultBytesCount; + auto resultRow = TryBuildResultRow(leftRowIt->second, {}, rowStats, availableSpace); if (!resultRow.HasValue()) { break; } batch.push_back(std::move(resultRow)); + resultStats.Add(rowStats); PendingLeftRowsByKey.erase(leftRowIt++); } else { ++leftRowIt; @@ -697,17 +718,44 @@ private: return range.From.subspan(0, LookupKeyColumns.size()); } - NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow, i64& resultRowSize, - i64 freeSpace, TMaybe<ui64> shardId = {}) { + NMiniKQL::TStructType* GetLeftRowType() const { + YQL_ENSURE(InputDesc.HasTransform()); + + auto outputTypeNode = NMiniKQL::DeserializeNode(TStringBuf{InputDesc.GetTransform().GetOutputType()}, TypeEnv); + YQL_ENSURE(outputTypeNode, "Failed to deserialize stream lookup transform output type"); + + auto outputType = static_cast<NMiniKQL::TType*>(outputTypeNode); + YQL_ENSURE(outputType->GetKind() == NMiniKQL::TType::EKind::Tuple, "Unexpected stream lookup output type"); + + const auto outputTupleType = AS_TYPE(NMiniKQL::TTupleType, outputType); + YQL_ENSURE(outputTupleType->GetElementsCount() == 2); + + const auto outputLeftRowType = outputTupleType->GetElementType(0); + YQL_ENSURE(outputLeftRowType->GetKind() == NMiniKQL::TType::EKind::Struct); + + return AS_TYPE(NMiniKQL::TStructType, outputLeftRowType); + } + + NUdf::TUnboxedValue TryBuildResultRow(TLeftRowInfo& leftRowInfo, TConstArrayRef<TCell> rightRow, + TReadResultStats& rowStats, i64 freeSpace, TMaybe<ui64> shardId = {}) { NUdf::TUnboxedValue* resultRowItems = nullptr; auto resultRow = HolderFactory.CreateDirectArrayHolder(2, resultRowItems); + ui64 leftRowSize = 0; + ui64 rightRowSize = 0; + resultRowItems[0] = leftRowInfo.Row; + auto leftRowType = GetLeftRowType(); + YQL_ENSURE(leftRowType); + + for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) { + auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i)); + leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes; + } if (!rightRow.empty()) { leftRowInfo.RightRowExist = true; - // TODO: get size for left row NUdf::TUnboxedValue* rightRowItems = nullptr; resultRowItems[1] = HolderFactory.CreateDirectArrayHolder(Columns.size(), rightRowItems); @@ -720,19 +768,25 @@ private: if (IsSystemColumn(column.Name)) { YQL_ENSURE(shardId); NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType); - resultRowSize += sizeof(NUdf::TUnboxedValue); + rightRowSize += sizeof(NUdf::TUnboxedValue); } else { rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)], column.PType); - resultRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes; + rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes; } } } else { resultRowItems[1] = NUdf::TUnboxedValuePod(); } - if (resultRowSize > freeSpace) { + rowStats.ReadRowsCount += (rightRowSize > 0 ? 1 : 0); + rowStats.ReadBytesCount += rightRowSize; + rowStats.ResultRowsCount += 1; + rowStats.ResultBytesCount += leftRowSize + rightRowSize; + + if (rowStats.ResultBytesCount > (ui64)freeSpace) { resultRow.DeleteUnreferenced(); + rowStats.Clear(); } return resultRow; @@ -748,13 +802,14 @@ private: }; std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory) { + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, + const NYql::NDqProto::TTaskInput& inputDesc) { switch (settings.GetLookupStrategy()) { case NKqpProto::EStreamLookupStrategy::LOOKUP: - return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory); + return std::make_unique<TKqpLookupRows>(std::move(settings), typeEnv, holderFactory, inputDesc); case NKqpProto::EStreamLookupStrategy::JOIN: - return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory); + return std::make_unique<TKqpJoinRows>(std::move(settings), typeEnv, holderFactory, inputDesc); default: return {}; } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h index 56ac40019d..b4bf0f47ef 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.h @@ -22,13 +22,29 @@ public: }; struct TReadResultStats { - ui64 RowsCount = 0; - ui64 BytesCount = 0; + ui64 ReadRowsCount = 0; + ui64 ReadBytesCount = 0; + ui64 ResultRowsCount = 0; + ui64 ResultBytesCount = 0; + + void Add(const TReadResultStats& other) { + ReadRowsCount += other.ReadRowsCount; + ReadBytesCount += other.ReadBytesCount; + ResultRowsCount += other.ResultRowsCount; + ResultBytesCount += other.ResultBytesCount; + } + + void Clear() { + ReadRowsCount = 0; + ReadBytesCount = 0; + ResultRowsCount = 0; + ResultBytesCount = 0; + } }; public: - TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory); + TKqpStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, const NMiniKQL::TTypeEnvironment& typeEnv, + const NMiniKQL::THolderFactory& holderFactory, const NYql::NDqProto::TTaskInput& inputDesc); virtual ~TKqpStreamLookupWorker(); @@ -46,6 +62,7 @@ public: protected: const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; + const NYql::NDqProto::TTaskInput& InputDesc; const TString TablePath; const TTableId TableId; std::unordered_map<TString, TSysTables::TTableColumnInfo> KeyColumns; @@ -54,7 +71,8 @@ protected: }; std::unique_ptr<TKqpStreamLookupWorker> CreateStreamLookupWorker(NKikimrKqp::TKqpStreamLookupSettings&& settings, - const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory); + const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, + const NYql::NDqProto::TTaskInput& inputDesc); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp index 7c09a845e2..2a243f8c84 100644 --- a/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp +++ b/ydb/core/kqp/ut/perf/kqp_query_perf_ut.cpp @@ -275,6 +275,56 @@ Y_UNIT_TEST_SUITE(KqpQueryPerf) { UNIT_ASSERT_VALUES_EQUAL(totalTasks, EnableSourceRead ? 2 : 3); } + Y_UNIT_TEST_TWIN(IndexLookupJoin, EnableStreamLookup) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(EnableStreamLookup); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig); + TKikimrRunner kikimr{settings}; + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + NYdb::NTable::TExecDataQuerySettings execSettings; + execSettings.CollectQueryStats(ECollectQueryStatsMode::Profile); + + auto result = session.ExecuteDataQuery(Q1_(R"( + SELECT l.Key, r.Key1, r.Key2 + FROM `/Root/Join1` AS l + INNER JOIN `/Root/Join2` AS r + ON l.Fk21 = r.Key1 + ORDER BY l.Key, r.Key1, r.Key2; + )"), TTxControl::BeginTx().CommitTx(), execSettings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + CompareYson(R"([ + [[1];[101u];["One"]]; + [[1];[101u];["Three"]]; + [[1];[101u];["Two"]]; + [[2];[102u];["One"]]; + [[3];[103u];["One"]]; + [[4];[104u];["One"]]; + [[5];[105u];["One"]]; + [[5];[105u];["Two"]]; + [[6];[106u];["One"]]; + [[8];[108u];["One"]]; + [[9];[101u];["One"]]; + [[9];[101u];["Three"]]; + [[9];[101u];["Two"]] + ])", FormatResultSetYson(result.GetResultSet(0))); + + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + AssertTableStats(result, "/Root/Join1", { + .ExpectedReads = 9, + }); + + AssertTableStats(result, "/Root/Join2", { + .ExpectedReads = EnableStreamLookup ? 13 : 10, + }); + + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), EnableStreamLookup ? 1 : 3); + } + Y_UNIT_TEST(Upsert) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetTableClient(); |