diff options
author | ulya-sidorina <yulia@ydb.tech> | 2023-02-23 20:32:27 +0300 |
---|---|---|
committer | ulya-sidorina <yulia@ydb.tech> | 2023-02-23 20:32:27 +0300 |
commit | 09f337323884478aee3d0da15e9bbfd28411f6de (patch) | |
tree | 2d1b05498e53e7c449379784380ecf2d5e474ddc | |
parent | 73dbf37dfe3ffaf8923d0ce1f89c0b00148345fb (diff) | |
download | ydb-09f337323884478aee3d0da15e9bbfd28411f6de.tar.gz |
fix stats collecting for stream lookup actor
bugfix(kqp): collect stats for stream lookup actor
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h | 5 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/executer_actor/kqp_executer_stats.cpp | 7 | ||||
-rw-r--r-- | ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp | 39 | ||||
-rw-r--r-- | ydb/core/kqp/ut/query/kqp_stats_ut.cpp | 36 | ||||
-rw-r--r-- | ydb/core/protos/kqp_stats.proto | 5 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 6 |
9 files changed, 89 insertions, 52 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp index cb2c43db43..51e854da17 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp @@ -25,37 +25,4 @@ bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYq return schemeError; } -void FillTaskInputStats(const NYql::NDqProto::TDqTask& task, NYql::NDqProto::TDqTaskStats& taskStats) { - THashMap<ui32, TString> inputTables; - - for (ui32 inputIndex = 0; inputIndex < task.InputsSize(); ++inputIndex) { - const auto& taskInput = task.GetInputs(inputIndex); - if (taskInput.HasTransform()) { - const auto& transform = taskInput.GetTransform(); - YQL_ENSURE(transform.GetType() == "StreamLookupInputTransformer", - "Unexpected input transform type: " << transform.GetType()); - - const google::protobuf::Any &settingsAny = transform.GetSettings(); - YQL_ENSURE(settingsAny.Is<NKikimrKqp::TKqpStreamLookupSettings>(), "Expected settings type: " - << NKikimrKqp::TKqpStreamLookupSettings::descriptor()->full_name() - << " , but got: " << settingsAny.type_url()); - - NKikimrKqp::TKqpStreamLookupSettings settings; - YQL_ENSURE(settingsAny.UnpackTo(&settings), "Failed to unpack settings"); - - inputTables.insert({inputIndex, settings.GetTable().GetPath()}); - } - } - - for (const auto& transformerStats : taskStats.GetInputTransforms()) { - auto tableIt = inputTables.find(transformerStats.GetInputIndex()); - YQL_ENSURE(tableIt != inputTables.end()); - - auto* tableStats = taskStats.AddTables(); - tableStats->SetTablePath(tableIt->second); - tableStats->SetReadRows(transformerStats.GetRowsOut()); - tableStats->SetReadBytes(transformerStats.GetBytes()); - } -} - } // namespace NKikimr::NKqp::NComputeActor diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h index cab30fb4a5..b32537d951 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_impl.h @@ -12,11 +12,6 @@ namespace NKqp { using namespace NYql; using namespace NYql::NDq; -namespace NComputeActor { -void FillTaskInputStats(const NDqProto::TDqTask& task, NDqProto::TDqTaskStats& taskStats); - -} // namespace NComputeActor - class TKqpTaskRunnerExecutionContext : public IDqTaskRunnerExecutionContext { public: TKqpTaskRunnerExecutionContext(ui64 txId, bool withSpilling, IDqChannelStorage::TWakeUpCallback&& wakeUp, diff --git a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp index 4a8ca2587d..4be90552a9 100644 --- a/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp @@ -193,11 +193,6 @@ public: tableStats->MutableExtra()->PackFrom(tableExtraStats); } } - - if (last && dst->TasksSize() > 0) { - YQL_ENSURE(dst->TasksSize() == 1); - NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0)); - } } private: diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index f3098274db..911fb69ca1 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -285,11 +285,6 @@ public: // dst->MutableExtra()->PackFrom(extraStats); } - - if (last && dst->TasksSize() > 0) { - YQL_ENSURE(dst->TasksSize() == 1); - NComputeActor::FillTaskInputStats(GetTask(), *dst->MutableTasks(0)); - } } protected: diff --git a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp index c8961e499f..3a59d76574 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp +++ b/ydb/core/kqp/executer_actor/kqp_executer_stats.cpp @@ -136,6 +136,13 @@ void TQueryExecutionStats::AddComputeActorStats(ui32 /* nodeId */, NYql::NDqProt tableAggr->SetWriteBytes(tableAggr->GetWriteBytes() + table.GetWriteBytes()); tableAggr->SetEraseRows(tableAggr->GetEraseRows() + table.GetEraseRows()); tableAggr->SetAffectedPartitions(tableAggr->GetAffectedPartitions() + table.GetAffectedPartitions()); + + NKqpProto::TKqpReadActorTableAggrExtraStats tableExtraStats; + if (table.GetExtra().UnpackTo(&tableExtraStats)) { + for (const auto& shardId : tableExtraStats.GetAffectedShards()) { + AffectedShards.insert(shardId); + } + } } } diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 95d0908192..ed1fee9b20 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -8,6 +8,7 @@ #include <ydb/core/kqp/common/kqp_resolve.h> #include <ydb/core/kqp/gateway/kqp_gateway.h> #include <ydb/core/protos/kqp.pb.h> +#include <ydb/core/protos/kqp_stats.pb.h> #include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/core/tx/datashard/datashard.h> #include <ydb/core/kqp/common/kqp_event_ids.h> @@ -28,7 +29,8 @@ public: const NMiniKQL::TTypeEnvironment& typeEnv, const NMiniKQL::THolderFactory& holderFactory, std::shared_ptr<NMiniKQL::TScopedAlloc>& alloc, NKikimrKqp::TKqpStreamLookupSettings&& settings) : InputIndex(inputIndex), Input(input), ComputeActorId(computeActorId), TypeEnv(typeEnv) - , HolderFactory(holderFactory), Alloc(alloc), TableId(MakeTableId(settings.GetTable())) + , HolderFactory(holderFactory), Alloc(alloc), TablePath(settings.GetTable().GetPath()) + , TableId(MakeTableId(settings.GetTable())) , Snapshot(settings.GetSnapshot().GetStep(), settings.GetSnapshot().GetTxId()) , LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>()) , ImmediateTx(settings.GetImmediateTx()) @@ -82,6 +84,34 @@ public: return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR; } + void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last) override { + if (last) { + NYql::NDqProto::TDqTableStats* tableStats = nullptr; + for (auto& table : *stats->MutableTables()) { + if (table.GetTablePath() == TablePath) { + tableStats = &table; + } + } + + if (!tableStats) { + tableStats = stats->AddTables(); + tableStats->SetTablePath(TablePath); + } + + // TODO: use evread statistics after KIKIMR-16924 + tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount); + tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount); + tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size()); + + NKqpProto::TKqpReadActorTableAggrExtraStats tableExtraStats; + for (const auto& [shardId, _] : ReadsPerShard) { + tableExtraStats.AddAffectedShards(shardId); + } + + tableStats->MutableExtra()->PackFrom(tableExtraStats); + } + } + private: enum class EReadState { Initial, @@ -346,6 +376,8 @@ private: } batch.push_back(std::move(row)); + ++ReadRowsCount; + ReadBytesCount += rowSize; totalSize += rowSize; } @@ -580,6 +612,7 @@ private: const NMiniKQL::TTypeEnvironment& TypeEnv; const NMiniKQL::THolderFactory& HolderFactory; std::shared_ptr<NKikimr::NMiniKQL::TScopedAlloc> Alloc; + const TString TablePath; const TTableId TableId; IKqpGateway::TKqpSnapshot Snapshot; const TMaybe<ui64> LockTxId; @@ -596,6 +629,10 @@ private: NActors::TActorId SchemeCacheRequestTimeoutTimer; TVector<NKikimrTxDataShard::TLock> Locks; TVector<NKikimrTxDataShard::TLock> BrokenLocks; + + // stats + ui64 ReadRowsCount = 0; + ui64 ReadBytesCount = 0; }; } // namespace diff --git a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp index 04a58539bb..b91955cb65 100644 --- a/ydb/core/kqp/ut/query/kqp_stats_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_stats_ut.cpp @@ -1,6 +1,7 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> #include <ydb/public/sdk/cpp/client/resources/ydb_resources.h> +#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> @@ -345,6 +346,41 @@ Y_UNIT_TEST(StatsProfile) { //UNIT_ASSERT_EQUAL(node2.GetMap().at("Stats").GetMapSafe().at("ComputeNodes").GetArraySafe().size(), 1); } +Y_UNIT_TEST(StreamLookupStats) { + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(true); + TKikimrRunner kikimr(TKikimrSettings().SetAppConfig(app)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + TExecDataQuerySettings settings; + settings.CollectQueryStats(ECollectQueryStatsMode::Full); + + auto result = session.ExecuteDataQuery(R"( + $keys = SELECT Key FROM `/Root/KeyValue`; + SELECT * FROM `/Root/TwoShard` WHERE Key in $keys; + )", TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + Cerr << result.GetQueryPlan() << Endl; + + NJson::TJsonValue plan; + NJson::ReadJsonTree(result.GetQueryPlan(), &plan, true); + auto streamLookup = FindPlanNodeByKv(plan, "Node Type", "TableLookup"); + UNIT_ASSERT(streamLookup.IsDefined()); + + auto& stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases().size(), 2); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).affected_shards(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access().size(), 1); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).name(), "/Root/TwoShard"); + UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(1).table_access(0).partitions_count(), 1); + + AssertTableStats(result, "/Root/TwoShard", { + .ExpectedReads = 2, + }); +} + } // suite } // namespace NKqp diff --git a/ydb/core/protos/kqp_stats.proto b/ydb/core/protos/kqp_stats.proto index c2579c24ff..d13a1a5a39 100644 --- a/ydb/core/protos/kqp_stats.proto +++ b/ydb/core/protos/kqp_stats.proto @@ -24,6 +24,11 @@ message TKqpShardTableAggrExtraStats { NYql.NDqProto.TDqStatsAggr ShardCpuTimeUs = 2; } +// aggregated read actor extra stats for table +message TKqpReadActorTableAggrExtraStats { + repeated uint64 AffectedShards = 1; +} + message TKqpScanTableExtraStats { // IScan stats uint64 IScanStartTimeMs = 1; // start IScan timestamp diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index d52073182e..8dc0d00ae0 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1973,10 +1973,10 @@ public: protoTransform->SetIngressBytes(ingressBytes); protoTransform->SetMaxMemoryUsage(transformStats->MaxMemoryUsage); + } - if (auto* transform = transformInfo.AsyncInput) { - transform->FillExtraStats(protoTask, last); - } + if (auto* transform = transformInfo.AsyncInput) { + transform->FillExtraStats(protoTask, last); } } |