diff options
author | mokhotskii <mokhotskii@ydb.tech> | 2023-01-21 13:37:30 +0300 |
---|---|---|
committer | mokhotskii <mokhotskii@ydb.tech> | 2023-01-21 13:37:30 +0300 |
commit | b78bf978d416289812a7fc33fa068ca6e487afbc (patch) | |
tree | 64c1ba064a8192dd42371b7850434f24e393c6f9 | |
parent | bac35aae278ce5730098393b98eda87962ed045b (diff) | |
download | ydb-b78bf978d416289812a7fc33fa068ca6e487afbc.tar.gz |
Fix cbor encoding of records in GetRecords responce
Fix cbor behavior
-rw-r--r-- | ydb/core/http_proxy/custom_metrics.h | 2 | ||||
-rw-r--r-- | ydb/core/http_proxy/http_req.cpp | 36 | ||||
-rw-r--r-- | ydb/core/http_proxy/json_proto_conversion.h | 23 | ||||
-rw-r--r-- | ydb/public/api/protos/draft/datastreams.proto | 2 | ||||
-rw-r--r-- | ydb/services/datastreams/datastreams_proxy.cpp | 2 |
5 files changed, 54 insertions, 11 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h index 318f790be9..dd1fe842f4 100644 --- a/ydb/core/http_proxy/custom_metrics.h +++ b/ydb/core/http_proxy/custom_metrics.h @@ -158,7 +158,7 @@ void FillOutputCustomMetrics<GetRecordsResult>(const GetRecordsResult& result, c r.partition_key().size() + r.sequence_number().size() + sizeof(r.timestamp()) + - sizeof(r.encryption()) + sizeof(r.encryption_type()) ; }); diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp index 08dbb6571e..339cc35460 100644 --- a/ydb/core/http_proxy/http_req.cpp +++ b/ydb/core/http_proxy/http_req.cpp @@ -478,7 +478,8 @@ namespace NKikimr::NHttpProxy { void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev, const TActorContext& ctx) { if (ev->Get()->Status->IsSuccess()) { - ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body); + ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body, + HttpContext.ContentType == MIME_CBOR); FillOutputCustomMetrics<TProtoResult>( *(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx); ReportLatencyCounters(ctx); @@ -792,10 +793,41 @@ namespace NKikimr::NHttpProxy { } TString THttpResponseData::DumpBody(MimeTypes contentType) { + // according to https://json.nlohmann.me/features/binary_formats/cbor/#serialization + auto cborBinaryTagBySize = [](size_t size) -> ui8 { + if (size <= 23) { + return 0x40 + static_cast<ui32>(size); + } else if (size <= 255) { + return 0x58; + } else if (size <= 65536) { + return 0x59; + } + + return 0x5A; + }; switch (contentType) { case MIME_CBOR: { + bool gotData = false; + std::function<bool(int, nlohmann::json::parse_event_t, nlohmann::basic_json<>&)> bz = + [&gotData, &cborBinaryTagBySize](int, nlohmann::json::parse_event_t event, nlohmann::json& parsed) { + if (event == nlohmann::json::parse_event_t::key and parsed == nlohmann::json("Data")) { + gotData = true; + return true; + } + if (event == nlohmann::json::parse_event_t::value and gotData) { + gotData = false; + std::string data = parsed.get<std::string>(); + parsed = nlohmann::json::binary({data.begin(), data.end()}, + cborBinaryTagBySize(data.size())); + return true; + } + return true; + }; + auto toCborStr = NJson::WriteJson(Body, false); - auto toCbor = nlohmann::json::to_cbor(nlohmann::json::parse(toCborStr, nullptr, false)); + auto json = + nlohmann::json::parse(TStringBuf(toCborStr).begin(), TStringBuf(toCborStr).end(), bz, false); + auto toCbor = nlohmann::json::to_cbor(json); return {(char*)&toCbor[0], toCbor.size()}; } default: { diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h index 0cc5f0915c..f2601459ec 100644 --- a/ydb/core/http_proxy/json_proto_conversion.h +++ b/ydb/core/http_proxy/json_proto_conversion.h @@ -21,9 +21,11 @@ TString ProxyFieldNameConverter(const google::protobuf::FieldDescriptor& descrip class TYdsProtoToJsonPrinter : public NProtobufJson::TProto2JsonPrinter { public: TYdsProtoToJsonPrinter(const google::protobuf::Reflection* reflection, - const NProtobufJson::TProto2JsonConfig& config) + const NProtobufJson::TProto2JsonConfig& config, + bool skipBase64Encode) : NProtobufJson::TProto2JsonPrinter(config) , ProtoReflection(reflection) + , SkipBase64Encode(skipBase64Encode) {} protected: @@ -48,14 +50,22 @@ protected: key = MakeKey(field); } + auto maybeBase64Encode = [skipBase64Encode = this->SkipBase64Encode, &key](const TString& str) { + if (key == "Data" && skipBase64Encode) { + return str; + } + + return Base64Encode(str); + }; + if (field.is_repeated()) { for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) { PrintStringValue<false>(field, TStringBuf(), - Base64Encode(proto.GetReflection()->GetRepeatedString(proto, &field, i)), json); + maybeBase64Encode(proto.GetReflection()->GetRepeatedString(proto, &field, i)), json); } } else { PrintStringValue<true>(field, key, - Base64Encode(proto.GetReflection()->GetString(proto, &field)), json); + maybeBase64Encode(proto.GetReflection()->GetString(proto, &field)), json); } return; } @@ -114,19 +124,20 @@ protected: private: const google::protobuf::Reflection* ProtoReflection = nullptr; + bool SkipBase64Encode; }; -inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value) { +inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value, bool skipBase64Encode) { auto config = NProtobufJson::TProto2JsonConfig() .SetFormatOutput(false) .SetMissingSingleKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault) .SetNameGenerator(ProxyFieldNameConverter) .SetEnumMode(NProtobufJson::TProto2JsonConfig::EnumName); - TYdsProtoToJsonPrinter printer(resp.GetReflection(), config); + TYdsProtoToJsonPrinter printer(resp.GetReflection(), config, skipBase64Encode); printer.Print(resp, *NProtobufJson::CreateJsonMapOutput(value)); } -void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { +inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) { Y_ENSURE(depth < 101, "Json depth is > 100"); Y_ENSURE(jsonValue.IsMap(), "Top level of json value is not a map"); auto* desc = message->GetDescriptor(); diff --git a/ydb/public/api/protos/draft/datastreams.proto b/ydb/public/api/protos/draft/datastreams.proto index d2825e065e..08a4ccb4c9 100644 --- a/ydb/public/api/protos/draft/datastreams.proto +++ b/ydb/public/api/protos/draft/datastreams.proto @@ -70,7 +70,7 @@ message Record { // Data blob bytes data = 2 [(FieldTransformer) = TRANSFORM_BASE64]; // Encryption type used on record - EncryptionType encryption = 3; + EncryptionType encryption_type = 3; // Identifies shard in the stream the record is assigned to string partition_key = 4; // Unique id of the record within shard diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp index adb1da7006..586f9a9eae 100644 --- a/ydb/services/datastreams/datastreams_proxy.cpp +++ b/ydb/services/datastreams/datastreams_proxy.cpp @@ -1461,7 +1461,7 @@ namespace NKikimr::NDataStreams::V1 { auto record = Result.add_records(); record->set_data(proto.GetData()); record->set_timestamp(r.GetCreateTimestampMS()); - record->set_encryption(Ydb::DataStreams::V1::EncryptionType::NONE); + record->set_encryption_type(Ydb::DataStreams::V1::EncryptionType::NONE); record->set_partition_key(r.GetPartitionKey()); record->set_sequence_number(std::to_string(r.GetOffset()).c_str()); if (proto.GetCodec() > 0) { |