diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-15 16:25:16 +0300 |
---|---|---|
committer | Vitalii Gridnev <gridnevvvit@gmail.com> | 2022-06-15 16:25:16 +0300 |
commit | 6af27fb8b1cfd7374ed5be2d56ff87e51feb6e15 (patch) | |
tree | 7764b6caf96657fa20a89bfdef1c9203cf9f3091 | |
parent | 11f4e2134fcb18d3e6ae0933ff320191aac8ad6d (diff) | |
download | ydb-6af27fb8b1cfd7374ed5be2d56ff87e51feb6e15.tar.gz |
[kqp] support reading from multiple shards
ref:4d39c1447f4742fe6dcd03a2c71ec6676b8aa90d
-rw-r--r-- | ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp | 132 | ||||
-rw-r--r-- | ydb/core/kqp/executer/kqp_scan_executer.cpp | 7 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp | 4 |
3 files changed, 104 insertions, 39 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index d150bbd65c..e080777290 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -40,6 +40,44 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3; static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50); +struct TScannedDataStats { + std::map<ui64, std::pair<ui64, ui64>> ReadShardInfo; + ui64 CompletedShards = 0; + ui64 TotalReadRows = 0; + ui64 TotalReadBytes = 0; + + TScannedDataStats() + {} + + void AddReadStat(ui64 tabletId, ui64 rows, ui64 bytes) { + auto [it, success] = ReadShardInfo.emplace(tabletId, std::make_pair(rows, bytes)); + if (!success) { + auto& [currentRows, currentBytes] = it->second; + currentRows += rows; + currentBytes += bytes; + } + } + + void CompleteShard(ui64 tabletId) { + auto it = ReadShardInfo.find(tabletId); + YQL_ENSURE(it != ReadShardInfo.end()); + auto& [currentRows, currentBytes] = it->second; + TotalReadRows += currentRows; + TotalReadBytes += currentBytes; + ++CompletedShards; + ReadShardInfo.erase(it); + } + + ui64 AverageReadBytes() const { + return (CompletedShards == 0) ? 0 : TotalReadBytes / CompletedShards; + } + + ui64 AverageReadRows() const { + return (CompletedShards == 0) ? 0 : TotalReadRows / CompletedShards; + } +}; + + class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> { using TBase = TDqComputeActorBase<TKqpScanComputeActor>; @@ -253,7 +291,7 @@ private: state->ActorId = scanActorId; state->ResetRetry(); AffectedShards.insert(state->TabletId); - SendScanDataAck(state, GetMemoryLimits().ScanBufferSize); + SendScanDataAck(state); } else { TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it"); } @@ -323,27 +361,23 @@ private: } } + Stats.AddReadStat(state->TabletId, rowsCount, bytes); + CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished << ", from: " << ev->Sender << ", shards remain: " << PendingShards.size() << ", in flight shards " << InFlightShards.size() << ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter"); if (rowsCount == 0 && !msg.Finished && state->State != EShardState::PostRunning) { - ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes() - ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes() - : 0ul; - SendScanDataAck(state, freeSpace); + SendScanDataAck(state); } if (msg.Finished) { CA_LOG_D("Tablet " << state->TabletId << " scan finished, unlink"); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId)); + Stats.CompleteShard(state->TabletId); + StopReadFromTablet(state); - InFlightShards.erase(state->TabletId); - if (!PendingShards.empty()) { - CA_LOG_D("Starting next scan"); - StartTableScan(); - } else { + if (!StartTableScan()) { CA_LOG_D("Finish scans"); ScanData->Finish(); @@ -416,8 +450,7 @@ private: state->State = EShardState::Initial; state->ActorId = {}; state->ResetRetry(); - PendingShards.emplace_front(std::move(*state)); - return StartTableScan(); + return StartReadShard(state); } } @@ -457,11 +490,9 @@ private: PendingResolveShards.pop_front(); ResolveNextShard(); - CA_LOG_D("Get resolve result, unlink from tablet " << state.TabletId); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId)); + StopReadFromTablet(&state); YQL_ENSURE(state.State == EShardState::Resolving); - CA_LOG_D("Received TEvResolveKeySetResult update for table '" << ScanData->TablePath << "'"); auto* request = ev->Get()->Request.Get(); @@ -557,15 +588,23 @@ private: PendingShards.emplace_front(std::move(newShards[i])); } - if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)) { + if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE) + && PendingShards.size() + InFlightShards.size() > 0) + { TStringBuilder sb; - sb << "Pending shards States: "; - for (auto& st : PendingShards) { - sb << st.ToString(KeyColumnTypes) << "; "; + sb << "Last Key: " << PrintLastKey() << "; "; + if (!PendingShards.empty()) { + sb << "Pending shards States: "; + for (auto& st : PendingShards) { + sb << st.ToString(KeyColumnTypes) << "; "; + } } - sb << "In Flight shards States: "; - for(auto& [_, st] : InFlightShards) { - sb << st.ToString(KeyColumnTypes) << "; "; + + if (!InFlightShards.empty()) { + sb << "In Flight shards States: "; + for(auto& [_, st] : InFlightShards) { + sb << st.ToString(KeyColumnTypes) << "; "; + } } CA_LOG_D(sb); } @@ -612,13 +651,32 @@ private: } private: - void StartTableScan() { - YQL_ENSURE(!PendingShards.empty()); - ui64 tabletId = PendingShards.front().TabletId; - auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front())); - PendingShards.pop_front(); - auto* state = &(it->second); + bool IsSortedOutput() const { + return Meta.HasSorted() ? Meta.GetSorted() : true; + } + + bool StartTableScan() { + // allow reading from multiple shards if data is not sorted + const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : PendingShards.size(); + + while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) { + ui64 tabletId = PendingShards.front().TabletId; + auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front())); + PendingShards.pop_front(); + StartReadShard(&(it->second)); + } + + CA_LOG_D("Scheduled table scans, in flight: " << InFlightShards.size() << " shards. " + << "pending shards to read: " << PendingShards.size() << ", " + << "pending resolve shards: " << PendingResolveShards.size() << ", " + << "average read rows: " << Stats.AverageReadRows() << ", " + << "average read bytes: " << Stats.AverageReadBytes() << ", "); + + return InFlightShards.size() + PendingShards.size() + PendingResolveShards.size() > 0; + } + + void StartReadShard(TShardState* state) { YQL_ENSURE(state->State == EShardState::Initial); state->State = EShardState::Starting; state->Generation = AllocateGeneration(state); @@ -626,7 +684,8 @@ private: SendStartScanRequest(state, state->Generation); } - void SendScanDataAck(TShardState* state, ui64 freeSpace) { + void SendScanDataAck(TShardState* state) { + ui64 freeSpace = CalculateFreeSpace(); CA_LOG_D("Send EvScanDataAck to " << state->ActorId << ", freeSpace: " << freeSpace << ", gen: " << state->Generation); ui32 flags = IEventHandle::FlagTrackDelivery; if (TrackingNodes.insert(state->ActorId.NodeId()).second) { @@ -646,6 +705,7 @@ private: } ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys()); + CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes) << ", LastKey: " << PrintLastKey()); auto ranges = state->GetScanRanges(KeyColumnTypes, LastKey); auto protoRanges = ev->Record.MutableRanges(); protoRanges->Reserve(ranges.size()); @@ -782,6 +842,12 @@ private: } } + void StopReadFromTablet(TShardState* state) { + CA_LOG_D("Unlink from tablet " << state->TabletId << " and stop reading from it."); + Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId)); + InFlightShards.erase(state->TabletId); + } + void ResolveShard(TShardState* state) { if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) { InternalError(TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder() @@ -840,7 +906,6 @@ private: return; } - for (auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) { auto* state = &(it->second); const ui64 freeSpace = CalculateFreeSpace(); @@ -852,9 +917,7 @@ private: if (!ScanData->IsFinished() && state->State != EShardState::PostRunning && prevFreeSpace < freeSpace && state->ActorId) { - CA_LOG_T("[poll] Send EvScanDataAck to " << state->ActorId << ", gen: " << state->Generation - << ", freeSpace: " << freeSpace); - SendScanDataAck(state, freeSpace); + SendScanDataAck(state); } } } @@ -956,6 +1019,7 @@ private: NMiniKQL::TKqpScanComputeContext ComputeCtx; NKikimrKqp::TKqpSnapshot Snapshot; TIntrusivePtr<TKqpCounters> Counters; + TScannedDataStats Stats; NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta; TVector<NScheme::TTypeId> KeyColumnTypes; NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr; diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp index 01d1d911bb..5c7c57d6fe 100644 --- a/ydb/core/kqp/executer/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp @@ -370,7 +370,7 @@ private: } private: - void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, + void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, bool sorted, const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange) { if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) { @@ -389,6 +389,7 @@ private: taskMeta.ReadInfo.ItemsLimit = itemsLimit; taskMeta.ReadInfo.Reverse = reverse; + taskMeta.ReadInfo.Sorted = sorted; if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) { return; @@ -512,9 +513,9 @@ private: if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) { const auto& readRange = op.GetReadOlapRange(); - FillReadInfo(task.Meta, itemsLimit, reverse, readRange); + FillReadInfo(task.Meta, itemsLimit, reverse, sorted, readRange); } else { - FillReadInfo(task.Meta, itemsLimit, reverse, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>()); + FillReadInfo(task.Meta, itemsLimit, reverse, sorted, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>()); } if (!task.Meta.Reads) { diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp index f059e5eb1c..a1e8d5f5ea 100644 --- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp +++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp @@ -173,7 +173,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto &runtime = *server->GetRuntime(); auto sender = runtime.AllocateEdgeActor(); - // EnableLogging(runtime); + EnableLogging(runtime); InitRoot(server, sender); CreateShardedTable(server, sender, "/Root", "table-1", 7); @@ -265,7 +265,7 @@ Y_UNIT_TEST_SUITE(KqpScan) { auto sender = runtime.AllocateEdgeActor(); auto senderSplit = runtime.AllocateEdgeActor(); - // EnableLogging(runtime); + EnableLogging(runtime); SetSplitMergePartCountLimit(&runtime, -1); |