summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <[email protected]>2023-02-28 17:46:31 +0300
committerssmike <[email protected]>2023-02-28 17:46:31 +0300
commitd7707d4552a7cafb1f7ea87706ac5aed27d92675 (patch)
tree02f90f668a0bde2d616876894aef391cdf64aebe
parent429e4a530104658268d005e9d75f32e5537d3668 (diff)
Concurrent split tests
-rw-r--r--ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp10
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp108
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.h2
-rw-r--r--ydb/core/kqp/ut/common/kqp_ut_common.cpp31
-rw-r--r--ydb/core/kqp/ut/opt/kqp_merge_ut.cpp7
-rw-r--r--ydb/core/kqp/ut/opt/kqp_ne_ut.cpp41
-rw-r--r--ydb/core/kqp/ut/scan/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/ut/scan/CMakeLists.linux.txt1
-rw-r--r--ydb/core/kqp/ut/scan/kqp_split_ut.cpp613
-rw-r--r--ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp22
11 files changed, 762 insertions, 75 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
index 58f7e350623..a5718063ea2 100644
--- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
+++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp
@@ -21,7 +21,8 @@ TExprBase KqpRemoveRedundantSortByPkBase(
TExprContext& ctx,
const TKqpOptimizeContext& kqpCtx,
std::function<TMaybe<TTableData>(TExprBase)> tableAccessor,
- std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput)
+ std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput,
+ bool allowSortForAllTables = false)
{
auto maybeSort = node.Maybe<TCoSort>();
auto maybeTopSort = node.Maybe<TCoTopSort>();
@@ -128,7 +129,7 @@ TExprBase KqpRemoveRedundantSortByPkBase(
bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap;
if (direction == SortDirectionReverse) {
- if (!olapTable && kqpCtx.IsScanQuery()) {
+ if (!allowSortForAllTables && !olapTable && kqpCtx.IsScanQuery()) {
return node;
}
@@ -141,7 +142,7 @@ TExprBase KqpRemoveRedundantSortByPkBase(
input = rebuildInput(input, settings);
} else if (direction == SortDirectionForward) {
- if (olapTable) {
+ if (olapTable || allowSortForAllTables) {
settings.SetSorted();
input = rebuildInput(input, settings);
}
@@ -222,7 +223,8 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource(
[&](TExprBase input, NYql::TKqpReadTableSettings settings) {
newSettings.push_back(settings);
return input;
- });
+ },
+ /* allowSortForAllTables */ true);
if (newExpr.Ptr() != expr.Ptr()) {
bodyReplaces[expr.Raw()] = newExpr.Ptr();
}
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 34bc047bbbe..2486ce6ec4f 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -60,6 +60,8 @@ THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() {
return result;
}
+NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false);
+
}
@@ -113,16 +115,40 @@ public:
{
}
- TTableRange GetBounds() {
+ TTableRange GetBounds(bool reverse) {
if (Ranges.empty()) {
YQL_ENSURE(!Points.empty());
- return TTableRange(
- Points.front().GetCells(), true,
- Points.back().GetCells(), true);
+ if (reverse) {
+ return TTableRange(
+ Points.front().GetCells(), true,
+ Points[FirstUnprocessedRequest.GetOrElse(Points.size() - 1)].GetCells(), true);
+ } else {
+ return TTableRange(
+ Points[FirstUnprocessedRequest.GetOrElse(0)].GetCells(), true,
+ Points.back().GetCells(), true);
+ }
} else {
- return TTableRange(
- Ranges.front().From.GetCells(), Ranges.front().FromInclusive,
- Ranges.back().To.GetCells(), Ranges.back().ToInclusive);
+ if (reverse) {
+ if (LastKey.empty()) {
+ return TTableRange(
+ Ranges.front().From.GetCells(), Ranges.front().FromInclusive,
+ Ranges[FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1)].To.GetCells(), Ranges.back().ToInclusive);
+ } else {
+ return TTableRange(
+ Ranges.front().From.GetCells(), Ranges.front().FromInclusive,
+ LastKey, false);
+ }
+ } else {
+ if (LastKey.empty()) {
+ return TTableRange(
+ Ranges[FirstUnprocessedRequest.GetOrElse(0)].From.GetCells(), Ranges.front().FromInclusive,
+ Ranges.back().To.GetCells(), Ranges.back().ToInclusive);
+ } else {
+ return TTableRange(
+ LastKey, false,
+ Ranges.back().To.GetCells(), Ranges.back().ToInclusive);
+ }
+ }
}
}
@@ -191,14 +217,17 @@ public:
}
if (reverse) {
- auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(Ranges.size());
+ auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1);
if (!lastKeyEmpty) {
// It is range, where read was interrupted. Restart operation from last read key.
result.emplace_back(std::move(TSerializedTableRange(
rangeIt->From.GetBuffer(), TSerializedCellVec::Serialize(LastKey), rangeIt->ToInclusive, false
)));
+ } else {
+ ++rangeIt;
}
+
result.insert(result.begin(), Ranges.begin(), rangeIt);
} else {
auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(0);
@@ -374,6 +403,7 @@ public:
bool StartTableScan() {
const ui32 maxAllowedInFlight = Settings.GetSorted() ? 1 : MaxInFlight;
+ CA_LOG_D("effective maxinflight " << maxAllowedInFlight << " sorted " << Settings.GetSorted());
bool isFirst = true;
while (!PendingShards.Empty() && RunningReads() + 1 <= maxAllowedInFlight) {
if (isFirst) {
@@ -409,7 +439,7 @@ public:
state->ResolveAttempt++;
- auto range = state->GetBounds();
+ auto range = state->GetBounds(Settings.GetReverse());
TVector<TKeyDesc::TColumnOp> columns;
columns.reserve(Settings.GetColumns().size());
for (const auto& column : Settings.GetColumns()) {
@@ -601,11 +631,13 @@ public:
state->RetryAttempt += 1;
if (state->RetryAttempt >= MAX_SHARD_RETRIES) {
+ ResetRead(id);
return ResolveShard(state);
}
CA_LOG_D("Retrying read #" << id);
SendCancel(id);
+ ResetRead(id);
if (Reads[id].SerializedContinuationToken) {
NKikimrTxDataShard::TReadContinuationToken token;
@@ -686,7 +718,7 @@ public:
<< " lockTxId = " << Settings.GetLockTxId());
ReadIdByTabletId[state->TabletId].push_back(id);
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
+ Send(::PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
}
@@ -698,17 +730,22 @@ public:
return;
}
+ for (auto& issue : record.GetStatus().GetIssues()) {
+ CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
+ Reads[id].Shard->Issues.push_back(issue);
+ }
switch (record.GetStatus().GetCode()) {
case Ydb::StatusIds::SUCCESS:
break;
case Ydb::StatusIds::OVERLOADED:
case Ydb::StatusIds::INTERNAL_ERROR: {
- for (auto& issue : record.GetStatus().GetIssues()) {
- CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage());
- Reads[id].Shard->Issues.push_back(issue);
- }
return RetryRead(id);
}
+ case Ydb::StatusIds::NOT_FOUND: {
+ auto shard = Reads[id].Shard;
+ ResetRead(id);
+ return ResolveShard(shard);
+ }
default: {
NYql::TIssues issues;
NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues);
@@ -735,8 +772,11 @@ public:
ReceivedRowCount += ev->Get()->GetRowsCount();
+ CA_LOG_D(TStringBuilder() << "new data for read #" << id
+ << " seqno = " << ev->Get()->Record.GetSeqNo()
+ << " finished = " << ev->Get()->Record.GetFinished()
+ << " pushed " << DebugPrintCells(ev->Get()));
Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
- CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed");
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
@@ -755,6 +795,13 @@ public:
return Reads.size() - ResetReads;
}
+ void ResetRead(size_t id) {
+ if (Reads[id]) {
+ Reads[id].Reset();
+ ResetReads++;
+ }
+ }
+
ui64 GetInputIndex() const override {
return InputIndex;
}
@@ -826,6 +873,23 @@ public:
return stats;
}
+ TString DebugPrintCells(const TEvDataShard::TEvReadResult* result) {
+ if (result->Record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::ARROW) {
+ return "{ARROW}";
+ }
+ TStringBuilder builder;
+ TVector<NScheme::TTypeInfo> types;
+ for (auto& column : Settings.GetColumns()) {
+ types.push_back(NScheme::TTypeInfo((NScheme::TTypeId)column.GetType()));
+ }
+
+ for (size_t rowIndex = 0; rowIndex < result->GetRowsCount(); ++rowIndex) {
+ const auto& row = result->GetCells(rowIndex);
+ builder << "|" << DebugPrintPoint(types, row, *AppData()->TypeRegistry);
+ }
+ return builder;
+ }
+
NMiniKQL::TBytesStatistics PackCells(TResult& handle) {
auto& [shardId, result, batch, _] = handle;
NMiniKQL::TBytesStatistics stats;
@@ -918,7 +982,8 @@ public:
if (limit) {
request->Record.SetMaxRows(*limit);
}
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
+ CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo());
+ Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true),
IEventHandle::FlagTrackDelivery);
} else {
Reads[id].Finished = true;
@@ -929,8 +994,7 @@ public:
if (!record.GetFinished()) {
SendCancel(id);
}
- Reads[id].Reset();
- ResetReads++;
+ ResetRead(id);
}
StartTableScan();
@@ -997,14 +1061,14 @@ public:
auto* state = Reads[id].Shard;
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
+ Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
}
void PassAway() override {
{
auto guard = BindAllocator();
Results.clear();
- Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0));
+ Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0));
}
TBase::PassAway();
}
@@ -1101,5 +1165,9 @@ void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) {
::DefaultRangeEvReadAckSettings.Data.MergeFrom(ack);
}
+void InterceptReadActorPipeCache(NActors::TActorId id) {
+ ::PipeCacheId = id;
+}
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h
index e543ec456e1..ecc0bcd7467 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.h
+++ b/ydb/core/kqp/runtime/kqp_read_actor.h
@@ -15,5 +15,7 @@ void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory);
void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead&);
void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck&);
+void InterceptReadActorPipeCache(NActors::TActorId);
+
} // namespace NKqp
} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
index c5fbccf9f6b..51d20a75abc 100644
--- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp
+++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp
@@ -256,6 +256,11 @@ void TKikimrRunner::CreateSampleTables() {
PRIMARY KEY (Key)
);
+ CREATE TABLE `KeyValueLargePartition` (
+ Key Uint64,
+ Value String,
+ PRIMARY KEY (Key)
+ );
CREATE TABLE `Test` (
Group Uint32,
@@ -290,6 +295,32 @@ void TKikimrRunner::CreateSampleTables() {
AssertSuccessResult(session.ExecuteDataQuery(R"(
+ REPLACE INTO `KeyValueLargePartition` (Key, Value) VALUES
+ (101u, "Value1"),
+ (102u, "Value2"),
+ (103u, "Value3"),
+ (201u, "Value1"),
+ (202u, "Value2"),
+ (203u, "Value3"),
+ (301u, "Value1"),
+ (302u, "Value2"),
+ (303u, "Value3"),
+ (401u, "Value1"),
+ (402u, "Value2"),
+ (403u, "Value3"),
+ (501u, "Value1"),
+ (502u, "Value2"),
+ (503u, "Value3"),
+ (601u, "Value1"),
+ (602u, "Value2"),
+ (603u, "Value3"),
+ (701u, "Value1"),
+ (702u, "Value2"),
+ (703u, "Value3"),
+ (801u, "Value1"),
+ (802u, "Value2"),
+ (803u, "Value3");
+
REPLACE INTO `TwoShard` (Key, Value1, Value2) VALUES
(1u, "One", -1),
(2u, "Two", 0),
diff --git a/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp b/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp
index 5ad531a8d71..ed50117f361 100644
--- a/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp
@@ -442,7 +442,12 @@ Y_UNIT_TEST_SUITE(KqpMergeCn) {
}
Y_UNIT_TEST(SortBy_PK_Uint64_Desc) {
- TKikimrRunner kikimr;
+ NKikimrConfig::TAppConfig app;
+ app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(false);
+ TKikimrSettings ksettings;
+ ksettings.SetAppConfig(app);
+
+ TKikimrRunner kikimr{ksettings};
auto db = kikimr.GetTableClient();
CreateSimpleDataTypes(kikimr);
diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
index eaf7acb1c05..dd10e979f63 100644
--- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp
@@ -3566,45 +3566,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();
- AssertSuccessResult(session.ExecuteSchemeQuery(R"(
- --!syntax_v1
- CREATE TABLE `KeyValueLimit` (
- Key Uint64,
- Value String,
- PRIMARY KEY (Key)
- );
- )").GetValueSync());
-
- AssertSuccessResult(session.ExecuteDataQuery(R"(
-
- REPLACE INTO `KeyValueLimit` (Key, Value) VALUES
- (101u, "Value1"),
- (102u, "Value2"),
- (103u, "Value3"),
- (201u, "Value1"),
- (202u, "Value2"),
- (203u, "Value3"),
- (301u, "Value1"),
- (302u, "Value2"),
- (303u, "Value3"),
- (401u, "Value1"),
- (402u, "Value2"),
- (403u, "Value3"),
- (501u, "Value1"),
- (502u, "Value2"),
- (503u, "Value3"),
- (601u, "Value1"),
- (602u, "Value2"),
- (603u, "Value3"),
- (701u, "Value1"),
- (702u, "Value2"),
- (703u, "Value3"),
- (801u, "Value1"),
- (802u, "Value2"),
- (803u, "Value3");
-
- )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync());
-
NKikimrTxDataShard::TEvRead evread;
evread.SetMaxRowsInResult(2);
InjectRangeEvReadSettings(evread);
@@ -3614,7 +3575,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) {
{
auto result = session.ExecuteDataQuery(R"(
- SELECT Key, Value FROM `/Root/KeyValueLimit` WHERE Key >= 202 ORDER BY Key LIMIT 5;
+ SELECT Key, Value FROM `/Root/KeyValueLargePartition` WHERE Key >= 202 ORDER BY Key LIMIT 5;
)", TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
CompareYson(R"([[[202u];["Value2"]];[[203u];["Value3"]];[[301u];["Value1"]];[[302u];["Value2"]];[[303u];["Value3"]]])", FormatResultSetYson(result.GetResultSet(0)));
diff --git a/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt b/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt
index f8041e91699..79de1eb35bf 100644
--- a/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt
+++ b/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt
@@ -35,6 +35,7 @@ target_link_options(ydb-core-kqp-ut-scan PRIVATE
target_sources(ydb-core-kqp-ut-scan PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt
index 4a4cfd5d4b4..99f167928d0 100644
--- a/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt
@@ -37,6 +37,7 @@ target_link_options(ydb-core-kqp-ut-scan PRIVATE
target_sources(ydb-core-kqp-ut-scan PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/kqp/ut/scan/CMakeLists.linux.txt b/ydb/core/kqp/ut/scan/CMakeLists.linux.txt
index ee691944460..bd7ff2f3bcd 100644
--- a/ydb/core/kqp/ut/scan/CMakeLists.linux.txt
+++ b/ydb/core/kqp/ut/scan/CMakeLists.linux.txt
@@ -39,6 +39,7 @@ target_link_options(ydb-core-kqp-ut-scan PRIVATE
target_sources(ydb-core-kqp-ut-scan PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
)
set_property(
TARGET
diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
new file mode 100644
index 00000000000..45e35a3982f
--- /dev/null
+++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp
@@ -0,0 +1,613 @@
+#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/core/kqp/counters/kqp_counters.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/kqp/runtime/kqp_read_actor.h>
+
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
+
+#include <util/generic/size_literals.h>
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/executer_actor/kqp_executer.h>
+
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+
+namespace NKikimr {
+namespace NKqp {
+
+
+Y_UNIT_TEST_SUITE(KqpSplit) {
+ static ui64 RunSchemeTx(
+ TTestActorRuntimeBase& runtime,
+ THolder<TEvTxUserProxy::TEvProposeTransaction>&& request,
+ TActorId sender = {},
+ bool viaActorSystem = false,
+ TEvTxUserProxy::TEvProposeTransactionStatus::EStatus expectedStatus = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress)
+ {
+ if (!sender) {
+ sender = runtime.AllocateEdgeActor();
+ }
+
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()), 0, viaActorSystem);
+ auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender);
+ Cerr << (TStringBuilder() << "scheme op " << ev->Get()->Record.ShortDebugString()) << Endl;
+ UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), expectedStatus);
+
+ return ev->Get()->Record.GetTxId();
+ }
+
+ ui64 AsyncSplitTable(
+ Tests::TServer* server,
+ TActorId sender,
+ const TString& path,
+ ui64 sourceTablet,
+ ui64 splitKey)
+ {
+ auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
+ request->Record.SetExecTimeoutPeriod(Max<ui64>());
+
+ auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme();
+ tx.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions);
+
+ auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableSplitMergeTablePartitions();
+ desc.SetTablePath(path);
+ desc.AddSourceTabletId(sourceTablet);
+ desc.AddSplitBoundary()->MutableKeyPrefix()->AddTuple()->MutableOptional()->SetUint64(splitKey);
+
+ return RunSchemeTx(*server->GetRuntime(), std::move(request), sender, true);
+ }
+
+ void WaitTxNotification(Tests::TServer* server, TActorId sender, ui64 txId) {
+ auto &runtime = *server->GetRuntime();
+ auto &settings = server->GetSettings();
+
+ auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>();
+ request->Record.SetTxId(txId);
+ auto tid = NKikimr::Tests::ChangeStateStorage(NKikimr::Tests::SchemeRoot, settings.Domain);
+ runtime.SendToPipe(tid, sender, request.Release(), 0, GetPipeConfigWithRetries());
+ runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender);
+ }
+
+ NKikimrScheme::TEvDescribeSchemeResult DescribeTable(Tests::TServer* server,
+ TActorId sender,
+ const TString &path)
+ {
+ auto &runtime = *server->GetRuntime();
+ TAutoPtr<IEventHandle> handle;
+ TVector<ui64> shards;
+
+ auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>();
+ request->Record.MutableDescribePath()->SetPath(path);
+ request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true);
+ runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()));
+ auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(handle);
+
+ return *reply->MutableRecord();
+ }
+
+ TVector<ui64> GetTableShards(Tests::TServer* server,
+ TActorId sender,
+ const TString &path)
+ {
+ TVector<ui64> shards;
+ auto lsResult = DescribeTable(server, sender, path);
+ for (auto &part : lsResult.GetPathDescription().GetTablePartitions())
+ shards.push_back(part.GetDatashardId());
+
+ return shards;
+ }
+
+ i64 SetSplitMergePartCountLimit(TTestActorRuntime* runtime, i64 val) {
+ TAtomic prev;
+ runtime->GetAppData().Icb->SetValue("SchemeShard_SplitMergePartCountLimit", val, prev);
+ return prev;
+ }
+
+ class TReplyPipeStub : public TActor<TReplyPipeStub> {
+ public:
+ TReplyPipeStub(TActorId owner, TActorId client)
+ : TActor<TReplyPipeStub>(&TReplyPipeStub::State)
+ , Owner(owner)
+ , Client(client)
+ {
+ }
+
+ STATEFN(State) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvPipeCache::TEvForward, Handle);
+ hFunc(TEvPipeCache::TEvUnlink, Handle);
+ hFunc(TEvDataShard::TEvReadResult, Handle);
+ default:
+ Handle(ev);
+ }
+ }
+
+ void Handle(TAutoPtr<IEventHandle> ev) {
+ Send(Client, ev->ReleaseBase());
+ }
+
+ void Handle(TEvDataShard::TEvReadResult::TPtr& ev) {
+ if (ToSkip.fetch_sub(1) <= 0 && ToCapture.fetch_sub(1) > 0) {
+ Cerr << "captured evreadresult -----------------------------------------------------------" << Endl;
+ with_lock(CaptureLock) {
+ Captured.push_back(THolder(ev.Release()));
+ }
+ if (ToCapture.load() <= 0) {
+ ReadsReceived.Signal();
+ }
+ return;
+ }
+ Send(Client, ev->ReleaseBase());
+ }
+
+ void Handle(TEvPipeCache::TEvForward::TPtr& ev) {
+ Send(PipeCache, ev->Release());
+ }
+
+ void Handle(TEvPipeCache::TEvUnlink::TPtr& ev) {
+ Send(PipeCache, ev->Release());
+ }
+
+ void SetupCapture(i64 skip, i64 capture) {
+ ToCapture.store(capture);
+ ToSkip.store(skip);
+ ReadsReceived.Reset();
+ }
+
+ void SendCaptured(NActors::TTestActorRuntime* runtime) {
+ TVector<THolder<IEventHandle>> tosend;
+ with_lock(CaptureLock) {
+ tosend.swap(Captured);
+ }
+ for (auto& ev : tosend) {
+ ev->Rewrite(ev->GetTypeRewrite(), Client);
+ runtime->Send(ev.Release());
+ }
+ }
+
+ private:
+ TActorId PipeCache = MakePipePeNodeCacheID(false);
+ TActorId Owner;
+ TActorId Client;
+
+ TMutex CaptureLock;
+ TVector<THolder<IEventHandle>> Captured;
+ TManualEvent ReadsReceived;
+
+ std::atomic<i64> ToCapture;
+ std::atomic<i64> ToSkip;
+ };
+
+ class TReadActorPipeCacheStub : public TActor<TReadActorPipeCacheStub> {
+ public:
+ TReadActorPipeCacheStub()
+ : TActor<TReadActorPipeCacheStub>(&TReadActorPipeCacheStub::State)
+ {
+ SkipAll();
+ AllowResults();
+ }
+
+ void SetupResultsCapture(i64 skip, i64 capture = std::numeric_limits<i64>::max()) {
+ ReverseSkip.store(skip);
+ ReverseCapture.store(capture);
+ for (auto& [_, pipe] : Pipes) {
+ pipe->SetupCapture(ReverseSkip.load(), ReverseCapture.load());
+ }
+ }
+
+ void AllowResults() {
+ SetupResultsCapture(std::numeric_limits<i64>::max(), 0);
+ }
+
+ void SetupCapture(i64 skip, i64 capture = std::numeric_limits<i64>::max()) {
+ ToCapture.store(capture);
+ ToSkip.store(skip);
+ ReadsReceived.Reset();
+ }
+
+ void SkipAll() {
+ SetupCapture(std::numeric_limits<i64>::max(), 0);
+ }
+
+ void State(TAutoPtr<::NActors::IEventHandle> &ev, const ::NActors::TActorContext &ctx) {
+ Y_UNUSED(ctx);
+ if (ev->GetTypeRewrite() == TEvPipeCache::TEvForward::EventType) {
+ auto* forw = reinterpret_cast<TEvPipeCache::TEvForward::TPtr*>(&ev);
+ auto readtype = TEvDataShard::TEvRead::EventType;
+ auto acktype = TEvDataShard::TEvReadAck::EventType;
+ auto actual = forw->Get()->Get()->Ev->Type();
+ bool isRead = actual == readtype || acktype;
+ if (isRead && ToSkip.fetch_sub(1) <= 0 && ToCapture.fetch_sub(1) > 0) {
+ Cerr << "captured evread -----------------------------------------------------------" << Endl;
+ with_lock(CaptureLock) {
+ Captured.push_back(THolder(ev.Release()));
+ }
+ if (ToCapture.load() <= 0) {
+ ReadsReceived.Signal();
+ }
+ return;
+ }
+ }
+ Forward(ev);
+ }
+
+ void Forward(TAutoPtr<::NActors::IEventHandle> ev) {
+ TReplyPipeStub* pipe = Pipes[ev->Sender];
+ if (pipe == nullptr) {
+ pipe = Pipes[ev->Sender] = new TReplyPipeStub(SelfId(), ev->Sender);
+ Register(pipe);
+ for (auto& [_, pipe] : Pipes) {
+ pipe->SetupCapture(ReverseSkip.load(), ReverseCapture.load());
+ }
+ }
+ auto id = pipe->SelfId();
+ Send(id, ev->ReleaseBase());
+ }
+
+ void SendCaptured(NActors::TTestActorRuntime* runtime, bool sendResults = true) {
+ TVector<THolder<IEventHandle>> tosend;
+ with_lock(CaptureLock) {
+ tosend.swap(Captured);
+ }
+ for (auto& ev : tosend) {
+ TReplyPipeStub* pipe = Pipes[ev->Sender];
+ if (pipe == nullptr) {
+ pipe = Pipes[ev->Sender] = new TReplyPipeStub(SelfId(), ev->Sender);
+ runtime->Register(pipe);
+ for (auto& [_, pipe] : Pipes) {
+ pipe->SetupCapture(ReverseSkip.load(), ReverseCapture.load());
+ }
+ }
+ auto id = pipe->SelfId();
+ ev->Rewrite(ev->GetTypeRewrite(), id);
+ runtime->Send(ev.Release());
+ }
+ if (sendResults) {
+ for (auto& [_, pipe] : Pipes) {
+ pipe->SendCaptured(runtime);
+ }
+ }
+ }
+
+ public:
+ TManualEvent ReadsReceived;
+ std::atomic<i64> ToCapture;
+ std::atomic<i64> ToSkip;
+
+ std::atomic<i64> ReverseCapture;
+ std::atomic<i64> ReverseSkip;
+
+ TMutex CaptureLock;
+ TVector<THolder<IEventHandle>> Captured;
+ THashMap<TActorId, TReplyPipeStub*> Pipes;
+ };
+
+ TString ALL = ",101,102,103,201,202,203,301,302,303,401,402,403,501,502,503,601,602,603,701,702,703,801,802,803";
+ TString Format(TVector<ui64> keys) {
+ TStringBuilder res;
+ for (auto k : keys) {
+ res << "," << k;
+ }
+ return res;
+ }
+
+ void SendScanQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) {
+ auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>();
+ ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
+ ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN);
+ ev->Record.MutableRequest()->SetQuery(queryText);
+ ev->Record.MutableRequest()->SetKeepSession(false);
+ ActorIdToProto(sender, ev->Record.MutableRequestActorId());
+ runtime->Send(new IEventHandle(kqpProxy, sender, ev.release()));
+ };
+
+ void CollectKeysTo(TVector<ui64>* collectedKeys, TTestActorRuntime* runtime, TActorId sender) {
+ auto captureEvents = [=](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) {
+ if (ev->GetTypeRewrite() == NKqp::TEvKqpExecuter::TEvStreamData::EventType) {
+ auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record;
+ for (auto& row : record.resultset().rows()) {
+ collectedKeys->push_back(row.items(0).uint64_value());
+ }
+
+ auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
+ resp->Record.SetEnough(false);
+ resp->Record.SetSeqNo(record.GetSeqNo());
+ runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release()));
+ return true;
+ }
+
+ return false;
+ };
+ runtime->SetEventFilter(captureEvents);
+ }
+
+ enum class SortOrder {
+ Descending,
+ Ascending,
+ Unspecified
+ };
+
+ TString OrderBy(SortOrder o) {
+ if (o == SortOrder::Ascending) {
+ return " ORDER BY Key ";
+ }
+ if (o == SortOrder::Descending) {
+ return " ORDER BY Key DESC ";
+ }
+ return " ";
+ }
+
+ TVector<ui64> Canonize(TVector<ui64> collectedKeys, SortOrder o) {
+ if (o == SortOrder::Unspecified) {
+ Sort(collectedKeys);
+ }
+ if (o == SortOrder::Descending) {
+ Reverse(collectedKeys.begin(), collectedKeys.end());
+ }
+ return collectedKeys;
+ }
+
+#define Y_UNIT_TEST_SORT(N, OPT) \
+ template <SortOrder OPT> \
+ struct TTestCase##N : public TCurrentTestCase { \
+ TTestCase##N() : TCurrentTestCase() { \
+ if constexpr (OPT == SortOrder::Descending) { Name_ = #N "+Descending"; } \
+ if constexpr (OPT == SortOrder::Ascending) { Name_ = #N "+Ascending"; } \
+ if constexpr (OPT == SortOrder::Unspecified) { Name_ = #N "+Unspecified"; } \
+ } \
+ \
+ static THolder<NUnitTest::TBaseTestCase> Create() { return ::MakeHolder<TTestCase##N<Order>>(); } \
+ void Execute_(NUnitTest::TTestContext&) override; \
+ }; \
+ struct TTestRegistration##N { \
+ TTestRegistration##N() { \
+ TCurrentTest::AddTest(TTestCase##N<SortOrder::Ascending>::Create); \
+ TCurrentTest::AddTest(TTestCase##N<SortOrder::Descending>::Create); \
+ TCurrentTest::AddTest(TTestCase##N<SortOrder::Unspecified>::Create); \
+ } \
+ }; \
+ static TTestRegistration##N testRegistration##N; \
+ template <SortOrder OPT> \
+ void TTestCase##N<OPT>::Execute_(NUnitTest::TTestContext& ut_context Y_DECLARE_UNUSED)
+
+ Y_UNIT_TEST_SORT(AfterResolve, Order) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForScanQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+
+ auto db = kikimr.GetTableClient();
+
+ auto& server = kikimr.GetTestServer();
+ auto* runtime = server.GetRuntime();
+ Y_UNUSED(runtime);
+ auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+
+ auto sender = runtime->AllocateEdgeActor();
+ auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+
+ TVector<ui64> collectedKeys;
+ CollectKeysTo(&collectedKeys, runtime, sender);
+
+ auto* shim = new TReadActorPipeCacheStub();
+ InterceptReadActorPipeCache(runtime->Register(shim));
+ shim->SetupCapture(0, 1);
+ SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ SetSplitMergePartCountLimit(runtime, -1);
+ {
+ auto senderSplit = runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
+ WaitTxNotification(&server, senderSplit, txId);
+ }
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->SendCaptured(runtime);
+
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ }
+
+ Y_UNIT_TEST_SORT(AfterResult, Order) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForScanQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+
+ auto db = kikimr.GetTableClient();
+
+ auto& server = kikimr.GetTestServer();
+ auto* runtime = server.GetRuntime();
+ Y_UNUSED(runtime);
+ auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+
+ auto sender = runtime->AllocateEdgeActor();
+ auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+
+ TVector<ui64> collectedKeys;
+ CollectKeysTo(&collectedKeys, runtime, sender);
+
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(8);
+ evread.SetMaxRows(8);
+ InjectRangeEvReadSettings(evread);
+
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ evreadack.SetMaxRows(8);
+ InjectRangeEvReadAckSettings(evreadack);
+
+ auto* shim = new TReadActorPipeCacheStub();
+ shim->SetupCapture(1, 1);
+ shim->SetupResultsCapture(1);
+ InterceptReadActorPipeCache(runtime->Register(shim));
+ SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ SetSplitMergePartCountLimit(runtime, -1);
+ {
+ auto senderSplit = runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
+ WaitTxNotification(&server, senderSplit, txId);
+ }
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->AllowResults();
+ shim->SendCaptured(runtime);
+
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ if (Order != SortOrder::Descending) { // bug in datashard
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ }
+ }
+
+ Y_UNIT_TEST_SORT(ChoosePartition, Order) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForScanQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+
+ auto db = kikimr.GetTableClient();
+
+ auto& server = kikimr.GetTestServer();
+ auto* runtime = server.GetRuntime();
+ Y_UNUSED(runtime);
+ auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+
+ auto sender = runtime->AllocateEdgeActor();
+ auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+
+ TVector<ui64> collectedKeys;
+ CollectKeysTo(&collectedKeys, runtime, sender);
+
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(8);
+ evread.SetMaxRows(8);
+ InjectRangeEvReadSettings(evread);
+
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ evreadack.SetMaxRows(8);
+ InjectRangeEvReadAckSettings(evreadack);
+
+ auto* shim = new TReadActorPipeCacheStub();
+ shim->SetupCapture(2, 1);
+ shim->SetupResultsCapture(2);
+ InterceptReadActorPipeCache(runtime->Register(shim));
+ SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+ if (Order == SortOrder::Descending) { // bug in datashard
+ return;
+ }
+
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ SetSplitMergePartCountLimit(runtime, -1);
+ {
+ auto senderSplit = runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400);
+ WaitTxNotification(&server, senderSplit, txId);
+ }
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->AllowResults();
+ shim->SendCaptured(runtime);
+
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ }
+
+
+ Y_UNIT_TEST_SORT(BorderKeys, Order) {
+ TKikimrSettings settings;
+ NKikimrConfig::TAppConfig appConfig;
+ appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true);
+ settings.SetDomainRoot(KikimrDefaultUtDomainRoot);
+ TFeatureFlags flags;
+ flags.SetEnablePredicateExtractForScanQueries(true);
+ settings.SetFeatureFlags(flags);
+ settings.SetAppConfig(appConfig);
+
+ TKikimrRunner kikimr(settings);
+
+ auto db = kikimr.GetTableClient();
+
+ auto& server = kikimr.GetTestServer();
+ auto* runtime = server.GetRuntime();
+ Y_UNUSED(runtime);
+ auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0));
+
+ auto sender = runtime->AllocateEdgeActor();
+ auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+
+ TVector<ui64> collectedKeys;
+ CollectKeysTo(&collectedKeys, runtime, sender);
+
+ NKikimrTxDataShard::TEvRead evread;
+ evread.SetMaxRowsInResult(12);
+ evread.SetMaxRows(12);
+ InjectRangeEvReadSettings(evread);
+
+ NKikimrTxDataShard::TEvReadAck evreadack;
+ evreadack.SetMaxRows(12);
+ InjectRangeEvReadAckSettings(evreadack);
+
+ auto* shim = new TReadActorPipeCacheStub();
+ shim->SetupCapture(1, 1);
+ shim->SetupResultsCapture(1);
+ InterceptReadActorPipeCache(runtime->Register(shim));
+ SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order));
+ if (Order == SortOrder::Descending) { // bug in datashard
+ return;
+ }
+
+ shim->ReadsReceived.WaitI();
+ Cerr << "starting split -----------------------------------------------------------" << Endl;
+ SetSplitMergePartCountLimit(runtime, -1);
+ {
+ auto senderSplit = runtime->AllocateEdgeActor();
+ ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 402);
+ WaitTxNotification(&server, senderSplit, txId);
+
+ shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition");
+
+ txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(1), 404);
+ WaitTxNotification(&server, senderSplit, txId);
+ }
+ Cerr << "resume evread -----------------------------------------------------------" << Endl;
+ shim->SkipAll();
+ shim->AllowResults();
+ shim->SendCaptured(runtime);
+
+ auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender);
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS);
+ UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL);
+ }
+}
+
+
+} // namespace NKqp
+} // namespace NKikimr
diff --git a/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp
index 17d421c0608..cb8c09b7050 100644
--- a/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp
+++ b/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp
@@ -82,11 +82,12 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
[[72057594046644480u];[9u];["/Root/BatchUpload"];[5u]];
[[72057594046644480u];[0u];["/Root/KeyValue"];[6u]];
[[72057594046644480u];[0u];["/Root/KeyValue2"];[7u]];
- [[72057594046644480u];[0u];["/Root/Test"];[8u]];
- [[72057594046644480u];[0u];["/Root/Join1"];[9u]];
- [[72057594046644480u];[1u];["/Root/Join1"];[9u]];
- [[72057594046644480u];[0u];["/Root/Join2"];[10u]];
- [[72057594046644480u];[1u];["/Root/Join2"];[10u]]
+ [[72057594046644480u];[0u];["/Root/KeyValueLargePartition"];[8u]];
+ [[72057594046644480u];[0u];["/Root/Test"];[9u]];
+ [[72057594046644480u];[0u];["/Root/Join1"];[10u]];
+ [[72057594046644480u];[1u];["/Root/Join1"];[10u]];
+ [[72057594046644480u];[0u];["/Root/Join2"];[11u]];
+ [[72057594046644480u];[1u];["/Root/Join2"];[11u]]
])", StreamResultToYson(it));
}
@@ -170,16 +171,17 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
TString query = R"(
SELECT OwnerId, PathId, PartIdx, Path
FROM `/Root/.sys/partition_stats`
- WHERE OwnerId = 72057594046644480ul AND PathId > 5u AND PathId <= 9u
+ WHERE OwnerId = 72057594046644480ul AND PathId > 5u AND PathId <= 10u
ORDER BY PathId, PartIdx;
)";
TString expectedYson = R"([
[[72057594046644480u];[6u];[0u];["/Root/KeyValue"]];
[[72057594046644480u];[7u];[0u];["/Root/KeyValue2"]];
- [[72057594046644480u];[8u];[0u];["/Root/Test"]];
- [[72057594046644480u];[9u];[0u];["/Root/Join1"]];
- [[72057594046644480u];[9u];[1u];["/Root/Join1"]]
+ [[72057594046644480u];[8u];[0u];["/Root/KeyValueLargePartition"]];
+ [[72057594046644480u];[9u];[0u];["/Root/Test"]];
+ [[72057594046644480u];[10u];[0u];["/Root/Join1"]];
+ [[72057594046644480u];[10u];[1u];["/Root/Join1"]]
])";
auto it = client.StreamExecuteScanQuery(query).GetValueSync();
@@ -208,7 +210,7 @@ Y_UNIT_TEST_SUITE(KqpSystemView) {
TString expectedYson = R"([
[[72057594046644480u];[6u];[0u];["/Root/KeyValue"]];
[[72057594046644480u];[7u];[0u];["/Root/KeyValue2"]];
- [[72057594046644480u];[8u];[0u];["/Root/Test"]]
+ [[72057594046644480u];[8u];[0u];["/Root/KeyValueLargePartition"]]
])";
auto it = client.StreamExecuteScanQuery(query).GetValueSync();