aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-02 20:13:13 +0300
committerssmike <ssmike@ydb.tech>2023-03-02 20:13:13 +0300
commitb2e129b230222de5a18d2b988dc1d96b4a83eb73 (patch)
treec77717a6cb33c79ddd80a1d289dc7a1d296f50d1
parent77c34dfac0f80534c62e53ae4648baa35e1a342a (diff)
downloadydb-b2e129b230222de5a18d2b988dc1d96b4a83eb73.tar.gz
support point reads in shard reresolve
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp70
-rw-r--r--ydb/core/kqp/ut/scan/kqp_split_ut.cpp48
2 files changed, 97 insertions, 21 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index d65a6a7421..2dd9222f50 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -572,19 +572,23 @@ public:
TVector<THolder<TShardState>> newShards;
newShards.reserve(keyDesc->GetPartitions().size());
+ auto bounds = state->GetBounds(Settings.GetReverse());
+ size_t pointIndex = 0;
+
for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) {
const auto& partition = keyDesc->GetPartitions()[idx];
TTableRange partitionRange{
- idx == 0 ? state->Ranges.front().From.GetCells() : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(),
- idx == 0 ? state->Ranges.front().FromInclusive : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive,
+ idx == 0 ? bounds.From : keyDesc->GetPartitions()[idx - 1].Range->EndKeyPrefix.GetCells(),
+ idx == 0 ? bounds.InclusiveFrom : !keyDesc->GetPartitions()[idx - 1].Range->IsInclusive,
keyDesc->GetPartitions()[idx].Range->EndKeyPrefix.GetCells(),
keyDesc->GetPartitions()[idx].Range->IsInclusive
};
CA_LOG_D("Processing resolved ShardId# " << partition.ShardId
<< ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)
- << ", i: " << i << ", state ranges: " << state->Ranges.size());
+ << ", i: " << i << ", state ranges: " << state->Ranges.size()
+ << ", points: " << state->Points.size());
auto newShard = MakeHolder<TShardState>(partition.ShardId);
@@ -592,28 +596,52 @@ public:
newShard->AssignContinuationToken(state.Get());
}
- for (ui64 j = i; j < state->Ranges.size(); ++j) {
- CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr)
- << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
+ if (state->Points.empty()) {
+ Y_ASSERT(!state->Ranges.empty());
- auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange());
+ for (ui64 j = i; j < state->Ranges.size(); ++j) {
+ CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr)
+ << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
- if (!intersection.IsEmptyRange(KeyColumnTypes)) {
- CA_LOG_D("Add range to new shardId: " << partition.ShardId
- << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
+ auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange());
- newShard->Ranges.emplace_back(TSerializedTableRange(intersection));
- } else {
- CA_LOG_D("empty intersection");
- if (j > i) {
- i = j - 1;
+ if (!intersection.IsEmptyRange(KeyColumnTypes)) {
+ CA_LOG_D("Add range to new shardId: " << partition.ShardId
+ << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
+
+ newShard->Ranges.emplace_back(TSerializedTableRange(intersection));
+ } else {
+ CA_LOG_D("empty intersection");
+ if (j > i) {
+ i = j - 1;
+ }
+ break;
}
- break;
}
- }
- if (!newShard->Ranges.empty()) {
- newShards.push_back(std::move(newShard));
+ if (!newShard->Ranges.empty()) {
+ newShards.push_back(std::move(newShard));
+ }
+ } else {
+ while (pointIndex < state->Points.size()) {
+ int intersection = ComparePointAndRange(
+ state->Points[pointIndex].GetCells(),
+ partitionRange,
+ KeyColumnTypes,
+ KeyColumnTypes);
+
+ if (intersection == 0) {
+ newShard->Points.push_back(state->Points[pointIndex]);
+ CA_LOG_D("Add point to new shardId: " << partition.ShardId);
+ }
+ if (intersection < 0) {
+ break;
+ }
+ pointIndex += 1;
+ }
+ if (!newShard->Points.empty()) {
+ newShards.push_back(std::move(newShard));
+ }
}
}
@@ -838,8 +866,8 @@ public:
ReceivedRowCount += ev->Get()->GetRowsCount();
CA_LOG_D(TStringBuilder() << "new data for read #" << id
<< " seqno = " << ev->Get()->Record.GetSeqNo()
- << " finished = " << ev->Get()->Record.GetFinished()
- << " pushed " << DebugPrintCells(ev->Get()));
+ << " finished = " << ev->Get()->Record.GetFinished());
+ CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()));
Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
index 519e4723c6..84c6b36aeb 100644
--- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -598,6 +598,54 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
}
+
+ Y_UNIT_TEST_SORT(AfterResolvePoints, Order) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForScanQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+
+ auto db = kikimr.GetTableClient();
+
+ auto& server = kikimr.GetTestServer();
+ auto* runtime = server.GetRuntime();
+ Y_UNUSED(runtime);
+ auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+
+ auto sender = runtime->AllocateEdgeActor();
+ auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+
+ TVector<ui64> collectedKeys;
+ CollectKeysTo(&collectedKeys, runtime, sender);
+
+ auto* shim = new TReadActorPipeCacheStub();
+ InterceptReadActorPipeCache(runtime->Register(shim));
+ shim->SetupCapture(0, 5);
+ SendScanQuery(runtime, kqpProxy, sender,
+ "PRAGMA Kikimr.OptEnablePredicateExtract=\"false\"; SELECT Key FROM `/Root/KeyValueLargePartition` where Key in (103, 302, 402, 502, 703)" + OrderBy(Order));
+
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ SetSplitMergePartCountLimit(runtime, -1);
+ {
+ auto senderSplit = runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
+ WaitTxNotification(&server, senderSplit, txId);
+ }
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->SendCaptured(runtime);
+
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ",103,302,402,502,703");
+ }
}