diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-03-06 21:47:51 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-06 16:47:51 +0000 |
commit | f5739aac018edf5103248a8a0f61413975573e62 (patch) | |
tree | ae2b41ed132b9fdc1ce890bd733d30445e1f749c | |
parent | 06876fe20685ad9d3c6854ddb01ad386bce93f71 (diff) | |
download | ydb-f5739aac018edf5103248a8a0f61413975573e62.tar.gz |
Test for describe a transfer with errors (#15404)
-rw-r--r-- | ydb/core/tx/replication/service/transfer_writer.cpp | 43 | ||||
-rw-r--r-- | ydb/tests/functional/transfer/main.cpp | 84 |
2 files changed, 97 insertions, 30 deletions
diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp index df8338e6f8..cac69ecd69 100644 --- a/ydb/core/tx/replication/service/transfer_writer.cpp +++ b/ydb/core/tx/replication/service/transfer_writer.cpp @@ -402,7 +402,7 @@ private: switch (ev->GetTypeRewrite()) { hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); - hFunc(TEvWorker::TEvHandshake, HoldHandle); + hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvData, HoldHandle); sFunc(TEvents::TEvWakeup, GetTableScheme); sFunc(TEvents::TEvPoison, PassAway); @@ -501,7 +501,7 @@ private: switch (ev->GetTypeRewrite()) { hFunc(NFq::TEvRowDispatcher::TEvPurecalcCompileResponse, Handle); - hFunc(TEvWorker::TEvHandshake, HoldHandle); + hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvData, HoldHandle); //sFunc(TEvents::TEvWakeup, SendS3Request); sFunc(TEvents::TEvPoison, PassAway); @@ -545,11 +545,6 @@ private: void StartWork() { Become(&TThis::StateWork); - if (PendingWorker) { - ProcessWorker(*PendingWorker); - PendingWorker.reset(); - } - if (PendingRecords) { ProcessData(PendingPartitionId, *PendingRecords); PendingRecords.reset(); @@ -565,21 +560,16 @@ private: } } - void HoldHandle(TEvWorker::TEvHandshake::TPtr& ev) { - Y_ABORT_UNLESS(!PendingWorker); - PendingWorker = ev->Sender; - } - void Handle(TEvWorker::TEvHandshake::TPtr& ev) { - ProcessWorker(ev->Sender); - } - - void ProcessWorker(const TActorId& worker) { - Worker = worker; + Worker = ev->Sender; LOG_D("Handshake" << ": worker# " << Worker); - Send(Worker, new TEvWorker::TEvHandshake()); + if (ProcessingError) { + Leave(ProcessingErrorStatus, *ProcessingError); + } else { + Send(Worker, new TEvWorker::TEvHandshake()); + } } void HoldHandle(TEvWorker::TEvData::TPtr& ev) { @@ -632,7 +622,7 @@ private: STFUNC(StateWrite) { switch (ev->GetTypeRewrite()) { hFunc(TEvents::TEvCompleted, Handle); - hFunc(TEvWorker::TEvHandshake, HoldHandle); + hFunc(TEvWorker::TEvHandshake, Handle); hFunc(TEvWorker::TEvData, HoldHandle); sFunc(TEvents::TEvPoison, PassAway); @@ -688,12 +678,16 @@ private: this->Schedule(Delay + random, new TEvents::TEvWakeup()); } - template <typename... Args> - void Leave(Args&&... args) { + void Leave(TEvWorker::TEvGone::EStatus status, const TString& message) { LOG_I("Leave"); - Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...)); - PassAway(); + if (Worker) { + Send(Worker, new TEvWorker::TEvGone(status, message)); + PassAway(); + } else { + ProcessingErrorStatus = status; + ProcessingError = message; + } } void PassAway() override { @@ -726,9 +720,10 @@ private: TProgramHolder::TPtr ProgramHolder; mutable TMaybe<TString> LogPrefix; + + mutable TEvWorker::TEvGone::EStatus ProcessingErrorStatus; mutable TMaybe<TString> ProcessingError; - std::optional<TActorId> PendingWorker; ui32 PendingPartitionId = 0; std::optional<TVector<TTopicMessage>> PendingRecords; diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index f80b672432..619ac9281f 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -147,13 +147,13 @@ struct MainTestCase { UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } - void CreateTopic() { + void CreateTopic(size_t partitionCount = 10) { auto res = Session.ExecuteQuery(Sprintf(R"( CREATE TOPIC `%s` WITH ( - min_active_partitions = 10 + min_active_partitions = %d ); - )", TopicName.data()), TTxControl::NoTx()).GetValueSync(); + )", TopicName.data(), partitionCount), TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } @@ -194,7 +194,7 @@ struct MainTestCase { TDescribeReplicationSettings settings; settings.IncludeStats(true); - return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings); + return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync(); } auto DescribeTopic() { @@ -828,14 +828,14 @@ Y_UNIT_TEST_SUITE(Transfer) }); { - auto result = testCase.DescribeTransfer().ExtractValueSync(); + auto result = testCase.DescribeTransfer(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString()); } testCase.DropTransfer(); { - auto result = testCase.DescribeTransfer().ExtractValueSync(); + auto result = testCase.DescribeTransfer(); UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString()); UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus()); } @@ -897,5 +897,77 @@ Y_UNIT_TEST_SUITE(Transfer) } } + Y_UNIT_TEST(DescribeError_OnLambdaCompilation) + { + MainTestCase testCase; + testCase.CreateTable(R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8 NOT NULL, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )"); + + testCase.CreateTopic(1); + testCase.CreateTransfer(R"( + $l = ($x) -> { + return $x._unknown_field_for_lambda_compilation_error; + }; + )"); + + for (size_t i = 20; i--;) { + auto result = testCase.DescribeTransfer().GetReplicationDescription(); + if (TReplicationDescription::EState::Error == result.GetState()) { + Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush; + UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("_unknown_field_for_lambda_compilation_error")); + break; + } + + UNIT_ASSERT_C(i, "Unable to wait transfer error"); + Sleep(TDuration::Seconds(1)); + } + } + + Y_UNIT_TEST(DescribeError_OnWriteToShard) + { + MainTestCase testCase; + testCase.CreateTable(R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )"); + + testCase.CreateTopic(1); + testCase.CreateTransfer(R"( + $l = ($x) -> { + return [ + <| + Key:null, + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )"); + + testCase.Write({"message-1"}); + + for (size_t i = 20; i--;) { + auto result = testCase.DescribeTransfer().GetReplicationDescription(); + if (TReplicationDescription::EState::Error == result.GetState()) { + Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush; + UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("Cannot write data into shard")); + break; + } + + UNIT_ASSERT_C(i, "Unable to wait transfer error"); + Sleep(TDuration::Seconds(1)); + } + } } |