diff options
author | udovichenko-r <udovichenko-r@yandex-team.com> | 2025-03-01 12:07:39 +0300 |
---|---|---|
committer | udovichenko-r <udovichenko-r@yandex-team.com> | 2025-03-01 12:24:52 +0300 |
commit | 1f9a911f03e56f20955cf4e615945bf43631224d (patch) | |
tree | 9b16a63da0efec0344ccc7e6884323095c900b49 | |
parent | 767a07ad28640a59ce939e2c780bb8e7933558c3 (diff) | |
download | ydb-1f9a911f03e56f20955cf4e615945bf43631224d.tar.gz |
Revert code with races in metadata loading
commit_hash:b1c25fd6fb86530c9b46b7e8f57e14df530e160e
-rw-r--r-- | yt/yql/providers/yt/gateway/native/yql_yt_native.cpp | 99 |
1 files changed, 49 insertions, 50 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp index 498a2adcca..a0ab23f0bf 100644 --- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp +++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp @@ -67,6 +67,7 @@ #include <util/system/execpath.h> #include <util/system/guard.h> #include <util/system/shellcommand.h> +#include <util/system/mutex.h> #include <util/ysaveload.h> #include <algorithm> @@ -2811,76 +2812,73 @@ private: TTableInfoResult& result) { TVector<NYT::TNode> attributes(tables.size()); - TVector<TMaybe<NYT::TNode>> linkAttributes(tables.size()); + NSorted::TSimpleMap<size_t, TString> requestSchemasIdxs; { + TMutex lock; auto batchGet = tx->CreateBatchRequest(); TVector<TFuture<void>> batchRes(Reserve(idxs.size())); for (auto& idx: idxs) { - batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@").Apply( - [&attributes, &linkAttributes, idx] (const TFuture<NYT::TNode>& res) { - try { - NYT::TNode attrs = res.GetValue(); - auto type = GetTypeFromAttributes(attrs, false); - if (type == "link") { - linkAttributes[idx.first] = attrs; - } else { - attributes[idx.first] = attrs; + batchRes.push_back(batchGet->Get(idx.second + "/@").Apply([&attributes, &requestSchemasIdxs, &lock, idx] (const TFuture<NYT::TNode>& res) { + attributes[idx.first] = res.GetValue(); + if (attributes[idx.first].HasKey("schema") && attributes[idx.first]["schema"].IsEntity()) { + with_lock (lock) { + requestSchemasIdxs.push_back(idx); } - } catch (const TErrorResponse& e) { - // Yt returns NoSuchTransaction as inner issue for ResolveError - if (!e.IsResolveError() || e.IsNoSuchTransaction()) { - throw; - } - // Just ignore. Original table path may be deleted at this time } })); } batchGet->ExecuteBatch(); WaitExceptionOrAll(batchRes).GetValue(); } - + if (!requestSchemasIdxs.empty()) { + YQL_CLOG(INFO, ProviderYt) << "Additional request of @schema for " << requestSchemasIdxs.size() << " table(s)"; + auto batchGet = tx->CreateBatchRequest(); + TVector<TFuture<void>> batchRes(Reserve(requestSchemasIdxs.size())); + auto getOpts = TGetOptions() + .AttributeFilter(TAttributeFilter() + .AddAttribute("schema") + ); + for (auto& idx: requestSchemasIdxs) { + batchRes.push_back(batchGet->Get(idx.second + "/@", getOpts).Apply([&attributes, idx] (const TFuture<NYT::TNode>& res) { + attributes[idx.first]["schema"] = res.GetValue().At("schema"); + })); + } + batchGet->ExecuteBatch(); + WaitExceptionOrAll(batchRes).GetValue(); + } { - auto schemaAttrFilter = TAttributeFilter().AddAttribute("schema"); - TVector<NYT::TNode> schemas(tables.size()); - auto batchGet = tx->CreateBatchRequest(); TVector<TFuture<void>> batchRes; - for (auto& idx : idxs) { - batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "/@", TGetOptions().AttributeFilter(schemaAttrFilter)) - .Apply([idx, &schemas] (const TFuture<NYT::TNode>& res) { - try { - schemas[idx.first] = res.GetValue(); - } catch (const TErrorResponse& e) { - // Yt returns NoSuchTransaction as inner issue for ResolveError - if (!e.IsResolveError() || e.IsNoSuchTransaction()) { - throw; - } - // Just ignore. Original table path may be deleted at this time - } - })); - - if (linkAttributes[idx.first]) { - const auto& linkAttr = *linkAttributes[idx.first]; - batchRes.push_back(batchGet->Get(idx.second + "/@").Apply( - [idx, &linkAttr, &attributes](const TFuture<NYT::TNode>& f) { + auto getOpts = TGetOptions() + .AttributeFilter(TAttributeFilter() + .AddAttribute("type") + .AddAttribute(TString{QB2Premapper}) + .AddAttribute(TString{YqlRowSpecAttribute}) + ); + for (auto& idx: idxs) { + batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@", getOpts).Apply([idx, &attributes](const TFuture<NYT::TNode>& f) { + try { NYT::TNode attrs = f.GetValue(); - attributes[idx.first] = attrs; - // override some attributes by the link ones - if (linkAttr.HasKey(QB2Premapper)) { - attributes[idx.first][QB2Premapper] = linkAttr[QB2Premapper]; + if (GetTypeFromAttributes(attrs, false) == "link") { + // override some attributes by the link ones + if (attrs.HasKey(QB2Premapper)) { + attributes[idx.first][QB2Premapper] = attrs[QB2Premapper]; + } + if (attrs.HasKey(YqlRowSpecAttribute)) { + attributes[idx.first][YqlRowSpecAttribute] = attrs[YqlRowSpecAttribute]; + } } - if (linkAttr.HasKey(YqlRowSpecAttribute)) { - attributes[idx.first][YqlRowSpecAttribute] = linkAttr[YqlRowSpecAttribute]; + } catch (const TErrorResponse& e) { + // Yt returns NoSuchTransaction as inner issue for ResolveError + if (!e.IsResolveError() || e.IsNoSuchTransaction()) { + throw; } - })); - } + // Just ignore. Original table path may be deleted at this time + } + })); } batchGet->ExecuteBatch(); WaitExceptionOrAll(batchRes).GetValue(); - - for (auto& idx: idxs) { - attributes[idx.first]["schema"] = schemas[idx.first]["schema"]; - } } auto batchGet = tx->CreateBatchRequest(); @@ -2912,6 +2910,7 @@ private: } statInfo->Id = attrs["id"].AsString(); + YQL_ENSURE(statInfo->Id == TStringBuf(idx.second).Skip(1)); statInfo->TableRevision = attrs["revision"].IntCast<ui64>(); statInfo->Revision = GetContentRevision(attrs); |