aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Pisarenko Grigoriy <grigoriypisar@ydb.tech> 2025-04-10 11:47:34 +0300
committer GitHub <noreply@github.com> 2025-04-10 11:47:34 +0300
commit cbbff2d9a5cce993e8a2d1763b61414fbcf2003e (patch)
tree 10ceac7c70e57caa0beb0ae804fdac4144751521
parent e1ad79a5d044edeeda41a7537d403341ea478746 (diff)
download ydb-cbbff2d9a5cce993e8a2d1763b61414fbcf2003e.tar.gz
YQ-4221 fixed s3 write with large path (#16902)
-rw-r--r-- ydb/core/kqp/proxy_service/kqp_script_executions.cpp 28
-rw-r--r-- ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp 7
2 files changed, 32 insertions, 3 deletions
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 6a727ba4899..47e279e3b1b 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -70,6 +70,30 @@ NYql::TIssues DeserializeIssues(const TString& issuesSerialized) {
return issues;
}
+template <typename TProto> // Helper: stores `proto` in `value` as a JSON map of the form {"encoded_proto": <proto rendered as JSON>}.
+void SerializeBinaryProto(const TProto& proto, NJson::TJsonValue& value) {
+ value.SetType(NJson::EJsonValueType::JSON_MAP); // make `value` a JSON map so the wrapper key can be added below
+
+ const auto config = NProtobufJson::TProto2JsonConfig()
+ .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64EncodeBytesTransform>()); // presumably base64-encodes bytes fields (per the transform's name) - confirm
+
+ NProtobufJson::Proto2Json(proto, value["encoded_proto"], config); // converted message is placed under the "encoded_proto" key
+}
+
+template <typename TProto> // Helper: fills `proto` from `value`, reading either the {"encoded_proto": ...} wrapper or plain un-wrapped JSON.
+void DeserializeBinaryProto(const NJson::TJsonValue& value, TProto& proto) {
+ const auto& valueMap = value.GetMap();
+ const auto encodedProto = valueMap.find("encoded_proto"); // look for the wrapper key written by SerializeBinaryProto
+ if (encodedProto == valueMap.end()) { // no wrapper key: parse `value` itself with the default config
+ return NProtobufJson::Json2Proto(value, proto, NProtobufJson::TJson2ProtoConfig()); // NOTE(review): looks like a fallback for the older un-wrapped format - confirm
+ }
+
+ const auto config = NProtobufJson::TJson2ProtoConfig()
+ .AddStringTransform(MakeIntrusive<NProtobufJson::TBase64DecodeBytesTransform>()); // presumably reverses the base64 transform used on write - confirm
+
+ NProtobufJson::Json2Proto(encodedProto->second, proto, config); // parse only the payload stored under "encoded_proto"
+}
+
class TQueryBase : public NKikimr::TQueryBase {
public:
@@ -2272,7 +2296,7 @@ private:
NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe();
jsonArray.resize(sinks.size());
for (size_t i = 0; i < sinks.size(); ++i) {
- NProtobufJson::Proto2Json(sinks[i], jsonArray[i], NProtobufJson::TProto2JsonConfig());
+ SerializeBinaryProto(sinks[i], jsonArray[i]);
}
NJsonWriter::TBuf serializedSinks;
@@ -2416,7 +2440,7 @@ public:
value.GetValuePointer(i, &serializedSink);
NKqpProto::TKqpExternalSink sink;
- NProtobufJson::Json2Proto(*serializedSink, sink);
+ DeserializeBinaryProto(*serializedSink, sink);
Response->Sinks.push_back(sink);
}
}
diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
index 85ae65f32fc..026e9bda4ca 100644
--- a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
+++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
@@ -785,7 +785,10 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
Y_UNIT_TEST(InsertIntoBucketWithSelect) {
const TString writeDataSourceName = "/Root/write_data_source";
const TString writeBucket = "test_bucket_write_with_select";
- const TString writeObject = "test_object_write/";
+
+ // Also tests large object path with size >= 128
+ // for atomic upload commit case
+ const TString writeObject = TStringBuilder() << "test_object_write/" << TString(512, 'x') << "/";
{
Aws::S3::S3Client s3Client = MakeS3Client();
@@ -811,6 +814,8 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
const TString sql = fmt::format(R"(
+ PRAGMA s3.AtomicUploadCommit = "true";
+
INSERT INTO `{write_source}`.`{write_object}` WITH (FORMAT = "csv_with_names")
SELECT * FROM AS_TABLE([<|id: 0, payload: "#######"|>]);