aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@ydb.tech>2022-12-21 19:02:11 +0300
committermokhotskii <mokhotskii@ydb.tech>2022-12-21 19:02:11 +0300
commitb1b8ff242fbd109c0b03c2a0ab99c3e196ec1e2e (patch)
tree05630de3f066a1ed75d795cd08b60b5660c80716
parent99c0ebbafdea901b71fb0be0696bc2b6fa58f0f6 (diff)
downloadydb-b1b8ff242fbd109c0b03c2a0ab99c3e196ec1e2e.tar.gz
Finalize cbor feature
Finalize cbor feature
-rw-r--r--ydb/core/http_proxy/http_req.cpp26
-rw-r--r--ydb/core/http_proxy/json_proto_conversion.h166
-rw-r--r--ydb/core/http_proxy/ut/CMakeLists.darwin.txt1
-rw-r--r--ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/http_proxy/ut/CMakeLists.linux.txt1
-rw-r--r--ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp93
6 files changed, 272 insertions, 16 deletions
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index da0088023b..08dbb6571e 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -795,7 +795,7 @@ namespace NKikimr::NHttpProxy {
switch (contentType) {
case MIME_CBOR: {
auto toCborStr = NJson::WriteJson(Body, false);
- auto toCbor = nlohmann::json::to_cbor(nlohmann::json::parse(toCborStr));
+ auto toCbor = nlohmann::json::to_cbor(nlohmann::json::parse(toCborStr, nullptr, false));
return {(char*)&toCbor[0], toCbor.size()};
}
default: {
@@ -806,8 +806,8 @@ namespace NKikimr::NHttpProxy {
}
void THttpRequestContext::RequestBodyToProto(NProtoBuf::Message* request) {
- auto requestJsonStr = Request->Body;
- if (requestJsonStr.empty()) {
+ TStringBuf requestStr = Request->Body;
+ if (requestStr.empty()) {
throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
"Empty body";
}
@@ -817,28 +817,26 @@ namespace NKikimr::NHttpProxy {
listStreamsRequest->set_recurse(true);
}
- std::string bufferStr;
switch (ContentType) {
case MIME_CBOR: {
- // CborToProto(HttpContext.Request->Body, request);
- auto fromCbor = nlohmann::json::from_cbor(Request->Body.begin(),
- Request->Body.end(), true, false);
+ auto fromCbor = nlohmann::json::from_cbor(requestStr.begin(), requestStr.end(),
+ true, false,
+ nlohmann::json::cbor_tag_handler_t::ignore);
if (fromCbor.is_discarded()) {
throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
"Can not parse request body from CBOR";
} else {
- bufferStr = fromCbor.dump();
- requestJsonStr = TStringBuf(bufferStr.begin(), bufferStr.end());
+ NlohmannJsonToProto(fromCbor, request);
}
+ break;
}
case MIME_JSON: {
- NJson::TJsonValue requestBody;
- auto fromJson = NJson::ReadJsonTree(requestJsonStr, &requestBody);
- if (fromJson) {
- JsonToProto(requestBody, request);
- } else {
+ auto fromJson = nlohmann::json::parse(requestStr, nullptr, false);
+ if (fromJson.is_discarded()) {
throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) <<
"Can not parse request body from JSON";
+ } else {
+ NlohmannJsonToProto(fromJson, request);
}
break;
}
diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h
index 4399cfef9b..0cc5f0915c 100644
--- a/ydb/core/http_proxy/json_proto_conversion.h
+++ b/ydb/core/http_proxy/json_proto_conversion.h
@@ -9,6 +9,8 @@
#include <ydb/library/naming_conventions/naming_conventions.h>
#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
+#include <nlohmann/json.hpp>
+
namespace NKikimr::NHttpProxy {
@@ -114,7 +116,7 @@ private:
const google::protobuf::Reflection* ProtoReflection = nullptr;
};
-void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value) {
+inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value) {
auto config = NProtobufJson::TProto2JsonConfig()
.SetFormatOutput(false)
.SetMissingSingleKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
@@ -278,4 +280,166 @@ void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message
}
}
+inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) {
+ Y_ENSURE(depth < 101, "Json depth is > 100");
+ Y_ENSURE(jsonValue.is_object(), "Top level of json value is not a map");
+ auto* desc = message->GetDescriptor();
+ auto* reflection = message->GetReflection();
+ for (const auto& [key, value] : jsonValue.get<std::unordered_map<std::string, nlohmann::json>>()) {
+ auto* fieldDescriptor = desc->FindFieldByName(NNaming::CamelToSnakeCase(key.c_str()));
+ Y_ENSURE(fieldDescriptor, "Unexpected json key: " + key);
+ auto transformer = Ydb::DataStreams::V1::TRANSFORM_NONE;
+ if (fieldDescriptor->options().HasExtension(Ydb::DataStreams::V1::FieldTransformer)) {
+ transformer = fieldDescriptor->options().GetExtension(Ydb::DataStreams::V1::FieldTransformer);
+ }
+
+ if (value.is_array()) {
+ Y_ENSURE(fieldDescriptor->is_repeated());
+ for (auto& elem : value) {
+ switch (transformer) {
+ case Ydb::DataStreams::V1::TRANSFORM_BASE64: {
+ Y_ENSURE(fieldDescriptor->cpp_type() ==
+ google::protobuf::FieldDescriptor::CPPTYPE_STRING,
+ "Base64 transformer is only applicable to strings");
+ if (elem.is_binary()) {
+ reflection->AddString(message, fieldDescriptor, std::string(elem.get_binary().begin(), elem.get_binary().end()));
+ } else {
+ reflection->AddString(message, fieldDescriptor, Base64Decode(elem.get<std::string>()));
+ }
+ break;
+ }
+ case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: {
+ reflection->AddInt64(message, fieldDescriptor, elem.get<double>() * 1000);
+ break;
+ }
+ case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING:
+ case Ydb::DataStreams::V1::TRANSFORM_NONE: {
+ switch (fieldDescriptor->cpp_type()) {
+ case google::protobuf::FieldDescriptor::CPPTYPE_INT32:
+ reflection->AddInt32(message, fieldDescriptor, elem.get<i32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_INT64:
+ reflection->AddInt64(message, fieldDescriptor, elem.get<i32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:
+ reflection->AddUInt32(message, fieldDescriptor, elem.get<ui32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:
+ reflection->AddUInt64(message, fieldDescriptor, elem.get<ui32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:
+ reflection->AddDouble(message, fieldDescriptor, elem.get<double>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:
+ reflection->AddFloat(message, fieldDescriptor, elem.get<double>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:
+ reflection->AddBool(message, fieldDescriptor, elem.get<bool>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_ENUM:
+ {
+ const NProtoBuf::EnumValueDescriptor* enumValueDescriptor =
+ fieldDescriptor->enum_type()->FindValueByName(elem.get<std::string>());
+ i32 number{0};
+ if (enumValueDescriptor == nullptr &&
+ TryFromString(elem.get<std::string>(), number)) {
+ enumValueDescriptor =
+ fieldDescriptor->enum_type()->FindValueByNumber(number);
+ }
+ if (enumValueDescriptor != nullptr) {
+ reflection->AddEnum(message, fieldDescriptor, enumValueDescriptor);
+ }
+ }
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_STRING:
+ reflection->AddString(message, fieldDescriptor, elem.get<std::string>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
+ NProtoBuf::Message *msg = reflection->AddMessage(message, fieldDescriptor);
+ NlohmannJsonToProto(elem, msg, depth + 1);
+ break;
+ }
+ default:
+ Y_ENSURE(false, "Unexpected type");
+ }
+ break;
+ }
+ default:
+ Y_ENSURE(false, "Unknown transformer type");
+ }
+ }
+ } else {
+ switch (transformer) {
+ case Ydb::DataStreams::V1::TRANSFORM_BASE64: {
+ Y_ENSURE(fieldDescriptor->cpp_type() ==
+ google::protobuf::FieldDescriptor::CPPTYPE_STRING,
+ "Base64 transformer is applicable only to strings");
+ if (value.is_binary()) {
+ reflection->SetString(message, fieldDescriptor, std::string(value.get_binary().begin(), value.get_binary().end()));
+ } else {
+ reflection->SetString(message, fieldDescriptor, Base64Decode(value.get<std::string>()));
+ }
+ break;
+ }
+ case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: {
+ reflection->SetInt64(message, fieldDescriptor, value.get<double>() * 1000);
+ break;
+ }
+ case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING:
+ case Ydb::DataStreams::V1::TRANSFORM_NONE: {
+ switch (fieldDescriptor->cpp_type()) {
+ case google::protobuf::FieldDescriptor::CPPTYPE_INT32:
+ reflection->SetInt32(message, fieldDescriptor, value.get<i32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_INT64:
+ reflection->SetInt64(message, fieldDescriptor, value.get<i32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_UINT32:
+ reflection->SetUInt32(message, fieldDescriptor, value.get<ui32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_UINT64:
+ reflection->SetUInt64(message, fieldDescriptor, value.get<ui32>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE:
+ reflection->SetDouble(message, fieldDescriptor, value.get<double>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT:
+ reflection->SetFloat(message, fieldDescriptor, value.get<double>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_BOOL:
+ reflection->SetBool(message, fieldDescriptor, value.get<bool>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: {
+ const NProtoBuf::EnumValueDescriptor* enumValueDescriptor =
+ fieldDescriptor->enum_type()->FindValueByName(value.get<std::string>());
+ i32 number{0};
+ if (enumValueDescriptor == nullptr &&
+ TryFromString(value.get<std::string>(), number)) {
+ enumValueDescriptor =
+ fieldDescriptor->enum_type()->FindValueByNumber(number);
+ }
+ if (enumValueDescriptor != nullptr) {
+ reflection->SetEnum(message, fieldDescriptor, enumValueDescriptor);
+ }
+ break;
+ }
+ case google::protobuf::FieldDescriptor::CPPTYPE_STRING:
+ reflection->SetString(message, fieldDescriptor, value.get<std::string>());
+ break;
+ case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: {
+ auto *msg = reflection->MutableMessage(message, fieldDescriptor);
+ NlohmannJsonToProto(value, msg, depth + 1);
+ break;
+ }
+ default:
+ Y_ENSURE(false, "Unexpected type");
+ }
+ break;
+ }
+ default: Y_ENSURE(false, "Unexpected transformer");
+ }
+ }
+ }
+}
+
} // namespace NKikimr::NHttpProxy
diff --git a/ydb/core/http_proxy/ut/CMakeLists.darwin.txt b/ydb/core/http_proxy/ut/CMakeLists.darwin.txt
index 8167941b47..09f8502a4d 100644
--- a/ydb/core/http_proxy/ut/CMakeLists.darwin.txt
+++ b/ydb/core/http_proxy/ut/CMakeLists.darwin.txt
@@ -17,6 +17,7 @@ target_link_libraries(ydb-core-http_proxy-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
ydb-core-http_proxy
+ contrib-restricted-nlohmann_json
library-cpp-resource
cpp-client-ydb_types
)
diff --git a/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt
index 7553897818..a7ee67234d 100644
--- a/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt
@@ -18,6 +18,7 @@ target_link_libraries(ydb-core-http_proxy-ut PUBLIC
library-cpp-lfalloc
cpp-testing-unittest_main
ydb-core-http_proxy
+ contrib-restricted-nlohmann_json
library-cpp-resource
cpp-client-ydb_types
)
diff --git a/ydb/core/http_proxy/ut/CMakeLists.linux.txt b/ydb/core/http_proxy/ut/CMakeLists.linux.txt
index d7d358cb11..3cfcfa4b19 100644
--- a/ydb/core/http_proxy/ut/CMakeLists.linux.txt
+++ b/ydb/core/http_proxy/ut/CMakeLists.linux.txt
@@ -20,6 +20,7 @@ target_link_libraries(ydb-core-http_proxy-ut PUBLIC
library-cpp-cpuid_check
cpp-testing-unittest_main
ydb-core-http_proxy
+ contrib-restricted-nlohmann_json
library-cpp-resource
cpp-client-ydb_types
)
diff --git a/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp b/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp
index eaf335c574..fb986daf92 100644
--- a/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp
+++ b/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp
@@ -70,7 +70,98 @@ Y_UNIT_TEST(JsonToProtoArray) {
UNIT_ASSERT_VALUES_EQUAL(message.shard_level_metrics(i), str);
}
}
-
}
+Y_UNIT_TEST(NlohmannJsonToProtoArray) {
+ {
+ Ydb::DataStreams::V1::PutRecordsRequest message;
+ nlohmann::json jsonValue;
+ jsonValue["StreamName"] = "stream";
+ auto& records = jsonValue["Records"];
+ nlohmann::json record;
+ record["Data"] = nlohmann::json::binary({123,34,116,105,99,107,101,114,
+ 83,121,109,98,111,108,34,58,
+ 34,66,82,75,46,65,34,44,
+ 34,116,114,97,100,101,84,121,
+ 112,101,34,58,34,83,69,76,
+ 76,34,44,34,112,114,105,99,
+ 101,34,58,50,53,49,54,50,
+ 48,46,49,49,44,34,113,117,
+ 97,110,116,105,116,121,34,58,
+ 51,56,50,52,44,34,105,100,
+ 34,58,54,125}, 42);
+ record["ExplicitHashKey"] = "exp0";
+ record["PartitionKey"] = "part0";
+ records.push_back(record);
+ record["Data"] = nlohmann::json::binary({123,34,116,105,99,107,101,114,
+ 83,121,109,98,111,108,34,58,
+ 34,66,82,75,46,65,34,44,
+ 34,116,114,97,100,101,84,121,
+ 112,101,34,58,34,83,69,76,
+ 76,34,44,34,112,114,105,99,
+ 101,34,58,50,53,49,54,50,
+ 48,46,49,49,44,34,113,117,
+ 97,110,116,105,116,121,34,58,
+ 51,49,50,52,44,34,105,100,
+ 34,58,50,125}, 42);
+ record["ExplicitHashKey"] = "exp1";
+ record["PartitionKey"] = "part1";
+ records.push_back(record);
+ record["Data"] = nlohmann::json::binary({116,105,99,107,101,114,83,121,
+ 109,98,111,108,66,82,75,46,
+ 65,116,114,97,100,101,84,121,
+ 112,101,83,69,76,76,112,114,
+ 105,99,101,50,53,49,54,50,
+ 48,46,0,0,113,117,97,110,
+ 116,105,116,121,51}, 42);
+ record["ExplicitHashKey"] = "exp2";
+ record["PartitionKey"] = "part2";
+ records.push_back(record);
+ NKikimr::NHttpProxy::NlohmannJsonToProto(jsonValue, &message);
+
+ UNIT_ASSERT_VALUES_EQUAL(message.stream_name(), "stream");
+
+ UNIT_ASSERT_VALUES_EQUAL(message.records(0).explicit_hash_key(), "exp0");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(0).partition_key(), "part0");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(0).data(),
+ "{\"tickerSymbol\":\"BRK.A\",\"tradeType\":\"SELL\",\"price\":251620.11,\"quantity\":3824,\"id\":6}");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(1).explicit_hash_key(), "exp1");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(1).partition_key(), "part1");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(1).data(),
+ "{\"tickerSymbol\":\"BRK.A\",\"tradeType\":\"SELL\",\"price\":251620.11,\"quantity\":3124,\"id\":2}");
+ // This one last record is just an array of bytes with 0 bytes in it
+ UNIT_ASSERT_VALUES_EQUAL(message.records(2).explicit_hash_key(), "exp2");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(2).partition_key(), "part2");
+ std::string binaryWithNull{'t','i','c','k','e','r','S','y','m','b','o','l',
+ 'B','R','K','.','A','t','r','a','d','e','T','y',
+ 'p','e','S','E','L','L','p','r','i','c','e','2',
+ '5','1','6','2','0','.','\0','\0','q','u','a','n',
+ 't','i','t','y','3'};
+ UNIT_ASSERT_VALUES_EQUAL(message.records(2).data().size(), binaryWithNull.size());
+ for (size_t i = 0; i < binaryWithNull.size(); ++i) {
+ UNIT_ASSERT_VALUES_EQUAL(binaryWithNull[i], message.records(2).data()[i]);
+ }
+ }
+
+ {
+ Ydb::DataStreams::V1::PutRecordsRequest message;
+ nlohmann::json jsonValue;
+ jsonValue["StreamName"] = "stream";
+ auto& records = jsonValue["Records"];
+ nlohmann::json record;
+ record["Data"] = "MTIzCg==";
+ record["ExplicitHashKey"] = "exp0";
+ record["PartitionKey"] = "part0";
+ records.push_back(record);
+
+ NKikimr::NHttpProxy::NlohmannJsonToProto(jsonValue, &message);
+
+ UNIT_ASSERT_VALUES_EQUAL(message.stream_name(), "stream");
+
+ UNIT_ASSERT_VALUES_EQUAL(message.records(0).data(), "123\n");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(0).explicit_hash_key(), "exp0");
+ UNIT_ASSERT_VALUES_EQUAL(message.records(0).partition_key(), "part0");
+ }
+
+}
} // Y_UNIT_TEST_SUITE(JsonProtoConversion)