aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitalii Gridnev <gridnevvvit@gmail.com>2022-06-15 16:25:16 +0300
committerVitalii Gridnev <gridnevvvit@gmail.com>2022-06-15 16:25:16 +0300
commit6af27fb8b1cfd7374ed5be2d56ff87e51feb6e15 (patch)
tree7764b6caf96657fa20a89bfdef1c9203cf9f3091
parent11f4e2134fcb18d3e6ae0933ff320191aac8ad6d (diff)
downloadydb-6af27fb8b1cfd7374ed5be2d56ff87e51feb6e15.tar.gz
[kqp] support reading from multiple shards
ref:4d39c1447f4742fe6dcd03a2c71ec6676b8aa90d
-rw-r--r--ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp132
-rw-r--r--ydb/core/kqp/executer/kqp_scan_executer.cpp7
-rw-r--r--ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp4
3 files changed, 104 insertions, 39 deletions
diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
index d150bbd65c..e080777290 100644
--- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
+++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp
@@ -40,6 +40,44 @@ static constexpr ui64 MAX_SHARD_RESOLVES = 3;
static constexpr TDuration RL_MAX_BATCH_DELAY = TDuration::Seconds(50);
+struct TScannedDataStats {
+ std::map<ui64, std::pair<ui64, ui64>> ReadShardInfo;
+ ui64 CompletedShards = 0;
+ ui64 TotalReadRows = 0;
+ ui64 TotalReadBytes = 0;
+
+ TScannedDataStats()
+ {}
+
+ void AddReadStat(ui64 tabletId, ui64 rows, ui64 bytes) {
+ auto [it, success] = ReadShardInfo.emplace(tabletId, std::make_pair(rows, bytes));
+ if (!success) {
+ auto& [currentRows, currentBytes] = it->second;
+ currentRows += rows;
+ currentBytes += bytes;
+ }
+ }
+
+ void CompleteShard(ui64 tabletId) {
+ auto it = ReadShardInfo.find(tabletId);
+ YQL_ENSURE(it != ReadShardInfo.end());
+ auto& [currentRows, currentBytes] = it->second;
+ TotalReadRows += currentRows;
+ TotalReadBytes += currentBytes;
+ ++CompletedShards;
+ ReadShardInfo.erase(it);
+ }
+
+ ui64 AverageReadBytes() const {
+ return (CompletedShards == 0) ? 0 : TotalReadBytes / CompletedShards;
+ }
+
+ ui64 AverageReadRows() const {
+ return (CompletedShards == 0) ? 0 : TotalReadRows / CompletedShards;
+ }
+};
+
+
class TKqpScanComputeActor : public TDqComputeActorBase<TKqpScanComputeActor> {
using TBase = TDqComputeActorBase<TKqpScanComputeActor>;
@@ -253,7 +291,7 @@ private:
state->ActorId = scanActorId;
state->ResetRetry();
AffectedShards.insert(state->TabletId);
- SendScanDataAck(state, GetMemoryLimits().ScanBufferSize);
+ SendScanDataAck(state);
} else {
TerminateExpiredScan(scanActorId, "Got unexpected/expired EvScanInitActor, terminate it");
}
@@ -323,27 +361,23 @@ private:
}
}
+ Stats.AddReadStat(state->TabletId, rowsCount, bytes);
+
CA_LOG_D("Got EvScanData, rows: " << rowsCount << ", bytes: " << bytes << ", finished: " << msg.Finished
<< ", from: " << ev->Sender << ", shards remain: " << PendingShards.size()
<< ", in flight shards " << InFlightShards.size()
<< ", delayed for: " << latency.SecondsFloat() << " seconds by ratelimitter");
if (rowsCount == 0 && !msg.Finished && state->State != EShardState::PostRunning) {
- ui64 freeSpace = GetMemoryLimits().ScanBufferSize > ScanData->GetStoredBytes()
- ? GetMemoryLimits().ScanBufferSize - ScanData->GetStoredBytes()
- : 0ul;
- SendScanDataAck(state, freeSpace);
+ SendScanDataAck(state);
}
if (msg.Finished) {
CA_LOG_D("Tablet " << state->TabletId << " scan finished, unlink");
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId));
+ Stats.CompleteShard(state->TabletId);
+ StopReadFromTablet(state);
- InFlightShards.erase(state->TabletId);
- if (!PendingShards.empty()) {
- CA_LOG_D("Starting next scan");
- StartTableScan();
- } else {
+ if (!StartTableScan()) {
CA_LOG_D("Finish scans");
ScanData->Finish();
@@ -416,8 +450,7 @@ private:
state->State = EShardState::Initial;
state->ActorId = {};
state->ResetRetry();
- PendingShards.emplace_front(std::move(*state));
- return StartTableScan();
+ return StartReadShard(state);
}
}
@@ -457,11 +490,9 @@ private:
PendingResolveShards.pop_front();
ResolveNextShard();
- CA_LOG_D("Get resolve result, unlink from tablet " << state.TabletId);
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state.TabletId));
+ StopReadFromTablet(&state);
YQL_ENSURE(state.State == EShardState::Resolving);
-
CA_LOG_D("Received TEvResolveKeySetResult update for table '" << ScanData->TablePath << "'");
auto* request = ev->Get()->Request.Get();
@@ -557,15 +588,23 @@ private:
PendingShards.emplace_front(std::move(newShards[i]));
}
- if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)) {
+ if (IsDebugLogEnabled(TlsActivationContext->ActorSystem(), NKikimrServices::KQP_COMPUTE)
+ && PendingShards.size() + InFlightShards.size() > 0)
+ {
TStringBuilder sb;
- sb << "Pending shards States: ";
- for (auto& st : PendingShards) {
- sb << st.ToString(KeyColumnTypes) << "; ";
+ sb << "Last Key: " << PrintLastKey() << "; ";
+ if (!PendingShards.empty()) {
+ sb << "Pending shards States: ";
+ for (auto& st : PendingShards) {
+ sb << st.ToString(KeyColumnTypes) << "; ";
+ }
}
- sb << "In Flight shards States: ";
- for(auto& [_, st] : InFlightShards) {
- sb << st.ToString(KeyColumnTypes) << "; ";
+
+ if (!InFlightShards.empty()) {
+ sb << "In Flight shards States: ";
+ for(auto& [_, st] : InFlightShards) {
+ sb << st.ToString(KeyColumnTypes) << "; ";
+ }
}
CA_LOG_D(sb);
}
@@ -612,13 +651,32 @@ private:
}
private:
- void StartTableScan() {
- YQL_ENSURE(!PendingShards.empty());
- ui64 tabletId = PendingShards.front().TabletId;
- auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front()));
- PendingShards.pop_front();
- auto* state = &(it->second);
+ bool IsSortedOutput() const {
+ return Meta.HasSorted() ? Meta.GetSorted() : true;
+ }
+
+ bool StartTableScan() {
+ // allow reading from multiple shards if data is not sorted
+ const ui32 maxAllowedInFlight = IsSortedOutput() ? 1 : PendingShards.size();
+
+ while (!PendingShards.empty() && InFlightShards.size() + PendingResolveShards.size() + 1 <= maxAllowedInFlight) {
+ ui64 tabletId = PendingShards.front().TabletId;
+ auto [it, success] = InFlightShards.emplace(tabletId, std::move(PendingShards.front()));
+ PendingShards.pop_front();
+ StartReadShard(&(it->second));
+ }
+
+ CA_LOG_D("Scheduled table scans, in flight: " << InFlightShards.size() << " shards. "
+ << "pending shards to read: " << PendingShards.size() << ", "
+ << "pending resolve shards: " << PendingResolveShards.size() << ", "
+ << "average read rows: " << Stats.AverageReadRows() << ", "
+ << "average read bytes: " << Stats.AverageReadBytes() << ", ");
+
+ return InFlightShards.size() + PendingShards.size() + PendingResolveShards.size() > 0;
+ }
+
+ void StartReadShard(TShardState* state) {
YQL_ENSURE(state->State == EShardState::Initial);
state->State = EShardState::Starting;
state->Generation = AllocateGeneration(state);
@@ -626,7 +684,8 @@ private:
SendStartScanRequest(state, state->Generation);
}
- void SendScanDataAck(TShardState* state, ui64 freeSpace) {
+ void SendScanDataAck(TShardState* state) {
+ ui64 freeSpace = CalculateFreeSpace();
CA_LOG_D("Send EvScanDataAck to " << state->ActorId << ", freeSpace: " << freeSpace << ", gen: " << state->Generation);
ui32 flags = IEventHandle::FlagTrackDelivery;
if (TrackingNodes.insert(state->ActorId.NodeId()).second) {
@@ -646,6 +705,7 @@ private:
}
ev->Record.MutableSkipNullKeys()->CopyFrom(Meta.GetSkipNullKeys());
+ CA_LOG_D("Start scan request, " << state->ToString(KeyColumnTypes) << ", LastKey: " << PrintLastKey());
auto ranges = state->GetScanRanges(KeyColumnTypes, LastKey);
auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(ranges.size());
@@ -782,6 +842,12 @@ private:
}
}
+ void StopReadFromTablet(TShardState* state) {
+ CA_LOG_D("Unlink from tablet " << state->TabletId << " and stop reading from it.");
+ Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(state->TabletId));
+ InFlightShards.erase(state->TabletId);
+ }
+
void ResolveShard(TShardState* state) {
if (state->ResolveAttempt >= MAX_SHARD_RESOLVES) {
InternalError(TIssuesIds::KIKIMR_SCHEME_ERROR, TStringBuilder()
@@ -840,7 +906,6 @@ private:
return;
}
-
for (auto it = InFlightShards.begin(); it != InFlightShards.end(); ++it) {
auto* state = &(it->second);
const ui64 freeSpace = CalculateFreeSpace();
@@ -852,9 +917,7 @@ private:
if (!ScanData->IsFinished() && state->State != EShardState::PostRunning
&& prevFreeSpace < freeSpace && state->ActorId)
{
- CA_LOG_T("[poll] Send EvScanDataAck to " << state->ActorId << ", gen: " << state->Generation
- << ", freeSpace: " << freeSpace);
- SendScanDataAck(state, freeSpace);
+ SendScanDataAck(state);
}
}
}
@@ -956,6 +1019,7 @@ private:
NMiniKQL::TKqpScanComputeContext ComputeCtx;
NKikimrKqp::TKqpSnapshot Snapshot;
TIntrusivePtr<TKqpCounters> Counters;
+ TScannedDataStats Stats;
NKikimrTxDataShard::TKqpTransaction::TScanTaskMeta Meta;
TVector<NScheme::TTypeId> KeyColumnTypes;
NMiniKQL::TKqpScanComputeContext::TScanData* ScanData = nullptr;
diff --git a/ydb/core/kqp/executer/kqp_scan_executer.cpp b/ydb/core/kqp/executer/kqp_scan_executer.cpp
index 01d1d911bb..5c7c57d6fe 100644
--- a/ydb/core/kqp/executer/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer/kqp_scan_executer.cpp
@@ -370,7 +370,7 @@ private:
}
private:
- void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse,
+ void FillReadInfo(TTaskMeta& taskMeta, ui64 itemsLimit, bool reverse, bool sorted,
const TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>& readOlapRange)
{
if (taskMeta.Reads && !taskMeta.Reads.GetRef().empty()) {
@@ -389,6 +389,7 @@ private:
taskMeta.ReadInfo.ItemsLimit = itemsLimit;
taskMeta.ReadInfo.Reverse = reverse;
+ taskMeta.ReadInfo.Sorted = sorted;
if (!readOlapRange || readOlapRange->GetOlapProgram().empty()) {
return;
@@ -512,9 +513,9 @@ private:
if (op.GetTypeCase() == NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
const auto& readRange = op.GetReadOlapRange();
- FillReadInfo(task.Meta, itemsLimit, reverse, readRange);
+ FillReadInfo(task.Meta, itemsLimit, reverse, sorted, readRange);
} else {
- FillReadInfo(task.Meta, itemsLimit, reverse, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>());
+ FillReadInfo(task.Meta, itemsLimit, reverse, sorted, TMaybe<::NKqpProto::TKqpPhyOpReadOlapRanges>());
}
if (!task.Meta.Reads) {
diff --git a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
index f059e5eb1c..a1e8d5f5ea 100644
--- a/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_kqp_scan.cpp
@@ -173,7 +173,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto &runtime = *server->GetRuntime();
auto sender = runtime.AllocateEdgeActor();
- // EnableLogging(runtime);
+ EnableLogging(runtime);
InitRoot(server, sender);
CreateShardedTable(server, sender, "/Root", "table-1", 7);
@@ -265,7 +265,7 @@ Y_UNIT_TEST_SUITE(KqpScan) {
auto sender = runtime.AllocateEdgeActor();
auto senderSplit = runtime.AllocateEdgeActor();
- // EnableLogging(runtime);
+ EnableLogging(runtime);
SetSplitMergePartCountLimit(&runtime, -1);