diff options
| author | ssmike <[email protected]> | 2023-02-28 17:46:31 +0300 |
|---|---|---|
| committer | ssmike <[email protected]> | 2023-02-28 17:46:31 +0300 |
| commit | d7707d4552a7cafb1f7ea87706ac5aed27d92675 (patch) | |
| tree | 02f90f668a0bde2d616876894aef391cdf64aebe | |
| parent | 429e4a530104658268d005e9d75f32e5537d3668 (diff) | |
Concurrent split tests
| -rw-r--r-- | ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp | 10 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.cpp | 108 | ||||
| -rw-r--r-- | ydb/core/kqp/runtime/kqp_read_actor.h | 2 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/common/kqp_ut_common.cpp | 31 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/opt/kqp_merge_ut.cpp | 7 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/opt/kqp_ne_ut.cpp | 41 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/scan/CMakeLists.darwin.txt | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/scan/CMakeLists.linux.txt | 1 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/scan/kqp_split_ut.cpp | 613 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp | 22 |
11 files changed, 762 insertions, 75 deletions
diff --git a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp index 58f7e350623..a5718063ea2 100644 --- a/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp +++ b/ydb/core/kqp/opt/physical/kqp_opt_phy_sort.cpp @@ -21,7 +21,8 @@ TExprBase KqpRemoveRedundantSortByPkBase( TExprContext& ctx, const TKqpOptimizeContext& kqpCtx, std::function<TMaybe<TTableData>(TExprBase)> tableAccessor, - std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput) + std::function<TExprBase(TExprBase, NYql::TKqpReadTableSettings)> rebuildInput, + bool allowSortForAllTables = false) { auto maybeSort = node.Maybe<TCoSort>(); auto maybeTopSort = node.Maybe<TCoTopSort>(); @@ -128,7 +129,7 @@ TExprBase KqpRemoveRedundantSortByPkBase( bool olapTable = tableDesc.Metadata->Kind == EKikimrTableKind::Olap; if (direction == SortDirectionReverse) { - if (!olapTable && kqpCtx.IsScanQuery()) { + if (!allowSortForAllTables && !olapTable && kqpCtx.IsScanQuery()) { return node; } @@ -141,7 +142,7 @@ TExprBase KqpRemoveRedundantSortByPkBase( input = rebuildInput(input, settings); } else if (direction == SortDirectionForward) { - if (olapTable) { + if (olapTable || allowSortForAllTables) { settings.SetSorted(); input = rebuildInput(input, settings); } @@ -222,7 +223,8 @@ NYql::NNodes::TExprBase KqpRemoveRedundantSortByPkOverSource( [&](TExprBase input, NYql::TKqpReadTableSettings settings) { newSettings.push_back(settings); return input; - }); + }, + /* allowSortForAllTables */ true); if (newExpr.Ptr() != expr.Ptr()) { bodyReplaces[expr.Raw()] = newExpr.Ptr(); } diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp index 34bc047bbbe..2486ce6ec4f 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp @@ -60,6 +60,8 @@ THolder<NKikimr::TEvDataShard::TEvReadAck> DefaultAckSettings() { return result; } +NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false); + } @@ -113,16 +115,40 @@ public: { } - TTableRange GetBounds() { + TTableRange GetBounds(bool reverse) { if (Ranges.empty()) { YQL_ENSURE(!Points.empty()); - return TTableRange( - Points.front().GetCells(), true, - Points.back().GetCells(), true); + if (reverse) { + return TTableRange( + Points.front().GetCells(), true, + Points[FirstUnprocessedRequest.GetOrElse(Points.size() - 1)].GetCells(), true); + } else { + return TTableRange( + Points[FirstUnprocessedRequest.GetOrElse(0)].GetCells(), true, + Points.back().GetCells(), true); + } } else { - return TTableRange( - Ranges.front().From.GetCells(), Ranges.front().FromInclusive, - Ranges.back().To.GetCells(), Ranges.back().ToInclusive); + if (reverse) { + if (LastKey.empty()) { + return TTableRange( + Ranges.front().From.GetCells(), Ranges.front().FromInclusive, + Ranges[FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1)].To.GetCells(), Ranges.back().ToInclusive); + } else { + return TTableRange( + Ranges.front().From.GetCells(), Ranges.front().FromInclusive, + LastKey, false); + } + } else { + if (LastKey.empty()) { + return TTableRange( + Ranges[FirstUnprocessedRequest.GetOrElse(0)].From.GetCells(), Ranges.front().FromInclusive, + Ranges.back().To.GetCells(), Ranges.back().ToInclusive); + } else { + return TTableRange( + LastKey, false, + Ranges.back().To.GetCells(), Ranges.back().ToInclusive); + } + } } } @@ -191,14 +217,17 @@ public: } if (reverse) { - auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(Ranges.size()); + auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(Ranges.size() - 1); if (!lastKeyEmpty) { // It is range, where read was interrupted. Restart operation from last read key. result.emplace_back(std::move(TSerializedTableRange( rangeIt->From.GetBuffer(), TSerializedCellVec::Serialize(LastKey), rangeIt->ToInclusive, false ))); + } else { + ++rangeIt; } + result.insert(result.begin(), Ranges.begin(), rangeIt); } else { auto rangeIt = Ranges.begin() + FirstUnprocessedRequest.GetOrElse(0); @@ -374,6 +403,7 @@ public: bool StartTableScan() { const ui32 maxAllowedInFlight = Settings.GetSorted() ? 1 : MaxInFlight; + CA_LOG_D("effective maxinflight " << maxAllowedInFlight << " sorted " << Settings.GetSorted()); bool isFirst = true; while (!PendingShards.Empty() && RunningReads() + 1 <= maxAllowedInFlight) { if (isFirst) { @@ -409,7 +439,7 @@ public: state->ResolveAttempt++; - auto range = state->GetBounds(); + auto range = state->GetBounds(Settings.GetReverse()); TVector<TKeyDesc::TColumnOp> columns; columns.reserve(Settings.GetColumns().size()); for (const auto& column : Settings.GetColumns()) { @@ -601,11 +631,13 @@ public: state->RetryAttempt += 1; if (state->RetryAttempt >= MAX_SHARD_RETRIES) { + ResetRead(id); return ResolveShard(state); } CA_LOG_D("Retrying read #" << id); SendCancel(id); + ResetRead(id); if (Reads[id].SerializedContinuationToken) { NKikimrTxDataShard::TReadContinuationToken token; @@ -686,7 +718,7 @@ public: << " lockTxId = " << Settings.GetLockTxId()); ReadIdByTabletId[state->TabletId].push_back(id); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), + Send(::PipeCacheId, new TEvPipeCache::TEvForward(ev.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); } @@ -698,17 +730,22 @@ public: return; } + for (auto& issue : record.GetStatus().GetIssues()) { + CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); + Reads[id].Shard->Issues.push_back(issue); + } switch (record.GetStatus().GetCode()) { case Ydb::StatusIds::SUCCESS: break; case Ydb::StatusIds::OVERLOADED: case Ydb::StatusIds::INTERNAL_ERROR: { - for (auto& issue : record.GetStatus().GetIssues()) { - CA_LOG_D("read id #" << id << " got issue " << issue.Getmessage()); - Reads[id].Shard->Issues.push_back(issue); - } return RetryRead(id); } + case Ydb::StatusIds::NOT_FOUND: { + auto shard = Reads[id].Shard; + ResetRead(id); + return ResolveShard(shard); + } default: { NYql::TIssues issues; NYql::IssuesFromMessage(record.GetStatus().GetIssues(), issues); @@ -735,8 +772,11 @@ public: ReceivedRowCount += ev->Get()->GetRowsCount(); + CA_LOG_D(TStringBuilder() << "new data for read #" << id + << " seqno = " << ev->Get()->Record.GetSeqNo() + << " finished = " << ev->Get()->Record.GetFinished() + << " pushed " << DebugPrintCells(ev->Get())); Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())}); - CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed"); Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } @@ -755,6 +795,13 @@ public: return Reads.size() - ResetReads; } + void ResetRead(size_t id) { + if (Reads[id]) { + Reads[id].Reset(); + ResetReads++; + } + } + ui64 GetInputIndex() const override { return InputIndex; } @@ -826,6 +873,23 @@ public: return stats; } + TString DebugPrintCells(const TEvDataShard::TEvReadResult* result) { + if (result->Record.GetResultFormat() == NKikimrTxDataShard::EScanDataFormat::ARROW) { + return "{ARROW}"; + } + TStringBuilder builder; + TVector<NScheme::TTypeInfo> types; + for (auto& column : Settings.GetColumns()) { + types.push_back(NScheme::TTypeInfo((NScheme::TTypeId)column.GetType())); + } + + for (size_t rowIndex = 0; rowIndex < result->GetRowsCount(); ++rowIndex) { + const auto& row = result->GetCells(rowIndex); + builder << "|" << DebugPrintPoint(types, row, *AppData()->TypeRegistry); + } + return builder; + } + NMiniKQL::TBytesStatistics PackCells(TResult& handle) { auto& [shardId, result, batch, _] = handle; NMiniKQL::TBytesStatistics stats; @@ -918,7 +982,8 @@ public: if (limit) { request->Record.SetMaxRows(*limit); } - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), + CA_LOG_D("sending ack for read #" << id << " limit " << limit << " seqno = " << record.GetSeqNo()); + Send(::PipeCacheId, new TEvPipeCache::TEvForward(request.Release(), state->TabletId, true), IEventHandle::FlagTrackDelivery); } else { Reads[id].Finished = true; @@ -929,8 +994,7 @@ public: if (!record.GetFinished()) { SendCancel(id); } - Reads[id].Reset(); - ResetReads++; + ResetRead(id); } StartTableScan(); @@ -997,14 +1061,14 @@ public: auto* state = Reads[id].Shard; auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>(); cancel->Record.SetReadId(id); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); + Send(::PipeCacheId, new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery); } void PassAway() override { { auto guard = BindAllocator(); Results.clear(); - Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvUnlink(0)); + Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); } TBase::PassAway(); } @@ -1101,5 +1165,9 @@ void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck& ack) { ::DefaultRangeEvReadAckSettings.Data.MergeFrom(ack); } +void InterceptReadActorPipeCache(NActors::TActorId id) { + ::PipeCacheId = id; +} + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/runtime/kqp_read_actor.h b/ydb/core/kqp/runtime/kqp_read_actor.h index e543ec456e1..ecc0bcd7467 100644 --- a/ydb/core/kqp/runtime/kqp_read_actor.h +++ b/ydb/core/kqp/runtime/kqp_read_actor.h @@ -15,5 +15,7 @@ void RegisterKqpReadActor(NYql::NDq::TDqAsyncIoFactory& factory); void InjectRangeEvReadSettings(const NKikimrTxDataShard::TEvRead&); void InjectRangeEvReadAckSettings(const NKikimrTxDataShard::TEvReadAck&); +void InterceptReadActorPipeCache(NActors::TActorId); + } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/ut/common/kqp_ut_common.cpp b/ydb/core/kqp/ut/common/kqp_ut_common.cpp index c5fbccf9f6b..51d20a75abc 100644 --- a/ydb/core/kqp/ut/common/kqp_ut_common.cpp +++ b/ydb/core/kqp/ut/common/kqp_ut_common.cpp @@ -256,6 +256,11 @@ void TKikimrRunner::CreateSampleTables() { PRIMARY KEY (Key) ); + CREATE TABLE `KeyValueLargePartition` ( + Key Uint64, + Value String, + PRIMARY KEY (Key) + ); CREATE TABLE `Test` ( Group Uint32, @@ -290,6 +295,32 @@ void TKikimrRunner::CreateSampleTables() { AssertSuccessResult(session.ExecuteDataQuery(R"( + REPLACE INTO `KeyValueLargePartition` (Key, Value) VALUES + (101u, "Value1"), + (102u, "Value2"), + (103u, "Value3"), + (201u, "Value1"), + (202u, "Value2"), + (203u, "Value3"), + (301u, "Value1"), + (302u, "Value2"), + (303u, "Value3"), + (401u, "Value1"), + (402u, "Value2"), + (403u, "Value3"), + (501u, "Value1"), + (502u, "Value2"), + (503u, "Value3"), + (601u, "Value1"), + (602u, "Value2"), + (603u, "Value3"), + (701u, "Value1"), + (702u, "Value2"), + (703u, "Value3"), + (801u, "Value1"), + (802u, "Value2"), + (803u, "Value3"); + REPLACE INTO `TwoShard` (Key, Value1, Value2) VALUES (1u, "One", -1), (2u, "Two", 0), diff --git a/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp b/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp index 5ad531a8d71..ed50117f361 100644 --- a/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_merge_ut.cpp @@ -442,7 +442,12 @@ Y_UNIT_TEST_SUITE(KqpMergeCn) { } Y_UNIT_TEST(SortBy_PK_Uint64_Desc) { - TKikimrRunner kikimr; + NKikimrConfig::TAppConfig app; + app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(false); + TKikimrSettings ksettings; + ksettings.SetAppConfig(app); + + TKikimrRunner kikimr{ksettings}; auto db = kikimr.GetTableClient(); CreateSimpleDataTypes(kikimr); diff --git a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp index eaf7acb1c05..dd10e979f63 100644 --- a/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_ne_ut.cpp @@ -3566,45 +3566,6 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - AssertSuccessResult(session.ExecuteSchemeQuery(R"( - --!syntax_v1 - CREATE TABLE `KeyValueLimit` ( - Key Uint64, - Value String, - PRIMARY KEY (Key) - ); - )").GetValueSync()); - - AssertSuccessResult(session.ExecuteDataQuery(R"( - - REPLACE INTO `KeyValueLimit` (Key, Value) VALUES - (101u, "Value1"), - (102u, "Value2"), - (103u, "Value3"), - (201u, "Value1"), - (202u, "Value2"), - (203u, "Value3"), - (301u, "Value1"), - (302u, "Value2"), - (303u, "Value3"), - (401u, "Value1"), - (402u, "Value2"), - (403u, "Value3"), - (501u, "Value1"), - (502u, "Value2"), - (503u, "Value3"), - (601u, "Value1"), - (602u, "Value2"), - (603u, "Value3"), - (701u, "Value1"), - (702u, "Value2"), - (703u, "Value3"), - (801u, "Value1"), - (802u, "Value2"), - (803u, "Value3"); - - )", TTxControl::BeginTx(TTxSettings::SerializableRW()).CommitTx()).GetValueSync()); - NKikimrTxDataShard::TEvRead evread; evread.SetMaxRowsInResult(2); InjectRangeEvReadSettings(evread); @@ -3614,7 +3575,7 @@ Y_UNIT_TEST_SUITE(KqpNewEngine) { { auto result = session.ExecuteDataQuery(R"( - SELECT Key, Value FROM `/Root/KeyValueLimit` WHERE Key >= 202 ORDER BY Key LIMIT 5; + SELECT Key, Value FROM `/Root/KeyValueLargePartition` WHERE Key >= 202 ORDER BY Key LIMIT 5; )", TTxControl::BeginTx().CommitTx()).GetValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); CompareYson(R"([[[202u];["Value2"]];[[203u];["Value3"]];[[301u];["Value1"]];[[302u];["Value2"]];[[303u];["Value3"]]])", FormatResultSetYson(result.GetResultSet(0))); diff --git a/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt b/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt index f8041e91699..79de1eb35bf 100644 --- a/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt +++ b/ydb/core/kqp/ut/scan/CMakeLists.darwin.txt @@ -35,6 +35,7 @@ target_link_options(ydb-core-kqp-ut-scan PRIVATE target_sources(ydb-core-kqp-ut-scan PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_split_ut.cpp ) set_property( TARGET diff --git a/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt index 4a4cfd5d4b4..99f167928d0 100644 --- a/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/scan/CMakeLists.linux-aarch64.txt @@ -37,6 +37,7 @@ target_link_options(ydb-core-kqp-ut-scan PRIVATE target_sources(ydb-core-kqp-ut-scan PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_split_ut.cpp ) set_property( TARGET diff --git a/ydb/core/kqp/ut/scan/CMakeLists.linux.txt b/ydb/core/kqp/ut/scan/CMakeLists.linux.txt index ee691944460..bd7ff2f3bcd 100644 --- a/ydb/core/kqp/ut/scan/CMakeLists.linux.txt +++ b/ydb/core/kqp/ut/scan/CMakeLists.linux.txt @@ -39,6 +39,7 @@ target_link_options(ydb-core-kqp-ut-scan PRIVATE target_sources(ydb-core-kqp-ut-scan PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_flowcontrol_ut.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_scan_ut.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/ut/scan/kqp_split_ut.cpp ) set_property( TARGET diff --git a/ydb/core/kqp/ut/scan/kqp_split_ut.cpp b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp new file mode 100644 index 00000000000..45e35a3982f --- /dev/null +++ b/ydb/core/kqp/ut/scan/kqp_split_ut.cpp @@ -0,0 +1,613 @@ +#include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/core/kqp/counters/kqp_counters.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +#include <ydb/core/base/tablet_pipecache.h> +#include <ydb/core/kqp/runtime/kqp_read_actor.h> + +#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h> + +#include <util/generic/size_literals.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/executer_actor/kqp_executer.h> + +#include <ydb/core/tx/tx_proxy/proxy.h> +#include <ydb/core/tx/schemeshard/schemeshard.h> + +namespace NKikimr { +namespace NKqp { + + +Y_UNIT_TEST_SUITE(KqpSplit) { + static ui64 RunSchemeTx( + TTestActorRuntimeBase& runtime, + THolder<TEvTxUserProxy::TEvProposeTransaction>&& request, + TActorId sender = {}, + bool viaActorSystem = false, + TEvTxUserProxy::TEvProposeTransactionStatus::EStatus expectedStatus = TEvTxUserProxy::TEvProposeTransactionStatus::EStatus::ExecInProgress) + { + if (!sender) { + sender = runtime.AllocateEdgeActor(); + } + + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release()), 0, viaActorSystem); + auto ev = runtime.GrabEdgeEventRethrow<TEvTxUserProxy::TEvProposeTransactionStatus>(sender); + Cerr << (TStringBuilder() << "scheme op " << ev->Get()->Record.ShortDebugString()) << Endl; + UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), expectedStatus); + + return ev->Get()->Record.GetTxId(); + } + + ui64 AsyncSplitTable( + Tests::TServer* server, + TActorId sender, + const TString& path, + ui64 sourceTablet, + ui64 splitKey) + { + auto request = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>(); + request->Record.SetExecTimeoutPeriod(Max<ui64>()); + + auto& tx = *request->Record.MutableTransaction()->MutableModifyScheme(); + tx.SetOperationType(NKikimrSchemeOp::ESchemeOpSplitMergeTablePartitions); + + auto& desc = *request->Record.MutableTransaction()->MutableModifyScheme()->MutableSplitMergeTablePartitions(); + desc.SetTablePath(path); + desc.AddSourceTabletId(sourceTablet); + desc.AddSplitBoundary()->MutableKeyPrefix()->AddTuple()->MutableOptional()->SetUint64(splitKey); + + return RunSchemeTx(*server->GetRuntime(), std::move(request), sender, true); + } + + void WaitTxNotification(Tests::TServer* server, TActorId sender, ui64 txId) { + auto &runtime = *server->GetRuntime(); + auto &settings = server->GetSettings(); + + auto request = MakeHolder<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletion>(); + request->Record.SetTxId(txId); + auto tid = NKikimr::Tests::ChangeStateStorage(NKikimr::Tests::SchemeRoot, settings.Domain); + runtime.SendToPipe(tid, sender, request.Release(), 0, GetPipeConfigWithRetries()); + runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvNotifyTxCompletionResult>(sender); + } + + NKikimrScheme::TEvDescribeSchemeResult DescribeTable(Tests::TServer* server, + TActorId sender, + const TString &path) + { + auto &runtime = *server->GetRuntime(); + TAutoPtr<IEventHandle> handle; + TVector<ui64> shards; + + auto request = MakeHolder<TEvTxUserProxy::TEvNavigate>(); + request->Record.MutableDescribePath()->SetPath(path); + request->Record.MutableDescribePath()->MutableOptions()->SetShowPrivateTable(true); + runtime.Send(new IEventHandle(MakeTxProxyID(), sender, request.Release())); + auto reply = runtime.GrabEdgeEventRethrow<NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult>(handle); + + return *reply->MutableRecord(); + } + + TVector<ui64> GetTableShards(Tests::TServer* server, + TActorId sender, + const TString &path) + { + TVector<ui64> shards; + auto lsResult = DescribeTable(server, sender, path); + for (auto &part : lsResult.GetPathDescription().GetTablePartitions()) + shards.push_back(part.GetDatashardId()); + + return shards; + } + + i64 SetSplitMergePartCountLimit(TTestActorRuntime* runtime, i64 val) { + TAtomic prev; + runtime->GetAppData().Icb->SetValue("SchemeShard_SplitMergePartCountLimit", val, prev); + return prev; + } + + class TReplyPipeStub : public TActor<TReplyPipeStub> { + public: + TReplyPipeStub(TActorId owner, TActorId client) + : TActor<TReplyPipeStub>(&TReplyPipeStub::State) + , Owner(owner) + , Client(client) + { + } + + STATEFN(State) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPipeCache::TEvForward, Handle); + hFunc(TEvPipeCache::TEvUnlink, Handle); + hFunc(TEvDataShard::TEvReadResult, Handle); + default: + Handle(ev); + } + } + + void Handle(TAutoPtr<IEventHandle> ev) { + Send(Client, ev->ReleaseBase()); + } + + void Handle(TEvDataShard::TEvReadResult::TPtr& ev) { + if (ToSkip.fetch_sub(1) <= 0 && ToCapture.fetch_sub(1) > 0) { + Cerr << "captured evreadresult -----------------------------------------------------------" << Endl; + with_lock(CaptureLock) { + Captured.push_back(THolder(ev.Release())); + } + if (ToCapture.load() <= 0) { + ReadsReceived.Signal(); + } + return; + } + Send(Client, ev->ReleaseBase()); + } + + void Handle(TEvPipeCache::TEvForward::TPtr& ev) { + Send(PipeCache, ev->Release()); + } + + void Handle(TEvPipeCache::TEvUnlink::TPtr& ev) { + Send(PipeCache, ev->Release()); + } + + void SetupCapture(i64 skip, i64 capture) { + ToCapture.store(capture); + ToSkip.store(skip); + ReadsReceived.Reset(); + } + + void SendCaptured(NActors::TTestActorRuntime* runtime) { + TVector<THolder<IEventHandle>> tosend; + with_lock(CaptureLock) { + tosend.swap(Captured); + } + for (auto& ev : tosend) { + ev->Rewrite(ev->GetTypeRewrite(), Client); + runtime->Send(ev.Release()); + } + } + + private: + TActorId PipeCache = MakePipePeNodeCacheID(false); + TActorId Owner; + TActorId Client; + + TMutex CaptureLock; + TVector<THolder<IEventHandle>> Captured; + TManualEvent ReadsReceived; + + std::atomic<i64> ToCapture; + std::atomic<i64> ToSkip; + }; + + class TReadActorPipeCacheStub : public TActor<TReadActorPipeCacheStub> { + public: + TReadActorPipeCacheStub() + : TActor<TReadActorPipeCacheStub>(&TReadActorPipeCacheStub::State) + { + SkipAll(); + AllowResults(); + } + + void SetupResultsCapture(i64 skip, i64 capture = std::numeric_limits<i64>::max()) { + ReverseSkip.store(skip); + ReverseCapture.store(capture); + for (auto& [_, pipe] : Pipes) { + pipe->SetupCapture(ReverseSkip.load(), ReverseCapture.load()); + } + } + + void AllowResults() { + SetupResultsCapture(std::numeric_limits<i64>::max(), 0); + } + + void SetupCapture(i64 skip, i64 capture = std::numeric_limits<i64>::max()) { + ToCapture.store(capture); + ToSkip.store(skip); + ReadsReceived.Reset(); + } + + void SkipAll() { + SetupCapture(std::numeric_limits<i64>::max(), 0); + } + + void State(TAutoPtr<::NActors::IEventHandle> &ev, const ::NActors::TActorContext &ctx) { + Y_UNUSED(ctx); + if (ev->GetTypeRewrite() == TEvPipeCache::TEvForward::EventType) { + auto* forw = reinterpret_cast<TEvPipeCache::TEvForward::TPtr*>(&ev); + auto readtype = TEvDataShard::TEvRead::EventType; + auto acktype = TEvDataShard::TEvReadAck::EventType; + auto actual = forw->Get()->Get()->Ev->Type(); + bool isRead = actual == readtype || acktype; + if (isRead && ToSkip.fetch_sub(1) <= 0 && ToCapture.fetch_sub(1) > 0) { + Cerr << "captured evread -----------------------------------------------------------" << Endl; + with_lock(CaptureLock) { + Captured.push_back(THolder(ev.Release())); + } + if (ToCapture.load() <= 0) { + ReadsReceived.Signal(); + } + return; + } + } + Forward(ev); + } + + void Forward(TAutoPtr<::NActors::IEventHandle> ev) { + TReplyPipeStub* pipe = Pipes[ev->Sender]; + if (pipe == nullptr) { + pipe = Pipes[ev->Sender] = new TReplyPipeStub(SelfId(), ev->Sender); + Register(pipe); + for (auto& [_, pipe] : Pipes) { + pipe->SetupCapture(ReverseSkip.load(), ReverseCapture.load()); + } + } + auto id = pipe->SelfId(); + Send(id, ev->ReleaseBase()); + } + + void SendCaptured(NActors::TTestActorRuntime* runtime, bool sendResults = true) { + TVector<THolder<IEventHandle>> tosend; + with_lock(CaptureLock) { + tosend.swap(Captured); + } + for (auto& ev : tosend) { + TReplyPipeStub* pipe = Pipes[ev->Sender]; + if (pipe == nullptr) { + pipe = Pipes[ev->Sender] = new TReplyPipeStub(SelfId(), ev->Sender); + runtime->Register(pipe); + for (auto& [_, pipe] : Pipes) { + pipe->SetupCapture(ReverseSkip.load(), ReverseCapture.load()); + } + } + auto id = pipe->SelfId(); + ev->Rewrite(ev->GetTypeRewrite(), id); + runtime->Send(ev.Release()); + } + if (sendResults) { + for (auto& [_, pipe] : Pipes) { + pipe->SendCaptured(runtime); + } + } + } + + public: + TManualEvent ReadsReceived; + std::atomic<i64> ToCapture; + std::atomic<i64> ToSkip; + + std::atomic<i64> ReverseCapture; + std::atomic<i64> ReverseSkip; + + TMutex CaptureLock; + TVector<THolder<IEventHandle>> Captured; + THashMap<TActorId, TReplyPipeStub*> Pipes; + }; + + TString ALL = ",101,102,103,201,202,203,301,302,303,401,402,403,501,502,503,601,602,603,701,702,703,801,802,803"; + TString Format(TVector<ui64> keys) { + TStringBuilder res; + for (auto k : keys) { + res << "," << k; + } + return res; + } + + void SendScanQuery(TTestActorRuntime* runtime, TActorId kqpProxy, TActorId sender, const TString& queryText) { + auto ev = std::make_unique<NKqp::TEvKqp::TEvQueryRequest>(); + ev->Record.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); + ev->Record.MutableRequest()->SetType(NKikimrKqp::QUERY_TYPE_SQL_SCAN); + ev->Record.MutableRequest()->SetQuery(queryText); + ev->Record.MutableRequest()->SetKeepSession(false); + ActorIdToProto(sender, ev->Record.MutableRequestActorId()); + runtime->Send(new IEventHandle(kqpProxy, sender, ev.release())); + }; + + void CollectKeysTo(TVector<ui64>* collectedKeys, TTestActorRuntime* runtime, TActorId sender) { + auto captureEvents = [=](TTestActorRuntimeBase&, TAutoPtr<IEventHandle> &ev) { + if (ev->GetTypeRewrite() == NKqp::TEvKqpExecuter::TEvStreamData::EventType) { + auto& record = ev->Get<NKqp::TEvKqpExecuter::TEvStreamData>()->Record; + for (auto& row : record.resultset().rows()) { + collectedKeys->push_back(row.items(0).uint64_value()); + } + + auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>(); + resp->Record.SetEnough(false); + resp->Record.SetSeqNo(record.GetSeqNo()); + runtime->Send(new IEventHandle(ev->Sender, sender, resp.Release())); + return true; + } + + return false; + }; + runtime->SetEventFilter(captureEvents); + } + + enum class SortOrder { + Descending, + Ascending, + Unspecified + }; + + TString OrderBy(SortOrder o) { + if (o == SortOrder::Ascending) { + return " ORDER BY Key "; + } + if (o == SortOrder::Descending) { + return " ORDER BY Key DESC "; + } + return " "; + } + + TVector<ui64> Canonize(TVector<ui64> collectedKeys, SortOrder o) { + if (o == SortOrder::Unspecified) { + Sort(collectedKeys); + } + if (o == SortOrder::Descending) { + Reverse(collectedKeys.begin(), collectedKeys.end()); + } + return collectedKeys; + } + +#define Y_UNIT_TEST_SORT(N, OPT) \ + template <SortOrder OPT> \ + struct TTestCase##N : public TCurrentTestCase { \ + TTestCase##N() : TCurrentTestCase() { \ + if constexpr (OPT == SortOrder::Descending) { Name_ = #N "+Descending"; } \ + if constexpr (OPT == SortOrder::Ascending) { Name_ = #N "+Ascending"; } \ + if constexpr (OPT == SortOrder::Unspecified) { Name_ = #N "+Unspecified"; } \ + } \ + \ + static THolder<NUnitTest::TBaseTestCase> Create() { return ::MakeHolder<TTestCase##N<Order>>(); } \ + void Execute_(NUnitTest::TTestContext&) override; \ + }; \ + struct TTestRegistration##N { \ + TTestRegistration##N() { \ + TCurrentTest::AddTest(TTestCase##N<SortOrder::Ascending>::Create); \ + TCurrentTest::AddTest(TTestCase##N<SortOrder::Descending>::Create); \ + TCurrentTest::AddTest(TTestCase##N<SortOrder::Unspecified>::Create); \ + } \ + }; \ + static TTestRegistration##N testRegistration##N; \ + template <SortOrder OPT> \ + void TTestCase##N<OPT>::Execute_(NUnitTest::TTestContext& ut_context Y_DECLARE_UNUSED) + + Y_UNIT_TEST_SORT(AfterResolve, Order) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForScanQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + + auto db = kikimr.GetTableClient(); + + auto& server = kikimr.GetTestServer(); + auto* runtime = server.GetRuntime(); + Y_UNUSED(runtime); + auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + + auto sender = runtime->AllocateEdgeActor(); + auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + + TVector<ui64> collectedKeys; + CollectKeysTo(&collectedKeys, runtime, sender); + + auto* shim = new TReadActorPipeCacheStub(); + InterceptReadActorPipeCache(runtime->Register(shim)); + shim->SetupCapture(0, 1); + SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + SetSplitMergePartCountLimit(runtime, -1); + { + auto senderSplit = runtime->AllocateEdgeActor(); + ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400); + WaitTxNotification(&server, senderSplit, txId); + } + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->SendCaptured(runtime); + + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + } + + Y_UNIT_TEST_SORT(AfterResult, Order) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForScanQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + + auto db = kikimr.GetTableClient(); + + auto& server = kikimr.GetTestServer(); + auto* runtime = server.GetRuntime(); + Y_UNUSED(runtime); + auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + + auto sender = runtime->AllocateEdgeActor(); + auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + + TVector<ui64> collectedKeys; + CollectKeysTo(&collectedKeys, runtime, sender); + + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(8); + evread.SetMaxRows(8); + InjectRangeEvReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + evreadack.SetMaxRows(8); + InjectRangeEvReadAckSettings(evreadack); + + auto* shim = new TReadActorPipeCacheStub(); + shim->SetupCapture(1, 1); + shim->SetupResultsCapture(1); + InterceptReadActorPipeCache(runtime->Register(shim)); + SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + SetSplitMergePartCountLimit(runtime, -1); + { + auto senderSplit = runtime->AllocateEdgeActor(); + ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400); + WaitTxNotification(&server, senderSplit, txId); + } + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->AllowResults(); + shim->SendCaptured(runtime); + + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + if (Order != SortOrder::Descending) { // bug in datashard + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + } + } + + Y_UNIT_TEST_SORT(ChoosePartition, Order) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForScanQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + + auto db = kikimr.GetTableClient(); + + auto& server = kikimr.GetTestServer(); + auto* runtime = server.GetRuntime(); + Y_UNUSED(runtime); + auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + + auto sender = runtime->AllocateEdgeActor(); + auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + + TVector<ui64> collectedKeys; + CollectKeysTo(&collectedKeys, runtime, sender); + + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(8); + evread.SetMaxRows(8); + InjectRangeEvReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + evreadack.SetMaxRows(8); + InjectRangeEvReadAckSettings(evreadack); + + auto* shim = new TReadActorPipeCacheStub(); + shim->SetupCapture(2, 1); + shim->SetupResultsCapture(2); + InterceptReadActorPipeCache(runtime->Register(shim)); + SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + if (Order == SortOrder::Descending) { // bug in datashard + return; + } + + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + SetSplitMergePartCountLimit(runtime, -1); + { + auto senderSplit = runtime->AllocateEdgeActor(); + ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 400); + WaitTxNotification(&server, senderSplit, txId); + } + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->AllowResults(); + shim->SendCaptured(runtime); + + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + } + + + Y_UNIT_TEST_SORT(BorderKeys, Order) { + TKikimrSettings settings; + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(true); + settings.SetDomainRoot(KikimrDefaultUtDomainRoot); + TFeatureFlags flags; + flags.SetEnablePredicateExtractForScanQueries(true); + settings.SetFeatureFlags(flags); + settings.SetAppConfig(appConfig); + + TKikimrRunner kikimr(settings); + + auto db = kikimr.GetTableClient(); + + auto& server = kikimr.GetTestServer(); + auto* runtime = server.GetRuntime(); + Y_UNUSED(runtime); + auto kqpProxy = MakeKqpProxyID(runtime->GetNodeId(0)); + + auto sender = runtime->AllocateEdgeActor(); + auto shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + + TVector<ui64> collectedKeys; + CollectKeysTo(&collectedKeys, runtime, sender); + + NKikimrTxDataShard::TEvRead evread; + evread.SetMaxRowsInResult(12); + evread.SetMaxRows(12); + InjectRangeEvReadSettings(evread); + + NKikimrTxDataShard::TEvReadAck evreadack; + evreadack.SetMaxRows(12); + InjectRangeEvReadAckSettings(evreadack); + + auto* shim = new TReadActorPipeCacheStub(); + shim->SetupCapture(1, 1); + shim->SetupResultsCapture(1); + InterceptReadActorPipeCache(runtime->Register(shim)); + SendScanQuery(runtime, kqpProxy, sender, "SELECT Key FROM `/Root/KeyValueLargePartition`" + OrderBy(Order)); + if (Order == SortOrder::Descending) { // bug in datashard + return; + } + + shim->ReadsReceived.WaitI(); + Cerr << "starting split -----------------------------------------------------------" << Endl; + SetSplitMergePartCountLimit(runtime, -1); + { + auto senderSplit = runtime->AllocateEdgeActor(); + ui64 txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(0), 402); + WaitTxNotification(&server, senderSplit, txId); + + shards = GetTableShards(&server, sender, "/Root/KeyValueLargePartition"); + + txId = AsyncSplitTable(&server, senderSplit, "/Root/KeyValueLargePartition", shards.at(1), 404); + WaitTxNotification(&server, senderSplit, txId); + } + Cerr << "resume evread -----------------------------------------------------------" << Endl; + shim->SkipAll(); + shim->AllowResults(); + shim->SendCaptured(runtime); + + auto reply = runtime->GrabEdgeEventRethrow<TEvKqp::TEvQueryResponse>(sender); + UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetRef().GetYdbStatus(), Ydb::StatusIds::SUCCESS); + UNIT_ASSERT_VALUES_EQUAL(Format(Canonize(collectedKeys, Order)), ALL); + } +} + + +} // namespace NKqp +} // namespace NKikimr diff --git a/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp b/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp index 17d421c0608..cb8c09b7050 100644 --- a/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp +++ b/ydb/core/kqp/ut/sysview/kqp_sys_view_ut.cpp @@ -82,11 +82,12 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { [[72057594046644480u];[9u];["/Root/BatchUpload"];[5u]]; [[72057594046644480u];[0u];["/Root/KeyValue"];[6u]]; [[72057594046644480u];[0u];["/Root/KeyValue2"];[7u]]; - [[72057594046644480u];[0u];["/Root/Test"];[8u]]; - [[72057594046644480u];[0u];["/Root/Join1"];[9u]]; - [[72057594046644480u];[1u];["/Root/Join1"];[9u]]; - [[72057594046644480u];[0u];["/Root/Join2"];[10u]]; - [[72057594046644480u];[1u];["/Root/Join2"];[10u]] + [[72057594046644480u];[0u];["/Root/KeyValueLargePartition"];[8u]]; + [[72057594046644480u];[0u];["/Root/Test"];[9u]]; + [[72057594046644480u];[0u];["/Root/Join1"];[10u]]; + [[72057594046644480u];[1u];["/Root/Join1"];[10u]]; + [[72057594046644480u];[0u];["/Root/Join2"];[11u]]; + [[72057594046644480u];[1u];["/Root/Join2"];[11u]] ])", StreamResultToYson(it)); } @@ -170,16 +171,17 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { TString query = R"( SELECT OwnerId, PathId, PartIdx, Path FROM `/Root/.sys/partition_stats` - WHERE OwnerId = 72057594046644480ul AND PathId > 5u AND PathId <= 9u + WHERE OwnerId = 72057594046644480ul AND PathId > 5u AND PathId <= 10u ORDER BY PathId, PartIdx; )"; TString expectedYson = R"([ [[72057594046644480u];[6u];[0u];["/Root/KeyValue"]]; [[72057594046644480u];[7u];[0u];["/Root/KeyValue2"]]; - [[72057594046644480u];[8u];[0u];["/Root/Test"]]; - [[72057594046644480u];[9u];[0u];["/Root/Join1"]]; - [[72057594046644480u];[9u];[1u];["/Root/Join1"]] + [[72057594046644480u];[8u];[0u];["/Root/KeyValueLargePartition"]]; + [[72057594046644480u];[9u];[0u];["/Root/Test"]]; + [[72057594046644480u];[10u];[0u];["/Root/Join1"]]; + [[72057594046644480u];[10u];[1u];["/Root/Join1"]] ])"; auto it = client.StreamExecuteScanQuery(query).GetValueSync(); @@ -208,7 +210,7 @@ Y_UNIT_TEST_SUITE(KqpSystemView) { TString expectedYson = R"([ [[72057594046644480u];[6u];[0u];["/Root/KeyValue"]]; [[72057594046644480u];[7u];[0u];["/Root/KeyValue2"]]; - [[72057594046644480u];[8u];[0u];["/Root/Test"]] + [[72057594046644480u];[8u];[0u];["/Root/KeyValueLargePartition"]] ])"; auto it = client.StreamExecuteScanQuery(query).GetValueSync(); |
