diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-06 21:19:04 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-06 21:19:04 +0300 |
commit | 8462a6c94f88963f414cef8eb3910dbbc9945b38 (patch) | |
tree | 202a1e1db390a9eb8949a00f47bc2677b78b89d8 | |
parent | 4971fc8fbeb2bb63643b3f1ff5d978f377bdddb2 (diff) | |
download | ydb-8462a6c94f88963f414cef8eb3910dbbc9945b38.tar.gz |
do not implement logic in switch case KIKIMR-15042
ref:834278dbe08becb84406992df64ffe1d4c84160e
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 152 |
1 files changed, 79 insertions, 73 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 9249fbd2dd..4c867b6b30 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -279,6 +279,7 @@ private: Y_VERIFY_DEBUG(!Shards.empty()); auto& msg = *ev->Get(); + auto& state = Shards.front(); switch (state.State) { @@ -316,12 +317,12 @@ private: } } - void ProcessScanData() { - Y_VERIFY_DEBUG(ScanData); - Y_VERIFY_DEBUG(!Shards.empty()); - Y_VERIFY(!PendingScanData.empty()); + void ProcessPendingScanDataItem() { auto& ev = PendingScanData.front().first; + auto& state = Shards.front(); + + auto& msg = *ev->Get(); TDuration latency; if (PendingScanData.front().second != TInstant::Zero()) { @@ -329,82 +330,90 @@ private: Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds()); } - auto& msg = *ev->Get(); - auto& state = Shards.front(); - - switch (state.State) { - case EShardState::Running: - case EShardState::PostRunning: { - if (state.Generation == msg.Generation) { - YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender); + YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender); - LastKey = std::move(msg.LastKey); - - ui64 bytes = 0; - ui64 rowsCount = 0; - { - auto guard = TaskRunner->BindAllocator(); - switch (msg.GetDataFormat()) { - case NKikimrTxDataShard::EScanDataFormat::CELLVEC: - case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: { - if (!msg.Rows.empty()) { - bytes = ScanData->AddRows(msg.Rows, state.TabletId, TaskRunner->GetHolderFactory()); - rowsCount = msg.Rows.size(); - } - break; - } - case NKikimrTxDataShard::EScanDataFormat::ARROW: { - if (msg.ArrowBatch != nullptr) { - bytes = ScanData->AddRows(*msg.ArrowBatch, state.TabletId, TaskRunner->GetHolderFactory()); - rowsCount = msg.ArrowBatch->num_rows(); - } - break; - } - } + LastKey = std::move(msg.LastKey); + ui64 bytes = 0; + ui64 rowsCount = 0; + { + auto guard = TaskRunner->BindAllocator(); + switch (msg.GetDataFormat()) { + case NKikimrTxDataShard::EScanDataFormat::CELLVEC: + case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: { + if (!msg.Rows.empty()) { + bytes = ScanData->AddRows(msg.Rows, state.TabletId, TaskRunner->GetHolderFactory()); + rowsCount = msg.Rows.size(); } + break; + } + case NKikimrTxDataShard::EScanDataFormat::ARROW: { + if (msg.ArrowBatch != nullptr) { + bytes = ScanData->AddRows(*msg.ArrowBatch, state.TabletId, TaskRunner->GetHolderFactory()); + rowsCount = msg.ArrowBatch->num_rows(); + } + break; + } + } + } - CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished - << ", from: " << ev->Sender << ", shards remain: " << Shards.size() - << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter"); - if (rowsCount == 0 && !msg.Finished && state.State != EShardState::PostRunning) { - ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes() - ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() - : 0ul; - SendScanDataAck(state, freeSpace); - } + CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished + << ", from: " << ev->Sender << ", shards remain: " << Shards.size() + << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter"); - if (msg.Finished) { - CA_LOG_D("Tablet " << state.TabletId << " scan finished, unlink"); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId)); + if (rowsCount == 0 && !msg.Finished && state.State != EShardState::PostRunning) { + ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes() + ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() + : 0ul; + SendScanDataAck(state, freeSpace); + } - Shards.pop_front(); + if (msg.Finished) { + CA_LOG_D("Tablet " << state.TabletId << " scan finished, unlink"); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId)); - if (!Shards.empty()) { - CA_LOG_D("Starting next scan"); - StartTableScan(); - } else { - CA_LOG_D("Finish scans"); - ScanData->Finish(); + Shards.pop_front(); - if (ScanData->BasicStats) { - ScanData->BasicStats->AffectedShards = AffectedShards.size(); - } - } - } + if (!Shards.empty()) { + CA_LOG_D("Starting next scan"); + StartTableScan(); + } else { + CA_LOG_D("Finish scans"); + ScanData->Finish(); - if (Y_UNLIKELY(ScanData->ProfileStats)) { - ScanData->ProfileStats->Messages++; - ScanData->ProfileStats->ScanCpuTime += msg.CpuTime; - ScanData->ProfileStats->ScanWaitTime += msg.WaitTime; - if (msg.PageFault) { - ScanData->ProfileStats->PageFaults += msg.PageFaults; - ScanData->ProfileStats->MessagesByPageFault++; - } - } + if (ScanData->BasicStats) { + ScanData->BasicStats->AffectedShards = AffectedShards.size(); + } + } + } - DoExecute(); + if (Y_UNLIKELY(ScanData->ProfileStats)) { + ScanData->ProfileStats->Messages++; + ScanData->ProfileStats->ScanCpuTime += msg.CpuTime; + ScanData->ProfileStats->ScanWaitTime += msg.WaitTime; + if (msg.PageFault) { + ScanData->ProfileStats->PageFaults += msg.PageFaults; + ScanData->ProfileStats->MessagesByPageFault++; + } + } + } + + void ProcessScanData() { + Y_VERIFY_DEBUG(ScanData); + Y_VERIFY_DEBUG(!Shards.empty()); + Y_VERIFY(!PendingScanData.empty()); + + auto& ev = PendingScanData.front().first; + auto& msg = *ev->Get(); + auto& state = Shards.front(); + + switch (state.State) { + case EShardState::Running: + case EShardState::PostRunning: { + if (state.Generation == msg.Generation) { + ProcessPendingScanDataItem(); + DoExecute(); } else if (state.Generation > msg.Generation) { TerminateExpiredScan(ev->Sender, "Cancel expired scan"); } else { @@ -900,9 +909,6 @@ private: << ", attempt #" << state.ResolveAttempt); auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>(); - // Avoid setting DomainOwnerId to reduce possible races with schemeshard migration - // TODO: request->DatabaseName = ...; - // TODO: request->UserToken = ...; request->ResultSet.emplace_back(std::move(keyDesc)); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanData->TableId, {})); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request)); @@ -999,7 +1005,7 @@ private: std::deque<TShardState> Shards; ui32 LastGeneration = 0; std::set<ui64> AffectedShards; - THashSet<ui32> TrackingNodes; + std::set<ui32> TrackingNodes; }; } // anonymous namespace |