diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-06 15:02:32 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-11-06 15:19:17 +0300 |
commit | 2f26a2697d3f615c7546ff1a46c91bbe511d3b93 (patch) | |
tree | e46e64238d282523045d3d399022fbafda71097f | |
parent | a75d931e987608f3e7cd857ddd6c5aea464e4233 (diff) | |
download | ydb-2f26a2697d3f615c7546ff1a46c91bbe511d3b93.tar.gz |
Intermediate changes
29 files changed, 409 insertions, 172 deletions
diff --git a/.github/workflows/nightly_run.yaml b/.github/workflows/nightly_run.yaml index db624cb603..5570844040 100644 --- a/.github/workflows/nightly_run.yaml +++ b/.github/workflows/nightly_run.yaml @@ -40,6 +40,5 @@ jobs: secrets: inherit with: build_target: ydb/ - sanitizer: none run_build: true run_tests: true diff --git a/.mapping.json b/.mapping.json index 027f520e1f..bbc838ef39 100644 --- a/.mapping.json +++ b/.mapping.json @@ -8800,6 +8800,11 @@ "ydb/library/yql/utils/simd/exec/stream_store/CMakeLists.linux-x86_64.txt":"", "ydb/library/yql/utils/simd/exec/stream_store/CMakeLists.txt":"", "ydb/library/yql/utils/simd/exec/stream_store/CMakeLists.windows-x86_64.txt":"", + "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt":"", + "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt":"", + "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt":"", + "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt":"", + "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt":"", "ydb/library/yql/utils/simd/ut/CMakeLists.darwin-x86_64.txt":"", "ydb/library/yql/utils/simd/ut/CMakeLists.linux-aarch64.txt":"", "ydb/library/yql/utils/simd/ut/CMakeLists.linux-x86_64.txt":"", diff --git a/build/conf/project_specific/yql_udf.conf b/build/conf/project_specific/yql_udf.conf index a134843634..85024f4ad8 100644 --- a/build/conf/project_specific/yql_udf.conf +++ b/build/conf/project_specific/yql_udf.conf @@ -36,12 +36,16 @@ macro UDF_NO_PROBE() { module YQL_UDF_TEST: PY3TEST_BIN { SET_APPEND(_MAKEFILE_INCLUDE_LIKE_DEPS canondata/result.json) - PEERDIR(ydb/library/yql/tests/common/udf_test) + PEERDIR(contrib/ydb/library/yql/tests/common/udf_test) - DEPENDS(ydb/library/yql/tools/astdiff) - DEPENDS(ydb/library/yql/tools/yqlrun) - DATA(arcadia/ydb/library/yql/mount) - DATA(arcadia/ydb/library/yql/cfg/udf_test) + DEPENDS(contrib/ydb/library/yql/tools/astdiff) + 
DEPENDS(contrib/ydb/library/yql/tools/yqlrun) + DATA(arcadia/contrib/ydb/library/yql/mount) + DATA(arcadia/contrib/ydb/library/yql/cfg/udf_test) + ENV(YQL_ASTDIFF_PATH="contrib/ydb/library/yql/tools/astdiff/astdiff") + ENV(YQL_CONFIG_DIR="contrib/ydb/library/yql/cfg/udf_test") + ENV(YQL_YQLRUN_PATH="contrib/ydb/library/yql/tools/yqlrun/yqlrun") + ENV(YQL_SQL2YQL_PATH="contrib/ydb/library/yql/tools/sql2yql/sql2yql") } module YQL_UDF_YDB_TEST: PY3TEST_BIN { @@ -74,8 +78,8 @@ module YQL_UDF_TEST_CONTRIB: PY3TEST_BIN { ### ### https://yql.yandex-team.ru/docs/yt/udf/cpp/ macro _ADD_YQL_UDF_DEPS() { - PEERDIR(ydb/library/yql/public/udf) - PEERDIR(ydb/library/yql/public/udf/support) + PEERDIR(contrib/ydb/library/yql/public/udf) + PEERDIR(contrib/ydb/library/yql/public/udf/support) } macro _ADD_YQL_UDF_YDB_DEPS() { @@ -97,7 +101,7 @@ macro _MAKE_YQL_UDF() { _ADD_YQL_UDF_DEPS() SET_APPEND(USER_CXXFLAGS -DBUILD_UDF) # For Windows using declspecs - DEFAULT(YQL_UDF_EXPORT ${ARCADIA_ROOT}/ydb/library/yql/public/udf/udfs_exports.exports) + DEFAULT(YQL_UDF_EXPORT ${ARCADIA_ROOT}/contrib/ydb/library/yql/public/udf/udfs_exports.exports) when ($WINDOWS == "yes") { YQL_UDF_EXPORT= diff --git a/build/plugins/yql_python_udf.py b/build/plugins/yql_python_udf.py index b0f9570090..5a0bb69c63 100644 --- a/build/plugins/yql_python_udf.py +++ b/build/plugins/yql_python_udf.py @@ -19,7 +19,7 @@ def onregister_yql_python_udf(unit, *args): unit.onyql_abi_version(['2', '27', '0']) unit.onpeerdir(['yql/udfs/common/python/python_udf']) - unit.onpeerdir(['ydb/library/yql/public/udf']) + unit.onpeerdir(['contrib/ydb/library/yql/public/udf']) if add_libra_modules: unit.onpeerdir(['quality/user_sessions/libra_arc/noyql']) @@ -37,7 +37,7 @@ def onregister_yql_python_udf(unit, *args): output_includes = [ 'yql/udfs/common/python/python_udf/python_udf.h', - 'ydb/library/yql/public/udf/udf_registrator.h', + 'contrib/ydb/library/yql/public/udf/udf_registrator.h', ] if add_libra_modules: 
output_includes.append('yql/udfs/quality/libra/module/module.h') diff --git a/build/scripts/gen_yql_python_udf.py b/build/scripts/gen_yql_python_udf.py index 13b5898117..127b4b8867 100644 --- a/build/scripts/gen_yql_python_udf.py +++ b/build/scripts/gen_yql_python_udf.py @@ -3,7 +3,7 @@ import sys TEMPLATE=""" #include <yql/udfs/common/python/python_udf/python_udf.h> -#include <ydb/library/yql/public/udf/udf_registrator.h> +#include <contrib/ydb/library/yql/public/udf/udf_registrator.h> #if @WITH_LIBRA@ #include <yql/udfs/quality/libra/module/module.h> diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 61419143e3..cab38fb67b 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -1975,9 +1975,8 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD SetNonZero(node, "ComputeTimeUs", taskStats.GetComputeCpuTimeUs()); - SetNonZero(node, "WaitTimeUs", taskStats.GetWaitTimeUs()); // need to be reviewed - SetNonZero(node, "PendingInputTimeUs", taskStats.GetPendingInputTimeUs()); // need to be reviewed - SetNonZero(node, "PendingOutputTimeUs", taskStats.GetPendingOutputTimeUs()); // need to be reviewed + SetNonZero(node, "WaitInputTimeUs", taskStats.GetWaitInputTimeUs()); + SetNonZero(node, "WaitOutputTimeUs", taskStats.GetWaitOutputTimeUs()); NKqpProto::TKqpTaskExtraStats taskExtraStats; if (taskStats.GetExtra().UnpackTo(&taskExtraStats)) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 5e3b90e812..73d12d8077 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -625,8 +625,10 @@ protected: void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) { CA_LOG_E(InternalErrorLogString(statusCode, issues)); - TaskRunner->GetAllocatorPtr()->InvalidateMemInfo(); - 
TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck(); + if (TaskRunner) { + TaskRunner->GetAllocatorPtr()->InvalidateMemInfo(); + TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck(); + } std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator(); State = NDqProto::COMPUTE_STATE_FAILURE; ReportStateAndMaybeDie(statusCode, issues); @@ -1940,11 +1942,7 @@ private: } virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() { - if (!TaskRunner) { - return nullptr; - } - TaskRunner->UpdateStats(); - return TaskRunner->GetStats(); + return TaskRunner ? TaskRunner->GetStats() : nullptr; } virtual const IDqAsyncOutputBuffer* GetSink(ui64, const TAsyncOutputInfoBase& sinkInfo) const { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp index bf0bc688d3..34385c17c4 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp @@ -44,14 +44,8 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& protoTask->SetComputeCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds()); protoTask->SetBuildCpuTimeUs(taskStats.BuildCpuTime.MicroSeconds()); - protoTask->SetWaitTimeUs(taskStats.WaitTime.MicroSeconds()); // to be reviewed - protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds()); // to be reviewed - - // All run statuses metrics - protoTask->SetPendingInputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingInput].MicroSeconds()); // to be reviewed - protoTask->SetPendingOutputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingOutput].MicroSeconds()); // to be reviewed - protoTask->SetFinishTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::Finished].MicroSeconds()); // to be reviewed - static_assert(TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here + 
protoTask->SetWaitInputTimeUs(taskStats.WaitInputTime.MicroSeconds()); + protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds()); if (StatsLevelCollectProfile(level)) { if (taskStats.ComputeCpuTimeByRun) { @@ -106,6 +100,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& } { auto& protoChannel = *protoTask->AddInputChannels(); + protoChannel.SetChannelId(pushStats.ChannelId); // only one of ids protoChannel.SetSrcStageId(srcStageId); FillAsyncStats(*protoChannel.MutablePush(), pushStats); FillAsyncStats(*protoChannel.MutablePop(), popStats); @@ -135,8 +130,8 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& // // task runner is not aware of ingress/egress stats, fill in in CA // - for (auto& [inputIndex, sources] : taskStats.Sources) { - if (StatsLevelCollectFull(level)) { + if (StatsLevelCollectFull(level)) { + for (auto& [inputIndex, sources] : taskStats.Sources) { auto& protoSource = *protoTask->AddSources(); protoSource.SetInputIndex(inputIndex); FillAsyncStats(*protoSource.MutablePush(), sources->GetPushStats()); @@ -184,6 +179,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase& } { auto& protoChannel = *protoTask->AddOutputChannels(); + protoChannel.SetChannelId(popStats.ChannelId); // only one of ids protoChannel.SetDstStageId(dstStageId); FillAsyncStats(*protoChannel.MutablePush(), pushStats); FillAsyncStats(*protoChannel.MutablePop(), popStats); diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto index a67d87d70d..bcc01b4084 100644 --- a/ydb/library/yql/dq/actors/protos/dq_stats.proto +++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto @@ -205,11 +205,12 @@ message TDqTaskStats { uint64 StartTimeMs = 158; uint64 FinishTimeMs = 5; // task finish time, timestamp in millis uint64 FirstRowTimeMs = 4; // first row time, timestamp in millis - uint64 WaitTimeUs = 104; // total wait (input 
+ output) wall time + reserved 104; + uint64 WaitInputTimeUs = 111; // wait input wall time (any input: channels, source, ...) uint64 WaitOutputTimeUs = 105; // wait output wall time (any output: channels, sinks, ...) - uint64 PendingInputTimeUs = 107; // time waiting input data - uint64 PendingOutputTimeUs = 108; // time waiting output data - uint64 FinishTimeUs = 109; // time in finished state // ComputeCpuTimeUs + PendingInputTimeUs + PendingOutputTimeUs + FinishTimeUs == 100% (or == const in aggregated graphs for several stages/tasks) + reserved 107; + reserved 108; + reserved 109; } message TDqComputeActorStats { diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp index a9bb0e48d2..b8ade00c4f 100644 --- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp +++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp @@ -83,7 +83,6 @@ public: private: void OnStatisticsRequest(TEvStatistics::TPtr& ev) { - TaskRunner->UpdateStats(); THashMap<ui32, const IDqAsyncOutputBuffer*> sinks; for (const auto sinkId : ev->Get()->SinkIds) { @@ -220,7 +219,6 @@ private: { auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds), std::move(ev->Get()->InputTransformIds)); - TaskRunner->UpdateStats(); THashMap<ui32, const IDqAsyncOutputBuffer*> sinks; for (const auto sinkId : st->SinkIds) { sinks[sinkId] = TaskRunner->GetSink(sinkId).Get(); diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp index c35a07e506..03c9c2a3a5 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp @@ -709,11 +709,12 @@ public: RunComputeTime = TDuration::Zero(); - auto runStatus = FetchAndDispatch(); - if (Stats) { - Stats->RunStatusTimeMetrics.SetCurrentStatus(runStatus, RunComputeTime); + if (Y_LIKELY(CollectBasic())) { + StopWaiting(); } + auto runStatus = 
FetchAndDispatch(); + if (Y_UNLIKELY(CollectFull())) { Stats->ComputeCpuTimeByRun->Collect(RunComputeTime.MilliSeconds()); @@ -725,24 +726,21 @@ public: } } - if (runStatus == ERunStatus::Finished) { - if (CollectBasic()) { - Stats->FinishTs = TInstant::Now(); - StopWaiting(Stats->FinishTs); - } - - return ERunStatus::Finished; - } - - if (CollectBasic()) { - auto now = TInstant::Now(); - StartWaiting(now); - if (runStatus == ERunStatus::PendingOutput) { - StartWaitingOutput(now); + if (Y_LIKELY(CollectBasic())) { + switch (runStatus) { + case ERunStatus::Finished: + Stats->FinishTs = TInstant::Now(); + break; + case ERunStatus::PendingInput: + StartWaitingInput(); + break; + case ERunStatus::PendingOutput: + StartWaitingOutput(); + break; } } - return runStatus; // PendingInput or PendingOutput + return runStatus; } bool HasEffects() const final { @@ -833,12 +831,6 @@ public: return Context.RandomProvider; } - void UpdateStats() override { - if (Stats) { - Stats->RunStatusTimeMetrics.UpdateStatusTime(); - } - } - const TDqTaskRunnerStats* GetStats() const override { return Stats.get(); } @@ -901,12 +893,6 @@ private: wideBuffer.resize(AllocatedHolder->OutputWideType->GetElementsCount()); } while (!AllocatedHolder->Output->IsFull()) { - if (CollectBasic()) { - auto now = TInstant::Now(); - StopWaitingOutput(now); - StopWaiting(now); - } - NUdf::TUnboxedValue value; NUdf::EFetchStatus fetchStatus; if (isWide) { @@ -1000,32 +986,29 @@ private: private: // statistics support + std::optional<TInstant> StartWaitInputTime; std::optional<TInstant> StartWaitOutputTime; - std::optional<TInstant> StartWaitTime; - void StartWaitingOutput(TInstant now) { - if (CollectBasic() && !StartWaitOutputTime) { - StartWaitOutputTime = now; + void StartWaitingInput() { + if (!StartWaitInputTime) { + StartWaitInputTime = TInstant::Now(); } } - void StopWaitingOutput(TInstant now) { - if (CollectBasic() && StartWaitOutputTime) { - Stats->WaitOutputTime += (now - *StartWaitOutputTime); - 
StartWaitOutputTime.reset(); + void StartWaitingOutput() { + if (!StartWaitOutputTime) { + StartWaitOutputTime = TInstant::Now(); } } - void StartWaiting(TInstant now) { - if (CollectBasic() && !StartWaitTime) { - StartWaitTime = now; + void StopWaiting() { + if (StartWaitInputTime) { + Stats->WaitInputTime += (TInstant::Now() - *StartWaitInputTime); + StartWaitInputTime.reset(); } - } - - void StopWaiting(TInstant now) { - if (CollectBasic() && StartWaitTime) { - Stats->WaitTime += (now - *StartWaitTime); - StartWaitTime.reset(); + if (StartWaitOutputTime) { + Stats->WaitOutputTime += (TInstant::Now() - *StartWaitOutputTime); + StartWaitOutputTime.reset(); } } }; diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h index f89243ad6a..42df8bb027 100644 --- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h +++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h @@ -30,40 +30,6 @@ enum class ERunStatus : ui32 { PendingOutput }; -class TRunStatusTimeMetrics { -public: - void UpdateStatusTime(TDuration computeCpuTime = TDuration::Zero()) { - auto now = TInstant::Now(); - StatusTime[ui32(CurrentStatus)] += now - StatusStartTime - computeCpuTime; - StatusStartTime = now; - } - - void SetCurrentStatus(ERunStatus status, TDuration computeCpuTime) { - Y_ABORT_UNLESS(ui32(status) < StatusesCount); - UpdateStatusTime(computeCpuTime); - CurrentStatus = status; - } - - TDuration operator[](ERunStatus status) const { - const ui32 index = ui32(status); - Y_ABORT_UNLESS(index < StatusesCount); - return StatusTime[index]; - } - - void Load(ERunStatus status, TDuration d) { - const ui32 index = ui32(status); - Y_ABORT_UNLESS(index < StatusesCount); - StatusTime[index] = d; - } - - static constexpr ui32 StatusesCount = 3; - -private: - TInstant StatusStartTime = TInstant::Now(); - ERunStatus CurrentStatus = ERunStatus::PendingInput; - TDuration StatusTime[StatusesCount]; -}; - struct TMkqlStat { NKikimr::NMiniKQL::TStatKey Key; i64 Value = 
0; @@ -76,12 +42,10 @@ struct TTaskRunnerStatsBase { TInstant StartTs; TDuration ComputeCpuTime; - TRunStatusTimeMetrics RunStatusTimeMetrics; // ComputeCpuTime + RunStatusTimeMetrics == 100% time - - // profile stats - TDuration WaitTime; // wall time of waiting for input, scans & output + TDuration WaitInputTime; TDuration WaitOutputTime; + // profile stats NMonitoring::IHistogramCollectorPtr ComputeCpuTimeByRun; // in millis THashMap<ui32, THashMap<ui64, IDqInputChannel::TPtr>> InputChannels; // SrcStageId => {ChannelId => Channel} @@ -408,7 +372,6 @@ public: virtual const THashMap<TString, TString>& GetTaskParams() const = 0; virtual const TVector<TString>& GetReadRanges() const = 0; - virtual void UpdateStats() = 0; virtual const TDqTaskRunnerStats* GetStats() const = 0; virtual const TDqMeteringStats* GetMeteringStats() const = 0; @@ -432,7 +395,7 @@ inline void Out<NYql::NDq::TTaskRunnerStatsBase>(IOutputStream& os, TTypeTraits< << "\tStartTs: " << stats.StartTs << Endl << "\tFinishTs: " << stats.FinishTs << Endl << "\tComputeCpuTime: " << stats.ComputeCpuTime << Endl - << "\tWaitTime: " << stats.WaitTime << Endl + << "\tWaitInputTime: " << stats.WaitInputTime << Endl << "\tWaitOutputTime: " << stats.WaitOutputTime << Endl << "\tsize of InputChannels: " << stats.InputChannels.size() << Endl << "\tsize of Sources: " << stats.Sources.size() << Endl diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h index fd3a49f4b0..ec5434f430 100644 --- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h +++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h @@ -84,7 +84,12 @@ public: } ~TTaskControllerImpl() override { - SetTaskCountMetric(0); + // we want to clear TaskCount instantly to use with real-time monitoring + // all other counters will be kept for some time to upload to Monitoring + // and removed together + auto aggrStats = AggregateQueryStatsByStage(TaskStat, 
Stages, CollectFull()); + aggrStats.SetCounter(TaskStat.GetCounterName("TaskRunner", {{"Stage", "Total"}}, "TaskCount"), 0); + ExportStats(aggrStats, 0); } public: @@ -291,6 +296,9 @@ private: } else if (name == "InputRows") { if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records"; isDeriv = true; + } else if (name == "TaskCount") { + publicCounterName = "query.running_tasks"; + isDeriv = true; } else if (name == "MultiHop_LateThrownEventsCount") { publicCounterName = "query.late_events"; isDeriv = true; @@ -369,13 +377,11 @@ private: ADD_COUNTER(OutputBytes) // ADD_COUNTER(StartTimeMs) + ADD_COUNTER(WaitInputTimeUs) + ADD_COUNTER(WaitOutputTimeUs) + // profile stats ADD_COUNTER(BuildCpuTimeUs) - // ADD_COUNTER(WaitTimeUs) - // ADD_COUNTER(WaitOutputTimeUs) - // ADD_COUNTER(PendingInputTimeUs) - // ADD_COUNTER(PendingOutputTimeUs) - // ADD_COUNTER(FinishTimeUs) for (const auto& ingress : s.GetIngress()) { TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Ingress" + ingress.GetName() + "Bytes"), ingress.GetBytes()); @@ -460,18 +466,6 @@ private: } } - void SetTaskCountMetric(ui64 count) { - if (!ServiceCounters.Counters) { - return; - } - *ServiceCounters.Counters->GetCounter("TaskCount") = count; - - if (!ServiceCounters.PublicCounters) { - return; - } - *ServiceCounters.PublicCounters->GetNamedCounter("name", "query.running_tasks") = count; - } - public: void OnReadyState(TEvReadyState::TPtr& ev) { @@ -486,8 +480,6 @@ public: const auto& actorIds = ev->Get()->Record.GetActorId(); Y_ABORT_UNLESS(tasks.size() == actorIds.size()); - SetTaskCountMetric(tasks.size()); - for (int i = 0; i < static_cast<int>(tasks.size()); ++i) { auto actorId = ActorIdFromProto(actorIds[i]); const auto& task = Tasks.emplace_back(NDq::TDqTaskSettings(&tasks[i]), actorId).first; @@ -498,6 +490,13 @@ public: Stages.emplace(task.GetId(), taskMeta.GetStageId()); } + for (const auto& [taskId, stageId] : Stages) { + 
TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", + {{"Task", ToString(taskId)}, {"Stage", ToString(stageId)}}, "CpuTimeUs"), 0); + } + + ExportStats(AggregateQueryStatsByStage(TaskStat, Stages, CollectFull()), 0); + YQL_CLOG(DEBUG, ProviderDq) << "Ready State: " << SelfId(); MaybeUpdateChannels(); diff --git a/ydb/library/yql/providers/dq/counters/task_counters.cpp b/ydb/library/yql/providers/dq/counters/task_counters.cpp index cd56913042..a40a781eda 100644 --- a/ydb/library/yql/providers/dq/counters/task_counters.cpp +++ b/ydb/library/yql/providers/dq/counters/task_counters.cpp @@ -77,7 +77,6 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa labels.erase(maybeSrcStageId); } stage2Input[stageId].insert(channelId); - stage2Input["Total"].insert(channelId); labels.erase(maybeInputChannel); labels["Input"] = ToString(stage); input = true; @@ -94,7 +93,6 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa labels.erase(maybeDstStageId); } stage2Output[stageId].insert(channelId); - stage2Output["Total"].insert(channelId); labels.erase(maybeOutputChannel); labels["Output"] = ToString(stage); output = true; @@ -102,7 +100,6 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa labels.erase(maybeTask); labels["Stage"] = ToString(stageId); stage2Tasks[stageId].insert(taskId); - stage2Tasks["Total"].insert(taskId); if (collectFull) { aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v); } @@ -148,23 +145,32 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa } } - for (const auto& [stageId, v] : stage2Tasks) { - if (collectFull || stageId == "Total") { + for (const auto& [stageId, tasks] : stage2Tasks) { + auto taskCount = tasks.size(); + if (collectFull) { aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", - {{"Stage", stageId}}, "TasksCount"), static_cast<ui64>(v.size())); + {{"Stage", 
stageId}}, "TaskCount"), taskCount); } + aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", + {{"Stage", "Total"}}, "TaskCount"), taskCount); } - for (const auto& [stageId, v] : stage2Input) { - if (collectFull || stageId == "Total") { + for (const auto& [stageId, channels] : stage2Input) { + auto channelCount = channels.size(); + if (collectFull) { aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", - {{"Stage", stageId},{"Input", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size())); + {{"Stage", stageId},{"Input", "Total"}}, "ChannelCount"), channelCount); } + aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", + {{"Stage", "Total"},{"Input", "Total"}}, "ChannelCount"), channelCount); } - for (const auto& [stageId, v] : stage2Output) { - if (collectFull || stageId == "Total") { + for (const auto& [stageId, channels] : stage2Output) { + auto channelCount = channels.size(); + if (collectFull) { aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", - {{"Stage", stageId},{"Output", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size())); + {{"Stage", stageId},{"Output", "Total"}}, "ChannelCount"), channelCount); } + aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", + {{"Stage", "Total"},{"Output", "Total"}}, "ChannelCount"), channelCount); } return aggregatedQueryStat; diff --git a/ydb/library/yql/providers/dq/counters/task_counters.h b/ydb/library/yql/providers/dq/counters/task_counters.h index aee8bad563..2399d1d2c1 100644 --- a/ydb/library/yql/providers/dq/counters/task_counters.h +++ b/ydb/library/yql/providers/dq/counters/task_counters.h @@ -119,7 +119,7 @@ struct TTaskCounters : public TCounters { }; if (stats.ComputeCpuTime) SetCounter(GetCounterName("TaskRunner", labels, "ComputeCpuTime"), stats.ComputeCpuTime.MicroSeconds()); if (stats.BuildCpuTime) SetCounter(GetCounterName("TaskRunner", labels, "BuildCpuTime"), stats.BuildCpuTime.MicroSeconds()); - if 
(stats.WaitTime) SetCounter(GetCounterName("TaskRunner", labels, "WaitTime"), stats.WaitTime.MicroSeconds()); + if (stats.WaitInputTime) SetCounter(GetCounterName("TaskRunner", labels, "WaitInputTime"), stats.WaitInputTime.MicroSeconds()); if (stats.WaitOutputTime) SetCounter(GetCounterName("TaskRunner", labels, "WaitOutputTime"), stats.WaitOutputTime.MicroSeconds()); } diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp index d82727f03f..3a1bfc821b 100644 --- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp +++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp @@ -1646,9 +1646,6 @@ public: return nullptr; } - void UpdateStats() override { - } - const TDqMeteringStats* GetMeteringStats() const override { try { NDqProto::TCommandHeader header; diff --git a/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go b/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go index 7fcef47bd7..96c50fcfab 100644 --- a/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go +++ b/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go @@ -14,7 +14,6 @@ import ( type servicePprof struct { httpServer *http.Server - mux *http.ServeMux logger log.Logger } @@ -41,10 +40,6 @@ func (s *servicePprof) stop() { } func newServicePprof(logger log.Logger, cfg *config.TPprofServerConfig) service { - httpServer := &http.Server{ - Addr: utils.EndpointToString(cfg.Endpoint), - } - mux := http.NewServeMux() mux.HandleFunc("/debug/pprof/", pprof.Index) mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) @@ -52,12 +47,16 @@ func newServicePprof(logger log.Logger, cfg *config.TPprofServerConfig) service mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + httpServer := &http.Server{ + Addr: utils.EndpointToString(cfg.Endpoint), + Handler: mux, + } + // TODO: TLS 
logger.Warn("server will use insecure connections") return &servicePprof{ httpServer: httpServer, logger: logger, - mux: mux, } } diff --git a/ydb/library/yql/utils/simd/exec/CMakeLists.txt b/ydb/library/yql/utils/simd/exec/CMakeLists.txt index a3b4da0bea..ecb5351e08 100644 --- a/ydb/library/yql/utils/simd/exec/CMakeLists.txt +++ b/ydb/library/yql/utils/simd/exec/CMakeLists.txt @@ -8,3 +8,4 @@ add_subdirectory(pack_tuple) add_subdirectory(stream_store) +add_subdirectory(tuples_to_bucket) diff --git a/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp b/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp index ed9a4e0444..0c1cba5471 100644 --- a/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp +++ b/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp @@ -170,6 +170,6 @@ int main() { auto worker = tp.Create<NSimd::TSimdAVX2Traits>(); bool fine = true; - fine &= worker->PackTuple(true); + fine &= worker->PackTuple(false); return !fine; }
\ No newline at end of file diff --git a/ydb/library/yql/utils/simd/exec/stream_store/main.cpp b/ydb/library/yql/utils/simd/exec/stream_store/main.cpp index b879bf6fd7..ad731915f2 100644 --- a/ydb/library/yql/utils/simd/exec/stream_store/main.cpp +++ b/ydb/library/yql/utils/simd/exec/stream_store/main.cpp @@ -119,6 +119,6 @@ int main() { auto worker = NSimd::SelectSimdTraits(tp); bool fine = true; - fine &= worker->StoreStream(true); + fine &= worker->StoreStream(false); return !fine; }
\ No newline at end of file diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..44362215fd --- /dev/null +++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,28 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_executable(tuples_to_bucket) +target_link_libraries(tuples_to_bucket PUBLIC + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + yql-utils-simd +) +target_link_options(tuples_to_bucket PRIVATE + -Wl,-platform_version,macos,11.0,11.0 + -fPIC + -fPIC +) +target_sources(tuples_to_bucket PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp +) +target_allocator(tuples_to_bucket + system_allocator +) +vcs_info(tuples_to_bucket) diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..36edc4e5e3 --- /dev/null +++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt @@ -0,0 +1,33 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(tuples_to_bucket) +target_link_libraries(tuples_to_bucket PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + yql-utils-simd +) +target_link_options(tuples_to_bucket PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(tuples_to_bucket PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp +) +target_allocator(tuples_to_bucket + cpp-malloc-jemalloc +) +vcs_info(tuples_to_bucket) diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..0114e548de --- /dev/null +++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt @@ -0,0 +1,35 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + + +add_executable(tuples_to_bucket) +target_link_libraries(tuples_to_bucket PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + library-cpp-cpuid_check + yql-utils-simd +) +target_link_options(tuples_to_bucket PRIVATE + -ldl + -lrt + -Wl,--no-as-needed + -fPIC + -fPIC + -lpthread + -lrt + -ldl +) +target_sources(tuples_to_bucket PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp +) +target_allocator(tuples_to_bucket + cpp-malloc-tcmalloc + libs-tcmalloc-no_percpu_cache +) +vcs_info(tuples_to_bucket) diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+ + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..4a735e1f87 --- /dev/null +++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. 
+
+
+
+add_executable(tuples_to_bucket)
+target_link_libraries(tuples_to_bucket PUBLIC
+  contrib-libs-cxxsupp
+  yutil
+  library-cpp-cpuid_check
+  yql-utils-simd
+)
+target_sources(tuples_to_bucket PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
+)
+target_allocator(tuples_to_bucket
+  system_allocator
+)
+vcs_info(tuples_to_bucket)
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
new file mode 100644
index 0000000000..6945465ec9
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
@@ -0,0 +1,182 @@
+
+#include <util/generic/ptr.h>
+#include <util/stream/output.h>
+#include <util/system/cpu_id.h>
+#include <util/system/types.h>
+
+#include <ydb/library/yql/utils/simd/simd.h>
+
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
+#include <cstdlib>
+#include <memory>
+#include <new>
+
+namespace {
+
+// Alignment, in bytes, requested for every heap buffer used by the benchmark.
+constexpr std::size_t BufferAlignment = 32;
+
+// Frees storage obtained from AllocAligned().
+struct TAlignedDeleter {
+    void operator()(ui64* ptr) const noexcept {
+        ::operator delete[](ptr, std::align_val_t{BufferAlignment});
+    }
+};
+
+using TAlignedArray = std::unique_ptr<ui64[], TAlignedDeleter>;
+
+// Allocates `count` uninitialized ui64 words aligned to BufferAlignment bytes.
+// An `aligned` attribute on a pointer variable only aligns the pointer object,
+// not the array it points to, so the alignment is requested from operator new.
+TAlignedArray AllocAligned(ui64 count) {
+    void* raw = ::operator new[](count * sizeof(ui64), std::align_val_t{BufferAlignment});
+    return TAlignedArray(static_cast<ui64*>(raw));
+}
+
+} // namespace
+
+// Factory passed to NSimd::SelectSimdTraits(); Create<TTraits>() builds the
+// benchmark worker for the traits type it is instantiated with.
+struct TPerfomancer {
+    TPerfomancer() = default;
+
+    // Common interface that lets Create() return one type for every TTraits.
+    struct TWrapWorker {
+        virtual int TuplesToBucket(bool log) = 0;
+        virtual ~TWrapWorker() = default;
+    };
+
+    template<typename TTraits>
+    struct TWorker : TWrapWorker {
+        template<typename T>
+        using TSimd = typename TTraits::template TSimd8<T>;
+        using TRegister = typename TTraits::TRegister;
+        TWorker() = default;
+
+        // Prints the implementation name that corresponds to TTraits::Size.
+        void Info() {
+            if (TTraits::Size == 8) {
+                Cerr << "Fallback implementation:" << Endl;
+            } else if (TTraits::Size == 16) {
+                Cerr << "SSE42 implementation:" << Endl;
+            } else if (TTraits::Size == 32) {
+                Cerr << "AVX2 implementation:" << Endl;
+            }
+        }
+
+        // Copies registers from a tuple array into small per-bucket staging
+        // buffers chosen by the low bits of a hash, and moves a staging buffer
+        // into its bucket when the buffer fills up. Prints timing to Cerr when
+        // `log` is true. Always returns 1.
+        int TuplesToBucket(bool log = true) override {
+            const ui64 NTuples = 32 << 18;
+            const ui64 TupleWords = 4;
+            const ui64 TupleSize = TupleWords * sizeof(ui64);
+
+            const ui64 NBuckets = 1 << 11;
+            const ui64 BufSize = 64;
+            const ui64 BucketSize = 1024 * 1024;
+            const ui64 BufWords = BufSize / sizeof(ui64);
+            const ui64 BucketWords = BucketSize / sizeof(ui64);
+            const ui64 RegWords = TTraits::Size / sizeof(ui64);
+
+            // Owning buffers: released on every exit path, including a failed
+            // allocation further down.
+            TAlignedArray arrHash = AllocAligned(NTuples);
+            TAlignedArray arrData = AllocAligned(TupleWords * NTuples);
+
+            for (ui32 i = 0; i < NTuples; i++) {
+                arrHash[i] = std::rand();
+                arrData[TupleWords*i] = i;
+                arrData[TupleWords*i+1] = i+1;
+                arrData[TupleWords*i+2] = i+2;
+                arrData[TupleWords*i+3] = i+3;
+            }
+
+            TAlignedArray arrBuckets = AllocAligned(BucketWords * NBuckets);
+
+            for (ui32 i = 0; i < BucketWords * NBuckets; i++) {
+                arrBuckets[i] = i;
+            }
+
+            ui32 offsets[NBuckets] = {};
+            ui32 bigOffsets[NBuckets] = {};
+            TAlignedArray accum = AllocAligned(NBuckets * BufWords);
+
+            // Zero every staging word: a flush copies the whole buffer, so no
+            // word of it may be left uninitialized.
+            std::fill_n(accum.get(), NBuckets * BufWords, ui64(0));
+
+            TSimd<ui8> readReg1;
+            TRegister* addr1 = reinterpret_cast<TRegister*>(arrData.get());
+
+            const auto begin01 = std::chrono::steady_clock::now();
+
+            ui64 hash1 = 0;
+
+            // NOTE(review): the loop visits NTuples / 4 registers, while the report
+            // below is computed from NTuples * TupleSize bytes - confirm the intent.
+            // NOTE(review): bigOffsets is never checked against BucketWords; this
+            // relies on the hashes spreading the flushes across the buckets.
+            for (ui32 i = 0; i < NTuples / 2; i += 2) {
+                hash1 = arrHash[i];
+                const ui64 bucketNum = hash1 & (NBuckets - 1);
+                ui64* const bufAddr = accum.get() + bucketNum * BufWords;
+                readReg1 = TSimd<ui8>(reinterpret_cast<ui8*>(addr1));
+                const ui32 currOffset = offsets[bucketNum];
+                readReg1.Store(reinterpret_cast<ui8*>(bufAddr + currOffset));
+                offsets[bucketNum] += RegWords;
+                // NOTE(review): the literal 4 presumably stands for one tuple
+                // (TupleWords) - confirm before changing it.
+                if (currOffset + 4 >= BufWords) {
+                    offsets[bucketNum] = 0;
+                    ui64* const bucketAddr = arrBuckets.get() + bucketNum * BucketWords;
+                    for (ui32 j = 0; j < BufSize / TTraits::Size; j++) {
+                        readReg1 = TSimd<ui8>(reinterpret_cast<ui8*>(bufAddr + RegWords * j));
+                        // Write the staged register into the bucket.
+                        readReg1.Store(reinterpret_cast<ui8*>(bucketAddr + bigOffsets[bucketNum] + RegWords * j));
+                    }
+                    bigOffsets[bucketNum] += BufWords;
+                }
+                addr1++;
+            }
+
+            const auto end01 = std::chrono::steady_clock::now();
+
+            const ui64 microseconds =
+                std::chrono::duration_cast<std::chrono::microseconds>(end01 - begin01).count();
+            if (log) {
+                Info();
+                Cerr << "hash accum[12] arrBuckets[12]: " << hash1 << " " << accum[12] << " " << arrBuckets[12] << Endl;
+                Cerr << "Time for stream load = " << microseconds << "[microseconds]"
+                     << Endl;
+                Cerr << "Data size = " << ((NTuples * TupleSize) / (1024 * 1024))
+                     << " [MB]" << Endl;
+                Cerr << "Stream load/save/accum speed = "
+                     << (NTuples * TupleSize * 1000 * 1000) /
+                        (1024 * 1024 * (microseconds + 1))
+                     << " MB/sec" << Endl;
+                Cerr << Endl;
+            }
+
+            return 1;
+        }
+
+        ~TWorker() = default;
+    };
+
+    // Builds a worker for TTraits behind the TWrapWorker interface.
+    template<typename TTraits>
+    THolder<TWrapWorker> Create() const {
+        return MakeHolder<TWorker<TTraits>>();
+    }
+};
+
+// Runs the benchmark without logging; exits with 0 because TuplesToBucket returns 1.
+int main() {
+    TPerfomancer tp;
+    auto worker = NSimd::SelectSimdTraits(tp);
+    return !worker->TuplesToBucket(false);
+}
\ No newline at end of file diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/ya.make b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/ya.make new file mode 100644 index 0000000000..704a7b6c0a --- /dev/null +++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/ya.make @@ -0,0 +1,7 @@ +PROGRAM() + +SRCS(main.cpp) + +PEERDIR(ydb/library/yql/utils/simd) + +END()
\ No newline at end of file diff --git a/ydb/library/yql/utils/simd/exec/ya.make b/ydb/library/yql/utils/simd/exec/ya.make index 5d18536908..c36eb91977 100644 --- a/ydb/library/yql/utils/simd/exec/ya.make +++ b/ydb/library/yql/utils/simd/exec/ya.make @@ -8,9 +8,14 @@ RUN( pack_tuple ) +RUN( + tuples_to_bucket +) + DEPENDS( ydb/library/yql/utils/simd/exec/stream_store ydb/library/yql/utils/simd/exec/pack_tuple + ydb/library/yql/utils/simd/exec/tuples_to_bucket ) PEERDIR( @@ -21,5 +26,6 @@ END() RECURSE( pack_tuple + tuples_to_bucket stream_store )
\ No newline at end of file diff --git a/ydb/library/yql/utils/simd/simd.h b/ydb/library/yql/utils/simd/simd.h index c55d3af3b8..5a8bcb6d6a 100644 --- a/ydb/library/yql/utils/simd/simd.h +++ b/ydb/library/yql/utils/simd/simd.h @@ -29,10 +29,8 @@ template<typename TFactory> auto SelectSimdTraits(const TFactory& factory) { if (NX86::HaveAVX2()) { return factory.template Create<TSimdAVX2Traits>(); - } else if (NX86::HaveSSE42()) { - return factory.template Create<TSimdSSE42Traits>(); } else { - return factory.template Create<TSimdFallbackTraits>(); + return factory.template Create<TSimdSSE42Traits>(); } } |