diff options
author | Vitalii Gridnev <[email protected]> | 2022-06-09 21:36:43 +0300 |
---|---|---|
committer | Vitalii Gridnev <[email protected]> | 2022-06-09 21:36:43 +0300 |
commit | 711e8dcfc5597da133348eb7db05d01ee1f0ce2b (patch) | |
tree | ec24d5c8934300c2cec93d12dd6d4c1030fb0200 | |
parent | eb97595367a096742a25b6caeb947d6543f0fc2c (diff) |
[kqp] support multiple shards reading: preparations for multiple shards in flight KIKIMR-15042
ref:b51d5b46bef7a3b26eb7d3dc0a2c2b9de070a195
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 530 |
1 files changed, 258 insertions, 272 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index d8f8441c029..d150bbd65c3 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -119,7 +119,7 @@ public: ScanData->TableReader = CreateKqpTableReader(*ScanData); for (const auto& read : Meta.GetReads()) { - auto& state = Shards.emplace_back(TShardState(read.GetShardId())); + auto& state = PendingShards.emplace_back(TShardState(read.GetShardId())); state.Ranges.reserve(read.GetKeyRanges().size()); for (const auto& range : read.GetKeyRanges()) { auto& sr = state.Ranges.emplace_back(TSerializedTableRange(range)); @@ -236,96 +236,60 @@ private: } void HandleExecute(TEvKqpCompute::TEvScanInitActor::TPtr& ev) { - Y_VERIFY_DEBUG(ScanData); - Y_VERIFY_DEBUG(!Shards.empty()); - + YQL_ENSURE(ScanData); auto& msg = ev->Get()->Record; - - auto& state = Shards.front(); auto scanActorId = ActorIdFromProto(msg.GetScanActorId()); + auto* state = GetShardState(msg, scanActorId); + if (!state) + return; CA_LOG_D("Got EvScanInitActor from " << scanActorId << ", gen: " << msg.GetGeneration() - << ", state: " << EShardStateToString(state.State) << ", stateGen: " << state.Generation); - - switch (state.State) { - case EShardState::Starting: { - if (state.Generation == msg.GetGeneration()) { - state.State = EShardState::Running; - state.ActorId = scanActorId; - - state.ResetRetry(); - - AffectedShards.insert(state.TabletId); - - SendScanDataAck(state, GetMemoryLimits().ScanBufferSize); - return; - } + << ", state: " << EShardStateToString(state->State) << ", stateGen: " << state->Generation); - if (state.Generation > msg.GetGeneration()) { - TerminateExpiredScan(scanActorId, "Got expired EvScanInitActor, terminate it"); - return; - } + YQL_ENSURE(state->Generation == msg.GetGeneration()); - YQL_ENSURE(false, "Got EvScanInitActor from the future, gen: " << msg.GetGeneration() - << ", expected: " << state.Generation); - } - - case EShardState::Initial: - case EShardState::Running: - case EShardState::PostRunning: - case EShardState::Resolving: { - TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it"); - return; - } + if (state->State == EShardState::Starting) { + state->State = EShardState::Running; + state->ActorId = scanActorId; + state->ResetRetry(); + AffectedShards.insert(state->TabletId); + SendScanDataAck(state, GetMemoryLimits().ScanBufferSize); + } else { + TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it"); } } void HandleExecute(TEvKqpCompute::TEvScanData::TPtr& ev) { - Y_VERIFY_DEBUG(ScanData); - Y_VERIFY_DEBUG(!Shards.empty()); - + YQL_ENSURE(ScanData); auto& msg = *ev->Get(); + auto* state = GetShardState(msg, ev->Sender); + if (!state) + return; - auto& state = Shards.front(); - - switch (state.State) { - case EShardState::Running: { - if (state.Generation == msg.Generation) { - YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender); - - TInstant startTime = TActivationContext::Now(); - if (ev->Get()->Finished) { - state.State = EShardState::PostRunning; - } - PendingScanData.emplace_back(std::make_pair(ev, startTime)); + YQL_ENSURE(state->Generation == msg.Generation); + if (state->State != EShardState::Running) { + return TerminateExpiredScan(ev->Sender, "Cancel expired scan"); + } - if (IsQuotingEnabled()) { - AcquireRateQuota(); - } else { - ProcessScanData(); - } + YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender); + TInstant startTime = TActivationContext::Now(); + if (ev->Get()->Finished) { + state->State = EShardState::PostRunning; + } + PendingScanData.emplace_back(std::make_pair(ev, startTime)); - } else if (state.Generation > msg.Generation) { - TerminateExpiredScan(ev->Sender, "Cancel expired scan"); - } else { - YQL_ENSURE(false, "EvScanData from the future, expected: " << state.Generation << ", got: " << msg.Generation); - } - break; - } - case EShardState::PostRunning: - TerminateExpiredScan(ev->Sender, "Unexpected data after finish"); - break; - case EShardState::Initial: - case EShardState::Starting: - case EShardState::Resolving: - TerminateExpiredScan(ev->Sender, "Cancel unexpected scan"); - break; + if (IsQuotingEnabled()) { + AcquireRateQuota(); + } else { + ProcessScanData(); } } void ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData::TPtr& ev, const TInstant& enqueuedAt) { - auto& state = Shards.front(); auto& msg = *ev->Get(); + auto* state = GetShardState(msg, ev->Sender); + if (!state) + return; TDuration latency; if (enqueuedAt != TInstant::Zero()) { @@ -333,7 +297,7 @@ private: Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds()); } - YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender); + YQL_ENSURE(state->ActorId == ev->Sender, "expected: " << state->ActorId << ", got: " << ev->Sender); LastKey = std::move(msg.LastKey); ui64 bytes = 0; @@ -344,14 +308,14 @@ private: case NKikimrTxDataShard::EScanDataFormat::CELLVEC: case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: { if (!msg.Rows.empty()) { - bytes = ScanData->AddRows(msg.Rows, state.TabletId, TaskRunner->GetHolderFactory()); + bytes = ScanData->AddRows(msg.Rows, state->TabletId, TaskRunner->GetHolderFactory()); rowsCount = msg.Rows.size(); } break; } case NKikimrTxDataShard::EScanDataFormat::ARROW: { if (msg.ArrowBatch != nullptr) { - bytes = ScanData->AddRows(*msg.ArrowBatch, state.TabletId, TaskRunner->GetHolderFactory()); + bytes = ScanData->AddRows(*msg.ArrowBatch, state->TabletId, TaskRunner->GetHolderFactory()); rowsCount = msg.ArrowBatch->num_rows(); } break; @@ -359,12 +323,12 @@ private: } } - CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished - << ", from: " << ev->Sender << ", shards remain: " << Shards.size() + << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size() + << ", in flight shards " << InFlightShards.size() << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter"); - if (rowsCount == 0 && !msg.Finished && state.State != EShardState::PostRunning) { + if (rowsCount == 0 && !msg.Finished && state->State != EShardState::PostRunning) { ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes() ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() : 0ul; @@ -372,12 +336,11 @@ private: } if (msg.Finished) { - CA_LOG_D("Tablet " << state.TabletId << " scan finished, unlink"); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId)); - - Shards.pop_front(); + CA_LOG_D("Tablet " << state->TabletId << " scan finished, unlink"); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId)); - if (!Shards.empty()) { + InFlightShards.erase(state->TabletId); + if (!PendingShards.empty()) { CA_LOG_D("Starting next scan"); StartTableScan(); } else { @@ -402,153 +365,97 @@ private: } void ProcessScanData() { - Y_VERIFY_DEBUG(ScanData); - Y_VERIFY_DEBUG(!Shards.empty()); - Y_VERIFY(!PendingScanData.empty()); + YQL_ENSURE(ScanData); + YQL_ENSURE(!PendingScanData.empty()); auto ev = std::move(PendingScanData.front().first); - TInstant enqueuedAt = std::move(PendingScanData.front().second); + auto enqueuedAt = std::move(PendingScanData.front().second); PendingScanData.pop_front(); auto& msg = *ev->Get(); - auto& state = Shards.front(); - - switch (state.State) { - case EShardState::Running: - case EShardState::PostRunning: { - if (state.Generation == msg.Generation) { - ProcessPendingScanDataItem(ev, enqueuedAt); - DoExecute(); - } else if (state.Generation > msg.Generation) { - TerminateExpiredScan(ev->Sender, "Cancel expired scan"); - } else { - YQL_ENSURE(false, "EvScanData from the future, expected: " << state.Generation << ", got: " << msg.Generation); - } - break; - } + auto* state = GetShardState(msg, ev->Sender); + if (!state) + return; - case EShardState::Initial: - case EShardState::Starting: - case EShardState::Resolving: { - TerminateExpiredScan(ev->Sender, "Cancel unexpected scan"); - break; - } + if (state->State == EShardState::Running || state->State == EShardState::PostRunning) { + ProcessPendingScanDataItem(ev, enqueuedAt); + DoExecute(); + } else { + TerminateExpiredScan(ev->Sender, "Cancel expired scan"); } } void HandleExecute(TEvKqpCompute::TEvScanError::TPtr& ev) { - Y_VERIFY_DEBUG(ScanData); - Y_VERIFY_DEBUG(!Shards.empty()); - + YQL_ENSURE(ScanData); auto& msg = ev->Get()->Record; Ydb::StatusIds::StatusCode status = msg.GetStatus(); TIssues issues; IssuesFromMessage(msg.GetIssues(), issues); - auto& state = Shards.front(); + auto* state = GetShardState(msg, TActorId()); + if (!state) + return; - switch (state.State) { - case EShardState::Starting: { - if (state.Generation == msg.GetGeneration()) { - CA_LOG_W("Got EvScanError while starting scan, status: " << Ydb::StatusIds_StatusCode_Name(status) - << ", reason: " << issues.ToString()); + CA_LOG_W("Got EvScanError scan state: " << EShardStateToString(state->State) + << " status: " << Ydb::StatusIds_StatusCode_Name(status) + << ", reason: " << issues.ToString()); - if (FindSchemeErrorInIssues(status, issues)) { - ResolveShard(state); - return; - } + YQL_ENSURE(state->Generation == msg.GetGeneration()); - State = NDqProto::COMPUTE_STATE_FAILURE; - ReportStateAndMaybeDie(YdbStatusToDqStatus(status), issues); - return; - } - - if (state.Generation > msg.GetGeneration()) { - // expired message - return; - } - - YQL_ENSURE(false, "Got EvScanError from the future, expected: " << state.Generation - << ", got: " << msg.GetGeneration()); - break; + if (state->State == EShardState::Starting) { + if (FindSchemeErrorInIssues(status, issues)) { + return EnqueueResolveShard(state); } - case EShardState::PostRunning: - case EShardState::Running: { - if (state.Generation == msg.GetGeneration()) { - CA_LOG_W("Got EvScanError while running scan, status: " << Ydb::StatusIds_StatusCode_Name(status) - << ", reason: " << issues.ToString() << ", restart"); - - state.State = EShardState::Initial; - state.ActorId = {}; - state.ResetRetry(); - StartTableScan(); - return; - } - - if (state.Generation > msg.GetGeneration()) { - // expired message - return; - } - - YQL_ENSURE(false, "Got EvScanError from the future, expected: " << state.Generation - << ", got: " << msg.GetGeneration()); - } + State = NDqProto::COMPUTE_STATE_FAILURE; + return ReportStateAndMaybeDie(YdbStatusToDqStatus(status), issues); + } - case EShardState::Initial: - case EShardState::Resolving: { - // do nothing - return; - } + if (state->State == EShardState::PostRunning || state->State == EShardState::Running) { + state->State = EShardState::Initial; + state->ActorId = {}; + state->ResetRetry(); + PendingShards.emplace_front(std::move(*state)); + return StartTableScan(); } } void HandleExecute(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) { + YQL_ENSURE(ScanData); auto& msg = *ev->Get(); - CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered); - if (Shards.empty()) { + auto stateIt = InFlightShards.find(msg.TabletId); + if (stateIt == InFlightShards.end()) { + CA_LOG_E("Broken pipe with unknown tablet " << msg.TabletId); return; } - Y_VERIFY_DEBUG(ScanData); - auto& state = Shards.front(); - - if (state.TabletId != msg.TabletId) { - CA_LOG_E("Unknown tablet " << msg.TabletId << ", expected " << state.TabletId); - return; + auto* state = &(stateIt->second); + CA_LOG_W("Got EvDeliveryProblem, TabletId: " << msg.TabletId << ", NotDelivered: " << msg.NotDelivered << ", " << EShardStateToString(state->State)); + if (state->State == EShardState::Starting || state->State == EShardState::Running) { + return RetryDeliveryProblem(state); } + } - switch (state.State) { - case EShardState::Starting: - case EShardState::Running: { - RetryDeliveryProblem(state); - return; - } - - case EShardState::Initial: - case EShardState::Resolving: - case EShardState::PostRunning: { - CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId - << ", state: " << (int) state.State); - return; - } + void HandleExecute(TEvPrivate::TEvRetryShard::TPtr& ev) { + ui64 tabletId = ev->Get()->TabletId; + auto stateIt = InFlightShards.find(tabletId); + if (stateIt == InFlightShards.end()) { + CA_LOG_E("Received retry shard for unexpected tablet " << tabletId); + return; } - } - void HandleExecute(TEvPrivate::TEvRetryShard::TPtr&) { - Y_VERIFY_DEBUG(!Shards.empty()); - auto& state = Shards.front(); - SendStartScanRequest(state, state.Generation); + auto* state = &(stateIt->second); + SendStartScanRequest(state, state->Generation); } void HandleExecute(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) { - Y_VERIFY_DEBUG(ScanData); - Y_VERIFY_DEBUG(!Shards.empty()); - - auto state = std::move(Shards.front()); - Shards.pop_front(); + YQL_ENSURE(ScanData); + YQL_ENSURE(!PendingResolveShards.empty()); + auto state = std::move(PendingResolveShards.front()); + PendingResolveShards.pop_front(); + ResolveNextShard(); CA_LOG_D("Get resolve result, unlink from tablet " << state.TabletId); Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId)); @@ -647,13 +554,17 @@ private: YQL_ENSURE(!newShards.empty()); for (int i = newShards.ysize() - 1; i >= 0; --i) { - Shards.emplace_front(std::move(newShards[i])); + PendingShards.emplace_front(std::move(newShards[i])); } if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)) { TStringBuilder sb; - sb << "States: "; - for (auto& st : Shards) { + sb << "Pending shards States: "; + for (auto& st : PendingShards) { + sb << st.ToString(KeyColumnTypes) << "; "; + } + sb << "In Flight shards States: "; + for(auto& [_, st] : InFlightShards) { sb << st.ToString(KeyColumnTypes) << "; "; } CA_LOG_D(sb); @@ -668,10 +579,14 @@ private: // handled by TEvPipeCache::TEvDeliveryProblem event return; case TEvKqpCompute::TEvScanDataAck::EventType: - if (Shards.empty()) { + ui64 tabletId = ev->Cookie; + auto it = InFlightShards.find(tabletId); + if (it == InFlightShards.end()) { + CA_LOG_D("Skip lost TEvScanDataAck to " << ev->Sender << ", " << tabletId); return; } - auto& shard = Shards.front(); + + auto& shard = it->second; if (shard.State == EShardState::Running && ev->Sender == shard.ActorId) { CA_LOG_E("TEvScanDataAck lost while running scan, terminate execution. DataShard actor: " << shard.ActorId); @@ -689,41 +604,39 @@ private: CA_LOG_N("Disconnected node " << nodeId); TrackingNodes.erase(nodeId); - - auto& state = Shards.front(); - if (state.ActorId && state.ActorId.NodeId() == nodeId) { - InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Connection with node " << nodeId << " lost."); + for(auto& [tabletId, state] : InFlightShards) { + if (state.ActorId && state.ActorId.NodeId() == nodeId) { + InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() << "Connection with node " << nodeId << " lost."); + } } } private: void StartTableScan() { - YQL_ENSURE(!Shards.empty()); - - auto& state = Shards.front(); + YQL_ENSURE(!PendingShards.empty()); + ui64 tabletId = PendingShards.front().TabletId; + auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front())); + PendingShards.pop_front(); + auto* state = &(it->second); - YQL_ENSURE(state.State == EShardState::Initial); - state.State = EShardState::Starting; - state.Generation = ++LastGeneration; - state.ActorId = {}; - - CA_LOG_D("StartTableScan: '" << ScanData->TablePath << "', shardId: " << state.TabletId << ", gen: " << state.Generation - << ", ranges: " << DebugPrintRanges(KeyColumnTypes, state.GetScanRanges(KeyColumnTypes, LastKey), *AppData()->TypeRegistry)); - - SendStartScanRequest(state, state.Generation); + YQL_ENSURE(state->State == EShardState::Initial); + state->State = EShardState::Starting; + state->Generation = AllocateGeneration(state); + state->ActorId = {}; + SendStartScanRequest(state, state->Generation); } - void SendScanDataAck(TShardState& state, ui64 freeSpace) { - CA_LOG_D("Send EvScanDataAck to " << state.ActorId << ", freeSpace: " << freeSpace << ", gen: " << state.Generation); + void SendScanDataAck(TShardState* state, ui64 freeSpace) { + CA_LOG_D("Send EvScanDataAck to " << state->ActorId << ", freeSpace: " << freeSpace << ", gen: " << state->Generation); ui32 flags = IEventHandle::FlagTrackDelivery; - if (TrackingNodes.insert(state.ActorId.NodeId()).second) { + if (TrackingNodes.insert(state->ActorId.NodeId()).second) { flags |= IEventHandle::FlagSubscribeOnSession; } - Send(state.ActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace, state.Generation), flags); + Send(state->ActorId, new TEvKqpCompute::TEvScanDataAck(freeSpace, state->Generation), flags, state->TabletId); } - void SendStartScanRequest(TShardState& state, ui32 gen) { - YQL_ENSURE(state.State == EShardState::Starting); + void SendStartScanRequest(TShardState* state, ui32 gen) { + YQL_ENSURE(state->State == EShardState::Starting); auto ev = MakeHolder<TEvDataShard::TEvKqpScan>(); ev->Record.SetLocalPathId(ScanData->TableId.PathId.LocalPathId); @@ -733,7 +646,7 @@ private: } ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys()); - auto ranges = state.GetScanRanges(KeyColumnTypes, LastKey); + auto ranges = state->GetScanRanges(KeyColumnTypes, LastKey); auto protoRanges = ev->Record.MutableRanges(); protoRanges->Reserve(ranges.size()); @@ -769,48 +682,48 @@ private: ev->Record.SetDataFormat(Meta.GetDataFormat()); - bool subscribed = std::exchange(state.SubscribedOnTablet, true); + bool subscribed = std::exchange(state->SubscribedOnTablet, true); - CA_LOG_D("Send EvKqpScan to shardId: " << state.TabletId << ", tablePath: " << ScanData->TablePath + CA_LOG_D("Send EvKqpScan to shardId: " << state->TabletId << ", tablePath: " << ScanData->TablePath << ", gen: " << gen << ", subscribe: " << (!subscribed) << ", range: " << DebugPrintRanges(KeyColumnTypes, ranges, *AppData()->TypeRegistry)); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state.TabletId, !subscribed), + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, !subscribed), IEventHandle::FlagTrackDelivery); } - void RetryDeliveryProblem(TShardState& state) { + void RetryDeliveryProblem(TShardState* state) { Counters->ScanQueryShardDisconnect->Inc(); - if (state.TotalRetries >= MAX_TOTAL_SHARD_RETRIES) { - CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId - << ", retries limit exceeded (" << state.TotalRetries << ")"); + if (state->TotalRetries >= MAX_TOTAL_SHARD_RETRIES) { + CA_LOG_E("TKqpScanComputeActor: broken pipe with tablet " << state->TabletId + << ", retries limit exceeded (" << state->TotalRetries << ")"); return InternalError(TIssuesIds::DEFAULT_ERROR, TStringBuilder() - << "Retries limit with shard " << state.TabletId << " exceeded."); + << "Retries limit with shard " << state->TabletId << " exceeded."); } // note: it might be possible that shard is already removed after successful split/merge operation and cannot be found // in this case the next TEvKqpScan request will receive the delivery problem response. // so after several consecutive delivery problem responses retry logic should // resolve shard details again. - if (state.RetryAttempt >= MAX_SHARD_RETRIES) { + if (state->RetryAttempt >= MAX_SHARD_RETRIES) { return ResolveShard(state); } - state.RetryAttempt++; - state.TotalRetries++; - state.Generation = ++LastGeneration; - state.ActorId = {}; - state.State = EShardState::Starting; - state.SubscribedOnTablet = false; - auto retryDelay = state.CalcRetryDelay(); - CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state.TabletId + state->RetryAttempt++; + state->TotalRetries++; + state->Generation = AllocateGeneration(state); + state->ActorId = {}; + state->State = EShardState::Starting; + state->SubscribedOnTablet = false; + auto retryDelay = state->CalcRetryDelay(); + CA_LOG_W("TKqpScanComputeActor: broken pipe with tablet " << state->TabletId << ", restarting scan from last received key " << PrintLastKey() - << ", attempt #" << state.RetryAttempt << " (total " << state.TotalRetries << ")" + << ", attempt #" << state->RetryAttempt << " (total " << state->TotalRetries << ")" << " schedule after " << retryDelay); - state.RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay, - new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state.TabletId))); + state->RetryTimer = CreateLongTimer(TlsActivationContext->AsActorContext(), retryDelay, + new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvRetryShard(state->TabletId))); } bool IsQuotingEnabled() const { @@ -852,9 +765,25 @@ private: Send(actorId, abortEv.Release()); } - void ResolveShard(TShardState& state) { - // resolve shards - if (state.ResolveAttempt >= MAX_SHARD_RESOLVES) { + void ResolveNextShard() { + if (!PendingResolveShards.empty()) { + auto& state = PendingResolveShards.front(); + ResolveShard(&state); + } + } + + void EnqueueResolveShard(TShardState* state) { + auto it = InFlightShards.find(state->TabletId); + YQL_ENSURE(it != InFlightShards.end()); + PendingResolveShards.emplace_back(std::move(it->second)); + InFlightShards.erase(it); + if (PendingResolveShards.size() == 1) { + ResolveNextShard(); + } + } + + void ResolveShard(TShardState* state) { + if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) { InternalError(TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder() << "Table '" << ScanData->TablePath << "' resolve limit exceeded"); return; @@ -862,12 +791,12 @@ private: Counters->ScanQueryShardResolve->Inc(); - state.State = EShardState::Resolving; - state.ResolveAttempt++; - state.SubscribedOnTablet = false; + state->State = EShardState::Resolving; + state->ResolveAttempt++; + state->SubscribedOnTablet = false; - auto range = TTableRange(state.Ranges.front().From.GetCells(), state.Ranges.front().FromInclusive, - state.Ranges.back().To.GetCells(), state.Ranges.back().ToInclusive); + auto range = TTableRange(state->Ranges.front().From.GetCells(), state->Ranges.front().FromInclusive, + state->Ranges.back().To.GetCells(), state->Ranges.back().ToInclusive); TVector<TKeyDesc::TColumnOp> columns; columns.reserve(ScanData->GetColumns().size()); @@ -884,7 +813,7 @@ private: CA_LOG_D("Sending TEvResolveKeySet update for table '" << ScanData->TablePath << "'" << ", range: " << DebugPrintRange(KeyColumnTypes, range, *AppData()->TypeRegistry) - << ", attempt #" << state.ResolveAttempt); + << ", attempt #" << state->ResolveAttempt); auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); request->ResultSet.emplace_back(std::move(keyDesc)); @@ -907,42 +836,46 @@ private: } void PollSources(std::any prev) override { - if (!prev.has_value() || !ScanData || Shards.empty()) { + if (!prev.has_value() || !ScanData) { return; } - auto& state = Shards.front(); - const ui64 freeSpace = CalculateFreeSpace(); - const ui64 prevFreeSpace = std::any_cast<ui64>(prev); + for (auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) { + auto* state = &(it->second); + const ui64 freeSpace = CalculateFreeSpace(); + const ui64 prevFreeSpace = std::any_cast<ui64>(prev); - CA_LOG_T("Scan over tablet " << state.TabletId << " finished: " << ScanData->IsFinished() - << ", prevFreeSpace: " << prevFreeSpace << ", freeSpace: " << freeSpace << ", peer: " << state.ActorId); + CA_LOG_T("Scan over tablet " << state->TabletId << " finished: " << ScanData->IsFinished() + << ", prevFreeSpace: " << prevFreeSpace << ", freeSpace: " << freeSpace << ", peer: " << state->ActorId); - if (!ScanData->IsFinished() && state.State != EShardState::PostRunning - && prevFreeSpace < freeSpace && state.ActorId) - { - CA_LOG_T("[poll] Send EvScanDataAck to " << state.ActorId << ", gen: " << state.Generation - << ", freeSpace: " << freeSpace); - SendScanDataAck(state, freeSpace); + if (!ScanData->IsFinished() && state->State != EShardState::PostRunning + && prevFreeSpace < freeSpace && state->ActorId) + { + CA_LOG_T("[poll] Send EvScanDataAck to " << state->ActorId << ", gen: " << state->Generation + << ", freeSpace: " << freeSpace); + SendScanDataAck(state, freeSpace); + } } } void TerminateSources(const TIssues& issues, bool success) override { - if (!ScanData || Shards.empty()) { + if (!ScanData) { return; } auto prio = success ? NActors::NLog::PRI_DEBUG : NActors::NLog::PRI_WARN; - auto& state = Shards.front(); - if (state.ActorId) { - CA_LOG(prio, "Send abort execution event to scan over tablet: " << state.TabletId << ", table: " - << ScanData->TablePath << ", scan actor: " << state.ActorId << ", message: " << issues.ToOneLineString()); - - Send(state.ActorId, new TEvKqp::TEvAbortExecution( - success ? NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues)); - } else { - CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet"); + for(auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) { + auto* state = &(it->second); + if (state->ActorId) { + CA_LOG(prio, "Send abort execution event to scan over tablet: " << state->TabletId << ", table: " + << ScanData->TablePath << ", scan actor: " << state->ActorId << ", message: " << issues.ToOneLineString()); + + Send(state->ActorId, new TEvKqp::TEvAbortExecution( + success ? NYql::NDqProto::StatusIds::SUCCESS : NYql::NDqProto::StatusIds::ABORTED, issues)); + } else { + CA_LOG(prio, "Table: " << ScanData->TablePath << ", scan has not been started yet"); + } } } @@ -971,6 +904,54 @@ private: return DebugPrintPoint(KeyColumnTypes, LastKey, *AppData()->TypeRegistry); } + template<class TMessage> + TShardState* GetShardState(const TMessage& msg, const TActorId& scanActorId) { + ui32 generation; + if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanError>) { + generation = msg.GetGeneration(); + } else if constexpr(std::is_same_v<TMessage, NKikimrKqp::TEvScanInitActor>) { + generation = msg.GetGeneration(); + } else { + generation = msg.Generation; + } + + auto it = AllocatedGenerations.find(generation); + YQL_ENSURE(it != AllocatedGenerations.end(), "Received message from unknown scan or request."); + ui64 tabletId = it->second; + auto stateIt = InFlightShards.find(tabletId); + if (stateIt == InFlightShards.end()) { + TString error = TStringBuilder() << "Received message from scan shard which is not currently in flight, tablet" << tabletId; + CA_LOG_W(error); + if (scanActorId) { + TerminateExpiredScan(scanActorId, error); + } + + return nullptr; + } + + auto& state = stateIt->second; + if (state.Generation != generation) { + TString error = TStringBuilder() << "Received message from expired scan, generation mistmatch, " + << "expected: " << state.Generation << ", received: " << generation; + CA_LOG_W(error); + if (scanActorId) { + TerminateExpiredScan(scanActorId, error); + } + + return nullptr; + } + + return &state; + } + + ui32 AllocateGeneration(TShardState* state) { + ui32 nextGeneration = ++LastGeneration; + auto[it, success] = AllocatedGenerations.emplace(nextGeneration, state->TabletId); + YQL_ENSURE(success, "Found duplicated while allocating next generation id for scan request: " + << nextGeneration << ", tablet " << state->TabletId << ", allocated for tablet " << it->second); + return nextGeneration; + } + private: NMiniKQL::TKqpScanComputeContext ComputeCtx; NKikimrKqp::TKqpSnapshot Snapshot; @@ -980,7 +961,12 @@ private: NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr; TOwnedCellVec LastKey; std::deque<std::pair<TEvKqpCompute::TEvScanData::TPtr, TInstant>> PendingScanData; - std::deque<TShardState> Shards; + std::deque<TShardState> PendingShards; + std::deque<TShardState> PendingResolveShards; + + std::map<ui32, ui64> AllocatedGenerations; + std::map<ui64, TShardState> InFlightShards; + ui32 LastGeneration = 0; std::set<ui64> AffectedShards; std::set<ui32> TrackingNodes; |