diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-04 17:32:31 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2023-12-05 04:16:04 +0300 |
commit | 729685076fffeae4fcfb8df31b8861813fc4a9be (patch) | |
tree | 459271bae0a07e833216282b42a4c4239e3a031f /contrib/clickhouse | |
parent | bff99195ecfa960a4162f236cf20a5e57f6d2f26 (diff) | |
download | ydb-729685076fffeae4fcfb8df31b8861813fc4a9be.tar.gz |
Intermediate changes
Diffstat (limited to 'contrib/clickhouse')
-rw-r--r-- | contrib/clickhouse/src/Common/ya.make | 4 | ||||
-rw-r--r-- | contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp | 4 | ||||
-rw-r--r-- | contrib/clickhouse/src/IO/S3/Client.cpp | 33 | ||||
-rw-r--r-- | contrib/clickhouse/src/IO/S3/Client.h | 4 | ||||
-rw-r--r-- | contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp | 14 | ||||
-rw-r--r-- | contrib/clickhouse/src/IO/S3/Requests.cpp | 16 | ||||
-rw-r--r-- | contrib/clickhouse/src/IO/S3/Requests.h | 34 | ||||
-rw-r--r-- | contrib/clickhouse/src/IO/WriteBufferFromS3.cpp | 24 | ||||
-rw-r--r-- | contrib/clickhouse/src/IO/WriteBufferFromS3.h | 5 | ||||
-rw-r--r-- | contrib/clickhouse/src/ya.make | 4 |
10 files changed, 66 insertions, 76 deletions
diff --git a/contrib/clickhouse/src/Common/ya.make b/contrib/clickhouse/src/Common/ya.make index ab21fe0294..e81b00e118 100644 --- a/contrib/clickhouse/src/Common/ya.make +++ b/contrib/clickhouse/src/Common/ya.make @@ -35,8 +35,12 @@ PEERDIR( contrib/libs/snappy contrib/libs/sparsehash contrib/libs/zstd + contrib/restricted/aws/aws-c-auth contrib/restricted/aws/aws-c-common contrib/restricted/aws/aws-c-io + contrib/restricted/aws/aws-c-mqtt + contrib/restricted/aws/aws-c-sdkutils + contrib/restricted/aws/aws-crt-cpp contrib/restricted/boost/circular_buffer contrib/restricted/boost/container_hash contrib/restricted/boost/context diff --git a/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 1e68d99c7e..0d9670efeb 100644 --- a/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/contrib/clickhouse/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -127,7 +127,7 @@ private: result = !objects.empty(); for (const auto & object : objects) - batch.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Millis() / 1000), {}}); + batch.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}}); if (result) request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); @@ -293,7 +293,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet break; for (const auto & object : objects) - children.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Millis() / 1000), {}}); + children.emplace_back(object.GetKey(), ObjectMetadata{static_cast<uint64_t>(object.GetSize()), Poco::Timestamp::fromEpochTime(object.GetLastModified().Seconds()), {}}); if (max_keys) { diff --git a/contrib/clickhouse/src/IO/S3/Client.cpp b/contrib/clickhouse/src/IO/S3/Client.cpp index 104fc2dd5b..c00b6ad6b7 100644 --- a/contrib/clickhouse/src/IO/S3/Client.cpp +++ b/contrib/clickhouse/src/IO/S3/Client.cpp @@ -9,9 +9,9 @@ #include <aws/s3/model/HeadObjectRequest.h> #include <aws/s3/model/ListObjectsV2Request.h> #include <aws/core/client/AWSErrorMarshaller.h> -// #include <aws/core/endpoint/EndpointParameter.h> +#include <aws/core/endpoint/EndpointParameter.h> #include <aws/core/utils/HashingUtils.h> -// #include <aws/core/utils/logging/ErrorMacros.h> +#include <aws/core/utils/logging/ErrorMacros.h> #include <Poco/Net/NetException.h> @@ -51,8 +51,8 @@ namespace S3 Client::RetryStrategy::RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_) : wrapped_strategy(std::move(wrapped_strategy_)) { - // if (!wrapped_strategy) - // wrapped_strategy = Aws::Client::InitRetryStrategy(); + if (!wrapped_strategy) + wrapped_strategy = Aws::Client::InitRetryStrategy(); } /// NOLINTNEXTLINE(google-runtime-int) @@ -81,10 +81,10 @@ void Client::RetryStrategy::GetSendToken() return wrapped_strategy->GetSendToken(); } -// bool Client::RetryStrategy::HasSendToken() -// { -// return wrapped_strategy->HasSendToken(); -// } +bool Client::RetryStrategy::HasSendToken() +{ + return wrapped_strategy->HasSendToken(); +} void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome) { @@ -166,11 +166,9 @@ Client::Client( , sse_kms_config(std::move(sse_kms_config_)) , log(&Poco::Logger::get("S3Client")) { -#if 0 auto * endpoint_provider = dynamic_cast<Aws::S3::Endpoint::S3DefaultEpProviderBase *>(accessEndpointProvider().get()); endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region); endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(initial_endpoint); -#endif provider_type = deduceProviderType(initial_endpoint); LOG_TRACE(log, "Provider type: {}", toString(provider_type)); @@ -408,14 +406,12 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const Comp compose_req.SetKey(key); compose_req.SetComponentNames({key}); compose_req.SetContentType("binary/octet-stream"); -#if 0 auto compose_outcome = ComposeObject(compose_req); if (compose_outcome.IsSuccess()) LOG_TRACE(log, "Composing object was successful"); else LOG_INFO(log, "Failed to compose object. Message: {}, Key: {}, Bucket: {}", compose_outcome.GetError().GetMessage(), key, bucket); -#endif return outcome; } @@ -456,7 +452,6 @@ Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & r request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); }); } -#if 0 Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & request) const { auto request_fn = [this](const ComposeObjectRequest & req) @@ -486,7 +481,6 @@ Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & return doRequestWithRetryNetworkErrors</*IsReadMethod*/ false>( request, request_fn); } -#endif template <typename RequestType, typename RequestFn> std::invoke_result_t<RequestFn, RequestType> @@ -662,7 +656,9 @@ std::string Client::getRegionForBucket(const std::string & bucket, bool force_de if (outcome.IsSuccess()) { const auto & result = outcome.GetResult(); - // region = result.GetRegion(); +#if 0 + region = result.GetRegion(); +#endif } else { @@ -687,8 +683,6 @@ std::string Client::getRegionForBucket(const std::string & bucket, bool force_de std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) const { - return std::nullopt; -#if 0 auto endpoint = GetErrorMarshaller()->ExtractEndpoint(error); if (endpoint.empty()) return std::nullopt; @@ -704,7 +698,6 @@ std::optional<S3::URI> Client::getURIFromError(const Aws::S3::S3Error & error) c uri.SetAuthority(endpoint); return S3::URI(uri.GetURIString()); -#endif } // Do a list request because head requests don't have body in response @@ -799,13 +792,13 @@ ClientFactory::ClientFactory() { aws_options = Aws::SDKOptions{}; Aws::InitAPI(aws_options); - // Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>(false)); + Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>(false)); Aws::Http::SetHttpClientFactory(std::make_shared<PocoHTTPClientFactory>()); } ClientFactory::~ClientFactory() { - // Aws::Utils::Logging::ShutdownAWSLogging(); + Aws::Utils::Logging::ShutdownAWSLogging(); Aws::ShutdownAPI(aws_options); } diff --git a/contrib/clickhouse/src/IO/S3/Client.h b/contrib/clickhouse/src/IO/S3/Client.h index 721b8dd944..535ed06a44 100644 --- a/contrib/clickhouse/src/IO/S3/Client.h +++ b/contrib/clickhouse/src/IO/S3/Client.h @@ -36,7 +36,7 @@ struct ServerSideEncryptionKMSConfig #include <aws/core/Aws.h> #include <aws/core/client/DefaultRetryStrategy.h> #include <aws/s3/S3Client.h> -// #include <aws/s3/S3ServiceClientModel.h> +#include <aws/s3/S3ServiceClientModel.h> #include <aws/core/client/AWSErrorMarshaller.h> #include <aws/core/client/RetryStrategy.h> @@ -174,7 +174,7 @@ public: void GetSendToken() override; - // bool HasSendToken() override; + bool HasSendToken() override; void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome) override; void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError) override; diff --git a/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp b/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp index a61f88c4af..6ef7051c5d 100644 --- a/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp +++ b/contrib/clickhouse/src/IO/S3/PocoHTTPClient.cpp @@ -345,10 +345,12 @@ void PocoHTTPClient::makeRequestInternalImpl( session = makeHTTPSession(target_uri, timeouts); bool use_tunnel = request_configuration.protocol == DB::ProxyConfiguration::Protocol::HTTP && target_uri.getScheme() == "https"; - // session->setProxy( - // request_configuration.proxy_host, - // request_configuration.proxy_port - // ); +#if 0 + session->setProxy( + request_configuration.proxy_host, + request_configuration.proxy_port + ); +#endif } else { @@ -359,8 +361,10 @@ void PocoHTTPClient::makeRequestInternalImpl( session = makeHTTPSession(target_uri, timeouts); } +#if 0 /// In case of error this address will be written to logs - // request.SetResolvedRemoteHost(session->getResolvedAddress()); + request.SetResolvedRemoteHost(session->getResolvedAddress()); +#endif Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1); diff --git a/contrib/clickhouse/src/IO/S3/Requests.cpp b/contrib/clickhouse/src/IO/S3/Requests.cpp index 2f2f8637ef..56d2e44a2c 100644 --- a/contrib/clickhouse/src/IO/S3/Requests.cpp +++ b/contrib/clickhouse/src/IO/S3/Requests.cpp @@ -3,7 +3,7 @@ #if USE_AWS_S3 #include <Common/logger_useful.h> -// #include <aws/core/endpoint/EndpointParameter.h> +#include <aws/core/endpoint/EndpointParameter.h> #include <aws/core/utils/xml/XmlSerializer.h> namespace DB::S3 @@ -82,14 +82,14 @@ Aws::Http::HeaderValueCollection ComposeObjectRequest::GetRequestSpecificHeaders return {Aws::Http::HeaderValuePair(Aws::Http::CONTENT_TYPE_HEADER, content_type)}; } -// Aws::Endpoint::EndpointParameters ComposeObjectRequest::GetEndpointContextParams() const -// { -// EndpointParameters parameters; -// if (BucketHasBeenSet()) -// parameters.emplace_back("Bucket", GetBucket(), Aws::Endpoint::EndpointParameter::ParameterOrigin::OPERATION_CONTEXT); +Aws::Endpoint::EndpointParameters ComposeObjectRequest::GetEndpointContextParams() const +{ + EndpointParameters parameters; + if (BucketHasBeenSet()) + parameters.emplace_back("Bucket", GetBucket(), Aws::Endpoint::EndpointParameter::ParameterOrigin::OPERATION_CONTEXT); -// return parameters; -// } + return parameters; +} const Aws::String & ComposeObjectRequest::GetBucket() const { diff --git a/contrib/clickhouse/src/IO/S3/Requests.h b/contrib/clickhouse/src/IO/S3/Requests.h index 5d0b930e01..929034707d 100644 --- a/contrib/clickhouse/src/IO/S3/Requests.h +++ b/contrib/clickhouse/src/IO/S3/Requests.h @@ -7,7 +7,7 @@ #include <IO/S3/URI.h> #include <IO/S3/ProviderType.h> -// #include <aws/core/endpoint/EndpointParameter.h> +#include <aws/core/endpoint/EndpointParameter.h> #include <aws/s3/model/HeadObjectRequest.h> #include <aws/s3/model/ListObjectsV2Request.h> #include <aws/s3/model/ListObjectsRequest.h> @@ -31,21 +31,21 @@ template <typename BaseRequest> class ExtendedRequest : public BaseRequest { public: - // Aws::Endpoint::EndpointParameters GetEndpointContextParams() const override - // { - // auto params = BaseRequest::GetEndpointContextParams(); - // if (!region_override.empty()) - // params.emplace_back("Region", region_override); - - // if (uri_override.has_value()) - // { - // static const Aws::String AWS_S3_FORCE_PATH_STYLE = "ForcePathStyle"; - // params.emplace_back(AWS_S3_FORCE_PATH_STYLE, !uri_override->is_virtual_hosted_style); - // params.emplace_back("Endpoint", uri_override->endpoint); - // } - - // return params; - // } + Aws::Endpoint::EndpointParameters GetEndpointContextParams() const override + { + auto params = BaseRequest::GetEndpointContextParams(); + if (!region_override.empty()) + params.emplace_back("Region", region_override); + + if (uri_override.has_value()) + { + static const Aws::String AWS_S3_FORCE_PATH_STYLE = "ForcePathStyle"; + params.emplace_back(AWS_S3_FORCE_PATH_STYLE, !uri_override->is_virtual_hosted_style); + params.emplace_back("Endpoint", uri_override->endpoint); + } + + return params; + } void overrideRegion(std::string region) const { @@ -106,7 +106,7 @@ public: AWS_S3_API Aws::Http::HeaderValueCollection GetRequestSpecificHeaders() const override; - // AWS_S3_API EndpointParameters GetEndpointContextParams() const override; + AWS_S3_API EndpointParameters GetEndpointContextParams() const override; const Aws::String & GetBucket() const; bool BucketHasBeenSet() const; diff --git a/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp b/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp index 824d0ae00a..26b116ea04 100644 --- a/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp +++ b/contrib/clickhouse/src/IO/WriteBufferFromS3.cpp @@ -56,24 +56,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -// struct WriteBufferFromS3::PartData -// { -// Memory<> memory; -// size_t data_size = 0; - -// std::shared_ptr<std::iostream> createAwsBuffer() -// { -// auto buffer = std::make_shared<StdIStreamFromMemory>(memory.data(), data_size); -// buffer->exceptions(std::ios::badbit); -// return buffer; -// } - -// bool isEmpty() const -// { -// return data_size == 0; -// } -// }; - std::shared_ptr<std::iostream> WriteBufferFromS3::PartData::createAwsBuffer() { auto buffer = std::make_shared<StdIStreamFromMemory>(memory.data(), data_size); @@ -81,6 +63,12 @@ std::shared_ptr<std::iostream> WriteBufferFromS3::PartData::createAwsBuffer() return buffer; } +bool WriteBufferFromS3::PartData::isEmpty() const +{ + return data_size == 0; +} + + WriteBufferFromS3::WriteBufferFromS3( std::shared_ptr<const S3::Client> client_ptr_, std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_, diff --git a/contrib/clickhouse/src/IO/WriteBufferFromS3.h b/contrib/clickhouse/src/IO/WriteBufferFromS3.h index 0fdf771e1f..9f399ccc97 100644 --- a/contrib/clickhouse/src/IO/WriteBufferFromS3.h +++ b/contrib/clickhouse/src/IO/WriteBufferFromS3.h @@ -72,10 +72,7 @@ private: std::shared_ptr<std::iostream> createAwsBuffer(); - bool isEmpty() const - { - return data_size == 0; - } + bool isEmpty() const; }; void hidePartialData(); diff --git a/contrib/clickhouse/src/ya.make b/contrib/clickhouse/src/ya.make index 9113f0a7a8..1f5158a3ca 100644 --- a/contrib/clickhouse/src/ya.make +++ b/contrib/clickhouse/src/ya.make @@ -47,8 +47,12 @@ PEERDIR( contrib/libs/wyhash contrib/libs/xxhash contrib/libs/zstd + contrib/restricted/aws/aws-c-auth contrib/restricted/aws/aws-c-common contrib/restricted/aws/aws-c-io + contrib/restricted/aws/aws-c-mqtt + contrib/restricted/aws/aws-c-sdkutils + contrib/restricted/aws/aws-crt-cpp contrib/restricted/boost/circular_buffer contrib/restricted/boost/container_hash contrib/restricted/boost/context |