aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2023-02-14 23:35:54 +0300
committerhor911 <hor911@ydb.tech>2023-02-14 23:35:54 +0300
commit13218a75e88b9127fd5da4c3216237dcc2ace935 (patch)
tree768856b72dfe83a57f2886b1a5a87865b3f461e6
parenteaef11f63bb056175fdc296f35574af4cffcc2fa (diff)
downloadydb-13218a75e88b9127fd5da4c3216237dcc2ace935.tar.gz
Correct RR creation after crash on init
-rw-r--r--ydb/core/yq/libs/actors/pending_fetcher.cpp16
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp75
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.cpp2
-rw-r--r--ydb/core/yq/libs/actors/run_actor_params.h2
-rw-r--r--ydb/core/yq/libs/common/util.h9
-rw-r--r--ydb/core/yq/libs/protos/fq_private.proto9
-rw-r--r--ydb/core/yq/libs/read_rule/read_rule_creator.cpp51
-rw-r--r--ydb/core/yq/libs/read_rule/read_rule_creator.h8
-rw-r--r--ydb/core/yq/libs/read_rule/read_rule_deleter.cpp9
-rw-r--r--ydb/core/yq/libs/read_rule/read_rule_deleter.h2
10 files changed, 86 insertions, 97 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp
index 2a1c1fa1fc3..d9d16da106e 100644
--- a/ydb/core/yq/libs/actors/pending_fetcher.cpp
+++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp
@@ -45,6 +45,7 @@
#include <ydb/core/yq/libs/common/compression.h>
#include <ydb/core/yq/libs/common/entity_id.h>
+#include <ydb/core/yq/libs/common/util.h>
#include <ydb/core/yq/libs/events/events.h>
#include <ydb/core/yq/libs/config/protos/fq_config.pb.h>
#include <ydb/core/yq/libs/config/protos/pinger.pb.h>
@@ -99,11 +100,6 @@ struct TEvPrivate {
};
};
-template <class TElement>
-TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TElement>& field) {
- return { field.begin(), field.end() };
-}
-
constexpr auto CLEANUP_PERIOD = TDuration::Seconds(60);
} // namespace
@@ -352,6 +348,13 @@ private:
// todo: remove after migration
dqGraphs = VectorFromProto(task.dq_graph());
}
+
+ Fq::Private::TaskResources resources(task.resources());
+ if (task.created_topic_consumers_size()) {
+ // todo: remove after migration
+ *resources.mutable_topic_consumers() = task.created_topic_consumers();
+ }
+
TRunActorParams params(
YqSharedResources, CredentialsProviderFactory, S3Gateway,
FunctionRegistry, RandomProvider,
@@ -378,7 +381,6 @@ private:
VectorFromProto(task.result_set_meta()),
std::move(dqGraphs),
task.dq_graph_index(),
- VectorFromProto(task.created_topic_consumers()),
task.automatic(),
task.query_name(),
NProtoInterop::CastFromProto(task.deadline()),
@@ -390,7 +392,7 @@ private:
NProtoInterop::CastFromProto(task.request_started_at()),
task.restart_count(),
task.job_id().value(),
- task.resources()
+ resources
);
auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params)));
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index 383479a774b..66c074e6fd8 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -26,6 +26,7 @@
#include <ydb/library/yql/dq/integration/transform/yql_dq_task_transform.h>
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
#include <ydb/library/yql/providers/pq/provider/yql_pq_provider.h>
+#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
#include <ydb/library/yql/providers/pq/task_meta/task_meta.h>
#include <ydb/library/yql/providers/s3/provider/yql_s3_provider.h>
#include <ydb/library/yql/providers/ydb/provider/yql_ydb_provider.h>
@@ -492,15 +493,15 @@ private:
return;
}
- if (QueryStateUpdateRequest.resources().read_rules() == Fq::Private::TaskResources::PREPARE) {
+ if (QueryStateUpdateRequest.resources().topic_consumers_state() == Fq::Private::TaskResources::PREPARE) {
if (!ReadRulesCreatorId) {
ReadRulesCreatorId = Register(
::NYq::MakeReadRuleCreatorActor(
SelfId(),
Params.QueryId,
Params.YqSharedResources->UserSpaceYdbDriver,
- std::move(TopicsForConsumersCreation),
- std::move(CredentialsForConsumersCreation)
+ Params.Resources.topic_consumers(),
+ PrepareReadRuleCredentials()
)
);
}
@@ -580,7 +581,7 @@ private:
Issues.AddIssue("Internal Error");
if (!ConsumersAreDeleted) {
- for (const Fq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) {
+ for (const Fq::Private::TopicConsumer& c : Params.Resources.topic_consumers()) {
TransientIssues.AddIssue(TStringBuilder() << "Created read rule `" << c.consumer_name() << "` for topic `" << c.topic_path() << "` (database id " << c.database_id() << ") maybe was left undeleted: internal error occurred");
TransientIssues.back().Severity = NYql::TSeverityIds::S_WARNING;
}
@@ -668,7 +669,6 @@ private:
LOG_D("Graph " << graphIndex);
graphIndex++;
const TString consumerNamePrefix = graphIndex == 1 ? Params.QueryId : TStringBuilder() << Params.QueryId << '-' << graphIndex; // Simple name in simple case
- const auto& secureParams = graphParams.GetSecureParams();
for (NYql::NDqProto::TDqTask& task : *graphParams.MutableTasks()) {
for (NYql::NDqProto::TTaskInput& taskInput : *task.MutableInputs()) {
if (taskInput.GetTypeCase() == NYql::NDqProto::TTaskInput::kSource && taskInput.GetSource().GetType() == "PqSource") {
@@ -684,18 +684,16 @@ private:
srcDesc.SetConsumerName(consumerName);
settingsAny.PackFrom(srcDesc);
if (isNewConsumer) {
- auto s = consumerName;
- LOG_D("Create consumer \"" << s << "\" for topic \"" << srcDesc.GetTopicPath() << "\"");
- if (const TString& tokenName = srcDesc.GetToken().GetName()) {
- const auto token = secureParams.find(tokenName);
- YQL_ENSURE(token != secureParams.end(), "Token " << tokenName << " was not found in secure params");
- CredentialsForConsumersCreation.emplace_back(
- CreateCredentialsProviderFactoryForStructuredToken(Params.CredentialsFactory, token->second, srcDesc.GetAddBearerToToken()));
- } else {
- CredentialsForConsumersCreation.emplace_back(NYdb::CreateInsecureCredentialsProviderFactory());
- }
-
- TopicsForConsumersCreation.emplace_back(std::move(srcDesc));
+ LOG_D("Create consumer \"" << srcDesc.GetConsumerName() << "\" for topic \"" << srcDesc.GetTopicPath() << "\"");
+ auto& consumer = *QueryStateUpdateRequest.mutable_resources()->add_topic_consumers();
+ consumer.set_database_id(srcDesc.GetDatabaseId());
+ consumer.set_database(srcDesc.GetDatabase());
+ consumer.set_topic_path(srcDesc.GetTopicPath());
+ consumer.set_consumer_name(srcDesc.GetConsumerName());
+ consumer.set_cluster_endpoint(srcDesc.GetEndpoint());
+ consumer.set_use_ssl(srcDesc.GetUseSsl());
+ consumer.set_token_name(srcDesc.GetToken().GetName());
+ consumer.set_add_bearer_to_token(srcDesc.GetAddBearerToToken());
}
}
}
@@ -726,8 +724,8 @@ private:
if (ev->Cookie == SaveQueryInfoCookie) {
QueryStateUpdateRequest.mutable_resources()->set_compilation(Fq::Private::TaskResources::READY);
- QueryStateUpdateRequest.mutable_resources()->set_read_rules(
- TopicsForConsumersCreation.size() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED);
+ QueryStateUpdateRequest.mutable_resources()->set_topic_consumers_state(
+ QueryStateUpdateRequest.resources().topic_consumers().size() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED);
ProcessQuery();
} else if (ev->Cookie == SetLoadFromCheckpointModeCookie) {
Send(ControlId, new TEvCheckpointCoordinator::TEvRunGraph());
@@ -898,23 +896,6 @@ private:
CheckForConsumers();
- Params.CreatedTopicConsumers.clear();
- Params.CreatedTopicConsumers.reserve(TopicsForConsumersCreation.size());
- for (const NYql::NPq::NProto::TDqPqTopicSource& src : TopicsForConsumersCreation) {
- auto& consumer = *request.add_created_topic_consumers();
- consumer.set_database_id(src.GetDatabaseId());
- consumer.set_database(src.GetDatabase());
- consumer.set_topic_path(src.GetTopicPath());
- consumer.set_consumer_name(src.GetConsumerName());
- consumer.set_cluster_endpoint(src.GetEndpoint());
- consumer.set_use_ssl(src.GetUseSsl());
- consumer.set_token_name(src.GetToken().GetName());
- consumer.set_add_bearer_to_token(src.GetAddBearerToToken());
-
- // Save for deletion
- Params.CreatedTopicConsumers.push_back(consumer);
- }
-
for (const auto& graphParams : DqGraphParams) {
const TString& serializedGraph = graphParams.SerializeAsString();
if (Compressor.IsEnabled()) {
@@ -1175,7 +1156,7 @@ private:
LOG_D(Issues.ToOneLineString());
Finish(YandexQuery::QueryMeta::FAILED);
} else {
- QueryStateUpdateRequest.mutable_resources()->set_read_rules(Fq::Private::TaskResources::READY);
+ QueryStateUpdateRequest.mutable_resources()->set_topic_consumers_state(Fq::Private::TaskResources::READY);
ProcessQuery();
}
}
@@ -1204,17 +1185,18 @@ private:
}
bool NeedDeleteReadRules() const {
- return !Params.CreatedTopicConsumers.empty();
+ return Params.Resources.topic_consumers_state() == Fq::Private::TaskResources::PREPARE
+ || Params.Resources.topic_consumers_state() == Fq::Private::TaskResources::READY;
}
bool CanRunReadRulesDeletionActor() const {
return !ReadRulesCreatorId && FinalizingStatusIsWritten && QueryResponseArrived;
}
- void RunReadRulesDeletionActor() {
+ TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> PrepareReadRuleCredentials() {
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials;
- credentials.reserve(Params.CreatedTopicConsumers.size());
- for (const Fq::Private::TopicConsumer& c : Params.CreatedTopicConsumers) {
+ credentials.reserve(Params.Resources.topic_consumers().size());
+ for (const Fq::Private::TopicConsumer& c : Params.Resources.topic_consumers()) {
if (const TString& tokenName = c.token_name()) {
credentials.emplace_back(
CreateCredentialsProviderFactoryForStructuredToken(Params.CredentialsFactory, FindTokenByName(tokenName), c.add_bearer_to_token()));
@@ -1222,14 +1204,17 @@ private:
credentials.emplace_back(NYdb::CreateInsecureCredentialsProviderFactory());
}
}
+ return credentials;
+ }
+ void RunReadRulesDeletionActor() {
Register(
::NYq::MakeReadRuleDeleterActor(
SelfId(),
Params.QueryId,
Params.YqSharedResources->UserSpaceYdbDriver,
- Params.CreatedTopicConsumers,
- std::move(credentials)
+ Params.Resources.topic_consumers(),
+ PrepareReadRuleCredentials()
)
);
}
@@ -1933,7 +1918,7 @@ private:
<< " Status: " << YandexQuery::QueryMeta::ComputeStatus_Name(Params.Status)
<< " DqGraphs: " << Params.DqGraphs.size()
<< " DqGraphIndex: " << Params.DqGraphIndex
- << " CreatedTopicConsumers: " << Params.CreatedTopicConsumers.size()
+ << " Resource.TopicConsumers: " << Params.Resources.topic_consumers().size()
<< " }");
}
@@ -1969,8 +1954,6 @@ private:
const TCompressor Compressor;
// Consumers creation
- TVector<NYql::NPq::NProto::TDqPqTopicSource> TopicsForConsumersCreation;
- TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> CredentialsForConsumersCreation;
TMap<TString, TString> Statistics;
NActors::TActorId ReadRulesCreatorId;
diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp
index fea230bbc3d..94f0af5d02b 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.cpp
+++ b/ydb/core/yq/libs/actors/run_actor_params.cpp
@@ -43,7 +43,6 @@ TRunActorParams::TRunActorParams(
TVector<YandexQuery::ResultSetMeta> resultSetMetas,
TVector<TString> dqGraphs,
int32_t dqGraphIndex,
- TVector<Fq::Private::TopicConsumer> createdTopicConsumers,
bool automatic,
const TString& queryName,
const TInstant& deadline,
@@ -95,7 +94,6 @@ TRunActorParams::TRunActorParams(
, ResultSetMetas(std::move(resultSetMetas))
, DqGraphs(std::move(dqGraphs))
, DqGraphIndex(dqGraphIndex)
- , CreatedTopicConsumers(std::move(createdTopicConsumers))
, Automatic(automatic)
, QueryName(queryName)
, Deadline(deadline)
diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h
index 8717a2a5e1b..c6195babecf 100644
--- a/ydb/core/yq/libs/actors/run_actor_params.h
+++ b/ydb/core/yq/libs/actors/run_actor_params.h
@@ -60,7 +60,6 @@ struct TRunActorParams { // TODO2 : Change name
TVector<YandexQuery::ResultSetMeta> resultSetMetas,
TVector<TString> dqGraphs,
int32_t dqGraphIndex,
- TVector<Fq::Private::TopicConsumer> createdTopicConsumers,
bool automatic,
const TString& queryName,
const TInstant& deadline,
@@ -117,7 +116,6 @@ struct TRunActorParams { // TODO2 : Change name
const TVector<YandexQuery::ResultSetMeta> ResultSetMetas;
const TVector<TString> DqGraphs;
const int32_t DqGraphIndex;
- TVector<Fq::Private::TopicConsumer> CreatedTopicConsumers;
const bool Automatic = false;
const TString QueryName;
diff --git a/ydb/core/yq/libs/common/util.h b/ydb/core/yq/libs/common/util.h
index e8e78bf3c20..2c303600f81 100644
--- a/ydb/core/yq/libs/common/util.h
+++ b/ydb/core/yq/libs/common/util.h
@@ -3,6 +3,10 @@
#include <algorithm>
#include <array>
+#include <google/protobuf/repeated_field.h>
+
+#include <util/generic/vector.h>
+
namespace NYq {
template<std::size_t K, typename T, std::size_t N>
@@ -13,4 +17,9 @@ auto CreateArray(const T(&list)[N]) -> std::array<T, K> {
return result;
}
+template <class TElement>
+TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TElement>& field) {
+ return { field.begin(), field.end() };
+}
+
} // namespace NYq
diff --git a/ydb/core/yq/libs/protos/fq_private.proto b/ydb/core/yq/libs/protos/fq_private.proto
index 457c0f77f32..f463a373cc6 100644
--- a/ydb/core/yq/libs/protos/fq_private.proto
+++ b/ydb/core/yq/libs/protos/fq_private.proto
@@ -27,16 +27,17 @@ message TaskResources {
enum ResourceState {
UNSPECIFIED = 0; // on start
NOT_NEEDED = 1; // is not configured to use - skip it
- PREPARE = 2; // resource creating in progress (retriable)
- READY = 3; // created and ready to go
- CLEANUP = 4; // is being destroyin (cleanup)
+ PREPARE = 2; // resource creating in progress (retriable)
+ READY = 3; // created and ready to go
+ CLEANUP = 4; // is being destroyed (cleanup), may be removed
}
ResourceState rate_limiter = 1;
ResourceState compilation = 2;
- ResourceState read_rules = 3;
+ ResourceState topic_consumers_state = 3;
string rate_limiter_path = 10;
+ repeated TopicConsumer topic_consumers = 11;
}
message GetTaskRequest {
diff --git a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp
index 1ee5fa00774..2d1edb3105c 100644
--- a/ydb/core/yq/libs/read_rule/read_rule_creator.cpp
+++ b/ydb/core/yq/libs/read_rule/read_rule_creator.cpp
@@ -1,5 +1,6 @@
#include "read_rule_creator.h"
+#include <ydb/core/yq/libs/common/util.h>
#include <ydb/core/yq/libs/events/events.h>
#include <ydb/core/protos/services.pb.h>
@@ -70,13 +71,13 @@ public:
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
- NYql::NPq::NProto::TDqPqTopicSource topic,
+ const Fq::Private::TopicConsumer& topicConsumer,
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider,
ui64 index
)
: Owner(owner)
, QueryId(std::move(queryId))
- , Topic(std::move(topic))
+ , TopicConsumer(topicConsumer)
, YdbDriver(std::move(ydbDriver))
, PqClient(YdbDriver, GetPqClientSettings(std::move(credentialsProvider)))
, Index(index)
@@ -92,24 +93,24 @@ public:
TString GetTopicPath() const {
TStringBuilder ret;
- ret << Topic.GetDatabase();
+ ret << TopicConsumer.database();
if (ret && ret.back() != '/') {
ret << '/';
}
- ret << Topic.GetTopicPath();
+ ret << TopicConsumer.topic_path();
return std::move(ret);
}
void StartRequest() {
Y_VERIFY(!RequestInFlight);
RequestInFlight = true;
- LOG_D("Make request for read rule creation for topic `" << Topic.GetTopicPath() << "` [" << Index << "]");
+ LOG_D("Make request for read rule creation for topic `" << TopicConsumer.topic_path() << "` [" << Index << "]");
PqClient.AddReadRule(
GetTopicPath(),
NYdb::NPersQueue::TAddReadRuleSettings()
.ReadRule(
NYdb::NPersQueue::TReadRuleSettings()
- .ConsumerName(Topic.GetConsumerName())
+ .ConsumerName(TopicConsumer.consumer_name())
.ServiceType("yandex-query")
.SupportedCodecs({
NYdb::NPersQueue::ECodec::RAW,
@@ -141,7 +142,7 @@ public:
nextRetryDelay = Nothing(); // Not retryable
}
- LOG_D("Failed to add read rule to `" << Topic.GetTopicPath() << "`: " << status.GetIssues().ToString() << ". Status: " << status.GetStatus() << ". Retry after: " << nextRetryDelay);
+ LOG_D("Failed to add read rule to `" << TopicConsumer.topic_path() << "`: " << status.GetIssues().ToOneLineString() << ". Status: " << status.GetStatus() << ". Retry after: " << nextRetryDelay);
if (!nextRetryDelay) { // Not retryable
Send(Owner, MakeHolder<TEvPrivate::TEvSingleReadRuleCreatorResult>(status.GetIssues()), 0, Index);
PassAway();
@@ -184,17 +185,17 @@ private:
NYdb::NPersQueue::TPersQueueClientSettings GetPqClientSettings(std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProvider) {
return NYdb::NPersQueue::TPersQueueClientSettings()
.ClusterDiscoveryMode(NYdb::NPersQueue::EClusterDiscoveryMode::Off)
- .Database(Topic.GetDatabase())
- .DiscoveryEndpoint(Topic.GetEndpoint())
+ .Database(TopicConsumer.database())
+ .DiscoveryEndpoint(TopicConsumer.cluster_endpoint())
.CredentialsProviderFactory(std::move(credentialsProvider))
.DiscoveryMode(NYdb::EDiscoveryMode::Async)
- .SslCredentials(NYdb::TSslCredentials(Topic.GetUseSsl()));
+ .SslCredentials(NYdb::TSslCredentials(TopicConsumer.use_ssl()));
}
private:
const NActors::TActorId Owner;
const TString QueryId;
- const NYql::NPq::NProto::TDqPqTopicSource Topic;
+ const Fq::Private::TopicConsumer TopicConsumer;
NYdb::TDriver YdbDriver;
NYdb::NPersQueue::TPersQueueClient PqClient;
ui64 Index = 0;
@@ -210,17 +211,17 @@ public:
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
- TVector<NYql::NPq::NProto::TDqPqTopicSource> topics,
+ const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials
)
: Owner(owner)
, QueryId(std::move(queryId))
, YdbDriver(std::move(ydbDriver))
- , Topics(std::move(topics))
+ , TopicConsumers(VectorFromProto(topicConsumers))
, Credentials(std::move(credentials))
{
- Y_VERIFY(!Topics.empty());
- Results.resize(Topics.size());
+ Y_VERIFY(!TopicConsumers.empty());
+ Results.resize(TopicConsumers.size());
}
static constexpr char ActorName[] = "YQ_READ_RULE_CREATOR";
@@ -228,11 +229,11 @@ public:
void Bootstrap() {
Become(&TReadRuleCreator::StateFunc);
- Children.reserve(Topics.size());
- Results.reserve(Topics.size());
- for (size_t i = 0; i < Topics.size(); ++i) {
- LOG_D("Create read rule creation actor for `" << Topics[i].GetTopicPath() << "` [" << i << "]");
- Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, Topics[i], Credentials[i], i)));
+ Children.reserve(TopicConsumers.size());
+ Results.reserve(TopicConsumers.size());
+ for (size_t i = 0; i < TopicConsumers.size(); ++i) {
+ LOG_D("Create read rule creation actor for `" << TopicConsumers[i].topic_path() << "` [" << i << "]");
+ Children.push_back(Register(new TSingleReadRuleCreator(SelfId(), QueryId, YdbDriver, TopicConsumers[i], Credentials[i], i)));
}
}
@@ -255,8 +256,8 @@ public:
}
void SendResultsAndPassAwayIfDone() {
- Y_VERIFY(ResultsGot <= Topics.size());
- if (ResultsGot == Topics.size()) {
+ Y_VERIFY(ResultsGot <= TopicConsumers.size());
+ if (ResultsGot == TopicConsumers.size()) {
NYql::TIssues issues;
if (!Ok) {
NYql::TIssue mainIssue("Failed to create read rules for topics");
@@ -281,7 +282,7 @@ private:
const NActors::TActorId Owner;
const TString QueryId;
NYdb::TDriver YdbDriver;
- const TVector<NYql::NPq::NProto::TDqPqTopicSource> Topics;
+ const TVector<Fq::Private::TopicConsumer> TopicConsumers;
const TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> Credentials;
size_t ResultsGot = 0;
bool Ok = true;
@@ -295,7 +296,7 @@ NActors::IActor* MakeReadRuleCreatorActor(
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
- TVector<NYql::NPq::NProto::TDqPqTopicSource> topics,
+ const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials
)
{
@@ -303,7 +304,7 @@ NActors::IActor* MakeReadRuleCreatorActor(
owner,
std::move(queryId),
std::move(ydbDriver),
- std::move(topics),
+ topicConsumers,
std::move(credentials)
);
}
diff --git a/ydb/core/yq/libs/read_rule/read_rule_creator.h b/ydb/core/yq/libs/read_rule/read_rule_creator.h
index 66f0f7c2c32..265016a397b 100644
--- a/ydb/core/yq/libs/read_rule/read_rule_creator.h
+++ b/ydb/core/yq/libs/read_rule/read_rule_creator.h
@@ -1,21 +1,17 @@
#pragma once
-#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
+#include <ydb/core/yq/libs/protos/fq_private.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
#include <library/cpp/actors/core/actor.h>
-#include <util/generic/maybe.h>
-
-#include <google/protobuf/any.pb.h>
-
namespace NYq {
NActors::IActor* MakeReadRuleCreatorActor(
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
- TVector<NYql::NPq::NProto::TDqPqTopicSource> topics,
+ const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials // For each topic
);
diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp
index 3db6600584a..2eed047f740 100644
--- a/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp
+++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.cpp
@@ -1,5 +1,6 @@
#include "read_rule_deleter.h"
+#include <ydb/core/yq/libs/common/util.h>
#include <ydb/core/yq/libs/events/events.h>
#include <ydb/core/protos/services.pb.h>
@@ -183,14 +184,14 @@ public:
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
- TVector<Fq::Private::TopicConsumer> topics,
+ const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials,
size_t maxRetries
)
: Owner(owner)
, QueryId(std::move(queryId))
, YdbDriver(std::move(ydbDriver))
- , Topics(std::move(topics))
+ , Topics(VectorFromProto(topicConsumers))
, Credentials(std::move(credentials))
, MaxRetries(maxRetries)
{
@@ -271,7 +272,7 @@ NActors::IActor* MakeReadRuleDeleterActor(
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
- TVector<Fq::Private::TopicConsumer> topics,
+ const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, // For each topic
size_t maxRetries
)
@@ -280,7 +281,7 @@ NActors::IActor* MakeReadRuleDeleterActor(
owner,
std::move(queryId),
std::move(ydbDriver),
- std::move(topics),
+ topicConsumers,
std::move(credentials),
maxRetries
);
diff --git a/ydb/core/yq/libs/read_rule/read_rule_deleter.h b/ydb/core/yq/libs/read_rule/read_rule_deleter.h
index 69a2f256802..07bd70787b8 100644
--- a/ydb/core/yq/libs/read_rule/read_rule_deleter.h
+++ b/ydb/core/yq/libs/read_rule/read_rule_deleter.h
@@ -11,7 +11,7 @@ NActors::IActor* MakeReadRuleDeleterActor(
NActors::TActorId owner,
TString queryId,
NYdb::TDriver ydbDriver,
- TVector<Fq::Private::TopicConsumer> topics,
+ const ::google::protobuf::RepeatedPtrField<Fq::Private::TopicConsumer>& topicConsumers,
TVector<std::shared_ptr<NYdb::ICredentialsProviderFactory>> credentials, // For each topic
size_t maxRetries = 15
);