summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <[email protected]>2022-06-09 21:36:43 +0300
committerVitalii Gridnev <[email protected]>2022-06-09 21:36:43 +0300
commit711e8dcfc5597da133348eb7db05d01ee1f0ce2b (patch)
treeec24d5c8934300c2cec93d12dd6d4c1030fb0200
parenteb97595367a096742a25b6caeb947d6543f0fc2c (diff)
[kqp] support multiple shards reading: preparations for multiple shards in flight KIKIMR-15042
ref:b51d5b46bef7a3b26eb7d3dc0a2c2b9de070a195
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp530
1 files changed, 258 insertions, 272 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index d8f8441c029..d150bbd65c3 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -119,7 +119,7 @@ public:
ScanData->TableReader = CreateKqpTableReader(*ScanData);
for (const auto& read : Meta.GetReads()) {
- auto& state = Shards.emplace_back(TShardState(read.GetShardId()));
+ auto& state = PendingShards.emplace_back(TShardState(read.GetShardId()));
state.Ranges.reserve(read.GetKeyRanges().size());
for (const auto& range : read.GetKeyRanges()) {
auto& sr = state.Ranges.emplace_back(TSerializedTableRange(range));
@@ -236,96 +236,60 @@ private:
}
void HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& ev) {
- Y_VERIFY_DEBUG(ScanData);
- Y_VERIFY_DEBUG(!Shards.empty());
-
+ YQL_ENSURE(ScanData);
auto& msg = ev->Get()->Record;
-
- auto& state = Shards.front();
auto scanActorId = ActorIdFromProto(msg.GetScanActorId());
+ auto* state = GetShardState(msg, scanActorId);
+ if (!state)
+ return;
CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration()
- << ", state: " << EShardStateToString(state.State) << ", stateGen: " << state.Generation);
-
- switch (state.State) {
- case EShardState::Starting: {
- if (state.Generation == msg.GetGeneration()) {
- state.State = EShardState::Running;
- state.ActorId = scanActorId;
-
- state.ResetRetry();
-
- AffectedShards.insert(state.TabletId);
-
- SendScanDataAck(state, GetMemoryLimits().ScanBufferSize);
- return;
- }
+ << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation);
- if (state.Generation > msg.GetGeneration()) {
- TerminateExpiredScan(scanActorId, "Got expired EvScanInitActor, terminate it");
- return;
- }
+ YQL_ENSURE(state->Generation == msg.GetGeneration());
- YQL_ENSURE(false, "Got EvScanInitActor from the future, gen: " << msg.GetGeneration()
- << ", expected: " << state.Generation);
- }
-
- case EShardState::Initial:
- case EShardState::Running:
- case EShardState::PostRunning:
- case EShardState::Resolving: {
- TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it");
- return;
- }
+ if (state->State == EShardState::Starting) {
+ state->State = EShardState::Running;
+ state->ActorId = scanActorId;
+ state->ResetRetry();
+ AffectedShards.insert(state->TabletId);
+ SendScanDataAck(state, GetMemoryLimits().ScanBufferSize);
+ } else {
+ TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it");
}
}
void HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) {
- Y_VERIFY_DEBUG(ScanData);
- Y_VERIFY_DEBUG(!Shards.empty());
-
+ YQL_ENSURE(ScanData);
auto& msg = *ev->Get();
+ auto* state = GetShardState(msg, ev->Sender);
+ if (!state)
+ return;
- auto& state = Shards.front();
-
- switch (state.State) {
- case EShardState::Running: {
- if (state.Generation == msg.Generation) {
- YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender);
-
- TInstant startTime = TActivationContext::Now();
- if (ev->Get()->Finished) {
- state.State = EShardState::PostRunning;
- }
- PendingScanData.emplace_back(std::make_pair(ev, startTime));
+ YQL_ENSURE(state->Generation == msg.Generation);
+ if (state->State != EShardState::Running) {
+ return TerminateExpiredScan(ev->Sender, "Cancel expired scan");
+ }
- if (IsQuotingEnabled()) {
- AcquireRateQuota();
- } else {
- ProcessScanData();
- }
+ YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender);
+ TInstant startTime = TActivationContext::Now();
+ if (ev->Get()->Finished) {
+ state->State = EShardState::PostRunning;
+ }
+ PendingScanData.emplace_back(std::make_pair(ev, startTime));
- } else if (state.Generation > msg.Generation) {
- TerminateExpiredScan(ev->Sender, "Cancel expired scan");
- } else {
- YQL_ENSURE(false, "EvScanData from the future, expected: " << state.Generation << ", got: " << msg.Generation);
- }
- break;
- }
- case EShardState::PostRunning:
- TerminateExpiredScan(ev->Sender, "Unexpected data after finish");
- break;
- case EShardState::Initial:
- case EShardState::Starting:
- case EShardState::Resolving:
- TerminateExpiredScan(ev->Sender, "Cancel unexpected scan");
- break;
+ if (IsQuotingEnabled()) {
+ AcquireRateQuota();
+ } else {
+ ProcessScanData();
}
}
void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) {
- auto& state = Shards.front();
auto& msg = *ev->Get();
+ auto* state = GetShardState(msg, ev->Sender);
+ if (!state)
+ return;
TDuration latency;
if (enqueuedAt != TInstant::Zero()) {
@@ -333,7 +297,7 @@ private:
Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds());
}
- YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender);
+ YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender);
LastKey = std::move(msg.LastKey);
ui64 bytes = 0;
@@ -344,14 +308,14 @@ private:
case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: {
if (!msg.Rows.empty()) {
- bytes = ScanData->AddRows(msg.Rows, state.TabletId, TaskRunner->GetHolderFactory());
+ bytes = ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory());
rowsCount = msg.Rows.size();
}
break;
}
case NKikimrTxDataShard::EScanDataFormat::ARROW: {
if (msg.ArrowBatch != nullptr) {
- bytes = ScanData->AddRows(*msg.ArrowBatch, state.TabletId, TaskRunner->GetHolderFactory());
+ bytes = ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory());
rowsCount = msg.ArrowBatch->num_rows();
}
break;
@@ -359,12 +323,12 @@ private:
}
}
-
CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished
- << ", from: " << ev->Sender << ", shards remain: " << Shards.size()
+ << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size()
+ << ", in flight shards " << InFlightShards.size()
<< ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter");
- if (rowsCount == 0 && !msg.Finished && state.State != EShardState::PostRunning) {
+ if (rowsCount == 0 && !msg.Finished && state->State != EShardState::PostRunning) {
ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes()
? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes()
: 0ul;
@@ -372,12 +336,11 @@ private:
}
if (msg.Finished) {
- CA_LOG_D("Tablet " << state.TabletId << " scan finished, unlink");
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId));
-
- Shards.pop_front();
+ CA_LOG_D("Tablet " << state->TabletId << " scan finished, unlink");
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId));
- if (!Shards.empty()) {
+ InFlightShards.erase(state->TabletId);
+ if (!PendingShards.empty()) {
CA_LOG_D("Starting next scan");
StartTableScan();
} else {
@@ -402,153 +365,97 @@ private:
}
void ProcessScanData() {
- Y_VERIFY_DEBUG(ScanData);
- Y_VERIFY_DEBUG(!Shards.empty());
- Y_VERIFY(!PendingScanData.empty());
+ YQL_ENSURE(ScanData);
+ YQL_ENSURE(!PendingScanData.empty());
auto ev = std::move(PendingScanData.front().first);
- TInstant enqueuedAt = std::move(PendingScanData.front().second);
+ auto enqueuedAt = std::move(PendingScanData.front().second);
PendingScanData.pop_front();
auto& msg = *ev->Get();
- auto& state = Shards.front();
-
- switch (state.State) {
- case EShardState::Running:
- case EShardState::PostRunning: {
- if (state.Generation == msg.Generation) {
- ProcessPendingScanDataItem(ev, enqueuedAt);
- DoExecute();
- } else if (state.Generation > msg.Generation) {
- TerminateExpiredScan(ev->Sender, "Cancel expired scan");
- } else {
- YQL_ENSURE(false, "EvScanData from the future, expected: " << state.Generation << ", got: " << msg.Generation);
- }
- break;
- }
+ auto* state = GetShardState(msg, ev->Sender);
+ if (!state)
+ return;
- case EShardState::Initial:
- case EShardState::Starting:
- case EShardState::Resolving: {
- TerminateExpiredScan(ev->Sender, "Cancel unexpected scan");
- break;
- }
+ if (state->State == EShardState::Running || state->State == EShardState::PostRunning) {
+ ProcessPendingScanDataItem(ev, enqueuedAt);
+ DoExecute();
+ } else {
+ TerminateExpiredScan(ev->Sender, "Cancel expired scan");
}
}
void HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev) {
- Y_VERIFY_DEBUG(ScanData);
- Y_VERIFY_DEBUG(!Shards.empty());
-
+ YQL_ENSURE(ScanData);
auto& msg = ev->Get()->Record;
Ydb::StatusIds::StatusCode status = msg.GetStatus();
TIssues issues;
IssuesFromMessage(msg.GetIssues(), issues);
- auto& state = Shards.front();
+ auto* state = GetShardState(msg, TActorId());
+ if (!state)
+ return;
- switch (state.State) {
- case EShardState::Starting: {
- if (state.Generation == msg.GetGeneration()) {
- CA_LOG_W("Got EvScanError while starting scan, status: " << Ydb::StatusIds_StatusCode_Name(status)
- << ", reason: " << issues.ToString());
+ CA_LOG_W("Got EvScanError scan state: " << EShardStateToString(state->State)
+ << " status: " << Ydb::StatusIds_StatusCode_Name(status)
+ << ", reason: " << issues.ToString());
- if (FindSchemeErrorInIssues(status, issues)) {
- ResolveShard(state);
- return;
- }
+ YQL_ENSURE(state->Generation == msg.GetGeneration());
- State = NDqProto::COMPUTE_STATE_FAILURE;
- ReportStateAndMaybeDie(YdbStatusToDqStatus(status), issues);
- return;
- }
-
- if (state.Generation > msg.GetGeneration()) {
- // expired message
- return;
- }
-
- YQL_ENSURE(false, "Got EvScanError from the future, expected: " << state.Generation
- << ", got: " << msg.GetGeneration());
- break;
+ if (state->State == EShardState::Starting) {
+ if (FindSchemeErrorInIssues(status, issues)) {
+ return EnqueueResolveShard(state);
}
- case EShardState::PostRunning:
- case EShardState::Running: {
- if (state.Generation == msg.GetGeneration()) {
- CA_LOG_W("Got EvScanError while running scan, status: " << Ydb::StatusIds_StatusCode_Name(status)
- << ", reason: " << issues.ToString() << ", restart");
-
- state.State = EShardState::Initial;
- state.ActorId = {};
- state.ResetRetry();
- StartTableScan();
- return;
- }
-
- if (state.Generation > msg.GetGeneration()) {
- // expired message
- return;
- }
-
- YQL_ENSURE(false, "Got EvScanError from the future, expected: " << state.Generation
- << ", got: " << msg.GetGeneration());
- }
+ State = NDqProto::COMPUTE_STATE_FAILURE;
+ return ReportStateAndMaybeDie(YdbStatusToDqStatus(status), issues);
+ }
- case EShardState::Initial:
- case EShardState::Resolving: {
- // do nothing
- return;
- }
+ if (state->State == EShardState::PostRunning || state->State == EShardState::Running) {
+ state->State = EShardState::Initial;
+ state->ActorId = {};
+ state->ResetRetry();
+ PendingShards.emplace_front(std::move(*state));
+ return StartTableScan();
}
}
void HandleExecute(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
+ YQL_ENSURE(ScanData);
auto& msg = *ev->Get();
- CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered);
- if (Shards.empty()) {
+ auto stateIt = InFlightShards.find(msg.TabletId);
+ if (stateIt == InFlightShards.end()) {
+ CA_LOG_E("Broken pipe with unknown tablet " << msg.TabletId);
return;
}
- Y_VERIFY_DEBUG(ScanData);
- auto& state = Shards.front();
-
- if (state.TabletId != msg.TabletId) {
- CA_LOG_E("Unknown tablet " << msg.TabletId << ", expected " << state.TabletId);
- return;
+ auto* state = &(stateIt->second);
+ CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << EShardStateToString(state->State));
+ if (state->State == EShardState::Starting || state->State == EShardState::Running) {
+ return RetryDeliveryProblem(state);
}
+ }
- switch (state.State) {
- case EShardState::Starting:
- case EShardState::Running: {
- RetryDeliveryProblem(state);
- return;
- }
-
- case EShardState::Initial:
- case EShardState::Resolving:
- case EShardState::PostRunning: {
- CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
- << ", state: " << (int) state.State);
- return;
- }
+ void HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) {
+ ui64 tabletId = ev->Get()->TabletId;
+ auto stateIt = InFlightShards.find(tabletId);
+ if (stateIt == InFlightShards.end()) {
+ CA_LOG_E("Received retry shard for unexpected tablet " << tabletId);
+ return;
}
- }
- void HandleExecute(TEvPrivate::TEvRetryShard::TPtr&) {
- Y_VERIFY_DEBUG(!Shards.empty());
- auto& state = Shards.front();
- SendStartScanRequest(state, state.Generation);
+ auto* state = &(stateIt->second);
+ SendStartScanRequest(state, state->Generation);
}
void HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
- Y_VERIFY_DEBUG(ScanData);
- Y_VERIFY_DEBUG(!Shards.empty());
-
- auto state = std::move(Shards.front());
- Shards.pop_front();
+ YQL_ENSURE(ScanData);
+ YQL_ENSURE(!PendingResolveShards.empty());
+ auto state = std::move(PendingResolveShards.front());
+ PendingResolveShards.pop_front();
+ ResolveNextShard();
CA_LOG_D("Get resolve result, unlink from tablet " << state.TabletId);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId));
@@ -647,13 +554,17 @@ private:
YQL_ENSURE(!newShards.empty());
for (int i = newShards.ysize() - 1; i >= 0; --i) {
- Shards.emplace_front(std::move(newShards[i]));
+ PendingShards.emplace_front(std::move(newShards[i]));
}
if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)) {
TStringBuilder sb;
- sb << "States: ";
- for (auto& st : Shards) {
+ sb << "Pending shards States: ";
+ for (auto& st : PendingShards) {
+ sb << st.ToString(KeyColumnTypes) << "; ";
+ }
+ sb << "In Flight shards States: ";
+ for(auto& [_, st] : InFlightShards) {
sb << st.ToString(KeyColumnTypes) << "; ";
}
CA_LOG_D(sb);
@@ -668,10 +579,14 @@ private:
// handled by TEvPipeCache::TEvDeliveryProblem event
return;
case TEvKqpCompute::TEvScanDataAck::EventType:
- if (Shards.empty()) {
+ ui64 tabletId = ev->Cookie;
+ auto it = InFlightShards.find(tabletId);
+ if (it == InFlightShards.end()) {
+ CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", " << tabletId);
return;
}
- auto& shard = Shards.front();
+
+ auto& shard = it->second;
if (shard.State == EShardState::Running && ev->Sender == shard.ActorId) {
CA_LOG_E("TEvScanDataAck lost while running scan, terminate execution. DataShard actor: "
<< shard.ActorId);
@@ -689,41 +604,39 @@ private:
CA_LOG_N("Disconnected node " << nodeId);
TrackingNodes.erase(nodeId);
-
- auto& state = Shards.front();
- if (state.ActorId && state.ActorId.NodeId() == nodeId) {
- InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ for(auto& [tabletId, state] : InFlightShards) {
+ if (state.ActorId && state.ActorId.NodeId() == nodeId) {
+ InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Connection with node " << nodeId << " lost.");
+ }
}
}
private:
void StartTableScan() {
- YQL_ENSURE(!Shards.empty());
-
- auto& state = Shards.front();
+ YQL_ENSURE(!PendingShards.empty());
+ ui64 tabletId = PendingShards.front().TabletId;
+ auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front()));
+ PendingShards.pop_front();
+ auto* state = &(it->second);
- YQL_ENSURE(state.State == EShardState::Initial);
- state.State = EShardState::Starting;
- state.Generation = ++LastGeneration;
- state.ActorId = {};
-
- CA_LOG_D("StartTableScan: '" << ScanData->TablePath << "', shardId: " << state.TabletId << ", gen: " << state.Generation
- << ", ranges: " << DebugPrintRanges(KeyColumnTypes, state.GetScanRanges(KeyColumnTypes, LastKey), *AppData()->TypeRegistry));
-
- SendStartScanRequest(state, state.Generation);
+ YQL_ENSURE(state->State == EShardState::Initial);
+ state->State = EShardState::Starting;
+ state->Generation = AllocateGeneration(state);
+ state->ActorId = {};
+ SendStartScanRequest(state, state->Generation);
}
- void SendScanDataAck(TShardState& state, ui64 freeSpace) {
- CA_LOG_D("Send EvScanDataAck to " << state.ActorId << ", freeSpace: " << freeSpace << ", gen: " << state.Generation);
+ void SendScanDataAck(TShardState* state, ui64 freeSpace) {
+ CA_LOG_D("Send EvScanDataAck to " << state->ActorId << ", freeSpace: " << freeSpace << ", gen: " << state->Generation);
ui32 flags = IEventHandle::FlagTrackDelivery;
- if (TrackingNodes.insert(state.ActorId.NodeId()).second) {
+ if (TrackingNodes.insert(state->ActorId.NodeId()).second) {
flags |= IEventHandle::FlagSubscribeOnSession;
}
- Send(state.ActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace, state.Generation), flags);
+ Send(state->ActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace, state->Generation), flags, state->TabletId);
}
- void SendStartScanRequest(TShardState& state, ui32 gen) {
- YQL_ENSURE(state.State == EShardState::Starting);
+ void SendStartScanRequest(TShardState* state, ui32 gen) {
+ YQL_ENSURE(state->State == EShardState::Starting);
auto ev = MakeHolder<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId);
@@ -733,7 +646,7 @@ private:
}
ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
- auto ranges = state.GetScanRanges(KeyColumnTypes, LastKey);
+ auto ranges = state->GetScanRanges(KeyColumnTypes, LastKey);
auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(ranges.size());
@@ -769,48 +682,48 @@ private:
ev->Record.SetDataFormat(Meta.GetDataFormat());
- bool subscribed = std::exchange(state.SubscribedOnTablet, true);
+ bool subscribed = std::exchange(state->SubscribedOnTablet, true);
- CA_LOG_D("Send EvKqpScan to shardId: " << state.TabletId << ", tablePath: " << ScanData->TablePath
+ CA_LOG_D("Send EvKqpScan to shardId: " << state->TabletId << ", tablePath: " << ScanData->TablePath
<< ", gen: " << gen << ", subscribe: " << (!subscribed)
<< ", range: " << DebugPrintRanges(KeyColumnTypes, ranges, *AppData()->TypeRegistry));
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state.TabletId, !subscribed),
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, !subscribed),
IEventHandle::FlagTrackDelivery);
}
- void RetryDeliveryProblem(TShardState& state) {
+ void RetryDeliveryProblem(TShardState* state) {
Counters->ScanQueryShardDisconnect->Inc();
- if (state.TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
- CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
- << ", retries limit exceeded (" << state.TotalRetries << ")");
+ if (state->TotalRetries >= MAX_TOTAL_SHARD_RETRIES) {
+ CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state->TabletId
+ << ", retries limit exceeded (" << state->TotalRetries << ")");
return InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder()
- << "Retries limit with shard " << state.TabletId << " exceeded.");
+ << "Retries limit with shard " << state->TabletId << " exceeded.");
}
// note: it might be possible that shard is already removed after successful split/merge operation and cannot be found
// in this case the next TEvKqpScan request will receive the delivery problem response.
// so after several consecutive delivery problem responses retry logic should
// resolve shard details again.
- if (state.RetryAttempt >= MAX_SHARD_RETRIES) {
+ if (state->RetryAttempt >= MAX_SHARD_RETRIES) {
return ResolveShard(state);
}
- state.RetryAttempt++;
- state.TotalRetries++;
- state.Generation = ++LastGeneration;
- state.ActorId = {};
- state.State = EShardState::Starting;
- state.SubscribedOnTablet = false;
- auto retryDelay = state.CalcRetryDelay();
- CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId
+ state->RetryAttempt++;
+ state->TotalRetries++;
+ state->Generation = AllocateGeneration(state);
+ state->ActorId = {};
+ state->State = EShardState::Starting;
+ state->SubscribedOnTablet = false;
+ auto retryDelay = state->CalcRetryDelay();
+ CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state->TabletId
<< ", restarting scan from last received key " << PrintLastKey()
- << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")"
+ << ", attempt #" << state->RetryAttempt << " (total " << state->TotalRetries << ")"
<< " schedule after " << retryDelay);
- state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
- new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state.TabletId)));
+ state->RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay,
+ new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state->TabletId)));
}
bool IsQuotingEnabled() const {
@@ -852,9 +765,25 @@ private:
Send(actorId, abortEv.Release());
}
- void ResolveShard(TShardState& state) {
- // resolve shards
- if (state.ResolveAttempt >= MAX_SHARD_RESOLVES) {
+ void ResolveNextShard() {
+ if (!PendingResolveShards.empty()) {
+ auto& state = PendingResolveShards.front();
+ ResolveShard(&state);
+ }
+ }
+
+ void EnqueueResolveShard(TShardState* state) {
+ auto it = InFlightShards.find(state->TabletId);
+ YQL_ENSURE(it != InFlightShards.end());
+ PendingResolveShards.emplace_back(std::move(it->second));
+ InFlightShards.erase(it);
+ if (PendingResolveShards.size() == 1) {
+ ResolveNextShard();
+ }
+ }
+
+ void ResolveShard(TShardState* state) {
+ if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) {
InternalError(TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder()
<< "Table '" << ScanData->TablePath << "' resolve limit exceeded");
return;
@@ -862,12 +791,12 @@ private:
Counters->ScanQueryShardResolve->Inc();
- state.State = EShardState::Resolving;
- state.ResolveAttempt++;
- state.SubscribedOnTablet = false;
+ state->State = EShardState::Resolving;
+ state->ResolveAttempt++;
+ state->SubscribedOnTablet = false;
- auto range = TTableRange(state.Ranges.front().From.GetCells(), state.Ranges.front().FromInclusive,
- state.Ranges.back().To.GetCells(), state.Ranges.back().ToInclusive);
+ auto range = TTableRange(state->Ranges.front().From.GetCells(), state->Ranges.front().FromInclusive,
+ state->Ranges.back().To.GetCells(), state->Ranges.back().ToInclusive);
TVector<TKeyDesc::TColumnOp> columns;
columns.reserve(ScanData->GetColumns().size());
@@ -884,7 +813,7 @@ private:
CA_LOG_D("Sending TEvResolveKeySet update for table '" << ScanData->TablePath << "'"
<< ", range: " << DebugPrintRange(KeyColumnTypes, range, *AppData()->TypeRegistry)
- << ", attempt #" << state.ResolveAttempt);
+ << ", attempt #" << state->ResolveAttempt);
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
request->ResultSet.emplace_back(std::move(keyDesc));
@@ -907,42 +836,46 @@ private:
}
void PollSources(std::any prev) override {
- if (!prev.has_value() || !ScanData || Shards.empty()) {
+ if (!prev.has_value() || !ScanData) {
return;
}
- auto& state = Shards.front();
- const ui64 freeSpace = CalculateFreeSpace();
- const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
+ for (auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) {
+ auto* state = &(it->second);
+ const ui64 freeSpace = CalculateFreeSpace();
+ const ui64 prevFreeSpace = std::any_cast<ui64>(prev);
- CA_LOG_T("Scan over tablet " << state.TabletId << " finished: " << ScanData->IsFinished()
- << ", prevFreeSpace: " << prevFreeSpace << ", freeSpace: " << freeSpace << ", peer: " << state.ActorId);
+ CA_LOG_T("Scan over tablet " << state->TabletId << " finished: " << ScanData->IsFinished()
+ << ", prevFreeSpace: " << prevFreeSpace << ", freeSpace: " << freeSpace << ", peer: " << state->ActorId);
- if (!ScanData->IsFinished() && state.State != EShardState::PostRunning
- && prevFreeSpace < freeSpace && state.ActorId)
- {
- CA_LOG_T("[poll] Send EvScanDataAck to " << state.ActorId << ", gen: " << state.Generation
- << ", freeSpace: " << freeSpace);
- SendScanDataAck(state, freeSpace);
+ if (!ScanData->IsFinished() && state->State != EShardState::PostRunning
+ && prevFreeSpace < freeSpace && state->ActorId)
+ {
+ CA_LOG_T("[poll] Send EvScanDataAck to " << state->ActorId << ", gen: " << state->Generation
+ << ", freeSpace: " << freeSpace);
+ SendScanDataAck(state, freeSpace);
+ }
}
}
void TerminateSources(const TIssues& issues, bool success) override {
- if (!ScanData || Shards.empty()) {
+ if (!ScanData) {
return;
}
auto prio = success ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN;
- auto& state = Shards.front();
- if (state.ActorId) {
- CA_LOG(prio, "Send abort execution event to scan over tablet: " << state.TabletId << ", table: "
- << ScanData->TablePath << ", scan actor: " << state.ActorId << ", message: " << issues.ToOneLineString());
-
- Send(state.ActorId, new TEvKqp::TEvAbortExecution(
- success ? NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues));
- } else {
- CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet");
+ for(auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) {
+ auto* state = &(it->second);
+ if (state->ActorId) {
+ CA_LOG(prio, "Send abort execution event to scan over tablet: " << state->TabletId << ", table: "
+ << ScanData->TablePath << ", scan actor: " << state->ActorId << ", message: " << issues.ToOneLineString());
+
+ Send(state->ActorId, new TEvKqp::TEvAbortExecution(
+ success ? NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues));
+ } else {
+ CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet");
+ }
}
}
@@ -971,6 +904,54 @@ private:
return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry);
}
+ template<class TMessage>
+ TShardState* GetShardState(const TMessage& msg, const TActorId& scanActorId) {
+ ui32 generation;
+ if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) {
+ generation = msg.GetGeneration();
+ } else if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanInitActor>) {
+ generation = msg.GetGeneration();
+ } else {
+ generation = msg.Generation;
+ }
+
+ auto it = AllocatedGenerations.find(generation);
+ YQL_ENSURE(it != AllocatedGenerations.end(), "Received message from unknown scan or request.");
+ ui64 tabletId = it->second;
+ auto stateIt = InFlightShards.find(tabletId);
+ if (stateIt == InFlightShards.end()) {
+ TString error = TStringBuilder() << "Received message from scan shard which is not currently in flight, tablet" << tabletId;
+ CA_LOG_W(error);
+ if (scanActorId) {
+ TerminateExpiredScan(scanActorId, error);
+ }
+
+ return nullptr;
+ }
+
+ auto& state = stateIt->second;
+ if (state.Generation != generation) {
+ TString error = TStringBuilder() << "Received message from expired scan, generation mistmatch, "
+ << "expected: " << state.Generation << ", received: " << generation;
+ CA_LOG_W(error);
+ if (scanActorId) {
+ TerminateExpiredScan(scanActorId, error);
+ }
+
+ return nullptr;
+ }
+
+ return &state;
+ }
+
+ ui32 AllocateGeneration(TShardState* state) {
+ ui32 nextGeneration = ++LastGeneration;
+ auto[it, success] = AllocatedGenerations.emplace(nextGeneration, state->TabletId);
+ YQL_ENSURE(success, "Found duplicated while allocating next generation id for scan request: "
+ << nextGeneration << ", tablet " << state->TabletId << ", allocated for tablet " << it->second);
+ return nextGeneration;
+ }
+
private:
NMiniKQL::TKqpScanComputeContext ComputeCtx;
NKikimrKqp::TKqpSnapshot Snapshot;
@@ -980,7 +961,12 @@ private:
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
TOwnedCellVec LastKey;
std::deque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData;
- std::deque<TShardState> Shards;
+ std::deque<TShardState> PendingShards;
+ std::deque<TShardState> PendingResolveShards;
+
+ std::map<ui32, ui64> AllocatedGenerations;
+ std::map<ui64, TShardState> InFlightShards;
+
ui32 LastGeneration = 0;
std::set<ui64> AffectedShards;
std::set<ui32> TrackingNodes;