diff options
author | Nikolay Shestakov <tesseract@ydb.tech> | 2025-02-21 19:00:13 +0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-02-21 14:00:13 +0000 |
commit | 147c972fc3a3eabca9f89df5473016a0afe338b8 (patch) | |
tree | bb5bc36a6afffa14d397c7245efaed0a5c0558ed | |
parent | d2ecf39bd530f2f1d5e96042820c544d53b70196 (diff) | |
download | ydb-147c972fc3a3eabca9f89df5473016a0afe338b8.tar.gz |
Fix wrong order of columns for transfer data from topic to column table (#14887)
-rw-r--r-- | ydb/core/tx/replication/service/transfer_writer.cpp | 25 | ||||
-rw-r--r-- | ydb/tests/functional/transfer/main.cpp | 403 |
2 files changed, 370 insertions, 58 deletions
diff --git a/ydb/core/tx/replication/service/transfer_writer.cpp b/ydb/core/tx/replication/service/transfer_writer.cpp index f9d4314c54..bb2b193e4f 100644 --- a/ydb/core/tx/replication/service/transfer_writer.cpp +++ b/ydb/core/tx/replication/service/transfer_writer.cpp @@ -236,27 +236,32 @@ TScheme BuildScheme(const TAutoPtr<NSchemeCache::TSchemeCacheNavigate>& nav) { }); result.TableColumns.resize(keyColumns); - result.ColumnsMetadata.resize(keyColumns); for (const auto& [_, column] : entry.Columns) { - NKikimrKqp::TKqpColumnMetadataProto* c; if (column.KeyOrder >= 0) { result.TableColumns[column.KeyOrder] = {column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn}; - c = &result.ColumnsMetadata[column.KeyOrder]; } else { result.TableColumns.emplace_back(column.Name, column.Id, column.PType, column.KeyOrder >= 0, !column.IsNotNullColumn); - result.ColumnsMetadata.emplace_back(); - c = &result.ColumnsMetadata.back(); } + } + + std::map<TString, TSysTables::TTableColumnInfo> columns; + for (const auto& [_, column] : entry.Columns) { + columns[column.Name] = column; + } - result.WriteIndex.push_back(result.WriteIndex.size()); + size_t i = keyColumns; + for (const auto& [_, column] : columns) { + result.ColumnsMetadata.emplace_back(); + auto& c = result.ColumnsMetadata.back(); + result.WriteIndex.push_back(column.KeyOrder >= 0 ? column.KeyOrder : i++); - c->SetName(column.Name); - c->SetId(column.Id); - c->SetTypeId(column.PType.GetTypeId()); + c.SetName(column.Name); + c.SetId(column.Id); + c.SetTypeId(column.PType.GetTypeId()); if (NScheme::NTypeIds::IsParametrizedType(column.PType.GetTypeId())) { - NScheme::ProtoFromTypeInfo(column.PType, "", *c->MutableTypeInfo()); + NScheme::ProtoFromTypeInfo(column.PType, "", *c.MutableTypeInfo()); } } diff --git a/ydb/tests/functional/transfer/main.cpp b/ydb/tests/functional/transfer/main.cpp index b16c7357d6..9ea3b36bb7 100644 --- a/ydb/tests/functional/transfer/main.cpp +++ b/ydb/tests/functional/transfer/main.cpp @@ -15,25 +15,82 @@ using namespace NYdb::NTopic; namespace { -std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s, const TString& table) { - auto res = s.ExecuteQuery( - Sprintf("SELECT Key, Message FROM `/local/%s` ORDER BY Key", table.data()), - TTxControl::NoTx()).GetValueSync(); - UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); +volatile size_t TestCaseCounter = 0; - const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0)); +struct IChecker { + virtual void Assert(const TString& msg, const ::Ydb::Value& value) = 0; + virtual ~IChecker() = default; +}; - return {proto.rowsSize(), proto}; +template<typename T> +struct Checker : public IChecker { + Checker(T&& expected) + : Expected(std::move(expected)) + {} + + void Assert(const TString& msg, const ::Ydb::Value& value) override { + UNIT_ASSERT_VALUES_EQUAL_C(Get(value), Expected, msg); + } + + T Get(const ::Ydb::Value& value); + + T Expected; +}; + +template<> +bool Checker<bool>::Get(const ::Ydb::Value& value) { + return value.bool_value(); } -} // namespace +template<> +ui64 Checker<ui64>::Get(const ::Ydb::Value& value) { + return value.uint64_value(); +} -Y_UNIT_TEST_SUITE(Transfer) -{ - void Main(const TString& transferName, const TString& tableName, const TString& tableDDL, const TString& topicName) - { +template<> +double Checker<double>::Get(const ::Ydb::Value& value) { + return value.double_value(); +} + +template<> +TString Checker<TString>::Get(const ::Ydb::Value& value) { + return value.text_value(); +} + +template<> +TInstant Checker<TInstant>::Get(const ::Ydb::Value& value) { + return TInstant::Days(value.uint32_value()); +} + +template<typename T> +std::pair<TString, std::shared_ptr<IChecker>> _C(TString&& name, T&& expected) { + return { + std::move(name), + std::make_shared<Checker<T>>(std::move(expected)) + }; +} + +struct TConfig { + const char* TableDDL; + const char* Lambda; + const char* Message; + TVector<std::pair<TString, std::shared_ptr<IChecker>>> Expectations; +}; + + +struct MainTestCase { + + MainTestCase(TConfig&& config) + : Config(std::move(config)) { + size_t id = TestCaseCounter++; + + TopicName = TStringBuilder() << "Topic_" << id; + TableName = TStringBuilder() << "Table_" << id; + TransferName = TStringBuilder() << "Transfer_" << id; + } + + void Run() { TString connectionString = GetEnv("YDB_ENDPOINT") + "/?database=" + GetEnv("YDB_DATABASE"); - Cerr << ">>>>> connectionString = " << connectionString << Endl << Flush; auto config = TDriverConfig(connectionString); auto driver = TDriver(config); @@ -42,6 +99,7 @@ Y_UNIT_TEST_SUITE(Transfer) auto topicClient = TTopicClient(driver); { + auto tableDDL = Sprintf(Config.TableDDL, TableName.data()); auto res = session.ExecuteQuery(tableDDL, TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } @@ -49,20 +107,13 @@ Y_UNIT_TEST_SUITE(Transfer) { auto res = session.ExecuteQuery(Sprintf(R"( CREATE TOPIC `%s`; - )", topicName.data()), TTxControl::NoTx()).GetValueSync(); + )", TopicName.data()), TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } { auto res = session.ExecuteQuery(Sprintf(R"( - $l = ($x) -> { - return [ - <| - Key:CAST($x._offset AS Uint64), - Message:CAST($x._data AS Utf8) - |> - ]; - }; + %s; CREATE TRANSFER `%s` FROM `%s` TO `%s` USING $l @@ -70,28 +121,31 @@ Y_UNIT_TEST_SUITE(Transfer) CONNECTION_STRING = 'grpc://%s' -- , TOKEN = 'user@builtin' ); - )", transferName.data(), topicName.data(), tableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync(); + )", Config.Lambda, TransferName.data(), TopicName.data(), TableName.data(), connectionString.data()), TTxControl::NoTx()).GetValueSync(); UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); } { TWriteSessionSettings writeSettings; - writeSettings.Path(topicName); + writeSettings.Path(TopicName); writeSettings.DeduplicationEnabled(false); auto writeSession = topicClient.CreateSimpleBlockingWriteSession(writeSettings); - UNIT_ASSERT(writeSession->Write("message-1")); + UNIT_ASSERT(writeSession->Write(Config.Message)); writeSession->Close(TDuration::Seconds(1)); } { for (size_t attempt = 20; attempt--; ) { - auto res = DoRead(session, tableName); + auto res = DoRead(session); Cerr << "Attempt=" << attempt << " count=" << res.first << Endl << Flush; if (res.first == 1) { const Ydb::ResultSet& proto = res.second; - UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(0).uint64_value(), 0); - UNIT_ASSERT_VALUES_EQUAL(proto.rows(0).items(1).text_value(), "message-1"); + for (size_t i = 0; i < Config.Expectations.size(); ++i) { + auto& c = Config.Expectations[i]; + TString msg = TStringBuilder() << "Column '" << c.first << "': "; + c.second->Assert(msg, proto.rows(0).items(i)); + } break; } @@ -102,30 +156,283 @@ Y_UNIT_TEST_SUITE(Transfer) } } - Y_UNIT_TEST(Main_ColumnTable) + + std::pair<ui64, Ydb::ResultSet> DoRead(TSession& s) { + TStringBuilder columns; + for (size_t i = 0; i < Config.Expectations.size(); ++i) { + if (i) { + columns << ", "; + } + columns << "`" << Config.Expectations[i].first << "`"; + } + + + auto res = s.ExecuteQuery( + Sprintf("SELECT %s FROM `%s`", columns.data(), TableName.data()), + TTxControl::NoTx()).GetValueSync(); + UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString()); + + const auto proto = NYdb::TProtoAccessor::GetProto(res.GetResultSet(0)); + return {proto.rowsSize(), proto}; + } + + TConfig Config; + + TString TopicName; + TString TableName; + TString TransferName; + + std::vector<std::string> ColumnNames; +}; + + +} // namespace + +Y_UNIT_TEST_SUITE(Transfer) +{ + Y_UNIT_TEST(Main_ColumnTable_KeyColumnFirst) { - Main("ColumnTransfer1", "TargetColumnTable1", R"( - CREATE TABLE `/local/TargetColumnTable1` ( - Key Uint64 NOT NULL, - Message Utf8 NOT NULL, - PRIMARY KEY (Key) - ) WITH ( - STORE = COLUMN - ); - )", "SourceTopic1"); + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8 NOT NULL, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Message = "Message-1", + + .Expectations = { + _C("Key", ui64(0)), + _C("Message", TString("Message-1")), + } + }).Run(); } - Y_UNIT_TEST(Main_ColumnTable_KeyColumnOrder) + Y_UNIT_TEST(Main_ColumnTable_KeyColumnLast) { - Main("ColumnTransfer2", "TargetColumnTable2", R"( - CREATE TABLE `/local/TargetColumnTable2` ( - Message Utf8 NOT NULL, - Key Uint64 NOT NULL, - PRIMARY KEY (Key) - ) WITH ( - STORE = COLUMN - ); - )", "SourceTopic2"); + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Message Utf8 NOT NULL, + Key Uint64 NOT NULL, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Message = "Message-1", + + .Expectations = { + _C("Key", ui64(0)), + _C("Message", TString("Message-1")), + } + }).Run(); } + + Y_UNIT_TEST(Main_ColumnTable_JsonMessage) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Id Uint64 NOT NULL, + FirstName Utf8 NOT NULL, + LastName Utf8 NOT NULL, + Salary Uint64 NOT NULL, + PRIMARY KEY (Id) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + $input = CAST($x._data AS JSON); + + return [ + <| + Id: Yson::ConvertToUint64($input.id), + FirstName: CAST(Yson::ConvertToString($input.first_name) AS Utf8), + LastName: CAST(Yson::ConvertToString($input.last_name) AS Utf8), + Salary: CAST(Yson::ConvertToString($input.salary) AS UInt64) + |> + ]; + }; + )", + + .Message = R"({ + "id": 1, + "first_name": "Vasya", + "last_name": "Pupkin", + "salary": "123" + })", + + .Expectations = { + _C("Id", ui64(1)), + _C("FirstName", TString("Vasya")), + _C("LastName", TString("Pupkin")), + _C("Salary", ui64(123)), + } + }).Run(); + } + + Y_UNIT_TEST(Main_ColumnTable_NullableColumn) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Message = "Message-1", + + .Expectations = { + _C("Key", ui64(0)), + _C("Message", TString("Message-1")), + } + }).Run(); + } + + Y_UNIT_TEST(Main_ColumnTable_Date) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Date, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message: CAST($x._data AS Date) + |> + ]; + }; + )", + + .Message = "2025-02-21", + + .Expectations = { + _C("Key", ui64(0)), + _C("Message", TInstant::ParseIso8601("2025-02-21")), + } + }).Run(); + } + + Y_UNIT_TEST(Main_ColumnTable_Double) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Double, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message: CAST($x._data AS Double) + |> + ]; + }; + )", + + .Message = "1.23", + + .Expectations = { + _C("Key", ui64(0)), + _C("Message", 1.23), + } + }).Run(); + } + + Y_UNIT_TEST(Main_ColumnTable_Utf8_Long) + { + MainTestCase({ + .TableDDL = R"( + CREATE TABLE `%s` ( + Key Uint64 NOT NULL, + Message Utf8 NOT NULL, + PRIMARY KEY (Key) + ) WITH ( + STORE = COLUMN + ); + )", + + .Lambda = R"( + $l = ($x) -> { + return [ + <| + Key:CAST($x._offset AS Uint64), + Message:CAST($x._data AS Utf8) + |> + ]; + }; + )", + + .Message = "Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890", + + .Expectations = { + _C("Key", ui64(0)), + _C("Message", TString("Message-1 long value 0 1234567890 1 1234567890 2 1234567890 3 1234567890 4 1234567890 5 1234567890 6 1234567890")), + } + }).Run(); + } + } |