aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormpereskokova <mpereskokova@yandex-team.com>2024-12-27 12:17:45 +0300
committermpereskokova <mpereskokova@yandex-team.com>2024-12-27 12:31:40 +0300
commit27df3d52304f343a3e9e87aa36b014a7a442f94e (patch)
treed0d8609d34d55ebc7ad2720ab08141db9998eaa6
parentfb93258ef12aeecc0d83dd46886060dc5beedc31 (diff)
downloadydb-27df3d52304f343a3e9e87aa36b014a7a442f94e.tar.gz
Prepare for opaque schema
commit_hash:9cb8a56fbb479ddfc16c538f8acf2f710bed3dfb
-rw-r--r--yt/yql/providers/yt/gateway/native/yql_yt_native.cpp57
1 files changed, 38 insertions, 19 deletions
diff --git a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
index 4152ae1e0e..3bfce1952a 100644
--- a/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
+++ b/yt/yql/providers/yt/gateway/native/yql_yt_native.cpp
@@ -2578,19 +2578,17 @@ private:
{
TVector<NYT::TNode> attributes(tables.size());
TVector<TMaybe<NYT::TNode>> linkAttributes(tables.size());
- std::atomic<bool> linksPresent = false;
{
auto batchGet = tx->CreateBatchRequest();
TVector<TFuture<void>> batchRes(Reserve(idxs.size()));
for (auto& idx: idxs) {
batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "&/@").Apply(
- [&attributes, &linkAttributes, &linksPresent, idx] (const TFuture<NYT::TNode>& res) {
+ [&attributes, &linkAttributes, idx] (const TFuture<NYT::TNode>& res) {
try {
NYT::TNode attrs = res.GetValue();
auto type = GetTypeFromAttributes(attrs, false);
if (type == "link") {
linkAttributes[idx.first] = attrs;
- linksPresent.store(true);
} else {
attributes[idx.first] = attrs;
}
@@ -2606,30 +2604,51 @@ private:
batchGet->ExecuteBatch();
WaitExceptionOrAll(batchRes).GetValue();
}
- if (linksPresent.load()) {
+
+ {
+ auto schemaAttrFilter = TAttributeFilter().AddAttribute("schema");
+ TVector<NYT::TNode> schemas(tables.size());
+
auto batchGet = tx->CreateBatchRequest();
TVector<TFuture<void>> batchRes;
for (auto& idx : idxs) {
- if (!linkAttributes[idx.first]) {
- continue;
+ batchRes.push_back(batchGet->Get(tables[idx.first].Table() + "/@", TGetOptions().AttributeFilter(schemaAttrFilter))
+ .Apply([idx, &schemas] (const TFuture<NYT::TNode>& res) {
+ try {
+ schemas[idx.first] = res.GetValue();
+ } catch (const TErrorResponse& e) {
+ // Yt returns NoSuchTransaction as inner issue for ResolveError
+ if (!e.IsResolveError() || e.IsNoSuchTransaction()) {
+ throw;
+ }
+ // Just ignore. Original table path may be deleted at this time
+ }
+ }));
+
+ if (linkAttributes[idx.first]) {
+ const auto& linkAttr = *linkAttributes[idx.first];
+ batchRes.push_back(batchGet->Get(idx.second + "/@").Apply(
+ [idx, &linkAttr, &attributes](const TFuture<NYT::TNode>& f) {
+ NYT::TNode attrs = f.GetValue();
+ attributes[idx.first] = attrs;
+ // override some attributes by the link ones
+ if (linkAttr.HasKey(QB2Premapper)) {
+ attributes[idx.first][QB2Premapper] = linkAttr[QB2Premapper];
+ }
+ if (linkAttr.HasKey(YqlRowSpecAttribute)) {
+ attributes[idx.first][YqlRowSpecAttribute] = linkAttr[YqlRowSpecAttribute];
+ }
+ }));
}
- const auto& linkAttr = *linkAttributes[idx.first];
- batchRes.push_back(batchGet->Get(idx.second + "/@").Apply(
- [idx, &linkAttr, &attributes](const TFuture<NYT::TNode>& f) {
- NYT::TNode attrs = f.GetValue();
- attributes[idx.first] = attrs;
- // override some attributes by the link ones
- if (linkAttr.HasKey(QB2Premapper)) {
- attributes[idx.first][QB2Premapper] = linkAttr[QB2Premapper];
- }
- if (linkAttr.HasKey(YqlRowSpecAttribute)) {
- attributes[idx.first][YqlRowSpecAttribute] = linkAttr[YqlRowSpecAttribute];
- }
- }));
}
batchGet->ExecuteBatch();
WaitExceptionOrAll(batchRes).GetValue();
+
+ for (auto& idx: idxs) {
+ attributes[idx.first]["schema"] = schemas[idx.first]["schema"];
+ }
}
+
auto batchGet = tx->CreateBatchRequest();
TVector<TFuture<void>> batchRes;