diff options
| -rw-r--r-- | ydb/core/kqp/host/kqp_gateway_proxy.cpp | 6 | ||||
| -rw-r--r-- | ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp | 59 | ||||
| -rw-r--r-- | ydb/tests/functional/transfer/replication.cpp | 65 | ||||
| -rw-r--r-- | ydb/tests/functional/transfer/transfer.cpp (renamed from ydb/tests/functional/transfer/main.cpp) | 518 | ||||
| -rw-r--r-- | ydb/tests/functional/transfer/utils.h | 545 | ||||
| -rw-r--r-- | ydb/tests/functional/transfer/ya.make | 7 |
6 files changed, 738 insertions, 462 deletions
diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 90d42a56391..11cf4998af4 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -2513,6 +2513,12 @@ public: auto& state = *op.MutableState(); state.MutableDone()->SetFailoverMode( static_cast<NKikimrReplication::TReplicationState::TDone::EFailoverMode>(done->FailoverMode)); + } else if (const auto& paused = settings.Settings.StatePaused) { + auto& state = *op.MutableState(); + state.MutablePaused(); + } else if (const auto& standBy = settings.Settings.StateStandBy) { + auto& state = *op.MutableState(); + state.MutableStandBy(); } if (settings.Settings.ConnectionString || settings.Settings.Endpoint || settings.Settings.Database || diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 6ee68fa8086..7cf744d0fa2 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -7642,6 +7642,35 @@ Y_UNIT_TEST_SUITE(KqpScheme) { --!syntax_v1 ALTER ASYNC REPLICATION `/Root/replication` SET ( + STATE = "Paused" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + // alter state and config + { + auto query = R"( + --!syntax_v1 + ALTER ASYNC REPLICATION `/Root/replication` + SET ( + STATE = "StandBy" + ); + )"; + + const auto result = session.ExecuteSchemeQuery(query).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + + // alter state and config + { + auto query = R"( + --!syntax_v1 + ALTER ASYNC REPLICATION `/Root/replication` + SET ( STATE = "DONE", FAILOVER_MODE = "FORCE", CONNECTION_STRING = "grpc://localhost:2135/?database=/Root" @@ -7676,7 +7705,7 @@ Y_UNIT_TEST_SUITE(KqpScheme) { break; } - UNIT_ASSERT_C(i, "Alter timeout"); + //UNIT_ASSERT_C(i, "Alter timeout"); Sleep(TDuration::Seconds(1)); } } @@ -9014,6 +9043,34 @@ Y_UNIT_TEST_SUITE(KqpScheme) { --!syntax_v1 ALTER TRANSFER `/Root/transfer` SET ( + STATE = "Paused" + ); + )"; + + const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + // alter state and config + { + auto query = R"( + --!syntax_v1 + ALTER TRANSFER `/Root/transfer` + SET ( + STATE = "StandBy" + ); + )"; + + const auto result = session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + + // alter state and config + { + auto query = R"( + --!syntax_v1 + ALTER TRANSFER `/Root/transfer` + SET ( STATE = "DONE", CONNECTION_STRING = "grpc://localhost:2135/?database=/Root" ); diff --git a/ydb/tests/functional/transfer/replication.cpp b/ydb/tests/functional/transfer/replication.cpp new file mode 100644 index 00000000000..36792f9df43 --- /dev/null +++ b/ydb/tests/functional/transfer/replication.cpp @@ -0,0 +1,65 @@ +#include "utils.h" + +using namespace NReplicationTest; + +Y_UNIT_TEST_SUITE(Replication) +{ + Y_UNIT_TEST(PauseAndResumeReplication) + { + MainTestCase testCase; + testCase.CreateSourceTable(R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8, + PRIMARY KEY (Key) + ); + )"); + + testCase.CreateReplication(); + + testCase.ExecuteSourceTableQuery("INSERT INTO `%s` (`Key`, `Message`) VALUES (1, 'Message-1');"); + + testCase.CheckResult({{ + _C("Message", TString("Message-1")) + }}); + + testCase.CheckReplicationState(TReplicationDescription::EState::Running); + + Cerr << "State: Paused" << Endl << Flush; + + testCase.PauseReplication(); + + Sleep(TDuration::Seconds(1)); + testCase.CheckReplicationState(TReplicationDescription::EState::Paused); + + testCase.ExecuteSourceTableQuery("INSERT INTO `%s` (`Key`, `Message`) VALUES (2, 'Message-2');"); + + // Replication is paused. New messages aren`t added to the table. + Sleep(TDuration::Seconds(3)); + testCase.CheckResult({{ + _C("Message", TString("Message-1")) + }}); + + Cerr << "State: StandBy" << Endl << Flush; + + testCase.ResumeReplication(); + + // Replication is resumed. New messages are added to the table. + testCase.CheckReplicationState(TReplicationDescription::EState::Running); + testCase.CheckResult({{ + _C("Message", TString("Message-1")) + }, { + _C("Message", TString("Message-2")), + }}); + + // More cycles for pause/resume + testCase.PauseReplication(); + testCase.CheckReplicationState(TReplicationDescription::EState::Paused); + + testCase.ResumeReplication(); + testCase.CheckReplicationState(TReplicationDescription::EState::Running); + + testCase.DropReplicatopn(); + } +} + diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/transfer.cpp index abf1dae7d70..eb090031dfb 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/transfer.cpp @@ -1,449 +1,6 @@ -#include <util/system/env.h> -#include <library/cpp/testing/unittest/registar.h> +#include "utils.h" -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h> -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h> -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h> -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h> -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h> -#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_replication.h> - -#include <library/cpp/threading/local_executor/local_executor.h> - -using namespace NYdb; -using namespace NYdb::NQuery; -using namespace NYdb::NTopic; -using namespace NYdb::NReplication; - -namespace { - -volatile size_t TestCaseCounter = RandomNumber<size_t>(); - -struct IChecker { - virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0; - virtual ~IChecker() = default; -}; - -template<typename T> -struct Checker : public IChecker { - Checker(T&& expected) - : Expected(std::move(expected)) - {} - - void Assert(const TString& msg, const ::Ydb::Value& value) override { - UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg); - } - - T Get(const ::Ydb::Value& value); - - T Expected; -}; - -template<> -bool Checker<bool>::Get(const ::Ydb::Value& value) { - return value.bool_value(); -} - -template<> -ui32 Checker<ui32>::Get(const ::Ydb::Value& value) { - return value.uint32_value(); -} - -template<> -ui64 Checker<ui64>::Get(const ::Ydb::Value& value) { - return value.uint64_value(); -} - -template<> -double Checker<double>::Get(const ::Ydb::Value& value) { - return value.double_value(); -} - -template<> -TString Checker<TString>::Get(const ::Ydb::Value& value) { - return value.text_value(); -} - -template<> -TInstant Checker<TInstant>::Get(const ::Ydb::Value& value) { - return TInstant::Days(value.uint32_value()); -} - -template<typename T> -std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) { - return { - std::move(name), - std::make_shared<Checker<T>>(std::move(expected)) - }; -} - -struct TMessage { - TString Message; - std::optional<ui32> Partition = std::nullopt; - std::optional<TString> ProducerId = std::nullopt; - std::optional<TString> MessageGroupId = std::nullopt; - std::optional<ui64> SeqNo = std::nullopt; -}; - -TMessage _withSeqNo(ui64 seqNo) { - return { - .Message = TStringBuilder() << "Message-" << seqNo, - .Partition = 0, - .ProducerId = std::nullopt, - .MessageGroupId = std::nullopt, - .SeqNo = seqNo - }; -} - -TMessage _withProducerId(const TString& producerId) { - return { - .Message = TStringBuilder() << "Message-" << producerId, - .Partition = 0, - .ProducerId = producerId, - .MessageGroupId = std::nullopt, - .SeqNo = std::nullopt - }; -} - -TMessage _withMessageGroupId(const TString& messageGroupId) { - return { - .Message = TStringBuilder() << "Message-" << messageGroupId, - .Partition = 0, - .ProducerId = messageGroupId, - .MessageGroupId = messageGroupId, - .SeqNo = std::nullopt - }; -} - -using TExpectations = TVector<TVector<std::pair<TString, std::shared_ptr<IChecker>>>>; - -struct TConfig { - const TString TableDDL; - const TString Lambda; - const TVector<TMessage> Messages; - const TExpectations Expectations; - const TVector<TString> AlterLambdas; -}; - -struct MainTestCase { - - MainTestCase() - : Id(TestCaseCounter++) - , ConnectionString(GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE")) - , TopicName(TStringBuilder() << "Topic_" << Id) - , TableName(TStringBuilder() << "Table_" << Id) - , TransferName(TStringBuilder() << "Transfer_" << Id) - , Driver(TDriverConfig(ConnectionString)) - , TableClient(Driver) - , Session(TableClient.GetSession().GetValueSync().GetSession()) - , TopicClient(Driver) - { - } - - void ExecuteDDL(const TString& ddl) { - auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - } - - void CreateTable(const TString& tableDDL) { - ExecuteDDL(Sprintf(tableDDL.data(), TableName.data())); - } - - void CreateTopic(size_t partitionCount = 10) { - ExecuteDDL(Sprintf(R"( - CREATE TOPIC `%s` - WITH ( - min_active_partitions = %d - ); - )", TopicName.data(), partitionCount)); - } - - void CreateConsumer(const TString& consumerName) { - ExecuteDDL(Sprintf(R"( - ALTER TOPIC `%s` - ADD CONSUMER `%s`; - )", TopicName.data(), consumerName.data())); - } - - struct CreateTransferSettings { - std::optional<TString> ConsumerName = std::nullopt; - std::optional<TDuration> FlushInterval; - std::optional<ui64> BatchSizeBytes; - - CreateTransferSettings() - : ConsumerName(std::nullopt) - , FlushInterval(TDuration::Seconds(1)) - , BatchSizeBytes(8_MB) {} - - static CreateTransferSettings WithConsumerName(const TString& consumerName) { - CreateTransferSettings result; - result.ConsumerName = consumerName; - return result; - } - - static CreateTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) { - CreateTransferSettings result; - result.FlushInterval = flushInterval; - result.BatchSizeBytes = batchSize; - return result; - } - }; - - void CreateTransfer(const TString& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) { - TStringBuilder sb; - if (settings.ConsumerName) { - sb << ", CONSUMER = '" << *settings.ConsumerName << "'" << Endl; - } - if (settings.FlushInterval) { - sb << ", FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl; - } - if (settings.BatchSizeBytes) { - sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl; - } - - auto ddl = Sprintf(R"( - %s; - - CREATE TRANSFER `%s` - FROM `%s` TO `%s` USING $l - WITH ( - CONNECTION_STRING = 'grpc://%s' - %s - ); - )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), ConnectionString.data(), sb.data()); - - ExecuteDDL(ddl); - } - - struct AlterTransferSettings { - std::optional<TString> TransformLambda; - std::optional<TDuration> FlushInterval; - std::optional<ui64> BatchSizeBytes; - - AlterTransferSettings() - : FlushInterval(std::nullopt) - , BatchSizeBytes(std::nullopt) {} - - static AlterTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) { - AlterTransferSettings result; - result.FlushInterval = flushInterval; - result.BatchSizeBytes = batchSize; - return result; - } - - static AlterTransferSettings WithTransformLambda(const TString& lambda) { - AlterTransferSettings result; - result.TransformLambda = lambda; - return result; - } - }; - - void AlterTransfer(const TString& lambda) { - AlterTransfer(AlterTransferSettings::WithTransformLambda(lambda)); - } - - void AlterTransfer(const AlterTransferSettings& settings, bool success = true) { - TString lambda = settings.TransformLambda ? *settings.TransformLambda : ""; - TString setLambda = settings.TransformLambda ? "SET USING $l" : ""; - - TStringBuilder sb; - if (settings.FlushInterval) { - sb << "FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl; - } - if (settings.BatchSizeBytes) { - sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl; - } - - TString setOptions; - if (!sb.empty()) { - setOptions = TStringBuilder() << "SET (" << sb << " )"; - } - - auto res = Session.ExecuteQuery(Sprintf(R"( - %s; - - ALTER TRANSFER `%s` - %s - %s; - )", lambda.data(), TransferName.data(), setLambda.data(), setOptions.data()), TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(success, res.IsSuccess(), res.GetIssues().ToString()); - } - - void DropTransfer() { - auto res = Session.ExecuteQuery(Sprintf(R"( - DROP TRANSFER `%s`; - )", TransferName.data()), TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - } - - void PauseTransfer() { - ExecuteDDL(Sprintf(R"( - ALTER TRANSFER `%s` - SET ( - STATE = "Paused" - ); - )", TransferName.data())); - } - - void ResumeTransfer() { - ExecuteDDL(Sprintf(R"( - ALTER TRANSFER `%s` - SET ( - STATE = "StandBy" - ); - )", TransferName.data())); - } - - auto DescribeTransfer() { - TReplicationClient client(Driver); - - TDescribeReplicationSettings settings; - settings.IncludeStats(true); - - return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync(); - } - - auto DescribeTopic() { - TDescribeTopicSettings settings; - settings.IncludeLocation(true); - settings.IncludeStats(true); - - return TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync(); - } - - void Write(const TMessage& message) { - TWriteSessionSettings writeSettings; - writeSettings.Path(TopicName); - writeSettings.DeduplicationEnabled(message.SeqNo); - if (message.Partition) { - writeSettings.PartitionId(message.Partition); - } - if (message.ProducerId) { - writeSettings.ProducerId(*message.ProducerId); - } - if (message.MessageGroupId) { - writeSettings.MessageGroupId(*message.MessageGroupId); - } - auto writeSession = TopicClient.CreateSimpleBlockingWriteSession(writeSettings); - - UNIT_ASSERT(writeSession->Write(message.Message, message.SeqNo)); - writeSession->Close(TDuration::Seconds(1)); - } - - std::pair<ui64, Ydb::ResultSet> DoRead(const TExpectations& expectations) { - auto& e = expectations.front(); - - TStringBuilder columns; - for (size_t i = 0; i < e.size(); ++i) { - if (i) { - columns << ", "; - } - columns << "`" << e[i].first << "`"; - } - - - auto query = Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data()); - Cerr << ">>>>> Query: " << query << Endl << Flush; - auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); - - const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0)); - return {proto.rowsSize(), proto}; - } - - void CheckResult(const TExpectations& expectations) { - for (size_t attempt = 20; attempt--; ) { - auto res = DoRead(expectations); - Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush; - if (res.first == expectations.size()) { - const Ydb::ResultSet& proto = res.second; - for (size_t i = 0; i < expectations.size(); ++i) { - auto& row = proto.rows(i); - auto& rowExpectations = expectations[i]; - for (size_t i = 0; i < rowExpectations.size(); ++i) { - auto& c = rowExpectations[i]; - TString msg = TStringBuilder() << "Row " << i << " column '" << c.first << "': "; - c.second->Assert(msg, row.items(i)); - } - } - - break; - } - - UNIT_ASSERT_C(attempt, "Unable to wait transfer result"); - Sleep(TDuration::Seconds(1)); - } - } - - TReplicationDescription CheckTransferState(TReplicationDescription::EState expected) { - for (size_t i = 20; i--;) { - auto result = DescribeTransfer().GetReplicationDescription(); - if (expected == result.GetState()) { - return result; - } - - UNIT_ASSERT_C(i, "Unable to wait transfer state. Expected: " << expected << ", actual: " << result.GetState()); - Sleep(TDuration::Seconds(1)); - } - - Y_UNREACHABLE(); - } - - void CheckTransferStateError(const TString& expectedMessage) { - auto result = CheckTransferState(TReplicationDescription::EState::Error); - Cerr << ">>>>> ACTUAL: " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush; - Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush; - UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains(expectedMessage)); - } - - void Run(const TConfig& config) { - - CreateTable(config.TableDDL); - CreateTopic(); - - TVector<TString> lambdas; - lambdas.insert(lambdas.end(), config.AlterLambdas.begin(), config.AlterLambdas.end()); - lambdas.push_back(config.Lambda); - - for (size_t i = 0; i < lambdas.size(); ++i) { - auto lambda = lambdas[i]; - if (!i) { - CreateTransfer(lambda); - } else { - Sleep(TDuration::Seconds(1)); - - AlterTransfer(lambda); - - if (i == lambdas.size() - 1) { - Sleep(TDuration::Seconds(1)); - } - } - } - - for (const auto& m : config.Messages) { - Write(m); - } - - CheckResult(config.Expectations); - } - - const size_t Id; - const TString ConnectionString; - - const TString TopicName; - const TString TableName; - const TString TransferName; - - TDriver Driver; - TQueryClient TableClient; - TSession Session; - TTopicClient TopicClient; - - std::vector<std::string> ColumnNames; -}; - - -} // namespace +using namespace NReplicationTest; Y_UNIT_TEST_SUITE(Transfer) { @@ -924,8 +481,8 @@ Y_UNIT_TEST_SUITE(Transfer) Y_UNIT_TEST(DropTransfer) { MainTestCase testCase; - testCase.Run({ - .TableDDL = R"( + + testCase.CreateTable(R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, Message Utf8 NOT NULL, @@ -933,9 +490,9 @@ Y_UNIT_TEST_SUITE(Transfer) ) WITH ( STORE = COLUMN ); - )", - - .Lambda = R"( + )"); + testCase.CreateTopic(); + testCase.CreateTransfer(R"( $l = ($x) -> { return [ <| @@ -944,15 +501,13 @@ Y_UNIT_TEST_SUITE(Transfer) |> ]; }; - )", - - .Messages = {{"Message-1"}}, + )"); - .Expectations = {{ - _C("Key", ui64(0)), - _C("Message", TString("Message-1")), - }} - }); + testCase.Write({"Message-1"}); + testCase.CheckResult({{ + _C("Key", ui64(0)), + _C("Message", TString("Message-1")), + }}); { auto result = testCase.DescribeTransfer(); @@ -966,6 +521,9 @@ Y_UNIT_TEST_SUITE(Transfer) UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString()); UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus()); } + + testCase.DropTable(); + testCase.DropTopic(); } Y_UNIT_TEST(CreateAndDropConsumer) @@ -1022,6 +580,9 @@ Y_UNIT_TEST_SUITE(Transfer) UNIT_ASSERT_C(i, "Unable to wait consumer has been removed"); Sleep(TDuration::Seconds(1)); } + + testCase.DropTable(); + testCase.DropTopic(); } Y_UNIT_TEST(DescribeError_OnLambdaCompilation) @@ -1045,6 +606,10 @@ Y_UNIT_TEST_SUITE(Transfer) )"); testCase.CheckTransferStateError("_unknown_field_for_lambda_compilation_error"); + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); } /* Y_UNIT_TEST(DescribeError_OnWriteToShard) @@ -1125,6 +690,9 @@ Y_UNIT_TEST_SUITE(Transfer) UNIT_ASSERT_VALUES_EQUAL(1, consumers.size()); UNIT_ASSERT_VALUES_EQUAL("PredefinedConsumer", consumers[0].GetConsumerName()); } + + testCase.DropTable(); + testCase.DropTopic(); } Y_UNIT_TEST(CustomFlushInterval) @@ -1172,6 +740,10 @@ Y_UNIT_TEST_SUITE(Transfer) UNIT_ASSERT_C(attempt, "Unable to wait transfer result"); Sleep(TDuration::Seconds(1)); } + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); } Y_UNIT_TEST(AlterFlushInterval) @@ -1223,6 +795,10 @@ Y_UNIT_TEST_SUITE(Transfer) testCase.CheckResult({{ _C("Message", TString("Message-1")) }}); + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); } Y_UNIT_TEST(AlterBatchSize) @@ -1266,6 +842,10 @@ Y_UNIT_TEST_SUITE(Transfer) testCase.CheckResult({{ _C("Message", TString("Message-1")) }}); + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); } Y_UNIT_TEST(CreateTransferSourceNotExists) @@ -1293,6 +873,9 @@ Y_UNIT_TEST_SUITE(Transfer) )"); testCase.CheckTransferStateError("Discovery error: local/Topic_"); + + testCase.DropTransfer(); + testCase.DropTable(); } Y_UNIT_TEST(CreateTransferSourceIsNotTopic) @@ -1308,7 +891,7 @@ Y_UNIT_TEST_SUITE(Transfer) ); )"); - testCase.ExecuteDDL(Sprintf(R"( + testCase.ExecuteDDL(Sprintf(R"( CREATE TABLE `%s` ( Key Uint64 NOT NULL, PRIMARY KEY (Key) @@ -1327,6 +910,9 @@ Y_UNIT_TEST_SUITE(Transfer) )"); testCase.CheckTransferStateError("Discovery error: local/Topic_"); + + testCase.DropTransfer(); + testCase.DropTable(); } Y_UNIT_TEST(CreateTransferRowTable) @@ -1353,6 +939,10 @@ Y_UNIT_TEST_SUITE(Transfer) )"); testCase.CheckTransferStateError("Only column tables are supported as transfer targets"); + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); } Y_UNIT_TEST(CreateTransferTargetIsNotTable) @@ -1375,6 +965,9 @@ Y_UNIT_TEST_SUITE(Transfer) )"); testCase.CheckTransferStateError("Only column tables are supported as transfer targets"); + + testCase.DropTransfer(); + testCase.DropTopic(); } Y_UNIT_TEST(CreateTransferTargetNotExists) @@ -1394,6 +987,9 @@ Y_UNIT_TEST_SUITE(Transfer) )"); testCase.CheckTransferStateError(TStringBuilder() << "The target table `/local/" << testCase.TableName << "` does not exist"); + + testCase.DropTransfer(); + testCase.DropTopic(); } Y_UNIT_TEST(PauseAndResumeTransfer) @@ -1462,6 +1058,10 @@ Y_UNIT_TEST_SUITE(Transfer) testCase.ResumeTransfer(); testCase.CheckTransferState(TReplicationDescription::EState::Running); + + testCase.DropTransfer(); + testCase.DropTable(); + testCase.DropTopic(); } } diff --git a/ydb/tests/functional/transfer/utils.h b/ydb/tests/functional/transfer/utils.h new file mode 100644 index 00000000000..05bbfe52cbd --- /dev/null +++ b/ydb/tests/functional/transfer/utils.h @@ -0,0 +1,545 @@ +#pragma once + +#include <util/system/env.h> +#include <library/cpp/testing/unittest/registar.h> + +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/driver/driver.h> +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/query/client.h> +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/client.h> +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/proto/accessor.h> +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_scripting.h> +#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/draft/ydb_replication.h> + +#include <library/cpp/threading/local_executor/local_executor.h> + +using namespace NYdb; +using namespace NYdb::NQuery; +using namespace NYdb::NTopic; +using namespace NYdb::NReplication; + +namespace NReplicationTest { + +struct IChecker { + virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0; + virtual ~IChecker() = default; +}; + +template<typename T> +struct Checker : public IChecker { + Checker(T&& expected) + : Expected(std::move(expected)) + {} + + void Assert(const TString& msg, const ::Ydb::Value& value) override { + UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg); + } + + T Get(const ::Ydb::Value& value); + + T Expected; +}; + +template<> +inline bool Checker<bool>::Get(const ::Ydb::Value& value) { + return value.bool_value(); +} + +template<> +inline ui32 Checker<ui32>::Get(const ::Ydb::Value& value) { + return value.uint32_value(); +} + +template<> +inline ui64 Checker<ui64>::Get(const ::Ydb::Value& value) { + return value.uint64_value(); +} + +template<> +inline double Checker<double>::Get(const ::Ydb::Value& value) { + return value.double_value(); +} + +template<> +inline TString Checker<TString>::Get(const ::Ydb::Value& value) { + return value.text_value(); +} + +template<> +inline TInstant Checker<TInstant>::Get(const ::Ydb::Value& value) { + return TInstant::Days(value.uint32_value()); +} + +template<typename T> +std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) { + return { + std::move(name), + std::make_shared<Checker<T>>(std::move(expected)) + }; +} + +struct TMessage { + TString Message; + std::optional<ui32> Partition = std::nullopt; + std::optional<TString> ProducerId = std::nullopt; + std::optional<TString> MessageGroupId = std::nullopt; + std::optional<ui64> SeqNo = std::nullopt; +}; + +inline TMessage _withSeqNo(ui64 seqNo) { + return { + .Message = TStringBuilder() << "Message-" << seqNo, + .Partition = 0, + .ProducerId = std::nullopt, + .MessageGroupId = std::nullopt, + .SeqNo = seqNo + }; +} + +inline TMessage _withProducerId(const TString& producerId) { + return { + .Message = TStringBuilder() << "Message-" << producerId, + .Partition = 0, + .ProducerId = producerId, + .MessageGroupId = std::nullopt, + .SeqNo = std::nullopt + }; +} + +inline TMessage _withMessageGroupId(const TString& messageGroupId) { + return { + .Message = TStringBuilder() << "Message-" << messageGroupId, + .Partition = 0, + .ProducerId = messageGroupId, + .MessageGroupId = messageGroupId, + .SeqNo = std::nullopt + }; +} + +using TExpectations = TVector<TVector<std::pair<TString, std::shared_ptr<IChecker>>>>; + +struct TConfig { + const TString TableDDL; + const TString Lambda; + const TVector<TMessage> Messages; + const TExpectations Expectations; + const TVector<TString> AlterLambdas; +}; + +struct MainTestCase { + + MainTestCase() + : Id(RandomNumber<size_t>()) + , ConnectionString(GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE")) + , TopicName(TStringBuilder() << "Topic_" << Id) + , SourceTableName(TStringBuilder() << "SourceTable_" << Id) + , TableName(TStringBuilder() << "Table_" << Id) + , ReplicationName(TStringBuilder() << "Replication_" << Id) + , TransferName(TStringBuilder() << "Transfer_" << Id) + , Driver(TDriverConfig(ConnectionString)) + , TableClient(Driver) + , Session(TableClient.GetSession().GetValueSync().GetSession()) + , TopicClient(Driver) + { + } + + void ExecuteDDL(const TString& ddl, bool checkResult = true) { + Cerr << "DDL: " << ddl << Endl << Flush; + auto res = Session.ExecuteQuery(ddl, TTxControl::NoTx()).GetValueSync(); + if (checkResult) { + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + } + } + + auto ExecuteSourceTableQuery(const TString& query) { + for (size_t i = 10; i--;) { + auto q = Sprintf(query.data(), SourceTableName.data()); + Cerr << ">>>>> Query: " << q << Endl << Flush; + auto res = Session.ExecuteQuery(q, TTxControl::NoTx()).GetValueSync(); + if (res.IsSuccess()) { + return; + } + + UNIT_ASSERT_C(i, res.GetIssues().ToString()); + Sleep(TDuration::Seconds(1)); + } + } + + void CreateTable(const TString& tableDDL) { + ExecuteDDL(Sprintf(tableDDL.data(), TableName.data())); + } + + void DropTable() { + ExecuteDDL(Sprintf("DROP TABLE `%s`", TableName.data())); + } + + void CreateSourceTable(const TString& tableDDL) { + ExecuteDDL(Sprintf(tableDDL.data(), SourceTableName.data())); + } + + void DropSourceTable() { + ExecuteDDL(Sprintf("DROP TABLE `%s`", SourceTableName.data())); + } + + void CreateTopic(size_t partitionCount = 10) { + ExecuteDDL(Sprintf(R"( + CREATE TOPIC `%s` + WITH ( + min_active_partitions = %d + ); + )", TopicName.data(), partitionCount)); + } + + void DropTopic() { + ExecuteDDL(Sprintf("DROP TOPIC `%s`", TopicName.data())); + } + + void CreateConsumer(const TString& consumerName) { + ExecuteDDL(Sprintf(R"( + ALTER TOPIC `%s` + ADD CONSUMER `%s`; + )", TopicName.data(), consumerName.data())); + } + + struct CreateTransferSettings { + std::optional<TString> ConsumerName = std::nullopt; + std::optional<TDuration> FlushInterval; + std::optional<ui64> BatchSizeBytes; + + CreateTransferSettings() + : ConsumerName(std::nullopt) + , FlushInterval(TDuration::Seconds(1)) + , BatchSizeBytes(8_MB) {} + + static CreateTransferSettings WithConsumerName(const TString& consumerName) { + CreateTransferSettings result; + result.ConsumerName = consumerName; + return result; + } + + static CreateTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) { + CreateTransferSettings result; + result.FlushInterval = flushInterval; + result.BatchSizeBytes = batchSize; + return result; + } + }; + + void CreateTransfer(const TString& lambda, const CreateTransferSettings& settings = CreateTransferSettings()) { + TStringBuilder sb; + if (settings.ConsumerName) { + sb << ", CONSUMER = '" << *settings.ConsumerName << "'" << Endl; + } + if (settings.FlushInterval) { + sb << ", FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl; + } + if (settings.BatchSizeBytes) { + sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl; + } + + auto ddl = Sprintf(R"( + %s; + + CREATE TRANSFER `%s` + FROM `%s` TO `%s` USING $l + WITH ( + CONNECTION_STRING = 'grpc://%s' + %s + ); + )", lambda.data(), TransferName.data(), TopicName.data(), TableName.data(), ConnectionString.data(), sb.data()); + + ExecuteDDL(ddl); + } + + struct AlterTransferSettings { + std::optional<TString> TransformLambda; + std::optional<TDuration> FlushInterval; + std::optional<ui64> BatchSizeBytes; + + AlterTransferSettings() + : FlushInterval(std::nullopt) + , BatchSizeBytes(std::nullopt) {} + + static AlterTransferSettings WithBatching(const TDuration& flushInterval, const ui64 batchSize) { + AlterTransferSettings result; + result.FlushInterval = flushInterval; + result.BatchSizeBytes = batchSize; + return result; + } + + static AlterTransferSettings WithTransformLambda(const TString& lambda) { + AlterTransferSettings result; + result.TransformLambda = lambda; + return result; + } + }; + + void AlterTransfer(const TString& lambda) { + AlterTransfer(AlterTransferSettings::WithTransformLambda(lambda)); + } + + void AlterTransfer(const AlterTransferSettings& settings, bool success = true) { + TString lambda = settings.TransformLambda ? *settings.TransformLambda : ""; + TString setLambda = settings.TransformLambda ? "SET USING $l" : ""; + + TStringBuilder sb; + if (settings.FlushInterval) { + sb << "FLUSH_INTERVAL = Interval('PT" << settings.FlushInterval->Seconds() << "S')" << Endl; + } + if (settings.BatchSizeBytes) { + sb << ", BATCH_SIZE_BYTES = " << *settings.BatchSizeBytes << Endl; + } + + TString setOptions; + if (!sb.empty()) { + setOptions = TStringBuilder() << "SET (" << sb << " )"; + } + + auto res = Session.ExecuteQuery(Sprintf(R"( + %s; + + ALTER TRANSFER `%s` + %s + %s; + )", lambda.data(), TransferName.data(), setLambda.data(), setOptions.data()), TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(success, res.IsSuccess(), res.GetIssues().ToString()); + } + + void DropTransfer() { + ExecuteDDL(Sprintf("DROP TRANSFER `%s`;", TransferName.data())); + } + + void PauseTransfer() { + ExecuteDDL(Sprintf(R"( + ALTER TRANSFER `%s` + SET ( + STATE = "Paused" + ); + )", TransferName.data())); + } + + void ResumeTransfer() { + ExecuteDDL(Sprintf(R"( + ALTER TRANSFER `%s` + SET ( + STATE = "StandBy" + ); + )", TransferName.data())); + } + + auto DescribeTransfer() { + TReplicationClient client(Driver); + + TDescribeReplicationSettings settings; + settings.IncludeStats(true); + + return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync(); + } + + void CreateReplication() { + auto ddl = Sprintf(R"( + CREATE ASYNC REPLICATION `%s` + FOR `%s` AS `%s` + WITH ( + CONNECTION_STRING = 'grpc://%s' + ); + )", ReplicationName.data(), SourceTableName.data(), TableName.data(), ConnectionString.data()); + + ExecuteDDL(ddl); + } + + void DropReplicatopn() { + ExecuteDDL(Sprintf("DROP ASYNC REPLICATION `%s`;", ReplicationName.data())); + } + + auto DescribeReplication() { + TReplicationClient client(Driver); + + TDescribeReplicationSettings settings; + settings.IncludeStats(true); + + return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + ReplicationName, settings).ExtractValueSync(); + } + + TReplicationDescription CheckReplicationState(TReplicationDescription::EState expected) { + for (size_t i = 20; i--;) { + auto result = DescribeReplication().GetReplicationDescription(); + if (expected == result.GetState()) { + return result; + } + + UNIT_ASSERT_C(i, "Unable to wait replication state. Expected: " << expected << ", actual: " << result.GetState()); + Sleep(TDuration::Seconds(1)); + } + + Y_UNREACHABLE(); + } + + void PauseReplication() { + ExecuteDDL(Sprintf(R"( + ALTER ASYNC REPLICATION `%s` + SET ( + STATE = "Paused" + ); + )", ReplicationName.data())); + } + + void ResumeReplication() { + ExecuteDDL(Sprintf(R"( + ALTER ASYNC REPLICATION `%s` + SET ( + STATE = "StandBy" + ); + )", ReplicationName.data())); + } + + auto DescribeTopic() { + TDescribeTopicSettings settings; + settings.IncludeLocation(true); + settings.IncludeStats(true); + + return TopicClient.DescribeTopic(TopicName, settings).ExtractValueSync(); + } + + void Write(const TMessage& message) { + TWriteSessionSettings writeSettings; + writeSettings.Path(TopicName); + writeSettings.DeduplicationEnabled(message.SeqNo); + if (message.Partition) { + writeSettings.PartitionId(message.Partition); + } + if (message.ProducerId) { + writeSettings.ProducerId(*message.ProducerId); + } + if (message.MessageGroupId) { + writeSettings.MessageGroupId(*message.MessageGroupId); + } + auto writeSession = TopicClient.CreateSimpleBlockingWriteSession(writeSettings); + + UNIT_ASSERT(writeSession->Write(message.Message, message.SeqNo)); + writeSession->Close(TDuration::Seconds(1)); + } + + std::pair<i64, Ydb::ResultSet> DoRead(const TExpectations& expectations) { + auto& e = expectations.front(); + + TStringBuilder columns; + for (size_t i = 0; i < e.size(); ++i) { + if (i) { + columns << ", "; + } + columns << "`" << e[i].first << "`"; + } + + + auto query = Sprintf("SELECT %s FROM `%s` ORDER BY %s", columns.data(), TableName.data(), columns.data()); + Cerr << ">>>>> Query: " << query << Endl << Flush; + auto res = Session.ExecuteQuery(query, TTxControl::NoTx()).GetValueSync(); + if (!res.IsSuccess()) { + Cerr << ">>>>> Query error: " << res.GetIssues().ToString() << Endl << Flush; + TResultSet r{Ydb::ResultSet()}; + return {-1, NYdb::TProtoAccessor::GetProto(r)}; + } + + const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0)); + return {proto.rowsSize(), proto}; + } + + void CheckResult(const TExpectations& expectations) { + for (size_t attempt = 20; attempt--; ) { + auto res = DoRead(expectations); + Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush; + if (res.first == (ssize_t)expectations.size()) { + const Ydb::ResultSet& proto = res.second; + for (size_t i = 0; i < expectations.size(); ++i) { + auto& row = proto.rows(i); + auto& rowExpectations = expectations[i]; + for (size_t i = 0; i < rowExpectations.size(); ++i) { + auto& c = rowExpectations[i]; + TString msg = TStringBuilder() << "Row " << i << " column '" << c.first << "': "; + c.second->Assert(msg, row.items(i)); + } + } + + break; + } + + UNIT_ASSERT_C(attempt, "Unable to wait transfer result"); + Sleep(TDuration::Seconds(1)); + } + } + + TReplicationDescription CheckTransferState(TReplicationDescription::EState expected) { + for (size_t i = 20; i--;) { + auto result = DescribeTransfer().GetReplicationDescription(); + if (expected == result.GetState()) { + return result; + } + + UNIT_ASSERT_C(i, "Unable to wait transfer state. Expected: " << expected << ", actual: " << result.GetState()); + Sleep(TDuration::Seconds(1)); + } + + Y_UNREACHABLE(); + } + + void CheckTransferStateError(const TString& expectedMessage) { + auto result = CheckTransferState(TReplicationDescription::EState::Error); + Cerr << ">>>>> ACTUAL: " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush; + Cerr << ">>>>> EXPECTED: " << expectedMessage << Endl << Flush; + UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains(expectedMessage)); + } + + void Run(const TConfig& config) { + + CreateTable(config.TableDDL); + CreateTopic(); + + TVector<TString> lambdas; + lambdas.insert(lambdas.end(), config.AlterLambdas.begin(), config.AlterLambdas.end()); + lambdas.push_back(config.Lambda); + + for (size_t i = 0; i < lambdas.size(); ++i) { + auto lambda = lambdas[i]; + if (!i) { + CreateTransfer(lambda); + } else { + Sleep(TDuration::Seconds(1)); + + AlterTransfer(lambda); + + if (i == lambdas.size() - 1) { + Sleep(TDuration::Seconds(1)); + } + } + } + + for (const auto& m : config.Messages) { + Write(m); + } + + CheckResult(config.Expectations); + + DropTransfer(); + DropTable(); + DropTopic(); + } + + const size_t Id; + const TString ConnectionString; + + const TString TopicName; + const TString SourceTableName; + const TString TableName; + const TString ReplicationName; + const TString TransferName; + + TDriver Driver; + TQueryClient TableClient; + TSession Session; + TTopicClient TopicClient; + + std::vector<std::string> ColumnNames; +}; + + +} // namespace diff --git a/ydb/tests/functional/transfer/ya.make b/ydb/tests/functional/transfer/ya.make index 152a3fde51d..c6b70829d5a 100644 --- a/ydb/tests/functional/transfer/ya.make +++ b/ydb/tests/functional/transfer/ya.make @@ -16,7 +16,8 @@ PEERDIR( ) SRCS( - main.cpp + replication.cpp + transfer.cpp ) INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc) @@ -24,7 +25,9 @@ INCLUDE(${ARCADIA_ROOT}/ydb/public/tools/ydb_recipe/recipe.inc) SIZE(MEDIUM) IF (SANITIZER_TYPE) - REQUIREMENTS(ram:16 cpu:4) + REQUIREMENTS(ram:24 cpu:4) +ELSE() + REQUIREMENTS(ram:16 cpu:2) ENDIF() END() |
