aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Alekseev <fexolm@ydb.tech>2024-11-11 16:27:26 +0300
committerGitHub <noreply@github.com>2024-11-11 16:27:26 +0300
commit9b3c2bd67eff7b0e1822f7645ffc2a306af4a77c (patch)
tree064b1806df8be75c24fb7751acc8c8fedaf79ae4
parent02d2031a62659ea9d85aa481628436a1d0c1480c (diff)
downloadydb-9b3c2bd67eff7b0e1822f7645ffc2a306af4a77c.tar.gz
Remove destination session after partitioning finish (#11411)
-rw-r--r--.github/config/muted_ya.txt7
-rw-r--r--ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp133
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/sharing.cpp15
-rw-r--r--ydb/core/tx/columnshard/transactions/operators/sharing.h1
4 files changed, 147 insertions, 9 deletions
diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt
index 1c49a8d7d99..889063bbc58 100644
--- a/.github/config/muted_ya.txt
+++ b/.github/config/muted_ya.txt
@@ -24,6 +24,13 @@ ydb/core/kqp/ut/olap KqpOlapBlobsSharing.BlobsSharingSplit1_1_clean_with_restart
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingConsistency64
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.TableReshardingModuloN
ydb/core/kqp/ut/olap KqpOlapBlobsSharing.UpsertWhileSplitTest
+ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMerge
+ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplits
+ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsThenMerges
+ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsAfterWait
+ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleSplitsWithRestartsWhenWait
+ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsAfterWait
+ydb/core/kqp/ut/olap KqpOlapBlobsSharing.MultipleMergesWithRestartsWhenWait
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesActualization
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInBS
ydb/core/kqp/ut/olap KqpOlapIndexes.IndexesInLocalMetadata
diff --git a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
index c6a00cdde5b..cfbdd117d3e 100644
--- a/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
+++ b/ydb/core/kqp/ut/olap/blobs_sharing_ut.cpp
@@ -1,14 +1,16 @@
-#include "helpers/typed_local.h"
#include "helpers/local.h"
+#include "helpers/typed_local.h"
#include "helpers/writer.h"
-#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
-#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+
+#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tx/columnshard/common/snapshot.h>
-#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
#include <ydb/core/tx/columnshard/data_sharing/common/context/context.h>
-#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
#include <ydb/core/tx/columnshard/data_sharing/destination/events/control.h>
-#include <ydb/core/base/tablet_pipecache.h>
+#include <ydb/core/tx/columnshard/data_sharing/destination/session/destination.h>
+#include <ydb/core/tx/columnshard/data_sharing/initiator/controller/abstract.h>
+#include <ydb/core/tx/columnshard/data_sharing/initiator/status/abstract.h>
+#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
+
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
#include <ydb/public/sdk/cpp/client/ydb_ss_tasks/task.h>
@@ -276,7 +278,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
void WaitResharding(const TString& hint = "") {
const TInstant start = TInstant::Now();
bool clean = false;
- while (TInstant::Now() - start < TDuration::Seconds(20)) {
+ while (TInstant::Now() - start < TDuration::Seconds(200)) {
NYdb::NOperation::TOperationClient operationClient(Kikimr.GetDriver());
auto result = operationClient.List<NYdb::NSchemeShard::TBackgroundProcessesResponse>().GetValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
@@ -408,7 +410,7 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
public:
TAsyncReshardingTest() {
- TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 128, 4);
+ TLocalHelper(Kikimr).CreateTestOlapTable("olapTable", "olapStore", 1024, 32);
}
void AddBatch(int numRows) {
@@ -561,5 +563,120 @@ Y_UNIT_TEST_SUITE(KqpOlapBlobsSharing) {
tester.CheckCount();
}
+
+ Y_UNIT_TEST(MultipleMerge) {
+ TAsyncReshardingTest tester;
+ tester.DisableCompaction();
+
+ tester.AddBatch(10000);
+
+ for (int i = 0; i < 4; i++) {
+ tester.StartResharding("MERGE");
+ tester.WaitResharding();
+ }
+
+ tester.RestartAllShards();
+
+ tester.CheckCount();
+ }
+
+ Y_UNIT_TEST(MultipleSplits) {
+ TAsyncReshardingTest tester;
+ tester.DisableCompaction();
+
+ tester.AddBatch(10000);
+
+ for (int i = 0; i < 4; i++) {
+ tester.StartResharding("SPLIT");
+ tester.WaitResharding();
+ }
+
+ tester.RestartAllShards();
+
+ tester.CheckCount();
+ }
+
+ Y_UNIT_TEST(MultipleSplitsThenMerges) {
+ TAsyncReshardingTest tester;
+ tester.DisableCompaction();
+
+ tester.AddBatch(10000);
+
+ for (int i = 0; i < 4; i++) {
+ tester.StartResharding("SPLIT");
+ tester.WaitResharding();
+ }
+
+ for (int i = 0; i < 8; i++) {
+ tester.StartResharding("MERGE");
+ tester.WaitResharding();
+ }
+
+ tester.RestartAllShards();
+
+ tester.CheckCount();
+ }
+
+ Y_UNIT_TEST(MultipleSplitsWithRestartsAfterWait) {
+ TAsyncReshardingTest tester;
+ tester.DisableCompaction();
+
+ tester.AddBatch(10000);
+
+ for (int i = 0; i < 4; i++) {
+ tester.StartResharding("SPLIT");
+ tester.WaitResharding();
+ tester.RestartAllShards();
+ }
+
+ tester.CheckCount();
+ }
+
+ Y_UNIT_TEST(MultipleSplitsWithRestartsWhenWait) {
+ TAsyncReshardingTest tester;
+ tester.DisableCompaction();
+
+ tester.AddBatch(10000);
+
+ for (int i = 0; i < 4; i++) {
+ tester.StartResharding("SPLIT");
+ tester.RestartAllShards();
+ tester.WaitResharding();
+ }
+ tester.RestartAllShards();
+
+ tester.CheckCount();
+ }
+
+ Y_UNIT_TEST(MultipleMergesWithRestartsAfterWait) {
+ TAsyncReshardingTest tester;
+ tester.DisableCompaction();
+
+ tester.AddBatch(10000);
+
+ for (int i = 0; i < 4; i++) {
+ tester.StartResharding("MERGE");
+ tester.WaitResharding();
+ tester.RestartAllShards();
+ }
+
+ tester.CheckCount();
+ }
+
+ Y_UNIT_TEST(MultipleMergesWithRestartsWhenWait) {
+ TAsyncReshardingTest tester;
+ tester.DisableCompaction();
+
+ tester.AddBatch(10000);
+
+ for (int i = 0; i < 4; i++) {
+ tester.StartResharding("MERGE");
+ tester.RestartAllShards();
+ tester.WaitResharding();
+ }
+ tester.RestartAllShards();
+
+ tester.CheckCount();
+ }
}
}
diff --git a/ydb/core/tx/columnshard/transactions/operators/sharing.cpp b/ydb/core/tx/columnshard/transactions/operators/sharing.cpp
index ec90f07c16e..666ee719cb7 100644
--- a/ydb/core/tx/columnshard/transactions/operators/sharing.cpp
+++ b/ydb/core/tx/columnshard/transactions/operators/sharing.cpp
@@ -52,11 +52,24 @@ void TSharingTransactionOperator::DoStartProposeOnComplete(TColumnShard& /*owner
}
bool TSharingTransactionOperator::ProgressOnExecute(
- TColumnShard& /*owner*/, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& /*txc*/) {
+ TColumnShard& owner, const NOlap::TSnapshot& /*version*/, NTabletFlatExecutor::TTransactionContext& txc) {
+ if (!SharingTask) {
+ return true;
+ }
+ if (!TxFinish) {
+ TxFinish = SharingTask->AckInitiatorFinished(&owner, SharingTask).DetachResult();
+ }
+ TxFinish->Execute(txc, NActors::TActivationContext::AsActorContext());
+
return true;
}
bool TSharingTransactionOperator::ProgressOnComplete(TColumnShard& owner, const TActorContext& ctx) {
+ if (!SharingTask) {
+ return true;
+ }
+ AFL_VERIFY(!!TxFinish);
+ TxFinish->Complete(ctx);
for (TActorId subscriber : NotifySubscribers) {
auto event = MakeHolder<TEvColumnShard::TEvNotifyTxCompletionResult>(owner.TabletID(), GetTxId());
ctx.Send(subscriber, event.Release(), 0, 0);
diff --git a/ydb/core/tx/columnshard/transactions/operators/sharing.h b/ydb/core/tx/columnshard/transactions/operators/sharing.h
index 13c7df7cad0..c5c961d98ba 100644
--- a/ydb/core/tx/columnshard/transactions/operators/sharing.h
+++ b/ydb/core/tx/columnshard/transactions/operators/sharing.h
@@ -17,6 +17,7 @@ private:
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxPropose;
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxConfirm;
mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxAbort;
+ mutable std::unique_ptr<NTabletFlatExecutor::ITransaction> TxFinish;
static inline auto Registrator = TFactory::TRegistrator<TSharingTransactionOperator>(NKikimrTxColumnShard::TX_KIND_SHARING);
THashSet<TActorId> NotifySubscribers;
virtual TTxController::TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override;