diff options
author | ilnaz <ilnaz@ydb.tech> | 2022-11-23 16:31:12 +0300 |
---|---|---|
committer | ilnaz <ilnaz@ydb.tech> | 2022-11-23 16:31:12 +0300 |
commit | 852c6a25f38f4967fa9a5a5d999f50cac0974b8e (patch) | |
tree | 27babaad0dff05aa74d788bb9565cfaf0b3755de | |
parent | b59871c639b7b97ebe658e1d25bca1a4c0534555 (diff) | |
download | ydb-852c6a25f38f4967fa9a5a5d999f50cac0974b8e.tar.gz |
Handle NaNs
-rw-r--r-- | ydb/core/tx/datashard/change_sender_cdc_stream.cpp | 9 | ||||
-rw-r--r-- | ydb/core/tx/datashard/datashard_ut_change_exchange.cpp | 28 |
2 files changed, 34 insertions, 3 deletions
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp index 73fe7d13fc0..c63543ca257 100644 --- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp +++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp @@ -109,7 +109,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti case NKikimrSchemeOp::ECdcStreamFormatJson: { NJson::TJsonValue json; record.SerializeTo(json); - data.SetData(WriteJson(json, false)); + + TStringStream str; + NJson::TJsonWriterConfig jsonConfig; + jsonConfig.ValidateUtf8 = false; + jsonConfig.WriteNanAsString = true; + WriteJson(&str, &json, jsonConfig); + + data.SetData(str.Str()); cmd.SetPartitionKey(record.GetPartitionKey()); break; } diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp index 60c873c2845..daf0d21ab00 100644 --- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp +++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp @@ -995,7 +995,6 @@ Y_UNIT_TEST_SUITE(Cdc) { } }; - struct TopicRunner { static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc, const TVector<TString>& queries, const TVector<TString>& records) @@ -1150,6 +1149,32 @@ Y_UNIT_TEST_SUITE(Cdc) { }); } + Y_UNIT_TEST_TRIPLET(NaN, PqRunner, YdsRunner, TopicRunner) { + const auto variants = std::vector<std::pair<const char*, const char*>>{ + {"Double", ""}, + {"Float", "f"}, + }; + + for (const auto& [type, s] : variants) { + const auto table = TShardedTableOptions() + .Columns({ + {"key", "Uint32", true, false}, + {"value", type, false, false}, + }); + + TRunner::Read(table, Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {Sprintf(R"( + UPSERT INTO `/Root/Table` (key, value) VALUES + (1, 0.0%s/0.0%s), + (2, 1.0%s/0.0%s), + (3, -1.0%s/0.0%s); + )", s, s, s, s, s, s)}, { + R"({"update":{"value":"nan"},"key":[1]})", + R"({"update":{"value":"inf"},"key":[2]})", + R"({"update":{"value":"-inf"},"key":[3]})", + }); + } + } + TShardedTableOptions Utf8Table() { return TShardedTableOptions() .Columns({ @@ -1177,7 +1202,6 @@ Y_UNIT_TEST_SUITE(Cdc) { TRunner::Drop(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson)); } - Y_UNIT_TEST(AlterViaTopicService) { TTestTopicEnv env(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson)); auto& client = env.GetClient(); |