aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-03-06 21:47:51 +0500
committerGitHub <noreply@github.com>2025-03-06 16:47:51 +0000
commitf5739aac018edf5103248a8a0f61413975573e62 (patch)
treeae2b41ed132b9fdc1ce890bd733d30445e1f749c
parent06876fe20685ad9d3c6854ddb01ad386bce93f71 (diff)
downloadydb-f5739aac018edf5103248a8a0f61413975573e62.tar.gz
Test for describe a transfer with errors (#15404)
-rw-r--r--ydb/core/tx/replication/service/transfer_writer.cpp43
-rw-r--r--ydb/tests/functional/transfer/main.cpp84
2 files changed, 97 insertions, 30 deletions
diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp
index df8338e6f8..cac69ecd69 100644
--- a/ydb/core/tx/replication/service/transfer_writer.cpp
+++ b/ydb/core/tx/replication/service/transfer_writer.cpp
@@ -402,7 +402,7 @@ private:
switch (ev->GetTypeRewrite()) {
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
- hFunc(TEvWorker::TEvHandshake, HoldHandle);
+ hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, HoldHandle);
sFunc(TEvents::TEvWakeup, GetTableScheme);
sFunc(TEvents::TEvPoison, PassAway);
@@ -501,7 +501,7 @@ private:
switch (ev->GetTypeRewrite()) {
hFunc(NFq::TEvRowDispatcher::TEvPurecalcCompileResponse, Handle);
- hFunc(TEvWorker::TEvHandshake, HoldHandle);
+ hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, HoldHandle);
//sFunc(TEvents::TEvWakeup, SendS3Request);
sFunc(TEvents::TEvPoison, PassAway);
@@ -545,11 +545,6 @@ private:
void StartWork() {
Become(&TThis::StateWork);
- if (PendingWorker) {
- ProcessWorker(*PendingWorker);
- PendingWorker.reset();
- }
-
if (PendingRecords) {
ProcessData(PendingPartitionId, *PendingRecords);
PendingRecords.reset();
@@ -565,21 +560,16 @@ private:
}
}
- void HoldHandle(TEvWorker::TEvHandshake::TPtr& ev) {
- Y_ABORT_UNLESS(!PendingWorker);
- PendingWorker = ev->Sender;
- }
-
void Handle(TEvWorker::TEvHandshake::TPtr& ev) {
- ProcessWorker(ev->Sender);
- }
-
- void ProcessWorker(const TActorId& worker) {
- Worker = worker;
+ Worker = ev->Sender;
LOG_D("Handshake"
<< ": worker# " << Worker);
- Send(Worker, new TEvWorker::TEvHandshake());
+ if (ProcessingError) {
+ Leave(ProcessingErrorStatus, *ProcessingError);
+ } else {
+ Send(Worker, new TEvWorker::TEvHandshake());
+ }
}
void HoldHandle(TEvWorker::TEvData::TPtr& ev) {
@@ -632,7 +622,7 @@ private:
STFUNC(StateWrite) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvents::TEvCompleted, Handle);
- hFunc(TEvWorker::TEvHandshake, HoldHandle);
+ hFunc(TEvWorker::TEvHandshake, Handle);
hFunc(TEvWorker::TEvData, HoldHandle);
sFunc(TEvents::TEvPoison, PassAway);
@@ -688,12 +678,16 @@ private:
this->Schedule(Delay + random, new TEvents::TEvWakeup());
}
- template <typename... Args>
- void Leave(Args&&... args) {
+ void Leave(TEvWorker::TEvGone::EStatus status, const TString& message) {
LOG_I("Leave");
- Send(Worker, new TEvWorker::TEvGone(std::forward<Args>(args)...));
- PassAway();
+ if (Worker) {
+ Send(Worker, new TEvWorker::TEvGone(status, message));
+ PassAway();
+ } else {
+ ProcessingErrorStatus = status;
+ ProcessingError = message;
+ }
}
void PassAway() override {
@@ -726,9 +720,10 @@ private:
TProgramHolder::TPtr ProgramHolder;
mutable TMaybe<TString> LogPrefix;
+
+ mutable TEvWorker::TEvGone::EStatus ProcessingErrorStatus;
mutable TMaybe<TString> ProcessingError;
- std::optional<TActorId> PendingWorker;
ui32 PendingPartitionId = 0;
std::optional<TVector<TTopicMessage>> PendingRecords;
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp
index f80b672432..619ac9281f 100644
--- a/ydb/tests/functional/transfer/main.cpp
+++ b/ydb/tests/functional/transfer/main.cpp
@@ -147,13 +147,13 @@ struct MainTestCase {
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
- void CreateTopic() {
+ void CreateTopic(size_t partitionCount = 10) {
auto res = Session.ExecuteQuery(Sprintf(R"(
CREATE TOPIC `%s`
WITH (
- min_active_partitions = 10
+ min_active_partitions = %d
);
- )", TopicName.data()), TTxControl::NoTx()).GetValueSync();
+ )", TopicName.data(), partitionCount), TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
@@ -194,7 +194,7 @@ struct MainTestCase {
TDescribeReplicationSettings settings;
settings.IncludeStats(true);
- return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings);
+ return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings).ExtractValueSync();
}
auto DescribeTopic() {
@@ -828,14 +828,14 @@ Y_UNIT_TEST_SUITE(Transfer)
});
{
- auto result = testCase.DescribeTransfer().ExtractValueSync();
+ auto result = testCase.DescribeTransfer();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
}
testCase.DropTransfer();
{
- auto result = testCase.DescribeTransfer().ExtractValueSync();
+ auto result = testCase.DescribeTransfer();
UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString());
UNIT_ASSERT_VALUES_EQUAL(EStatus::SCHEME_ERROR, result.GetStatus());
}
@@ -897,5 +897,77 @@ Y_UNIT_TEST_SUITE(Transfer)
}
}
+ Y_UNIT_TEST(DescribeError_OnLambdaCompilation)
+ {
+ MainTestCase testCase;
+ testCase.CreateTable(R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8 NOT NULL,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )");
+
+ testCase.CreateTopic(1);
+ testCase.CreateTransfer(R"(
+ $l = ($x) -> {
+ return $x._unknown_field_for_lambda_compilation_error;
+ };
+ )");
+
+ for (size_t i = 20; i--;) {
+ auto result = testCase.DescribeTransfer().GetReplicationDescription();
+ if (TReplicationDescription::EState::Error == result.GetState()) {
+ Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
+ UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("_unknown_field_for_lambda_compilation_error"));
+ break;
+ }
+
+ UNIT_ASSERT_C(i, "Unable to wait transfer error");
+ Sleep(TDuration::Seconds(1));
+ }
+ }
+
+ Y_UNIT_TEST(DescribeError_OnWriteToShard)
+ {
+ MainTestCase testCase;
+ testCase.CreateTable(R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )");
+
+ testCase.CreateTopic(1);
+ testCase.CreateTransfer(R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:null,
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )");
+
+ testCase.Write({"message-1"});
+
+ for (size_t i = 20; i--;) {
+ auto result = testCase.DescribeTransfer().GetReplicationDescription();
+ if (TReplicationDescription::EState::Error == result.GetState()) {
+ Cerr << ">>>>> " << result.GetErrorState().GetIssues().ToOneLineString() << Endl << Flush;
+ UNIT_ASSERT(result.GetErrorState().GetIssues().ToOneLineString().contains("Cannot write data into shard"));
+ break;
+ }
+
+ UNIT_ASSERT_C(i, "Unable to wait transfer error");
+ Sleep(TDuration::Seconds(1));
+ }
+ }
}