diff options
author | mokhotskii <mokhotskii@ydb.tech> | 2022-12-21 19:02:11 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@ydb.tech> | 2022-12-21 19:02:11 +0300 |
commit | b1b8ff242fbd109c0b03c2a0ab99c3e196ec1e2e (patch) | |
tree | 05630de3f066a1ed75d795cd08b60b5660c80716 | |
parent | 99c0ebbafdea901b71fb0be0696bc2b6fa58f0f6 (diff) | |
download | ydb-b1b8ff242fbd109c0b03c2a0ab99c3e196ec1e2e.tar.gz |
Finalize cbor feature
Finalize cbor feature
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 26 | ||||
-rw-r--r-- | ydb/core/http_proxy/json_proto_conversion.h | 166 | ||||
-rw-r--r-- | ydb/core/http_proxy/ut/CMakeLists.darwin.txt | 1 | ||||
-rw-r--r-- | ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/http_proxy/ut/CMakeLists.linux.txt | 1 | ||||
-rw-r--r-- | ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp | 93 |
6 files changed, 272 insertions, 16 deletions
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index da0088023b..08dbb6571e 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -795,7 +795,7 @@ namespace NKikimr::NHttpProxy { switch (contentType) { case MIME_CBOR: { auto toCborStr = NJson::WriteJson(Body, false); - auto toCbor = nlohmann::json::to_cbor(nlohmann::json::parse(toCborStr)); + auto toCbor = nlohmann::json::to_cbor(nlohmann::json::parse(toCborStr, nullptr, false)); return {(char*)&toCbor[0], toCbor.size()}; } default: { @@ -806,8 +806,8 @@ namespace NKikimr::NHttpProxy { } void THttpRequestContext::RequestBodyToProto(NProtoBuf::Message* request) { - auto requestJsonStr = Request->Body; - if (requestJsonStr.empty()) { + TStringBuf requestStr = Request->Body; + if (requestStr.empty()) { throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << "Empty body"; } @@ -817,28 +817,26 @@ namespace NKikimr::NHttpProxy { listStreamsRequest->set_recurse(true); } - std::string bufferStr; switch (ContentType) { case MIME_CBOR: { - // CborToProto(HttpContext.Request->Body, request); - auto fromCbor = nlohmann::json::from_cbor(Request->Body.begin(), - Request->Body.end(), true, false); + auto fromCbor = nlohmann::json::from_cbor(requestStr.begin(), requestStr.end(), + true, false, + nlohmann::json::cbor_tag_handler_t::ignore); if (fromCbor.is_discarded()) { throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << "Can not parse request body from CBOR"; } else { - bufferStr = fromCbor.dump(); - requestJsonStr = TStringBuf(bufferStr.begin(), bufferStr.end()); + NlohmannJsonToProto(fromCbor, request); } + break; } case MIME_JSON: { - NJson::TJsonValue requestBody; - auto fromJson = NJson::ReadJsonTree(requestJsonStr, &requestBody); - if (fromJson) { - JsonToProto(requestBody, request); - } else { + auto fromJson = nlohmann::json::parse(requestStr, nullptr, false); + if (fromJson.is_discarded()) { throw NKikimr::NSQS::TSQSException(NKikimr::NSQS::NErrors::MALFORMED_QUERY_STRING) << "Can not parse request body from JSON"; + } else { + NlohmannJsonToProto(fromJson, request); } break; } diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h index 4399cfef9b..0cc5f0915c 100644 --- a/ydb/core/http_proxy/json_proto_conversion.h +++ b/ydb/core/http_proxy/json_proto_conversion.h @@ -9,6 +9,8 @@ #include <ydb/library/naming_conventions/naming_conventions.h> #include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h> +#include <nlohmann/json.hpp> + namespace NKikimr::NHttpProxy { @@ -114,7 +116,7 @@ private: const google::protobuf::Reflection* ProtoReflection = nullptr; }; -void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value) { +inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value) { auto config = NProtobufJson::TProto2JsonConfig() .SetFormatOutput(false) .SetMissingSingleKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) @@ -278,4 +280,166 @@ void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message } } +inline void NlohmannJsonToProto(const nlohmann::json& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { + Y_ENSURE(depth < 101, "Json depth is > 100"); + Y_ENSURE(jsonValue.is_object(), "Top level of json value is not a map"); + auto* desc = message->GetDescriptor(); + auto* reflection = message->GetReflection(); + for (const auto& [key, value] : jsonValue.get<std::unordered_map<std::string, nlohmann::json>>()) { + auto* fieldDescriptor = desc->FindFieldByName(NNaming::CamelToSnakeCase(key.c_str())); + Y_ENSURE(fieldDescriptor, "Unexpected json key: " + key); + auto transformer = Ydb::DataStreams::V1::TRANSFORM_NONE; + if (fieldDescriptor->options().HasExtension(Ydb::DataStreams::V1::FieldTransformer)) { + transformer = fieldDescriptor->options().GetExtension(Ydb::DataStreams::V1::FieldTransformer); + } + + if (value.is_array()) { + Y_ENSURE(fieldDescriptor->is_repeated()); + for (auto& elem : value) { + switch (transformer) { + case Ydb::DataStreams::V1::TRANSFORM_BASE64: { + Y_ENSURE(fieldDescriptor->cpp_type() == + google::protobuf::FieldDescriptor::CPPTYPE_STRING, + "Base64 transformer is only applicable to strings"); + if (elem.is_binary()) { + reflection->AddString(message, fieldDescriptor, std::string(elem.get_binary().begin(), elem.get_binary().end())); + } else { + reflection->AddString(message, fieldDescriptor, Base64Decode(elem.get<std::string>())); + } + break; + } + case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: { + reflection->AddInt64(message, fieldDescriptor, elem.get<double>() * 1000); + break; + } + case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING: + case Ydb::DataStreams::V1::TRANSFORM_NONE: { + switch (fieldDescriptor->cpp_type()) { + case google::protobuf::FieldDescriptor::CPPTYPE_INT32: + reflection->AddInt32(message, fieldDescriptor, elem.get<i32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_INT64: + reflection->AddInt64(message, fieldDescriptor, elem.get<i32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: + reflection->AddUInt32(message, fieldDescriptor, elem.get<ui32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: + reflection->AddUInt64(message, fieldDescriptor, elem.get<ui32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: + reflection->AddDouble(message, fieldDescriptor, elem.get<double>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: + reflection->AddFloat(message, fieldDescriptor, elem.get<double>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: + reflection->AddBool(message, fieldDescriptor, elem.get<bool>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: + { + const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByName(elem.get<std::string>()); + i32 number{0}; + if (enumValueDescriptor == nullptr && + TryFromString(elem.get<std::string>(), number)) { + enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByNumber(number); + } + if (enumValueDescriptor != nullptr) { + reflection->AddEnum(message, fieldDescriptor, enumValueDescriptor); + } + } + break; + case google::protobuf::FieldDescriptor::CPPTYPE_STRING: + reflection->AddString(message, fieldDescriptor, elem.get<std::string>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { + NProtoBuf::Message *msg = reflection->AddMessage(message, fieldDescriptor); + NlohmannJsonToProto(elem, msg, depth + 1); + break; + } + default: + Y_ENSURE(false, "Unexpected type"); + } + break; + } + default: + Y_ENSURE(false, "Unknown transformer type"); + } + } + } else { + switch (transformer) { + case Ydb::DataStreams::V1::TRANSFORM_BASE64: { + Y_ENSURE(fieldDescriptor->cpp_type() == + google::protobuf::FieldDescriptor::CPPTYPE_STRING, + "Base64 transformer is applicable only to strings"); + if (value.is_binary()) { + reflection->SetString(message, fieldDescriptor, std::string(value.get_binary().begin(), value.get_binary().end())); + } else { + reflection->SetString(message, fieldDescriptor, Base64Decode(value.get<std::string>())); + } + break; + } + case Ydb::DataStreams::V1::TRANSFORM_DOUBLE_S_TO_INT_MS: { + reflection->SetInt64(message, fieldDescriptor, value.get<double>() * 1000); + break; + } + case Ydb::DataStreams::V1::TRANSFORM_EMPTY_TO_NOTHING: + case Ydb::DataStreams::V1::TRANSFORM_NONE: { + switch (fieldDescriptor->cpp_type()) { + case google::protobuf::FieldDescriptor::CPPTYPE_INT32: + reflection->SetInt32(message, fieldDescriptor, value.get<i32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_INT64: + reflection->SetInt64(message, fieldDescriptor, value.get<i32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT32: + reflection->SetUInt32(message, fieldDescriptor, value.get<ui32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_UINT64: + reflection->SetUInt64(message, fieldDescriptor, value.get<ui32>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_DOUBLE: + reflection->SetDouble(message, fieldDescriptor, value.get<double>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_FLOAT: + reflection->SetFloat(message, fieldDescriptor, value.get<double>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_BOOL: + reflection->SetBool(message, fieldDescriptor, value.get<bool>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_ENUM: { + const NProtoBuf::EnumValueDescriptor* enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByName(value.get<std::string>()); + i32 number{0}; + if (enumValueDescriptor == nullptr && + TryFromString(value.get<std::string>(), number)) { + enumValueDescriptor = + fieldDescriptor->enum_type()->FindValueByNumber(number); + } + if (enumValueDescriptor != nullptr) { + reflection->SetEnum(message, fieldDescriptor, enumValueDescriptor); + } + break; + } + case google::protobuf::FieldDescriptor::CPPTYPE_STRING: + reflection->SetString(message, fieldDescriptor, value.get<std::string>()); + break; + case google::protobuf::FieldDescriptor::CPPTYPE_MESSAGE: { + auto *msg = reflection->MutableMessage(message, fieldDescriptor); + NlohmannJsonToProto(value, msg, depth + 1); + break; + } + default: + Y_ENSURE(false, "Unexpected type"); + } + break; + } + default: Y_ENSURE(false, "Unexpected transformer"); + } + } + } +} + } // namespace NKikimr::NHttpProxy diff --git a/ydb/core/http_proxy/ut/CMakeLists.darwin.txt b/ydb/core/http_proxy/ut/CMakeLists.darwin.txt index 8167941b47..09f8502a4d 100644 --- a/ydb/core/http_proxy/ut/CMakeLists.darwin.txt +++ b/ydb/core/http_proxy/ut/CMakeLists.darwin.txt @@ -17,6 +17,7 @@ target_link_libraries(ydb-core-http_proxy-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-http_proxy + contrib-restricted-nlohmann_json library-cpp-resource cpp-client-ydb_types ) diff --git a/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt b/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt index 7553897818..a7ee67234d 100644 --- a/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt +++ b/ydb/core/http_proxy/ut/CMakeLists.linux-aarch64.txt @@ -18,6 +18,7 @@ target_link_libraries(ydb-core-http_proxy-ut PUBLIC library-cpp-lfalloc cpp-testing-unittest_main ydb-core-http_proxy + contrib-restricted-nlohmann_json library-cpp-resource cpp-client-ydb_types ) diff --git a/ydb/core/http_proxy/ut/CMakeLists.linux.txt b/ydb/core/http_proxy/ut/CMakeLists.linux.txt index d7d358cb11..3cfcfa4b19 100644 --- a/ydb/core/http_proxy/ut/CMakeLists.linux.txt +++ b/ydb/core/http_proxy/ut/CMakeLists.linux.txt @@ -20,6 +20,7 @@ target_link_libraries(ydb-core-http_proxy-ut PUBLIC library-cpp-cpuid_check cpp-testing-unittest_main ydb-core-http_proxy + contrib-restricted-nlohmann_json library-cpp-resource cpp-client-ydb_types ) diff --git a/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp b/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp index eaf335c574..fb986daf92 100644 --- a/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp +++ b/ydb/core/http_proxy/ut/json_proto_conversion_ut.cpp @@ -70,7 +70,98 @@ Y_UNIT_TEST(JsonToProtoArray) { UNIT_ASSERT_VALUES_EQUAL(message.shard_level_metrics(i), str); } } - } +Y_UNIT_TEST(NlohmannJsonToProtoArray) { + { + Ydb::DataStreams::V1::PutRecordsRequest message; + nlohmann::json jsonValue; + jsonValue["StreamName"] = "stream"; + auto& records = jsonValue["Records"]; + nlohmann::json record; + record["Data"] = nlohmann::json::binary({123,34,116,105,99,107,101,114, + 83,121,109,98,111,108,34,58, + 34,66,82,75,46,65,34,44, + 34,116,114,97,100,101,84,121, + 112,101,34,58,34,83,69,76, + 76,34,44,34,112,114,105,99, + 101,34,58,50,53,49,54,50, + 48,46,49,49,44,34,113,117, + 97,110,116,105,116,121,34,58, + 51,56,50,52,44,34,105,100, + 34,58,54,125}, 42); + record["ExplicitHashKey"] = "exp0"; + record["PartitionKey"] = "part0"; + records.push_back(record); + record["Data"] = nlohmann::json::binary({123,34,116,105,99,107,101,114, + 83,121,109,98,111,108,34,58, + 34,66,82,75,46,65,34,44, + 34,116,114,97,100,101,84,121, + 112,101,34,58,34,83,69,76, + 76,34,44,34,112,114,105,99, + 101,34,58,50,53,49,54,50, + 48,46,49,49,44,34,113,117, + 97,110,116,105,116,121,34,58, + 51,49,50,52,44,34,105,100, + 34,58,50,125}, 42); + record["ExplicitHashKey"] = "exp1"; + record["PartitionKey"] = "part1"; + records.push_back(record); + record["Data"] = nlohmann::json::binary({116,105,99,107,101,114,83,121, + 109,98,111,108,66,82,75,46, + 65,116,114,97,100,101,84,121, + 112,101,83,69,76,76,112,114, + 105,99,101,50,53,49,54,50, + 48,46,0,0,113,117,97,110, + 116,105,116,121,51}, 42); + record["ExplicitHashKey"] = "exp2"; + record["PartitionKey"] = "part2"; + records.push_back(record); + NKikimr::NHttpProxy::NlohmannJsonToProto(jsonValue, &message); + + UNIT_ASSERT_VALUES_EQUAL(message.stream_name(), "stream"); + + UNIT_ASSERT_VALUES_EQUAL(message.records(0).explicit_hash_key(), "exp0"); + UNIT_ASSERT_VALUES_EQUAL(message.records(0).partition_key(), "part0"); + UNIT_ASSERT_VALUES_EQUAL(message.records(0).data(), + "{\"tickerSymbol\":\"BRK.A\",\"tradeType\":\"SELL\",\"price\":251620.11,\"quantity\":3824,\"id\":6}"); + UNIT_ASSERT_VALUES_EQUAL(message.records(1).explicit_hash_key(), "exp1"); + UNIT_ASSERT_VALUES_EQUAL(message.records(1).partition_key(), "part1"); + UNIT_ASSERT_VALUES_EQUAL(message.records(1).data(), + "{\"tickerSymbol\":\"BRK.A\",\"tradeType\":\"SELL\",\"price\":251620.11,\"quantity\":3124,\"id\":2}"); + // This one last record is just an array of bytes with 0 bytes in it + UNIT_ASSERT_VALUES_EQUAL(message.records(2).explicit_hash_key(), "exp2"); + UNIT_ASSERT_VALUES_EQUAL(message.records(2).partition_key(), "part2"); + std::string binaryWithNull{'t','i','c','k','e','r','S','y','m','b','o','l', + 'B','R','K','.','A','t','r','a','d','e','T','y', + 'p','e','S','E','L','L','p','r','i','c','e','2', + '5','1','6','2','0','.','\0','\0','q','u','a','n', + 't','i','t','y','3'}; + UNIT_ASSERT_VALUES_EQUAL(message.records(2).data().size(), binaryWithNull.size()); + for (size_t i = 0; i < binaryWithNull.size(); ++i) { + UNIT_ASSERT_VALUES_EQUAL(binaryWithNull[i], message.records(2).data()[i]); + } + } + + { + Ydb::DataStreams::V1::PutRecordsRequest message; + nlohmann::json jsonValue; + jsonValue["StreamName"] = "stream"; + auto& records = jsonValue["Records"]; + nlohmann::json record; + record["Data"] = "MTIzCg=="; + record["ExplicitHashKey"] = "exp0"; + record["PartitionKey"] = "part0"; + records.push_back(record); + + NKikimr::NHttpProxy::NlohmannJsonToProto(jsonValue, &message); + + UNIT_ASSERT_VALUES_EQUAL(message.stream_name(), "stream"); + + UNIT_ASSERT_VALUES_EQUAL(message.records(0).data(), "123\n"); + UNIT_ASSERT_VALUES_EQUAL(message.records(0).explicit_hash_key(), "exp0"); + UNIT_ASSERT_VALUES_EQUAL(message.records(0).partition_key(), "part0"); + } + +} } // Y_UNIT_TEST_SUITE(JsonProtoConversion) |