diff options
author | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-18 11:16:38 +0300 |
---|---|---|
committer | ivanmorozov <ivanmorozov@yandex-team.com> | 2022-09-18 11:16:38 +0300 |
commit | 9f7e9f2f762bf8fb187d2066336faaf0629ae3e7 (patch) | |
tree | 66b252dcb8751bef16793ace17113c5427822ac7 | |
parent | de375b7e8ef31995ad79398a379c02d528ff6061 (diff) | |
download | ydb-9f7e9f2f762bf8fb187d2066336faaf0629ae3e7.tar.gz |
cost data request/response for column shard
-rw-r--r-- | ydb/core/kqp/compute_actor/CMakeLists.txt | 9 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.cpp | 43 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 57 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp | 96 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_state.cpp | 100 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_state.h | 80 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 687 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp | 89 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h | 79 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp | 111 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h | 63 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 108 | ||||
-rw-r--r-- | ydb/core/protos/config.proto | 2 |
13 files changed, 1105 insertions, 419 deletions
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.txt b/ydb/core/kqp/compute_actor/CMakeLists.txt index f32b1cf41d..1a2c969e6d 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.txt @@ -21,10 +21,19 @@ target_link_libraries(core-kqp-compute_actor PUBLIC core-tx-scheme_cache dq-actors-compute yql-public-issue + tools-enum_parser-enum_serialization_runtime ) target_sources(core-kqp-compute_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_state.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp ) +generate_enum_serilization(core-kqp-compute_actor + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_state.h + INCLUDE_HEADERS + ydb/core/kqp/compute_actor/kqp_compute_state.h +) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp index 780f865e9e..c75b507035 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp @@ -40,7 +40,48 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput return nullptr; }; } - } // namespace NMiniKQL + +namespace NKqp { + +void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, + ui32& maxInFlight, bool& isAggregationRequest) const { + const bool isSorted = (meta.HasSorted() ? 
meta.GetSorted() : true); + + isAggregationRequest = false; + maxInFlight = 1; + + NKikimrSSA::TProgram program; + bool hasNoGroupBy = false; + bool hasGroupByWithFields = false; + bool hasGroupByWithNoFields = false; + if (meta.HasOlapProgram()) { + Y_VERIFY(program.ParseFromString(meta.GetOlapProgram().GetProgram())); + for (auto&& command : program.GetCommand()) { + if (!command.HasGroupBy()) { + hasNoGroupBy = true; + continue; + } + if (command.GetGroupBy().GetKeyColumns().size()) { + hasGroupByWithFields = true; + } else { + hasGroupByWithNoFields = true; + } + } + } else { + hasNoGroupBy = true; + } + isAggregationRequest = hasGroupByWithFields || hasGroupByWithNoFields; + if (isSorted) { + maxInFlight = 1; + } else if (hasGroupByWithFields) { + maxInFlight = ProtoConfig.GetAggregationGroupByLimit(); + } else if (hasGroupByWithNoFields) { + maxInFlight = ProtoConfig.GetAggregationNoGroupLimit(); + } else { + maxInFlight = ProtoConfig.GetScanLimit(); + } +} +} } // namespace NKikimr diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index cec92b2440..e031f74f3a 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -1,11 +1,11 @@ #pragma once - #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/kqp_compute.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/core/scheme/scheme_tabledefs.h> namespace NKikimr { + namespace NMiniKQL { class TKqpScanComputeContext; @@ -26,7 +26,17 @@ public: } - ui32 GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const; + bool IsParallelScanningAvailable() const { + return ProtoConfig.GetParallelScanningAvailable(); + } + + bool GetShardSplitFactor() const { + return ProtoConfig.GetShardSplitFactor(); + } + + void FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta, + ui32& maxInFlight, bool& 
isAggregationRequest) const; + }; IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task, @@ -41,48 +51,5 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {}); -namespace NComputeActor { - -enum class EShardState : ui32 { - Initial, - Starting, - Running, - PostRunning, //We already recieve all data, we has not processed it yet. - Resolving, -}; - -std::string_view EShardStateToString(EShardState state); - -bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues); - -struct TShardState { - ui64 TabletId; - TSmallVec<TSerializedTableRange> Ranges; - EShardState State = EShardState::Initial; - ui32 Generation = 0; - bool SubscribedOnTablet = false; - ui32 RetryAttempt = 0; - ui32 TotalRetries = 0; - bool AllowInstantRetry = true; - TDuration LastRetryDelay; - TActorId RetryTimer; - ui32 ResolveAttempt = 0; - TActorId ActorId; - TOwnedCellVec LastKey; - - TString PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const; - - explicit TShardState(ui64 tabletId) - : TabletId(tabletId) {} - - TDuration CalcRetryDelay(); - void ResetRetry(); - - TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const; - const TSmallVec<TSerializedTableRange> GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const; -}; - -} // namespace NComputeActor - } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp index 5eee256a53..51e854da17 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp @@ -6,102 +6,6 @@ namespace 
NKikimr::NKqp::NComputeActor { -static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250); -static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2); - -std::string_view EShardStateToString(EShardState state) { - switch (state) { - case EShardState::Initial: return "Initial"sv; - case EShardState::Starting: return "Starting"sv; - case EShardState::Running: return "Running"sv; - case EShardState::Resolving: return "Resolving"sv; - case EShardState::PostRunning: return "PostRunning"sv; - } -} - -void TShardState::ResetRetry() { - RetryAttempt = 0; - AllowInstantRetry = true; - LastRetryDelay = {}; - if (RetryTimer) { - TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison)); - RetryTimer = {}; - } - ResolveAttempt = 0; -} - -TString TShardState::PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const { - if (LastKey.empty()) { - return "<none>"; - } - return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry); -} - -TDuration TShardState::CalcRetryDelay() { - if (std::exchange(AllowInstantRetry, false)) { - return TDuration::Zero(); - } - - if (LastRetryDelay) { - LastRetryDelay = Min(LastRetryDelay * 2, MAX_RETRY_DELAY); - } else { - LastRetryDelay = MIN_RETRY_DELAY; - } - return LastRetryDelay; -} - -TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const { - TStringBuilder sb; - sb << "TShardState{ TabletId: " << TabletId << ", State: " << EShardStateToString(State) - << ", Gen: " << Generation << ", Last Key " << TShardState::PrintLastKey(keyTypes) - << ", Ranges: ["; - for (size_t i = 0; i < Ranges.size(); ++i) { - sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry); - if (i + 1 != Ranges.size()) { - sb << ", "; - } - } - sb << "], " - << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries - << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }"; - return sb; -} - 
-const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const { - // No any data read previously, return all ranges - if (!LastKey.DataSize()) { - return Ranges; - } - - // Form new vector. Skip ranges already read. - TVector<TSerializedTableRange> ranges; - ranges.reserve(Ranges.size()); - - YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key"); - - for (auto rangeIt = Ranges.begin(); rangeIt != Ranges.end(); ++rangeIt) { - int cmp = ComparePointAndRange(LastKey, rangeIt->ToTableRange(), keyTypes, keyTypes); - - YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range."); - - if (cmp > 0) { - continue; - } - - // It is range, where read was interrupted. Restart operation from last read key. - ranges.emplace_back(std::move(TSerializedTableRange( - TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive - ))); - - // And push all others - ranges.insert(ranges.end(), ++rangeIt, Ranges.end()); - break; - } - - return ranges; -} - - bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues) { bool schemeError = false; if (status == Ydb::StatusIds::SCHEME_ERROR) { diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.cpp b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp new file mode 100644 index 0000000000..82d98eb152 --- /dev/null +++ b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp @@ -0,0 +1,100 @@ +#include "kqp_compute_state.h" + +#include <ydb/core/base/appdata.h> +#include <ydb/core/kqp/runtime/kqp_compute.h> +#include <ydb/core/kqp/runtime/kqp_read_table.h> +#include <ydb/core/tx/datashard/range_ops.h> + +namespace NKikimr::NKqp::NComputeActor { + +static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250); +static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2); + +void TCommonRetriesState::ResetRetry() { + RetryAttempt = 0; + AllowInstantRetry = true; + 
LastRetryDelay = {}; + if (RetryTimer) { + TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison)); + RetryTimer = {}; + } + ResolveAttempt = 0; +} + +TString TShardState::PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const { + if (LastKey.empty()) { + return "<none>"; + } + return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry); +} + +TDuration TCommonRetriesState::CalcRetryDelay() { + if (std::exchange(AllowInstantRetry, false)) { + return TDuration::Zero(); + } + + if (LastRetryDelay) { + LastRetryDelay = Min(LastRetryDelay * 2, MAX_RETRY_DELAY); + } else { + LastRetryDelay = MIN_RETRY_DELAY; + } + return LastRetryDelay; +} + +TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const { + TStringBuilder sb; + sb << "TShardState{ TabletId: " << TabletId << ", State: " << State + << ", Gen: " << Generation << ", Last Key " << TShardState::PrintLastKey(keyTypes) + << ", Ranges: ["; + for (size_t i = 0; i < Ranges.size(); ++i) { + sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry); + if (i + 1 != Ranges.size()) { + sb << ", "; + } + } + sb << "], " + << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries + << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }"; + return sb; +} + +const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const { + // No any data read previously, return all ranges + if (!LastKey.DataSize()) { + return Ranges; + } + + // Form new vector. Skip ranges already read. 
+ TVector<TSerializedTableRange> ranges; + ranges.reserve(Ranges.size()); + + YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key"); + + for (auto rangeIt = Ranges.begin(); rangeIt != Ranges.end(); ++rangeIt) { + int cmp = ComparePointAndRange(LastKey, rangeIt->ToTableRange(), keyTypes, keyTypes); + + YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range."); + + if (cmp > 0) { + continue; + } + + // It is range, where read was interrupted. Restart operation from last read key. + ranges.emplace_back(std::move(TSerializedTableRange( + TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive + ))); + + // And push all others + ranges.insert(ranges.end(), ++rangeIt, Ranges.end()); + break; + } + + return ranges; +} + +TString TShardState::GetAddress() const { + TStringBuilder sb; + sb << TabletId << "::" << ScannerIdx; + return sb; +} +}
\ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h new file mode 100644 index 0000000000..3c87df2e37 --- /dev/null +++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h @@ -0,0 +1,80 @@ +#pragma once + +#include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/kqp/kqp_compute.h> +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> +#include <ydb/core/scheme/scheme_tabledefs.h> + +namespace NKikimr::NKqp::NComputeActor { + +enum class EShardState: ui32 { + Initial, + Starting, + Running, + PostRunning, //We already receive all data, we has not processed it yet. + Resolving, +}; + +bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues); + +class TCommonRetriesState { +public: + ui32 RetryAttempt = 0; + ui32 TotalRetries = 0; + bool AllowInstantRetry = true; + TDuration LastRetryDelay; + TActorId RetryTimer; + ui32 ResolveAttempt = 0; + TDuration CalcRetryDelay(); + void ResetRetry(); +}; + +class TShardCostsState: public TCommonRetriesState { +private: + const ui32 ScanId; + const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* ReadData; +public: + using TPtr = std::shared_ptr<TShardCostsState>; + const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& GetReadData() const { + return *ReadData; + } + + ui32 GetScanId() const { + return ScanId; + } + + ui64 GetShardId() const { + return ReadData->GetShardId(); + } + + TShardCostsState(const ui32 scanId, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* readData) + : ScanId(scanId) + , ReadData(readData) + { + + } +}; + +struct TShardState: public TCommonRetriesState { + using TPtr = std::shared_ptr<TShardState>; + const ui64 TabletId; + const ui32 ScannerIdx = 0; + TSmallVec<TSerializedTableRange> Ranges; + EShardState State = EShardState::Initial; + ui32 Generation = 0; + bool SubscribedOnTablet = false; + TActorId 
ActorId; + TOwnedCellVec LastKey; + + TString PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const; + + TShardState(const ui64 tabletId, const ui32 scanIdx) + : TabletId(tabletId) + , ScannerIdx(scanIdx) { + } + + TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const; + const TSmallVec<TSerializedTableRange> GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const; + TString GetAddress() const; +}; +} diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index c1e3587691..69c1ada9b9 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -1,5 +1,7 @@ #include "kqp_compute_actor.h" #include "kqp_compute_actor_impl.h" +#include "kqp_compute_state.h" +#include "kqp_scan_compute_manager.h" #include <ydb/core/actorlib_impl/long_timer.h> #include <ydb/core/base/tablet_pipecache.h> @@ -20,8 +22,7 @@ #include <util/generic/deque.h> -namespace NKikimr { -namespace NKqp { +namespace NKikimr::NKqp { namespace { @@ -39,18 +40,16 @@ static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20; static constexpr ui64 MAX_SHARD_RESOLVES = 3; static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50); - struct TScannedDataStats { std::map<ui64, std::pair<ui64, ui64>> ReadShardInfo; ui64 CompletedShards = 0; ui64 TotalReadRows = 0; ui64 TotalReadBytes = 0; - TScannedDataStats() - {} + TScannedDataStats() = default; - void AddReadStat(ui64 tabletId, ui64 rows, ui64 bytes) { - auto [it, success] = ReadShardInfo.emplace(tabletId, std::make_pair(rows, bytes)); + void AddReadStat(const ui32 scannerIdx, const ui64 rows, const ui64 bytes) { + auto [it, success] = ReadShardInfo.emplace(scannerIdx, std::make_pair(rows, bytes)); if (!success) { auto& [currentRows, currentBytes] = it->second; currentRows += rows; @@ -58,8 +57,8 @@ struct TScannedDataStats { } } - void CompleteShard(ui64 tabletId) { - auto it = 
ReadShardInfo.find(tabletId); + void CompleteShard(TShardState::TPtr state) { + auto it = ReadShardInfo.find(state->ScannerIdx); YQL_ENSURE(it != ReadShardInfo.end()); auto& [currentRows, currentBytes] = it->second; TotalReadRows += currentRows; @@ -77,8 +76,7 @@ struct TScannedDataStats { } }; - -class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> { +class TKqpScanComputeActor: public TDqComputeActorBase<TKqpScanComputeActor> { using TBase = TDqComputeActorBase<TKqpScanComputeActor>; struct TEvPrivate { @@ -86,12 +84,25 @@ class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> { EvRetryShard = EventSpaceBegin(TKikimrEvents::ES_PRIVATE), }; - struct TEvRetryShard : public TEventLocal<TEvRetryShard, EvRetryShard> { - ui64 TabletId; + struct TEvRetryShard: public TEventLocal<TEvRetryShard, EvRetryShard> { + private: + explicit TEvRetryShard(const ui64 tabletId) + : TabletId(tabletId) + , IsCostsRequest(true) { + } + public: + ui64 TabletId = 0; + ui32 Generation = 0; + bool IsCostsRequest = false; - TEvRetryShard(ui64 tabletId) + static THolder<TEvRetryShard> CostsProblem(const ui64 tabletId) { + return THolder<TEvRetryShard>(new TEvRetryShard(tabletId)); + } + + TEvRetryShard(const ui64 tabletId, const ui32 generation) : TabletId(tabletId) - {} + , Generation(generation) { + } }; }; @@ -110,8 +121,7 @@ public: , ComputeCtx(settings.StatsMode) , Snapshot(snapshot) , ShardsScanningPolicy(shardsScanningPolicy) - , Counters(counters) - { + , Counters(counters) { YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString()); YQL_ENSURE(!Meta.GetReads().empty()); YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView); @@ -120,6 +130,7 @@ public: } void DoBootstrap() { + CA_LOG_D("EVLOGKQP START"); NDq::TDqTaskRunnerContext execCtx; execCtx.FuncRegistry = AppData()->FunctionRegistry; execCtx.ComputeCtx = &ComputeCtx; @@ -158,31 +169,27 @@ public: ScanData->TaskId 
= GetTask().GetId(); ScanData->TableReader = CreateKqpTableReader(*ScanData); - - for (const auto& read : Meta.GetReads()) { - auto& state = PendingShards.emplace_back(TShardState(read.GetShardId())); - state.Ranges.reserve(read.GetKeyRanges().size()); - for (const auto& range : read.GetKeyRanges()) { - auto& sr = state.Ranges.emplace_back(TSerializedTableRange(range)); - if (!range.HasTo()) { - sr.To = sr.From; - sr.FromInclusive = sr.ToInclusive = true; - } + ShardsScanningPolicy.FillRequestScanFeatures(Meta, MaxInFlight, IsAggregationRequest); + if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable()) { + for (const auto& read : Meta.GetReads()) { + auto& state = PendingShards.emplace_back(TShardState(read.GetShardId(), ++ScansCounter)); + state.Ranges = BuildSerializedTableRanges(read); + } + StartTableScan(); + ContinueExecute(); + } else { + for (const auto& read : Meta.GetReads()) { + StartCostsRequest(read); } - - Y_VERIFY_DEBUG(!state.Ranges.empty()); } - - StartTableScan(); - - ContinueExecute(); Become(&TKqpScanComputeActor::StateFunc); } STFUNC(StateFunc) { try { switch (ev->GetTypeRewrite()) { - hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute); + hFunc(TEvKqpCompute::TEvCostData, HandleExecute) + hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute); hFunc(TEvKqpCompute::TEvScanData, HandleExecute); hFunc(TEvKqpCompute::TEvScanError, HandleExecute); hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute); @@ -198,7 +205,7 @@ public: } catch (const TMemoryLimitExceededException& e) { InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder() << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit() - << ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory); + << ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory); } catch (const yexception& e) { 
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, e.what()); } @@ -265,27 +272,154 @@ protected: } private: + TVector<TSerializedTableRange> BuildSerializedTableRanges(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData) { + TVector<TSerializedTableRange> resultLocal; + resultLocal.reserve(readData.GetKeyRanges().size()); + for (const auto& range : readData.GetKeyRanges()) { + auto& sr = resultLocal.emplace_back(TSerializedTableRange(range)); + if (!range.HasTo()) { + sr.To = sr.From; + sr.FromInclusive = sr.ToInclusive = true; + } + } + Y_VERIFY_DEBUG(!resultLocal.empty()); + return resultLocal; + } + + THolder<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const { + auto ev = MakeHolder<TEvDataShard::TEvKqpScan>(); + ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId); + for (auto& column : ScanData->GetColumns()) { + ev->Record.AddColumnTags(column.Tag); + ev->Record.AddColumnTypes(column.Type); + } + ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys()); + + auto protoRanges = ev->Record.MutableRanges(); + protoRanges->Reserve(ranges.size()); + + for (auto& range : ranges) { + auto newRange = protoRanges->Add(); + range.Serialize(*newRange); + } + + ev->Record.MutableSnapshot()->CopyFrom(Snapshot); + if (RuntimeSettings.Timeout) { + ev->Record.SetTimeoutMs(RuntimeSettings.Timeout.Get()->MilliSeconds()); + } + ev->Record.SetStatsMode(RuntimeSettings.StatsMode); + ev->Record.SetScanId(scanId); + ev->Record.SetTxId(std::get<ui64>(TxId)); + ev->Record.SetTablePath(ScanData->TablePath); + ev->Record.SetSchemaVersion(ScanData->TableId.SchemaVersion); + + ev->Record.SetGeneration(gen); + + ev->Record.SetReverse(Meta.GetReverse()); + ev->Record.SetItemsLimit(Meta.GetItemsLimit()); + + if (Meta.HasOlapProgram()) { + TString programBytes; + TStringOutput stream(programBytes); + 
Meta.GetOlapProgram().SerializeToArcadiaStream(&stream); + ev->Record.SetOlapProgram(programBytes); + ev->Record.SetOlapProgramType( + NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS + ); + } + + ev->Record.SetDataFormat(Meta.GetDataFormat()); + return ev; + } + void ProcessRlNoResourceAndDie() { const NYql::TIssue issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_RESOURCE_USAGE_LIMITED, "Throughput limit exceeded for query"); - CA_LOG_E("Throughput limit exceeded, we got " - << PendingScanData.size() << " pending messages," - << " stream will be terminated"); + CA_LOG_E("Throughput limit exceeded, we got " << PendingScanData.size() << " pending messages," + << " stream will be terminated"); State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::OVERLOADED, TIssues({issue})); + ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::OVERLOADED, TIssues({ issue })); + } + + void RetryCostsRequest(TShardCostsState::TPtr state) { + const ui32 att = state->TotalRetries++; + Counters->ScanQueryShardDisconnect->Inc(); + + if (att > MAX_SHARD_RETRIES) { + CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state->GetReadData().GetShardId() + << ", retries limit exceeded (" << att << ") on costs request"); + return InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, + TStringBuilder() << "Retries limit with costs requests for shard " << state->GetReadData().GetShardId() << " exceeded."); + } + + auto retryDelay = state->CalcRetryDelay(); + CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state->GetShardId() + << ", restarting costs request" + << ", attempt #" << att + << " schedule after " << retryDelay); + + state->RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay, + new IEventHandle(SelfId(), SelfId(), TEvPrivate::TEvRetryShard::CostsProblem(state->GetShardId()).Release())); + } + + void 
StartCostsRequest(TShardCostsState::TPtr state) { + TSmallVec<TSerializedTableRange> serializedTableRanges = BuildSerializedTableRanges(state->GetReadData()); + THolder<TEvDataShard::TEvKqpScan> ev = BuildEvKqpScan(state->GetScanId(), 0, serializedTableRanges); + ev->Record.SetCostDataOnly(true); + THolder<TEvPipeCache::TEvForward> evForward = MakeHolder<TEvPipeCache::TEvForward>(ev.Release(), state->GetShardId()); + Send(MakePipePeNodeCacheID(false), evForward.Release(), IEventHandle::FlagTrackDelivery); + } + + void StartCostsRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) { + const ui32 scanId = CostRequestsByScanId.size() + 1; + auto costsState = std::make_shared<TShardCostsState>(scanId, &read); + Y_VERIFY(CostRequestsByScanId.emplace(scanId, costsState).second); + Y_VERIFY(CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second); + StartCostsRequest(costsState); + } + + bool ReceiveCostRequest(TEvKqpCompute::TEvCostData::TPtr& ev, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta*& readData, + TSmallVec<TSerializedTableRange>& result, const bool splitFactor) { + auto it = CostRequestsByScanId.find(ev->Get()->GetScanId()); + Y_VERIFY(it != CostRequestsByScanId.end(), "incorrect generation from cost data event: %u", ev->Get()->GetScanId()); + readData = &it->second->GetReadData(); + if (!ev->Get()->GetTableRanges().ColumnsCount()) { + result = BuildSerializedTableRanges(*readData); + } else { + result = ev->Get()->GetSerializedTableRanges(splitFactor); + } + if (Meta.GetReverse()) { + std::reverse(result.begin(), result.end()); + } + CostRequestsByShardId.erase(it->second->GetShardId()); + CostRequestsByScanId.erase(it); + return true; + } + + void HandleExecute(TEvKqpCompute::TEvCostData::TPtr& ev) { + const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* read = nullptr; + TSmallVec<TSerializedTableRange> ranges; + const double shardSplitFactor = 
ShardsScanningPolicy.GetShardSplitFactor(); + ReceiveCostRequest(ev, read, ranges, shardSplitFactor); + for (auto&& i : ranges) { + auto& state = PendingShards.emplace_back(TShardState(read->GetShardId(), ++ScansCounter)); + state.Ranges.emplace_back(i); + } + StartTableScan(); + ContinueExecute(); } void HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& ev) { YQL_ENSURE(ScanData); auto& msg = ev->Get()->Record; auto scanActorId = ActorIdFromProto(msg.GetScanActorId()); - auto* state = GetShardState(msg, scanActorId); + auto state = GetShardState(msg, scanActorId); if (!state) return; CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration() - << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation + << ", state: " << state->State << ", stateGen: " << state->Generation << ", tabletId: " << state->TabletId); YQL_ENSURE(state->Generation == msg.GetGeneration()); @@ -295,6 +429,7 @@ private: state->ActorId = scanActorId; state->ResetRetry(); AffectedShards.insert(state->TabletId); + InFlightShards.NeedAck(state); SendScanDataAck(state); } else { TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it"); @@ -304,10 +439,10 @@ private: void HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) { YQL_ENSURE(ScanData); auto& msg = *ev->Get(); - auto* state = GetShardState(msg, ev->Sender); - if (!state) + auto state = GetShardState(msg, ev->Sender); + if (!state) { return; - + } YQL_ENSURE(state->Generation == msg.Generation); if (state->State != EShardState::Running) { return TerminateExpiredScan(ev->Sender, "Cancel expired scan"); @@ -327,9 +462,23 @@ private: } } + void StopFinally() { + std::vector<TShardState::TPtr> currentScans; + for (auto&& i : InFlightShards) { + for (auto&& s : i.second) { + currentScans.emplace_back(s.second); + } + } + for (auto&& i : currentScans) { + StopReadChunk(*i); + } + PendingShards.clear(); + } + void 
ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) { auto& msg = *ev->Get(); - auto* state = GetShardState(msg, ev->Sender); + + auto state = GetShardState(msg, ev->Sender); if (!state) return; @@ -342,56 +491,65 @@ private: YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender); state->LastKey = std::move(msg.LastKey); - ui64 bytes = 0; ui64 rowsCount = 0; + ui64 bytes = 0; { auto guard = TaskRunner->BindAllocator(); switch (msg.GetDataFormat()) { case NKikimrTxDataShard::EScanDataFormat::CELLVEC: - case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: { + case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: if (!msg.Rows.empty()) { - bytes = ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory()); - rowsCount = msg.Rows.size(); + bytes += ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory()); + rowsCount += msg.Rows.size(); } break; - } - case NKikimrTxDataShard::EScanDataFormat::ARROW: { - if (msg.ArrowBatch != nullptr) { - bytes = ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory()); - rowsCount = msg.ArrowBatch->num_rows(); + case NKikimrTxDataShard::EScanDataFormat::ARROW: + if (!!msg.ArrowBatch) { + bytes += ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory()); + rowsCount += msg.ArrowBatch->num_rows(); } break; - } } } + InFlightShards.MutableStatistics(state->TabletId).AddPack(rowsCount, bytes); - Stats.AddReadStat(state->TabletId, rowsCount, bytes); - - CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished - << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size() - << ", in flight shards " << InFlightShards.size() - << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter" - << ", tabletId: " << state->TabletId); - - if (rowsCount == 0 && !msg.Finished && state->State != 
EShardState::PostRunning) { - SendScanDataAck(state); - } + Stats.AddReadStat(state->ScannerIdx, rowsCount, bytes); - if (msg.Finished) { - CA_LOG_D("Tablet " << state->TabletId << " scan finished, unlink"); - Stats.CompleteShard(state->TabletId); - StopReadFromTablet(state); + CA_LOG_D("Got EvScanData, rows: " << rowsCount << "bytes: " << bytes << ", finished: " << msg.Finished + << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size() + << ", in flight scans " << InFlightShards.GetScansCount() + << ", in flight shards " << InFlightShards.GetShardsCount() + << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter" + << ", tabletId: " << state->TabletId); + bool stopFinally = false; + if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && Stats.TotalReadRows >= Meta.GetItemsLimit()) { + StopFinally(); + stopFinally = true; + } else if (!msg.Finished) { + InFlightShards.NeedAck(state); + } else { + CA_LOG_D("Chunk " << state->TabletId << "/" << state->ScannerIdx << " scan finished"); + Stats.CompleteShard(state); + StopReadChunk(*state); + CA_LOG_T("TRACE:" << InFlightShards.TraceToString() << ":" << CalculateFreeSpace()); if (!StartTableScan()) { - CA_LOG_D("Finish scans"); - ScanData->Finish(); - - if (ScanData->BasicStats) { - ScanData->BasicStats->AffectedShards = AffectedShards.size(); - } + stopFinally = true; + } + } + if (stopFinally) { + ScanData->Finish(); + CA_LOG_D("EVLOGKQP(scans_count:" << ScansCounter << ";max_in_flight:" << MaxInFlight << ")" + << Endl << InFlightShards.GetDurationStats() + << Endl << InFlightShards.StatisticsToString() + ); + if (ScanData->BasicStats) { + ScanData->BasicStats->AffectedShards = AffectedShards.size(); } } + CA_LOG_T("TRACE:" << InFlightShards.TraceToString() << ":" << CalculateFreeSpace() << ":" << rowsCount); + if (Y_UNLIKELY(ScanData->ProfileStats)) { ScanData->ProfileStats->Messages++; ScanData->ProfileStats->ScanCpuTime += msg.CpuTime; @@ -412,7 +570,7 @@ 
private: PendingScanData.pop_front(); auto& msg = *ev->Get(); - auto* state = GetShardState(msg, ev->Sender); + auto state = GetShardState(msg, ev->Sender); if (!state) return; @@ -432,11 +590,11 @@ private: TIssues issues; IssuesFromMessage(msg.GetIssues(), issues); - auto* state = GetShardState(msg, TActorId()); + auto state = GetShardState(msg, TActorId()); if (!state) return; - CA_LOG_W("Got EvScanError scan state: " << EShardStateToString(state->State) + CA_LOG_W("Got EvScanError scan state: " << state->State << ", status: " << Ydb::StatusIds_StatusCode_Name(status) << ", reason: " << issues.ToString() << ", tablet id: " << state->TabletId); @@ -465,29 +623,45 @@ private: YQL_ENSURE(ScanData); auto& msg = *ev->Get(); - auto stateIt = InFlightShards.find(msg.TabletId); - if (stateIt == InFlightShards.end()) { + auto it = CostRequestsByShardId.find(msg.TabletId); + if (it != CostRequestsByShardId.end()) { + RetryCostsRequest(it->second); + return; + } + + auto* states = InFlightShards.MutableByTabletId(msg.TabletId); + if (!states) { CA_LOG_E("Broken pipe with unknown tablet " << msg.TabletId); return; } - auto* state = &(stateIt->second); - CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << EShardStateToString(state->State)); - if (state->State == EShardState::Starting || state->State == EShardState::Running) { - return RetryDeliveryProblem(state); + for (auto& [_, state] : *states) { + const auto shardState = state->State; + CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << shardState); + if (state->State == EShardState::Starting || state->State == EShardState::Running) { + return RetryDeliveryProblem(state); + } } } void HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) { - ui64 tabletId = ev->Get()->TabletId; - auto stateIt = InFlightShards.find(tabletId); - if (stateIt == InFlightShards.end()) { - CA_LOG_E("Received retry shard for 
unexpected tablet " << tabletId); - return; - } + if (ev->Get()->IsCostsRequest) { + auto it = CostRequestsByShardId.find(ev->Get()->TabletId); + if (it == CostRequestsByShardId.end()) { + CA_LOG_E("Received retry shard costs for unexpected tablet " << ev->Get()->TabletId); + return; + } + StartCostsRequest(it->second); + } else { + const ui32 scannerIdx = InFlightShards.GetIndexByGeneration(ev->Get()->Generation); + auto state = InFlightShards.GetStateByIndex(scannerIdx); + if (!state) { + CA_LOG_E("Received retry shard for unexpected tablet " << ev->Get()->TabletId << " / " << ev->Get()->Generation); + return; + } - auto* state = &(stateIt->second); - SendStartScanRequest(state, state->Generation); + SendStartScanRequest(state, state->Generation); + } } void HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { @@ -497,7 +671,7 @@ private: PendingResolveShards.pop_front(); ResolveNextShard(); - StopReadFromTablet(&state); + Y_VERIFY(!InFlightShards.GetStateByIndex(state.ScannerIdx)); YQL_ENSURE(state.State == EShardState::Resolving); CA_LOG_D("Received TEvResolveKeySetResult update for table '" << ScanData->TablePath << "'"); @@ -511,7 +685,7 @@ private: TString error; for (const auto& x : request->ResultSet) { - if ((ui32)x.Status < (ui32) NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) { + if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) { // invalidate table Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanData->TableId, {})); @@ -571,7 +745,7 @@ private: << ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr) << ", i: " << i << ", state ranges: " << state.Ranges.size()); - auto newShard = TShardState(partition.ShardId); + auto newShard = TShardState(partition.ShardId, ++ScansCounter); for (ui64 j = i; j < state.Ranges.size(); ++j) { CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state.Ranges[j].ToTableRange(), tr) @@ 
-609,8 +783,7 @@ private: } if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE) - && PendingShards.size() + InFlightShards.size() > 0) - { + && PendingShards.size() + InFlightShards.GetScansCount() > 0) { TStringBuilder sb; if (!PendingShards.empty()) { sb << "Pending shards States: "; @@ -621,37 +794,40 @@ private: if (!InFlightShards.empty()) { sb << "In Flight shards States: "; - for(auto& [_, st] : InFlightShards) { - sb << st.ToString(KeyColumnTypes) << "; "; + for (auto& [_, st] : InFlightShards) { + for (auto&& [_, i] : st) { + sb << i->ToString(KeyColumnTypes) << "; "; + } } } CA_LOG_D(sb); } - StartTableScan(); } void HandleExecute(TEvents::TEvUndelivered::TPtr& ev) { switch (ev->Get()->SourceType) { case TEvDataShard::TEvKqpScan::EventType: - // handled by TEvPipeCache::TEvDeliveryProblem event + // Handled by TEvPipeCache::TEvDeliveryProblem event. + // CostData request is KqpScan request too. return; case TEvKqpCompute::TEvScanDataAck::EventType: ui64 tabletId = ev->Cookie; - auto it = InFlightShards.find(tabletId); - if (it == InFlightShards.end()) { + const auto& shards = InFlightShards.GetByTabletId(tabletId); + if (shards.empty()) { CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", " << tabletId); return; } - auto& shard = it->second; - if (shard.State == EShardState::Running && ev->Sender == shard.ActorId) { - CA_LOG_E("TEvScanDataAck lost while running scan, terminate execution. DataShard actor: " - << shard.ActorId); - InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR, - "Delivery problem: EvScanDataAck lost."); - } else { - CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", active scan actor: " << shard.ActorId); + for (auto& [_, state] : shards) { + const auto actorId = state->ActorId; + if (state->State == EShardState::Running && ev->Sender == actorId) { + CA_LOG_E("TEvScanDataAck lost while running scan, terminate execution. 
DataShard actor: " << actorId); + InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR, + "Delivery problem: EvScanDataAck lost."); + } else { + CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", active scan actor: " << actorId); + } } return; } @@ -663,10 +839,12 @@ private: CA_LOG_N("Disconnected node " << nodeId); TrackingNodes.erase(nodeId); - for(auto& [tabletId, state] : InFlightShards) { - if (state.ActorId && state.ActorId.NodeId() == nodeId) { - InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR, - TStringBuilder() << "Connection with node " << nodeId << " lost."); + for (auto& [tabletId, states] : InFlightShards) { + for (auto&& [_, state] : states) { + if (state->ActorId && state->ActorId.NodeId() == nodeId) { + InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR, + TStringBuilder() << "Connection with node " << nodeId << " lost."); + } } } } @@ -674,90 +852,59 @@ private: private: bool StartTableScan() { - // allow reading from multiple shards if data is not sorted - const ui32 maxAllowedInFlight = ShardsScanningPolicy.GetMaxInFlightScans(Meta); - - while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) { - ui64 tabletId = PendingShards.front().TabletId; - auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front())); + const ui32 maxAllowedInFlight = MaxInFlight; + bool isFirst = true; + while (!PendingShards.empty() && InFlightShards.GetScansCount() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) { + if (isFirst) { + CA_LOG_D("BEFORE: " << PendingShards.size() << "." << InFlightShards.GetScansCount() << "." << PendingResolveShards.size()); + isFirst = false; + } + auto state = InFlightShards.Put(std::move(PendingShards.front())); PendingShards.pop_front(); - StartReadShard(&(it->second)); + StartReadShard(state); + } + if (!isFirst) { + CA_LOG_D("AFTER: " << PendingShards.size() << "." 
<< InFlightShards.GetScansCount() << "." << PendingResolveShards.size()); } - CA_LOG_D("Scheduled table scans, in flight: " << InFlightShards.size() << " shards. " + CA_LOG_D("Scheduled table scans, in flight: " << InFlightShards.GetScansCount() << " shards. " << "pending shards to read: " << PendingShards.size() << ", " << "pending resolve shards: " << PendingResolveShards.size() << ", " << "average read rows: " << Stats.AverageReadRows() << ", " << "average read bytes: " << Stats.AverageReadBytes() << ", "); - return InFlightShards.size() + PendingShards.size() + PendingResolveShards.size() > 0; + return CostRequestsByScanId.size() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0; } - void StartReadShard(TShardState* state) { + void StartReadShard(TShardState::TPtr state) { YQL_ENSURE(state->State == EShardState::Initial); state->State = EShardState::Starting; - state->Generation = AllocateGeneration(state); + state->Generation = InFlightShards.AllocateGeneration(state); state->ActorId = {}; SendStartScanRequest(state, state->Generation); } - void SendScanDataAck(TShardState* state) { + bool SendScanDataAck(TShardState::TPtr state) { ui64 freeSpace = CalculateFreeSpace(); + if (!freeSpace) { + return false; + } CA_LOG_D("Send EvScanDataAck to " << state->ActorId << ", freeSpace: " << freeSpace << ", gen: " << state->Generation); ui32 flags = IEventHandle::FlagTrackDelivery; if (TrackingNodes.insert(state->ActorId.NodeId()).second) { flags |= IEventHandle::FlagSubscribeOnSession; } Send(state->ActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace, state->Generation), flags, state->TabletId); + InFlightShards.AckSent(state); + return true; } - void SendStartScanRequest(TShardState* state, ui32 gen) { + void SendStartScanRequest(TShardState::TPtr state, ui32 gen) { YQL_ENSURE(state->State == EShardState::Starting); - auto ev = MakeHolder<TEvDataShard::TEvKqpScan>(); - 
ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId); - for (auto& column: ScanData->GetColumns()) { - ev->Record.AddColumnTags(column.Tag); - ev->Record.AddColumnTypes(column.Type); - } - ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys()); - - CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes)); auto ranges = state->GetScanRanges(KeyColumnTypes); - auto protoRanges = ev->Record.MutableRanges(); - protoRanges->Reserve(ranges.size()); - - for (auto& range: ranges) { - auto newRange = protoRanges->Add(); - range.Serialize(*newRange); - } - - ev->Record.MutableSnapshot()->CopyFrom(Snapshot); - if (RuntimeSettings.Timeout) { - ev->Record.SetTimeoutMs(RuntimeSettings.Timeout.Get()->MilliSeconds()); - } - ev->Record.SetStatsMode(RuntimeSettings.StatsMode); - ev->Record.SetScanId(0); - ev->Record.SetTxId(std::get<ui64>(TxId)); - ev->Record.SetTablePath(ScanData->TablePath); - ev->Record.SetSchemaVersion(ScanData->TableId.SchemaVersion); - - ev->Record.SetGeneration(gen); - - ev->Record.SetReverse(Meta.GetReverse()); - ev->Record.SetItemsLimit(Meta.GetItemsLimit()); - - if (Meta.HasOlapProgram()) { - TString programBytes; - TStringOutput stream(programBytes); - Meta.GetOlapProgram().SerializeToArcadiaStream(&stream); - ev->Record.SetOlapProgram(programBytes); - ev->Record.SetOlapProgramType( - NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS - ); - } - - ev->Record.SetDataFormat(Meta.GetDataFormat()); + CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes)); + THolder<TEvDataShard::TEvKqpScan> ev = BuildEvKqpScan(0, gen, ranges); bool subscribed = std::exchange(state->SubscribedOnTablet, true); @@ -769,7 +916,7 @@ private: IEventHandle::FlagTrackDelivery); } - void RetryDeliveryProblem(TShardState* state) { + void RetryDeliveryProblem(TShardState::TPtr state) { Counters->ScanQueryShardDisconnect->Inc(); if (state->TotalRetries >= MAX_TOTAL_SHARD_RETRIES) { @@ -784,12 +931,12 @@ private: // 
so after several consecutive delivery problem responses retry logic should // resolve shard details again. if (state->RetryAttempt >= MAX_SHARD_RETRIES) { - return ResolveShard(state); + return ResolveShard(*state); } state->RetryAttempt++; state->TotalRetries++; - state->Generation = AllocateGeneration(state); + state->Generation = InFlightShards.AllocateGeneration(state); state->ActorId = {}; state->State = EShardState::Starting; state->SubscribedOnTablet = false; @@ -800,7 +947,7 @@ private: << " schedule after " << retryDelay); state->RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state->TabletId))); + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state->TabletId, state->Generation))); } bool IsQuotingEnabled() const { @@ -821,7 +968,7 @@ private: as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlNoResourceTag)); }; - const NRpcService::TRlFullPath rlFullPath { + const NRpcService::TRlFullPath rlFullPath{ .CoordinationNode = rlPath->GetCoordinationNode(), .ResourcePath = rlPath->GetResourcePath(), .DatabaseName = rlPath->GetDatabase(), @@ -845,28 +992,39 @@ private: void ResolveNextShard() { if (!PendingResolveShards.empty()) { auto& state = PendingResolveShards.front(); - ResolveShard(&state); + ResolveShard(state); } } - void EnqueueResolveShard(TShardState* state) { - auto it = InFlightShards.find(state->TabletId); - YQL_ENSURE(it != InFlightShards.end()); - PendingResolveShards.emplace_back(std::move(it->second)); - InFlightShards.erase(it); + void EnqueueResolveShard(TShardState::TPtr state) { + CA_LOG_D("Enqueue for resolve " << state->TabletId << " chunk " << state->ScannerIdx); + YQL_ENSURE(StopReadChunk(*state)); + DoExecute(); + PendingResolveShards.emplace_back(*state); if (PendingResolveShards.size() == 1) { ResolveNextShard(); } } - void StopReadFromTablet(TShardState* state) { - CA_LOG_D("Unlink from tablet " << 
state->TabletId << " and stop reading from it."); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId)); - InFlightShards.erase(state->TabletId); + bool StopReadChunk(const TShardState& state) { + CA_LOG_D("Unlink from tablet " << state.TabletId << " chunk " << state.ScannerIdx << " and stop reading from it."); + const ui64 tabletId = state.TabletId; + const ui32 scannerIdx = state.ScannerIdx; + if (!InFlightShards.RemoveIfExists(scannerIdx)) { + return false; + } + const size_t remainChunksCount = InFlightShards.GetByTabletId(tabletId).size(); + if (remainChunksCount == 0) { + CA_LOG_D("Unlink fully for tablet " << state.TabletId); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(tabletId)); + } else { + CA_LOG_D("Tablet " << state.TabletId << " not ready for unlink. Ramained chunks count: " << remainChunksCount); + } + return true; } - void ResolveShard(TShardState* state) { - if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) { + void ResolveShard(TShardState& state) { + if (state.ResolveAttempt >= MAX_SHARD_RESOLVES) { InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE, TStringBuilder() << "Table '" << ScanData->TablePath << "' resolve limit exceeded"); return; @@ -874,12 +1032,12 @@ private: Counters->ScanQueryShardResolve->Inc(); - state->State = EShardState::Resolving; - state->ResolveAttempt++; - state->SubscribedOnTablet = false; + state.State = EShardState::Resolving; + state.ResolveAttempt++; + state.SubscribedOnTablet = false; - auto range = TTableRange(state->Ranges.front().From.GetCells(), state->Ranges.front().FromInclusive, - state->Ranges.back().To.GetCells(), state->Ranges.back().ToInclusive); + auto range = TTableRange(state.Ranges.front().From.GetCells(), state.Ranges.front().FromInclusive, + state.Ranges.back().To.GetCells(), state.Ranges.back().ToInclusive); TVector<TKeyDesc::TColumnOp> columns; columns.reserve(ScanData->GetColumns().size()); @@ -892,11 +1050,11 
@@ private: } auto keyDesc = MakeHolder<TKeyDesc>(ScanData->TableId, range, TKeyDesc::ERowOperation::Read, - KeyColumnTypes, columns); + KeyColumnTypes, columns); CA_LOG_D("Sending TEvResolveKeySet update for table '" << ScanData->TablePath << "'" << ", range: " << DebugPrintRange(KeyColumnTypes, range, *AppData()->TypeRegistry) - << ", attempt #" << state->ResolveAttempt); + << ", attempt #" << state.ResolveAttempt); auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); request->ResultSet.emplace_back(std::move(keyDesc)); @@ -907,36 +1065,45 @@ private: private: ui64 CalculateFreeSpace() const { return GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes() - ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() - : 0ul; + ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() + : 0ul; } std::any GetSourcesState() override { - if (ScanData) { - return CalculateFreeSpace(); + if (!ScanData) { + return 0; } - return {}; + return CalculateFreeSpace(); } void PollSources(std::any prev) override { - if (!prev.has_value() || !ScanData) { + if (!ScanData || ScanData->IsFinished()) { return; } - - for (auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) { - auto* state = &(it->second); + const auto hasNewMemoryPred = [&]() { + if (!prev.has_value()) { + return false; + } const ui64 freeSpace = CalculateFreeSpace(); const ui64 prevFreeSpace = std::any_cast<ui64>(prev); - - CA_LOG_T("Scan over tablet " << state->TabletId << " finished: " << ScanData->IsFinished() - << ", prevFreeSpace: " << prevFreeSpace << ", freeSpace: " << freeSpace << ", peer: " << state->ActorId); - - if (!ScanData->IsFinished() && state->State != EShardState::PostRunning - && prevFreeSpace < freeSpace && state->ActorId) - { - SendScanDataAck(state); + return freeSpace > prevFreeSpace; + }; + if (!hasNewMemoryPred()) { + return; + } + CA_LOG_D("POLL_SOURCES:START:" << InFlightShards.GetShardsCount() << "." 
<< InFlightShards.GetScansCount()); + while (InFlightShards.GetNeedAck().size()) { + auto state = InFlightShards.GetNeedAck().begin()->second; + + CA_LOG_T("Scan over tablet " << state->TabletId << ", peer: " << state->ActorId); + Y_VERIFY(state->State != EShardState::PostRunning); + Y_VERIFY(!!state->ActorId); + if (!SendScanDataAck(state)) { + CA_LOG_D("POLL_SOURCES:STOP_CANNOT_SEND_ACK:" << InFlightShards.GetShardsCount() << "." << InFlightShards.GetScansCount()); + break; } } + CA_LOG_D("POLL_SOURCES:FINISH:" << InFlightShards.GetShardsCount() << "." << InFlightShards.GetScansCount()); } void TerminateSources(const TIssues& issues, bool success) override { @@ -945,16 +1112,18 @@ private: } auto prio = success ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN; - for(auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) { - auto* state = &(it->second); - if (state->ActorId) { - CA_LOG(prio, "Send abort execution event to scan over tablet: " << state->TabletId << ", table: " - << ScanData->TablePath << ", scan actor: " << state->ActorId << ", message: " << issues.ToOneLineString()); - - Send(state->ActorId, new TEvKqp::TEvAbortExecution( - success ? NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues)); - } else { - CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet"); + for (auto&& itTablet : InFlightShards) { + for (auto&& it : itTablet.second) { + auto state = it.second; + if (state->ActorId) { + CA_LOG(prio, "Send abort execution event to scan over tablet: " << state->TabletId << ", table: " + << ScanData->TablePath << ", scan actor: " << state->ActorId << ", message: " << issues.ToOneLineString()); + + Send(state->ActorId, new TEvKqp::TEvAbortExecution( + success ? 
NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues)); + } else { + CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet"); + } } } } @@ -978,22 +1147,22 @@ private: } template<class TMessage> - TShardState* GetShardState(const TMessage& msg, const TActorId& scanActorId) { + TShardState::TPtr GetShardState(const TMessage& msg, const TActorId& scanActorId) { ui32 generation; - if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) { + if constexpr (std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) { generation = msg.GetGeneration(); - } else if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanInitActor>) { + } else if constexpr (std::is_same_v<TMessage, NKikimrKqp::TEvScanInitActor>) { generation = msg.GetGeneration(); } else { generation = msg.Generation; } - auto it = AllocatedGenerations.find(generation); - YQL_ENSURE(it != AllocatedGenerations.end(), "Received message from unknown scan or request."); - ui64 tabletId = it->second; - auto stateIt = InFlightShards.find(tabletId); - if (stateIt == InFlightShards.end()) { - TString error = TStringBuilder() << "Received message from scan shard which is not currently in flight, tablet" << tabletId; + const ui32 scannerIdx = InFlightShards.GetIndexByGeneration(generation); + YQL_ENSURE(scannerIdx, "Received message from unknown scan or request. 
Generation: " << generation); + + TShardState::TPtr statePtr = InFlightShards.GetStateByIndex(scannerIdx); + if (!statePtr) { + TString error = TStringBuilder() << "Received message from scan shard which is not currently in flight, scannerIdx " << scannerIdx; CA_LOG_W(error); if (scanActorId) { TerminateExpiredScan(scanActorId, error); @@ -1002,7 +1171,7 @@ private: return nullptr; } - auto& state = stateIt->second; + auto& state = *statePtr; if (state.Generation != generation) { TString error = TStringBuilder() << "Received message from expired scan, generation mistmatch, " << "expected: " << state.Generation << ", received: " << generation; @@ -1014,15 +1183,7 @@ private: return nullptr; } - return &state; - } - - ui32 AllocateGeneration(TShardState* state) { - ui32 nextGeneration = ++LastGeneration; - auto[it, success] = AllocatedGenerations.emplace(nextGeneration, state->TabletId); - YQL_ENSURE(success, "Found duplicated while allocating next generation id for scan request: " - << nextGeneration << ", tablet " << state->TabletId << ", allocated for tablet " << it->second); - return nextGeneration; + return statePtr; } private: @@ -1038,52 +1199,26 @@ private: std::deque<TShardState> PendingShards; std::deque<TShardState> PendingResolveShards; - std::map<ui32, ui64> AllocatedGenerations; - std::map<ui64, TShardState> InFlightShards; + TInFlightShards InFlightShards; + ui32 ScansCounter = 0; - ui32 LastGeneration = 0; std::set<ui64> AffectedShards; std::set<ui32> TrackingNodes; + std::map<ui32, TShardCostsState::TPtr> CostRequestsByScanId; + std::map<ui64, TShardCostsState::TPtr> CostRequestsByShardId; + ui32 MaxInFlight = 1024; + bool IsAggregationRequest = false; }; } // anonymous namespace -ui32 TShardsScanningPolicy::GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const { - const bool isSorted = (meta.HasSorted() ? 
meta.GetSorted() : true); - if (meta.HasOlapProgram()) { - NKikimrSSA::TProgram program; - Y_VERIFY(program.ParseFromString(meta.GetOlapProgram().GetProgram())); - std::optional<ui32> aggregationLimit; - for (auto&& command : program.GetCommand()) { - if (!command.HasGroupBy()) { - aggregationLimit = Min(aggregationLimit.value_or(ProtoConfig.GetScanLimit()), ProtoConfig.GetScanLimit()); - continue; - } - ui32 aggrLimit; - if (command.GetGroupBy().GetKeyColumns().size()) { - aggrLimit = ProtoConfig.GetAggregationGroupByLimit(); - } else { - aggrLimit = ProtoConfig.GetAggregationNoGroupLimit(); - } - aggregationLimit = Min(aggregationLimit.value_or(aggrLimit), aggrLimit); - } - return aggregationLimit.value_or(ProtoConfig.GetAggregationGroupByLimit()); - } else if (isSorted) { - return 1; - } else { - return ProtoConfig.GetScanLimit(); - } -} - IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId, NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits, - const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) -{ + const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) { return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory), functionRegistry, settings, memoryLimits, shardsScanningPolicy, counters, std::move(traceId)); } -} // namespace NKqp -} // namespace NKikimr +} diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp new file mode 100644 index 0000000000..e027b329f9 --- /dev/null +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp @@ -0,0 +1,89 @@ +#include "kqp_scan_compute_manager.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+// Builds a one-line debug snapshot of the registered scans.
+// Each scan is rendered as "<scannerIdx>:<state>:<waits for ack>;" in StatesByIndex order.
+TString TInFlightShards::TraceToString() const {
+    TStringBuilder trace;
+    for (const auto& [scannerIdx, shardState] : StatesByIndex) {
+        // A scan is listed in NeedAckStates while an ack for it has not been sent yet.
+        const bool waitsForAck = NeedAckStates.contains(scannerIdx);
+        trace << scannerIdx << ":" << shardState->State << ":" << waitsForAck << ";";
+    }
+    return trace;
+}
+
+// Removes the scan with the given index from every lookup structure of the container.
+// Returns the removed state, or nullptr when scannerIdx is zero or the scan cannot be
+// found both in StatesByIndex and in the per-tablet map.
+TShardState::TPtr TInFlightShards::RemoveIfExists(const ui32 scannerIdx) {
+    if (!scannerIdx) {
+        return nullptr;
+    }
+    const auto stateIt = StatesByIndex.find(scannerIdx);
+    if (stateIt == StatesByIndex.end()) {
+        return nullptr;
+    }
+    TShardState::TPtr removed = stateIt->second;
+
+    const auto tabletIt = Shards.find(removed->TabletId);
+    if (tabletIt == Shards.end()) {
+        return nullptr;
+    }
+    auto& tabletScans = tabletIt->second;
+    const auto scanIt = tabletScans.find(removed->ScannerIdx);
+    if (scanIt == tabletScans.end()) {
+        return nullptr;
+    }
+
+    // Statistics are reported while the scan is still counted, i.e. before any erase below.
+    MutableStatistics(removed->TabletId).MutableStatistics(removed->ScannerIdx).SetFinishInstant(Now());
+    NeedAckStates.erase(removed->ScannerIdx);
+    TScanShardsStatistics::OnScansDiff(Shards.size(), GetScansCount());
+
+    tabletScans.erase(scanIt);
+    if (tabletScans.empty()) {
+        // Last scan of the tablet is gone, so drop the tablet entry as well.
+        Shards.erase(tabletIt);
+    }
+    StatesByIndex.erase(stateIt);
+    return removed;
+}
+
+// Overload that takes the state itself: removes the scan identified by state->ScannerIdx.
+// An empty pointer is returned as is.
+TShardState::TPtr TInFlightShards::RemoveIfExists(TShardState::TPtr state) {
+    if (state) {
+        return RemoveIfExists(state->ScannerIdx);
+    }
+    return state;
+}
+
+// Registers a new scan: moves the state into shared ownership and indexes it both by
+// scanner index and by tablet id. Returns the shared state.
+TShardState::TPtr TInFlightShards::Put(TShardState&& state) {
+    // Statistics are reported before the new scan becomes visible in the counters.
+    TScanShardsStatistics::OnScansDiff(Shards.size(), GetScansCount());
+    MutableStatistics(state.TabletId).MutableStatistics(state.ScannerIdx).SetStartInstant(Now());
+
+    TShardState::TPtr shard = std::make_shared<TShardState>(std::move(state));
+    StatesByIndex.emplace(shard->ScannerIdx, shard);
+    auto& tabletScans = Shards[shard->TabletId];
+    tabletScans.emplace(shard->ScannerIdx, shard);
+    return shard;
+}
+
+// Maps a generation produced by AllocateGeneration back to its scanner index.
+// Returns 0 for a generation that was never allocated.
+ui32 TInFlightShards::GetIndexByGeneration(const ui32 generation) {
+    const auto found = AllocatedGenerations.find(generation);
+    return (found != AllocatedGenerations.end()) ? found->second : 0;
+}
+
+// Issues the next generation number and remembers which scan (by scanner index) owns it.
+// The scan must already be registered in the per-tablet map; this is verified first.
+ui32 TInFlightShards::AllocateGeneration(TShardState::TPtr state) {
+    // Sanity checks only: the iterators are not used after the verification.
+    const auto itTablet = Shards.find(state->TabletId);
+    Y_VERIFY(itTablet != Shards.end());
+    const auto it = itTablet->second.find(state->ScannerIdx);
+    Y_VERIFY(it != itTablet->second.end());
+
+    const ui32 nextGeneration = ++LastGeneration;
+    // The emplace must insert a new entry, i.e. the generation must not be registered yet.
+    Y_VERIFY(AllocatedGenerations.emplace(nextGeneration, state->ScannerIdx).second);
+    return nextGeneration;
+}
+
+// Returns the total number of registered scans, summed over all tablets.
+ui32 TInFlightShards::GetScansCount() const {
+    ui32 total = 0;
+    for (const auto& [_, tabletScans] : Shards) {
+        total += tabletScans.size();
+    }
+    return total;
+}
+
+}
\ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h new file mode 100644 index 0000000000..cf808a5d17 --- /dev/null +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -0,0 +1,79 @@ +#pragma once
+#include "kqp_compute_state.h"
+#include "kqp_scan_compute_stat.h"
+
+namespace NKikimr::NKqp::NComputeActor {
+
+// Registry of scans that are currently in flight.
+// Every scan is indexed by its scanner index and, additionally, grouped by tablet id.
+// The class also hands out generation numbers and tracks which scans still need an ack.
+class TInFlightShards: public TScanShardsStatistics {
+private:
+    // scannerIdx -> scan state
+    using TTabletStates = std::map<ui32, TShardState::TPtr>;
+    // tabletId -> scans of that tablet
+    using TTabletsData = std::map<ui64, TTabletStates>;
+
+    TTabletsData Shards;
+    // generation -> scannerIdx, filled by AllocateGeneration
+    std::map<ui32, ui32> AllocatedGenerations;
+    TTabletStates StatesByIndex;
+    // NOTE(review): not referenced by any code visible in this header.
+    std::set<ui32> ActualScannerIds;
+    ui32 LastGeneration = 0;
+    // scannerIdx -> scan state for scans added by NeedAck and not yet cleared by AckSent
+    std::map<ui32, TShardState::TPtr> NeedAckStates;
+
+public:
+    TString TraceToString() const;
+
+    // Scans that are waiting for an ack, keyed by scanner index.
+    const std::map<ui32, TShardState::TPtr>& GetNeedAck() const {
+        return NeedAckStates;
+    }
+
+    // Marks that the ack for the scan has been sent. The scan must be registered.
+    void AckSent(TShardState::TPtr state) {
+        Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
+        NeedAckStates.erase(state->ScannerIdx);
+    }
+
+    // Marks that the scan needs an ack. The scan must be registered.
+    // Does nothing if the scan is already marked.
+    void NeedAck(TShardState::TPtr state) {
+        Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
+        NeedAckStates.emplace(state->ScannerIdx, state);
+    }
+
+    ui32 AllocateGeneration(TShardState::TPtr state);
+    ui32 GetScansCount() const;
+
+    // Number of tablets that have at least one registered scan.
+    ui32 GetShardsCount() const {
+        return Shards.size();
+    }
+
+    bool empty() const {
+        return Shards.empty();
+    }
+
+    // Iteration goes over tablets; each element holds the scans of one tablet.
+    TTabletsData::const_iterator begin() const {
+        return Shards.begin();
+    }
+
+    TTabletsData::const_iterator end() const {
+        return Shards.end();
+    }
+
+    // Returns the scan with the given scanner index, or nullptr if it is not registered.
+    TShardState::TPtr GetStateByIndex(const ui32 index) const {
+        const auto found = StatesByIndex.find(index);
+        if (found != StatesByIndex.end()) {
+            return found->second;
+        }
+        return nullptr;
+    }
+
+    ui32 GetIndexByGeneration(const ui32 generation);
+    TShardState::TPtr RemoveIfExists(TShardState::TPtr state);
+
+    TShardState::TPtr RemoveIfExists(const ui32 scannerIdx);
+    TShardState::TPtr Put(TShardState&& state);
+
+    // Returns the scans of the tablet, or a shared empty map if the tablet has none.
+    const TTabletStates& GetByTabletId(const ui64 tabletId) const {
+        const auto found = Shards.find(tabletId);
+        if (found != Shards.end()) {
+            return found->second;
+        }
+        return Default<TTabletStates>();
+    }
+
+    // Returns the mutable scans of the tablet, or nullptr if the tablet has none.
+    TTabletStates* MutableByTabletId(const ui64 tabletId) {
+        const auto found = Shards.find(tabletId);
+        return (found == Shards.end()) ? nullptr : &found->second;
+    }
+};
+
+}
\ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp new file mode 100644 index 0000000000..1ee73017ae --- /dev/null +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp @@ -0,0 +1,111 @@ +#include "kqp_scan_compute_stat.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+// Stores the start instant of the chunk; returns *this so that calls can be chained.
+TChunkStatistics& TChunkStatistics::SetStartInstant(const TInstant value) {
+    this->StartInstant = value;
+    return *this;
+}
+
+// Stores the finish instant of the chunk; returns *this so that calls can be chained.
+TChunkStatistics& TChunkStatistics::SetFinishInstant(const TInstant value) {
+    this->FinishInstant = value;
+    return *this;
+}
+
+// Accounts one received data pack.
+//   rowsCount - number of rows in the pack; also used as the pack size for min/max tracking
+//   bytes     - size of the pack in bytes
+// Updates the pack counter, the row/byte totals and the smallest/largest pack seen so far.
+void TShardStatistics::AddPack(const ui32 rowsCount, const ui64 bytes) {
+    if (!MaxPackSize || *MaxPackSize < rowsCount) {
+        MaxPackSize = rowsCount;
+    }
+    // The minimum shrinks only when a smaller pack arrives. The previous code compared
+    // with '<' here as well, which made MinPackSize track the maximum.
+    if (!MinPackSize || *MinPackSize > rowsCount) {
+        MinPackSize = rowsCount;
+    }
+    PacksCount += 1;
+    TotalRowsCount += rowsCount;
+    TotalBytesCount += bytes;
+}
+
+// Renders the statistics of one shard as a sequence of "KEY=value;" items:
+//   CHUNKS                 - number of chunks with statistics
+//   D                      - span between the earliest start and the latest finish instant
+//   PacksCount, RowsCount, BytesCount, MinPackSize, MaxPackSize - pack counters
+//   CAVG, CMIN, CMAX       - average, minimal and maximal chunk duration
+// Items that need at least one chunk (or one pack) are omitted when there is none.
+TString TShardStatistics::StatisticsToString() const {
+    TStringBuilder sb;
+    std::optional<TInstant> minInstant;
+    std::optional<TInstant> maxInstant;
+    // The minimum is kept in an optional: a zero duration is a legal value and must not
+    // be mistaken for "not initialized yet", otherwise the next chunk would overwrite it.
+    std::optional<TDuration> dMin;
+    TDuration dMax;
+    TDuration dSum;
+    for (auto&& [_, chunk] : Statistics) {
+        const TDuration duration = chunk.GetDuration();
+        dMin = dMin ? Min(*dMin, duration) : duration;
+        dMax = Max(dMax, duration);
+        dSum += duration;
+
+        const TInstant start = chunk.GetStartInstant();
+        const TInstant finish = chunk.GetFinishInstant();
+        minInstant = minInstant ? Min(*minInstant, start) : start;
+        maxInstant = maxInstant ? Max(*maxInstant, finish) : finish;
+    }
+    sb << "CHUNKS=" << Statistics.size() << ";";
+    if (minInstant && maxInstant) {
+        sb << "D=" << *maxInstant - *minInstant << ";";
+    }
+    if (Statistics.size()) {
+        sb << "PacksCount=" << PacksCount << ";";
+        if (PacksCount) {
+            sb << "RowsCount=" << TotalRowsCount << ";";
+            sb << "BytesCount=" << TotalBytesCount << ";";
+            sb << "MinPackSize=" << *MinPackSize << ";";
+            sb << "MaxPackSize=" << *MaxPackSize << ";";
+        }
+        sb << "CAVG=" << dSum / Statistics.size() << ";";
+        // dMin is always set here: the loop above ran at least once.
+        sb << "CMIN=" << *dMin << ";";
+        sb << "CMAX=" << dMax << ";";
+    }
+    return sb;
+}
+
+// Concatenates the per-shard summaries, one "{SHARD(<id>):<summary>};" entry per shard,
+// in the iteration order of the Statistics map.
+TString TScanShardsStatistics::StatisticsToString() const {
+    TStringBuilder result;
+    for (const auto& [shardId, shardStats] : Statistics) {
+        result << "{SHARD(" << shardId << "):";
+        result << shardStats.StatisticsToString() << "};";
+    }
+    return result;
+}
+
+// Builds a short report with the time-weighted average of the scans/shards counters:
+// each counter value is weighted by the microseconds accumulated for it and the sum is
+// divided by the total microseconds accumulated in DurationByShardsCount.
+// When that total does not exceed 10 microseconds the report is just "so fast".
+TString TScanShardsStatistics::GetDurationStats() const {
+    TStringBuilder report;
+
+    double totalMicroSeconds = 0;
+    for (const auto& entry : DurationByShardsCount) {
+        totalMicroSeconds += entry.second.MicroSeconds();
+    }
+
+    if (!(totalMicroSeconds > 10)) {
+        report << "so fast";
+        return report;
+    }
+
+    double weightedScans = 0;
+    for (const auto& [scansCount, duration] : DurationByScansCount) {
+        weightedScans += scansCount * 1.0 * duration.MicroSeconds();
+    }
+    double weightedShards = 0;
+    for (const auto& [shardsCount, duration] : DurationByShardsCount) {
+        weightedShards += shardsCount * 1.0 * duration.MicroSeconds();
+    }
+
+    report << "InFlightScans:";
+    report << "InFlightShards:";
+    report << ";wScans=" << weightedScans / totalMicroSeconds << ";wShards=" << weightedShards / totalMicroSeconds << ";";
+    return report;
+}
+
+}
\ No newline at end of file diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h new file mode 100644 index 0000000000..4ccb51c3f5 --- /dev/null +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h @@ -0,0 +1,63 @@ +#pragma once
+#include <util/datetime/base.h>
+#include <util/system/types.h>
+#include <optional>
+#include <map>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+// Start and finish instants of a single chunk, plus the duration derived from them.
+class TChunkStatistics {
+    TInstant StartInstant;
+    TInstant FinishInstant;
+
+public:
+    // Chainable setters (defined in the .cpp file).
+    TChunkStatistics& SetStartInstant(const TInstant value);
+    TChunkStatistics& SetFinishInstant(const TInstant value);
+
+    TInstant GetStartInstant() const { return StartInstant; }
+    TInstant GetFinishInstant() const { return FinishInstant; }
+
+    // Difference between the finish and the start instant.
+    TDuration GetDuration() const {
+        const TDuration elapsed = FinishInstant - StartInstant;
+        return elapsed;
+    }
+};
+
+// Statistics collected for one shard: per-scanner chunk timings and pack counters.
+class TShardStatistics {
+private:
+    // Chunk timings keyed by scanner index.
+    std::map<ui32, TChunkStatistics> Statistics;
+    // Largest / smallest pack seen so far, in rows; empty until the first AddPack call.
+    std::optional<ui32> MaxPackSize;
+    std::optional<ui32> MinPackSize;
+    ui32 PacksCount = 0;
+    // 64-bit accumulators: AddPack receives a ui64 byte count, and a ui32 total silently
+    // truncated it and wrapped around after 4 GiB. The row total is widened for the same reason.
+    ui64 TotalRowsCount = 0;
+    ui64 TotalBytesCount = 0;
+public:
+    // Accounts one pack of `rowsCount` rows and `bytes` bytes.
+    void AddPack(const ui32 rowsCount, const ui64 bytes);
+    // Returns the chunk statistics for `scannerIdx`; std::map::operator[] creates a
+    // default-constructed entry when there is none yet.
+    TChunkStatistics& MutableStatistics(const ui32 scannerIdx) {
+        return Statistics[scannerIdx];
+    }
+    // Human-readable "KEY=value;" summary of this shard.
+    TString StatisticsToString() const;
+};
+
+// Statistics over all shards of a scan: per-shard details plus how much time was spent
+// with each observed value of the shards/scans counters.
+class TScanShardsStatistics {
+private:
+    // Instant of the previous OnScansDiff call (or of construction).
+    TInstant PredDiff = Now();
+    // Accumulated time keyed by the `scansCount` / `shardsCount` values passed to OnScansDiff.
+    std::map<ui32, TDuration> DurationByScansCount;
+    std::map<ui32, TDuration> DurationByShardsCount;
+    // Per-shard statistics keyed by shard id.
+    std::map<ui64, TShardStatistics> Statistics;
+protected:
+    // Adds the time elapsed since the previous call (or since construction) to the buckets
+    // of `shardsCount` and `scansCount`.
+    void OnScansDiff(const ui32 shardsCount, const ui32 scansCount) {
+        // Read the clock once: with three separate Now() calls the two histograms received
+        // different increments and the time between the calls was not attributed to any bucket.
+        const TInstant now = Now();
+        const TDuration elapsed = now - PredDiff;
+        DurationByShardsCount[shardsCount] += elapsed;
+        DurationByScansCount[scansCount] += elapsed;
+        PredDiff = now;
+    }
+public:
+    // Returns the statistics for `shardId`; std::map::operator[] creates a
+    // default-constructed entry when there is none yet.
+    TShardStatistics& MutableStatistics(const ui64 shardId) {
+        return Statistics[shardId];
+    }
+    // Per-shard summaries, see TShardStatistics::StatisticsToString.
+    TString StatisticsToString() const;
+    // Time-weighted averages of the scans/shards counters.
+    TString GetDurationStats() const;
+
+};
+}
\ No newline at end of file diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index b78bc3af91..cafd527ff9 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -519,7 +519,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto planRes = CollectStreamResult(res); auto ast = planRes.QueryStats->Getquery_ast(); - +// Cerr << ast << Endl; for (auto planNode : planNodes) { UNIT_ASSERT_C(ast.find(planNode) != std::string::npos, TStringBuilder() << planNode << " was not pushed down. Query: " << query); @@ -1106,11 +1106,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = ExecuteScanQuery(tableClient, selectQuery); TInstant tsPrev = TInstant::MicroSeconds(1000000); + + std::set<ui64> results = { 1000096, 1000097, 1000098, 1000099, 1000999, 1001000 }; for (const auto& r : rows) { TInstant ts = GetTimestamp(r.at("timestamp")); UNIT_ASSERT_GE_C(ts, tsPrev, "result is not sorted in ASC order"); + UNIT_ASSERT(results.erase(ts.GetValue())); tsPrev = ts; } + UNIT_ASSERT(rows.size() == 6); } Y_UNIT_TEST_TWIN(ExtractRangesReverse, UseSessionActor) { @@ -1129,6 +1133,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) { SELECT `timestamp` FROM `/Root/olapStore/olapTable` WHERE (`timestamp` < CAST(1000100 AS Timestamp) AND `timestamp` > CAST(1000095 AS Timestamp)) OR + (`timestamp` < CAST(1000300 AS Timestamp) AND `timestamp` >= CAST(1000295 AS Timestamp)) OR + (`timestamp` <= CAST(1000400 AS Timestamp) AND `timestamp` > CAST(1000395 AS Timestamp)) OR + + (`timestamp` <= CAST(1000500 AS Timestamp) AND `timestamp` >= CAST(1000495 AS Timestamp)) OR + (`timestamp` <= CAST(1000505 AS Timestamp) AND `timestamp` >= CAST(1000499 AS Timestamp)) OR + (`timestamp` < CAST(1000510 AS Timestamp) AND `timestamp` >= CAST(1000505 AS Timestamp)) OR + (`timestamp` <= CAST(1001000 AS Timestamp) AND `timestamp` >= CAST(1000999 AS Timestamp)) OR (`timestamp` > CAST(1002000 AS Timestamp)) ORDER BY `timestamp` DESC; @@ -1136,11 +1147,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) { auto rows = 
ExecuteScanQuery(tableClient, selectQuery); TInstant tsPrev = TInstant::MicroSeconds(2000000); + std::set<ui64> results = { 1000096, 1000097, 1000098, 1000099, + 1000999, 1001000, + 1000295, 1000296, 1000297, 1000298, 1000299, + 1000396, 1000397, 1000398, 1000399, 1000400, + 1000495, 1000496, 1000497, 1000498, 1000499, 1000500, 1000501, 1000502, 1000503, 1000504, 1000505, 1000506, 1000507, 1000508, 1000509 }; + const ui32 expectedCount = results.size(); for (const auto& r : rows) { TInstant ts = GetTimestamp(r.at("timestamp")); UNIT_ASSERT_LE_C(ts, tsPrev, "result is not sorted in DESC order"); + UNIT_ASSERT(results.erase(ts.GetValue())); tsPrev = ts; } + UNIT_ASSERT(rows.size() == expectedCount); } Y_UNIT_TEST_TWIN(PredicatePushdown, UseSessionActor) { @@ -1744,6 +1763,93 @@ Y_UNIT_TEST_SUITE(KqpOlap) { UNIT_ASSERT_VALUES_EQUAL(result, insertRows); } + Y_UNIT_TEST(GranulesInShard) { + TPortManager tp; + ui16 mbusport = tp.GetPort(2134); + auto settings = Tests::TServerSettings(mbusport) + .SetDomainName("Root") + .SetUseRealThreads(false) + .SetNodeCount(2); + + Tests::TServer::TPtr server = new Tests::TServer(settings); + + auto runtime = server->GetRuntime(); + auto sender = runtime->AllocateEdgeActor(); + + InitRoot(server, sender); + EnableDebugLogging(runtime); + + ui32 numShards = 1; + ui32 numIterations = 100; + CreateTestOlapTable(*server, "largeOlapTable", "largeOlapStore", numShards, numShards); + ui32 insertRows = 0; + const ui32 iterationPackSize = 2000; + for (ui64 i = 0; i < numIterations; ++i) { + SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i * 1000000, iterationPackSize); + insertRows += iterationPackSize; + } + + ui64 result = 0; + bool testProcessed = false; + auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto { + switch (ev->GetTypeRewrite()) { + case NKqp::TKqpComputeEvents::EvCostData: + { + + auto* msg = ev->Get<NKqp::TEvKqpCompute::TEvCostData>(); + if 
(msg->GetTableRanges().GetMarksCount()) { + Y_VERIFY(msg->GetSerializedTableRanges(1).size() == 1); + Y_VERIFY(msg->GetSerializedTableRanges(2).size() == 2); + testProcessed = true; + } + break; + } + + case NKqp::TKqpExecuterEvents::EvShardsResolveStatus: + { + + auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>(); + for (auto& [shardId, nodeId] : msg->ShardNodes) { + Cerr << "-- nodeId: " << nodeId << Endl; + nodeId = runtime->GetNodeId(0); + } + break; + } + + case NKqp::TKqpExecuterEvents::EvStreamData: + { + auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; + + Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl); + Cerr.Flush(); + + Y_ASSERT(record.GetResultSet().rows().size() == 1); + Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1); + result = record.GetResultSet().rows().at(0).items().at(0).uint64_value(); + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo()); + resp->Record.SetFreeSpace(100); + runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); + return TTestActorRuntime::EEventAction::DROP; + } + } + return TTestActorRuntime::EEventAction::PROCESS; + }; + + runtime->SetObserverFunc(captureEvents); + auto streamSender = runtime->AllocateEdgeActor(); + const TInstant start = Now(); + while (Now() - start < TDuration::Seconds(20) && !testProcessed) { + SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT COUNT(*) FROM `/Root/largeOlapStore/largeOlapTable`;", false)); + auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender); + UNIT_ASSERT_VALUES_EQUAL(result, insertRows); + Sleep(TDuration::Seconds(1)); + } + UNIT_ASSERT(testProcessed); + } + Y_UNIT_TEST(ManyColumnShardsWithRestarts) { TPortManager tp; ui16 mbusport = tp.GetPort(2134); diff --git 
a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index bca2befc98..0173752ce3 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1005,6 +1005,8 @@ message TTableServiceConfig { optional uint32 AggregationGroupByLimit = 1 [default = 256]; optional uint32 AggregationNoGroupLimit = 2 [default = 1024]; optional uint32 ScanLimit = 3 [default = 16]; + optional bool ParallelScanningAvailable = 4 [default = false]; + optional uint32 ShardSplitFactor = 5 [default = 1]; } message TResourceManager { |