diff options
author | gvit <gvit@ydb.tech> | 2022-07-10 20:38:19 +0300 |
---|---|---|
committer | gvit <gvit@ydb.tech> | 2022-07-10 20:38:19 +0300 |
commit | e2a74bfb7de7566fe3c99c4424725b03cc05db9c (patch) | |
tree | 3672c2cb214757b6cbdcf9425752e19e43956c57 | |
parent | 9b4f67031921f90daa279b28361aad4af068637d (diff) | |
download | ydb-e2a74bfb7de7566fe3c99c4424725b03cc05db9c.tar.gz |
fix last key handling for multiple shards
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor.h | 9 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp | 21 | ||||
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 22 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_planner.cpp | 2 | ||||
-rw-r--r-- | ydb/core/kqp/ut/kqp_olap_ut.cpp | 34 |
5 files changed, 45 insertions, 43 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h index 6f26a4830a..d9c57edc27 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h @@ -4,8 +4,6 @@ #include <ydb/core/kqp/kqp_compute.h> #include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> #include <ydb/core/scheme/scheme_tabledefs.h> -#include <ydb/core/base/appdata.h> -#include <ydb/core/tx/datashard/range_ops.h> namespace NKikimr { namespace NMiniKQL { @@ -43,7 +41,6 @@ std::string_view EShardStateToString(EShardState state); bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues); -// scan over datashards struct TShardState { ui64 TabletId; TSmallVec<TSerializedTableRange> Ranges; @@ -57,6 +54,9 @@ struct TShardState { TActorId RetryTimer; ui32 ResolveAttempt = 0; TActorId ActorId; + TOwnedCellVec LastKey; + + TString PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const; explicit TShardState(ui64 tabletId) : TabletId(tabletId) {} @@ -65,8 +65,7 @@ struct TShardState { void ResetRetry(); TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const; - const TSmallVec<TSerializedTableRange> GetScanRanges( - TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const; + const TSmallVec<TSerializedTableRange> GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const; }; } // namespace NComputeActor diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp index 5e9e49250d..5eee256a53 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp +++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp @@ -1,6 +1,7 @@ #include "kqp_compute_actor.h" #include <ydb/core/base/appdata.h> +#include <ydb/core/tx/datashard/range_ops.h> #include <ydb/library/yql/core/issue/yql_issue.h> namespace NKikimr::NKqp::NComputeActor { @@ -29,6 +30,13 @@ void TShardState::ResetRetry() { ResolveAttempt = 0; } +TString TShardState::PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const { + if (LastKey.empty()) { + return "<none>"; + } + return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry); +} + TDuration TShardState::CalcRetryDelay() { if (std::exchange(AllowInstantRetry, false)) { return TDuration::Zero(); @@ -45,7 +53,8 @@ TDuration TShardState::CalcRetryDelay() { TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const { TStringBuilder sb; sb << "TShardState{ TabletId: " << TabletId << ", State: " << EShardStateToString(State) - << ", Gen: " << Generation << ", Ranges: ["; + << ", Gen: " << Generation << ", Last Key " << TShardState::PrintLastKey(keyTypes) + << ", Ranges: ["; for (size_t i = 0; i < Ranges.size(); ++i) { sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry); if (i + 1 != Ranges.size()) { @@ -58,9 +67,9 @@ TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const { return sb; } -const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const { +const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const { // No any data read previously, return all ranges - if (!lastKey.DataSize()) { + if (!LastKey.DataSize()) { return Ranges; } @@ -68,10 +77,10 @@ const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef TVector<TSerializedTableRange> ranges; ranges.reserve(Ranges.size()); - YQL_ENSURE(keyTypes.size() == lastKey.size(), "Key columns size != last key"); + YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key"); for (auto rangeIt = Ranges.begin(); rangeIt != Ranges.end(); ++rangeIt) { - int cmp = ComparePointAndRange(lastKey, rangeIt->ToTableRange(), keyTypes, keyTypes); + int cmp = ComparePointAndRange(LastKey, rangeIt->ToTableRange(), keyTypes, keyTypes); YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range."); @@ -81,7 +90,7 @@ const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef // It is range, where read was interrupted. Restart operation from last read key. ranges.emplace_back(std::move(TSerializedTableRange( - TSerializedCellVec::Serialize(lastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive + TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive ))); // And push all others diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 12ab5c738b..034f202f40 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -338,7 +338,7 @@ private: YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender); - LastKey = std::move(msg.LastKey); + state->LastKey = std::move(msg.LastKey); ui64 bytes = 0; ui64 rowsCount = 0; { @@ -367,7 +367,6 @@ private: CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size() << ", in flight shards " << InFlightShards.size() - << ", LastKey " << PrintLastKey() << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter" << ", tabletId: " << state->TabletId); @@ -592,11 +591,14 @@ private: PendingShards.emplace_front(std::move(newShards[i])); } + if (!state.LastKey.empty()) { + PendingShards.front().LastKey = std::move(state.LastKey); + } + if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE) && PendingShards.size() + InFlightShards.size() > 0) { TStringBuilder sb; - sb << "Last Key: " << PrintLastKey() << "; "; if (!PendingShards.empty()) { sb << "Pending shards States: "; for (auto& st : PendingShards) { @@ -709,8 +711,8 @@ private: } ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys()); - CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes) << ", LastKey: " << PrintLastKey()); - auto ranges = state->GetScanRanges(KeyColumnTypes, LastKey); + CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes)); + auto ranges = state->GetScanRanges(KeyColumnTypes); auto protoRanges = ev->Record.MutableRanges(); protoRanges->Reserve(ranges.size()); @@ -782,7 +784,7 @@ private: state->SubscribedOnTablet = false; auto retryDelay = state->CalcRetryDelay(); CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state->TabletId - << ", restarting scan from last received key " << PrintLastKey() + << ", restarting scan from last received key " << state->PrintLastKey(KeyColumnTypes) << ", attempt #" << state->RetryAttempt << " (total " << state->TotalRetries << ")" << " schedule after " << retryDelay); @@ -964,13 +966,6 @@ private: TBase::PassAway(); } - TString PrintLastKey() const { - if (LastKey.empty()) { - return "<none>"; - } - return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry); - } - template<class TMessage> TShardState* GetShardState(const TMessage& msg, const TActorId& scanActorId) { ui32 generation; @@ -1027,7 +1022,6 @@ private: NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta; TVector<NScheme::TTypeId> KeyColumnTypes; NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr; - TOwnedCellVec LastKey; std::deque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData; std::deque<TShardState> PendingShards; std::deque<TShardState> PendingResolveShards; diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp index 6c7b2068a3..385dd1fe68 100644 --- a/ydb/core/kqp/executer/kqp_planner.cpp +++ b/ydb/core/kqp/executer/kqp_planner.cpp @@ -175,6 +175,7 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot AddScansToKqpNodeRequest(ev, nodeId); auto target = MakeKqpNodeServiceID(nodeId); + LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId); TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), CalcSendMessageFlagsForNode(target.NodeId()))); } @@ -198,6 +199,7 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho AddScansToKqpNodeRequest(ev, SelfId().NodeId()); auto target = MakeKqpNodeServiceID(SelfId().NodeId()); + LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId); TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery)); diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index e145eb779a..663291d100 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -1452,10 +1452,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } Y_UNIT_TEST_TWIN(ManyColumnShardsWithRestarts, UseSessionActor) { - // remove this return when bug with scan is fixed. - // todo: KIKIMR-15200 - return; - TPortManager tp; ui16 mbusport = tp.GetPort(2134); auto settings = Tests::TServerSettings(mbusport) @@ -1483,8 +1479,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) { ui64 result = 0; THashSet<TActorId> columnShardScans; - ui64 rebootedScanCount = 0; std::set<ui64> tabletIds; + bool prevIsFinished = false; auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto { switch (ev->GetTypeRewrite()) { @@ -1518,22 +1514,24 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } case NKqp::TKqpComputeEvents::EvScanData: { - auto it = columnShardScans.find(ev->Sender); - if (it != columnShardScans.end()) { - ++rebootedScanCount; - if (rebootedScanCount == 1) { - ui64 tabletIdToKill = *tabletIds.begin(); - NKikimr::RebootTablet(*runtime, tabletIdToKill, sender); - Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << ": hijack event, kill tablet " << tabletIdToKill << Endl); + auto [it, success] = columnShardScans.emplace(ev->Sender); + auto* msg = ev->Get<NKqp::TEvKqpCompute::TEvScanData>(); + Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << Endl); + if (success) { + // first scan response. + prevIsFinished = msg->Finished; + return TTestActorRuntime::EEventAction::PROCESS; + } else { + if (prevIsFinished) { + Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << ": hijack event"); Cerr.Flush(); + auto resp = std::make_unique<NKqp::TEvKqpCompute::TEvScanError>(msg->Generation); + runtime->Send(new IEventHandle(ev->Recipient, ev->Sender, resp.release())); + } else { + prevIsFinished = msg->Finished; } - } else { - columnShardScans.insert(ev->Sender); - runtime->EnableScheduleForActor(ev->Sender); - Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << Endl); - Cerr.Flush(); + return TTestActorRuntime::EEventAction::PROCESS; } - break; } |