diff options
author | bbiff <bbiff@yandex-team.com> | 2022-09-14 19:54:09 +0300 |
---|---|---|
committer | bbiff <bbiff@yandex-team.com> | 2022-09-14 19:54:09 +0300 |
commit | 6349b2b5a9e6f101fb161cb341ffdc66bf9f0e0c (patch) | |
tree | e0227eee53fbbbc1b6920757540ff62c40e6e292 | |
parent | 142f320dd0c9cc66e7accf199ae4fc50ef4d353e (diff) | |
download | ydb-6349b2b5a9e6f101fb161cb341ffdc66bf9f0e0c.tar.gz |
bugfix
-rw-r--r-- | ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp | 45 |
1 files changed, 28 insertions, 17 deletions
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp index 35d293d6f7..827a4e2fd4 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp @@ -18,12 +18,14 @@ using namespace NNodes; namespace { class TPqLoadTopicMetadataTransformer : public TGraphTransformerBase { +private: + using TTopics = THashMap<std::pair<TString, TString>, TPqState::TTopicMeta>; public: explicit TPqLoadTopicMetadataTransformer(TPqState::TPtr state) : State_(std::move(state)) {} - void AddToPendingTopics(const TString& cluster, const TString& topicPath, TPositionHandle pos, TExprNode::TPtr rowSpec, TExprNode::TPtr columnOrder) { + void AddToPendingTopics(const TString& cluster, const TString& topicPath, TPositionHandle pos, TExprNode::TPtr rowSpec, TExprNode::TPtr columnOrder, TTopics& pendingTopics) { const auto topicKey = std::make_pair(cluster, topicPath); const auto found = State_->Topics.FindPtr(topicKey); if (found) { @@ -35,7 +37,7 @@ public: m.Pos = pos; m.RowSpec = rowSpec; m.ColumnOrder = columnOrder; - PendingTopics_.emplace(topicKey, m); + pendingTopics.emplace(topicKey, m); } TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final { @@ -53,31 +55,23 @@ public: } TTopicKeyParser topicParser(read.Arg(2).Ref(), read.Ref().Child(4), ctx); - AddToPendingTopics(read.DataSource().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), topicParser.GetUserSchema(), topicParser.GetColumnOrder()); + AddToPendingTopics(read.DataSource().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), topicParser.GetUserSchema(), topicParser.GetColumnOrder(), PendingReadTopics_); } else if (auto maybePqWrite = TMaybeNode<TPqWrite>(node)) { TPqWrite write = maybePqWrite.Cast(); if (write.DataSink().Category().Value() == PqProviderName) { TTopicKeyParser topicParser(write.Arg(2).Ref(), nullptr, ctx); - AddToPendingTopics(write.DataSink().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), {}, {}); + AddToPendingTopics(write.DataSink().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), {}, {}, PendingWriteTopics_); } } return true; }); - for (auto& [x, meta] : PendingTopics_) { - auto itemType = LoadTopicMeta(x.first, x.second, ctx, meta); - if (!itemType) { - return TStatus::Error; - } - - if (!meta.RowSpec) { - meta.RowSpec = ExpandType(meta.Pos, *itemType, ctx); - } - State_->Topics.emplace(x, meta); + TStatus status = FillState(PendingReadTopics_, ctx); + if (status != TStatus::Ok) { + return status; } - PendingTopics_.clear(); - return TStatus::Ok; + return FillState(PendingWriteTopics_, ctx); } NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final { @@ -110,6 +104,22 @@ private: return ctx.MakeType<TStructExprType>(items); } + TStatus FillState(TTopics& pendingTopics, TExprContext& ctx) { + for (auto& [x, meta] : pendingTopics) { + auto itemType = LoadTopicMeta(x.first, x.second, ctx, meta); + if (!itemType) { + return TStatus::Error; + } + + if (!meta.RowSpec) { + meta.RowSpec = ExpandType(meta.Pos, *itemType, ctx); + } + State_->Topics.emplace(x, meta); + } + pendingTopics.clear(); + return TStatus::Ok; + } + const TStructExprType* LoadTopicMeta(const TString& cluster, const TString& topic, TExprContext& ctx, TPqState::TTopicMeta& meta) { // todo: return TFuture try { @@ -132,7 +142,8 @@ private: private: TPqState::TPtr State_; // (cluster, topic) -> meta - THashMap<std::pair<TString, TString>, TPqState::TTopicMeta> PendingTopics_; + TTopics PendingWriteTopics_; + TTopics PendingReadTopics_; NThreading::TFuture<void> AsyncFuture_; }; |