aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorbbiff <bbiff@yandex-team.com>2022-09-14 19:54:09 +0300
committerbbiff <bbiff@yandex-team.com>2022-09-14 19:54:09 +0300
commit6349b2b5a9e6f101fb161cb341ffdc66bf9f0e0c (patch)
treee0227eee53fbbbc1b6920757540ff62c40e6e292
parent142f320dd0c9cc66e7accf199ae4fc50ef4d353e (diff)
downloadydb-6349b2b5a9e6f101fb161cb341ffdc66bf9f0e0c.tar.gz
bugfix
-rw-r--r--ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp45
1 files changed, 28 insertions, 17 deletions
diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp
index 35d293d6f7..827a4e2fd4 100644
--- a/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp
+++ b/ydb/library/yql/providers/pq/provider/yql_pq_load_meta.cpp
@@ -18,12 +18,14 @@ using namespace NNodes;
namespace {
class TPqLoadTopicMetadataTransformer : public TGraphTransformerBase {
+private:
+ using TTopics = THashMap<std::pair<TString, TString>, TPqState::TTopicMeta>;
public:
explicit TPqLoadTopicMetadataTransformer(TPqState::TPtr state)
: State_(std::move(state))
{}
- void AddToPendingTopics(const TString& cluster, const TString& topicPath, TPositionHandle pos, TExprNode::TPtr rowSpec, TExprNode::TPtr columnOrder) {
+ void AddToPendingTopics(const TString& cluster, const TString& topicPath, TPositionHandle pos, TExprNode::TPtr rowSpec, TExprNode::TPtr columnOrder, TTopics& pendingTopics) {
const auto topicKey = std::make_pair(cluster, topicPath);
const auto found = State_->Topics.FindPtr(topicKey);
if (found) {
@@ -35,7 +37,7 @@ public:
m.Pos = pos;
m.RowSpec = rowSpec;
m.ColumnOrder = columnOrder;
- PendingTopics_.emplace(topicKey, m);
+ pendingTopics.emplace(topicKey, m);
}
TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
@@ -53,31 +55,23 @@ public:
}
TTopicKeyParser topicParser(read.Arg(2).Ref(), read.Ref().Child(4), ctx);
- AddToPendingTopics(read.DataSource().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), topicParser.GetUserSchema(), topicParser.GetColumnOrder());
+ AddToPendingTopics(read.DataSource().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), topicParser.GetUserSchema(), topicParser.GetColumnOrder(), PendingReadTopics_);
} else if (auto maybePqWrite = TMaybeNode<TPqWrite>(node)) {
TPqWrite write = maybePqWrite.Cast();
if (write.DataSink().Category().Value() == PqProviderName) {
TTopicKeyParser topicParser(write.Arg(2).Ref(), nullptr, ctx);
- AddToPendingTopics(write.DataSink().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), {}, {});
+ AddToPendingTopics(write.DataSink().Cluster().StringValue(), topicParser.GetTopicPath(), node->Pos(), {}, {}, PendingWriteTopics_);
}
}
return true;
});
- for (auto& [x, meta] : PendingTopics_) {
- auto itemType = LoadTopicMeta(x.first, x.second, ctx, meta);
- if (!itemType) {
- return TStatus::Error;
- }
-
- if (!meta.RowSpec) {
- meta.RowSpec = ExpandType(meta.Pos, *itemType, ctx);
- }
- State_->Topics.emplace(x, meta);
+ TStatus status = FillState(PendingReadTopics_, ctx);
+ if (status != TStatus::Ok) {
+ return status;
}
- PendingTopics_.clear();
- return TStatus::Ok;
+ return FillState(PendingWriteTopics_, ctx);
}
NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
@@ -110,6 +104,22 @@ private:
return ctx.MakeType<TStructExprType>(items);
}
+ TStatus FillState(TTopics& pendingTopics, TExprContext& ctx) {
+ for (auto& [x, meta] : pendingTopics) {
+ auto itemType = LoadTopicMeta(x.first, x.second, ctx, meta);
+ if (!itemType) {
+ return TStatus::Error;
+ }
+
+ if (!meta.RowSpec) {
+ meta.RowSpec = ExpandType(meta.Pos, *itemType, ctx);
+ }
+ State_->Topics.emplace(x, meta);
+ }
+ pendingTopics.clear();
+ return TStatus::Ok;
+ }
+
const TStructExprType* LoadTopicMeta(const TString& cluster, const TString& topic, TExprContext& ctx, TPqState::TTopicMeta& meta) {
// todo: return TFuture
try {
@@ -132,7 +142,8 @@ private:
private:
TPqState::TPtr State_;
// (cluster, topic) -> meta
- THashMap<std::pair<TString, TString>, TPqState::TTopicMeta> PendingTopics_;
+ TTopics PendingWriteTopics_;
+ TTopics PendingReadTopics_;
NThreading::TFuture<void> AsyncFuture_;
};