diff options
author | Pisarenko Grigoriy <grigoriypisar@ydb.tech> | 2025-04-10 11:47:34 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-04-10 11:47:34 +0300 |
commit | cbbff2d9a5cce993e8a2d1763b61414fbcf2003e (patch) | |
tree | 10ceac7c70e57caa0beb0ae804fdac4144751521 | |
parent | e1ad79a5d044edeeda41a7537d403341ea478746 (diff) | |
download | ydb-cbbff2d9a5cce993e8a2d1763b61414fbcf2003e.tar.gz |
YQ-4221 fixed s3 write with large path (#16902)
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 28 | ||||
-rw-r--r-- | ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp | 7 |
2 files changed, 32 insertions, 3 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 6a727ba4899..47e279e3b1b 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -70,6 +70,30 @@ NYql::TIssues DeserializeIssues(const TString& issuesSerialized) { return issues; } +template <typename TProto> +void SerializeBinaryProto(const TProto& proto, NJson::TJsonValue& value) { + value.SetType(NJson::EJsonValueType::JSON_MAP); + + const auto config = NProtobufJson::TProto2JsonConfig() + .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); + + NProtobufJson::Proto2Json(proto, value["encoded_proto"], config); +} + +template <typename TProto> +void DeserializeBinaryProto(const NJson::TJsonValue& value, TProto& proto) { + const auto& valueMap = value.GetMap(); + const auto encodedProto = valueMap.find("encoded_proto"); + if (encodedProto == valueMap.end()) { + return NProtobufJson::Json2Proto(value, proto, NProtobufJson::TJson2ProtoConfig()); + } + + const auto config = NProtobufJson::TJson2ProtoConfig() + .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64DecodeBytesTransform>()); + + NProtobufJson::Json2Proto(encodedProto->second, proto, config); +} + class TQueryBase : public NKikimr::TQueryBase { public: @@ -2272,7 +2296,7 @@ private: NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe(); jsonArray.resize(sinks.size()); for (size_t i = 0; i < sinks.size(); ++i) { - NProtobufJson::Proto2Json(sinks[i], jsonArray[i], NProtobufJson::TProto2JsonConfig()); + SerializeBinaryProto(sinks[i], jsonArray[i]); } NJsonWriter::TBuf serializedSinks; @@ -2416,7 +2440,7 @@ public: value.GetValuePointer(i, &serializedSink); NKqpProto::TKqpExternalSink sink; - NProtobufJson::Json2Proto(*serializedSink, sink); + DeserializeBinaryProto(*serializedSink, sink); Response->Sinks.push_back(sink); } } diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp index 85ae65f32fc..026e9bda4ca 100644 --- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp @@ -785,7 +785,10 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { Y_UNIT_TEST(InsertIntoBucketWithSelect) { const TString writeDataSourceName = "/Root/write_data_source"; const TString writeBucket = "test_bucket_write_with_select"; - const TString writeObject = "test_object_write/"; + + // Also tests large object path with size >= 128 + // for atomic upload commit case + const TString writeObject = TStringBuilder() << "test_object_write/" << TString(512, 'x') << "/"; { Aws::S3::S3Client s3Client = MakeS3Client(); @@ -811,6 +814,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) { UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); const TString sql = fmt::format(R"( + PRAGMA s3.AtomicUploadCommit = "true"; + INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "csv_with_names") SELECT * FROM AS_TABLE([<|id: 0, payload: "#######"|>]); |