aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author gvit <gvit@ydb.tech> 2022-07-10 20:38:19 +0300
committer gvit <gvit@ydb.tech> 2022-07-10 20:38:19 +0300
commit e2a74bfb7de7566fe3c99c4424725b03cc05db9c (patch)
tree 3672c2cb214757b6cbdcf9425752e19e43956c57
parent 9b4f67031921f90daa279b28361aad4af068637d (diff)
download ydb-e2a74bfb7de7566fe3c99c4424725b03cc05db9c.tar.gz
fix last key handling for multiple shards
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor.h 9
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp 21
-rw-r--r-- ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp 22
-rw-r--r-- ydb/core/kqp/executer/kqp_planner.cpp 2
-rw-r--r-- ydb/core/kqp/ut/kqp_olap_ut.cpp 34
5 files changed, 45 insertions, 43 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor.h b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
index 6f26a4830a..d9c57edc27 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor.h
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor.h
@@ -4,8 +4,6 @@
#include <ydb/core/kqp/kqp_compute.h>
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
#include <ydb/core/scheme/scheme_tabledefs.h>
-#include <ydb/core/base/appdata.h>
-#include <ydb/core/tx/datashard/range_ops.h>
namespace NKikimr {
namespace NMiniKQL {
@@ -43,7 +41,6 @@ std::string_view EShardStateToString(EShardState state);
bool FindSchemeErrorInIssues(const Ydb::StatusIds::StatusCode& status, const NYql::TIssues& issues);
-// scan over datashards
struct TShardState {
ui64 TabletId;
TSmallVec<TSerializedTableRange> Ranges;
@@ -57,6 +54,9 @@ struct TShardState {
TActorId RetryTimer;
ui32 ResolveAttempt = 0;
TActorId ActorId;
+ TOwnedCellVec LastKey;
+
+ TString PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const;
explicit TShardState(ui64 tabletId)
: TabletId(tabletId) {}
@@ -65,8 +65,7 @@ struct TShardState {
void ResetRetry();
TString ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const;
- const TSmallVec<TSerializedTableRange> GetScanRanges(
- TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const;
+ const TSmallVec<TSerializedTableRange> GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const;
};
} // namespace NComputeActor
diff --git a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
index 5e9e49250d..5eee256a53 100644
--- a/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_compute_actor_helpers.cpp
@@ -1,6 +1,7 @@
#include "kqp_compute_actor.h"
#include <ydb/core/base/appdata.h>
+#include <ydb/core/tx/datashard/range_ops.h>
#include <ydb/library/yql/core/issue/yql_issue.h>
namespace NKikimr::NKqp::NComputeActor {
@@ -29,6 +30,13 @@ void TShardState::ResetRetry() {
ResolveAttempt = 0;
}
+TString TShardState::PrintLastKey(TConstArrayRef<NScheme::TTypeId> keyTypes) const { // Returns a printable form of this shard's LastKey for log/debug output.
+ if (LastKey.empty()) { // Nothing stored in LastKey: emit a placeholder instead of formatting.
+ return "<none>";
+ }
+ return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry); // Delegates to DebugPrintPoint with keyTypes and the TypeRegistry taken from AppData().
+}
+
TDuration TShardState::CalcRetryDelay() {
if (std::exchange(AllowInstantRetry, false)) {
return TDuration::Zero();
@@ -45,7 +53,8 @@ TDuration TShardState::CalcRetryDelay() {
TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
TStringBuilder sb;
sb << "TShardState{ TabletId: " << TabletId << ", State: " << EShardStateToString(State)
- << ", Gen: " << Generation << ", Ranges: [";
+ << ", Gen: " << Generation << ", Last Key " << TShardState::PrintLastKey(keyTypes)
+ << ", Ranges: [";
for (size_t i = 0; i < Ranges.size(); ++i) {
sb << "#" << i << ": " << DebugPrintRange(keyTypes, Ranges[i].ToTableRange(), *AppData()->TypeRegistry);
if (i + 1 != Ranges.size()) {
@@ -58,9 +67,9 @@ TString TShardState::ToString(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
return sb;
}
-const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes, const TOwnedCellVec& lastKey) const {
+const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef<NScheme::TTypeId> keyTypes) const {
// No any data read previously, return all ranges
- if (!lastKey.DataSize()) {
+ if (!LastKey.DataSize()) {
return Ranges;
}
@@ -68,10 +77,10 @@ const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef
TVector<TSerializedTableRange> ranges;
ranges.reserve(Ranges.size());
- YQL_ENSURE(keyTypes.size() == lastKey.size(), "Key columns size != last key");
+ YQL_ENSURE(keyTypes.size() == LastKey.size(), "Key columns size != last key");
for (auto rangeIt = Ranges.begin(); rangeIt != Ranges.end(); ++rangeIt) {
- int cmp = ComparePointAndRange(lastKey, rangeIt->ToTableRange(), keyTypes, keyTypes);
+ int cmp = ComparePointAndRange(LastKey, rangeIt->ToTableRange(), keyTypes, keyTypes);
YQL_ENSURE(cmp >= 0, "Missed intersection of LastKey and range.");
@@ -81,7 +90,7 @@ const TSmallVec<TSerializedTableRange> TShardState::GetScanRanges(TConstArrayRef
// It is range, where read was interrupted. Restart operation from last read key.
ranges.emplace_back(std::move(TSerializedTableRange(
- TSerializedCellVec::Serialize(lastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
+ TSerializedCellVec::Serialize(LastKey), rangeIt->To.GetBuffer(), false, rangeIt->ToInclusive
)));
// And push all others
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 12ab5c738b..034f202f40 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -338,7 +338,7 @@ private:
YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender);
- LastKey = std::move(msg.LastKey);
+ state->LastKey = std::move(msg.LastKey);
ui64 bytes = 0;
ui64 rowsCount = 0;
{
@@ -367,7 +367,6 @@ private:
CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished
<< ", from: " << ev->Sender << ", shards remain: " << PendingShards.size()
<< ", in flight shards " << InFlightShards.size()
- << ", LastKey " << PrintLastKey()
<< ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimiter"
<< ", tabletId: " << state->TabletId);
@@ -592,11 +591,14 @@ private:
PendingShards.emplace_front(std::move(newShards[i]));
}
+ if (!state.LastKey.empty()) {
+ PendingShards.front().LastKey = std::move(state.LastKey);
+ }
+
if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)
&& PendingShards.size() + InFlightShards.size() > 0)
{
TStringBuilder sb;
- sb << "Last Key: " << PrintLastKey() << "; ";
if (!PendingShards.empty()) {
sb << "Pending shards States: ";
for (auto& st : PendingShards) {
@@ -709,8 +711,8 @@ private:
}
ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
- CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes) << ", LastKey: " << PrintLastKey());
- auto ranges = state->GetScanRanges(KeyColumnTypes, LastKey);
+ CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes));
+ auto ranges = state->GetScanRanges(KeyColumnTypes);
auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(ranges.size());
@@ -782,7 +784,7 @@ private:
state->SubscribedOnTablet = false;
auto retryDelay = state->CalcRetryDelay();
CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state->TabletId
- << ", restarting scan from last received key " << PrintLastKey()
+ << ", restarting scan from last received key " << state->PrintLastKey(KeyColumnTypes)
<< ", attempt #" << state->RetryAttempt << " (total " << state->TotalRetries << ")"
<< " schedule after " << retryDelay);
@@ -964,13 +966,6 @@ private:
TBase::PassAway();
}
- TString PrintLastKey() const {
- if (LastKey.empty()) {
- return "<none>";
- }
- return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry);
- }
-
template<class TMessage>
TShardState* GetShardState(const TMessage& msg, const TActorId& scanActorId) {
ui32 generation;
@@ -1027,7 +1022,6 @@ private:
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
TVector<NScheme::TTypeId> KeyColumnTypes;
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
- TOwnedCellVec LastKey;
std::deque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData;
std::deque<TShardState> PendingShards;
std::deque<TShardState> PendingResolveShards;
diff --git a/ydb/core/kqp/executer/kqp_planner.cpp b/ydb/core/kqp/executer/kqp_planner.cpp
index 6c7b2068a3..385dd1fe68 100644
--- a/ydb/core/kqp/executer/kqp_planner.cpp
+++ b/ydb/core/kqp/executer/kqp_planner.cpp
@@ -175,6 +175,7 @@ void TKqpPlanner::Process(const TVector<NKikimrKqp::TKqpNodeResources>& snapshot
AddScansToKqpNodeRequest(ev, nodeId);
auto target = MakeKqpNodeServiceID(nodeId);
+ LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(),
CalcSendMessageFlagsForNode(target.NodeId())));
}
@@ -198,6 +199,7 @@ void TKqpPlanner::RunLocal(const TVector<NKikimrKqp::TKqpNodeResources>& snapsho
AddScansToKqpNodeRequest(ev, SelfId().NodeId());
auto target = MakeKqpNodeServiceID(SelfId().NodeId());
+ LOG_D("Send request to kqpnode: " << target << ", node_id: " << SelfId().NodeId() << ", TxId: " << TxId);
TlsActivationContext->Send(new IEventHandle(target, ExecuterId, ev.Release(), IEventHandle::FlagTrackDelivery));
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index e145eb779a..663291d100 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -1452,10 +1452,6 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
Y_UNIT_TEST_TWIN(ManyColumnShardsWithRestarts, UseSessionActor) {
- // remove this return when bug with scan is fixed.
- // todo: KIKIMR-15200
- return;
-
TPortManager tp;
ui16 mbusport = tp.GetPort(2134);
auto settings = Tests::TServerSettings(mbusport)
@@ -1483,8 +1479,8 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
ui64 result = 0;
THashSet<TActorId> columnShardScans;
- ui64 rebootedScanCount = 0;
std::set<ui64> tabletIds;
+ bool prevIsFinished = false;
auto captureEvents = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) -> auto {
switch (ev->GetTypeRewrite()) {
@@ -1518,22 +1514,24 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
case NKqp::TKqpComputeEvents::EvScanData: {
- auto it = columnShardScans.find(ev->Sender);
- if (it != columnShardScans.end()) {
- ++rebootedScanCount;
- if (rebootedScanCount == 1) {
- ui64 tabletIdToKill = *tabletIds.begin();
- NKikimr::RebootTablet(*runtime, tabletIdToKill, sender);
- Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << ": hijack event, kill tablet " << tabletIdToKill << Endl);
+ auto [it, success] = columnShardScans.emplace(ev->Sender);
+ auto* msg = ev->Get<NKqp::TEvKqpCompute::TEvScanData>();
+ Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << Endl);
+ if (success) {
+ // first scan response.
+ prevIsFinished = msg->Finished;
+ return TTestActorRuntime::EEventAction::PROCESS;
+ } else {
+ if (prevIsFinished) {
+ Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << ": hijack event");
Cerr.Flush();
+ auto resp = std::make_unique<NKqp::TEvKqpCompute::TEvScanError>(msg->Generation);
+ runtime->Send(new IEventHandle(ev->Recipient, ev->Sender, resp.release()));
+ } else {
+ prevIsFinished = msg->Finished;
}
- } else {
- columnShardScans.insert(ev->Sender);
- runtime->EnableScheduleForActor(ev->Sender);
- Cerr << (TStringBuilder() << "-- EvScanData from " << ev->Sender << Endl);
- Cerr.Flush();
+ return TTestActorRuntime::EEventAction::PROCESS;
}
-
break;
}