aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-04 15:30:48 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-04 15:30:48 +0300
commitc1b198927ea6195d3cebd2a31c0042e9b45d1094 (patch)
treef14f782cbdfcf0f45f28daa1e89c47c604c8f6e0
parent521a57de8da8fdde59d36e2149f27506d4c1b8db (diff)
downloadydb-c1b198927ea6195d3cebd2a31c0042e9b45d1094.tar.gz
[kqp] refactor scan state KIKIMR-15042
ref:2bfd3a255ca3a3abcb7c291cc762b5a3ba1cd899
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.txt1
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h42
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp94
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp167
-rw-r--r--ydb/core/kqp/kqp_compute_actor_helpers.cpp0
-rw-r--r--ydb/core/tx/datashard/range_ops.cpp12
-rw-r--r--ydb/core/tx/datashard/range_ops.h3
7 files changed, 168 insertions, 151 deletions
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.txt b/ydb/core/kqp/compute_actor/CMakeLists.txt
index 364e6e8ca8f..f32b1cf41de 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.txt
@@ -24,6 +24,7 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
)
target_sources(core-kqp-compute_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
)
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 8b36af9efd5..399dbcbad37 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -3,6 +3,9 @@
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/kqp_compute.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/core/scheme/scheme_tabledefs.h>
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/tx/datashard/range_ops.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -26,6 +29,45 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
TIntrusivePtr<TKqpCounters> counters);
+namespace NComputeActor {
+
+enum class EShardState : ui32 {
+ Initial,
+ Starting,
+ Running,
+ PostRunning, //We already recieve all data, we has not processed it yet.
+ Resolving,
+};
+
+std::string_view EShardStateToString(EShardState state);
+
+// scan over datashards
+struct TShardState {
+ ui64 TabletId;
+ TSmallVec<TSerializedTableRange> Ranges;
+ EShardState State = EShardState::Initial;
+ ui32 Generation = 0;
+ bool SubscribedOnTablet = false;
+ ui32 RetryAttempt = 0;
+ ui32 TotalRetries = 0;
+ bool AllowInstantRetry = true;
+ TDuration LastRetryDelay;
+ TActorId RetryTimer;
+ ui32 ResolveAttempt = 0;
+ TActorId ActorId;
+
+ explicit TShardState(ui64 tabletId)
+ : TabletId(tabletId) {}
+
+ TDuration CalcRetryDelay();
+ void ResetRetry();
+
+ TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const;
+ const TSmallVec<TSerializedTableRange> GetScanRanges(
+ TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const;
+};
+
+} // namespace NComputeActor
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
new file mode 100644
index 00000000000..ac5f402b475
--- /dev/null
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
@@ -0,0 +1,94 @@
+#include "kqp_compute_actor.h"
+
+#include <ydb/core/base/appdata.h>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250);
+static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2);
+
+std::string_view EShardStateToString(EShardState state) {
+ switch (state) {
+ case EShardState::Initial: return "Initial"sv;
+ case EShardState::Starting: return "Starting"sv;
+ case EShardState::Running: return "Running"sv;
+ case EShardState::Resolving: return "Resolving"sv;
+ case EShardState::PostRunning: return "PostRunning"sv;
+ }
+}
+
+void TShardState::ResetRetry() {
+ RetryAttempt = 0;
+ AllowInstantRetry = true;
+ LastRetryDelay = {};
+ if (RetryTimer) {
+ TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison));
+ RetryTimer = {};
+ }
+ ResolveAttempt = 0;
+}
+
+TDuration TShardState::CalcRetryDelay() {
+ if (std::exchange(AllowInstantRetry, false)) {
+ return TDuration::Zero();
+ }
+
+ if (LastRetryDelay) {
+ LastRetryDelay = Min(LastRetryDelay * 2, MAX_RETRY_DELAY);
+ } else {
+ LastRetryDelay = MIN_RETRY_DELAY;
+ }
+ return LastRetryDelay;
+}
+
+TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
+ TStringBuilder sb;
+ sb << "TShardState{ TabletId: " << TabletId << ", State: " << EShardStateToString(State)
+ << ", Gen: " << Generation << ", Ranges: [";
+ for (size_t i = 0; i < Ranges.size(); ++i) {
+ sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry);
+ if (i + 1 != Ranges.size()) {
+ sb << ", ";
+ }
+ }
+ sb << "], "
+ << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries
+ << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }";
+ return sb;
+}
+
+const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const {
+ // No any data read previously, return all ranges
+ if (!lastKey.DataSize()) {
+ return Ranges;
+ }
+
+ // Form new vector. Skip ranges already read.
+ TVector<TSerializedTableRange> ranges;
+ ranges.reserve(Ranges.size());
+
+ YQL_ENSURE(keyTypes.size() == lastKey.size(), "Key columns size != last key");
+
+ for (auto rangeIt = Ranges.begin(); rangeIt != Ranges.end(); ++rangeIt) {
+ int cmp = ComparePointAndRange(lastKey, rangeIt->ToTableRange(), keyTypes, keyTypes);
+
+ YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range.");
+
+ if (cmp > 0) {
+ continue;
+ }
+
+ // It is range, where read was interrupted. Restart operation from last read key.
+ ranges.emplace_back(std::move(TSerializedTableRange(
+ TSerializedCellVec::Serialize(lastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
+ )));
+
+ // And push all others
+ ranges.insert(ranges.end(), ++rangeIt, Ranges.end());
+ break;
+ }
+
+ return ranges;
+}
+
+}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 48dfeb1dcd3..9249fbd2dd3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -27,28 +27,13 @@ namespace {
using namespace NYql;
using namespace NYql::NDq;
+using namespace NKikimr::NKqp::NComputeActor;
bool IsDebugLogEnabled(const TActorSystem* actorSystem, NActors::NLog::EComponent component) {
auto* settings = actorSystem->LoggerSettings();
return settings && settings->Satisfies(NActors::NLog::EPriority::PRI_DEBUG, component);
}
-TString DebugPrintRanges(TConstArrayRef<NScheme::TTypeId> types,
- const TSmallVec<TSerializedTableRange>& ranges)
-{
- auto typeRegistry = AppData()->TypeRegistry;
- auto out = TStringBuilder();
-
- for (auto& range: ranges) {
- out << DebugPrintRange(types, range.ToTableRange(), *typeRegistry);
- out << " ";
- }
-
- return out;
-}
-
-static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250);
-static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2);
static constexpr ui64 MAX_SHARD_RETRIES = 5; // retry after: 0, 250, 500, 1000, 2000
static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20;
static constexpr ui64 MAX_SHARD_RESOLVES = 3;
@@ -254,7 +239,7 @@ private:
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration()
- << ", state: " << ToString(state.State) << ", stateGen: " << state.Generation);
+ << ", state: " << EShardStateToString(state.State) << ", stateGen: " << state.Generation);
switch (state.State) {
case EShardState::Starting: {
@@ -334,7 +319,7 @@ private:
void ProcessScanData() {
Y_VERIFY_DEBUG(ScanData);
Y_VERIFY_DEBUG(!Shards.empty());
- Y_VERIFY(PendingScanData);
+ Y_VERIFY(!PendingScanData.empty());
auto& ev = PendingScanData.front().first;
@@ -716,8 +701,6 @@ private:
}
private:
- struct TShardState;
-
void StartTableScan() {
YQL_ENSURE(!Shards.empty());
@@ -729,7 +712,7 @@ private:
state.ActorId = {};
CA_LOG_D("StartTableScan: '" << ScanData->TablePath << "', shardId: " << state.TabletId << ", gen: " << state.Generation
- << ", ranges: " << DebugPrintRanges(KeyColumnTypes, GetScanRanges(state)));
+ << ", ranges: " << DebugPrintRanges(KeyColumnTypes, state.GetScanRanges(KeyColumnTypes, LastKey), *AppData()->TypeRegistry));
SendStartScanRequest(state, state.Generation);
}
@@ -754,7 +737,7 @@ private:
}
ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
- auto ranges = GetScanRanges(state);
+ auto ranges = state.GetScanRanges(KeyColumnTypes, LastKey);
auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(ranges.size());
@@ -794,7 +777,7 @@ private:
CA_LOG_D("Send EvKqpScan to shardId: " << state.TabletId << ", tablePath: " << ScanData->TablePath
<< ", gen: " << gen << ", subscribe: " << (!subscribed)
- << ", range: " << DebugPrintRanges(KeyColumnTypes, GetScanRanges(state)));
+ << ", range: " << DebugPrintRanges(KeyColumnTypes, ranges, *AppData()->TypeRegistry));
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state.TabletId, !subscribed),
IEventHandle::FlagTrackDelivery);
@@ -875,47 +858,6 @@ private:
CA_LOG_D("Launch rate limiter actor: " << rlActor);
}
- const TSmallVec<TSerializedTableRange> GetScanRanges(const TShardState& state) const {
- // No any data read previously, return all ranges
- if (!LastKey.DataSize()) {
- return state.Ranges;
- }
-
- // Form new vector. Skip ranges already read.
- TVector<TSerializedTableRange> ranges;
- ranges.reserve(state.Ranges.size());
-
- YQL_ENSURE(KeyColumnTypes.size() == LastKey.size(), "Key columns size != last key");
-
- for (auto rangeIt = state.Ranges.begin(); rangeIt != state.Ranges.end(); ++rangeIt) {
- int cmp = ComparePointAndRange(LastKey, rangeIt->ToTableRange(), KeyColumnTypes, KeyColumnTypes);
-
- YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range.");
-
- if (cmp > 0) {
- continue;
- }
-
- // It is range, where read was interrupted. Restart operation from last read key.
- ranges.emplace_back(std::move(TSerializedTableRange(
- TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
- )));
-
- // And push all others
- ranges.insert(ranges.end(), ++rangeIt, state.Ranges.end());
- break;
- }
-
- return ranges;
- }
-
- TString PrintLastKey() const {
- if (LastKey.empty()) {
- return "<none>";
- }
- return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry);
- }
-
void TerminateExpiredScan(const TActorId& actorId, TStringBuf msg) {
CA_LOG_W(msg);
@@ -1038,102 +980,25 @@ private:
TBase::PassAway();
}
+ TString PrintLastKey() const {
+ if (LastKey.empty()) {
+ return "<none>";
+ }
+ return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry);
+ }
+
private:
NMiniKQL::TKqpScanComputeContext ComputeCtx;
NKikimrKqp::TKqpSnapshot Snapshot;
TIntrusivePtr<TKqpCounters> Counters;
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
TVector<NScheme::TTypeId> KeyColumnTypes;
-
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
-
TOwnedCellVec LastKey;
- TDeque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData;
-
- enum class EShardState {
- Initial,
- Starting,
- Running,
- PostRunning, //We already recieve all data, we has not processed it yet.
- Resolving,
- };
-
- static std::string_view ToString(EShardState state) {
- switch (state) {
- case EShardState::Initial: return "Initial"sv;
- case EShardState::Starting: return "Starting"sv;
- case EShardState::Running: return "Running"sv;
- case EShardState::Resolving: return "Resolving"sv;
- case EShardState::PostRunning: return "PostRunning"sv;
- }
- }
-
- // scan over datashards
- struct TShardState {
- ui64 TabletId;
- TSmallVec<TSerializedTableRange> Ranges;
-
- EShardState State = EShardState::Initial;
- ui32 Generation = 0;
- bool SubscribedOnTablet = false;
-
- ui32 RetryAttempt = 0;
- ui32 TotalRetries = 0;
- bool AllowInstantRetry = true;
- TDuration LastRetryDelay;
- TActorId RetryTimer;
-
- ui32 ResolveAttempt = 0;
-
- TActorId ActorId;
-
- explicit TShardState(ui64 tabletId)
- : TabletId(tabletId) {}
-
- TDuration CalcRetryDelay() {
- if (std::exchange(AllowInstantRetry, false)) {
- return TDuration::Zero();
- }
-
- if (LastRetryDelay) {
- LastRetryDelay = Min(LastRetryDelay * 2, MAX_RETRY_DELAY);
- } else {
- LastRetryDelay = MIN_RETRY_DELAY;
- }
- return LastRetryDelay;
- }
-
- void ResetRetry() {
- RetryAttempt = 0;
- AllowInstantRetry = true;
- LastRetryDelay = {};
- if (RetryTimer) {
- TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison));
- RetryTimer = {};
- }
- ResolveAttempt = 0;
- }
-
- TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
- TStringBuilder sb;
- sb << "TShardState{ TabletId: " << TabletId << ", State: " << TKqpScanComputeActor::ToString(State)
- << ", Gen: " << Generation << ", Ranges: [";
- for (size_t i = 0; i < Ranges.size(); ++i) {
- sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry);
- if (i + 1 != Ranges.size()) {
- sb << ", ";
- }
- }
- sb << "], "
- << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries
- << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }";
- return sb;
- }
- };
- TDeque<TShardState> Shards; // always work with head
-
+ std::deque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData;
+ std::deque<TShardState> Shards;
ui32 LastGeneration = 0;
- TSet<ui64> AffectedShards;
+ std::set<ui64> AffectedShards;
THashSet<ui32> TrackingNodes;
};
diff --git a/ydb/core/kqp/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/kqp_compute_actor_helpers.cpp
new file mode 100644
index 00000000000..e69de29bb2d
--- /dev/null
+++ b/ydb/core/kqp/kqp_compute_actor_helpers.cpp
diff --git a/ydb/core/tx/datashard/range_ops.cpp b/ydb/core/tx/datashard/range_ops.cpp
index 64abea2b604..8566740c387 100644
--- a/ydb/core/tx/datashard/range_ops.cpp
+++ b/ydb/core/tx/datashard/range_ops.cpp
@@ -211,6 +211,18 @@ TString NKikimr::DebugPrintRange(TConstArrayRef<NScheme::TTypeId> types, const N
<< (range.InclusiveTo ? "]" : ")");
}
+TString NKikimr::DebugPrintRanges(TConstArrayRef<NScheme::TTypeId> types,
+ const TSmallVec<TSerializedTableRange>& ranges, const NScheme::TTypeRegistry& typeRegistry)
+{
+ auto out = TStringBuilder();
+ for (auto& range: ranges) {
+ out << DebugPrintRange(types, range.ToTableRange(), typeRegistry);
+ out << " ";
+ }
+
+ return out;
+}
+
TString NKikimr::DebugPrintPoint(TConstArrayRef<NScheme::TTypeId> types, const TConstArrayRef<TCell> &point, const NScheme::TTypeRegistry& typeRegistry) {
Y_VERIFY(types.size() >= point.size());
TDbTupleRef pointRef(types.data(), point.data(), point.size());
diff --git a/ydb/core/tx/datashard/range_ops.h b/ydb/core/tx/datashard/range_ops.h
index 0ca0a6eeb8b..a7098dd893c 100644
--- a/ydb/core/tx/datashard/range_ops.h
+++ b/ydb/core/tx/datashard/range_ops.h
@@ -2,11 +2,14 @@
#include <ydb/core/scheme/scheme_tabledefs.h>
+#include <library/cpp/containers/stack_vector/stack_vec.h>
+
namespace NKikimr {
TTableRange Intersect(TConstArrayRef<NScheme::TTypeId> types, const TTableRange& first, const TTableRange& second);
TString DebugPrintRange(TConstArrayRef<NScheme::TTypeId> types, const TTableRange& range, const NScheme::TTypeRegistry& typeRegistry);
+TString DebugPrintRanges(TConstArrayRef<NScheme::TTypeId> types, const TSmallVec<TSerializedTableRange>& ranges, const NScheme::TTypeRegistry& typeRegistry);
TString DebugPrintPoint(TConstArrayRef<NScheme::TTypeId> types, const TConstArrayRef<TCell>& point, const NScheme::TTypeRegistry& typeRegistry);
TString DebugPrintPartitionInfo(const TKeyDesc::TPartitionInfo& partition, const TVector<NScheme::TTypeId>& keyTypes,