diff options
author | Oleg Doronin <dorooleg@yandex.ru> | 2024-09-10 08:22:59 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-09-10 09:22:59 +0300 |
commit | f30d4812e0766d2d8d54390d1744087e56118b61 (patch) | |
tree | e935acad1339bb0ebc18eced6b51f3a4ce7c2704 | |
parent | 257026796833e57263e04c77640bd88322f4ba58 (diff) | |
download | ydb-f30d4812e0766d2d8d54390d1744087e56118b61.tar.gz |
The code has been got rid of the TS3ReadActorFactoryConfig (#8181)
-rw-r--r-- | ydb/core/fq/libs/actors/run_actor.cpp | 9 | ||||
-rw-r--r-- | ydb/core/fq/libs/config/protos/read_actors_factory.proto | 10 | ||||
-rw-r--r-- | ydb/core/fq/libs/init/init.cpp | 12 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp | 5 | ||||
-rw-r--r-- | ydb/library/yql/providers/s3/provider/yql_s3_provider.h | 2 | ||||
-rw-r--r-- | ydb/library/yql/tools/dqrun/dqrun.cpp | 3 | ||||
-rw-r--r-- | ydb/tests/fq/common/conftest.py | 2 | ||||
-rw-r--r-- | ydb/tests/fq/plans/conftest.py | 2 | ||||
-rw-r--r-- | ydb/tests/fq/restarts/conftest.py | 2 | ||||
-rw-r--r-- | ydb/tests/fq/s3/conftest.py | 2 | ||||
-rw-r--r-- | ydb/tests/tools/fq_runner/kikimr_runner.py | 8 | ||||
-rw-r--r-- | ydb/tests/tools/fq_runner/kikimr_utils.py | 6 |
12 files changed, 14 insertions, 49 deletions
diff --git a/ydb/core/fq/libs/actors/run_actor.cpp b/ydb/core/fq/libs/actors/run_actor.cpp index f66a7eee4e..fcaf7d1601 100644 --- a/ydb/core/fq/libs/actors/run_actor.cpp +++ b/ydb/core/fq/libs/actors/run_actor.cpp @@ -779,11 +779,7 @@ private: mkqlDefaultLimit = 8_GB; } - // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig - auto s3ReadDefaultInflightLimit = Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetDataInflight(); - if (s3ReadDefaultInflightLimit == 0) { - s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight(); - } + auto s3ReadDefaultInflightLimit = Params.Config.GetGateways().GetS3().GetDataInflight(); if (s3ReadDefaultInflightLimit == 0) { s3ReadDefaultInflightLimit = 200_MB; } @@ -1970,8 +1966,7 @@ private: } { - dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory, - Params.Config.GetReadActorsFactoryConfig().HasS3ReadActorFactoryConfig() ? Params.Config.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig().GetAllowLocalFiles() : Params.Config.GetGateways().GetS3().GetAllowLocalFiles(), NActors::TActivationContext::ActorSystem())); // This part is for backward compatibility. TODO: remove this part after migration to TS3GatewayConfig + dataProvidersInit.push_back(GetS3DataProviderInitializer(Params.S3Gateway, Params.CredentialsFactory, NActors::TActivationContext::ActorSystem())); } { diff --git a/ydb/core/fq/libs/config/protos/read_actors_factory.proto b/ydb/core/fq/libs/config/protos/read_actors_factory.proto index e780151233..f8aa5a7a8e 100644 --- a/ydb/core/fq/libs/config/protos/read_actors_factory.proto +++ b/ydb/core/fq/libs/config/protos/read_actors_factory.proto @@ -8,19 +8,11 @@ import "ydb/library/yql/providers/s3/proto/retry_config.proto"; //////////////////////////////////////////////////////////// -message TS3ReadActorFactoryConfig { - NYql.NS3.TRetryConfig RetryConfig = 1; - uint64 RowsInBatch = 2; // Default = 1000 - uint64 MaxInflight = 3; // Default = 20 - uint64 DataInflight = 4; // Default = 200 MB - bool AllowLocalFiles = 5; -} - message TPqReadActorFactoryConfig { bool CookieCommitMode = 1; // Turn off RangesMode setting in PQ read session. } message TReadActorsFactoryConfig { - TS3ReadActorFactoryConfig S3ReadActorFactoryConfig = 1; + reserved 1; TPqReadActorFactoryConfig PqReadActorFactoryConfig = 2; } diff --git a/ydb/core/fq/libs/init/init.cpp b/ydb/core/fq/libs/init/init.cpp index 19d7456273..ae43b99e68 100644 --- a/ydb/core/fq/libs/init/init.cpp +++ b/ydb/core/fq/libs/init/init.cpp @@ -196,21 +196,9 @@ void Init( auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory(); if (protoConfig.GetPrivateApi().GetEnabled()) { - const auto& s3readConfig = protoConfig.GetReadActorsFactoryConfig().GetS3ReadActorFactoryConfig(); auto s3HttpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.MaxTime = TDuration::Max(), .RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3()); - // These fillings were left for the backward compatibility. TODO: remove this part after migration to TS3GatewayConfig - if (const ui64 rowsInBatch = s3readConfig.GetRowsInBatch()) { - readActorFactoryCfg.RowsInBatch = rowsInBatch; - } - if (const ui64 maxInflight = s3readConfig.GetMaxInflight()) { - readActorFactoryCfg.MaxInflight = maxInflight; - } - if (const ui64 dataInflight = s3readConfig.GetDataInflight()) { - readActorFactoryCfg.DataInflight = dataInflight; - } - RegisterDqInputTransformLookupActorFactory(*asyncIoFactory); RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker")); RegisterYdbReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory); diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp index c283c53e8c..92cb4b120f 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.cpp @@ -4,8 +4,8 @@ namespace NYql { -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, bool allowLocalFiles, NActors::TActorSystem* actorSystem) { - return [gateway, credentialsFactory, allowLocalFiles, actorSystem] ( +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, NActors::TActorSystem* actorSystem) { + return [gateway, credentialsFactory, actorSystem] ( const TString& userName, const TString& sessionId, const TGatewaysConfig* gatewaysConfig, @@ -35,7 +35,6 @@ TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway if (gatewaysConfig) { state->Configuration->Init(gatewaysConfig->GetS3(), typeCtx); } - state->Configuration->AllowLocalFiles = allowLocalFiles; state->Gateway = gateway; TDataProviderInfo info; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h index f5eaa96630..e02f3f4d3c 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_provider.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_provider.h @@ -35,7 +35,7 @@ struct TS3State : public TThrRefBase NActors::TActorSystem* ActorSystem = nullptr; }; -TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, bool allowLocalFiles = false, NActors::TActorSystem* actorSystem = nullptr); +TDataProviderInitializer GetS3DataProviderInitializer(IHTTPGateway::TPtr gateway, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory = nullptr, NActors::TActorSystem* actorSystem = nullptr); TIntrusivePtr<IDataProvider> CreateS3DataSource(TS3State::TPtr state); TIntrusivePtr<IDataProvider> CreateS3DataSink(TS3State::TPtr state); diff --git a/ydb/library/yql/tools/dqrun/dqrun.cpp b/ydb/library/yql/tools/dqrun/dqrun.cpp index 32de31a609..3a813dcc91 100644 --- a/ydb/library/yql/tools/dqrun/dqrun.cpp +++ b/ydb/library/yql/tools/dqrun/dqrun.cpp @@ -956,13 +956,14 @@ int RunMain(int argc, const char* argv[]) } if (gatewaysConfig.HasS3()) { + gatewaysConfig.MutableS3()->SetAllowLocalFiles(true); for (auto& cluster: gatewaysConfig.GetS3().GetClusterMapping()) { clusters.emplace(to_lower(cluster.GetName()), TString{S3ProviderName}); } if (!httpGateway) { httpGateway = IHTTPGateway::Make(gatewaysConfig.HasHttpGateway() ? &gatewaysConfig.GetHttpGateway() : nullptr); } - dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr, true, nullptr)); + dataProvidersInit.push_back(GetS3DataProviderInitializer(httpGateway, nullptr)); } if (gatewaysConfig.HasPq()) { diff --git a/ydb/tests/fq/common/conftest.py b/ydb/tests/fq/common/conftest.py index 9099b5e3a5..f9ff9230d0 100644 --- a/ydb/tests/fq/common/conftest.py +++ b/ydb/tests/fq/common/conftest.py @@ -18,9 +18,9 @@ from ydb.tests.tools.fq_runner.kikimr_utils import start_kikimr @pytest.fixture def kikimr(request: pytest.FixtureRequest, yq_version: str): kikimr_extensions = [ + AddFormatSizeLimitExtension(), AddInflightExtension(), AddDataInflightExtension(), - AddFormatSizeLimitExtension(), DefaultConfigExtension(''), YQv2Extension(yq_version), ComputeExtension(), diff --git a/ydb/tests/fq/plans/conftest.py b/ydb/tests/fq/plans/conftest.py index ffecb406cd..0114bf5727 100644 --- a/ydb/tests/fq/plans/conftest.py +++ b/ydb/tests/fq/plans/conftest.py @@ -61,9 +61,9 @@ def s3(request) -> S3: @pytest.fixture def kikimr(request: pytest.FixtureRequest, s3: S3, yq_version: str, stats_mode: str): kikimr_extensions = [ + AddFormatSizeLimitExtension(), AddInflightExtension(), AddDataInflightExtension(), - AddFormatSizeLimitExtension(), DefaultConfigExtension(s3.s3_url), YQv2Extension(yq_version), ComputeExtension(), diff --git a/ydb/tests/fq/restarts/conftest.py b/ydb/tests/fq/restarts/conftest.py index 2ae2fd611f..6780a04e24 100644 --- a/ydb/tests/fq/restarts/conftest.py +++ b/ydb/tests/fq/restarts/conftest.py @@ -66,9 +66,9 @@ def stats_mode(): @pytest.fixture def kikimr(request: pytest.FixtureRequest, s3: S3, yq_version: str, stats_mode: str): kikimr_extensions = [ + AddFormatSizeLimitExtension(), AddInflightExtension(), AddDataInflightExtension(), - AddFormatSizeLimitExtension(), DefaultConfigExtension(s3.s3_url), YQv2Extension(yq_version), ComputeExtension(), diff --git a/ydb/tests/fq/s3/conftest.py b/ydb/tests/fq/s3/conftest.py index 12eeea7118..42965ba92b 100644 --- a/ydb/tests/fq/s3/conftest.py +++ b/ydb/tests/fq/s3/conftest.py @@ -87,9 +87,9 @@ def kikimr_params(request: pytest.FixtureRequest): def get_kikimr_extensions(s3: S3, yq_version: str, kikimr_settings, mvp_external_ydb_endpoint): return [ + AddFormatSizeLimitExtension(), AddInflightExtension(), AddDataInflightExtension(), - AddFormatSizeLimitExtension(), DefaultConfigExtension(s3.s3_url), YQv2Extension(yq_version, kikimr_settings.get("is_replace_if_exists", False)), ComputeExtension(), diff --git a/ydb/tests/tools/fq_runner/kikimr_runner.py b/ydb/tests/tools/fq_runner/kikimr_runner.py index d0480e8dd5..a10bde35d1 100644 --- a/ydb/tests/tools/fq_runner/kikimr_runner.py +++ b/ydb/tests/tools/fq_runner/kikimr_runner.py @@ -520,14 +520,6 @@ class YqTenant(BaseTenant): fq_config['quotas_manager'] = {'enabled': True} self.fill_rate_limiter_config(fq_config['rate_limiter'], "RateLimiter_" + self.uuid) - fq_config['read_actors_factory_config'] = { - 's3_read_actor_factory_config': { - 'retry_config': { - 'max_retry_time_ms': 3000 - } - } - } - class TenantType(Enum): YQ = 1 diff --git a/ydb/tests/tools/fq_runner/kikimr_utils.py b/ydb/tests/tools/fq_runner/kikimr_utils.py index b3f86b84a3..c227b64b6f 100644 --- a/ydb/tests/tools/fq_runner/kikimr_utils.py +++ b/ydb/tests/tools/fq_runner/kikimr_utils.py @@ -46,8 +46,7 @@ class AddInflightExtension(ExtensionPoint): def apply_to_kikimr(self, request, kikimr): kikimr.inflight = request.param["inflight"] - kikimr.compute_plane.fq_config['read_actors_factory_config']['s3_read_actor_factory_config'][ - 'max_inflight'] = kikimr.inflight + kikimr.compute_plane.fq_config['gateways']['s3']['max_inflight'] = kikimr.inflight del request.param["inflight"] @@ -59,8 +58,7 @@ class AddDataInflightExtension(ExtensionPoint): def apply_to_kikimr(self, request, kikimr): kikimr.data_inflight = request.param["data_inflight"] - kikimr.compute_plane.fq_config['read_actors_factory_config']['s3_read_actor_factory_config'][ - 'data_inflight'] = kikimr.data_inflight + kikimr.compute_plane.fq_config['gateways']['s3']['data_inflight'] = kikimr.data_inflight del request.param["data_inflight"] |