aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2023-12-04 17:32:31 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2023-12-05 04:16:04 +0300
commit729685076fffeae4fcfb8df31b8861813fc4a9be (patch)
tree459271bae0a07e833216282b42a4c4239e3a031f
parentbff99195ecfa960a4162f236cf20a5e57f6d2f26 (diff)
downloadydb-729685076fffeae4fcfb8df31b8861813fc4a9be.tar.gz
Intermediate changes
-rw-r--r--contrib/clickhouse/src/Common/ya.make4
-rw-r--r--contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp4
-rw-r--r--contrib/clickhouse/src/IO/S3/Client.cpp33
-rw-r--r--contrib/clickhouse/src/IO/S3/Client.h4
-rw-r--r--contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp14
-rw-r--r--contrib/clickhouse/src/IO/S3/Requests.cpp16
-rw-r--r--contrib/clickhouse/src/IO/S3/Requests.h34
-rw-r--r--contrib/clickhouse/src/IO/WriteBufferFromS3.cpp24
-rw-r--r--contrib/clickhouse/src/IO/WriteBufferFromS3.h5
-rw-r--r--contrib/clickhouse/src/ya.make4
10 files changed, 66 insertions, 76 deletions
diff --git a/contrib/clickhouse/src/Common/ya.make b/contrib/clickhouse/src/Common/ya.make
index ab21fe0294..e81b00e118 100644
--- a/contrib/clickhouse/src/Common/ya.make
+++ b/contrib/clickhouse/src/Common/ya.make
@@ -35,8 +35,12 @@ PEERDIR(
contrib/libs/snappy
contrib/libs/sparsehash
contrib/libs/zstd
+ contrib/restricted/aws/aws-c-auth
contrib/restricted/aws/aws-c-common
contrib/restricted/aws/aws-c-io
+ contrib/restricted/aws/aws-c-mqtt
+ contrib/restricted/aws/aws-c-sdkutils
+ contrib/restricted/aws/aws-crt-cpp
contrib/restricted/boost/circular_buffer
contrib/restricted/boost/container_hash
contrib/restricted/boost/context
diff --git a/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index 1e68d99c7e..0d9670efeb 100644
--- a/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -127,7 +127,7 @@ private:
result = !objects.empty();
for (const auto & object : objects)
- batch.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Millis() / 1000), {}});
+ batch.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}});
if (result)
request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken());
@@ -293,7 +293,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
break;
for (const auto & object : objects)
- children.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Millis() / 1000), {}});
+ children.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}});
if (max_keys)
{
diff --git a/contrib/clickhouse/src/IO/S3/Client.cpp b/contrib/clickhouse/src/IO/S3/Client.cpp
index 104fc2dd5b..c00b6ad6b7 100644
--- a/contrib/clickhouse/src/IO/S3/Client.cpp
+++ b/contrib/clickhouse/src/IO/S3/Client.cpp
@@ -9,9 +9,9 @@
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/core/client/AWSErrorMarshaller.h>
-// #include <aws/core/endpoint/EndpointParameter.h>
+#include <aws/core/endpoint/EndpointParameter.h>
#include <aws/core/utils/HashingUtils.h>
-// #include <aws/core/utils/logging/ErrorMacros.h>
+#include <aws/core/utils/logging/ErrorMacros.h>
#include <Poco/Net/NetException.h>
@@ -51,8 +51,8 @@ namespace S3
Client::RetryStrategy::RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_)
: wrapped_strategy(std::move(wrapped_strategy_))
{
- // if (!wrapped_strategy)
- // wrapped_strategy = Aws::Client::InitRetryStrategy();
+ if (!wrapped_strategy)
+ wrapped_strategy = Aws::Client::InitRetryStrategy();
}
/// NOLINTNEXTLINE(google-runtime-int)
@@ -81,10 +81,10 @@ void Client::RetryStrategy::GetSendToken()
return wrapped_strategy->GetSendToken();
}
-// bool Client::RetryStrategy::HasSendToken()
-// {
-// return wrapped_strategy->HasSendToken();
-// }
+bool Client::RetryStrategy::HasSendToken()
+{
+ return wrapped_strategy->HasSendToken();
+}
void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome)
{
@@ -166,11 +166,9 @@ Client::Client(
, sse_kms_config(std::move(sse_kms_config_))
, log(&Poco::Logger::get("S3Client"))
{
-#if 0
auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get());
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(initial_endpoint);
-#endif
provider_type = deduceProviderType(initial_endpoint);
LOG_TRACE(log, "Provider type: {}", toString(provider_type));
@@ -408,14 +406,12 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const Comp
compose_req.SetKey(key);
compose_req.SetComponentNames({key});
compose_req.SetContentType("binary/octet-stream");
-#if 0
auto compose_outcome = ComposeObject(compose_req);
if (compose_outcome.IsSuccess())
LOG_TRACE(log, "Composing object was successful");
else
LOG_INFO(log, "Failed to compose object. Message: {}, Key: {}, Bucket: {}", compose_outcome.GetError().GetMessage(), key, bucket);
-#endif
return outcome;
}
@@ -456,7 +452,6 @@ Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & r
request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); });
}
-#if 0
Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & request) const
{
auto request_fn = [this](const ComposeObjectRequest & req)
@@ -486,7 +481,6 @@ Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest &
return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>(
request, request_fn);
}
-#endif
template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
@@ -662,7 +656,9 @@ std::string Client::getRegionForBucket(const std::string & bucket, bool force_de
if (outcome.IsSuccess())
{
const auto & result = outcome.GetResult();
- // region = result.GetRegion();
+#if 0
+ region = result.GetRegion();
+#endif
}
else
{
@@ -687,8 +683,6 @@ std::string Client::getRegionForBucket(const std::string & bucket, bool force_de
std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) const
{
- return std::nullopt;
-#if 0
auto endpoint = GetErrorMarshaller()->ExtractEndpoint(error);
if (endpoint.empty())
return std::nullopt;
@@ -704,7 +698,6 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c
uri.SetAuthority(endpoint);
return S3::URI(uri.GetURIString());
-#endif
}
// Do a list request because head requests don't have body in response
@@ -799,13 +792,13 @@ ClientFactory::ClientFactory()
{
aws_options = Aws::SDKOptions{};
Aws::InitAPI(aws_options);
- // Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>(false));
+ Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>(false));
Aws::Http::SetHttpClientFactory(std::make_shared<PocoHTTPClientFactory>());
}
ClientFactory::~ClientFactory()
{
- // Aws::Utils::Logging::ShutdownAWSLogging();
+ Aws::Utils::Logging::ShutdownAWSLogging();
Aws::ShutdownAPI(aws_options);
}
diff --git a/contrib/clickhouse/src/IO/S3/Client.h b/contrib/clickhouse/src/IO/S3/Client.h
index 721b8dd944..535ed06a44 100644
--- a/contrib/clickhouse/src/IO/S3/Client.h
+++ b/contrib/clickhouse/src/IO/S3/Client.h
@@ -36,7 +36,7 @@ struct ServerSideEncryptionKMSConfig
#include <aws/core/Aws.h>
#include <aws/core/client/DefaultRetryStrategy.h>
#include <aws/s3/S3Client.h>
-// #include <aws/s3/S3ServiceClientModel.h>
+#include <aws/s3/S3ServiceClientModel.h>
#include <aws/core/client/AWSErrorMarshaller.h>
#include <aws/core/client/RetryStrategy.h>
@@ -174,7 +174,7 @@ public:
void GetSendToken() override;
- // bool HasSendToken() override;
+ bool HasSendToken() override;
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome) override;
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError) override;
diff --git a/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp b/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp
index a61f88c4af..6ef7051c5d 100644
--- a/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp
+++ b/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp
@@ -345,10 +345,12 @@ void PocoHTTPClient::makeRequestInternalImpl(
session = makeHTTPSession(target_uri, timeouts);
bool use_tunnel = request_configuration.protocol == DB::ProxyConfiguration::Protocol::HTTP && target_uri.getScheme() == "https";
- // session->setProxy(
- // request_configuration.proxy_host,
- // request_configuration.proxy_port
- // );
+#if 0
+ session->setProxy(
+ request_configuration.proxy_host,
+ request_configuration.proxy_port
+ );
+#endif
}
else
{
@@ -359,8 +361,10 @@ void PocoHTTPClient::makeRequestInternalImpl(
session = makeHTTPSession(target_uri, timeouts);
}
+#if 0
/// In case of error this address will be written to logs
- // request.SetResolvedRemoteHost(session->getResolvedAddress());
+ request.SetResolvedRemoteHost(session->getResolvedAddress());
+#endif
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);
diff --git a/contrib/clickhouse/src/IO/S3/Requests.cpp b/contrib/clickhouse/src/IO/S3/Requests.cpp
index 2f2f8637ef..56d2e44a2c 100644
--- a/contrib/clickhouse/src/IO/S3/Requests.cpp
+++ b/contrib/clickhouse/src/IO/S3/Requests.cpp
@@ -3,7 +3,7 @@
#if USE_AWS_S3
#include <Common/logger_useful.h>
-// #include <aws/core/endpoint/EndpointParameter.h>
+#include <aws/core/endpoint/EndpointParameter.h>
#include <aws/core/utils/xml/XmlSerializer.h>
namespace DB::S3
@@ -82,14 +82,14 @@ Aws::Http::HeaderValueCollection ComposeObjectRequest::GetRequestSpecificHeaders
return {Aws::Http::HeaderValuePair(Aws::Http::CONTENT_TYPE_HEADER, content_type)};
}
-// Aws::Endpoint::EndpointParameters ComposeObjectRequest::GetEndpointContextParams() const
-// {
-// EndpointParameters parameters;
-// if (BucketHasBeenSet())
-// parameters.emplace_back("Bucket", GetBucket(), Aws::Endpoint::EndpointParameter::ParameterOrigin::OPERATION_CONTEXT);
+Aws::Endpoint::EndpointParameters ComposeObjectRequest::GetEndpointContextParams() const
+{
+ EndpointParameters parameters;
+ if (BucketHasBeenSet())
+ parameters.emplace_back("Bucket", GetBucket(), Aws::Endpoint::EndpointParameter::ParameterOrigin::OPERATION_CONTEXT);
-// return parameters;
-// }
+ return parameters;
+}
const Aws::String & ComposeObjectRequest::GetBucket() const
{
diff --git a/contrib/clickhouse/src/IO/S3/Requests.h b/contrib/clickhouse/src/IO/S3/Requests.h
index 5d0b930e01..929034707d 100644
--- a/contrib/clickhouse/src/IO/S3/Requests.h
+++ b/contrib/clickhouse/src/IO/S3/Requests.h
@@ -7,7 +7,7 @@
#include <IO/S3/URI.h>
#include <IO/S3/ProviderType.h>
-// #include <aws/core/endpoint/EndpointParameter.h>
+#include <aws/core/endpoint/EndpointParameter.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/ListObjectsRequest.h>
@@ -31,21 +31,21 @@ template <typename BaseRequest>
class ExtendedRequest : public BaseRequest
{
public:
- // Aws::Endpoint::EndpointParameters GetEndpointContextParams() const override
- // {
- // auto params = BaseRequest::GetEndpointContextParams();
- // if (!region_override.empty())
- // params.emplace_back("Region", region_override);
-
- // if (uri_override.has_value())
- // {
- // static const Aws::String AWS_S3_FORCE_PATH_STYLE = "ForcePathStyle";
- // params.emplace_back(AWS_S3_FORCE_PATH_STYLE, !uri_override->is_virtual_hosted_style);
- // params.emplace_back("Endpoint", uri_override->endpoint);
- // }
-
- // return params;
- // }
+ Aws::Endpoint::EndpointParameters GetEndpointContextParams() const override
+ {
+ auto params = BaseRequest::GetEndpointContextParams();
+ if (!region_override.empty())
+ params.emplace_back("Region", region_override);
+
+ if (uri_override.has_value())
+ {
+ static const Aws::String AWS_S3_FORCE_PATH_STYLE = "ForcePathStyle";
+ params.emplace_back(AWS_S3_FORCE_PATH_STYLE, !uri_override->is_virtual_hosted_style);
+ params.emplace_back("Endpoint", uri_override->endpoint);
+ }
+
+ return params;
+ }
void overrideRegion(std::string region) const
{
@@ -106,7 +106,7 @@ public:
AWS_S3_API Aws::Http::HeaderValueCollection GetRequestSpecificHeaders() const override;
- // AWS_S3_API EndpointParameters GetEndpointContextParams() const override;
+ AWS_S3_API EndpointParameters GetEndpointContextParams() const override;
const Aws::String & GetBucket() const;
bool BucketHasBeenSet() const;
diff --git a/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp b/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp
index 824d0ae00a..26b116ea04 100644
--- a/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp
+++ b/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp
@@ -56,24 +56,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
-// struct WriteBufferFromS3::PartData
-// {
-// Memory<> memory;
-// size_t data_size = 0;
-
-// std::shared_ptr<std::iostream> createAwsBuffer()
-// {
-// auto buffer = std::make_shared<StdIStreamFromMemory>(memory.data(), data_size);
-// buffer->exceptions(std::ios::badbit);
-// return buffer;
-// }
-
-// bool isEmpty() const
-// {
-// return data_size == 0;
-// }
-// };
-
std::shared_ptr<std::iostream> WriteBufferFromS3::PartData::createAwsBuffer()
{
auto buffer = std::make_shared<StdIStreamFromMemory>(memory.data(), data_size);
@@ -81,6 +63,12 @@ std::shared_ptr<std::iostream> WriteBufferFromS3::PartData::createAwsBuffer()
return buffer;
}
+bool WriteBufferFromS3::PartData::isEmpty() const
+{
+ return data_size == 0;
+}
+
+
WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
diff --git a/contrib/clickhouse/src/IO/WriteBufferFromS3.h b/contrib/clickhouse/src/IO/WriteBufferFromS3.h
index 0fdf771e1f..9f399ccc97 100644
--- a/contrib/clickhouse/src/IO/WriteBufferFromS3.h
+++ b/contrib/clickhouse/src/IO/WriteBufferFromS3.h
@@ -72,10 +72,7 @@ private:
std::shared_ptr<std::iostream> createAwsBuffer();
- bool isEmpty() const
- {
- return data_size == 0;
- }
+ bool isEmpty() const;
};
void hidePartialData();
diff --git a/contrib/clickhouse/src/ya.make b/contrib/clickhouse/src/ya.make
index 9113f0a7a8..1f5158a3ca 100644
--- a/contrib/clickhouse/src/ya.make
+++ b/contrib/clickhouse/src/ya.make
@@ -47,8 +47,12 @@ PEERDIR(
contrib/libs/wyhash
contrib/libs/xxhash
contrib/libs/zstd
+ contrib/restricted/aws/aws-c-auth
contrib/restricted/aws/aws-c-common
contrib/restricted/aws/aws-c-io
+ contrib/restricted/aws/aws-c-mqtt
+ contrib/restricted/aws/aws-c-sdkutils
+ contrib/restricted/aws/aws-crt-cpp
contrib/restricted/boost/circular_buffer
contrib/restricted/boost/container_hash
contrib/restricted/boost/context