aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-06 21:19:04 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-06 21:19:04 +0300
commit8462a6c94f88963f414cef8eb3910dbbc9945b38 (patch)
tree202a1e1db390a9eb8949a00f47bc2677b78b89d8
parent4971fc8fbeb2bb63643b3f1ff5d978f377bdddb2 (diff)
downloadydb-8462a6c94f88963f414cef8eb3910dbbc9945b38.tar.gz
do not implement logic in switch case KIKIMR-15042
ref:834278dbe08becb84406992df64ffe1d4c84160e
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp152
1 files changed, 79 insertions, 73 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index 9249fbd2dd..4c867b6b30 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -279,6 +279,7 @@ private:
Y_VERIFY_DEBUG(!Shards.empty());
auto& msg = *ev->Get();
+
auto& state = Shards.front();
switch (state.State) {
@@ -316,12 +317,12 @@ private:
}
}
- void ProcessScanData() {
- Y_VERIFY_DEBUG(ScanData);
- Y_VERIFY_DEBUG(!Shards.empty());
- Y_VERIFY(!PendingScanData.empty());
+ void ProcessPendingScanDataItem() {
auto& ev = PendingScanData.front().first;
+ auto& state = Shards.front();
+
+ auto& msg = *ev->Get();
TDuration latency;
if (PendingScanData.front().second != TInstant::Zero()) {
@@ -329,82 +330,90 @@ private:
Counters->ScanQueryRateLimitLatency->Collect(latency.MilliSeconds());
}
- auto& msg = *ev->Get();
- auto& state = Shards.front();
-
- switch (state.State) {
- case EShardState::Running:
- case EShardState::PostRunning: {
- if (state.Generation == msg.Generation) {
- YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender);
+ YQL_ENSURE(state.ActorId == ev->Sender, "expected: " << state.ActorId << ", got: " << ev->Sender);
- LastKey = std::move(msg.LastKey);
-
- ui64 bytes = 0;
- ui64 rowsCount = 0;
- {
- auto guard = TaskRunner->BindAllocator();
- switch (msg.GetDataFormat()) {
- case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
- case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: {
- if (!msg.Rows.empty()) {
- bytes = ScanData->AddRows(msg.Rows, state.TabletId, TaskRunner->GetHolderFactory());
- rowsCount = msg.Rows.size();
- }
- break;
- }
- case NKikimrTxDataShard::EScanDataFormat::ARROW: {
- if (msg.ArrowBatch != nullptr) {
- bytes = ScanData->AddRows(*msg.ArrowBatch, state.TabletId, TaskRunner->GetHolderFactory());
- rowsCount = msg.ArrowBatch->num_rows();
- }
- break;
- }
- }
+ LastKey = std::move(msg.LastKey);
+ ui64 bytes = 0;
+ ui64 rowsCount = 0;
+ {
+ auto guard = TaskRunner->BindAllocator();
+ switch (msg.GetDataFormat()) {
+ case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
+ case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED: {
+ if (!msg.Rows.empty()) {
+ bytes = ScanData->AddRows(msg.Rows, state.TabletId, TaskRunner->GetHolderFactory());
+ rowsCount = msg.Rows.size();
}
+ break;
+ }
+ case NKikimrTxDataShard::EScanDataFormat::ARROW: {
+ if (msg.ArrowBatch != nullptr) {
+ bytes = ScanData->AddRows(*msg.ArrowBatch, state.TabletId, TaskRunner->GetHolderFactory());
+ rowsCount = msg.ArrowBatch->num_rows();
+ }
+ break;
+ }
+ }
+ }
- CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished
- << ", from: " << ev->Sender << ", shards remain: " << Shards.size()
- << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter");
- if (rowsCount == 0 && !msg.Finished && state.State != EShardState::PostRunning) {
- ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes()
- ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes()
- : 0ul;
- SendScanDataAck(state, freeSpace);
- }
+ CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished
+ << ", from: " << ev->Sender << ", shards remain: " << Shards.size()
+ << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter");
- if (msg.Finished) {
- CA_LOG_D("Tablet " << state.TabletId << " scan finished, unlink");
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId));
+ if (rowsCount == 0 && !msg.Finished && state.State != EShardState::PostRunning) {
+ ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes()
+ ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes()
+ : 0ul;
+ SendScanDataAck(state, freeSpace);
+ }
- Shards.pop_front();
+ if (msg.Finished) {
+ CA_LOG_D("Tablet " << state.TabletId << " scan finished, unlink");
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId));
- if (!Shards.empty()) {
- CA_LOG_D("Starting next scan");
- StartTableScan();
- } else {
- CA_LOG_D("Finish scans");
- ScanData->Finish();
+ Shards.pop_front();
- if (ScanData->BasicStats) {
- ScanData->BasicStats->AffectedShards = AffectedShards.size();
- }
- }
- }
+ if (!Shards.empty()) {
+ CA_LOG_D("Starting next scan");
+ StartTableScan();
+ } else {
+ CA_LOG_D("Finish scans");
+ ScanData->Finish();
- if (Y_UNLIKELY(ScanData->ProfileStats)) {
- ScanData->ProfileStats->Messages++;
- ScanData->ProfileStats->ScanCpuTime += msg.CpuTime;
- ScanData->ProfileStats->ScanWaitTime += msg.WaitTime;
- if (msg.PageFault) {
- ScanData->ProfileStats->PageFaults += msg.PageFaults;
- ScanData->ProfileStats->MessagesByPageFault++;
- }
- }
+ if (ScanData->BasicStats) {
+ ScanData->BasicStats->AffectedShards = AffectedShards.size();
+ }
+ }
+ }
- DoExecute();
+ if (Y_UNLIKELY(ScanData->ProfileStats)) {
+ ScanData->ProfileStats->Messages++;
+ ScanData->ProfileStats->ScanCpuTime += msg.CpuTime;
+ ScanData->ProfileStats->ScanWaitTime += msg.WaitTime;
+ if (msg.PageFault) {
+ ScanData->ProfileStats->PageFaults += msg.PageFaults;
+ ScanData->ProfileStats->MessagesByPageFault++;
+ }
+ }
+ }
+
+ void ProcessScanData() {
+ Y_VERIFY_DEBUG(ScanData);
+ Y_VERIFY_DEBUG(!Shards.empty());
+ Y_VERIFY(!PendingScanData.empty());
+
+ auto& ev = PendingScanData.front().first;
+ auto& msg = *ev->Get();
+ auto& state = Shards.front();
+
+ switch (state.State) {
+ case EShardState::Running:
+ case EShardState::PostRunning: {
+ if (state.Generation == msg.Generation) {
+ ProcessPendingScanDataItem();
+ DoExecute();
} else if (state.Generation > msg.Generation) {
TerminateExpiredScan(ev->Sender, "Cancel expired scan");
} else {
@@ -900,9 +909,6 @@ private:
<< ", attempt #" << state.ResolveAttempt);
auto request = MakeHolder<NSchemeCache::TSchemeCacheRequest>();
- // Avoid setting DomainOwnerId to reduce possible races with schemeshard migration
- // TODO: request->DatabaseName = ...;
- // TODO: request->UserToken = ...;
request->ResultSet.emplace_back(std::move(keyDesc));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(ScanData->TableId, {}));
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvResolveKeySet(request));
@@ -999,7 +1005,7 @@ private:
std::deque<TShardState> Shards;
ui32 LastGeneration = 0;
std::set<ui64> AffectedShards;
- THashSet<ui32> TrackingNodes;
+ std::set<ui32> TrackingNodes;
};
} // anonymous namespace