diff options
author | hor911 <hor911@ydb.tech> | 2023-02-14 23:35:54 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2023-02-14 23:35:54 +0300 |
commit | 13218a75e88b9127fd5da4c3216237dcc2ace935 (patch) | |
tree | 768856b72dfe83a57f2886b1a5a87865b3f461e6 | |
parent | eaef11f63bb056175fdc296f35574af4cffcc2fa (diff) | |
download | ydb-13218a75e88b9127fd5da4c3216237dcc2ace935.tar.gz |
Correct RR creation after crash on init
-rw-r--r-- | ydb/core/yq/libs/actors/pending_fetcher.cpp | 16 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 75 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor_params.cpp | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor_params.h | 2 | ||||
-rw-r--r-- | ydb/core/yq/libs/common/util.h | 9 | ||||
-rw-r--r-- | ydb/core/yq/libs/protos/fq_private.proto | 9 | ||||
-rw-r--r-- | ydb/core/yq/libs/read_rule/read_rule_creator.cpp | 51 | ||||
-rw-r--r-- | ydb/core/yq/libs/read_rule/read_rule_creator.h | 8 | ||||
-rw-r--r-- | ydb/core/yq/libs/read_rule/read_rule_deleter.cpp | 9 | ||||
-rw-r--r-- | ydb/core/yq/libs/read_rule/read_rule_deleter.h | 2 |
10 files changed, 86 insertions, 97 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index 2a1c1fa1fc3..d9d16da106e 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -45,6 +45,7 @@ #include <ydb/core/yq/libs/common/compression.h> #include <ydb/core/yq/libs/common/entity_id.h> +#include <ydb/core/yq/libs/common/util.h> #include <ydb/core/yq/libs/events/events.h> #include <ydb/core/yq/libs/config/protos/fq_config.pb.h> #include <ydb/core/yq/libs/config/protos/pinger.pb.h> @@ -99,11 +100,6 @@ struct TEvPrivate { }; }; -template <class TElement> -TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TElement>& field) { - return { field.begin(), field.end() }; -} - constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60); } // namespace @@ -352,6 +348,13 @@ private: // todo: remove after migration dqGraphs = VectorFromProto(task.dq_graph()); } + + Fq::Private::TaskResources resources(task.resources()); + if (task.created_topic_consumers_size()) { + // todo: remove after migration + *resources.mutable_topic_consumers() = task.created_topic_consumers(); + } + TRunActorParams params( YqSharedResources, CredentialsProviderFactory, S3Gateway, FunctionRegistry, RandomProvider, @@ -378,7 +381,6 @@ private: VectorFromProto(task.result_set_meta()), std::move(dqGraphs), task.dq_graph_index(), - VectorFromProto(task.created_topic_consumers()), task.automatic(), task.query_name(), NProtoInterop::CastFromProto(task.deadline()), @@ -390,7 +392,7 @@ private: NProtoInterop::CastFromProto(task.request_started_at()), task.restart_count(), task.job_id().value(), - task.resources() + resources ); auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params))); diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index 383479a774b..66c074e6fd8 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -26,6 +26,7 @@ #include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h> #include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h> #include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h> +#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h> #include <ydb/library/yql/providers/pq/task_meta/task_meta.h> #include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h> #include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h> @@ -492,15 +493,15 @@ private: return; } - if (QueryStateUpdateRequest.resources().read_rules() == Fq::Private::TaskResources::PREPARE) { + if (QueryStateUpdateRequest.resources().topic_consumers_state() == Fq::Private::TaskResources::PREPARE) { if (!ReadRulesCreatorId) { ReadRulesCreatorId = Register( ::NYq::MakeReadRuleCreatorActor( SelfId(), Params.QueryId, Params.YqSharedResources->UserSpaceYdbDriver, - std::move(TopicsForConsumersCreation), - std::move(CredentialsForConsumersCreation) + Params.Resources.topic_consumers(), + PrepareReadRuleCredentials() ) ); } @@ -580,7 +581,7 @@ private: Issues.AddIssue("Internal Error"); if (!ConsumersAreDeleted) { - for (const Fq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) { + for (const Fq::Private::TopicConsumer& c : Params.Resources.topic_consumers()) { TransientIssues.AddIssue(TStringBuilder() << "Created read rule `" << c.consumer_name() << "` for topic `" << c.topic_path() << "` (database id " << c.database_id() << ") maybe was left undeleted: internal error occurred"); TransientIssues.back().Severity = NYql::TSeverityIds::S_WARNING; } @@ -668,7 +669,6 @@ private: LOG_D("Graph " << graphIndex); graphIndex++; const TString consumerNamePrefix = graphIndex == 1 ? Params.QueryId : TStringBuilder() << Params.QueryId << '-' << graphIndex; // Simple name in simple case - const auto& secureParams = graphParams.GetSecureParams(); for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) { for (NYql::NDqProto::TTaskInput& taskInput : *task.MutableInputs()) { if (taskInput.GetTypeCase() == NYql::NDqProto::TTaskInput::kSource && taskInput.GetSource().GetType() == "PqSource") { @@ -684,18 +684,16 @@ private: srcDesc.SetConsumerName(consumerName); settingsAny.PackFrom(srcDesc); if (isNewConsumer) { - auto s = consumerName; - LOG_D("Create consumer \"" << s << "\" for topic \"" << srcDesc.GetTopicPath() << "\""); - if (const TString& tokenName = srcDesc.GetToken().GetName()) { - const auto token = secureParams.find(tokenName); - YQL_ENSURE(token != secureParams.end(), "Token " << tokenName << " was not found in secure params"); - CredentialsForConsumersCreation.emplace_back( - CreateCredentialsProviderFactoryForStructuredToken(Params.CredentialsFactory, token->second, srcDesc.GetAddBearerToToken())); - } else { - CredentialsForConsumersCreation.emplace_back(NYdb::CreateInsecureCredentialsProviderFactory()); - } - - TopicsForConsumersCreation.emplace_back(std::move(srcDesc)); + LOG_D("Create consumer \"" << srcDesc.GetConsumerName() << "\" for topic \"" << srcDesc.GetTopicPath() << "\""); + auto& consumer = *QueryStateUpdateRequest.mutable_resources()->add_topic_consumers(); + consumer.set_database_id(srcDesc.GetDatabaseId()); + consumer.set_database(srcDesc.GetDatabase()); + consumer.set_topic_path(srcDesc.GetTopicPath()); + consumer.set_consumer_name(srcDesc.GetConsumerName()); + consumer.set_cluster_endpoint(srcDesc.GetEndpoint()); + consumer.set_use_ssl(srcDesc.GetUseSsl()); + consumer.set_token_name(srcDesc.GetToken().GetName()); + consumer.set_add_bearer_to_token(srcDesc.GetAddBearerToToken()); } } } @@ -726,8 +724,8 @@ private: if (ev->Cookie == SaveQueryInfoCookie) { QueryStateUpdateRequest.mutable_resources()->set_compilation(Fq::Private::TaskResources::READY); - QueryStateUpdateRequest.mutable_resources()->set_read_rules( - TopicsForConsumersCreation.size() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED); + QueryStateUpdateRequest.mutable_resources()->set_topic_consumers_state( + QueryStateUpdateRequest.resources().topic_consumers().size() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED); ProcessQuery(); } else if (ev->Cookie == SetLoadFromCheckpointModeCookie) { Send(ControlId, new TEvCheckpointCoordinator::TEvRunGraph()); @@ -898,23 +896,6 @@ private: CheckForConsumers(); - Params.CreatedTopicConsumers.clear(); - Params.CreatedTopicConsumers.reserve(TopicsForConsumersCreation.size()); - for (const NYql::NPq::NProto::TDqPqTopicSource& src : TopicsForConsumersCreation) { - auto& consumer = *request.add_created_topic_consumers(); - consumer.set_database_id(src.GetDatabaseId()); - consumer.set_database(src.GetDatabase()); - consumer.set_topic_path(src.GetTopicPath()); - consumer.set_consumer_name(src.GetConsumerName()); - consumer.set_cluster_endpoint(src.GetEndpoint()); - consumer.set_use_ssl(src.GetUseSsl()); - consumer.set_token_name(src.GetToken().GetName()); - consumer.set_add_bearer_to_token(src.GetAddBearerToToken()); - - // Save for deletion - Params.CreatedTopicConsumers.push_back(consumer); - } - for (const auto& graphParams : DqGraphParams) { const TString& serializedGraph = graphParams.SerializeAsString(); if (Compressor.IsEnabled()) { @@ -1175,7 +1156,7 @@ private: LOG_D(Issues.ToOneLineString()); Finish(YandexQuery::QueryMeta::FAILED); } else { - QueryStateUpdateRequest.mutable_resources()->set_read_rules(Fq::Private::TaskResources::READY); + QueryStateUpdateRequest.mutable_resources()->set_topic_consumers_state(Fq::Private::TaskResources::READY); ProcessQuery(); } } @@ -1204,17 +1185,18 @@ private: } bool NeedDeleteReadRules() const { - return !Params.CreatedTopicConsumers.empty(); + return Params.Resources.topic_consumers_state() == Fq::Private::TaskResources::PREPARE + || Params.Resources.topic_consumers_state() == Fq::Private::TaskResources::READY; } bool CanRunReadRulesDeletionActor() const { return !ReadRulesCreatorId && FinalizingStatusIsWritten && QueryResponseArrived; } - void RunReadRulesDeletionActor() { + TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> PrepareReadRuleCredentials() { TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials; - credentials.reserve(Params.CreatedTopicConsumers.size()); - for (const Fq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) { + credentials.reserve(Params.Resources.topic_consumers().size()); + for (const Fq::Private::TopicConsumer& c : Params.Resources.topic_consumers()) { if (const TString& tokenName = c.token_name()) { credentials.emplace_back( CreateCredentialsProviderFactoryForStructuredToken(Params.CredentialsFactory, FindTokenByName(tokenName), c.add_bearer_to_token())); @@ -1222,14 +1204,17 @@ private: credentials.emplace_back(NYdb::CreateInsecureCredentialsProviderFactory()); } } + return credentials; + } + void RunReadRulesDeletionActor() { Register( ::NYq::MakeReadRuleDeleterActor( SelfId(), Params.QueryId, Params.YqSharedResources->UserSpaceYdbDriver, - Params.CreatedTopicConsumers, - std::move(credentials) + Params.Resources.topic_consumers(), + PrepareReadRuleCredentials() ) ); } @@ -1933,7 +1918,7 @@ private: << " Status: " << YandexQuery::QueryMeta::ComputeStatus_Name(Params.Status) << " DqGraphs: " << Params.DqGraphs.size() << " DqGraphIndex: " << Params.DqGraphIndex - << " CreatedTopicConsumers: " << Params.CreatedTopicConsumers.size() + << " Resource.TopicConsumers: " << Params.Resources.topic_consumers().size() << " }"); } @@ -1969,8 +1954,6 @@ private: const TCompressor Compressor; // Consumers creation - TVector<NYql::NPq::NProto::TDqPqTopicSource> TopicsForConsumersCreation; - TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> CredentialsForConsumersCreation; TMap<TString, TString> Statistics; NActors::TActorId ReadRulesCreatorId; diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp index fea230bbc3d..94f0af5d02b 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.cpp +++ b/ydb/core/yq/libs/actors/run_actor_params.cpp @@ -43,7 +43,6 @@ TRunActorParams::TRunActorParams( TVector<YandexQuery::ResultSetMeta> resultSetMetas, TVector<TString> dqGraphs, int32_t dqGraphIndex, - TVector<Fq::Private::TopicConsumer> createdTopicConsumers, bool automatic, const TString& queryName, const TInstant& deadline, @@ -95,7 +94,6 @@ TRunActorParams::TRunActorParams( , ResultSetMetas(std::move(resultSetMetas)) , DqGraphs(std::move(dqGraphs)) , DqGraphIndex(dqGraphIndex) - , CreatedTopicConsumers(std::move(createdTopicConsumers)) , Automatic(automatic) , QueryName(queryName) , Deadline(deadline) diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h index 8717a2a5e1b..c6195babecf 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.h +++ b/ydb/core/yq/libs/actors/run_actor_params.h @@ -60,7 +60,6 @@ struct TRunActorParams { // TODO2 : Change name TVector<YandexQuery::ResultSetMeta> resultSetMetas, TVector<TString> dqGraphs, int32_t dqGraphIndex, - TVector<Fq::Private::TopicConsumer> createdTopicConsumers, bool automatic, const TString& queryName, const TInstant& deadline, @@ -117,7 +116,6 @@ struct TRunActorParams { // TODO2 : Change name const TVector<YandexQuery::ResultSetMeta> ResultSetMetas; const TVector<TString> DqGraphs; const int32_t DqGraphIndex; - TVector<Fq::Private::TopicConsumer> CreatedTopicConsumers; const bool Automatic = false; const TString QueryName; diff --git a/ydb/core/yq/libs/common/util.h b/ydb/core/yq/libs/common/util.h index e8e78bf3c20..2c303600f81 100644 --- a/ydb/core/yq/libs/common/util.h +++ b/ydb/core/yq/libs/common/util.h @@ -3,6 +3,10 @@ #include <algorithm> #include <array> +#include <google/protobuf/repeated_field.h> + +#include <util/generic/vector.h> + namespace NYq { template<std::size_t K, typename T, std::size_t N> @@ -13,4 +17,9 @@ auto CreateArray(const T(&list)[N]) -> std::array<T, K> { return result; } +template <class TElement> +TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TElement>& field) { + return { field.begin(), field.end() }; +} + } // namespace NYq diff --git a/ydb/core/yq/libs/protos/fq_private.proto b/ydb/core/yq/libs/protos/fq_private.proto index 457c0f77f32..f463a373cc6 100644 --- a/ydb/core/yq/libs/protos/fq_private.proto +++ b/ydb/core/yq/libs/protos/fq_private.proto @@ -27,16 +27,17 @@ message TaskResources { enum ResourceState { UNSPECIFIED = 0; // on start NOT_NEEDED = 1; // is not configured to use - skip it - PREPARE = 2; // resource creating in progress (retriable) - READY = 3; // created and ready to go - CLEANUP = 4; // is being destroyin (cleanup) + PREPARE = 2; // resource creating in progress (retriable) + READY = 3; // created and ready to go + CLEANUP = 4; // is being destroyed (cleanup), may be removed } ResourceState rate_limiter = 1; ResourceState compilation = 2; - ResourceState read_rules = 3; + ResourceState topic_consumers_state = 3; string rate_limiter_path = 10; + repeated TopicConsumer topic_consumers = 11; } message GetTaskRequest { diff --git a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp index 1ee5fa00774..2d1edb3105c 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp +++ b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp @@ -1,5 +1,6 @@ #include "read_rule_creator.h" +#include <ydb/core/yq/libs/common/util.h> #include <ydb/core/yq/libs/events/events.h> #include <ydb/core/protos/services.pb.h> @@ -70,13 +71,13 @@ public: NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - NYql::NPq::NProto::TDqPqTopicSource topic, + const Fq::Private::TopicConsumer& topicConsumer, std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider, ui64 index ) : Owner(owner) , QueryId(std::move(queryId)) - , Topic(std::move(topic)) + , TopicConsumer(topicConsumer) , YdbDriver(std::move(ydbDriver)) , PqClient(YdbDriver, GetPqClientSettings(std::move(credentialsProvider))) , Index(index) @@ -92,24 +93,24 @@ public: TString GetTopicPath() const { TStringBuilder ret; - ret << Topic.GetDatabase(); + ret << TopicConsumer.database(); if (ret && ret.back() != '/') { ret << '/'; } - ret << Topic.GetTopicPath(); + ret << TopicConsumer.topic_path(); return std::move(ret); } void StartRequest() { Y_VERIFY(!RequestInFlight); RequestInFlight = true; - LOG_D("Make request for read rule creation for topic `" << Topic.GetTopicPath() << "` [" << Index << "]"); + LOG_D("Make request for read rule creation for topic `" << TopicConsumer.topic_path() << "` [" << Index << "]"); PqClient.AddReadRule( GetTopicPath(), NYdb::NPersQueue::TAddReadRuleSettings() .ReadRule( NYdb::NPersQueue::TReadRuleSettings() - .ConsumerName(Topic.GetConsumerName()) + .ConsumerName(TopicConsumer.consumer_name()) .ServiceType("yandex-query") .SupportedCodecs({ NYdb::NPersQueue::ECodec::RAW, @@ -141,7 +142,7 @@ public: nextRetryDelay = Nothing(); // Not retryable } - LOG_D("Failed to add read rule to `" << Topic.GetTopicPath() << "`: " << status.GetIssues().ToString() << ". Status: " << status.GetStatus() << ". Retry after: " << nextRetryDelay); + LOG_D("Failed to add read rule to `" << TopicConsumer.topic_path() << "`: " << status.GetIssues().ToOneLineString() << ". Status: " << status.GetStatus() << ". Retry after: " << nextRetryDelay); if (!nextRetryDelay) { // Not retryable Send(Owner, MakeHolder<TEvPrivate::TEvSingleReadRuleCreatorResult>(status.GetIssues()), 0, Index); PassAway(); @@ -184,17 +185,17 @@ private: NYdb::NPersQueue::TPersQueueClientSettings GetPqClientSettings(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider) { return NYdb::NPersQueue::TPersQueueClientSettings() .ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off) - .Database(Topic.GetDatabase()) - .DiscoveryEndpoint(Topic.GetEndpoint()) + .Database(TopicConsumer.database()) + .DiscoveryEndpoint(TopicConsumer.cluster_endpoint()) .CredentialsProviderFactory(std::move(credentialsProvider)) .DiscoveryMode(NYdb::EDiscoveryMode::Async) - .SslCredentials(NYdb::TSslCredentials(Topic.GetUseSsl())); + .SslCredentials(NYdb::TSslCredentials(TopicConsumer.use_ssl())); } private: const NActors::TActorId Owner; const TString QueryId; - const NYql::NPq::NProto::TDqPqTopicSource Topic; + const Fq::Private::TopicConsumer TopicConsumer; NYdb::TDriver YdbDriver; NYdb::NPersQueue::TPersQueueClient PqClient; ui64 Index = 0; @@ -210,17 +211,17 @@ public: NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<NYql::NPq::NProto::TDqPqTopicSource> topics, + const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials ) : Owner(owner) , QueryId(std::move(queryId)) , YdbDriver(std::move(ydbDriver)) - , Topics(std::move(topics)) + , TopicConsumers(VectorFromProto(topicConsumers)) , Credentials(std::move(credentials)) { - Y_VERIFY(!Topics.empty()); - Results.resize(Topics.size()); + Y_VERIFY(!TopicConsumers.empty()); + Results.resize(TopicConsumers.size()); } static constexpr char ActorName[] = "YQ_READ_RULE_CREATOR"; @@ -228,11 +229,11 @@ public: void Bootstrap() { Become(&TReadRuleCreator::StateFunc); - Children.reserve(Topics.size()); - Results.reserve(Topics.size()); - for (size_t i = 0; i < Topics.size(); ++i) { - LOG_D("Create read rule creation actor for `" << Topics[i].GetTopicPath() << "` [" << i << "]"); - Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, Topics[i], Credentials[i], i))); + Children.reserve(TopicConsumers.size()); + Results.reserve(TopicConsumers.size()); + for (size_t i = 0; i < TopicConsumers.size(); ++i) { + LOG_D("Create read rule creation actor for `" << TopicConsumers[i].topic_path() << "` [" << i << "]"); + Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, TopicConsumers[i], Credentials[i], i))); } } @@ -255,8 +256,8 @@ public: } void SendResultsAndPassAwayIfDone() { - Y_VERIFY(ResultsGot <= Topics.size()); - if (ResultsGot == Topics.size()) { + Y_VERIFY(ResultsGot <= TopicConsumers.size()); + if (ResultsGot == TopicConsumers.size()) { NYql::TIssues issues; if (!Ok) { NYql::TIssue mainIssue("Failed to create read rules for topics"); @@ -281,7 +282,7 @@ private: const NActors::TActorId Owner; const TString QueryId; NYdb::TDriver YdbDriver; - const TVector<NYql::NPq::NProto::TDqPqTopicSource> Topics; + const TVector<Fq::Private::TopicConsumer> TopicConsumers; const TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> Credentials; size_t ResultsGot = 0; bool Ok = true; @@ -295,7 +296,7 @@ NActors::IActor* MakeReadRuleCreatorActor( NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<NYql::NPq::NProto::TDqPqTopicSource> topics, + const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials ) { @@ -303,7 +304,7 @@ NActors::IActor* MakeReadRuleCreatorActor( owner, std::move(queryId), std::move(ydbDriver), - std::move(topics), + topicConsumers, std::move(credentials) ); } diff --git a/ydb/core/yq/libs/read_rule/read_rule_creator.h b/ydb/core/yq/libs/read_rule/read_rule_creator.h index 66f0f7c2c32..265016a397b 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_creator.h +++ b/ydb/core/yq/libs/read_rule/read_rule_creator.h @@ -1,21 +1,17 @@ #pragma once -#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h> +#include <ydb/core/yq/libs/protos/fq_private.pb.h> #include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> #include <library/cpp/actors/core/actor.h> -#include <util/generic/maybe.h> - -#include <google/protobuf/any.pb.h> - namespace NYq { NActors::IActor* MakeReadRuleCreatorActor( NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<NYql::NPq::NProto::TDqPqTopicSource> topics, + const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials // For each topic ); diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp index 3db6600584a..2eed047f740 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp +++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp @@ -1,5 +1,6 @@ #include "read_rule_deleter.h" +#include <ydb/core/yq/libs/common/util.h> #include <ydb/core/yq/libs/events/events.h> #include <ydb/core/protos/services.pb.h> @@ -183,14 +184,14 @@ public: NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<Fq::Private::TopicConsumer> topics, + const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, size_t maxRetries ) : Owner(owner) , QueryId(std::move(queryId)) , YdbDriver(std::move(ydbDriver)) - , Topics(std::move(topics)) + , Topics(VectorFromProto(topicConsumers)) , Credentials(std::move(credentials)) , MaxRetries(maxRetries) { @@ -271,7 +272,7 @@ NActors::IActor* MakeReadRuleDeleterActor( NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<Fq::Private::TopicConsumer> topics, + const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, // For each topic size_t maxRetries ) @@ -280,7 +281,7 @@ NActors::IActor* MakeReadRuleDeleterActor( owner, std::move(queryId), std::move(ydbDriver), - std::move(topics), + topicConsumers, std::move(credentials), maxRetries ); diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.h b/ydb/core/yq/libs/read_rule/read_rule_deleter.h index 69a2f256802..07bd70787b8 100644 --- a/ydb/core/yq/libs/read_rule/read_rule_deleter.h +++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.h @@ -11,7 +11,7 @@ NActors::IActor* MakeReadRuleDeleterActor( NActors::TActorId owner, TString queryId, NYdb::TDriver ydbDriver, - TVector<Fq::Private::TopicConsumer> topics, + const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers, TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, // For each topic size_t maxRetries = 15 ); |