diff options
author | mpereskokova <mpereskokova@yandex-team.com> | 2024-12-27 12:17:45 +0300 |
---|---|---|
committer | mpereskokova <mpereskokova@yandex-team.com> | 2024-12-27 12:31:40 +0300 |
commit | 27df3d52304f343a3e9e87aa36b014a7a442f94e (patch) | |
tree | d0d8609d34d55ebc7ad2720ab08141db9998eaa6 | |
parent | fb93258ef12aeecc0d83dd46886060dc5beedc31 (diff) | |
download | ydb-27df3d52304f343a3e9e87aa36b014a7a442f94e.tar.gz |
Prepare for opaque schema
commit_hash:9cb8a56fbb479ddfc16c538f8acf2f710bed3dfb
-rw-r--r-- | yt/yql/providers/yt/gateway/native/yql_yt_native.cpp | 57 |
1 files changed, 38 insertions, 19 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 4152ae1e0e..3bfce1952a 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -2578,19 +2578,17 @@ private: { TVector<NYT::TNode> attributes(tables.size()); TVector<TMaybe<NYT::TNode>> linkAttributes(tables.size()); - std::atomic<bool> linksPresent = false; { auto batchGet = tx->CreateBatchRequest(); TVector<TFuture<void>> batchRes(Reserve(idxs.size())); for (auto& idx: idxs) { batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@").Apply( - [&attributes, &linkAttributes, &linksPresent, idx] (const TFuture<NYT::TNode>& res) { + [&attributes, &linkAttributes, idx] (const TFuture<NYT::TNode>& res) { try { NYT::TNode attrs = res.GetValue(); auto type = GetTypeFromAttributes(attrs, false); if (type == "link") { linkAttributes[idx.first] = attrs; - linksPresent.store(true); } else { attributes[idx.first] = attrs; } @@ -2606,30 +2604,51 @@ private: batchGet->ExecuteBatch(); WaitExceptionOrAll(batchRes).GetValue(); } - if (linksPresent.load()) { + + { + auto schemaAttrFilter = TAttributeFilter().AddAttribute("schema"); + TVector<NYT::TNode> schemas(tables.size()); + auto batchGet = tx->CreateBatchRequest(); TVector<TFuture<void>> batchRes; for (auto& idx : idxs) { - if (!linkAttributes[idx.first]) { - continue; + batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "/@", TGetOptions().AttributeFilter(schemaAttrFilter)) + .Apply([idx, &schemas] (const TFuture<NYT::TNode>& res) { + try { + schemas[idx.first] = res.GetValue(); + } catch (const TErrorResponse& e) { + // Yt returns NoSuchTransaction as inner issue for ResolveError + if (!e.IsResolveError() || e.IsNoSuchTransaction()) { + throw; + } + // Just ignore. Original table path may be deleted at this time + } + })); + + if (linkAttributes[idx.first]) { + const auto& linkAttr = *linkAttributes[idx.first]; + batchRes.push_back(batchGet->Get(idx.second + "/@").Apply( + [idx, &linkAttr, &attributes](const TFuture<NYT::TNode>& f) { + NYT::TNode attrs = f.GetValue(); + attributes[idx.first] = attrs; + // override some attributes by the link ones + if (linkAttr.HasKey(QB2Premapper)) { + attributes[idx.first][QB2Premapper] = linkAttr[QB2Premapper]; + } + if (linkAttr.HasKey(YqlRowSpecAttribute)) { + attributes[idx.first][YqlRowSpecAttribute] = linkAttr[YqlRowSpecAttribute]; + } + })); } - const auto& linkAttr = *linkAttributes[idx.first]; - batchRes.push_back(batchGet->Get(idx.second + "/@").Apply( - [idx, &linkAttr, &attributes](const TFuture<NYT::TNode>& f) { - NYT::TNode attrs = f.GetValue(); - attributes[idx.first] = attrs; - // override some attributes by the link ones - if (linkAttr.HasKey(QB2Premapper)) { - attributes[idx.first][QB2Premapper] = linkAttr[QB2Premapper]; - } - if (linkAttr.HasKey(YqlRowSpecAttribute)) { - attributes[idx.first][YqlRowSpecAttribute] = linkAttr[YqlRowSpecAttribute]; - } - })); } batchGet->ExecuteBatch(); WaitExceptionOrAll(batchRes).GetValue(); + + for (auto& idx: idxs) { + attributes[idx.first]["schema"] = schemas[idx.first]["schema"]; + } } + auto batchGet = tx->CreateBatchRequest(); TVector<TFuture<void>> batchRes; |