aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-11-06 15:02:32 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-11-06 15:19:17 +0300
commit2f26a2697d3f615c7546ff1a46c91bbe511d3b93 (patch)
treee46e64238d282523045d3d399022fbafda71097f
parenta75d931e987608f3e7cd857ddd6c5aea464e4233 (diff)
downloadydb-2f26a2697d3f615c7546ff1a46c91bbe511d3b93.tar.gz
Intermediate changes
-rw-r--r--.github/workflows/nightly_run.yaml1
-rw-r--r--.mapping.json5
-rw-r--r--build/conf/project_specific/yql_udf.conf20
-rw-r--r--build/plugins/yql_python_udf.py4
-rw-r--r--build/scripts/gen_yql_python_udf.py2
-rw-r--r--ydb/core/kqp/opt/kqp_query_plan.cpp5
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h12
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp16
-rw-r--r--ydb/library/yql/dq/actors/protos/dq_stats.proto9
-rw-r--r--ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp2
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.cpp77
-rw-r--r--ydb/library/yql/dq/runtime/dq_tasks_runner.h43
-rw-r--r--ydb/library/yql/providers/dq/actors/task_controller_impl.h39
-rw-r--r--ydb/library/yql/providers/dq/counters/task_counters.cpp30
-rw-r--r--ydb/library/yql/providers/dq/counters/task_counters.h2
-rw-r--r--ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp3
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/service_pprof.go11
-rw-r--r--ydb/library/yql/utils/simd/exec/CMakeLists.txt1
-rw-r--r--ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp2
-rw-r--r--ydb/library/yql/utils/simd/exec/stream_store/main.cpp2
-rw-r--r--ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt28
-rw-r--r--ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt33
-rw-r--r--ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt35
-rw-r--r--ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt17
-rw-r--r--ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt23
-rw-r--r--ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp142
-rw-r--r--ydb/library/yql/utils/simd/exec/tuples_to_bucket/ya.make7
-rw-r--r--ydb/library/yql/utils/simd/exec/ya.make6
-rw-r--r--ydb/library/yql/utils/simd/simd.h4
29 files changed, 409 insertions, 172 deletions
diff --git a/.github/workflows/nightly_run.yaml b/.github/workflows/nightly_run.yaml
index db624cb603..5570844040 100644
--- a/.github/workflows/nightly_run.yaml
+++ b/.github/workflows/nightly_run.yaml
@@ -40,6 +40,5 @@ jobs:
secrets: inherit
with:
build_target: ydb/
- sanitizer: none
run_build: true
run_tests: true
diff --git a/.mapping.json b/.mapping.json
index 027f520e1f..bbc838ef39 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -8800,6 +8800,11 @@
"ydb/library/yql/utils/simd/exec/stream_store/CMakeLists.linux-x86_64.txt":"",
"ydb/library/yql/utils/simd/exec/stream_store/CMakeLists.txt":"",
"ydb/library/yql/utils/simd/exec/stream_store/CMakeLists.windows-x86_64.txt":"",
+ "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt":"",
+ "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt":"",
+ "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt":"",
+ "ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt":"",
"ydb/library/yql/utils/simd/ut/CMakeLists.darwin-x86_64.txt":"",
"ydb/library/yql/utils/simd/ut/CMakeLists.linux-aarch64.txt":"",
"ydb/library/yql/utils/simd/ut/CMakeLists.linux-x86_64.txt":"",
diff --git a/build/conf/project_specific/yql_udf.conf b/build/conf/project_specific/yql_udf.conf
index a134843634..85024f4ad8 100644
--- a/build/conf/project_specific/yql_udf.conf
+++ b/build/conf/project_specific/yql_udf.conf
@@ -36,12 +36,16 @@ macro UDF_NO_PROBE() {
module YQL_UDF_TEST: PY3TEST_BIN {
SET_APPEND(_MAKEFILE_INCLUDE_LIKE_DEPS canondata/result.json)
- PEERDIR(ydb/library/yql/tests/common/udf_test)
+ PEERDIR(contrib/ydb/library/yql/tests/common/udf_test)
- DEPENDS(ydb/library/yql/tools/astdiff)
- DEPENDS(ydb/library/yql/tools/yqlrun)
- DATA(arcadia/ydb/library/yql/mount)
- DATA(arcadia/ydb/library/yql/cfg/udf_test)
+ DEPENDS(contrib/ydb/library/yql/tools/astdiff)
+ DEPENDS(contrib/ydb/library/yql/tools/yqlrun)
+ DATA(arcadia/contrib/ydb/library/yql/mount)
+ DATA(arcadia/contrib/ydb/library/yql/cfg/udf_test)
+ ENV(YQL_ASTDIFF_PATH="contrib/ydb/library/yql/tools/astdiff/astdiff")
+ ENV(YQL_CONFIG_DIR="contrib/ydb/library/yql/cfg/udf_test")
+ ENV(YQL_YQLRUN_PATH="contrib/ydb/library/yql/tools/yqlrun/yqlrun")
+ ENV(YQL_SQL2YQL_PATH="contrib/ydb/library/yql/tools/sql2yql/sql2yql")
}
module YQL_UDF_YDB_TEST: PY3TEST_BIN {
@@ -74,8 +78,8 @@ module YQL_UDF_TEST_CONTRIB: PY3TEST_BIN {
###
### https://yql.yandex-team.ru/docs/yt/udf/cpp/
macro _ADD_YQL_UDF_DEPS() {
- PEERDIR(ydb/library/yql/public/udf)
- PEERDIR(ydb/library/yql/public/udf/support)
+ PEERDIR(contrib/ydb/library/yql/public/udf)
+ PEERDIR(contrib/ydb/library/yql/public/udf/support)
}
macro _ADD_YQL_UDF_YDB_DEPS() {
@@ -97,7 +101,7 @@ macro _MAKE_YQL_UDF() {
_ADD_YQL_UDF_DEPS()
SET_APPEND(USER_CXXFLAGS -DBUILD_UDF)
# For Windows using declspecs
- DEFAULT(YQL_UDF_EXPORT ${ARCADIA_ROOT}/ydb/library/yql/public/udf/udfs_exports.exports)
+ DEFAULT(YQL_UDF_EXPORT ${ARCADIA_ROOT}/contrib/ydb/library/yql/public/udf/udfs_exports.exports)
when ($WINDOWS == "yes") {
YQL_UDF_EXPORT=
diff --git a/build/plugins/yql_python_udf.py b/build/plugins/yql_python_udf.py
index b0f9570090..5a0bb69c63 100644
--- a/build/plugins/yql_python_udf.py
+++ b/build/plugins/yql_python_udf.py
@@ -19,7 +19,7 @@ def onregister_yql_python_udf(unit, *args):
unit.onyql_abi_version(['2', '27', '0'])
unit.onpeerdir(['yql/udfs/common/python/python_udf'])
- unit.onpeerdir(['ydb/library/yql/public/udf'])
+ unit.onpeerdir(['contrib/ydb/library/yql/public/udf'])
if add_libra_modules:
unit.onpeerdir(['quality/user_sessions/libra_arc/noyql'])
@@ -37,7 +37,7 @@ def onregister_yql_python_udf(unit, *args):
output_includes = [
'yql/udfs/common/python/python_udf/python_udf.h',
- 'ydb/library/yql/public/udf/udf_registrator.h',
+ 'contrib/ydb/library/yql/public/udf/udf_registrator.h',
]
if add_libra_modules:
output_includes.append('yql/udfs/quality/libra/module/module.h')
diff --git a/build/scripts/gen_yql_python_udf.py b/build/scripts/gen_yql_python_udf.py
index 13b5898117..127b4b8867 100644
--- a/build/scripts/gen_yql_python_udf.py
+++ b/build/scripts/gen_yql_python_udf.py
@@ -3,7 +3,7 @@ import sys
TEMPLATE="""
#include <yql/udfs/common/python/python_udf/python_udf.h>
-#include <ydb/library/yql/public/udf/udf_registrator.h>
+#include <contrib/ydb/library/yql/public/udf/udf_registrator.h>
#if @WITH_LIBRA@
#include <yql/udfs/quality/libra/module/module.h>
diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp
index 61419143e3..cab38fb67b 100644
--- a/ydb/core/kqp/opt/kqp_query_plan.cpp
+++ b/ydb/core/kqp/opt/kqp_query_plan.cpp
@@ -1975,9 +1975,8 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
SetNonZero(node, "ComputeTimeUs", taskStats.GetComputeCpuTimeUs());
- SetNonZero(node, "WaitTimeUs", taskStats.GetWaitTimeUs()); // need to be reviewed
- SetNonZero(node, "PendingInputTimeUs", taskStats.GetPendingInputTimeUs()); // need to be reviewed
- SetNonZero(node, "PendingOutputTimeUs", taskStats.GetPendingOutputTimeUs()); // need to be reviewed
+ SetNonZero(node, "WaitInputTimeUs", taskStats.GetWaitInputTimeUs());
+ SetNonZero(node, "WaitOutputTimeUs", taskStats.GetWaitOutputTimeUs());
NKqpProto::TKqpTaskExtraStats taskExtraStats;
if (taskStats.GetExtra().UnpackTo(&taskExtraStats)) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 5e3b90e812..73d12d8077 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -625,8 +625,10 @@ protected:
void InternalError(NYql::NDqProto::StatusIds::StatusCode statusCode, TIssues issues) {
CA_LOG_E(InternalErrorLogString(statusCode, issues));
- TaskRunner->GetAllocatorPtr()->InvalidateMemInfo();
- TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck();
+ if (TaskRunner) {
+ TaskRunner->GetAllocatorPtr()->InvalidateMemInfo();
+ TaskRunner->GetAllocatorPtr()->DisableStrictAllocationCheck();
+ }
std::optional<TGuard<NKikimr::NMiniKQL::TScopedAlloc>> guard = MaybeBindAllocator();
State = NDqProto::COMPUTE_STATE_FAILURE;
ReportStateAndMaybeDie(statusCode, issues);
@@ -1940,11 +1942,7 @@ private:
}
virtual const NYql::NDq::TTaskRunnerStatsBase* GetTaskRunnerStats() {
- if (!TaskRunner) {
- return nullptr;
- }
- TaskRunner->UpdateStats();
- return TaskRunner->GetStats();
+ return TaskRunner ? TaskRunner->GetStats() : nullptr;
}
virtual const IDqAsyncOutputBuffer* GetSink(ui64, const TAsyncOutputInfoBase& sinkInfo) const {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
index bf0bc688d3..34385c17c4 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_stats.cpp
@@ -44,14 +44,8 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
protoTask->SetComputeCpuTimeUs(taskStats.ComputeCpuTime.MicroSeconds());
protoTask->SetBuildCpuTimeUs(taskStats.BuildCpuTime.MicroSeconds());
- protoTask->SetWaitTimeUs(taskStats.WaitTime.MicroSeconds()); // to be reviewed
- protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds()); // to be reviewed
-
- // All run statuses metrics
- protoTask->SetPendingInputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingInput].MicroSeconds()); // to be reviewed
- protoTask->SetPendingOutputTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::PendingOutput].MicroSeconds()); // to be reviewed
- protoTask->SetFinishTimeUs(taskStats.RunStatusTimeMetrics[ERunStatus::Finished].MicroSeconds()); // to be reviewed
- static_assert(TRunStatusTimeMetrics::StatusesCount == 3); // Add all statuses here
+ protoTask->SetWaitInputTimeUs(taskStats.WaitInputTime.MicroSeconds());
+ protoTask->SetWaitOutputTimeUs(taskStats.WaitOutputTime.MicroSeconds());
if (StatsLevelCollectProfile(level)) {
if (taskStats.ComputeCpuTimeByRun) {
@@ -106,6 +100,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
}
{
auto& protoChannel = *protoTask->AddInputChannels();
+ protoChannel.SetChannelId(pushStats.ChannelId); // only one of ids
protoChannel.SetSrcStageId(srcStageId);
FillAsyncStats(*protoChannel.MutablePush(), pushStats);
FillAsyncStats(*protoChannel.MutablePop(), popStats);
@@ -135,8 +130,8 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
//
// task runner is not aware of ingress/egress stats, fill in in CA
//
- for (auto& [inputIndex, sources] : taskStats.Sources) {
- if (StatsLevelCollectFull(level)) {
+ if (StatsLevelCollectFull(level)) {
+ for (auto& [inputIndex, sources] : taskStats.Sources) {
auto& protoSource = *protoTask->AddSources();
protoSource.SetInputIndex(inputIndex);
FillAsyncStats(*protoSource.MutablePush(), sources->GetPushStats());
@@ -184,6 +179,7 @@ void FillTaskRunnerStats(ui64 taskId, ui32 stageId, const TTaskRunnerStatsBase&
}
{
auto& protoChannel = *protoTask->AddOutputChannels();
+ protoChannel.SetChannelId(popStats.ChannelId); // only one of ids
protoChannel.SetDstStageId(dstStageId);
FillAsyncStats(*protoChannel.MutablePush(), pushStats);
FillAsyncStats(*protoChannel.MutablePop(), popStats);
diff --git a/ydb/library/yql/dq/actors/protos/dq_stats.proto b/ydb/library/yql/dq/actors/protos/dq_stats.proto
index a67d87d70d..bcc01b4084 100644
--- a/ydb/library/yql/dq/actors/protos/dq_stats.proto
+++ b/ydb/library/yql/dq/actors/protos/dq_stats.proto
@@ -205,11 +205,12 @@ message TDqTaskStats {
uint64 StartTimeMs = 158;
uint64 FinishTimeMs = 5; // task finish time, timestamp in millis
uint64 FirstRowTimeMs = 4; // first row time, timestamp in millis
- uint64 WaitTimeUs = 104; // total wait (input + output) wall time
+ reserved 104;
+ uint64 WaitInputTimeUs = 111; // wait input wall time (any input: channels, source, ...)
uint64 WaitOutputTimeUs = 105; // wait output wall time (any output: channels, sinks, ...)
- uint64 PendingInputTimeUs = 107; // time waiting input data
- uint64 PendingOutputTimeUs = 108; // time waiting output data
- uint64 FinishTimeUs = 109; // time in finished state // ComputeCpuTimeUs + PendingInputTimeUs + PendingOutputTimeUs + FinishTimeUs == 100% (or == const in aggregated graphs for several stages/tasks)
+ reserved 107;
+ reserved 108;
+ reserved 109;
}
message TDqComputeActorStats {
diff --git a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
index a9bb0e48d2..b8ade00c4f 100644
--- a/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
+++ b/ydb/library/yql/dq/actors/task_runner/task_runner_actor_local.cpp
@@ -83,7 +83,6 @@ public:
private:
void OnStatisticsRequest(TEvStatistics::TPtr& ev) {
- TaskRunner->UpdateStats();
THashMap<ui32, const IDqAsyncOutputBuffer*> sinks;
for (const auto sinkId : ev->Get()->SinkIds) {
@@ -220,7 +219,6 @@ private:
{
auto st = MakeHolder<TEvStatistics>(std::move(ev->Get()->SinkIds), std::move(ev->Get()->InputTransformIds));
- TaskRunner->UpdateStats();
THashMap<ui32, const IDqAsyncOutputBuffer*> sinks;
for (const auto sinkId : st->SinkIds) {
sinks[sinkId] = TaskRunner->GetSink(sinkId).Get();
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
index c35a07e506..03c9c2a3a5 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.cpp
@@ -709,11 +709,12 @@ public:
RunComputeTime = TDuration::Zero();
- auto runStatus = FetchAndDispatch();
- if (Stats) {
- Stats->RunStatusTimeMetrics.SetCurrentStatus(runStatus, RunComputeTime);
+ if (Y_LIKELY(CollectBasic())) {
+ StopWaiting();
}
+ auto runStatus = FetchAndDispatch();
+
if (Y_UNLIKELY(CollectFull())) {
Stats->ComputeCpuTimeByRun->Collect(RunComputeTime.MilliSeconds());
@@ -725,24 +726,21 @@ public:
}
}
- if (runStatus == ERunStatus::Finished) {
- if (CollectBasic()) {
- Stats->FinishTs = TInstant::Now();
- StopWaiting(Stats->FinishTs);
- }
-
- return ERunStatus::Finished;
- }
-
- if (CollectBasic()) {
- auto now = TInstant::Now();
- StartWaiting(now);
- if (runStatus == ERunStatus::PendingOutput) {
- StartWaitingOutput(now);
+ if (Y_LIKELY(CollectBasic())) {
+ switch (runStatus) {
+ case ERunStatus::Finished:
+ Stats->FinishTs = TInstant::Now();
+ break;
+ case ERunStatus::PendingInput:
+ StartWaitingInput();
+ break;
+ case ERunStatus::PendingOutput:
+ StartWaitingOutput();
+ break;
}
}
- return runStatus; // PendingInput or PendingOutput
+ return runStatus;
}
bool HasEffects() const final {
@@ -833,12 +831,6 @@ public:
return Context.RandomProvider;
}
- void UpdateStats() override {
- if (Stats) {
- Stats->RunStatusTimeMetrics.UpdateStatusTime();
- }
- }
-
const TDqTaskRunnerStats* GetStats() const override {
return Stats.get();
}
@@ -901,12 +893,6 @@ private:
wideBuffer.resize(AllocatedHolder->OutputWideType->GetElementsCount());
}
while (!AllocatedHolder->Output->IsFull()) {
- if (CollectBasic()) {
- auto now = TInstant::Now();
- StopWaitingOutput(now);
- StopWaiting(now);
- }
-
NUdf::TUnboxedValue value;
NUdf::EFetchStatus fetchStatus;
if (isWide) {
@@ -1000,32 +986,29 @@ private:
private:
// statistics support
+ std::optional<TInstant> StartWaitInputTime;
std::optional<TInstant> StartWaitOutputTime;
- std::optional<TInstant> StartWaitTime;
- void StartWaitingOutput(TInstant now) {
- if (CollectBasic() && !StartWaitOutputTime) {
- StartWaitOutputTime = now;
+ void StartWaitingInput() {
+ if (!StartWaitInputTime) {
+ StartWaitInputTime = TInstant::Now();
}
}
- void StopWaitingOutput(TInstant now) {
- if (CollectBasic() && StartWaitOutputTime) {
- Stats->WaitOutputTime += (now - *StartWaitOutputTime);
- StartWaitOutputTime.reset();
+ void StartWaitingOutput() {
+ if (!StartWaitOutputTime) {
+ StartWaitOutputTime = TInstant::Now();
}
}
- void StartWaiting(TInstant now) {
- if (CollectBasic() && !StartWaitTime) {
- StartWaitTime = now;
+ void StopWaiting() {
+ if (StartWaitInputTime) {
+ Stats->WaitInputTime += (TInstant::Now() - *StartWaitInputTime);
+ StartWaitInputTime.reset();
}
- }
-
- void StopWaiting(TInstant now) {
- if (CollectBasic() && StartWaitTime) {
- Stats->WaitTime += (now - *StartWaitTime);
- StartWaitTime.reset();
+ if (StartWaitOutputTime) {
+ Stats->WaitOutputTime += (TInstant::Now() - *StartWaitOutputTime);
+ StartWaitOutputTime.reset();
}
}
};
diff --git a/ydb/library/yql/dq/runtime/dq_tasks_runner.h b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
index f89243ad6a..42df8bb027 100644
--- a/ydb/library/yql/dq/runtime/dq_tasks_runner.h
+++ b/ydb/library/yql/dq/runtime/dq_tasks_runner.h
@@ -30,40 +30,6 @@ enum class ERunStatus : ui32 {
PendingOutput
};
-class TRunStatusTimeMetrics {
-public:
- void UpdateStatusTime(TDuration computeCpuTime = TDuration::Zero()) {
- auto now = TInstant::Now();
- StatusTime[ui32(CurrentStatus)] += now - StatusStartTime - computeCpuTime;
- StatusStartTime = now;
- }
-
- void SetCurrentStatus(ERunStatus status, TDuration computeCpuTime) {
- Y_ABORT_UNLESS(ui32(status) < StatusesCount);
- UpdateStatusTime(computeCpuTime);
- CurrentStatus = status;
- }
-
- TDuration operator[](ERunStatus status) const {
- const ui32 index = ui32(status);
- Y_ABORT_UNLESS(index < StatusesCount);
- return StatusTime[index];
- }
-
- void Load(ERunStatus status, TDuration d) {
- const ui32 index = ui32(status);
- Y_ABORT_UNLESS(index < StatusesCount);
- StatusTime[index] = d;
- }
-
- static constexpr ui32 StatusesCount = 3;
-
-private:
- TInstant StatusStartTime = TInstant::Now();
- ERunStatus CurrentStatus = ERunStatus::PendingInput;
- TDuration StatusTime[StatusesCount];
-};
-
struct TMkqlStat {
NKikimr::NMiniKQL::TStatKey Key;
i64 Value = 0;
@@ -76,12 +42,10 @@ struct TTaskRunnerStatsBase {
TInstant StartTs;
TDuration ComputeCpuTime;
- TRunStatusTimeMetrics RunStatusTimeMetrics; // ComputeCpuTime + RunStatusTimeMetrics == 100% time
-
- // profile stats
- TDuration WaitTime; // wall time of waiting for input, scans & output
+ TDuration WaitInputTime;
TDuration WaitOutputTime;
+ // profile stats
NMonitoring::IHistogramCollectorPtr ComputeCpuTimeByRun; // in millis
THashMap<ui32, THashMap<ui64, IDqInputChannel::TPtr>> InputChannels; // SrcStageId => {ChannelId => Channel}
@@ -408,7 +372,6 @@ public:
virtual const THashMap<TString, TString>& GetTaskParams() const = 0;
virtual const TVector<TString>& GetReadRanges() const = 0;
- virtual void UpdateStats() = 0;
virtual const TDqTaskRunnerStats* GetStats() const = 0;
virtual const TDqMeteringStats* GetMeteringStats() const = 0;
@@ -432,7 +395,7 @@ inline void Out<NYql::NDq::TTaskRunnerStatsBase>(IOutputStream& os, TTypeTraits<
<< "\tStartTs: " << stats.StartTs << Endl
<< "\tFinishTs: " << stats.FinishTs << Endl
<< "\tComputeCpuTime: " << stats.ComputeCpuTime << Endl
- << "\tWaitTime: " << stats.WaitTime << Endl
+ << "\tWaitInputTime: " << stats.WaitInputTime << Endl
<< "\tWaitOutputTime: " << stats.WaitOutputTime << Endl
<< "\tsize of InputChannels: " << stats.InputChannels.size() << Endl
<< "\tsize of Sources: " << stats.Sources.size() << Endl
diff --git a/ydb/library/yql/providers/dq/actors/task_controller_impl.h b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
index fd3a49f4b0..ec5434f430 100644
--- a/ydb/library/yql/providers/dq/actors/task_controller_impl.h
+++ b/ydb/library/yql/providers/dq/actors/task_controller_impl.h
@@ -84,7 +84,12 @@ public:
}
~TTaskControllerImpl() override {
- SetTaskCountMetric(0);
+ // we want to clear TaskCount instantly to use with real-time monitoring
+ // all other counters will be kept for some time to upload to Monitoring
+ // and removed together
+ auto aggrStats = AggregateQueryStatsByStage(TaskStat, Stages, CollectFull());
+ aggrStats.SetCounter(TaskStat.GetCounterName("TaskRunner", {{"Stage", "Total"}}, "TaskCount"), 0);
+ ExportStats(aggrStats, 0);
}
public:
@@ -291,6 +296,9 @@ private:
} else if (name == "InputRows") {
if (labels.count(SinkLabel)) publicCounterName = "query.sink_output_records";
isDeriv = true;
+ } else if (name == "TaskCount") {
+ publicCounterName = "query.running_tasks";
+ isDeriv = true;
} else if (name == "MultiHop_LateThrownEventsCount") {
publicCounterName = "query.late_events";
isDeriv = true;
@@ -369,13 +377,11 @@ private:
ADD_COUNTER(OutputBytes)
// ADD_COUNTER(StartTimeMs)
+ ADD_COUNTER(WaitInputTimeUs)
+ ADD_COUNTER(WaitOutputTimeUs)
+
// profile stats
ADD_COUNTER(BuildCpuTimeUs)
- // ADD_COUNTER(WaitTimeUs)
- // ADD_COUNTER(WaitOutputTimeUs)
- // ADD_COUNTER(PendingInputTimeUs)
- // ADD_COUNTER(PendingOutputTimeUs)
- // ADD_COUNTER(FinishTimeUs)
for (const auto& ingress : s.GetIngress()) {
TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner", labels, "Ingress" + ingress.GetName() + "Bytes"), ingress.GetBytes());
@@ -460,18 +466,6 @@ private:
}
}
- void SetTaskCountMetric(ui64 count) {
- if (!ServiceCounters.Counters) {
- return;
- }
- *ServiceCounters.Counters->GetCounter("TaskCount") = count;
-
- if (!ServiceCounters.PublicCounters) {
- return;
- }
- *ServiceCounters.PublicCounters->GetNamedCounter("name", "query.running_tasks") = count;
- }
-
public:
void OnReadyState(TEvReadyState::TPtr& ev) {
@@ -486,8 +480,6 @@ public:
const auto& actorIds = ev->Get()->Record.GetActorId();
Y_ABORT_UNLESS(tasks.size() == actorIds.size());
- SetTaskCountMetric(tasks.size());
-
for (int i = 0; i < static_cast<int>(tasks.size()); ++i) {
auto actorId = ActorIdFromProto(actorIds[i]);
const auto& task = Tasks.emplace_back(NDq::TDqTaskSettings(&tasks[i]), actorId).first;
@@ -498,6 +490,13 @@ public:
Stages.emplace(task.GetId(), taskMeta.GetStageId());
}
+ for (const auto& [taskId, stageId] : Stages) {
+ TaskStat.SetCounter(TaskStat.GetCounterName("TaskRunner",
+ {{"Task", ToString(taskId)}, {"Stage", ToString(stageId)}}, "CpuTimeUs"), 0);
+ }
+
+ ExportStats(AggregateQueryStatsByStage(TaskStat, Stages, CollectFull()), 0);
+
YQL_CLOG(DEBUG, ProviderDq) << "Ready State: " << SelfId();
MaybeUpdateChannels();
diff --git a/ydb/library/yql/providers/dq/counters/task_counters.cpp b/ydb/library/yql/providers/dq/counters/task_counters.cpp
index cd56913042..a40a781eda 100644
--- a/ydb/library/yql/providers/dq/counters/task_counters.cpp
+++ b/ydb/library/yql/providers/dq/counters/task_counters.cpp
@@ -77,7 +77,6 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa
labels.erase(maybeSrcStageId);
}
stage2Input[stageId].insert(channelId);
- stage2Input["Total"].insert(channelId);
labels.erase(maybeInputChannel);
labels["Input"] = ToString(stage);
input = true;
@@ -94,7 +93,6 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa
labels.erase(maybeDstStageId);
}
stage2Output[stageId].insert(channelId);
- stage2Output["Total"].insert(channelId);
labels.erase(maybeOutputChannel);
labels["Output"] = ToString(stage);
output = true;
@@ -102,7 +100,6 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa
labels.erase(maybeTask);
labels["Stage"] = ToString(stageId);
stage2Tasks[stageId].insert(taskId);
- stage2Tasks["Total"].insert(taskId);
if (collectFull) {
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner", labels, name), v);
}
@@ -148,23 +145,32 @@ TTaskCounters AggregateQueryStatsByStage(TTaskCounters& queryStat, const THashMa
}
}
- for (const auto& [stageId, v] : stage2Tasks) {
- if (collectFull || stageId == "Total") {
+ for (const auto& [stageId, tasks] : stage2Tasks) {
+ auto taskCount = tasks.size();
+ if (collectFull) {
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
- {{"Stage", stageId}}, "TasksCount"), static_cast<ui64>(v.size()));
+ {{"Stage", stageId}}, "TaskCount"), taskCount);
}
+ aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
+ {{"Stage", "Total"}}, "TaskCount"), taskCount);
}
- for (const auto& [stageId, v] : stage2Input) {
- if (collectFull || stageId == "Total") {
+ for (const auto& [stageId, channels] : stage2Input) {
+ auto channelCount = channels.size();
+ if (collectFull) {
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
- {{"Stage", stageId},{"Input", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size()));
+ {{"Stage", stageId},{"Input", "Total"}}, "ChannelCount"), channelCount);
}
+ aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
+ {{"Stage", "Total"},{"Input", "Total"}}, "ChannelCount"), channelCount);
}
- for (const auto& [stageId, v] : stage2Output) {
- if (collectFull || stageId == "Total") {
+ for (const auto& [stageId, channels] : stage2Output) {
+ auto channelCount = channels.size();
+ if (collectFull) {
aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
- {{"Stage", stageId},{"Output", "Total"}}, "ChannelsCount"), static_cast<ui64>(v.size()));
+ {{"Stage", stageId},{"Output", "Total"}}, "ChannelCount"), channelCount);
}
+ aggregatedQueryStat.AddCounter(queryStat.GetCounterName("TaskRunner",
+ {{"Stage", "Total"},{"Output", "Total"}}, "ChannelCount"), channelCount);
}
return aggregatedQueryStat;
diff --git a/ydb/library/yql/providers/dq/counters/task_counters.h b/ydb/library/yql/providers/dq/counters/task_counters.h
index aee8bad563..2399d1d2c1 100644
--- a/ydb/library/yql/providers/dq/counters/task_counters.h
+++ b/ydb/library/yql/providers/dq/counters/task_counters.h
@@ -119,7 +119,7 @@ struct TTaskCounters : public TCounters {
};
if (stats.ComputeCpuTime) SetCounter(GetCounterName("TaskRunner", labels, "ComputeCpuTime"), stats.ComputeCpuTime.MicroSeconds());
if (stats.BuildCpuTime) SetCounter(GetCounterName("TaskRunner", labels, "BuildCpuTime"), stats.BuildCpuTime.MicroSeconds());
- if (stats.WaitTime) SetCounter(GetCounterName("TaskRunner", labels, "WaitTime"), stats.WaitTime.MicroSeconds());
+ if (stats.WaitInputTime) SetCounter(GetCounterName("TaskRunner", labels, "WaitInputTime"), stats.WaitInputTime.MicroSeconds());
if (stats.WaitOutputTime) SetCounter(GetCounterName("TaskRunner", labels, "WaitOutputTime"), stats.WaitOutputTime.MicroSeconds());
}
diff --git a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
index d82727f03f..3a1bfc821b 100644
--- a/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
+++ b/ydb/library/yql/providers/dq/task_runner/tasks_runner_pipe.cpp
@@ -1646,9 +1646,6 @@ public:
return nullptr;
}
- void UpdateStats() override {
- }
-
const TDqMeteringStats* GetMeteringStats() const override {
try {
NDqProto::TCommandHeader header;
diff --git a/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go b/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go
index 7fcef47bd7..96c50fcfab 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go
@@ -14,7 +14,6 @@ import (
type servicePprof struct {
httpServer *http.Server
- mux *http.ServeMux
logger log.Logger
}
@@ -41,10 +40,6 @@ func (s *servicePprof) stop() {
}
func newServicePprof(logger log.Logger, cfg *config.TPprofServerConfig) service {
- httpServer := &http.Server{
- Addr: utils.EndpointToString(cfg.Endpoint),
- }
-
mux := http.NewServeMux()
mux.HandleFunc("/debug/pprof/", pprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
@@ -52,12 +47,16 @@ func newServicePprof(logger log.Logger, cfg *config.TPprofServerConfig) service
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+ httpServer := &http.Server{
+ Addr: utils.EndpointToString(cfg.Endpoint),
+ Handler: mux,
+ }
+
// TODO: TLS
logger.Warn("server will use insecure connections")
return &servicePprof{
httpServer: httpServer,
logger: logger,
- mux: mux,
}
}
diff --git a/ydb/library/yql/utils/simd/exec/CMakeLists.txt b/ydb/library/yql/utils/simd/exec/CMakeLists.txt
index a3b4da0bea..ecb5351e08 100644
--- a/ydb/library/yql/utils/simd/exec/CMakeLists.txt
+++ b/ydb/library/yql/utils/simd/exec/CMakeLists.txt
@@ -8,3 +8,4 @@
add_subdirectory(pack_tuple)
add_subdirectory(stream_store)
+add_subdirectory(tuples_to_bucket)
diff --git a/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp b/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp
index ed9a4e0444..0c1cba5471 100644
--- a/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp
+++ b/ydb/library/yql/utils/simd/exec/pack_tuple/main.cpp
@@ -170,6 +170,6 @@ int main() {
auto worker = tp.Create<NSimd::TSimdAVX2Traits>();
bool fine = true;
- fine &= worker->PackTuple(true);
+ fine &= worker->PackTuple(false);
return !fine;
} \ No newline at end of file
diff --git a/ydb/library/yql/utils/simd/exec/stream_store/main.cpp b/ydb/library/yql/utils/simd/exec/stream_store/main.cpp
index b879bf6fd7..ad731915f2 100644
--- a/ydb/library/yql/utils/simd/exec/stream_store/main.cpp
+++ b/ydb/library/yql/utils/simd/exec/stream_store/main.cpp
@@ -119,6 +119,6 @@ int main() {
auto worker = NSimd::SelectSimdTraits(tp);
bool fine = true;
- fine &= worker->StoreStream(true);
+ fine &= worker->StoreStream(false);
return !fine;
} \ No newline at end of file
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..44362215fd
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,28 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(tuples_to_bucket)
+target_link_libraries(tuples_to_bucket PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ yql-utils-simd
+)
+target_link_options(tuples_to_bucket PRIVATE
+ -Wl,-platform_version,macos,11.0,11.0
+ -fPIC
+ -fPIC
+)
+target_sources(tuples_to_bucket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
+)
+target_allocator(tuples_to_bucket
+ system_allocator
+)
+vcs_info(tuples_to_bucket)
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..36edc4e5e3
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,33 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(tuples_to_bucket)
+target_link_libraries(tuples_to_bucket PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ yql-utils-simd
+)
+target_link_options(tuples_to_bucket PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(tuples_to_bucket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
+)
+target_allocator(tuples_to_bucket
+ cpp-malloc-jemalloc
+)
+vcs_info(tuples_to_bucket)
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..0114e548de
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,35 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(tuples_to_bucket)
+target_link_libraries(tuples_to_bucket PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ yql-utils-simd
+)
+target_link_options(tuples_to_bucket PRIVATE
+ -ldl
+ -lrt
+ -Wl,--no-as-needed
+ -fPIC
+ -fPIC
+ -lpthread
+ -lrt
+ -ldl
+)
+target_sources(tuples_to_bucket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
+)
+target_allocator(tuples_to_bucket
+ cpp-malloc-tcmalloc
+ libs-tcmalloc-no_percpu_cache
+)
+vcs_info(tuples_to_bucket)
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..4a735e1f87
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_executable(tuples_to_bucket)
+target_link_libraries(tuples_to_bucket PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ library-cpp-cpuid_check
+ yql-utils-simd
+)
+target_sources(tuples_to_bucket PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
+)
+target_allocator(tuples_to_bucket
+ system_allocator
+)
+vcs_info(tuples_to_bucket)
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
new file mode 100644
index 0000000000..6945465ec9
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/main.cpp
@@ -0,0 +1,142 @@
+
+#include <util/generic/ptr.h>
+#include <util/system/cpu_id.h>
+#include <util/system/types.h>
+
+#include <ydb/library/yql/utils/simd/simd.h>
+
+struct TPerfomancer {
+ TPerfomancer() = default;
+
+ struct TWrapWorker {
+ virtual int TuplesToBucket(bool log) = 0;
+ virtual ~TWrapWorker() = default;
+ };
+
+ template<typename TTraits>
+ struct TWorker : TWrapWorker {
+ template<typename T>
+ using TSimd = typename TTraits::template TSimd8<T>;
+ using TRegister = typename TTraits::TRegister;
+ TWorker() = default;
+
+ void Info() {
+ if (TTraits::Size == 8) {
+ Cerr << "Fallback implementation:" << Endl;
+ } else if (TTraits::Size == 16) {
+ Cerr << "SSE42 implementation:" << Endl;
+ } else if (TTraits::Size == 32) {
+ Cerr << "AVX2 implementation:" << Endl;
+ }
+ }
+
+ int TuplesToBucket(bool log = true) override {
+ const ui64 NTuples = 32 << 18;
+ const ui64 TupleSize = 4 * sizeof(ui64);
+
+ ui64* arrHash __attribute__((aligned(32))) = new ui64[NTuples];
+ ui64* arrData __attribute__((aligned(32))) = new ui64[4 * NTuples];
+
+
+ for (ui32 i = 0; i < NTuples; i++) {
+ arrHash[i] = std::rand();
+ arrData[4*i] = i;
+ arrData[4*i+1] = i+1;
+ arrData[4*i+2] = i+2;
+ arrData[4*i+3] = i+3;
+ }
+
+ const ui64 NBuckets = 1 << 11;
+ const ui64 BufSize = 64;
+ const ui64 BucketSize = 1024 * 1024;
+
+ ui64* arrBuckets __attribute__((aligned(32))) = new ui64[BucketSize * NBuckets / sizeof(ui64)];
+
+ for (ui32 i = 0; i < (BucketSize * NBuckets) / sizeof(ui64); i ++) {
+ arrBuckets[i] = i;
+ }
+
+ ui32 offsets[NBuckets];
+ ui32 bigOffsets[NBuckets];
+ ui64* accum __attribute__((aligned(32))) = new ui64[NBuckets * (BufSize / sizeof(ui64))];
+
+ for (ui32 i = 0; i < NBuckets; i++ ) {
+ offsets[i] = 0;
+ bigOffsets[i] = 0;
+ accum[i] = 0;
+ }
+
+ TSimd<ui8> readReg1;
+
+ TRegister* addr1 = (TRegister*) arrData;
+
+ std::chrono::steady_clock::time_point begin01 =
+ std::chrono::steady_clock::now();
+
+ ui64 hash1 = 0;
+ ui64 bucketNum = 0;
+ ui64* bufAddr;
+ ui64* bucketAddr;
+
+ for (ui32 i = 0; i < NTuples / 2; i += 2) {
+ hash1 = arrHash[i];
+ bucketNum = hash1 & (NBuckets - 1);
+ bufAddr = accum + bucketNum * (BufSize / sizeof(ui64));
+ readReg1 = TSimd<ui8>((ui8*) addr1);
+ ui32 currOffset = offsets[bucketNum];
+ readReg1.Store((ui8*) (bufAddr + currOffset));
+ offsets[bucketNum] += TTraits::Size / 8;
+ if (currOffset + 4 >= (BufSize / sizeof(ui64))) {
+ offsets[bucketNum] = 0;
+ bucketAddr = arrBuckets + bucketNum * (BucketSize / sizeof(ui64));
+ for (ui32 j = 0; j < BufSize / TTraits::Size; j++ ) {
+ readReg1 = TSimd<ui8>((ui8*) (bufAddr + TTraits::Size / 8 * j));
+ readReg1.Load((ui8*) (bucketAddr + bigOffsets[bucketNum] + TTraits::Size / 8 * j));
+ }
+ bigOffsets[bucketNum] += (BufSize / sizeof(ui64));
+ }
+ addr1++;
+ }
+
+ std::chrono::steady_clock::time_point end01 =
+ std::chrono::steady_clock::now();
+
+ ui64 microseconds =
+ std::chrono::duration_cast<std::chrono::microseconds>(end01 - begin01).count();
+ if (log) {
+ Info();
+ Cerr << "hash accum[12] arrBuckets[12]: " << hash1 << " " << accum[12] << " " << arrBuckets[12] << Endl;
+ Cerr << "Time for stream load = " << microseconds << "[microseconds]"
+ << Endl;
+ Cerr << "Data size = " << ((NTuples * TupleSize) / (1024 * 1024))
+ << " [MB]" << Endl;
+ Cerr << "Stream load/save/accum speed = "
+ << (NTuples * TupleSize * 1000 * 1000) /
+ (1024 * 1024 * (microseconds + 1))
+ << " MB/sec" << Endl;
+ Cerr << Endl;
+ }
+
+
+ delete[] arrHash;
+ delete[] arrData;
+ delete[] arrBuckets;
+ delete[] accum;
+
+ return 1;
+ }
+
+ ~TWorker() = default;
+ };
+
+ template<typename TTraits>
+ THolder<TWrapWorker> Create() const {
+ return MakeHolder<TWorker<TTraits>>();
+ };
+};
+
+int main() {
+ TPerfomancer tp;
+ auto worker = NSimd::SelectSimdTraits(tp);
+ return !worker->TuplesToBucket(false);
+} \ No newline at end of file
diff --git a/ydb/library/yql/utils/simd/exec/tuples_to_bucket/ya.make b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/ya.make
new file mode 100644
index 0000000000..704a7b6c0a
--- /dev/null
+++ b/ydb/library/yql/utils/simd/exec/tuples_to_bucket/ya.make
@@ -0,0 +1,7 @@
+PROGRAM()
+
+SRCS(main.cpp)
+
+PEERDIR(ydb/library/yql/utils/simd)
+
+END() \ No newline at end of file
diff --git a/ydb/library/yql/utils/simd/exec/ya.make b/ydb/library/yql/utils/simd/exec/ya.make
index 5d18536908..c36eb91977 100644
--- a/ydb/library/yql/utils/simd/exec/ya.make
+++ b/ydb/library/yql/utils/simd/exec/ya.make
@@ -8,9 +8,14 @@ RUN(
pack_tuple
)
+RUN(
+ tuples_to_bucket
+)
+
DEPENDS(
ydb/library/yql/utils/simd/exec/stream_store
ydb/library/yql/utils/simd/exec/pack_tuple
+ ydb/library/yql/utils/simd/exec/tuples_to_bucket
)
PEERDIR(
@@ -21,5 +26,6 @@ END()
RECURSE(
pack_tuple
+ tuples_to_bucket
stream_store
) \ No newline at end of file
diff --git a/ydb/library/yql/utils/simd/simd.h b/ydb/library/yql/utils/simd/simd.h
index c55d3af3b8..5a8bcb6d6a 100644
--- a/ydb/library/yql/utils/simd/simd.h
+++ b/ydb/library/yql/utils/simd/simd.h
@@ -29,10 +29,8 @@ template<typename TFactory>
auto SelectSimdTraits(const TFactory& factory) {
if (NX86::HaveAVX2()) {
return factory.template Create<TSimdAVX2Traits>();
- } else if (NX86::HaveSSE42()) {
- return factory.template Create<TSimdSSE42Traits>();
} else {
- return factory.template Create<TSimdFallbackTraits>();
+ return factory.template Create<TSimdSSE42Traits>();
}
}