aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-03-07 20:11:10 +0300
committerssmike <ssmike@ydb.tech>2023-03-07 20:11:10 +0300
commit66db41e47f76c3f2902855ca1c84509f35e42ff2 (patch)
treea5173658f5bcdd9cefbcc6e5223edb54e2a1b98b
parent4535e7738c8863c638701ec4f40badae92a73b98 (diff)
downloadydb-66db41e47f76c3f2902855ca1c84509f35e42ff2.tar.gz
Fix split handling in multi-range requests
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp113
-rw-r--r--ydb/core/kqp/ut/scan/kqp_split_ut.cpp330
2 files changed, 236 insertions, 207 deletions
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 160b43152e..b6d0e78d0e 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -94,8 +94,6 @@ public:
};
struct TShardState : public TIntrusiveListItem<TShardState> {
- TSmallVec<TSerializedTableRange> Ranges;
- TSmallVec<TSerializedCellVec> Points;
TOwnedCellVec LastKey;
TMaybe<ui32> FirstUnprocessedRequest;
@@ -107,13 +105,6 @@ public:
size_t ResolveAttempt = 0;
size_t RetryAttempt = 0;
- void AssignContinuationToken(TShardState* state) {
- if (state->LastKey.DataSize() != 0) {
- LastKey = std::move(state->LastKey);
- }
- FirstUnprocessedRequest = state->FirstUnprocessedRequest;
- }
-
TShardState(ui64 tabletId)
: TabletId(tabletId)
{
@@ -247,9 +238,6 @@ public:
// And push all others
result.insert(result.end(), rangeIt, Ranges.end());
}
- for (auto& range : result) {
- MakePrefixRange(range, keyTypes.size());
- }
}
void FillUnprocessedPoints(TVector<TSerializedCellVec>& result, bool reverse) const {
@@ -267,6 +255,9 @@ public:
FillUnprocessedPoints(ev.Keys, reversed);
} else {
FillUnprocessedRanges(ev.Ranges, keyTypes, reversed);
+ for (auto& range : ev.Ranges) {
+ MakePrefixRange(range, keyTypes.size());
+ }
}
}
@@ -291,6 +282,26 @@ public:
}
return DebugPrintPoint(keyTypes, LastKey, *AppData()->TypeRegistry);
}
+
+ bool HasRanges() {
+ return !Ranges.empty();
+ }
+
+ bool HasPoints() {
+ return !Points.empty();
+ }
+
+ void AddRange(TSerializedTableRange&& range) {
+ Ranges.push_back(std::move(range));
+ }
+
+ void AddPoint(TSerializedCellVec&& point) {
+ Points.push_back(std::move(point));
+ }
+
+ private:
+ TSmallVec<TSerializedTableRange> Ranges;
+ TSmallVec<TSerializedCellVec> Points;
};
using TShardQueue = TIntrusiveListWithAutoDelete<TShardState, TDelete>;
@@ -410,17 +421,17 @@ public:
auto& state = *stateHolder.Release();
if (Settings.HasFullRange()) {
- state.Ranges.push_back(TSerializedTableRange(Settings.GetFullRange()));
+ state.AddRange(TSerializedTableRange(Settings.GetFullRange()));
} else {
YQL_ENSURE(Settings.HasRanges());
if (Settings.GetRanges().KeyRangesSize() > 0) {
YQL_ENSURE(Settings.GetRanges().KeyPointsSize() == 0);
for (const auto& range : Settings.GetRanges().GetKeyRanges()) {
- state.Ranges.push_back(TSerializedTableRange(range));
+ state.AddRange(TSerializedTableRange(range));
}
} else {
for (const auto& point : Settings.GetRanges().GetKeyPoints()) {
- state.Points.push_back(TSerializedCellVec(point));
+ state.AddPoint(TSerializedCellVec(point));
}
}
}
@@ -578,8 +589,18 @@ public:
auto bounds = state->GetBounds(Settings.GetReverse());
size_t pointIndex = 0;
+ size_t rangeIndex = 0;
+ TVector<TSerializedTableRange> ranges;
+ if (state->HasRanges()) {
+ state->FillUnprocessedRanges(ranges, KeyColumnTypes, Settings.GetReverse());
+ }
- for (ui64 idx = 0, i = 0; idx < keyDesc->GetPartitions().size(); ++idx) {
+ TVector<TSerializedCellVec> points;
+ if (state->HasPoints()) {
+ state->FillUnprocessedPoints(points, Settings.GetReverse());
+ }
+
+ for (ui64 idx = 0; idx < keyDesc->GetPartitions().size(); ++idx) {
const auto& partition = keyDesc->GetPartitions()[idx];
TTableRange partitionRange{
@@ -591,51 +612,46 @@ public:
CA_LOG_D("Processing resolved ShardId# " << partition.ShardId
<< ", partition range: " << DebugPrintRange(KeyColumnTypes, partitionRange, tr)
- << ", i: " << i << ", state ranges: " << state->Ranges.size()
- << ", points: " << state->Points.size());
+ << ", i: " << rangeIndex << ", state ranges: " << ranges.size()
+ << ", points: " << points.size());
auto newShard = MakeHolder<TShardState>(partition.ShardId);
- if (((!Settings.GetReverse() && idx == 0) || (Settings.GetReverse() && idx + 1 == keyDesc->GetPartitions().size())) && state) {
- newShard->AssignContinuationToken(state.Get());
- }
-
- if (state->Points.empty()) {
- Y_ASSERT(!state->Ranges.empty());
-
- for (ui64 j = i; j < state->Ranges.size(); ++j) {
- CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, state->Ranges[j].ToTableRange(), tr)
+ if (state->HasRanges()) {
+ for (ui64 j = rangeIndex; j < ranges.size(); ++j) {
+ CA_LOG_D("Intersect state range #" << j << " " << DebugPrintRange(KeyColumnTypes, ranges[j].ToTableRange(), tr)
<< " with partition range " << DebugPrintRange(KeyColumnTypes, partitionRange, tr));
- auto intersection = Intersect(KeyColumnTypes, partitionRange, state->Ranges[j].ToTableRange());
+ auto intersection = Intersect(KeyColumnTypes, partitionRange, ranges[j].ToTableRange());
if (!intersection.IsEmptyRange(KeyColumnTypes)) {
CA_LOG_D("Add range to new shardId: " << partition.ShardId
<< ", range: " << DebugPrintRange(KeyColumnTypes, intersection, tr));
- newShard->Ranges.emplace_back(TSerializedTableRange(intersection));
+ newShard->AddRange(TSerializedTableRange(intersection));
} else {
CA_LOG_D("empty intersection");
- if (j > i) {
- i = j - 1;
+ if (j > rangeIndex) {
+ rangeIndex = j - 1;
}
break;
}
}
- if (!newShard->Ranges.empty()) {
+ if (newShard->HasRanges()) {
newShards.push_back(std::move(newShard));
}
- } else {
- while (pointIndex < state->Points.size()) {
+ }
+ if (state->HasPoints()) {
+ while (pointIndex < points.size()) {
int intersection = ComparePointAndRange(
- state->Points[pointIndex].GetCells(),
+ points[pointIndex].GetCells(),
partitionRange,
KeyColumnTypes,
KeyColumnTypes);
if (intersection == 0) {
- newShard->Points.push_back(state->Points[pointIndex]);
+ newShard->AddPoint(std::move(points[pointIndex]));
CA_LOG_D("Add point to new shardId: " << partition.ShardId);
}
if (intersection < 0) {
@@ -643,14 +659,14 @@ public:
}
pointIndex += 1;
}
- if (!newShard->Points.empty()) {
+ if (newShard->HasPoints()) {
newShards.push_back(std::move(newShard));
}
}
}
- Counters->IteratorsReadSplits->Add(newShards.size() - 1);
YQL_ENSURE(!newShards.empty());
+ Counters->IteratorsReadSplits->Add(newShards.size() - 1);
if (Settings.GetReverse()) {
for (size_t i = 0; i < newShards.size(); ++i) {
PendingShards.PushBack(newShards[i].Release());
@@ -802,6 +818,7 @@ public:
<< ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry)
<< ", limit: " << limit
<< ", readId = " << id
+ << ", reverse = " << record.GetReverse()
<< " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")"
<< " lockTxId = " << Settings.GetLockTxId());
@@ -815,6 +832,24 @@ public:
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
+ TString DebugPrintContionuationToken(TString s) {
+ NKikimrTxDataShard::TReadContinuationToken token;
+ Y_VERIFY(token.ParseFromString(s));
+ TString lastKey = "(empty)";
+ if (!token.GetLastProcessedKey().empty()) {
+ TStringBuilder builder;
+ TVector<NScheme::TTypeInfo> types;
+ for (auto& column : Settings.GetColumns()) {
+ types.push_back(NScheme::TTypeInfo((NScheme::TTypeId)column.GetType()));
+ }
+
+ TSerializedCellVec row(token.GetLastProcessedKey());
+
+ lastKey = DebugPrintPoint(types, row.GetCells(), *AppData()->TypeRegistry);
+ }
+ return TStringBuilder() << "first request = " << token.GetFirstUnprocessedQuery() << " lastkey = " << lastKey;
+ }
+
void HandleRead(TEvDataShard::TEvReadResult::TPtr ev) {
const auto& record = ev->Get()->Record;
auto id = record.GetReadId();
@@ -875,7 +910,7 @@ public:
CA_LOG_D(TStringBuilder() << "new data for read #" << id
<< " seqno = " << ev->Get()->Record.GetSeqNo()
<< " finished = " << ev->Get()->Record.GetFinished());
- CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()));
+ CA_LOG_T(TStringBuilder() << "read #" << id << " pushed " << DebugPrintCells(ev->Get()) << " continuation token " << DebugPrintContionuationToken(record.GetContinuationToken()));
Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
NotifyCA();
}
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
index 84c6b36aeb..7ed7bcf419 100644
--- a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -372,135 +372,190 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
template <SortOrder OPT> \
void TTestCase##N<OPT>::Execute_(NUnitTest::TTestContext& ut_context Y_DECLARE_UNUSED)
- Y_UNIT_TEST_SORT(AfterResolve, Order) {
- TKikimrSettings settings;
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
- settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
- settings.SetAppConfig(appConfig);
+ struct TTestSetup {
+ TTestSetup(TString table = "/Root/KeyValueLargePartition")
+ : Table(table)
+ {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForScanQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ Kikimr.ConstructInPlace(settings);
+
+ auto db = Kikimr->GetTableClient();
+
+ Server = &Kikimr->GetTestServer();
+ Runtime = Server->GetRuntime();
+ KqpProxy = MakeKqpProxyID(Runtime->GetNodeId(0));
- TKikimrRunner kikimr(settings);
+ Sender = Runtime->AllocateEdgeActor();
- auto db = kikimr.GetTableClient();
+ CollectKeysTo(&CollectedKeys, Runtime, Sender);
- auto& server = kikimr.GetTestServer();
- auto* runtime = server.GetRuntime();
- Y_UNUSED(runtime);
- auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+ SetSplitMergePartCountLimit(Runtime, -1);
+ }
- auto sender = runtime->AllocateEdgeActor();
- auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+ TVector<ui64> Shards() {
+ return GetTableShards(Server, Sender, Table);
+ }
- TVector<ui64> collectedKeys;
- CollectKeysTo(&collectedKeys, runtime, sender);
+ void Split(ui64 shard, ui32 key) {
+ auto senderSplit = Runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(Server, senderSplit, Table, shard, key);
+ WaitTxNotification(Server, senderSplit, txId);
+ }
+
+ void AssertSuccess() {
+ auto reply = Runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(Sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ }
+ void SendScanQuery(TString text) {
+ ::NKikimr::NKqp::NTestSuiteKqpSplit::SendScanQuery(Runtime, KqpProxy, Sender, text);
+ }
+
+ TMaybe<TKikimrRunner> Kikimr;
+ TVector<ui64> CollectedKeys;
+ Tests::TServer* Server;
+ NActors::TTestActorRuntime* Runtime;
+ TActorId KqpProxy;
+ TActorId Sender;
+
+ TString Table;
+ };
+
+ Y_UNIT_TEST_SORT(AfterResolve, Order) {
+ TTestSetup s;
+
+ auto shards = s.Shards();
auto* shim = new TReadActorPipeCacheStub();
- InterceptReadActorPipeCache(runtime->Register(shim));
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
shim->SetupCapture(0, 1);
- SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+ s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
shim->ReadsReceived.WaitI();
Cerr << "starting split -----------------------------------------------------------" << Endl;
- SetSplitMergePartCountLimit(runtime, -1);
- {
- auto senderSplit = runtime->AllocateEdgeActor();
- ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
- WaitTxNotification(&server, senderSplit, txId);
- }
+ s.Split(shards.at(0), 400);
Cerr << "resume evread -----------------------------------------------------------" << Endl;
shim->SkipAll();
- shim->SendCaptured(runtime);
+ shim->SendCaptured(s.Runtime);
- auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL);
}
Y_UNIT_TEST_SORT(AfterResult, Order) {
- TKikimrSettings settings;
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
- settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
- settings.SetAppConfig(appConfig);
+ TTestSetup s;
+ auto shards = s.Shards();
- TKikimrRunner kikimr(settings);
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(8);
+ evread.SetMaxRows(8);
+ InjectRangeEvReadSettings(evread);
- auto db = kikimr.GetTableClient();
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ evreadack.SetMaxRows(8);
+ InjectRangeEvReadAckSettings(evreadack);
- auto& server = kikimr.GetTestServer();
- auto* runtime = server.GetRuntime();
- Y_UNUSED(runtime);
- auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+ auto* shim = new TReadActorPipeCacheStub();
+ shim->SetupCapture(1, 1);
+ shim->SetupResultsCapture(1);
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
+ s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
- auto sender = runtime->AllocateEdgeActor();
- auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ s.Split(shards.at(0), 400);
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->AllowResults();
+ shim->SendCaptured(s.Runtime);
- TVector<ui64> collectedKeys;
- CollectKeysTo(&collectedKeys, runtime, sender);
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL);
+ }
+ const TString SegmentsResult = ",101,102,103,202,203,301,303,401,403,501,502,601,602,603,702,703,801";
+ const TString SegmentsRequest =
+ "SELECT Key FROM `/Root/KeyValueLargePartition` where \
+ (Key >= 101 and Key <= 103) \
+ or (Key >= 202 and Key <= 301) \
+ or (Key >= 303 and Key <= 401) \
+ or (Key >= 403 and Key <= 502) \
+ or (Key >= 601 and Key <= 603) \
+ or (Key >= 702 and Key <= 801) \
+ ";
+
+ Y_UNIT_TEST_SORT(AfterResultMultiRange, Order) {
+ TTestSetup s;
NKikimrTxDataShard::TEvRead evread;
- evread.SetMaxRowsInResult(8);
- evread.SetMaxRows(8);
+ evread.SetMaxRowsInResult(5);
+ evread.SetMaxRows(5);
InjectRangeEvReadSettings(evread);
+ auto shards = s.Shards();
+
NKikimrTxDataShard::TEvReadAck evreadack;
- evreadack.SetMaxRows(8);
+ evreadack.SetMaxRows(5);
InjectRangeEvReadAckSettings(evreadack);
auto* shim = new TReadActorPipeCacheStub();
shim->SetupCapture(1, 1);
shim->SetupResultsCapture(1);
- InterceptReadActorPipeCache(runtime->Register(shim));
- SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
+ s.SendScanQuery(SegmentsRequest + OrderBy(Order));
shim->ReadsReceived.WaitI();
Cerr << "starting split -----------------------------------------------------------" << Endl;
- SetSplitMergePartCountLimit(runtime, -1);
- {
- auto senderSplit = runtime->AllocateEdgeActor();
- ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
- WaitTxNotification(&server, senderSplit, txId);
- }
+ s.Split(shards.at(0), 404);
Cerr << "resume evread -----------------------------------------------------------" << Endl;
shim->SkipAll();
shim->AllowResults();
- shim->SendCaptured(runtime);
+ shim->SendCaptured(s.Runtime);
- auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), SegmentsResult);
}
- Y_UNIT_TEST_SORT(ChoosePartition, Order) {
- TKikimrSettings settings;
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
- settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
- settings.SetAppConfig(appConfig);
+ Y_UNIT_TEST_SORT(AfterResultMultiRangeSegmentPartition, Order) {
+ TTestSetup s;
+ auto shards = s.Shards();
- TKikimrRunner kikimr(settings);
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(5);
+ evread.SetMaxRows(5);
+ InjectRangeEvReadSettings(evread);
+
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ evreadack.SetMaxRows(5);
+ InjectRangeEvReadAckSettings(evreadack);
- auto db = kikimr.GetTableClient();
+ auto* shim = new TReadActorPipeCacheStub();
+ shim->SetupCapture(1, 1);
+ shim->SetupResultsCapture(1);
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
+ s.SendScanQuery(SegmentsRequest + OrderBy(Order));
- auto& server = kikimr.GetTestServer();
- auto* runtime = server.GetRuntime();
- Y_UNUSED(runtime);
- auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ s.Split(shards.at(0), 501);
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->AllowResults();
+ shim->SendCaptured(s.Runtime);
- auto sender = runtime->AllocateEdgeActor();
- auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), SegmentsResult);
+ }
- TVector<ui64> collectedKeys;
- CollectKeysTo(&collectedKeys, runtime, sender);
+ Y_UNIT_TEST_SORT(ChoosePartition, Order) {
+ TTestSetup s;
+ auto shards = s.Shards();
NKikimrTxDataShard::TEvRead evread;
evread.SetMaxRowsInResult(8);
@@ -514,52 +569,25 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
auto* shim = new TReadActorPipeCacheStub();
shim->SetupCapture(2, 1);
shim->SetupResultsCapture(2);
- InterceptReadActorPipeCache(runtime->Register(shim));
- SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
+ s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
shim->ReadsReceived.WaitI();
Cerr << "starting split -----------------------------------------------------------" << Endl;
- SetSplitMergePartCountLimit(runtime, -1);
- {
- auto senderSplit = runtime->AllocateEdgeActor();
- ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
- WaitTxNotification(&server, senderSplit, txId);
- }
+ s.Split(shards.at(0), 400);
Cerr << "resume evread -----------------------------------------------------------" << Endl;
shim->SkipAll();
shim->AllowResults();
- shim->SendCaptured(runtime);
+ shim->SendCaptured(s.Runtime);
- auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL);
}
Y_UNIT_TEST_SORT(BorderKeys, Order) {
- TKikimrSettings settings;
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
- settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
- settings.SetAppConfig(appConfig);
-
- TKikimrRunner kikimr(settings);
-
- auto db = kikimr.GetTableClient();
-
- auto& server = kikimr.GetTestServer();
- auto* runtime = server.GetRuntime();
- Y_UNUSED(runtime);
- auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
-
- auto sender = runtime->AllocateEdgeActor();
- auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
-
- TVector<ui64> collectedKeys;
- CollectKeysTo(&collectedKeys, runtime, sender);
+ TTestSetup s;
+ auto shards = s.Shards();
NKikimrTxDataShard::TEvRead evread;
evread.SetMaxRowsInResult(12);
@@ -573,78 +601,44 @@ Y_UNIT_TEST_SUITE(KqpSplit) {
auto* shim = new TReadActorPipeCacheStub();
shim->SetupCapture(1, 1);
shim->SetupResultsCapture(1);
- InterceptReadActorPipeCache(runtime->Register(shim));
- SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
+ s.SendScanQuery("SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
shim->ReadsReceived.WaitI();
Cerr << "starting split -----------------------------------------------------------" << Endl;
- SetSplitMergePartCountLimit(runtime, -1);
- {
- auto senderSplit = runtime->AllocateEdgeActor();
- ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 402);
- WaitTxNotification(&server, senderSplit, txId);
- shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+ s.Split(shards.at(0), 402);
+ shards = s.Shards();
+ s.Split(shards.at(1), 404);
- txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(1), 404);
- WaitTxNotification(&server, senderSplit, txId);
- }
Cerr << "resume evread -----------------------------------------------------------" << Endl;
shim->SkipAll();
shim->AllowResults();
- shim->SendCaptured(runtime);
+ shim->SendCaptured(s.Runtime);
- auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ALL);
}
Y_UNIT_TEST_SORT(AfterResolvePoints, Order) {
- TKikimrSettings settings;
- NKikimrConfig::TAppConfig appConfig;
- appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
- settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
- TFeatureFlags flags;
- flags.SetEnablePredicateExtractForScanQueries(true);
- settings.SetFeatureFlags(flags);
- settings.SetAppConfig(appConfig);
-
- TKikimrRunner kikimr(settings);
-
- auto db = kikimr.GetTableClient();
-
- auto& server = kikimr.GetTestServer();
- auto* runtime = server.GetRuntime();
- Y_UNUSED(runtime);
- auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
-
- auto sender = runtime->AllocateEdgeActor();
- auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
-
- TVector<ui64> collectedKeys;
- CollectKeysTo(&collectedKeys, runtime, sender);
+ TTestSetup s;
+ auto shards = s.Shards();
auto* shim = new TReadActorPipeCacheStub();
- InterceptReadActorPipeCache(runtime->Register(shim));
+ InterceptReadActorPipeCache(s.Runtime->Register(shim));
shim->SetupCapture(0, 5);
- SendScanQuery(runtime, kqpProxy, sender,
+ s.SendScanQuery(
"PRAGMA Kikimr.OptEnablePredicateExtract=\"false\"; SELECT Key FROM `/Root/KeyValueLargePartition` where Key in (103, 302, 402, 502, 703)" + OrderBy(Order));
shim->ReadsReceived.WaitI();
Cerr << "starting split -----------------------------------------------------------" << Endl;
- SetSplitMergePartCountLimit(runtime, -1);
- {
- auto senderSplit = runtime->AllocateEdgeActor();
- ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
- WaitTxNotification(&server, senderSplit, txId);
- }
+ s.Split(shards.at(0), 400);
Cerr << "resume evread -----------------------------------------------------------" << Endl;
shim->SkipAll();
- shim->SendCaptured(runtime);
+ shim->SendCaptured(s.Runtime);
- auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
- UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
- UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ",103,302,402,502,703");
+ s.AssertSuccess();
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(s.CollectedKeys, Order)), ",103,302,402,502,703");
}
}