aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorilnaz <ilnaz@ydb.tech>2022-11-23 16:31:12 +0300
committerilnaz <ilnaz@ydb.tech>2022-11-23 16:31:12 +0300
commit852c6a25f38f4967fa9a5a5d999f50cac0974b8e (patch)
tree27babaad0dff05aa74d788bb9565cfaf0b3755de
parentb59871c639b7b97ebe658e1d25bca1a4c0534555 (diff)
downloadydb-852c6a25f38f4967fa9a5a5d999f50cac0974b8e.tar.gz
Handle NaNs
-rw-r--r--ydb/core/tx/datashard/change_sender_cdc_stream.cpp9
-rw-r--r--ydb/core/tx/datashard/datashard_ut_change_exchange.cpp28
2 files changed, 34 insertions, 3 deletions
diff --git a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
index 73fe7d13fc0..c63543ca257 100644
--- a/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
+++ b/ydb/core/tx/datashard/change_sender_cdc_stream.cpp
@@ -109,7 +109,14 @@ class TCdcChangeSenderPartition: public TActorBootstrapped<TCdcChangeSenderParti
case NKikimrSchemeOp::ECdcStreamFormatJson: {
NJson::TJsonValue json;
record.SerializeTo(json);
- data.SetData(WriteJson(json, false));
+
+ TStringStream str;
+ NJson::TJsonWriterConfig jsonConfig;
+ jsonConfig.ValidateUtf8 = false;
+ jsonConfig.WriteNanAsString = true;
+ WriteJson(&str, &json, jsonConfig);
+
+ data.SetData(str.Str());
cmd.SetPartitionKey(record.GetPartitionKey());
break;
}
diff --git a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
index 60c873c2845..daf0d21ab00 100644
--- a/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
+++ b/ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
@@ -995,7 +995,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
};
-
struct TopicRunner {
static void Read(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc,
const TVector<TString>& queries, const TVector<TString>& records)
@@ -1150,6 +1149,32 @@ Y_UNIT_TEST_SUITE(Cdc) {
});
}
+ Y_UNIT_TEST_TRIPLET(NaN, PqRunner, YdsRunner, TopicRunner) {
+ const auto variants = std::vector<std::pair<const char*, const char*>>{
+ {"Double", ""},
+ {"Float", "f"},
+ };
+
+ for (const auto& [type, s] : variants) {
+ const auto table = TShardedTableOptions()
+ .Columns({
+ {"key", "Uint32", true, false},
+ {"value", type, false, false},
+ });
+
+ TRunner::Read(table, Updates(NKikimrSchemeOp::ECdcStreamFormatJson), {Sprintf(R"(
+ UPSERT INTO `/Root/Table` (key, value) VALUES
+ (1, 0.0%s/0.0%s),
+ (2, 1.0%s/0.0%s),
+ (3, -1.0%s/0.0%s);
+ )", s, s, s, s, s, s)}, {
+ R"({"update":{"value":"nan"},"key":[1]})",
+ R"({"update":{"value":"inf"},"key":[2]})",
+ R"({"update":{"value":"-inf"},"key":[3]})",
+ });
+ }
+ }
+
TShardedTableOptions Utf8Table() {
return TShardedTableOptions()
.Columns({
@@ -1177,7 +1202,6 @@ Y_UNIT_TEST_SUITE(Cdc) {
TRunner::Drop(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson));
}
-
Y_UNIT_TEST(AlterViaTopicService) {
TTestTopicEnv env(SimpleTable(), KeysOnly(NKikimrSchemeOp::ECdcStreamFormatJson));
auto& client = env.GetClient();