author     ssmike <ssmike@ydb.tech>    2023-03-02 20:13:13 +0300
committer  ssmike <ssmike@ydb.tech>    2023-03-02 20:13:13 +0300
commit     b2e129b230222de5a18d2b988dc1d96b4a83eb73 (patch)
tree       c77717a6cb33c79ddd80a1d289dc7a1d296f50d1
parent     77c34dfac0f80534c62e53ae4648baa35e1a342a (diff)
download   ydb-b2e129b230222de5a18d2b988dc1d96b4a83eb73.tar.gz
support point reads in shard reresolve
-rw-r--r--  ydb/core/kqp/runtime/kqp_read_actor.cpp | 70
-rw-r--r--  ydb/core/kqp/ut/scan/kqp_split_ut.cpp   | 48
2 files changed, 97 insertions, 21 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index d65a6a7421..2dd9222f50 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -572,19 +572,23 @@ public:
         TVector<THolder<TShardState>> newShards;
         newShards.reserve(keyDesc->GetPartitions().size());
 
+        auto bounds = state->GetBounds(Settings.GetReverse());
+        size_t pointIndex = 0;
+
         for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) {
             const auto& partition = keyDesc->GetPartitions()[idx];
 
             TTableRange partitionRange{
-                idx == 0 ? state->Ranges.front().From.GetCells() : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(),
-                idx == 0 ? state->Ranges.front().FromInclusive : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive,
+                idx == 0 ? bounds.From : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(),
+                idx == 0 ? bounds.InclusiveFrom : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive,
                 keyDesc->GetPartitions()[idx].Range->EndKeyPrefix.GetCells(),
                 keyDesc->GetPartitions()[idx].Range->IsInclusive
             };
 
             CA_LOG_D("Processing resolved ShardId# " << partition.ShardId
                 << ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)
-                << ", i: " << i << ", state ranges: " << state->Ranges.size());
+                << ", i: " << i << ", state ranges: " << state->Ranges.size()
+                << ", points: " << state->Points.size());
 
             auto newShard = MakeHolder<TShardState>(partition.ShardId);
 
@@ -592,28 +596,52 @@ public:
                 newShard->AssignContinuationToken(state.Get());
             }
 
-            for (ui64 j = i; j < state->Ranges.size(); ++j) {
-                CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr)
-                    << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
+            if (state->Points.empty()) {
+                Y_ASSERT(!state->Ranges.empty());
 
-                auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange());
+                for (ui64 j = i; j < state->Ranges.size(); ++j) {
+                    CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr)
+                        << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
 
-                if (!intersection.IsEmptyRange(KeyColumnTypes)) {
-                    CA_LOG_D("Add range to new shardId: " << partition.ShardId
-                        << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
+                    auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange());
 
-                    newShard->Ranges.emplace_back(TSerializedTableRange(intersection));
-                } else {
-                    CA_LOG_D("empty intersection");
-                    if (j > i) {
-                        i = j - 1;
+                    if (!intersection.IsEmptyRange(KeyColumnTypes)) {
+                        CA_LOG_D("Add range to new shardId: " << partition.ShardId
+                            << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
+
+                        newShard->Ranges.emplace_back(TSerializedTableRange(intersection));
+                    } else {
+                        CA_LOG_D("empty intersection");
+                        if (j > i) {
+                            i = j - 1;
+                        }
+                        break;
                     }
-                    break;
                 }
-            }
 
-            if (!newShard->Ranges.empty()) {
-                newShards.push_back(std::move(newShard));
+                if (!newShard->Ranges.empty()) {
+                    newShards.push_back(std::move(newShard));
+                }
+            } else {
+                while (pointIndex < state->Points.size()) {
+                    int intersection = ComparePointAndRange(
+                        state->Points[pointIndex].GetCells(),
+                        partitionRange,
+                        KeyColumnTypes,
+                        KeyColumnTypes);
+
+                    if (intersection == 0) {
+                        newShard->Points.push_back(state->Points[pointIndex]);
+                        CA_LOG_D("Add point to new shardId: " << partition.ShardId);
+                    }
+                    if (intersection < 0) {
+                        break;
+                    }
+                    pointIndex += 1;
+                }
+                if (!newShard->Points.empty()) {
+                    newShards.push_back(std::move(newShard));
+                }
             }
         }
 
@@ -838,8 +866,8 @@ public:
             ReceivedRowCount += ev->Get()->GetRowsCount();
             CA_LOG_D(TStringBuilder() << "new data for read #" << id
                 << " seqno = " << ev->Get()->Record.GetSeqNo()
-                << " finished = " << ev->Get()->Record.GetFinished()
-                << " pushed " << DebugPrintCells(ev->Get()));
+                << " finished = " << ev->Get()->Record.GetFinished());
+            CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()));
             Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
             Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
         }
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
index 519e4723c6..84c6b36aeb 100644
--- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -598,6 +598,54 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
         UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
         UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
     }
+
+    Y_UNIT_TEST_SORT(AfterResolvePoints, Order) {
+        TKikimrSettings settings;
+        NKikimrConfig::TAppConfig appConfig;
+        appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+        settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+        TFeatureFlags flags;
+        flags.SetEnablePredicateExtractForScanQueries(true);
+        settings.SetFeatureFlags(flags);
+        settings.SetAppConfig(appConfig);
+
+        TKikimrRunner kikimr(settings);
+
+        auto db = kikimr.GetTableClient();
+
+        auto& server = kikimr.GetTestServer();
+        auto* runtime = server.GetRuntime();
+        Y_UNUSED(runtime);
+        auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+
+        auto sender = runtime->AllocateEdgeActor();
+        auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+
+        TVector<ui64> collectedKeys;
+        CollectKeysTo(&collectedKeys, runtime, sender);
+
+        auto* shim = new TReadActorPipeCacheStub();
+        InterceptReadActorPipeCache(runtime->Register(shim));
+        shim->SetupCapture(0, 5);
+        SendScanQuery(runtime, kqpProxy, sender,
+            "PRAGMA Kikimr.OptEnablePredicateExtract=\"false\"; SELECT Key FROM `/Root/KeyValueLargePartition` where Key in (103, 302, 402, 502, 703)" + OrderBy(Order));
+
+        shim->ReadsReceived.WaitI();
+        Cerr << "starting split -----------------------------------------------------------" << Endl;
+        SetSplitMergePartCountLimit(runtime, -1);
+        {
+            auto senderSplit = runtime->AllocateEdgeActor();
+            ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
+            WaitTxNotification(&server, senderSplit, txId);
+        }
+        Cerr << "resume evread -----------------------------------------------------------" << Endl;
+        shim->SkipAll();
+        shim->SendCaptured(runtime);
+
+        auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+        UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+        UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ",103,302,402,502,703");
+    }
 }
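Note on the change above: the new else branch covers the case where the read state carries point keys instead of ranges. The re-resolved partitions are walked in order, and a single forward-moving pointIndex assigns each point to the partition whose key range contains it (ComparePointAndRange == 0), stopping for the current partition once a point no longer fits, so the sorted points are scanned only once across all partitions. Below is a minimal, self-contained sketch of that distribution step using plain integer keys; Partition, DistributePoints and the shard ids are hypothetical names for illustration only, while the real code compares typed key cells via ComparePointAndRange and derives the starting bound from state->GetBounds(Settings.GetReverse()).

// Simplified model of the point-distribution step in the diff above.
// Not the YDB implementation: names and types here are illustrative.
#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct Partition {
    uint64_t ShardId;
    int64_t EndKey;               // exclusive upper bound of this partition's key range
    std::vector<int64_t> Points;  // point keys assigned to this shard
};

// Walk the sorted point keys once, assigning each point to the first partition
// whose range still contains it; the index only moves forward, mirroring the
// single-pass pointIndex loop in the diff.
void DistributePoints(const std::vector<int64_t>& sortedPoints, std::vector<Partition>& partitions) {
    size_t pointIndex = 0;
    for (auto& partition : partitions) {
        while (pointIndex < sortedPoints.size()) {
            if (sortedPoints[pointIndex] < partition.EndKey) {
                partition.Points.push_back(sortedPoints[pointIndex]);  // point falls into this partition
                ++pointIndex;
            } else {
                break;  // point belongs to a later partition; move on to it
            }
        }
    }
}

int main() {
    // Two partitions produced by a split at key 400, as in the unit test above.
    std::vector<Partition> partitions = {
        {1001, 400, {}},
        {1002, std::numeric_limits<int64_t>::max(), {}},
    };
    std::vector<int64_t> points = {103, 302, 402, 502, 703};
    DistributePoints(points, partitions);
    for (const auto& p : partitions) {
        std::cout << "shard " << p.ShardId << ":";
        for (auto key : p.Points) std::cout << " " << key;
        std::cout << "\n";
    }
    // Prints:
    // shard 1001: 103 302
    // shard 1002: 402 502 703
    return 0;
}

Because both the partitions and the point keys are ordered, a single pass is enough; that is also why the test above expects exactly ",103,302,402,502,703" back after the split at key 400.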