aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov <ivanmorozov@yandex-team.com>2022-09-18 11:16:38 +0300
committerivanmorozov <ivanmorozov@yandex-team.com>2022-09-18 11:16:38 +0300
commit9f7e9f2f762bf8fb187d2066336faaf0629ae3e7 (patch)
tree66b252dcb8751bef16793ace17113c5427822ac7
parentde375b7e8ef31995ad79398a379c02d528ff6061 (diff)
downloadydb-9f7e9f2f762bf8fb187d2066336faaf0629ae3e7.tar.gz
cost data request/response for column shard
-rw-r--r--ydb/core/kqp/compute_actor/CMakeLists.txt9
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.cpp43
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor.h57
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp96
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_state.cpp100
-rw-r--r--ydb/core/kqp/compute_actor/kqp_compute_state.h80
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp687
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp89
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h79
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp111
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h63
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp108
-rw-r--r--ydb/core/protos/config.proto2
13 files changed, 1105 insertions, 419 deletions
diff --git a/ydb/core/kqp/compute_actor/CMakeLists.txt b/ydb/core/kqp/compute_actor/CMakeLists.txt
index f32b1cf41d..1a2c969e6d 100644
--- a/ydb/core/kqp/compute_actor/CMakeLists.txt
+++ b/ydb/core/kqp/compute_actor/CMakeLists.txt
@@ -21,10 +21,19 @@ target_link_libraries(core-kqp-compute_actor PUBLIC
core-tx-scheme_cache
dq-actors-compute
yql-public-issue
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(core-kqp-compute_actor PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_state.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_pure_compute_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
)
+generate_enum_serilization(core-kqp-compute_actor
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/compute_actor/kqp_compute_state.h
+ INCLUDE_HEADERS
+ ydb/core/kqp/compute_actor/kqp_compute_state.h
+)
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
index 780f865e9e..c75b507035 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.cpp
@@ -40,7 +40,48 @@ TComputationNodeFactory GetKqpActorComputeFactory(TKqpScanComputeContext* comput
return nullptr;
};
}
-
} // namespace NMiniKQL
+
+namespace NKqp {
+
+// Derives the scan parallelism settings for a single scan task.
+//   meta                 - scan task description (Sorted flag and optional OLAP program).
+//   maxInFlight          - [out] limit on the number of scans running at the same time.
+//   isAggregationRequest - [out] true when the OLAP program contains at least one GROUP BY command.
+void TShardsScanningPolicy::FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta,
+    ui32& maxInFlight, bool& isAggregationRequest) const {
+    // A task without the Sorted flag is handled as a sorted one.
+    const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true);
+
+    bool hasGroupByWithFields = false;
+    bool hasGroupByWithNoFields = false;
+    if (meta.HasOlapProgram()) {
+        NKikimrSSA::TProgram program;
+        Y_VERIFY(program.ParseFromString(meta.GetOlapProgram().GetProgram()));
+        for (auto&& command : program.GetCommand()) {
+            if (!command.HasGroupBy()) {
+                continue;
+            }
+            if (command.GetGroupBy().GetKeyColumns().size()) {
+                hasGroupByWithFields = true;
+            } else {
+                hasGroupByWithNoFields = true;
+            }
+        }
+    }
+
+    isAggregationRequest = hasGroupByWithFields || hasGroupByWithNoFields;
+
+    // Sorted reads are never parallelized; otherwise the limit depends on the kind of aggregation.
+    if (isSorted) {
+        maxInFlight = 1;
+    } else if (hasGroupByWithFields) {
+        maxInFlight = ProtoConfig.GetAggregationGroupByLimit();
+    } else if (hasGroupByWithNoFields) {
+        maxInFlight = ProtoConfig.GetAggregationNoGroupLimit();
+    } else {
+        maxInFlight = ProtoConfig.GetScanLimit();
+    }
+}
+}
} // namespace NKikimr
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index cec92b2440..e031f74f3a 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -1,11 +1,11 @@
#pragma once
-
#include <ydb/core/kqp/counters/kqp_counters.h>
#include <ydb/core/kqp/kqp_compute.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
namespace NKikimr {
+
namespace NMiniKQL {
class TKqpScanComputeContext;
@@ -26,7 +26,17 @@ public:
}
- ui32 GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const;
+ bool IsParallelScanningAvailable() const { // Forwards the ParallelScanningAvailable setting stored in ProtoConfig.
+ return ProtoConfig.GetParallelScanningAvailable();
+ }
+
+    // Returns the ShardSplitFactor setting stored in ProtoConfig.
+    // The return type is deduced from the generated getter: the former `bool` return type
+    // collapsed every non-zero factor to 1 before the caller could use it as a number.
+    auto GetShardSplitFactor() const {
+        return ProtoConfig.GetShardSplitFactor();
+    }
+
+ void FillRequestScanFeatures(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta,
+ ui32& maxInFlight, bool& isAggregationRequest) const;
+
};
IActor* CreateKqpComputeActor(const TActorId& executerId, ui64 txId, NYql::NDqProto::TDqTask&& task,
@@ -41,48 +51,5 @@ IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, cons
const NYql::NDq::TComputeRuntimeSettings& settings, const NYql::NDq::TComputeMemoryLimits& memoryLimits,
const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId = {});
-namespace NComputeActor {
-
-enum class EShardState : ui32 {
- Initial,
- Starting,
- Running,
- PostRunning, //We already recieve all data, we has not processed it yet.
- Resolving,
-};
-
-std::string_view EShardStateToString(EShardState state);
-
-bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues);
-
-struct TShardState {
- ui64 TabletId;
- TSmallVec<TSerializedTableRange> Ranges;
- EShardState State = EShardState::Initial;
- ui32 Generation = 0;
- bool SubscribedOnTablet = false;
- ui32 RetryAttempt = 0;
- ui32 TotalRetries = 0;
- bool AllowInstantRetry = true;
- TDuration LastRetryDelay;
- TActorId RetryTimer;
- ui32 ResolveAttempt = 0;
- TActorId ActorId;
- TOwnedCellVec LastKey;
-
- TString PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const;
-
- explicit TShardState(ui64 tabletId)
- : TabletId(tabletId) {}
-
- TDuration CalcRetryDelay();
- void ResetRetry();
-
- TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const;
- const TSmallVec<TSerializedTableRange> GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const;
-};
-
-} // namespace NComputeActor
-
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
index 5eee256a53..51e854da17 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
@@ -6,102 +6,6 @@
namespace NKikimr::NKqp::NComputeActor {
-static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250);
-static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2);
-
-std::string_view EShardStateToString(EShardState state) {
- switch (state) {
- case EShardState::Initial: return "Initial"sv;
- case EShardState::Starting: return "Starting"sv;
- case EShardState::Running: return "Running"sv;
- case EShardState::Resolving: return "Resolving"sv;
- case EShardState::PostRunning: return "PostRunning"sv;
- }
-}
-
-void TShardState::ResetRetry() {
- RetryAttempt = 0;
- AllowInstantRetry = true;
- LastRetryDelay = {};
- if (RetryTimer) {
- TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison));
- RetryTimer = {};
- }
- ResolveAttempt = 0;
-}
-
-TString TShardState::PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
- if (LastKey.empty()) {
- return "<none>";
- }
- return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry);
-}
-
-TDuration TShardState::CalcRetryDelay() {
- if (std::exchange(AllowInstantRetry, false)) {
- return TDuration::Zero();
- }
-
- if (LastRetryDelay) {
- LastRetryDelay = Min(LastRetryDelay * 2, MAX_RETRY_DELAY);
- } else {
- LastRetryDelay = MIN_RETRY_DELAY;
- }
- return LastRetryDelay;
-}
-
-TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
- TStringBuilder sb;
- sb << "TShardState{ TabletId: " << TabletId << ", State: " << EShardStateToString(State)
- << ", Gen: " << Generation << ", Last Key " << TShardState::PrintLastKey(keyTypes)
- << ", Ranges: [";
- for (size_t i = 0; i < Ranges.size(); ++i) {
- sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry);
- if (i + 1 != Ranges.size()) {
- sb << ", ";
- }
- }
- sb << "], "
- << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries
- << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }";
- return sb;
-}
-
-const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
- // No any data read previously, return all ranges
- if (!LastKey.DataSize()) {
- return Ranges;
- }
-
- // Form new vector. Skip ranges already read.
- TVector<TSerializedTableRange> ranges;
- ranges.reserve(Ranges.size());
-
- YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key");
-
- for (auto rangeIt = Ranges.begin(); rangeIt != Ranges.end(); ++rangeIt) {
- int cmp = ComparePointAndRange(LastKey, rangeIt->ToTableRange(), keyTypes, keyTypes);
-
- YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range.");
-
- if (cmp > 0) {
- continue;
- }
-
- // It is range, where read was interrupted. Restart operation from last read key.
- ranges.emplace_back(std::move(TSerializedTableRange(
- TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
- )));
-
- // And push all others
- ranges.insert(ranges.end(), ++rangeIt, Ranges.end());
- break;
- }
-
- return ranges;
-}
-
-
bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues) {
bool schemeError = false;
if (status == Ydb::StatusIds::SCHEME_ERROR) {
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.cpp b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp
new file mode 100644
index 0000000000..82d98eb152
--- /dev/null
+++ b/ydb/core/kqp/compute_actor/kqp_compute_state.cpp
@@ -0,0 +1,100 @@
+#include "kqp_compute_state.h"
+
+#include <ydb/core/base/appdata.h>
+#include <ydb/core/kqp/runtime/kqp_compute.h>
+#include <ydb/core/kqp/runtime/kqp_read_table.h>
+#include <ydb/core/tx/datashard/range_ops.h>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+static constexpr TDuration MIN_RETRY_DELAY = TDuration::MilliSeconds(250);
+static constexpr TDuration MAX_RETRY_DELAY = TDuration::Seconds(2);
+
+// Returns the per-attempt retry bookkeeping to its initial state and cancels a pending retry timer.
+// TotalRetries is intentionally left untouched.
+void TCommonRetriesState::ResetRetry() {
+    // Stop the timer actor (if any) so that a stale retry event is not delivered later.
+    if (RetryTimer) {
+        TlsActivationContext->Send(new IEventHandle(RetryTimer, RetryTimer, new TEvents::TEvPoison));
+        RetryTimer = TActorId();
+    }
+
+    LastRetryDelay = TDuration::Zero();
+    AllowInstantRetry = true;
+    ResolveAttempt = 0;
+    RetryAttempt = 0;
+}
+
+// Renders LastKey for logs; yields "<none>" while no key has been stored.
+TString TShardState::PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
+    return LastKey.empty()
+        ? TString("<none>")
+        : DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry);
+}
+
+// Computes the delay before the next retry.
+// The first call after a reset returns zero; afterwards the delay starts at MIN_RETRY_DELAY
+// and doubles on each call until it reaches MAX_RETRY_DELAY.
+TDuration TCommonRetriesState::CalcRetryDelay() {
+    const bool instantRetry = AllowInstantRetry;
+    AllowInstantRetry = false;
+    if (instantRetry) {
+        return TDuration::Zero();
+    }
+
+    LastRetryDelay = LastRetryDelay
+        ? Min(LastRetryDelay * 2, MAX_RETRY_DELAY)
+        : MIN_RETRY_DELAY;
+    return LastRetryDelay;
+}
+
+// Builds a human-readable description of the shard state for logs.
+//   keyTypes - types of the key columns, used to print LastKey and the ranges.
+TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
+    TStringBuilder sb;
+    sb << "TShardState{ TabletId: " << TabletId << ", State: " << State
+        << ", Gen: " << Generation << ", Last Key " << TShardState::PrintLastKey(keyTypes)
+        << ", Ranges: [";
+    for (size_t i = 0; i < Ranges.size(); ++i) {
+        sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry);
+        if (i + 1 != Ranges.size()) {
+            sb << ", ";
+        }
+    }
+    // The closing bracket is followed by a single separator (previously "], , RetryAttempt").
+    sb << "]"
+        << ", RetryAttempt: " << RetryAttempt << ", TotalRetries: " << TotalRetries
+        << ", ResolveAttempt: " << ResolveAttempt << ", ActorId: " << ActorId << " }";
+    return sb;
+}
+
+// Returns the ranges that still have to be scanned.
+// Without a stored LastKey all ranges are returned. Otherwise ranges located before LastKey are
+// dropped, the range containing LastKey is restarted right after that key (exclusive lower bound),
+// and every following range is kept unchanged.
+const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
+    if (!LastKey.DataSize()) {
+        // Nothing has been read so far.
+        return Ranges;
+    }
+
+    TVector<TSerializedTableRange> remaining;
+    remaining.reserve(Ranges.size());
+
+    YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key");
+
+    // Locate the range in which the read was interrupted.
+    auto interrupted = Ranges.begin();
+    for (; interrupted != Ranges.end(); ++interrupted) {
+        const int cmp = ComparePointAndRange(LastKey, interrupted->ToTableRange(), keyTypes, keyTypes);
+        YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range.");
+        if (cmp == 0) {
+            break;
+        }
+    }
+
+    if (interrupted != Ranges.end()) {
+        // Resume from the last read key, keeping the original upper bound of the range.
+        remaining.emplace_back(TSerializedTableRange(
+            TSerializedCellVec::Serialize(LastKey), interrupted->To.GetBuffer(), false, interrupted->ToInclusive
+        ));
+        ++interrupted;
+        remaining.insert(remaining.end(), interrupted, Ranges.end());
+    }
+
+    return remaining;
+}
+
+// Formats the scan address as "<TabletId>::<ScannerIdx>".
+TString TShardState::GetAddress() const {
+    return TStringBuilder() << TabletId << "::" << ScannerIdx;
+}
+} \ No newline at end of file
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_state.h b/ydb/core/kqp/compute_actor/kqp_compute_state.h
new file mode 100644
index 0000000000..3c87df2e37
--- /dev/null
+++ b/ydb/core/kqp/compute_actor/kqp_compute_state.h
@@ -0,0 +1,80 @@
+#pragma once
+
+#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/kqp/kqp_compute.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+#include <ydb/core/scheme/scheme_tabledefs.h>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+enum class EShardState: ui32 {
+ Initial,
+ Starting,
+ Running,
+ PostRunning, // All data has already been received, but it has not been processed yet.
+ Resolving,
+};
+
+bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues);
+
+class TCommonRetriesState { // Retry bookkeeping shared by TShardCostsState and TShardState.
+public:
+ ui32 RetryAttempt = 0; // Cleared by ResetRetry().
+ ui32 TotalRetries = 0; // Not touched by ResetRetry().
+ bool AllowInstantRetry = true; // While true, the next CalcRetryDelay() returns zero and clears the flag.
+ TDuration LastRetryDelay; // Last non-zero delay produced by CalcRetryDelay(); cleared by ResetRetry().
+ TActorId RetryTimer; // When set, ResetRetry() sends TEvPoison to this actor and clears the id.
+ ui32 ResolveAttempt = 0; // Cleared by ResetRetry().
+ TDuration CalcRetryDelay(); // Zero for an instant retry, otherwise a doubling delay capped by MAX_RETRY_DELAY.
+ void ResetRetry(); // Resets every field above except TotalRetries.
+};
+
+// Retry state of a cost-data request, bound to the read operation it was issued for.
+// The read operation is referenced by pointer and is not owned by this object.
+class TShardCostsState: public TCommonRetriesState {
+public:
+    using TPtr = std::shared_ptr<TShardCostsState>;
+
+    TShardCostsState(const ui32 scanId, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* readData)
+        : ScanId(scanId)
+        , ReadData(readData) {
+    }
+
+    // Identifier under which the request was registered.
+    ui32 GetScanId() const {
+        return ScanId;
+    }
+
+    // Shard id taken from the referenced read operation.
+    ui64 GetShardId() const {
+        return ReadData->GetShardId();
+    }
+
+    // Read operation this request belongs to.
+    const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& GetReadData() const {
+        return *ReadData;
+    }
+
+private:
+    const ui32 ScanId;
+    const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* ReadData;
+};
+
+struct TShardState: public TCommonRetriesState { // State of one scan over a tablet, including retry bookkeeping.
+ using TPtr = std::shared_ptr<TShardState>;
+ const ui64 TabletId; // First component of GetAddress().
+ const ui32 ScannerIdx = 0; // Second component of GetAddress().
+ TSmallVec<TSerializedTableRange> Ranges; // Ranges assigned to this scan; the input of GetScanRanges().
+ EShardState State = EShardState::Initial;
+ ui32 Generation = 0;
+ bool SubscribedOnTablet = false;
+ TActorId ActorId;
+ TOwnedCellVec LastKey; // While empty, GetScanRanges() returns Ranges unchanged.
+
+ TString PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const; // "<none>" while LastKey is empty.
+
+ TShardState(const ui64 tabletId, const ui32 scanIdx)
+ : TabletId(tabletId)
+ , ScannerIdx(scanIdx) {
+ }
+
+ TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const; // Debug description of all fields.
+ const TSmallVec<TSerializedTableRange> GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const; // Ranges left to read after LastKey.
+ TString GetAddress() const; // "<TabletId>::<ScannerIdx>".
+};
+}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index c1e3587691..69c1ada9b9 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -1,5 +1,7 @@
#include "kqp_compute_actor.h"
#include "kqp_compute_actor_impl.h"
+#include "kqp_compute_state.h"
+#include "kqp_scan_compute_manager.h"
#include <ydb/core/actorlib_impl/long_timer.h>
#include <ydb/core/base/tablet_pipecache.h>
@@ -20,8 +22,7 @@
#include <util/generic/deque.h>
-namespace NKikimr {
-namespace NKqp {
+namespace NKikimr::NKqp {
namespace {
@@ -39,18 +40,16 @@ static constexpr ui64 MAX_TOTAL_SHARD_RETRIES = 20;
static constexpr ui64 MAX_SHARD_RESOLVES = 3;
static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
-
struct TScannedDataStats {
std::map<ui64, std::pair<ui64, ui64>> ReadShardInfo;
ui64 CompletedShards = 0;
ui64 TotalReadRows = 0;
ui64 TotalReadBytes = 0;
- TScannedDataStats()
- {}
+ TScannedDataStats() = default;
- void AddReadStat(ui64 tabletId, ui64 rows, ui64 bytes) {
- auto [it, success] = ReadShardInfo.emplace(tabletId, std::make_pair(rows, bytes));
+ void AddReadStat(const ui32 scannerIdx, const ui64 rows, const ui64 bytes) {
+ auto [it, success] = ReadShardInfo.emplace(scannerIdx, std::make_pair(rows, bytes));
if (!success) {
auto& [currentRows, currentBytes] = it->second;
currentRows += rows;
@@ -58,8 +57,8 @@ struct TScannedDataStats {
}
}
- void CompleteShard(ui64 tabletId) {
- auto it = ReadShardInfo.find(tabletId);
+ void CompleteShard(TShardState::TPtr state) {
+ auto it = ReadShardInfo.find(state->ScannerIdx);
YQL_ENSURE(it != ReadShardInfo.end());
auto& [currentRows, currentBytes] = it->second;
TotalReadRows += currentRows;
@@ -77,8 +76,7 @@ struct TScannedDataStats {
}
};
-
-class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> {
+class TKqpScanComputeActor: public TDqComputeActorBase<TKqpScanComputeActor> {
using TBase = TDqComputeActorBase<TKqpScanComputeActor>;
struct TEvPrivate {
@@ -86,12 +84,25 @@ class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> {
EvRetryShard = EventSpaceBegin(TKikimrEvents::ES_PRIVATE),
};
- struct TEvRetryShard : public TEventLocal<TEvRetryShard, EvRetryShard> {
- ui64 TabletId;
+ struct TEvRetryShard: public TEventLocal<TEvRetryShard, EvRetryShard> {
+ private:
+ explicit TEvRetryShard(const ui64 tabletId)
+ : TabletId(tabletId)
+ , IsCostsRequest(true) {
+ }
+ public:
+ ui64 TabletId = 0;
+ ui32 Generation = 0;
+ bool IsCostsRequest = false;
- TEvRetryShard(ui64 tabletId)
+ static THolder<TEvRetryShard> CostsProblem(const ui64 tabletId) {
+ return THolder<TEvRetryShard>(new TEvRetryShard(tabletId));
+ }
+
+ TEvRetryShard(const ui64 tabletId, const ui32 generation)
: TabletId(tabletId)
- {}
+ , Generation(generation) {
+ }
};
};
@@ -110,8 +121,7 @@ public:
, ComputeCtx(settings.StatsMode)
, Snapshot(snapshot)
, ShardsScanningPolicy(shardsScanningPolicy)
- , Counters(counters)
- {
+ , Counters(counters) {
YQL_ENSURE(GetTask().GetMeta().UnpackTo(&Meta), "Invalid task meta: " << GetTask().GetMeta().DebugString());
YQL_ENSURE(!Meta.GetReads().empty());
YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView);
@@ -120,6 +130,7 @@ public:
}
void DoBootstrap() {
+ CA_LOG_D("EVLOGKQP START");
NDq::TDqTaskRunnerContext execCtx;
execCtx.FuncRegistry = AppData()->FunctionRegistry;
execCtx.ComputeCtx = &ComputeCtx;
@@ -158,31 +169,27 @@ public:
ScanData->TaskId = GetTask().GetId();
ScanData->TableReader = CreateKqpTableReader(*ScanData);
-
- for (const auto& read : Meta.GetReads()) {
- auto& state = PendingShards.emplace_back(TShardState(read.GetShardId()));
- state.Ranges.reserve(read.GetKeyRanges().size());
- for (const auto& range : read.GetKeyRanges()) {
- auto& sr = state.Ranges.emplace_back(TSerializedTableRange(range));
- if (!range.HasTo()) {
- sr.To = sr.From;
- sr.FromInclusive = sr.ToInclusive = true;
- }
+ ShardsScanningPolicy.FillRequestScanFeatures(Meta, MaxInFlight, IsAggregationRequest);
+ if (!Meta.HasOlapProgram() || !ShardsScanningPolicy.IsParallelScanningAvailable()) {
+ for (const auto& read : Meta.GetReads()) {
+ auto& state = PendingShards.emplace_back(TShardState(read.GetShardId(), ++ScansCounter));
+ state.Ranges = BuildSerializedTableRanges(read);
+ }
+ StartTableScan();
+ ContinueExecute();
+ } else {
+ for (const auto& read : Meta.GetReads()) {
+ StartCostsRequest(read);
}
-
- Y_VERIFY_DEBUG(!state.Ranges.empty());
}
-
- StartTableScan();
-
- ContinueExecute();
Become(&TKqpScanComputeActor::StateFunc);
}
STFUNC(StateFunc) {
try {
switch (ev->GetTypeRewrite()) {
- hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
+ hFunc(TEvKqpCompute::TEvCostData, HandleExecute)
+ hFunc(TEvKqpCompute::TEvScanInitActor, HandleExecute);
hFunc(TEvKqpCompute::TEvScanData, HandleExecute);
hFunc(TEvKqpCompute::TEvScanError, HandleExecute);
hFunc(TEvPipeCache::TEvDeliveryProblem, HandleExecute);
@@ -198,7 +205,7 @@ public:
} catch (const TMemoryLimitExceededException& e) {
InternalError(NYql::NDqProto::StatusIds::PRECONDITION_FAILED, TIssuesIds::KIKIMR_PRECONDITION_FAILED,
TStringBuilder() << "Mkql memory limit exceeded, limit: " << GetMkqlMemoryLimit()
- << ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
+ << ", host: " << HostName() << ", canAllocateExtraMemory: " << CanAllocateExtraMemory);
} catch (const yexception& e) {
InternalError(NYql::NDqProto::StatusIds::INTERNAL_ERROR, TIssuesIds::DEFAULT_ERROR, e.what());
}
@@ -265,27 +272,154 @@ protected:
}
private:
+    // Converts the key ranges of a read operation into serialized table ranges.
+    // A range without the To bound is a point: it becomes [From, From] with both bounds inclusive.
+    TVector<TSerializedTableRange> BuildSerializedTableRanges(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& readData) {
+        const auto& protoRanges = readData.GetKeyRanges();
+
+        TVector<TSerializedTableRange> ranges;
+        ranges.reserve(protoRanges.size());
+        for (const auto& protoRange : protoRanges) {
+            TSerializedTableRange serialized(protoRange);
+            if (!protoRange.HasTo()) {
+                serialized.To = serialized.From;
+                serialized.FromInclusive = serialized.ToInclusive = true;
+            }
+            ranges.emplace_back(std::move(serialized));
+        }
+
+        Y_VERIFY_DEBUG(!ranges.empty());
+        return ranges;
+    }
+
+    // Builds a TEvKqpScan request for the given ranges.
+    //   scanId - value stored in the ScanId field of the request.
+    //   gen    - value stored in the Generation field of the request.
+    //   ranges - ranges to read, serialized into the request in the given order.
+    THolder<TEvDataShard::TEvKqpScan> BuildEvKqpScan(const ui32 scanId, const ui32 gen, const TSmallVec<TSerializedTableRange>& ranges) const {
+        auto request = MakeHolder<TEvDataShard::TEvKqpScan>();
+        auto& record = request->Record;
+
+        // Table and columns.
+        record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId);
+        for (const auto& column : ScanData->GetColumns()) {
+            record.AddColumnTags(column.Tag);
+            record.AddColumnTypes(column.Type);
+        }
+        record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
+
+        // Ranges to read.
+        auto* protoRanges = record.MutableRanges();
+        protoRanges->Reserve(ranges.size());
+        for (const auto& range : ranges) {
+            range.Serialize(*protoRanges->Add());
+        }
+
+        // Execution context.
+        record.MutableSnapshot()->CopyFrom(Snapshot);
+        if (RuntimeSettings.Timeout) {
+            record.SetTimeoutMs(RuntimeSettings.Timeout.Get()->MilliSeconds());
+        }
+        record.SetStatsMode(RuntimeSettings.StatsMode);
+        record.SetScanId(scanId);
+        record.SetTxId(std::get<ui64>(TxId));
+        record.SetTablePath(ScanData->TablePath);
+        record.SetSchemaVersion(ScanData->TableId.SchemaVersion);
+
+        record.SetGeneration(gen);
+
+        record.SetReverse(Meta.GetReverse());
+        record.SetItemsLimit(Meta.GetItemsLimit());
+
+        // The OLAP program, when present, is forwarded in serialized form.
+        if (Meta.HasOlapProgram()) {
+            TString programBytes;
+            TStringOutput stream(programBytes);
+            Meta.GetOlapProgram().SerializeToArcadiaStream(&stream);
+            record.SetOlapProgram(programBytes);
+            record.SetOlapProgramType(
+                NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
+            );
+        }
+
+        record.SetDataFormat(Meta.GetDataFormat());
+        return request;
+    }
+
void ProcessRlNoResourceAndDie() {
const NYql::TIssue issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_RESOURCE_USAGE_LIMITED,
"Throughput limit exceeded for query");
- CA_LOG_E("Throughput limit exceeded, we got "
- << PendingScanData.size() << " pending messages,"
- << " stream will be terminated");
+ CA_LOG_E("Throughput limit exceeded, we got " << PendingScanData.size() << " pending messages,"
+ << " stream will be terminated");
State = NDqProto::COMPUTE_STATE_FAILURE;
- ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::OVERLOADED, TIssues({issue}));
+ ReportStateAndMaybeDie(NYql::NDqProto::StatusIds::OVERLOADED, TIssues({ issue }));
+ }
+
+    // Handles a broken pipe for a pending costs request: fails the actor once the retries limit
+    // is exceeded, otherwise arms a timer that delivers TEvRetryShard for the same shard.
+    void RetryCostsRequest(TShardCostsState::TPtr state) {
+        const ui64 shardId = state->GetShardId();
+        const ui32 attempt = state->TotalRetries++;
+        Counters->ScanQueryShardDisconnect->Inc();
+
+        if (attempt > MAX_SHARD_RETRIES) {
+            CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << shardId
+                << ", retries limit exceeded (" << attempt << ") on costs request");
+            return InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
+                TStringBuilder() << "Retries limit with costs requests for shard " << shardId << " exceeded.");
+        }
+
+        const auto retryDelay = state->CalcRetryDelay();
+        CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << shardId
+            << ", restarting costs request"
+            << ", attempt #" << attempt
+            << " schedule after " << retryDelay);
+
+        auto retryEvent = TEvPrivate::TEvRetryShard::CostsProblem(shardId);
+        state->RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
+            new IEventHandle(SelfId(), SelfId(), retryEvent.Release()));
+    }
+
+    // Sends a cost-data-only scan request (generation 0) for the read operation held by the state.
+    // The request is forwarded through the pipe cache with delivery tracking enabled.
+    void StartCostsRequest(TShardCostsState::TPtr state) {
+        const TSmallVec<TSerializedTableRange> readRanges = BuildSerializedTableRanges(state->GetReadData());
+
+        THolder<TEvDataShard::TEvKqpScan> scanRequest = BuildEvKqpScan(state->GetScanId(), 0, readRanges);
+        scanRequest->Record.SetCostDataOnly(true);
+
+        auto forward = MakeHolder<TEvPipeCache::TEvForward>(scanRequest.Release(), state->GetShardId());
+        Send(MakePipePeNodeCacheID(false), forward.Release(), IEventHandle::FlagTrackDelivery);
+    }
+
+    // Registers a costs request for the read operation under a new scan id and under its shard id,
+    // then sends it. Both registrations must be unique.
+    void StartCostsRequest(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta& read) {
+        const ui32 scanId = CostRequestsByScanId.size() + 1;
+        auto costsState = std::make_shared<TShardCostsState>(scanId, &read);
+
+        const bool registeredByScanId = CostRequestsByScanId.emplace(scanId, costsState).second;
+        Y_VERIFY(registeredByScanId);
+        const bool registeredByShardId = CostRequestsByShardId.emplace(costsState->GetShardId(), costsState).second;
+        Y_VERIFY(registeredByShardId);
+
+        StartCostsRequest(costsState);
+    }
+
+    // Consumes a cost-data reply and unregisters the matching costs request.
+    //   ev          - the reply; its scan id must belong to a registered costs request.
+    //   readData    - [out] read operation the request was issued for.
+    //   result      - [out] ranges to scan: the original ranges of the read operation when the reply
+    //                 carries no columns, otherwise the ranges produced by the reply for splitFactor;
+    //                 the order is reversed for reverse reads.
+    //   splitFactor - split factor passed to the reply. It is numeric: the former `bool` parameter
+    //                 truncated every factor to 0 or 1.
+    // Always returns true.
+    bool ReceiveCostRequest(TEvKqpCompute::TEvCostData::TPtr& ev, const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta*& readData,
+        TSmallVec<TSerializedTableRange>& result, const ui32 splitFactor) {
+        auto it = CostRequestsByScanId.find(ev->Get()->GetScanId());
+        // The value checked and printed here is the scan id, not a generation.
+        Y_VERIFY(it != CostRequestsByScanId.end(), "unknown scan id in cost data event: %u", ev->Get()->GetScanId());
+        readData = &it->second->GetReadData();
+        if (!ev->Get()->GetTableRanges().ColumnsCount()) {
+            result = BuildSerializedTableRanges(*readData);
+        } else {
+            result = ev->Get()->GetSerializedTableRanges(splitFactor);
+        }
+        if (Meta.GetReverse()) {
+            std::reverse(result.begin(), result.end());
+        }
+        // Erase the by-shard entry first: it is looked up through the by-scan-id iterator.
+        CostRequestsByShardId.erase(it->second->GetShardId());
+        CostRequestsByScanId.erase(it);
+        return true;
+    }
+
+    // Cost data arrived: turns every received range into a separate pending scan of the same shard
+    // and continues execution.
+    void HandleExecute(TEvKqpCompute::TEvCostData::TPtr& ev) {
+        const double shardSplitFactor = ShardsScanningPolicy.GetShardSplitFactor();
+
+        const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta::TReadOpMeta* readOp = nullptr;
+        TSmallVec<TSerializedTableRange> scanRanges;
+        ReceiveCostRequest(ev, readOp, scanRanges, shardSplitFactor);
+
+        for (auto&& scanRange : scanRanges) {
+            // Each range gets its own scanner index.
+            auto& pending = PendingShards.emplace_back(TShardState(readOp->GetShardId(), ++ScansCounter));
+            pending.Ranges.emplace_back(scanRange);
+        }
+
+        StartTableScan();
+        ContinueExecute();
+    }
void HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
YQL_ENSURE(ScanData);
auto& msg = ev->Get()->Record;
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
- auto* state = GetShardState(msg, scanActorId);
+ auto state = GetShardState(msg, scanActorId);
if (!state)
return;
CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration()
- << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation
+ << ", state: " << state->State << ", stateGen: " << state->Generation
<< ", tabletId: " << state->TabletId);
YQL_ENSURE(state->Generation == msg.GetGeneration());
@@ -295,6 +429,7 @@ private:
state->ActorId = scanActorId;
state->ResetRetry();
AffectedShards.insert(state->TabletId);
+ InFlightShards.NeedAck(state);
SendScanDataAck(state);
} else {
TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it");
@@ -304,10 +439,10 @@ private:
void HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
YQL_ENSURE(ScanData);
auto& msg = *ev->Get();
- auto* state = GetShardState(msg, ev->Sender);
- if (!state)
+ auto state = GetShardState(msg, ev->Sender);
+ if (!state) {
return;
-
+ }
YQL_ENSURE(state->Generation == msg.Generation);
if (state->State != EShardState::Running) {
return TerminateExpiredScan(ev->Sender, "Cancel expired scan");
@@ -327,9 +462,23 @@ private:
}
}
+    // Stops every in-flight scan and drops all pending shards.
+    void StopFinally() {
+        // Take a snapshot of the scans first; they are stopped only after the iteration is over.
+        std::vector<TShardState::TPtr> activeScans;
+        for (auto&& tabletScans : InFlightShards) {
+            for (auto&& scan : tabletScans.second) {
+                activeScans.emplace_back(scan.second);
+            }
+        }
+
+        for (auto&& scan : activeScans) {
+            StopReadChunk(*scan);
+        }
+
+        PendingShards.clear();
+    }
+
void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) {
auto& msg = *ev->Get();
- auto* state = GetShardState(msg, ev->Sender);
+
+ auto state = GetShardState(msg, ev->Sender);
if (!state)
return;
@@ -342,56 +491,65 @@ private:
YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender);
state->LastKey = std::move(msg.LastKey);
- ui64 bytes = 0;
ui64 rowsCount = 0;
+ ui64 bytes = 0;
{
auto guard = TaskRunner->BindAllocator();
switch (msg.GetDataFormat()) {
case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
- case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: {
+ case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
if (!msg.Rows.empty()) {
- bytes = ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory());
- rowsCount = msg.Rows.size();
+ bytes += ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory());
+ rowsCount += msg.Rows.size();
}
break;
- }
- case NKikimrTxDataShard::EScanDataFormat::ARROW: {
- if (msg.ArrowBatch != nullptr) {
- bytes = ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory());
- rowsCount = msg.ArrowBatch->num_rows();
+ case NKikimrTxDataShard::EScanDataFormat::ARROW:
+ if (!!msg.ArrowBatch) {
+ bytes += ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory());
+ rowsCount += msg.ArrowBatch->num_rows();
}
break;
- }
}
}
+ InFlightShards.MutableStatistics(state->TabletId).AddPack(rowsCount, bytes);
- Stats.AddReadStat(state->TabletId, rowsCount, bytes);
-
- CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished
- << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size()
- << ", in flight shards " << InFlightShards.size()
- << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter"
- << ", tabletId: " << state->TabletId);
-
- if (rowsCount == 0 && !msg.Finished && state->State != EShardState::PostRunning) {
- SendScanDataAck(state);
- }
+ Stats.AddReadStat(state->ScannerIdx, rowsCount, bytes);
- if (msg.Finished) {
- CA_LOG_D("Tablet " << state->TabletId << " scan finished, unlink");
- Stats.CompleteShard(state->TabletId);
- StopReadFromTablet(state);
+ CA_LOG_D("Got EvScanData, rows: " << rowsCount << "bytes: " << bytes << ", finished: " << msg.Finished
+ << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size()
+ << ", in flight scans " << InFlightShards.GetScansCount()
+ << ", in flight shards " << InFlightShards.GetShardsCount()
+ << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter"
+ << ", tabletId: " << state->TabletId);
+ bool stopFinally = false;
+ if (!IsAggregationRequest && Meta.HasItemsLimit() && Meta.GetItemsLimit() && Stats.TotalReadRows >= Meta.GetItemsLimit()) {
+ StopFinally();
+ stopFinally = true;
+ } else if (!msg.Finished) {
+ InFlightShards.NeedAck(state);
+ } else {
+ CA_LOG_D("Chunk " << state->TabletId << "/" << state->ScannerIdx << " scan finished");
+ Stats.CompleteShard(state);
+ StopReadChunk(*state);
+ CA_LOG_T("TRACE:" << InFlightShards.TraceToString() << ":" << CalculateFreeSpace());
if (!StartTableScan()) {
- CA_LOG_D("Finish scans");
- ScanData->Finish();
-
- if (ScanData->BasicStats) {
- ScanData->BasicStats->AffectedShards = AffectedShards.size();
- }
+ stopFinally = true;
+ }
+ }
+ if (stopFinally) {
+ ScanData->Finish();
+ CA_LOG_D("EVLOGKQP(scans_count:" << ScansCounter << ";max_in_flight:" << MaxInFlight << ")"
+ << Endl << InFlightShards.GetDurationStats()
+ << Endl << InFlightShards.StatisticsToString()
+ );
+ if (ScanData->BasicStats) {
+ ScanData->BasicStats->AffectedShards = AffectedShards.size();
}
}
+ CA_LOG_T("TRACE:" << InFlightShards.TraceToString() << ":" << CalculateFreeSpace() << ":" << rowsCount);
+
if (Y_UNLIKELY(ScanData->ProfileStats)) {
ScanData->ProfileStats->Messages++;
ScanData->ProfileStats->ScanCpuTime += msg.CpuTime;
@@ -412,7 +570,7 @@ private:
PendingScanData.pop_front();
auto& msg = *ev->Get();
- auto* state = GetShardState(msg, ev->Sender);
+ auto state = GetShardState(msg, ev->Sender);
if (!state)
return;
@@ -432,11 +590,11 @@ private:
TIssues issues;
IssuesFromMessage(msg.GetIssues(), issues);
- auto* state = GetShardState(msg, TActorId());
+ auto state = GetShardState(msg, TActorId());
if (!state)
return;
- CA_LOG_W("Got EvScanError scan state: " << EShardStateToString(state->State)
+ CA_LOG_W("Got EvScanError scan state: " << state->State
<< ", status: " << Ydb::StatusIds_StatusCode_Name(status)
<< ", reason: " << issues.ToString()
<< ", tablet id: " << state->TabletId);
@@ -465,29 +623,45 @@ private:
YQL_ENSURE(ScanData);
auto& msg = *ev->Get();
- auto stateIt = InFlightShards.find(msg.TabletId);
- if (stateIt == InFlightShards.end()) {
+ auto it = CostRequestsByShardId.find(msg.TabletId);
+ if (it != CostRequestsByShardId.end()) {
+ RetryCostsRequest(it->second);
+ return;
+ }
+
+ auto* states = InFlightShards.MutableByTabletId(msg.TabletId);
+ if (!states) {
CA_LOG_E("Broken pipe with unknown tablet " << msg.TabletId);
return;
}
- auto* state = &(stateIt->second);
- CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << EShardStateToString(state->State));
- if (state->State == EShardState::Starting || state->State == EShardState::Running) {
- return RetryDeliveryProblem(state);
+ for (auto& [_, state] : *states) {
+ const auto shardState = state->State;
+ CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << shardState);
+ if (state->State == EShardState::Starting || state->State == EShardState::Running) {
+ return RetryDeliveryProblem(state);
+ }
}
}
void HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) {
- ui64 tabletId = ev->Get()->TabletId;
- auto stateIt = InFlightShards.find(tabletId);
- if (stateIt == InFlightShards.end()) {
- CA_LOG_E("Received retry shard for unexpected tablet " << tabletId);
- return;
- }
+ if (ev->Get()->IsCostsRequest) {
+ auto it = CostRequestsByShardId.find(ev->Get()->TabletId);
+ if (it == CostRequestsByShardId.end()) {
+ CA_LOG_E("Received retry shard costs for unexpected tablet " << ev->Get()->TabletId);
+ return;
+ }
+ StartCostsRequest(it->second);
+ } else {
+ const ui32 scannerIdx = InFlightShards.GetIndexByGeneration(ev->Get()->Generation);
+ auto state = InFlightShards.GetStateByIndex(scannerIdx);
+ if (!state) {
+ CA_LOG_E("Received retry shard for unexpected tablet " << ev->Get()->TabletId << " / " << ev->Get()->Generation);
+ return;
+ }
- auto* state = &(stateIt->second);
- SendStartScanRequest(state, state->Generation);
+ SendStartScanRequest(state, state->Generation);
+ }
}
void HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
@@ -497,7 +671,7 @@ private:
PendingResolveShards.pop_front();
ResolveNextShard();
- StopReadFromTablet(&state);
+ Y_VERIFY(!InFlightShards.GetStateByIndex(state.ScannerIdx));
YQL_ENSURE(state.State == EShardState::Resolving);
CA_LOG_D("Received TEvResolveKeySetResult update for table '" << ScanData->TablePath << "'");
@@ -511,7 +685,7 @@ private:
TString error;
for (const auto& x : request->ResultSet) {
- if ((ui32)x.Status < (ui32) NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
+ if ((ui32)x.Status < (ui32)NSchemeCache::TSchemeCacheRequest::EStatus::OkScheme) {
// invalidate table
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanData->TableId, {}));
@@ -571,7 +745,7 @@ private:
<< ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)
<< ", i: " << i << ", state ranges: " << state.Ranges.size());
- auto newShard = TShardState(partition.ShardId);
+ auto newShard = TShardState(partition.ShardId, ++ScansCounter);
for (ui64 j = i; j < state.Ranges.size(); ++j) {
CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state.Ranges[j].ToTableRange(), tr)
@@ -609,8 +783,7 @@ private:
}
if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)
- && PendingShards.size() + InFlightShards.size() > 0)
- {
+ && PendingShards.size() + InFlightShards.GetScansCount() > 0) {
TStringBuilder sb;
if (!PendingShards.empty()) {
sb << "Pending shards States: ";
@@ -621,37 +794,40 @@ private:
if (!InFlightShards.empty()) {
sb << "In Flight shards States: ";
- for(auto& [_, st] : InFlightShards) {
- sb << st.ToString(KeyColumnTypes) << "; ";
+ for (auto& [_, st] : InFlightShards) {
+ for (auto&& [_, i] : st) {
+ sb << i->ToString(KeyColumnTypes) << "; ";
+ }
}
}
CA_LOG_D(sb);
}
-
StartTableScan();
}
void HandleExecute(TEvents::TEvUndelivered::TPtr& ev) {
switch (ev->Get()->SourceType) {
case TEvDataShard::TEvKqpScan::EventType:
- // handled by TEvPipeCache::TEvDeliveryProblem event
+ // Handled by TEvPipeCache::TEvDeliveryProblem event.
+ // CostData request is KqpScan request too.
return;
case TEvKqpCompute::TEvScanDataAck::EventType:
ui64 tabletId = ev->Cookie;
- auto it = InFlightShards.find(tabletId);
- if (it == InFlightShards.end()) {
+ const auto& shards = InFlightShards.GetByTabletId(tabletId);
+ if (shards.empty()) {
CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", " << tabletId);
return;
}
- auto& shard = it->second;
- if (shard.State == EShardState::Running && ev->Sender == shard.ActorId) {
- CA_LOG_E("TEvScanDataAck lost while running scan, terminate execution. DataShard actor: "
- << shard.ActorId);
- InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR,
- "Delivery problem: EvScanDataAck lost.");
- } else {
- CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", active scan actor: " << shard.ActorId);
+ for (auto& [_, state] : shards) {
+ const auto actorId = state->ActorId;
+ if (state->State == EShardState::Running && ev->Sender == actorId) {
+ CA_LOG_E("TEvScanDataAck lost while running scan, terminate execution. DataShard actor: " << actorId);
+ InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR,
+ "Delivery problem: EvScanDataAck lost.");
+ } else {
+ CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", active scan actor: " << actorId);
+ }
}
return;
}
@@ -663,10 +839,12 @@ private:
CA_LOG_N("Disconnected node " << nodeId);
TrackingNodes.erase(nodeId);
- for(auto& [tabletId, state] : InFlightShards) {
- if (state.ActorId && state.ActorId.NodeId() == nodeId) {
- InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR,
- TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ for (auto& [tabletId, states] : InFlightShards) {
+ for (auto&& [_, state] : states) {
+ if (state->ActorId && state->ActorId.NodeId() == nodeId) {
+ InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::DEFAULT_ERROR,
+ TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ }
}
}
}
@@ -674,90 +852,59 @@ private:
private:
bool StartTableScan() {
- // allow reading from multiple shards if data is not sorted
- const ui32 maxAllowedInFlight = ShardsScanningPolicy.GetMaxInFlightScans(Meta);
-
- while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) {
- ui64 tabletId = PendingShards.front().TabletId;
- auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front()));
+ const ui32 maxAllowedInFlight = MaxInFlight;
+ bool isFirst = true;
+ while (!PendingShards.empty() && InFlightShards.GetScansCount() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) {
+ if (isFirst) {
+ CA_LOG_D("BEFORE: " << PendingShards.size() << "." << InFlightShards.GetScansCount() << "." << PendingResolveShards.size());
+ isFirst = false;
+ }
+ auto state = InFlightShards.Put(std::move(PendingShards.front()));
PendingShards.pop_front();
- StartReadShard(&(it->second));
+ StartReadShard(state);
+ }
+ if (!isFirst) {
+ CA_LOG_D("AFTER: " << PendingShards.size() << "." << InFlightShards.GetScansCount() << "." << PendingResolveShards.size());
}
- CA_LOG_D("Scheduled table scans, in flight: " << InFlightShards.size() << " shards. "
+ CA_LOG_D("Scheduled table scans, in flight: " << InFlightShards.GetScansCount() << " shards. "
<< "pending shards to read: " << PendingShards.size() << ", "
<< "pending resolve shards: " << PendingResolveShards.size() << ", "
<< "average read rows: " << Stats.AverageReadRows() << ", "
<< "average read bytes: " << Stats.AverageReadBytes() << ", ");
- return InFlightShards.size() + PendingShards.size() + PendingResolveShards.size() > 0;
+ return CostRequestsByScanId.size() + InFlightShards.GetScansCount() + PendingShards.size() + PendingResolveShards.size() > 0;
}
- void StartReadShard(TShardState* state) {
+ void StartReadShard(TShardState::TPtr state) {
YQL_ENSURE(state->State == EShardState::Initial);
state->State = EShardState::Starting;
- state->Generation = AllocateGeneration(state);
+ state->Generation = InFlightShards.AllocateGeneration(state);
state->ActorId = {};
SendStartScanRequest(state, state->Generation);
}
- void SendScanDataAck(TShardState* state) {
+ bool SendScanDataAck(TShardState::TPtr state) {
ui64 freeSpace = CalculateFreeSpace();
+ if (!freeSpace) {
+ return false;
+ }
CA_LOG_D("Send EvScanDataAck to " << state->ActorId << ", freeSpace: " << freeSpace << ", gen: " << state->Generation);
ui32 flags = IEventHandle::FlagTrackDelivery;
if (TrackingNodes.insert(state->ActorId.NodeId()).second) {
flags |= IEventHandle::FlagSubscribeOnSession;
}
Send(state->ActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace, state->Generation), flags, state->TabletId);
+ InFlightShards.AckSent(state);
+ return true;
}
- void SendStartScanRequest(TShardState* state, ui32 gen) {
+ void SendStartScanRequest(TShardState::TPtr state, ui32 gen) {
YQL_ENSURE(state->State == EShardState::Starting);
- auto ev = MakeHolder<TEvDataShard::TEvKqpScan>();
- ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId);
- for (auto& column: ScanData->GetColumns()) {
- ev->Record.AddColumnTags(column.Tag);
- ev->Record.AddColumnTypes(column.Type);
- }
- ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
-
- CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes));
auto ranges = state->GetScanRanges(KeyColumnTypes);
- auto protoRanges = ev->Record.MutableRanges();
- protoRanges->Reserve(ranges.size());
-
- for (auto& range: ranges) {
- auto newRange = protoRanges->Add();
- range.Serialize(*newRange);
- }
-
- ev->Record.MutableSnapshot()->CopyFrom(Snapshot);
- if (RuntimeSettings.Timeout) {
- ev->Record.SetTimeoutMs(RuntimeSettings.Timeout.Get()->MilliSeconds());
- }
- ev->Record.SetStatsMode(RuntimeSettings.StatsMode);
- ev->Record.SetScanId(0);
- ev->Record.SetTxId(std::get<ui64>(TxId));
- ev->Record.SetTablePath(ScanData->TablePath);
- ev->Record.SetSchemaVersion(ScanData->TableId.SchemaVersion);
-
- ev->Record.SetGeneration(gen);
-
- ev->Record.SetReverse(Meta.GetReverse());
- ev->Record.SetItemsLimit(Meta.GetItemsLimit());
-
- if (Meta.HasOlapProgram()) {
- TString programBytes;
- TStringOutput stream(programBytes);
- Meta.GetOlapProgram().SerializeToArcadiaStream(&stream);
- ev->Record.SetOlapProgram(programBytes);
- ev->Record.SetOlapProgramType(
- NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
- );
- }
-
- ev->Record.SetDataFormat(Meta.GetDataFormat());
+ CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes));
+ THolder<TEvDataShard::TEvKqpScan> ev = BuildEvKqpScan(0, gen, ranges);
bool subscribed = std::exchange(state->SubscribedOnTablet, true);
@@ -769,7 +916,7 @@ private:
IEventHandle::FlagTrackDelivery);
}
- void RetryDeliveryProblem(TShardState* state) {
+ void RetryDeliveryProblem(TShardState::TPtr state) {
Counters->ScanQueryShardDisconnect->Inc();
if (state->TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
@@ -784,12 +931,12 @@ private:
// so after several consecutive delivery problem responses retry logic should
// resolve shard details again.
if (state->RetryAttempt >= MAX_SHARD_RETRIES) {
- return ResolveShard(state);
+ return ResolveShard(*state);
}
state->RetryAttempt++;
state->TotalRetries++;
- state->Generation = AllocateGeneration(state);
+ state->Generation = InFlightShards.AllocateGeneration(state);
state->ActorId = {};
state->State = EShardState::Starting;
state->SubscribedOnTablet = false;
@@ -800,7 +947,7 @@ private:
<< " schedule after " << retryDelay);
state->RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state->TabletId)));
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state->TabletId, state->Generation)));
}
bool IsQuotingEnabled() const {
@@ -821,7 +968,7 @@ private:
as->Send(selfId, new TEvents::TEvWakeup(EEvWakeupTag::RlNoResourceTag));
};
- const NRpcService::TRlFullPath rlFullPath {
+ const NRpcService::TRlFullPath rlFullPath{
.CoordinationNode = rlPath->GetCoordinationNode(),
.ResourcePath = rlPath->GetResourcePath(),
.DatabaseName = rlPath->GetDatabase(),
@@ -845,28 +992,39 @@ private:
void ResolveNextShard() {
if (!PendingResolveShards.empty()) {
auto& state = PendingResolveShards.front();
- ResolveShard(&state);
+ ResolveShard(state);
}
}
- void EnqueueResolveShard(TShardState* state) {
- auto it = InFlightShards.find(state->TabletId);
- YQL_ENSURE(it != InFlightShards.end());
- PendingResolveShards.emplace_back(std::move(it->second));
- InFlightShards.erase(it);
+ void EnqueueResolveShard(TShardState::TPtr state) {
+ CA_LOG_D("Enqueue for resolve " << state->TabletId << " chunk " << state->ScannerIdx);
+ YQL_ENSURE(StopReadChunk(*state));
+ DoExecute();
+ PendingResolveShards.emplace_back(*state);
if (PendingResolveShards.size() == 1) {
ResolveNextShard();
}
}
- void StopReadFromTablet(TShardState* state) {
- CA_LOG_D("Unlink from tablet " << state->TabletId << " and stop reading from it.");
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId));
- InFlightShards.erase(state->TabletId);
+ bool StopReadChunk(const TShardState& state) {
+ CA_LOG_D("Unlink from tablet " << state.TabletId << " chunk " << state.ScannerIdx << " and stop reading from it.");
+ const ui64 tabletId = state.TabletId;
+ const ui32 scannerIdx = state.ScannerIdx;
+ if (!InFlightShards.RemoveIfExists(scannerIdx)) {
+ return false;
+ }
+ const size_t remainChunksCount = InFlightShards.GetByTabletId(tabletId).size();
+ if (remainChunksCount == 0) {
+ CA_LOG_D("Unlink fully for tablet " << state.TabletId);
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(tabletId));
+ } else {
+ CA_LOG_D("Tablet " << state.TabletId << " not ready for unlink. Ramained chunks count: " << remainChunksCount);
+ }
+ return true;
}
- void ResolveShard(TShardState* state) {
- if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) {
+ void ResolveShard(TShardState& state) {
+ if (state.ResolveAttempt >= MAX_SHARD_RESOLVES) {
InternalError(NDqProto::StatusIds::UNAVAILABLE, TIssuesIds::KIKIMR_TEMPORARILY_UNAVAILABLE,
TStringBuilder() << "Table '" << ScanData->TablePath << "' resolve limit exceeded");
return;
@@ -874,12 +1032,12 @@ private:
Counters->ScanQueryShardResolve->Inc();
- state->State = EShardState::Resolving;
- state->ResolveAttempt++;
- state->SubscribedOnTablet = false;
+ state.State = EShardState::Resolving;
+ state.ResolveAttempt++;
+ state.SubscribedOnTablet = false;
- auto range = TTableRange(state->Ranges.front().From.GetCells(), state->Ranges.front().FromInclusive,
- state->Ranges.back().To.GetCells(), state->Ranges.back().ToInclusive);
+ auto range = TTableRange(state.Ranges.front().From.GetCells(), state.Ranges.front().FromInclusive,
+ state.Ranges.back().To.GetCells(), state.Ranges.back().ToInclusive);
TVector<TKeyDesc::TColumnOp> columns;
columns.reserve(ScanData->GetColumns().size());
@@ -892,11 +1050,11 @@ private:
}
auto keyDesc = MakeHolder<TKeyDesc>(ScanData->TableId, range, TKeyDesc::ERowOperation::Read,
- KeyColumnTypes, columns);
+ KeyColumnTypes, columns);
CA_LOG_D("Sending TEvResolveKeySet update for table '" << ScanData->TablePath << "'"
<< ", range: " << DebugPrintRange(KeyColumnTypes, range, *AppData()->TypeRegistry)
- << ", attempt #" << state->ResolveAttempt);
+ << ", attempt #" << state.ResolveAttempt);
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->ResultSet.emplace_back(std::move(keyDesc));
@@ -907,36 +1065,45 @@ private:
private:
ui64 CalculateFreeSpace() const {
return GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes()
- ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes()
- : 0ul;
+ ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes()
+ : 0ul;
}
std::any GetSourcesState() override {
- if (ScanData) {
- return CalculateFreeSpace();
+ if (!ScanData) {
+ return 0;
}
- return {};
+ return CalculateFreeSpace();
}
void PollSources(std::any prev) override {
- if (!prev.has_value() || !ScanData) {
+ if (!ScanData || ScanData->IsFinished()) {
return;
}
-
- for (auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) {
- auto* state = &(it->second);
+ const auto hasNewMemoryPred = [&]() {
+ if (!prev.has_value()) {
+ return false;
+ }
const ui64 freeSpace = CalculateFreeSpace();
const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
-
- CA_LOG_T("Scan over tablet " << state->TabletId << " finished: " << ScanData->IsFinished()
- << ", prevFreeSpace: " << prevFreeSpace << ", freeSpace: " << freeSpace << ", peer: " << state->ActorId);
-
- if (!ScanData->IsFinished() && state->State != EShardState::PostRunning
- && prevFreeSpace < freeSpace && state->ActorId)
- {
- SendScanDataAck(state);
+ return freeSpace > prevFreeSpace;
+ };
+ if (!hasNewMemoryPred()) {
+ return;
+ }
+ CA_LOG_D("POLL_SOURCES:START:" << InFlightShards.GetShardsCount() << "." << InFlightShards.GetScansCount());
+ while (InFlightShards.GetNeedAck().size()) {
+ auto state = InFlightShards.GetNeedAck().begin()->second;
+
+ CA_LOG_T("Scan over tablet " << state->TabletId << ", peer: " << state->ActorId);
+ Y_VERIFY(state->State != EShardState::PostRunning);
+ Y_VERIFY(!!state->ActorId);
+ if (!SendScanDataAck(state)) {
+ CA_LOG_D("POLL_SOURCES:STOP_CANNOT_SEND_ACK:" << InFlightShards.GetShardsCount() << "." << InFlightShards.GetScansCount());
+ break;
}
}
+ CA_LOG_D("POLL_SOURCES:FINISH:" << InFlightShards.GetShardsCount() << "." << InFlightShards.GetScansCount());
}
void TerminateSources(const TIssues& issues, bool success) override {
@@ -945,16 +1112,18 @@ private:
}
auto prio = success ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN;
- for(auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) {
- auto* state = &(it->second);
- if (state->ActorId) {
- CA_LOG(prio, "Send abort execution event to scan over tablet: " << state->TabletId << ", table: "
- << ScanData->TablePath << ", scan actor: " << state->ActorId << ", message: " << issues.ToOneLineString());
-
- Send(state->ActorId, new TEvKqp::TEvAbortExecution(
- success ? NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues));
- } else {
- CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet");
+ for (auto&& itTablet : InFlightShards) {
+ for (auto&& it : itTablet.second) {
+ auto state = it.second;
+ if (state->ActorId) {
+ CA_LOG(prio, "Send abort execution event to scan over tablet: " << state->TabletId << ", table: "
+ << ScanData->TablePath << ", scan actor: " << state->ActorId << ", message: " << issues.ToOneLineString());
+
+ Send(state->ActorId, new TEvKqp::TEvAbortExecution(
+ success ? NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues));
+ } else {
+ CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet");
+ }
}
}
}
@@ -978,22 +1147,22 @@ private:
}
template<class TMessage>
- TShardState* GetShardState(const TMessage& msg, const TActorId& scanActorId) {
+ TShardState::TPtr GetShardState(const TMessage& msg, const TActorId& scanActorId) {
ui32 generation;
- if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) {
+ if constexpr (std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) {
generation = msg.GetGeneration();
- } else if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanInitActor>) {
+ } else if constexpr (std::is_same_v<TMessage, NKikimrKqp::TEvScanInitActor>) {
generation = msg.GetGeneration();
} else {
generation = msg.Generation;
}
- auto it = AllocatedGenerations.find(generation);
- YQL_ENSURE(it != AllocatedGenerations.end(), "Received message from unknown scan or request.");
- ui64 tabletId = it->second;
- auto stateIt = InFlightShards.find(tabletId);
- if (stateIt == InFlightShards.end()) {
- TString error = TStringBuilder() << "Received message from scan shard which is not currently in flight, tablet" << tabletId;
+ const ui32 scannerIdx = InFlightShards.GetIndexByGeneration(generation);
+ YQL_ENSURE(scannerIdx, "Received message from unknown scan or request. Generation: " << generation);
+
+ TShardState::TPtr statePtr = InFlightShards.GetStateByIndex(scannerIdx);
+ if (!statePtr) {
+ TString error = TStringBuilder() << "Received message from scan shard which is not currently in flight, scannerIdx " << scannerIdx;
CA_LOG_W(error);
if (scanActorId) {
TerminateExpiredScan(scanActorId, error);
@@ -1002,7 +1171,7 @@ private:
return nullptr;
}
- auto& state = stateIt->second;
+ auto& state = *statePtr;
if (state.Generation != generation) {
TString error = TStringBuilder() << "Received message from expired scan, generation mistmatch, "
<< "expected: " << state.Generation << ", received: " << generation;
@@ -1014,15 +1183,7 @@ private:
return nullptr;
}
- return &state;
- }
-
- ui32 AllocateGeneration(TShardState* state) {
- ui32 nextGeneration = ++LastGeneration;
- auto[it, success] = AllocatedGenerations.emplace(nextGeneration, state->TabletId);
- YQL_ENSURE(success, "Found duplicated while allocating next generation id for scan request: "
- << nextGeneration << ", tablet " << state->TabletId << ", allocated for tablet " << it->second);
- return nextGeneration;
+ return statePtr;
}
private:
@@ -1038,52 +1199,26 @@ private:
std::deque<TShardState> PendingShards;
std::deque<TShardState> PendingResolveShards;
- std::map<ui32, ui64> AllocatedGenerations;
- std::map<ui64, TShardState> InFlightShards;
+ TInFlightShards InFlightShards;
+ ui32 ScansCounter = 0;
- ui32 LastGeneration = 0;
std::set<ui64> AffectedShards;
std::set<ui32> TrackingNodes;
+ std::map<ui32, TShardCostsState::TPtr> CostRequestsByScanId;
+ std::map<ui64, TShardCostsState::TPtr> CostRequestsByShardId;
+ ui32 MaxInFlight = 1024;
+ bool IsAggregationRequest = false;
};
} // anonymous namespace
-ui32 TShardsScanningPolicy::GetMaxInFlightScans(const NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta& meta) const {
- const bool isSorted = (meta.HasSorted() ? meta.GetSorted() : true);
- if (meta.HasOlapProgram()) {
- NKikimrSSA::TProgram program;
- Y_VERIFY(program.ParseFromString(meta.GetOlapProgram().GetProgram()));
- std::optional<ui32> aggregationLimit;
- for (auto&& command : program.GetCommand()) {
- if (!command.HasGroupBy()) {
- aggregationLimit = Min(aggregationLimit.value_or(ProtoConfig.GetScanLimit()), ProtoConfig.GetScanLimit());
- continue;
- }
- ui32 aggrLimit;
- if (command.GetGroupBy().GetKeyColumns().size()) {
- aggrLimit = ProtoConfig.GetAggregationGroupByLimit();
- } else {
- aggrLimit = ProtoConfig.GetAggregationNoGroupLimit();
- }
- aggregationLimit = Min(aggregationLimit.value_or(aggrLimit), aggrLimit);
- }
- return aggregationLimit.value_or(ProtoConfig.GetAggregationGroupByLimit());
- } else if (isSorted) {
- return 1;
- } else {
- return ProtoConfig.GetScanLimit();
- }
-}
-
IActor* CreateKqpScanComputeActor(const NKikimrKqp::TKqpSnapshot& snapshot, const TActorId& executerId, ui64 txId,
NDqProto::TDqTask&& task, IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
const TComputeRuntimeSettings& settings, const TComputeMemoryLimits& memoryLimits,
- const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId)
-{
+ const TShardsScanningPolicy& shardsScanningPolicy, TIntrusivePtr<TKqpCounters> counters, NWilson::TTraceId traceId) {
return new TKqpScanComputeActor(snapshot, executerId, txId, std::move(task), std::move(asyncIoFactory),
functionRegistry, settings, memoryLimits, shardsScanningPolicy, counters, std::move(traceId));
}
-} // namespace NKqp
-} // namespace NKikimr
+}
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
new file mode 100644
index 0000000000..e027b329f9
--- /dev/null
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.cpp
@@ -0,0 +1,89 @@
+#include "kqp_scan_compute_manager.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+/// Builds a compact trace line: one "<scannerIdx>:<state>:<pending-ack flag>;" record
+/// for every scanner registered in StatesByIndex.
+TString TInFlightShards::TraceToString() const {
+    TStringBuilder trace;
+    for (const auto& [scannerIdx, shardState] : StatesByIndex) {
+        const bool waitsForAck = NeedAckStates.contains(scannerIdx);
+        trace << scannerIdx << ":" << shardState->State << ":" << waitsForAck << ";";
+    }
+    return trace;
+}
+
+/// Unregisters the scanner with the given index from all in-flight indexes.
+/// Returns the removed state, or nullptr when the index is zero or the scanner is not
+/// present in both StatesByIndex and the per-tablet map.
+TShardState::TPtr TInFlightShards::RemoveIfExists(const ui32 scannerIdx) {
+    if (scannerIdx == 0) {
+        return nullptr;
+    }
+    const auto indexPos = StatesByIndex.find(scannerIdx);
+    if (indexPos == StatesByIndex.end()) {
+        return nullptr;
+    }
+    TShardState::TPtr removed = indexPos->second;
+    const auto tabletPos = Shards.find(removed->TabletId);
+    if (tabletPos == Shards.end()) {
+        return nullptr;
+    }
+    auto& tabletScanners = tabletPos->second;
+    const auto scannerPos = tabletScanners.find(removed->ScannerIdx);
+    if (scannerPos == tabletScanners.end()) {
+        return nullptr;
+    }
+
+    // Statistics are updated before the containers shrink, so the elapsed interval is
+    // booked against the shard/scan counts that were in effect until this call.
+    MutableStatistics(removed->TabletId).MutableStatistics(removed->ScannerIdx).SetFinishInstant(Now());
+    NeedAckStates.erase(removed->ScannerIdx);
+    TScanShardsStatistics::OnScansDiff(Shards.size(), GetScansCount());
+
+    tabletScanners.erase(scannerPos);
+    if (tabletScanners.empty()) {
+        // The last scanner of the tablet is gone: drop the tablet entry as well.
+        Shards.erase(tabletPos);
+    }
+    StatesByIndex.erase(indexPos);
+    return removed;
+}
+
+/// Pointer overload: removes the scanner identified by state->ScannerIdx.
+/// A null pointer is handed back as is.
+TShardState::TPtr TInFlightShards::RemoveIfExists(TShardState::TPtr state) {
+    return state ? RemoveIfExists(state->ScannerIdx) : state;
+}
+
+/// Registers a new in-flight scan. The state is moved into shared storage and indexed
+/// both by scanner index and by tablet id; the shared pointer is returned to the caller.
+TShardState::TPtr TInFlightShards::Put(TShardState&& state) {
+    // Close the current accounting interval with the counts valid before the insert.
+    TScanShardsStatistics::OnScansDiff(Shards.size(), GetScansCount());
+    MutableStatistics(state.TabletId).MutableStatistics(state.ScannerIdx).SetStartInstant(Now());
+
+    auto registered = std::make_shared<TShardState>(std::move(state));
+    const ui32 scannerIdx = registered->ScannerIdx;
+    StatesByIndex.emplace(scannerIdx, registered);
+    Shards[registered->TabletId].emplace(scannerIdx, registered);
+    return registered;
+}
+
+/// Maps a generation to the scanner index it was allocated for.
+/// Returns 0 when the generation was never allocated.
+ui32 TInFlightShards::GetIndexByGeneration(const ui32 generation) {
+    const auto generationPos = AllocatedGenerations.find(generation);
+    if (generationPos != AllocatedGenerations.end()) {
+        return generationPos->second;
+    }
+    return 0;
+}
+
+/// Issues the next generation number and binds it to the scanner index of `state`.
+/// The scanner must already be registered under its tablet; otherwise the process aborts.
+ui32 TInFlightShards::AllocateGeneration(TShardState::TPtr state) {
+    // Sanity check: the state has to be present in the per-tablet index.
+    const auto itTablet = Shards.find(state->TabletId);
+    Y_VERIFY(itTablet != Shards.end());
+    const auto it = itTablet->second.find(state->ScannerIdx);
+    Y_VERIFY(it != itTablet->second.end());
+
+    // The pre-increment keeps 0 out of the issued generations.
+    const ui32 nextGeneration = ++LastGeneration;
+    Y_VERIFY(AllocatedGenerations.emplace(nextGeneration, state->ScannerIdx).second);
+    return nextGeneration;
+}
+
+/// Total number of in-flight scans, summed over all tablets.
+ui32 TInFlightShards::GetScansCount() const {
+    ui32 scansTotal = 0;
+    for (const auto& [_, tabletScanners] : Shards) {
+        scansTotal += tabletScanners.size();
+    }
+    return scansTotal;
+}
+
+} \ No newline at end of file
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
new file mode 100644
index 0000000000..cf808a5d17
--- /dev/null
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
@@ -0,0 +1,79 @@
+#pragma once
+#include "kqp_compute_state.h"
+#include "kqp_scan_compute_stat.h"
+
+namespace NKikimr::NKqp::NComputeActor {
+
+/// Bookkeeping for scans that are currently in flight. Every scan state is indexed twice:
+/// by scanner index (StatesByIndex) and by tablet id (Shards). The class also keeps the
+/// issued generations and the set of scans marked as waiting for an ack.
+class TInFlightShards: public TScanShardsStatistics {
+private:
+    using TTabletStates = std::map<ui32, TShardState::TPtr>;
+    using TTabletsData = std::map<ui64, TTabletStates>;
+    // tablet id -> (scanner index -> state)
+    TTabletsData Shards;
+    // generation -> scanner index
+    std::map<ui32, ui32> AllocatedGenerations;
+    // scanner index -> state
+    TTabletStates StatesByIndex;
+    std::set<ui32> ActualScannerIds;
+    ui32 LastGeneration = 0;
+    // scanner index -> state, for scans marked through NeedAck()
+    std::map<ui32, TShardState::TPtr> NeedAckStates;
+
+public:
+    TString TraceToString() const;
+
+    /// Scans currently marked as waiting for an ack, keyed by scanner index.
+    const std::map<ui32, TShardState::TPtr>& GetNeedAck() const {
+        return NeedAckStates;
+    }
+
+    /// Clears the pending-ack mark of a registered scanner.
+    void AckSent(TShardState::TPtr state) {
+        Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
+        NeedAckStates.erase(state->ScannerIdx);
+    }
+
+    /// Marks a registered scanner as waiting for an ack; an existing mark is kept as is.
+    void NeedAck(TShardState::TPtr state) {
+        Y_VERIFY(StatesByIndex.contains(state->ScannerIdx));
+        NeedAckStates.emplace(state->ScannerIdx, state);
+    }
+
+    ui32 AllocateGeneration(TShardState::TPtr state);
+    ui32 GetScansCount() const;
+
+    /// Number of tablets that have at least one in-flight scan.
+    ui32 GetShardsCount() const {
+        return Shards.size();
+    }
+    bool empty() const {
+        return Shards.empty();
+    }
+    // Iteration goes over the per-tablet map.
+    TTabletsData::const_iterator begin() const {
+        return Shards.begin();
+    }
+    TTabletsData::const_iterator end() const {
+        return Shards.end();
+    }
+
+    /// State registered under the scanner index, or nullptr when there is none.
+    TShardState::TPtr GetStateByIndex(const ui32 index) const {
+        const auto statePos = StatesByIndex.find(index);
+        if (statePos != StatesByIndex.end()) {
+            return statePos->second;
+        }
+        return nullptr;
+    }
+    ui32 GetIndexByGeneration(const ui32 generation);
+    TShardState::TPtr RemoveIfExists(TShardState::TPtr state);
+
+    TShardState::TPtr RemoveIfExists(const ui32 scannerIdx);
+    TShardState::TPtr Put(TShardState&& state);
+
+    /// Scans of the tablet; a shared default (empty) map when the tablet is unknown.
+    const TTabletStates& GetByTabletId(const ui64 tabletId) const {
+        const auto tabletPos = Shards.find(tabletId);
+        if (tabletPos != Shards.end()) {
+            return tabletPos->second;
+        }
+        return Default<TTabletStates>();
+    }
+
+    /// Mutable scans of the tablet, or nullptr when the tablet is unknown.
+    TTabletStates* MutableByTabletId(const ui64 tabletId) {
+        const auto tabletPos = Shards.find(tabletId);
+        return tabletPos == Shards.end() ? nullptr : &tabletPos->second;
+    }
+};
+
+} \ No newline at end of file
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp
new file mode 100644
index 0000000000..1ee73017ae
--- /dev/null
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.cpp
@@ -0,0 +1,111 @@
+#include "kqp_scan_compute_stat.h"
+#include <util/string/builder.h>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+/// Stores the start timestamp of the chunk; returns *this so that calls can be chained.
+TChunkStatistics& TChunkStatistics::SetStartInstant(const TInstant value) {
+    this->StartInstant = value;
+    return *this;
+}
+
+/// Stores the finish timestamp of the chunk; returns *this so that calls can be chained.
+TChunkStatistics& TChunkStatistics::SetFinishInstant(const TInstant value) {
+    this->FinishInstant = value;
+    return *this;
+}
+
+/// Accounts one received data pack.
+/// @param rowsCount number of rows in the pack; also used as the pack size for min/max tracking
+/// @param bytes     size of the pack in bytes
+void TShardStatistics::AddPack(const ui32 rowsCount, const ui64 bytes) {
+    if (!MaxPackSize || *MaxPackSize < rowsCount) {
+        MaxPackSize = rowsCount;
+    }
+    // The minimum must move only when the new pack is SMALLER than the stored value.
+    // The previous comparison was copied from the maximum branch and tracked the maximum.
+    if (!MinPackSize || rowsCount < *MinPackSize) {
+        MinPackSize = rowsCount;
+    }
+    PacksCount += 1;
+    TotalRowsCount += rowsCount;
+    TotalBytesCount += bytes;
+}
+
+/// Renders the per-shard summary: chunk count, overall time span, pack counters and
+/// average/min/max chunk duration, as "KEY=value;" pairs.
+TString TShardStatistics::StatisticsToString() const {
+    std::optional<TInstant> firstStart;
+    std::optional<TInstant> lastFinish;
+    TDuration shortest;
+    TDuration longest;
+    TDuration total;
+    for (const auto& [_, chunk] : Statistics) {
+        const TDuration elapsed = chunk.GetDuration();
+        // A zero duration doubles as the "not set yet" marker for both extremes.
+        shortest = !shortest ? elapsed : Min(elapsed, shortest);
+        longest = !longest ? elapsed : Max(elapsed, longest);
+        total += elapsed;
+        if (firstStart && lastFinish) {
+            firstStart = Min(chunk.GetStartInstant(), *firstStart);
+            lastFinish = Max(chunk.GetFinishInstant(), *lastFinish);
+        } else {
+            firstStart = chunk.GetStartInstant();
+            lastFinish = chunk.GetFinishInstant();
+        }
+    }
+
+    TStringBuilder sb;
+    sb << "CHUNKS=" << Statistics.size() << ";";
+    if (firstStart && lastFinish) {
+        sb << "D=" << (*lastFinish - *firstStart) << ";";
+    }
+    if (Statistics.empty()) {
+        return sb;
+    }
+    sb << "PacksCount=" << PacksCount << ";";
+    if (PacksCount) {
+        // The min/max optionals are dereferenced only under this PacksCount check.
+        sb << "RowsCount=" << TotalRowsCount << ";";
+        sb << "BytesCount=" << TotalBytesCount << ";";
+        sb << "MinPackSize=" << *MinPackSize << ";";
+        sb << "MaxPackSize=" << *MaxPackSize << ";";
+    }
+    sb << "CAVG=" << total / Statistics.size() << ";";
+    sb << "CMIN=" << shortest << ";";
+    sb << "CMAX=" << longest << ";";
+    return sb;
+}
+
+/// Concatenates the summaries of all shards as "{SHARD(<id>):<summary>};" records.
+TString TScanShardsStatistics::StatisticsToString() const {
+    TStringBuilder sb;
+    for (const auto& [shardId, shardStats] : Statistics) {
+        sb << "{SHARD(" << shardId << "):" << shardStats.StatisticsToString() << "};";
+    }
+    return sb;
+}
+
+/// Reports the time-weighted average number of in-flight scans and shards.
+/// When the total accounted time does not exceed 10 microseconds, "so fast" is returned.
+TString TScanShardsStatistics::GetDurationStats() const {
+    double totalMicroseconds = 0;
+    for (const auto& [_, spent] : DurationByShardsCount) {
+        totalMicroseconds += spent.MicroSeconds();
+    }
+
+    TStringBuilder sb;
+    if (!(totalMicroseconds > 10)) {
+        sb << "so fast";
+        return sb;
+    }
+
+    // Each count is weighted by the time spent with exactly that many scans/shards.
+    double weightedScans = 0;
+    for (const auto& [scansCount, spent] : DurationByScansCount) {
+        weightedScans += scansCount * 1.0 * spent.MicroSeconds();
+    }
+    double weightedShards = 0;
+    for (const auto& [shardsCount, spent] : DurationByShardsCount) {
+        weightedShards += shardsCount * 1.0 * spent.MicroSeconds();
+    }
+
+    sb << "InFlightScans:";
+    sb << "InFlightShards:";
+    sb << ";wScans=" << weightedScans / totalMicroseconds << ";wShards=" << weightedShards / totalMicroseconds << ";";
+    return sb;
+}
+
+} \ No newline at end of file
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
new file mode 100644
index 0000000000..4ccb51c3f5
--- /dev/null
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_stat.h
@@ -0,0 +1,63 @@
+#pragma once
+#include <util/datetime/base.h>
+#include <util/system/types.h>
+#include <optional>
+#include <map>
+
+namespace NKikimr::NKqp::NComputeActor {
+
+// Start and finish timestamps recorded for a single chunk.
+// TShardStatistics keeps one instance per scanner index.
+class TChunkStatistics {
+private:
+ TInstant StartInstant;
+ TInstant FinishInstant;
+public:
+ // Setters are defined out of line and return a TChunkStatistics reference
+ // (presumably *this, to allow call chaining - confirm in the .cpp).
+ TChunkStatistics& SetStartInstant(const TInstant value);
+ TChunkStatistics& SetFinishInstant(const TInstant value);
+ // Returns the stored start timestamp.
+ TInstant GetStartInstant() const {
+ return StartInstant;
+ }
+ // Returns the stored finish timestamp.
+ TInstant GetFinishInstant() const {
+ return FinishInstant;
+ }
+ // Returns FinishInstant - StartInstant.
+ // NOTE(review): both instants are default-initialised, so the result is only
+ // meaningful after both setters have been called - confirm with callers.
+ TDuration GetDuration() const {
+ return FinishInstant - StartInstant;
+ }
+};
+
+// Statistics collected for one shard: per-scanner chunk timings plus counters
+// that are updated through AddPack.
+class TShardStatistics {
+private:
+    // Chunk timings keyed by scanner index (see MutableStatistics).
+    std::map<ui32, TChunkStatistics> Statistics;
+    // Pack size extremes; StatisticsToString dereferences them only when
+    // PacksCount is non-zero.
+    std::optional<ui32> MaxPackSize;
+    std::optional<ui32> MinPackSize;
+    ui32 PacksCount = 0;
+    // 64-bit accumulators: AddPack receives a ui64 byte count per pack, so a
+    // 32-bit running total would silently wrap once a shard returns more than
+    // 4 GiB; the row total is widened for the same reason.
+    ui64 TotalRowsCount = 0;
+    ui64 TotalBytesCount = 0;
+public:
+    // Accounts one pack with the given number of rows and bytes (defined out of line).
+    void AddPack(const ui32 rowsCount, const ui64 bytes);
+    // Returns the chunk record for scannerIdx, default-constructing it on first access.
+    TChunkStatistics& MutableStatistics(const ui32 scannerIdx) {
+        return Statistics[scannerIdx];
+    }
+    // Returns a textual summary of the collected statistics (defined out of line).
+    TString StatisticsToString() const;
+};
+
+// Aggregated scan statistics across shards: per-shard details plus histograms
+// of how long a given number of shards / scans was in flight.
+class TScanShardsStatistics {
+private:
+    // Moment of the previous OnScansDiff call (or of construction).
+    TInstant PredDiff = Now();
+    // Accumulated time keyed by the scans count / shards count passed to OnScansDiff.
+    std::map<ui32, TDuration> DurationByScansCount;
+    std::map<ui32, TDuration> DurationByShardsCount;
+    // Per-shard statistics keyed by shard id.
+    std::map<ui64, TShardStatistics> Statistics;
+protected:
+    // Adds the time elapsed since the previous call to the buckets of the given
+    // shards and scans counts, then restarts the interval.
+    // The clock is sampled once so that both histograms receive exactly the
+    // same delta and no time is lost between samples; GetDurationStats
+    // normalises both histograms by the DurationByShardsCount total.
+    // NOTE(review): presumably the counts describe the state that was in effect
+    // during the elapsed interval - confirm with callers.
+    void OnScansDiff(const ui32 shardsCount, const ui32 scansCount) {
+        const TInstant now = Now();
+        const TDuration elapsed = now - PredDiff;
+        DurationByShardsCount[shardsCount] += elapsed;
+        DurationByScansCount[scansCount] += elapsed;
+        PredDiff = now;
+    }
+public:
+    // Returns the statistics record for shardId, default-constructing it on first access.
+    TShardStatistics& MutableStatistics(const ui64 shardId) {
+        return Statistics[shardId];
+    }
+    // Returns the per-shard statistics as text (defined out of line).
+    TString StatisticsToString() const;
+    // Returns the time-weighted in-flight scans/shards summary (defined out of line).
+    TString GetDurationStats() const;
+
+};
+} \ No newline at end of file
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index b78bc3af91..cafd527ff9 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -519,7 +519,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto planRes = CollectStreamResult(res);
auto ast = planRes.QueryStats->Getquery_ast();
-
+// Cerr << ast << Endl;
for (auto planNode : planNodes) {
UNIT_ASSERT_C(ast.find(planNode) != std::string::npos,
TStringBuilder() << planNode << " was not pushed down. Query: " << query);
@@ -1106,11 +1106,15 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
TInstant tsPrev = TInstant::MicroSeconds(1000000);
+
+ std::set<ui64> results = { 1000096, 1000097, 1000098, 1000099, 1000999, 1001000 };
for (const auto& r : rows) {
TInstant ts = GetTimestamp(r.at("timestamp"));
UNIT_ASSERT_GE_C(ts, tsPrev, "result is not sorted in ASC order");
+ UNIT_ASSERT(results.erase(ts.GetValue()));
tsPrev = ts;
}
+ UNIT_ASSERT(rows.size() == 6);
}
Y_UNIT_TEST_TWIN(ExtractRangesReverse, UseSessionActor) {
@@ -1129,6 +1133,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
SELECT `timestamp` FROM `/Root/olapStore/olapTable`
WHERE
(`timestamp` < CAST(1000100 AS Timestamp) AND `timestamp` > CAST(1000095 AS Timestamp)) OR
+ (`timestamp` < CAST(1000300 AS Timestamp) AND `timestamp` >= CAST(1000295 AS Timestamp)) OR
+ (`timestamp` <= CAST(1000400 AS Timestamp) AND `timestamp` > CAST(1000395 AS Timestamp)) OR
+
+ (`timestamp` <= CAST(1000500 AS Timestamp) AND `timestamp` >= CAST(1000495 AS Timestamp)) OR
+ (`timestamp` <= CAST(1000505 AS Timestamp) AND `timestamp` >= CAST(1000499 AS Timestamp)) OR
+ (`timestamp` < CAST(1000510 AS Timestamp) AND `timestamp` >= CAST(1000505 AS Timestamp)) OR
+
(`timestamp` <= CAST(1001000 AS Timestamp) AND `timestamp` >= CAST(1000999 AS Timestamp)) OR
(`timestamp` > CAST(1002000 AS Timestamp))
ORDER BY `timestamp` DESC;
@@ -1136,11 +1147,19 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto rows = ExecuteScanQuery(tableClient, selectQuery);
TInstant tsPrev = TInstant::MicroSeconds(2000000);
+ std::set<ui64> results = { 1000096, 1000097, 1000098, 1000099,
+ 1000999, 1001000,
+ 1000295, 1000296, 1000297, 1000298, 1000299,
+ 1000396, 1000397, 1000398, 1000399, 1000400,
+ 1000495, 1000496, 1000497, 1000498, 1000499, 1000500, 1000501, 1000502, 1000503, 1000504, 1000505, 1000506, 1000507, 1000508, 1000509 };
+ const ui32 expectedCount = results.size();
for (const auto& r : rows) {
TInstant ts = GetTimestamp(r.at("timestamp"));
UNIT_ASSERT_LE_C(ts, tsPrev, "result is not sorted in DESC order");
+ UNIT_ASSERT(results.erase(ts.GetValue()));
tsPrev = ts;
}
+ UNIT_ASSERT(rows.size() == expectedCount);
}
Y_UNIT_TEST_TWIN(PredicatePushdown, UseSessionActor) {
@@ -1744,6 +1763,93 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL(result, insertRows);
}
+ // Checks that a COUNT(*) scan over a single-shard OLAP table produces an
+ // EvCostData event with non-empty table range marks, and that the serialized
+ // ranges it reports have the expected sizes for arguments 1 and 2.
+ Y_UNIT_TEST(GranulesInShard) {
+ TPortManager tp;
+ ui16 mbusport = tp.GetPort(2134);
+ // Two-node test server driven by the test runtime (real threads disabled).
+ auto settings = Tests::TServerSettings(mbusport)
+ .SetDomainName("Root")
+ .SetUseRealThreads(false)
+ .SetNodeCount(2);
+
+ Tests::TServer::TPtr server = new Tests::TServer(settings);
+
+ auto runtime = server->GetRuntime();
+ auto sender = runtime->AllocateEdgeActor();
+
+ InitRoot(server, sender);
+ EnableDebugLogging(runtime);
+
+ ui32 numShards = 1;
+ ui32 numIterations = 100;
+ CreateTestOlapTable(*server, "largeOlapTable", "largeOlapStore", numShards, numShards);
+ ui32 insertRows = 0;
+ const ui32 iterationPackSize = 2000;
+ // Insert numIterations packs of iterationPackSize rows each; the fourth
+ // argument advances by 1000000 per pack (presumably the starting timestamp
+ // of the pack - confirm against SendDataViaActorSystem).
+ for (ui64 i = 0; i < numIterations; ++i) {
+ SendDataViaActorSystem(runtime, "/Root/largeOlapStore/largeOlapTable", 0, 1000000 + i * 1000000, iterationPackSize);
+ insertRows += iterationPackSize;
+ }
+
+ ui64 result = 0;
+ bool testProcessed = false;
+ // Observer that inspects events passing through the test runtime.
+ auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& ev) -> auto {
+ switch (ev->GetTypeRewrite()) {
+ case NKqp::TKqpComputeEvents::EvCostData:
+ {
+
+ // Only cost data that carries at least one mark is validated; seeing
+ // one such event is what marks the test as processed.
+ // NOTE(review): Y_VERIFY presumably aborts the process instead of
+ // failing the unit test - confirm this is intended.
+ auto* msg = ev->Get<NKqp::TEvKqpCompute::TEvCostData>();
+ if (msg->GetTableRanges().GetMarksCount()) {
+ Y_VERIFY(msg->GetSerializedTableRanges(1).size() == 1);
+ Y_VERIFY(msg->GetSerializedTableRanges(2).size() == 2);
+ testProcessed = true;
+ }
+ break;
+ }
+
+ case NKqp::TKqpExecuterEvents::EvShardsResolveStatus:
+ {
+
+ // Rewrite every resolved shard location to the id of node 0.
+ auto* msg = ev->Get<NKqp::TEvKqpExecuter::TEvShardsResolveStatus>();
+ for (auto& [shardId, nodeId] : msg->ShardNodes) {
+ Cerr << "-- nodeId: " << nodeId << Endl;
+ nodeId = runtime->GetNodeId(0);
+ }
+ break;
+ }
+
+ case NKqp::TKqpExecuterEvents::EvStreamData:
+ {
+ auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
+
+ Cerr << (TStringBuilder() << "-- EvStreamData: " << record.AsJSON() << Endl);
+ Cerr.Flush();
+
+ // The result set is expected to be a single row with a single uint64 cell.
+ // NOTE(review): Y_ASSERT is presumably compiled out in release builds - confirm.
+ Y_ASSERT(record.GetResultSet().rows().size() == 1);
+ Y_ASSERT(record.GetResultSet().rows().at(0).items().size() == 1);
+ result = record.GetResultSet().rows().at(0).items().at(0).uint64_value();
+
+ // Acknowledge the stream data to its sender with the same sequence
+ // number, then drop the original event.
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record.GetSeqNo());
+ resp->Record.SetFreeSpace(100);
+ runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
+ return TTestActorRuntime::EEventAction::DROP;
+ }
+ }
+ return TTestActorRuntime::EEventAction::PROCESS;
+ };
+
+ runtime->SetObserverFunc(captureEvents);
+ auto streamSender = runtime->AllocateEdgeActor();
+ const TInstant start = Now();
+ // Repeat the query until a cost-data event with marks has been observed or
+ // 20 seconds have passed; every completed query must count all inserted rows.
+ // NOTE(review): `ev` is only used to wait for the query response.
+ while (Now() - start < TDuration::Seconds(20) && !testProcessed) {
+ SendRequest(*runtime, streamSender, MakeStreamRequest(streamSender, "SELECT COUNT(*) FROM `/Root/largeOlapStore/largeOlapTable`;", false));
+ auto ev = runtime->GrabEdgeEventRethrow<NKqp::TEvKqp::TEvQueryResponse>(streamSender);
+ UNIT_ASSERT_VALUES_EQUAL(result, insertRows);
+ Sleep(TDuration::Seconds(1));
+ }
+ UNIT_ASSERT(testProcessed);
+ }
+
Y_UNIT_TEST(ManyColumnShardsWithRestarts) {
TPortManager tp;
ui16 mbusport = tp.GetPort(2134);
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index bca2befc98..0173752ce3 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1005,6 +1005,8 @@ message TTableServiceConfig {
optional uint32 AggregationGroupByLimit = 1 [default = 256];
optional uint32 AggregationNoGroupLimit = 2 [default = 1024];
optional uint32 ScanLimit = 3 [default = 16];
+ optional bool ParallelScanningAvailable = 4 [default = false];
+ optional uint32 ShardSplitFactor = 5 [default = 1];
}
message TResourceManager {