aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorudovichenko-r <udovichenko-r@yandex-team.com>2025-03-01 12:07:39 +0300
committerudovichenko-r <udovichenko-r@yandex-team.com>2025-03-01 12:24:52 +0300
commit1f9a911f03e56f20955cf4e615945bf43631224d (patch)
tree9b16a63da0efec0344ccc7e6884323095c900b49
parent767a07ad28640a59ce939e2c780bb8e7933558c3 (diff)
downloadydb-1f9a911f03e56f20955cf4e615945bf43631224d.tar.gz
Revert code with races in metadata loading
commit_hash:b1c25fd6fb86530c9b46b7e8f57e14df530e160e
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp99
1 files changed, 49 insertions, 50 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 498a2adcca..a0ab23f0bf 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -67,6 +67,7 @@
#include <util/system/execpath.h>
#include <util/system/guard.h>
#include <util/system/shellcommand.h>
+#include <util/system/mutex.h>
#include <util/ysaveload.h>
#include <algorithm>
@@ -2811,76 +2812,73 @@ private:
TTableInfoResult& result)
{
TVector<NYT::TNode> attributes(tables.size());
- TVector<TMaybe<NYT::TNode>> linkAttributes(tables.size());
+ NSorted::TSimpleMap<size_t, TString> requestSchemasIdxs;
{
+ TMutex lock;
auto batchGet = tx->CreateBatchRequest();
TVector<TFuture<void>> batchRes(Reserve(idxs.size()));
for (auto& idx: idxs) {
- batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@").Apply(
- [&attributes, &linkAttributes, idx] (const TFuture<NYT::TNode>& res) {
- try {
- NYT::TNode attrs = res.GetValue();
- auto type = GetTypeFromAttributes(attrs, false);
- if (type == "link") {
- linkAttributes[idx.first] = attrs;
- } else {
- attributes[idx.first] = attrs;
+ batchRes.push_back(batchGet->Get(idx.second + "/@").Apply([&attributes, &requestSchemasIdxs, &lock, idx] (const TFuture<NYT::TNode>& res) {
+ attributes[idx.first] = res.GetValue();
+ if (attributes[idx.first].HasKey("schema") && attributes[idx.first]["schema"].IsEntity()) {
+ with_lock (lock) {
+ requestSchemasIdxs.push_back(idx);
}
- } catch (const TErrorResponse& e) {
- // Yt returns NoSuchTransaction as inner issue for ResolveError
- if (!e.IsResolveError() || e.IsNoSuchTransaction()) {
- throw;
- }
- // Just ignore. Original table path may be deleted at this time
}
}));
}
batchGet->ExecuteBatch();
WaitExceptionOrAll(batchRes).GetValue();
}
-
+ if (!requestSchemasIdxs.empty()) {
+ YQL_CLOG(INFO, ProviderYt) << "Additional request of @schema for " << requestSchemasIdxs.size() << " table(s)";
+ auto batchGet = tx->CreateBatchRequest();
+ TVector<TFuture<void>> batchRes(Reserve(requestSchemasIdxs.size()));
+ auto getOpts = TGetOptions()
+ .AttributeFilter(TAttributeFilter()
+ .AddAttribute("schema")
+ );
+ for (auto& idx: requestSchemasIdxs) {
+ batchRes.push_back(batchGet->Get(idx.second + "/@", getOpts).Apply([&attributes, idx] (const TFuture<NYT::TNode>& res) {
+ attributes[idx.first]["schema"] = res.GetValue().At("schema");
+ }));
+ }
+ batchGet->ExecuteBatch();
+ WaitExceptionOrAll(batchRes).GetValue();
+ }
{
- auto schemaAttrFilter = TAttributeFilter().AddAttribute("schema");
- TVector<NYT::TNode> schemas(tables.size());
-
auto batchGet = tx->CreateBatchRequest();
TVector<TFuture<void>> batchRes;
- for (auto& idx : idxs) {
- batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "/@", TGetOptions().AttributeFilter(schemaAttrFilter))
- .Apply([idx, &schemas] (const TFuture<NYT::TNode>& res) {
- try {
- schemas[idx.first] = res.GetValue();
- } catch (const TErrorResponse& e) {
- // Yt returns NoSuchTransaction as inner issue for ResolveError
- if (!e.IsResolveError() || e.IsNoSuchTransaction()) {
- throw;
- }
- // Just ignore. Original table path may be deleted at this time
- }
- }));
-
- if (linkAttributes[idx.first]) {
- const auto& linkAttr = *linkAttributes[idx.first];
- batchRes.push_back(batchGet->Get(idx.second + "/@").Apply(
- [idx, &linkAttr, &attributes](const TFuture<NYT::TNode>& f) {
+ auto getOpts = TGetOptions()
+ .AttributeFilter(TAttributeFilter()
+ .AddAttribute("type")
+ .AddAttribute(TString{QB2Premapper})
+ .AddAttribute(TString{YqlRowSpecAttribute})
+ );
+ for (auto& idx: idxs) {
+ batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@", getOpts).Apply([idx, &attributes](const TFuture<NYT::TNode>& f) {
+ try {
NYT::TNode attrs = f.GetValue();
- attributes[idx.first] = attrs;
- // override some attributes by the link ones
- if (linkAttr.HasKey(QB2Premapper)) {
- attributes[idx.first][QB2Premapper] = linkAttr[QB2Premapper];
+ if (GetTypeFromAttributes(attrs, false) == "link") {
+ // override some attributes by the link ones
+ if (attrs.HasKey(QB2Premapper)) {
+ attributes[idx.first][QB2Premapper] = attrs[QB2Premapper];
+ }
+ if (attrs.HasKey(YqlRowSpecAttribute)) {
+ attributes[idx.first][YqlRowSpecAttribute] = attrs[YqlRowSpecAttribute];
+ }
}
- if (linkAttr.HasKey(YqlRowSpecAttribute)) {
- attributes[idx.first][YqlRowSpecAttribute] = linkAttr[YqlRowSpecAttribute];
+ } catch (const TErrorResponse& e) {
+ // Yt returns NoSuchTransaction as inner issue for ResolveError
+ if (!e.IsResolveError() || e.IsNoSuchTransaction()) {
+ throw;
}
- }));
- }
+ // Just ignore. Original table path may be deleted at this time
+ }
+ }));
}
batchGet->ExecuteBatch();
WaitExceptionOrAll(batchRes).GetValue();
-
- for (auto& idx: idxs) {
- attributes[idx.first]["schema"] = schemas[idx.first]["schema"];
- }
}
auto batchGet = tx->CreateBatchRequest();
@@ -2912,6 +2910,7 @@ private:
}
statInfo->Id = attrs["id"].AsString();
+ YQL_ENSURE(statInfo->Id == TStringBuf(idx.second).Skip(1));
statInfo->TableRevision = attrs["revision"].IntCast<ui64>();
statInfo->Revision = GetContentRevision(attrs);