aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNikolay Shestakov <tesseract@ydb.tech>2025-02-21 19:00:13 +0500
committerGitHub <noreply@github.com>2025-02-21 14:00:13 +0000
commit147c972fc3a3eabca9f89df5473016a0afe338b8 (patch)
treebb5bc36a6afffa14d397c7245efaed0a5c0558ed
parentd2ecf39bd530f2f1d5e96042820c544d53b70196 (diff)
downloadydb-147c972fc3a3eabca9f89df5473016a0afe338b8.tar.gz
Fix wrong order of columns for transfer data from topic to column table (#14887)
-rw-r--r--ydb/core/tx/replication/service/transfer_writer.cpp25
-rw-r--r--ydb/tests/functional/transfer/main.cpp403
2 files changed, 370 insertions, 58 deletions
diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp
index f9d4314c54..bb2b193e4f 100644
--- a/ydb/core/tx/replication/service/transfer_writer.cpp
+++ b/ydb/core/tx/replication/service/transfer_writer.cpp
@@ -236,27 +236,32 @@ TScheme BuildScheme(const TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& nav) {
});
result.TableColumns.resize(keyColumns);
- result.ColumnsMetadata.resize(keyColumns);
for (const auto& [_, column] : entry.Columns) {
- NKikimrKqp::TKqpColumnMetadataProto* c;
if (column.KeyOrder >= 0) {
result.TableColumns[column.KeyOrder] = {column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn};
- c = &result.ColumnsMetadata[column.KeyOrder];
} else {
result.TableColumns.emplace_back(column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn);
- result.ColumnsMetadata.emplace_back();
- c = &result.ColumnsMetadata.back();
}
+ }
+
+ std::map<TString, TSysTables::TTableColumnInfo> columns;
+ for (const auto& [_, column] : entry.Columns) {
+ columns[column.Name] = column;
+ }
- result.WriteIndex.push_back(result.WriteIndex.size());
+ size_t i = keyColumns;
+ for (const auto& [_, column] : columns) {
+ result.ColumnsMetadata.emplace_back();
+ auto& c = result.ColumnsMetadata.back();
+ result.WriteIndex.push_back(column.KeyOrder >= 0 ? column.KeyOrder : i++);
- c->SetName(column.Name);
- c->SetId(column.Id);
- c->SetTypeId(column.PType.GetTypeId());
+ c.SetName(column.Name);
+ c.SetId(column.Id);
+ c.SetTypeId(column.PType.GetTypeId());
if (NScheme::NTypeIds::IsParametrizedType(column.PType.GetTypeId())) {
- NScheme::ProtoFromTypeInfo(column.PType, "", *c->MutableTypeInfo());
+ NScheme::ProtoFromTypeInfo(column.PType, "", *c.MutableTypeInfo());
}
}
diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp
index b16c7357d6..9ea3b36bb7 100644
--- a/ydb/tests/functional/transfer/main.cpp
+++ b/ydb/tests/functional/transfer/main.cpp
@@ -15,25 +15,82 @@ using namespace NYdb::NTopic;
namespace {
-std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s, const TString& table) {
- auto res = s.ExecuteQuery(
- Sprintf("SELECT Key, Message FROM `/local/%s` ORDER BY Key", table.data()),
- TTxControl::NoTx()).GetValueSync();
- UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+volatile size_t TestCaseCounter = 0;
- const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
+struct IChecker {
+ virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0;
+ virtual ~IChecker() = default;
+};
- return {proto.rowsSize(), proto};
+template<typename T>
+struct Checker : public IChecker {
+ Checker(T&& expected)
+ : Expected(std::move(expected))
+ {}
+
+ void Assert(const TString& msg, const ::Ydb::Value& value) override {
+ UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg);
+ }
+
+ T Get(const ::Ydb::Value& value);
+
+ T Expected;
+};
+
+template<>
+bool Checker<bool>::Get(const ::Ydb::Value& value) {
+ return value.bool_value();
}
-} // namespace
+template<>
+ui64 Checker<ui64>::Get(const ::Ydb::Value& value) {
+ return value.uint64_value();
+}
-Y_UNIT_TEST_SUITE(Transfer)
-{
- void Main(const TString& transferName, const TString& tableName, const TString& tableDDL, const TString& topicName)
- {
+template<>
+double Checker<double>::Get(const ::Ydb::Value& value) {
+ return value.double_value();
+}
+
+template<>
+TString Checker<TString>::Get(const ::Ydb::Value& value) {
+ return value.text_value();
+}
+
+template<>
+TInstant Checker<TInstant>::Get(const ::Ydb::Value& value) {
+ return TInstant::Days(value.uint32_value());
+}
+
+template<typename T>
+std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) {
+ return {
+ std::move(name),
+ std::make_shared<Checker<T>>(std::move(expected))
+ };
+}
+
+struct TConfig {
+ const char* TableDDL;
+ const char* Lambda;
+ const char* Message;
+ TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations;
+};
+
+
+struct MainTestCase {
+
+ MainTestCase(TConfig&& config)
+ : Config(std::move(config)) {
+ size_t id = TestCaseCounter++;
+
+ TopicName = TStringBuilder() << "Topic_" << id;
+ TableName = TStringBuilder() << "Table_" << id;
+ TransferName = TStringBuilder() << "Transfer_" << id;
+ }
+
+ void Run() {
TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE");
- Cerr << ">>>>> connectionString = " << connectionString << Endl << Flush;
auto config = TDriverConfig(connectionString);
auto driver = TDriver(config);
@@ -42,6 +99,7 @@ Y_UNIT_TEST_SUITE(Transfer)
auto topicClient = TTopicClient(driver);
{
+ auto tableDDL = Sprintf(Config.TableDDL, TableName.data());
auto res = session.ExecuteQuery(tableDDL, TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
@@ -49,20 +107,13 @@ Y_UNIT_TEST_SUITE(Transfer)
{
auto res = session.ExecuteQuery(Sprintf(R"(
CREATE TOPIC `%s`;
- )", topicName.data()), TTxControl::NoTx()).GetValueSync();
+ )", TopicName.data()), TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
{
auto res = session.ExecuteQuery(Sprintf(R"(
- $l = ($x) -> {
- return [
- <|
- Key:CAST($x._offset AS Uint64),
- Message:CAST($x._data AS Utf8)
- |>
- ];
- };
+ %s;
CREATE TRANSFER `%s`
FROM `%s` TO `%s` USING $l
@@ -70,28 +121,31 @@ Y_UNIT_TEST_SUITE(Transfer)
CONNECTION_STRING = 'grpc://%s'
-- , TOKEN = 'user@builtin'
);
- )", transferName.data(), topicName.data(), tableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync();
+ )", Config.Lambda, TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
}
{
TWriteSessionSettings writeSettings;
- writeSettings.Path(topicName);
+ writeSettings.Path(TopicName);
writeSettings.DeduplicationEnabled(false);
auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings);
- UNIT_ASSERT(writeSession->Write("message-1"));
+ UNIT_ASSERT(writeSession->Write(Config.Message));
writeSession->Close(TDuration::Seconds(1));
}
{
for (size_t attempt = 20; attempt--; ) {
- auto res = DoRead(session, tableName);
+ auto res = DoRead(session);
Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush;
if (res.first == 1) {
const Ydb::ResultSet& proto = res.second;
- UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(0).uint64_value(), 0);
- UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).text_value(), "message-1");
+ for (size_t i = 0; i < Config.Expectations.size(); ++i) {
+ auto& c = Config.Expectations[i];
+ TString msg = TStringBuilder() << "Column '" << c.first << "': ";
+ c.second->Assert(msg, proto.rows(0).items(i));
+ }
break;
}
@@ -102,30 +156,283 @@ Y_UNIT_TEST_SUITE(Transfer)
}
}
- Y_UNIT_TEST(Main_ColumnTable)
+
+ std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s) {
+ TStringBuilder columns;
+ for (size_t i = 0; i < Config.Expectations.size(); ++i) {
+ if (i) {
+ columns << ", ";
+ }
+ columns << "`" << Config.Expectations[i].first << "`";
+ }
+
+
+ auto res = s.ExecuteQuery(
+ Sprintf("SELECT %s FROM `%s`", columns.data(), TableName.data()),
+ TTxControl::NoTx()).GetValueSync();
+ UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
+
+ const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0));
+ return {proto.rowsSize(), proto};
+ }
+
+ TConfig Config;
+
+ TString TopicName;
+ TString TableName;
+ TString TransferName;
+
+ std::vector<std::string> ColumnNames;
+};
+
+
+} // namespace
+
+Y_UNIT_TEST_SUITE(Transfer)
+{
+ Y_UNIT_TEST(Main_ColumnTable_KeyColumnFirst)
{
- Main("ColumnTransfer1", "TargetColumnTable1", R"(
- CREATE TABLE `/local/TargetColumnTable1` (
- Key Uint64 NOT NULL,
- Message Utf8 NOT NULL,
- PRIMARY KEY (Key)
- ) WITH (
- STORE = COLUMN
- );
- )", "SourceTopic1");
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8 NOT NULL,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Message = "Message-1",
+
+ .Expectations = {
+ _C("Key", ui64(0)),
+ _C("Message", TString("Message-1")),
+ }
+ }).Run();
}
- Y_UNIT_TEST(Main_ColumnTable_KeyColumnOrder)
+ Y_UNIT_TEST(Main_ColumnTable_KeyColumnLast)
{
- Main("ColumnTransfer2", "TargetColumnTable2", R"(
- CREATE TABLE `/local/TargetColumnTable2` (
- Message Utf8 NOT NULL,
- Key Uint64 NOT NULL,
- PRIMARY KEY (Key)
- ) WITH (
- STORE = COLUMN
- );
- )", "SourceTopic2");
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Message Utf8 NOT NULL,
+ Key Uint64 NOT NULL,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Message = "Message-1",
+
+ .Expectations = {
+ _C("Key", ui64(0)),
+ _C("Message", TString("Message-1")),
+ }
+ }).Run();
}
+
+ Y_UNIT_TEST(Main_ColumnTable_JsonMessage)
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Id Uint64 NOT NULL,
+ FirstName Utf8 NOT NULL,
+ LastName Utf8 NOT NULL,
+ Salary Uint64 NOT NULL,
+ PRIMARY KEY (Id)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ $input = CAST($x._data AS JSON);
+
+ return [
+ <|
+ Id: Yson::ConvertToUint64($input.id),
+ FirstName: CAST(Yson::ConvertToString($input.first_name) AS Utf8),
+ LastName: CAST(Yson::ConvertToString($input.last_name) AS Utf8),
+ Salary: CAST(Yson::ConvertToString($input.salary) AS UInt64)
+ |>
+ ];
+ };
+ )",
+
+ .Message = R"({
+ "id": 1,
+ "first_name": "Vasya",
+ "last_name": "Pupkin",
+ "salary": "123"
+ })",
+
+ .Expectations = {
+ _C("Id", ui64(1)),
+ _C("FirstName", TString("Vasya")),
+ _C("LastName", TString("Pupkin")),
+ _C("Salary", ui64(123)),
+ }
+ }).Run();
+ }
+
+ Y_UNIT_TEST(Main_ColumnTable_NullableColumn)
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Message = "Message-1",
+
+ .Expectations = {
+ _C("Key", ui64(0)),
+ _C("Message", TString("Message-1")),
+ }
+ }).Run();
+ }
+
+ Y_UNIT_TEST(Main_ColumnTable_Date)
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Date,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message: CAST($x._data AS Date)
+ |>
+ ];
+ };
+ )",
+
+ .Message = "2025-02-21",
+
+ .Expectations = {
+ _C("Key", ui64(0)),
+ _C("Message", TInstant::ParseIso8601("2025-02-21")),
+ }
+ }).Run();
+ }
+
+ Y_UNIT_TEST(Main_ColumnTable_Double)
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Double,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message: CAST($x._data AS Double)
+ |>
+ ];
+ };
+ )",
+
+ .Message = "1.23",
+
+ .Expectations = {
+ _C("Key", ui64(0)),
+ _C("Message", 1.23),
+ }
+ }).Run();
+ }
+
+ Y_UNIT_TEST(Main_ColumnTable_Utf8_Long)
+ {
+ MainTestCase({
+ .TableDDL = R"(
+ CREATE TABLE `%s` (
+ Key Uint64 NOT NULL,
+ Message Utf8 NOT NULL,
+ PRIMARY KEY (Key)
+ ) WITH (
+ STORE = COLUMN
+ );
+ )",
+
+ .Lambda = R"(
+ $l = ($x) -> {
+ return [
+ <|
+ Key:CAST($x._offset AS Uint64),
+ Message:CAST($x._data AS Utf8)
+ |>
+ ];
+ };
+ )",
+
+ .Message = "Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890",
+
+ .Expectations = {
+ _C("Key", ui64(0)),
+ _C("Message", TString("Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890")),
+ }
+ }).Run();
+ }
+
}