aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormokhotskii <mokhotskii@ydb.tech>2023-01-21 13:37:30 +0300
committermokhotskii <mokhotskii@ydb.tech>2023-01-21 13:37:30 +0300
commitb78bf978d416289812a7fc33fa068ca6e487afbc (patch)
tree64c1ba064a8192dd42371b7850434f24e393c6f9
parentbac35aae278ce5730098393b98eda87962ed045b (diff)
downloadydb-b78bf978d416289812a7fc33fa068ca6e487afbc.tar.gz
Fix cbor encoding of records in GetRecords responce
Fix cbor behavior
-rw-r--r--ydb/core/http_proxy/custom_metrics.h2
-rw-r--r--ydb/core/http_proxy/http_req.cpp36
-rw-r--r--ydb/core/http_proxy/json_proto_conversion.h23
-rw-r--r--ydb/public/api/protos/draft/datastreams.proto2
-rw-r--r--ydb/services/datastreams/datastreams_proxy.cpp2
5 files changed, 54 insertions, 11 deletions
diff --git a/ydb/core/http_proxy/custom_metrics.h b/ydb/core/http_proxy/custom_metrics.h
index 318f790be9..dd1fe842f4 100644
--- a/ydb/core/http_proxy/custom_metrics.h
+++ b/ydb/core/http_proxy/custom_metrics.h
@@ -158,7 +158,7 @@ void FillOutputCustomMetrics<GetRecordsResult>(const GetRecordsResult& result, c
r.partition_key().size() +
r.sequence_number().size() +
sizeof(r.timestamp()) +
- sizeof(r.encryption())
+ sizeof(r.encryption_type())
;
});
diff --git a/ydb/core/http_proxy/http_req.cpp b/ydb/core/http_proxy/http_req.cpp
index 08dbb6571e..339cc35460 100644
--- a/ydb/core/http_proxy/http_req.cpp
+++ b/ydb/core/http_proxy/http_req.cpp
@@ -478,7 +478,8 @@ namespace NKikimr::NHttpProxy {
void HandleGrpcResponse(TEvServerlessProxy::TEvGrpcRequestResult::TPtr ev,
const TActorContext& ctx) {
if (ev->Get()->Status->IsSuccess()) {
- ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body);
+ ProtoToJson(*ev->Get()->Message, HttpContext.ResponseData.Body,
+ HttpContext.ContentType == MIME_CBOR);
FillOutputCustomMetrics<TProtoResult>(
*(dynamic_cast<TProtoResult*>(ev->Get()->Message.Get())), HttpContext, ctx);
ReportLatencyCounters(ctx);
@@ -792,10 +793,41 @@ namespace NKikimr::NHttpProxy {
}
TString THttpResponseData::DumpBody(MimeTypes contentType) {
+ // according to https://json.nlohmann.me/features/binary_formats/cbor/#serialization
+ auto cborBinaryTagBySize = [](size_t size) -> ui8 {
+ if (size <= 23) {
+ return 0x40 + static_cast<ui32>(size);
+ } else if (size <= 255) {
+ return 0x58;
+ } else if (size <= 65536) {
+ return 0x59;
+ }
+
+ return 0x5A;
+ };
switch (contentType) {
case MIME_CBOR: {
+ bool gotData = false;
+ std::function<bool(int, nlohmann::json::parse_event_t, nlohmann::basic_json<>&)> bz =
+ [&gotData, &cborBinaryTagBySize](int, nlohmann::json::parse_event_t event, nlohmann::json& parsed) {
+ if (event == nlohmann::json::parse_event_t::key and parsed == nlohmann::json("Data")) {
+ gotData = true;
+ return true;
+ }
+ if (event == nlohmann::json::parse_event_t::value and gotData) {
+ gotData = false;
+ std::string data = parsed.get<std::string>();
+ parsed = nlohmann::json::binary({data.begin(), data.end()},
+ cborBinaryTagBySize(data.size()));
+ return true;
+ }
+ return true;
+ };
+
auto toCborStr = NJson::WriteJson(Body, false);
- auto toCbor = nlohmann::json::to_cbor(nlohmann::json::parse(toCborStr, nullptr, false));
+ auto json =
+ nlohmann::json::parse(TStringBuf(toCborStr).begin(), TStringBuf(toCborStr).end(), bz, false);
+ auto toCbor = nlohmann::json::to_cbor(json);
return {(char*)&toCbor[0], toCbor.size()};
}
default: {
diff --git a/ydb/core/http_proxy/json_proto_conversion.h b/ydb/core/http_proxy/json_proto_conversion.h
index 0cc5f0915c..f2601459ec 100644
--- a/ydb/core/http_proxy/json_proto_conversion.h
+++ b/ydb/core/http_proxy/json_proto_conversion.h
@@ -21,9 +21,11 @@ TString ProxyFieldNameConverter(const google::protobuf::FieldDescriptor& descrip
class TYdsProtoToJsonPrinter : public NProtobufJson::TProto2JsonPrinter {
public:
TYdsProtoToJsonPrinter(const google::protobuf::Reflection* reflection,
- const NProtobufJson::TProto2JsonConfig& config)
+ const NProtobufJson::TProto2JsonConfig& config,
+ bool skipBase64Encode)
: NProtobufJson::TProto2JsonPrinter(config)
, ProtoReflection(reflection)
+ , SkipBase64Encode(skipBase64Encode)
{}
protected:
@@ -48,14 +50,22 @@ protected:
key = MakeKey(field);
}
+ auto maybeBase64Encode = [skipBase64Encode = this->SkipBase64Encode, &key](const TString& str) {
+ if (key == "Data" && skipBase64Encode) {
+ return str;
+ }
+
+ return Base64Encode(str);
+ };
+
if (field.is_repeated()) {
for (int i = 0, endI = ProtoReflection->FieldSize(proto, &field); i < endI; ++i) {
PrintStringValue<false>(field, TStringBuf(),
- Base64Encode(proto.GetReflection()->GetRepeatedString(proto, &field, i)), json);
+ maybeBase64Encode(proto.GetReflection()->GetRepeatedString(proto, &field, i)), json);
}
} else {
PrintStringValue<true>(field, key,
- Base64Encode(proto.GetReflection()->GetString(proto, &field)), json);
+ maybeBase64Encode(proto.GetReflection()->GetString(proto, &field)), json);
}
return;
}
@@ -114,19 +124,20 @@ protected:
private:
const google::protobuf::Reflection* ProtoReflection = nullptr;
+ bool SkipBase64Encode;
};
-inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value) {
+inline void ProtoToJson(const NProtoBuf::Message& resp, NJson::TJsonValue& value, bool skipBase64Encode) {
auto config = NProtobufJson::TProto2JsonConfig()
.SetFormatOutput(false)
.SetMissingSingleKeyMode(NProtobufJson::TProto2JsonConfig::MissingKeyDefault)
.SetNameGenerator(ProxyFieldNameConverter)
.SetEnumMode(NProtobufJson::TProto2JsonConfig::EnumName);
- TYdsProtoToJsonPrinter printer(resp.GetReflection(), config);
+ TYdsProtoToJsonPrinter printer(resp.GetReflection(), config, skipBase64Encode);
printer.Print(resp, *NProtobufJson::CreateJsonMapOutput(value));
}
-void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) {
+inline void JsonToProto(const NJson::TJsonValue& jsonValue, NProtoBuf::Message* message, ui32 depth = 0) {
Y_ENSURE(depth < 101, "Json depth is > 100");
Y_ENSURE(jsonValue.IsMap(), "Top level of json value is not a map");
auto* desc = message->GetDescriptor();
diff --git a/ydb/public/api/protos/draft/datastreams.proto b/ydb/public/api/protos/draft/datastreams.proto
index d2825e065e..08a4ccb4c9 100644
--- a/ydb/public/api/protos/draft/datastreams.proto
+++ b/ydb/public/api/protos/draft/datastreams.proto
@@ -70,7 +70,7 @@ message Record {
// Data blob
bytes data = 2 [(FieldTransformer) = TRANSFORM_BASE64];
// Encryption type used on record
- EncryptionType encryption = 3;
+ EncryptionType encryption_type = 3;
// Identifies shard in the stream the record is assigned to
string partition_key = 4;
// Unique id of the record within shard
diff --git a/ydb/services/datastreams/datastreams_proxy.cpp b/ydb/services/datastreams/datastreams_proxy.cpp
index adb1da7006..586f9a9eae 100644
--- a/ydb/services/datastreams/datastreams_proxy.cpp
+++ b/ydb/services/datastreams/datastreams_proxy.cpp
@@ -1461,7 +1461,7 @@ namespace NKikimr::NDataStreams::V1 {
auto record = Result.add_records();
record->set_data(proto.GetData());
record->set_timestamp(r.GetCreateTimestampMS());
- record->set_encryption(Ydb::DataStreams::V1::EncryptionType::NONE);
+ record->set_encryption_type(Ydb::DataStreams::V1::EncryptionType::NONE);
record->set_partition_key(r.GetPartitionKey());
record->set_sequence_number(std::to_string(r.GetOffset()).c_str());
if (proto.GetCodec() > 0) {