diff options
author | ssmike <ssmike@ydb.tech> | 2023-03-07 20:11:10 +0300 |
---|---|---|
committer | ssmike <ssmike@ydb.tech> | 2023-03-07 20:11:10 +0300 |
commit | 66db41e47f76c3f2902855ca1c84509f35e42ff2 (patch) | |
tree | a5173658f5bcdd9cefbcc6e5223edb54e2a1b98b | |
parent | 4535e7738c8863c638701ec4f40badae92a73b98 (diff) | |
download | ydb-66db41e47f76c3f2902855ca1c84509f35e42ff2.tar.gz |
Fix split handling in multi-range requests
-rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 113 | ||||
-rw-r--r-- | ydb/core/kqp/ut/scan/kqp_split_ut.cpp | 330 |
2 files changed, 236 insertions, 207 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 160b43152e..b6d0e78d0e 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -94,8 +94,6 @@ public: }; struct TShardState : public TIntrusiveListItem<TShardState> { - TSmallVec<TSerializedTableRange> Ranges; - TSmallVec<TSerializedCellVec> Points; TOwnedCellVec LastKey; TMaybe<ui32> FirstUnprocessedRequest; @@ -107,13 +105,6 @@ public: size_t ResolveAttempt = 0; size_t RetryAttempt = 0; - void AssignContinuationToken(TShardState* state) { - if (state->LastKey.DataSize() != 0) { - LastKey = std::move(state->LastKey); - } - FirstUnprocessedRequest = state->FirstUnprocessedRequest; - } - TShardState(ui64 tabletId) : TabletId(tabletId) { @@ -247,9 +238,6 @@ public: // And push all others result.insert(result.end(), rangeIt, Ranges.end()); } - for (auto& range : result) { - MakePrefixRange(range, keyTypes.size()); - } } void FillUnprocessedPoints(TVector<TSerializedCellVec>& result, bool reverse) const { @@ -267,6 +255,9 @@ public: FillUnprocessedPoints(ev.Keys, reversed); } else { FillUnprocessedRanges(ev.Ranges, keyTypes, reversed); + for (auto& range : ev.Ranges) { + MakePrefixRange(range, keyTypes.size()); + } } } @@ -291,6 +282,26 @@ public: } return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry); } + + bool HasRanges() { + return !Ranges.empty(); + } + + bool HasPoints() { + return !Points.empty(); + } + + void AddRange(TSerializedTableRange&& range) { + Ranges.push_back(std::move(range)); + } + + void AddPoint(TSerializedCellVec&& point) { + Points.push_back(std::move(point)); + } + + private: + TSmallVec<TSerializedTableRange> Ranges; + TSmallVec<TSerializedCellVec> Points; }; using TShardQueue = TIntrusiveListWithAutoDelete<TShardState, TDelete>; @@ -410,17 +421,17 @@ public: auto& state = *stateHolder.Release(); if (Settings.HasFullRange()) { - state.Ranges.push_back(TSerializedTableRange(Settings.GetFullRange())); + state.AddRange(TSerializedTableRange(Settings.GetFullRange())); } else { YQL_ENSURE(Settings.HasRanges()); if (Settings.GetRanges().KeyRangesSize() > 0) { YQL_ENSURE(Settings.GetRanges().KeyPointsSize() == 0); for (const auto& range : Settings.GetRanges().GetKeyRanges()) { - state.Ranges.push_back(TSerializedTableRange(range)); + state.AddRange(TSerializedTableRange(range)); } } else { for (const auto& point : Settings.GetRanges().GetKeyPoints()) { - state.Points.push_back(TSerializedCellVec(point)); + state.AddPoint(TSerializedCellVec(point)); } } } @@ -578,8 +589,18 @@ public: auto bounds = state->GetBounds(Settings.GetReverse()); size_t pointIndex = 0; + size_t rangeIndex = 0; + TVector<TSerializedTableRange> ranges; + if (state->HasRanges()) { + state->FillUnprocessedRanges(ranges, KeyColumnTypes, Settings.GetReverse()); + } - for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) { + TVector<TSerializedCellVec> points; + if (state->HasPoints()) { + state->FillUnprocessedPoints(points, Settings.GetReverse()); + } + + for (ui64 idx = 0; idx < keyDesc->GetPartitions().size(); ++idx) { const auto& partition = keyDesc->GetPartitions()[idx]; TTableRange partitionRange{ @@ -591,51 +612,46 @@ public: CA_LOG_D("Processing resolved ShardId# " << partition.ShardId << ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr) - << ", i: " << i << ", state ranges: " << state->Ranges.size() - << ", points: " << state->Points.size()); + << ", i: " << rangeIndex << ", state ranges: " << ranges.size() + << ", points: " << points.size()); auto newShard = MakeHolder<TShardState>(partition.ShardId); - if (((!Settings.GetReverse() && idx == 0) || (Settings.GetReverse() && idx + 1 == keyDesc->GetPartitions().size())) && state) { - newShard->AssignContinuationToken(state.Get()); - } - - if (state->Points.empty()) { - Y_ASSERT(!state->Ranges.empty()); - - for (ui64 j = i; j < state->Ranges.size(); ++j) { - CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr) + if (state->HasRanges()) { + for (ui64 j = rangeIndex; j < ranges.size(); ++j) { + CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, ranges[j].ToTableRange(), tr) << " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)); - auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange()); + auto intersection = Intersect(KeyColumnTypes, partitionRange, ranges[j].ToTableRange()); if (!intersection.IsEmptyRange(KeyColumnTypes)) { CA_LOG_D("Add range to new shardId: " << partition.ShardId << ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr)); - newShard->Ranges.emplace_back(TSerializedTableRange(intersection)); + newShard->AddRange(TSerializedTableRange(intersection)); } else { CA_LOG_D("empty intersection"); - if (j > i) { - i = j - 1; + if (j > rangeIndex) { + rangeIndex = j - 1; } break; } } - if (!newShard->Ranges.empty()) { + if (newShard->HasRanges()) { newShards.push_back(std::move(newShard)); } - } else { - while (pointIndex < state->Points.size()) { + } + if (state->HasPoints()) { + while (pointIndex < points.size()) { int intersection = ComparePointAndRange( - state->Points[pointIndex].GetCells(), + points[pointIndex].GetCells(), partitionRange, KeyColumnTypes, KeyColumnTypes); if (intersection == 0) { - newShard->Points.push_back(state->Points[pointIndex]); + newShard->AddPoint(std::move(points[pointIndex])); CA_LOG_D("Add point to new shardId: " << partition.ShardId); } if (intersection < 0) { @@ -643,14 +659,14 @@ public: } pointIndex += 1; } - if (!newShard->Points.empty()) { + if (newShard->HasPoints()) { newShards.push_back(std::move(newShard)); } } } - Counters->IteratorsReadSplits->Add(newShards.size() - 1); YQL_ENSURE(!newShards.empty()); + Counters->IteratorsReadSplits->Add(newShards.size() - 1); if (Settings.GetReverse()) { for (size_t i = 0; i < newShards.size(); ++i) { PendingShards.PushBack(newShards[i].Release()); @@ -802,6 +818,7 @@ public: << ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry) << ", limit: " << limit << ", readId = " << id + << ", reverse = " << record.GetReverse() << " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")" << " lockTxId = " << Settings.GetLockTxId()); @@ -815,6 +832,24 @@ public: Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } + TString DebugPrintContionuationToken(TString s) { + NKikimrTxDataShard::TReadContinuationToken token; + Y_VERIFY(token.ParseFromString(s)); + TString lastKey = "(empty)"; + if (!token.GetLastProcessedKey().empty()) { + TStringBuilder builder; + TVector<NScheme::TTypeInfo> types; + for (auto& column : Settings.GetColumns()) { + types.push_back(NScheme::TTypeInfo((NScheme::TTypeId)column.GetType())); + } + + TSerializedCellVec row(token.GetLastProcessedKey()); + + lastKey = DebugPrintPoint(types, row.GetCells(), *AppData()->TypeRegistry); + } + return TStringBuilder() << "first request = " << token.GetFirstUnprocessedQuery() << " lastkey = " << lastKey; + } + void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) { const auto& record = ev->Get()->Record; auto id = record.GetReadId(); @@ -875,7 +910,7 @@ public: CA_LOG_D(TStringBuilder() << "new data for read #" << id << " seqno = " << ev->Get()->Record.GetSeqNo() << " finished = " << ev->Get()->Record.GetFinished()); - CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get())); + CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken())); Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); NotifyCA(); } diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp index 84c6b36aeb..7ed7bcf419 100644 --- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -372,135 +372,190 @@ Y_UNIT_TEST_SUITE(KqpSplit) { template <SortOrder OPT> \ void TTestCase##N<OPT>::Execute_(NUnitTest::TTestContext& ut_context Y_DECLARE_UNUSED) - Y_UNIT_TEST_SORT(AfterResolve, Order) { - TKikimrSettings settings; - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); - settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); - settings.SetAppConfig(appConfig); + struct TTestSetup { + TTestSetup(TString table = "/Root/KeyValueLargePartition") + : Table(table) + { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForScanQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + Kikimr.ConstructInPlace(settings); + + auto db = Kikimr->GetTableClient(); + + Server = &Kikimr->GetTestServer(); + Runtime = Server->GetRuntime(); + KqpProxy = MakeKqpProxyID(Runtime->GetNodeId(0)); - TKikimrRunner kikimr(settings); + Sender = Runtime->AllocateEdgeActor(); - auto db = kikimr.GetTableClient(); + CollectKeysTo(&CollectedKeys, Runtime, Sender); - auto& server = kikimr.GetTestServer(); - auto* runtime = server.GetRuntime(); - Y_UNUSED(runtime); - auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + SetSplitMergePartCountLimit(Runtime, -1); + } - auto sender = runtime->AllocateEdgeActor(); - auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + TVector<ui64> Shards() { + return GetTableShards(Server, Sender, Table); + } - TVector<ui64> collectedKeys; - CollectKeysTo(&collectedKeys, runtime, sender); + void Split(ui64 shard, ui32 key) { + auto senderSplit = Runtime->AllocateEdgeActor(); + ui64 txId = AsyncSplitTable(Server, senderSplit, Table, shard, key); + WaitTxNotification(Server, senderSplit, txId); + } + + void AssertSuccess() { + auto reply = Runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(Sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + } + void SendScanQuery(TString text) { + ::NKikimr::NKqp::NTestSuiteKqpSplit::SendScanQuery(Runtime, KqpProxy, Sender, text); + } + + TMaybe<TKikimrRunner> Kikimr; + TVector<ui64> CollectedKeys; + Tests::TServer* Server; + NActors::TTestActorRuntime* Runtime; + TActorId KqpProxy; + TActorId Sender; + + TString Table; + }; + + Y_UNIT_TEST_SORT(AfterResolve, Order) { + TTestSetup s; + + auto shards = s.Shards(); auto* shim = new TReadActorPipeCacheStub(); - InterceptReadActorPipeCache(runtime->Register(shim)); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); shim->SetupCapture(0, 1); - SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); shim->ReadsReceived.WaitI(); Cerr << "starting split -----------------------------------------------------------" << Endl; - SetSplitMergePartCountLimit(runtime, -1); - { - auto senderSplit = runtime->AllocateEdgeActor(); - ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400); - WaitTxNotification(&server, senderSplit, txId); - } + s.Split(shards.at(0), 400); Cerr << "resume evread -----------------------------------------------------------" << Endl; shim->SkipAll(); - shim->SendCaptured(runtime); + shim->SendCaptured(s.Runtime); - auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL); } Y_UNIT_TEST_SORT(AfterResult, Order) { - TKikimrSettings settings; - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); - settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); - settings.SetAppConfig(appConfig); + TTestSetup s; + auto shards = s.Shards(); - TKikimrRunner kikimr(settings); + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(8); + evread.SetMaxRows(8); + InjectRangeEvReadSettings(evread); - auto db = kikimr.GetTableClient(); + NKikimrTxDataShard::TEvReadAck evreadack; + evreadack.SetMaxRows(8); + InjectRangeEvReadAckSettings(evreadack); - auto& server = kikimr.GetTestServer(); - auto* runtime = server.GetRuntime(); - Y_UNUSED(runtime); - auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + auto* shim = new TReadActorPipeCacheStub(); + shim->SetupCapture(1, 1); + shim->SetupResultsCapture(1); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); + s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); - auto sender = runtime->AllocateEdgeActor(); - auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + s.Split(shards.at(0), 400); + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->AllowResults(); + shim->SendCaptured(s.Runtime); - TVector<ui64> collectedKeys; - CollectKeysTo(&collectedKeys, runtime, sender); + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL); + } + const TString SegmentsResult = ",101,102,103,202,203,301,303,401,403,501,502,601,602,603,702,703,801"; + const TString SegmentsRequest = + "SELECT Key FROM `/Root/KeyValueLargePartition` where \ + (Key >= 101 and Key <= 103) \ + or (Key >= 202 and Key <= 301) \ + or (Key >= 303 and Key <= 401) \ + or (Key >= 403 and Key <= 502) \ + or (Key >= 601 and Key <= 603) \ + or (Key >= 702 and Key <= 801) \ + "; + + Y_UNIT_TEST_SORT(AfterResultMultiRange, Order) { + TTestSetup s; NKikimrTxDataShard::TEvRead evread; - evread.SetMaxRowsInResult(8); - evread.SetMaxRows(8); + evread.SetMaxRowsInResult(5); + evread.SetMaxRows(5); InjectRangeEvReadSettings(evread); + auto shards = s.Shards(); + NKikimrTxDataShard::TEvReadAck evreadack; - evreadack.SetMaxRows(8); + evreadack.SetMaxRows(5); InjectRangeEvReadAckSettings(evreadack); auto* shim = new TReadActorPipeCacheStub(); shim->SetupCapture(1, 1); shim->SetupResultsCapture(1); - InterceptReadActorPipeCache(runtime->Register(shim)); - SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); + s.SendScanQuery(SegmentsRequest + OrderBy(Order)); shim->ReadsReceived.WaitI(); Cerr << "starting split -----------------------------------------------------------" << Endl; - SetSplitMergePartCountLimit(runtime, -1); - { - auto senderSplit = runtime->AllocateEdgeActor(); - ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400); - WaitTxNotification(&server, senderSplit, txId); - } + s.Split(shards.at(0), 404); Cerr << "resume evread -----------------------------------------------------------" << Endl; shim->SkipAll(); shim->AllowResults(); - shim->SendCaptured(runtime); + shim->SendCaptured(s.Runtime); - auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), SegmentsResult); } - Y_UNIT_TEST_SORT(ChoosePartition, Order) { - TKikimrSettings settings; - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); - settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); - settings.SetAppConfig(appConfig); + Y_UNIT_TEST_SORT(AfterResultMultiRangeSegmentPartition, Order) { + TTestSetup s; + auto shards = s.Shards(); - TKikimrRunner kikimr(settings); + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(5); + evread.SetMaxRows(5); + InjectRangeEvReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + evreadack.SetMaxRows(5); + InjectRangeEvReadAckSettings(evreadack); - auto db = kikimr.GetTableClient(); + auto* shim = new TReadActorPipeCacheStub(); + shim->SetupCapture(1, 1); + shim->SetupResultsCapture(1); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); + s.SendScanQuery(SegmentsRequest + OrderBy(Order)); - auto& server = kikimr.GetTestServer(); - auto* runtime = server.GetRuntime(); - Y_UNUSED(runtime); - auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + s.Split(shards.at(0), 501); + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->AllowResults(); + shim->SendCaptured(s.Runtime); - auto sender = runtime->AllocateEdgeActor(); - auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), SegmentsResult); + } - TVector<ui64> collectedKeys; - CollectKeysTo(&collectedKeys, runtime, sender); + Y_UNIT_TEST_SORT(ChoosePartition, Order) { + TTestSetup s; + auto shards = s.Shards(); NKikimrTxDataShard::TEvRead evread; evread.SetMaxRowsInResult(8); @@ -514,52 +569,25 @@ Y_UNIT_TEST_SUITE(KqpSplit) { auto* shim = new TReadActorPipeCacheStub(); shim->SetupCapture(2, 1); shim->SetupResultsCapture(2); - InterceptReadActorPipeCache(runtime->Register(shim)); - SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); + s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); shim->ReadsReceived.WaitI(); Cerr << "starting split -----------------------------------------------------------" << Endl; - SetSplitMergePartCountLimit(runtime, -1); - { - auto senderSplit = runtime->AllocateEdgeActor(); - ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400); - WaitTxNotification(&server, senderSplit, txId); - } + s.Split(shards.at(0), 400); Cerr << "resume evread -----------------------------------------------------------" << Endl; shim->SkipAll(); shim->AllowResults(); - shim->SendCaptured(runtime); + shim->SendCaptured(s.Runtime); - auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL); } Y_UNIT_TEST_SORT(BorderKeys, Order) { - TKikimrSettings settings; - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); - settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); - settings.SetAppConfig(appConfig); - - TKikimrRunner kikimr(settings); - - auto db = kikimr.GetTableClient(); - - auto& server = kikimr.GetTestServer(); - auto* runtime = server.GetRuntime(); - Y_UNUSED(runtime); - auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); - - auto sender = runtime->AllocateEdgeActor(); - auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); - - TVector<ui64> collectedKeys; - CollectKeysTo(&collectedKeys, runtime, sender); + TTestSetup s; + auto shards = s.Shards(); NKikimrTxDataShard::TEvRead evread; evread.SetMaxRowsInResult(12); @@ -573,78 +601,44 @@ Y_UNIT_TEST_SUITE(KqpSplit) { auto* shim = new TReadActorPipeCacheStub(); shim->SetupCapture(1, 1); shim->SetupResultsCapture(1); - InterceptReadActorPipeCache(runtime->Register(shim)); - SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); + s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); shim->ReadsReceived.WaitI(); Cerr << "starting split -----------------------------------------------------------" << Endl; - SetSplitMergePartCountLimit(runtime, -1); - { - auto senderSplit = runtime->AllocateEdgeActor(); - ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 402); - WaitTxNotification(&server, senderSplit, txId); - shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + s.Split(shards.at(0), 402); + shards = s.Shards(); + s.Split(shards.at(1), 404); - txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(1), 404); - WaitTxNotification(&server, senderSplit, txId); - } Cerr << "resume evread -----------------------------------------------------------" << Endl; shim->SkipAll(); shim->AllowResults(); - shim->SendCaptured(runtime); + shim->SendCaptured(s.Runtime); - auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL); } Y_UNIT_TEST_SORT(AfterResolvePoints, Order) { - TKikimrSettings settings; - NKikimrConfig::TAppConfig appConfig; - appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); - settings.SetDomainRoot(KikimrDefaultUtDomainRoot); - TFeatureFlags flags; - flags.SetEnablePredicateExtractForScanQueries(true); - settings.SetFeatureFlags(flags); - settings.SetAppConfig(appConfig); - - TKikimrRunner kikimr(settings); - - auto db = kikimr.GetTableClient(); - - auto& server = kikimr.GetTestServer(); - auto* runtime = server.GetRuntime(); - Y_UNUSED(runtime); - auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); - - auto sender = runtime->AllocateEdgeActor(); - auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); - - TVector<ui64> collectedKeys; - CollectKeysTo(&collectedKeys, runtime, sender); + TTestSetup s; + auto shards = s.Shards(); auto* shim = new TReadActorPipeCacheStub(); - InterceptReadActorPipeCache(runtime->Register(shim)); + InterceptReadActorPipeCache(s.Runtime->Register(shim)); shim->SetupCapture(0, 5); - SendScanQuery(runtime, kqpProxy, sender, + s.SendScanQuery( "PRAGMA Kikimr.OptEnablePredicateExtract=\"false\"; SELECT Key FROM `/Root/KeyValueLargePartition` where Key in (103, 302, 402, 502, 703)" + OrderBy(Order)); shim->ReadsReceived.WaitI(); Cerr << "starting split -----------------------------------------------------------" << Endl; - SetSplitMergePartCountLimit(runtime, -1); - { - auto senderSplit = runtime->AllocateEdgeActor(); - ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400); - WaitTxNotification(&server, senderSplit, txId); - } + s.Split(shards.at(0), 400); Cerr << "resume evread -----------------------------------------------------------" << Endl; shim->SkipAll(); - shim->SendCaptured(runtime); + shim->SendCaptured(s.Runtime); - auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); - UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); - UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ",103,302,402,502,703"); + s.AssertSuccess(); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",103,302,402,502,703"); } } |