aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-02-28 18:48:52 +0500
committerGitHub <noreply@github.com>2025-02-28 16:48:52 +0300
commit29480bdd28267f4698ee779bd2ce9b06ed9ff452 (patch)
tree82e8dd12a36825c8044377a650d040f98040dbd2
parent0076d49b97d76b46ca46ce9e436b91931232dc0c (diff)
downloadydb-29480bdd28267f4698ee779bd2ce9b06ed9ff452.tar.gz
add test for drop transfer (#15204)
-rw-r--r--ydb/public/tools/lib/cmds/__init__.py8
-rw-r--r--ydb/tests/functional/transfer/main.cpp63
-rw-r--r--ydb/tests/functional/transfer/ya.make1
3 files changed, 71 insertions, 1 deletions
diff --git a/ydb/public/tools/lib/cmds/__init__.py b/ydb/public/tools/lib/cmds/__init__.py
index 3ce9493a87..34f1db3115 100644
--- a/ydb/public/tools/lib/cmds/__init__.py
+++ b/ydb/public/tools/lib/cmds/__init__.py
@@ -356,6 +356,12 @@ def deploy(arguments):
if kafka_api_port != 0:
optionals['kafka_api_port'] = kafka_api_port
+ enabled_grpc_services = arguments.enabled_grpc_services.copy() # type: typing.List[str]
+ if 'YDB_GRPC_SERVICES' in os.environ:
+ services = os.environ['YDB_GRPC_SERVICES'].split(",")
+ for service in services:
+ enabled_grpc_services.append(service)
+
configuration = KikimrConfigGenerator(
erasure=parse_erasure(arguments),
binary_paths=[arguments.ydb_binary_path] if arguments.ydb_binary_path else None,
@@ -375,7 +381,7 @@ def deploy(arguments):
use_log_files=not arguments.dont_use_log_files,
default_users=default_users(),
extra_feature_flags=enable_feature_flags,
- extra_grpc_services=arguments.enabled_grpc_services,
+ extra_grpc_services=enabled_grpc_services,
generic_connector_config=generic_connector_config(),
**optionals
)
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp
index a52b4a3ee7..e991653079 100644
--- a/ydb/tests/functional/transfer/main.cpp
+++ b/ydb/tests/functional/transfer/main.cpp
@@ -6,6 +6,7 @@
#include <ydb-cpp-sdk/client/topic/client.h>
#include <ydb-cpp-sdk/client/proto/accessor.h>
#include <ydb-cpp-sdk/client/draft/ydb_scripting.h>
+#include <ydb-cpp-sdk/client/draft/ydb_replication.h>
#include <library/cpp/threading/local_executor/local_executor.h>
@@ -179,6 +180,22 @@ struct MainTestCase {
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
+ void DropTransfer() {
+ auto res = Session.ExecuteQuery(Sprintf(R"(
+ DROP TRANSFER `%s`;
+ )", TransferName.data()), TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+ }
+
+ auto DescribeTransfer() {
+ NYdb::NReplication::TReplicationClient client(Driver);
+
+ NYdb::NReplication::TDescribeReplicationSettings settings;
+ settings.IncludeStats(true);
+
+ return client.DescribeReplication(TString("/") + GetEnv("YDB_DATABASE") + "/" + TransferName, settings);
+ }
+
void Write(const TMessage& message) {
TWriteSessionSettings writeSettings;
writeSettings.Path(TopicName);
@@ -768,5 +785,51 @@ Y_UNIT_TEST_SUITE(Transfer)
});
}
+ Y_UNIT_TEST(DropTransfer)
+ {
+ MainTestCase testCase;
+ testCase.Run({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8 NOT NULL,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Messages = {{"Message-1"}},
+
+ .Expectations = {{
+ _C("Key", ui64(0)),
+ _C("Message", TString("Message-1")),
+ }}
+ });
+
+ {
+ auto result = testCase.DescribeTransfer().ExtractValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToOneLineString());
+ }
+
+ testCase.DropTransfer();
+
+ {
+ auto result = testCase.DescribeTransfer().ExtractValueSync();
+ UNIT_ASSERT_C(!result.IsSuccess(), result.GetIssues().ToOneLineString());
+ }
+ }
+
}
diff --git a/ydb/tests/functional/transfer/ya.make b/ydb/tests/functional/transfer/ya.make
index 62c1fa4d2e..152a3fde51 100644
--- a/ydb/tests/functional/transfer/ya.make
+++ b/ydb/tests/functional/transfer/ya.make
@@ -5,6 +5,7 @@ ENV(YDB_USE_IN_MEMORY_PDISKS=true)
ENV(YDB_ERASURE=block_4-2)
ENV(YDB_FEATURE_FLAGS="enable_topic_transfer")
+ENV(YDB_GRPC_SERVICES="replication")
PEERDIR(
library/cpp/threading/local_executor