diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-04 15:30:48 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-04 15:30:48 +0300 |
commit | c1b198927ea6195d3cebd2a31c0042e9b45d1094 (patch) | |
tree | f14f782cbdfcf0f45f28daa1e89c47c604c8f6e0 | |
parent | 521a57de8da8fdde59d36e2149f27506d4c1b8db (diff) | |
download | ydb-c1b198927ea6195d3cebd2a31c0042e9b45d1094.tar.gz |
[kqp] refactor scan state KIKIMR-15042
ref:2bfd3a255ca3a3abcb7c291cc762b5a3ba1cd899
-rw-r--r-- | ydb/core/kqp/compute_actor/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 42 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp | 94 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 167 | ||||
-rw-r--r-- | ydb/core/kqp/kqp_compute_actor_helpers.cpp | 0 | ||||
-rw-r--r-- | ydb/core/tx/datashard/range_ops.cpp | 12 | ||||
-rw-r--r-- | ydb/core/tx/datashard/range_ops.h | 3 |
7 files changed, 168 insertions, 151 deletions
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.txt b/ydb/core/kqp/compute_actor/CMakeLists.txt index 364e6e8ca8f..f32b1cf41de 100644 --- a/ydb/core/kqp/compute_actor/CMakeLists.txt +++ b/ydb/core/kqp/compute_actor/CMakeLists.txt @@ -24,6 +24,7 @@ target_link_libraries(core-kqp-compute_actor PUBLIC ) target_sources(core-kqp-compute_actor PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp ) diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 8b36af9efd5..399dbcbad37 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -3,6 +3,9 @@ #include <ydb/core/kqp/counters/kqp_counters.h> #include <ydb/core/kqp/kqp_compute.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> +#include <ydb/core/scheme/scheme_tabledefs.h> +#include <ydb/core/base/appdata.h> +#include <ydb/core/tx/datashard/range_ops.h> namespace NKikimr { namespace NMiniKQL { @@ -26,6 +29,45 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits, TIntrusivePtr<TKqpCounters> counters); +namespace NComputeActor { + +enum class EShardState : ui32 { + Initial, + Starting, + Running, + PostRunning, //We already recieve all data, we has not processed it yet. + Resolving, +}; + +std::string_view EShardStateToString(EShardState state); + +// scan over datashards +struct TShardState { + ui64 TabletId; + TSmallVec<TSerializedTableRange> Ranges; + EShardState State = EShardState::Initial; + ui32 Generation = 0; + bool SubscribedOnTablet = false; + ui32 RetryAttempt = 0; + ui32 TotalRetries = 0; + bool AllowInstantRetry = true; + TDuration LastRetryDelay; + TActorId RetryTimer; + ui32 ResolveAttempt = 0; + TActorId ActorId; + + explicit TShardState(ui64 tabletId) + : TabletId(tabletId) {} + + TDuration CalcRetryDelay(); + void ResetRetry(); + + TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const; + const TSmallVec<TSerializedTableRange> GetScanRanges( + TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const; +}; + +} // namespace NComputeActor } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp new file mode 100644 index 00000000000..ac5f402b475 --- /dev/null +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp @@ -0,0 +1,94 @@ +#include "kqp_compute_actor.h" + +#include <ydb/core/base/appdata.h> + +namespace NKikimr::NKqp::NComputeActor { + +static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250); +static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2); + +std::string_view EShardStateToString(EShardState state) { + switch (state) { + case EShardState::Initial: return "Initial"sv; + case EShardState::Starting: return "Starting"sv; + case EShardState::Running: return "Running"sv; + case EShardState::Resolving: return "Resolving"sv; + case EShardState::PostRunning: return "PostRunning"sv; + } +} + +void TShardState::ResetRetry() { + RetryAttempt = 0; + AllowInstantRetry = true; + LastRetryDelay = {}; + if (RetryTimer) { + TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison)); + RetryTimer = {}; + } + ResolveAttempt = 0; +} + +TDuration TShardState::CalcRetryDelay() { + if (std::exchange(AllowInstantRetry, false)) { + return TDuration::Zero(); + } + + if (LastRetryDelay) { + LastRetryDelay = Min(LastRetryDelay * 2, MAX_RETRY_DELAY); + } else { + LastRetryDelay = MIN_RETRY_DELAY; + } + return LastRetryDelay; +} + +TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const { + TStringBuilder sb; + sb << "TShardState{ TabletId: " << TabletId << ", State: " << EShardStateToString(State) + << ", Gen: " << Generation << ", Ranges: ["; + for (size_t i = 0; i < Ranges.size(); ++i) { + sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry); + if (i + 1 != Ranges.size()) { + sb << ", "; + } + } + sb << "], " + << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries + << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }"; + return sb; +} + +const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const { + // No any data read previously, return all ranges + if (!lastKey.DataSize()) { + return Ranges; + } + + // Form new vector. Skip ranges already read. + TVector<TSerializedTableRange> ranges; + ranges.reserve(Ranges.size()); + + YQL_ENSURE(keyTypes.size() == lastKey.size(), "Key columns size != last key"); + + for (auto rangeIt = Ranges.begin(); rangeIt != Ranges.end(); ++rangeIt) { + int cmp = ComparePointAndRange(lastKey, rangeIt->ToTableRange(), keyTypes, keyTypes); + + YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range."); + + if (cmp > 0) { + continue; + } + + // It is range, where read was interrupted. Restart operation from last read key. + ranges.emplace_back(std::move(TSerializedTableRange( + TSerializedCellVec::Serialize(lastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive + ))); + + // And push all others + ranges.insert(ranges.end(), ++rangeIt, Ranges.end()); + break; + } + + return ranges; +} + +} diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 48dfeb1dcd3..9249fbd2dd3 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -27,28 +27,13 @@ namespace { using namespace NYql; using namespace NYql::NDq; +using namespace NKikimr::NKqp::NComputeActor; bool IsDebugLogEnabled(const TActorSystem* actorSystem, NActors::NLog::EComponent component) { auto* settings = actorSystem->LoggerSettings(); return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component); } -TString DebugPrintRanges(TConstArrayRef<NScheme::TTypeId> types, - const TSmallVec<TSerializedTableRange>& ranges) -{ - auto typeRegistry = AppData()->TypeRegistry; - auto out = TStringBuilder(); - - for (auto& range: ranges) { - out << DebugPrintRange(types, range.ToTableRange(), *typeRegistry); - out << " "; - } - - return out; -} - -static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250); -static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2); static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000, 2000 static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20; static constexpr ui64 MAX_SHARD_RESOLVES = 3; @@ -254,7 +239,7 @@ private: auto scanActorId = ActorIdFromProto(msg.GetScanActorId()); CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration() - << ", state: " << ToString(state.State) << ", stateGen: " << state.Generation); + << ", state: " << EShardStateToString(state.State) << ", stateGen: " << state.Generation); switch (state.State) { case EShardState::Starting: { @@ -334,7 +319,7 @@ private: void ProcessScanData() { Y_VERIFY_DEBUG(ScanData); Y_VERIFY_DEBUG(!Shards.empty()); - Y_VERIFY(PendingScanData); + Y_VERIFY(!PendingScanData.empty()); auto& ev = PendingScanData.front().first; @@ -716,8 +701,6 @@ private: } private: - struct TShardState; - void StartTableScan() { YQL_ENSURE(!Shards.empty()); @@ -729,7 +712,7 @@ private: state.ActorId = {}; CA_LOG_D("StartTableScan: '" << ScanData->TablePath << "', shardId: " << state.TabletId << ", gen: " << state.Generation - << ", ranges: " << DebugPrintRanges(KeyColumnTypes, GetScanRanges(state))); + << ", ranges: " << DebugPrintRanges(KeyColumnTypes, state.GetScanRanges(KeyColumnTypes, LastKey), *AppData()->TypeRegistry)); SendStartScanRequest(state, state.Generation); } @@ -754,7 +737,7 @@ private: } ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys()); - auto ranges = GetScanRanges(state); + auto ranges = state.GetScanRanges(KeyColumnTypes, LastKey); auto protoRanges = ev->Record.MutableRanges(); protoRanges->Reserve(ranges.size()); @@ -794,7 +777,7 @@ private: CA_LOG_D("Send EvKqpScan to shardId: " << state.TabletId << ", tablePath: " << ScanData->TablePath << ", gen: " << gen << ", subscribe: " << (!subscribed) - << ", range: " << DebugPrintRanges(KeyColumnTypes, GetScanRanges(state))); + << ", range: " << DebugPrintRanges(KeyColumnTypes, ranges, *AppData()->TypeRegistry)); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state.TabletId, !subscribed), IEventHandle::FlagTrackDelivery); @@ -875,47 +858,6 @@ private: CA_LOG_D("Launch rate limiter actor: " << rlActor); } - const TSmallVec<TSerializedTableRange> GetScanRanges(const TShardState& state) const { - // No any data read previously, return all ranges - if (!LastKey.DataSize()) { - return state.Ranges; - } - - // Form new vector. Skip ranges already read. - TVector<TSerializedTableRange> ranges; - ranges.reserve(state.Ranges.size()); - - YQL_ENSURE(KeyColumnTypes.size() == LastKey.size(), "Key columns size != last key"); - - for (auto rangeIt = state.Ranges.begin(); rangeIt != state.Ranges.end(); ++rangeIt) { - int cmp = ComparePointAndRange(LastKey, rangeIt->ToTableRange(), KeyColumnTypes, KeyColumnTypes); - - YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range."); - - if (cmp > 0) { - continue; - } - - // It is range, where read was interrupted. Restart operation from last read key. - ranges.emplace_back(std::move(TSerializedTableRange( - TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive - ))); - - // And push all others - ranges.insert(ranges.end(), ++rangeIt, state.Ranges.end()); - break; - } - - return ranges; - } - - TString PrintLastKey() const { - if (LastKey.empty()) { - return "<none>"; - } - return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry); - } - void TerminateExpiredScan(const TActorId& actorId, TStringBuf msg) { CA_LOG_W(msg); @@ -1038,102 +980,25 @@ private: TBase::PassAway(); } + TString PrintLastKey() const { + if (LastKey.empty()) { + return "<none>"; + } + return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry); + } + private: NMiniKQL::TKqpScanComputeContext ComputeCtx; NKikimrKqp::TKqpSnapshot Snapshot; TIntrusivePtr<TKqpCounters> Counters; NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta; TVector<NScheme::TTypeId> KeyColumnTypes; - NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr; - TOwnedCellVec LastKey; - TDeque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData; - - enum class EShardState { - Initial, - Starting, - Running, - PostRunning, //We already recieve all data, we has not processed it yet. - Resolving, - }; - - static std::string_view ToString(EShardState state) { - switch (state) { - case EShardState::Initial: return "Initial"sv; - case EShardState::Starting: return "Starting"sv; - case EShardState::Running: return "Running"sv; - case EShardState::Resolving: return "Resolving"sv; - case EShardState::PostRunning: return "PostRunning"sv; - } - } - - // scan over datashards - struct TShardState { - ui64 TabletId; - TSmallVec<TSerializedTableRange> Ranges; - - EShardState State = EShardState::Initial; - ui32 Generation = 0; - bool SubscribedOnTablet = false; - - ui32 RetryAttempt = 0; - ui32 TotalRetries = 0; - bool AllowInstantRetry = true; - TDuration LastRetryDelay; - TActorId RetryTimer; - - ui32 ResolveAttempt = 0; - - TActorId ActorId; - - explicit TShardState(ui64 tabletId) - : TabletId(tabletId) {} - - TDuration CalcRetryDelay() { - if (std::exchange(AllowInstantRetry, false)) { - return TDuration::Zero(); - } - - if (LastRetryDelay) { - LastRetryDelay = Min(LastRetryDelay * 2, MAX_RETRY_DELAY); - } else { - LastRetryDelay = MIN_RETRY_DELAY; - } - return LastRetryDelay; - } - - void ResetRetry() { - RetryAttempt = 0; - AllowInstantRetry = true; - LastRetryDelay = {}; - if (RetryTimer) { - TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison)); - RetryTimer = {}; - } - ResolveAttempt = 0; - } - - TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const { - TStringBuilder sb; - sb << "TShardState{ TabletId: " << TabletId << ", State: " << TKqpScanComputeActor::ToString(State) - << ", Gen: " << Generation << ", Ranges: ["; - for (size_t i = 0; i < Ranges.size(); ++i) { - sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry); - if (i + 1 != Ranges.size()) { - sb << ", "; - } - } - sb << "], " - << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries - << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }"; - return sb; - } - }; - TDeque<TShardState> Shards; // always work with head - + std::deque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData; + std::deque<TShardState> Shards; ui32 LastGeneration = 0; - TSet<ui64> AffectedShards; + std::set<ui64> AffectedShards; THashSet<ui32> TrackingNodes; }; diff --git a/ydb/core/kqp/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/kqp_compute_actor_helpers.cpp new file mode 100644 index 00000000000..e69de29bb2d --- /dev/null +++ b/ydb/core/kqp/kqp_compute_actor_helpers.cpp diff --git a/ydb/core/tx/datashard/range_ops.cpp b/ydb/core/tx/datashard/range_ops.cpp index 64abea2b604..8566740c387 100644 --- a/ydb/core/tx/datashard/range_ops.cpp +++ b/ydb/core/tx/datashard/range_ops.cpp @@ -211,6 +211,18 @@ TString NKikimr::DebugPrintRange(TConstArrayRef<NScheme::TTypeId> types, const N << (range.InclusiveTo ? "]" : ")"); } +TString NKikimr::DebugPrintRanges(TConstArrayRef<NScheme::TTypeId> types, + const TSmallVec<TSerializedTableRange>& ranges, const NScheme::TTypeRegistry& typeRegistry) +{ + auto out = TStringBuilder(); + for (auto& range: ranges) { + out << DebugPrintRange(types, range.ToTableRange(), typeRegistry); + out << " "; + } + + return out; +} + TString NKikimr::DebugPrintPoint(TConstArrayRef<NScheme::TTypeId> types, const TConstArrayRef<TCell> &point, const NScheme::TTypeRegistry& typeRegistry) { Y_VERIFY(types.size() >= point.size()); TDbTupleRef pointRef(types.data(), point.data(), point.size()); diff --git a/ydb/core/tx/datashard/range_ops.h b/ydb/core/tx/datashard/range_ops.h index 0ca0a6eeb8b..a7098dd893c 100644 --- a/ydb/core/tx/datashard/range_ops.h +++ b/ydb/core/tx/datashard/range_ops.h @@ -2,11 +2,14 @@ #include <ydb/core/scheme/scheme_tabledefs.h> +#include <library/cpp/containers/stack_vector/stack_vec.h> + namespace NKikimr { TTableRange Intersect(TConstArrayRef<NScheme::TTypeId> types, const TTableRange& first, const TTableRange& second); TString DebugPrintRange(TConstArrayRef<NScheme::TTypeId> types, const TTableRange& range, const NScheme::TTypeRegistry& typeRegistry); +TString DebugPrintRanges(TConstArrayRef<NScheme::TTypeId> types, const TSmallVec<TSerializedTableRange>& ranges, const NScheme::TTypeRegistry& typeRegistry); TString DebugPrintPoint(TConstArrayRef<NScheme::TTypeId> types, const TConstArrayRef<TCell>& point, const NScheme::TTypeRegistry& typeRegistry); TString DebugPrintPartitionInfo(const TKeyDesc::TPartitionInfo& partition, const TVector<NScheme::TTypeId>& keyTypes, |