diff options
author | Vitaly Stoyan <vvvv@ydb.tech> | 2024-10-02 21:08:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-02 18:08:27 +0000 |
commit | 2b39cf81ef772aea7cf7594ab27c2fd602dfa01e (patch) | |
tree | 1608d32bdb7ab546cd670addf245252b79153227 | |
parent | 2d80271a267e91aedfb3a4c543669f957748ed9e (diff) | |
download | ydb-2b39cf81ef772aea7cf7594ab27c2fd602dfa01e.tar.gz |
Replay PathStat in yt gateway & other fixes (#9996)
3 files changed, 123 insertions, 7 deletions
diff --git a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp index cccdec5781..4743833544 100644 --- a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp +++ b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp @@ -30,10 +30,10 @@ public: TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const final { if (QContext_.CanRead()) { - ythrow yexception() << "can't replay GetSystemModulePath"; + return MakeMaybe<TFilePathWithMd5>("", ""); } - return Inner_-> GetSystemModulePath(moduleName); + return Inner_->GetSystemModulePath(moduleName); } bool LoadMetadata(const TVector<TImport*>& imports, diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp index 3ea6273816..9408482053 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp @@ -185,7 +185,7 @@ public: } } - if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq && !TypeCtx_.QContext.CanRead()) { + if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq) { size_t dataSize = 0; for (auto& [integration, nodes]: ReadsPerProvider_) { TMaybe<ui64> size; diff --git a/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp b/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp index 915c17a456..98ded5f032 100644 --- a/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp +++ b/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp @@ -24,6 +24,7 @@ const TString YtGateway_GetTableRange = "YtGateway_GetTableRange"; const TString YtGateway_GetFolder = "YtGateway_GetFolder"; const TString YtGateway_GetFolders = "YtGateway_GetFolders"; const TString YtGateway_ResolveLinks = "YtGateway_ResolveLinks"; +const TString YtGateway_PathStat = "YtGateway_PathStat"; TString MakeHash(const TString& str) { SHA256_CTX sha; @@ -98,7 +99,7 @@ public: if (valueNode.HasKey("Ranges")) { p.Ranges.ConstructInPlace(); - for (const auto& r : valueNode["Ranges"].AsString()) { + for (const auto& r : valueNode["Ranges"].AsList()) { NYT::TReadRange range; NYT::Deserialize(range, r); p.Ranges->push_back(range); @@ -697,17 +698,132 @@ public: return Inner_->DropTrackables(std::move(options)); } + static TString MakePathStatKey(const TString& cluster, bool extended, const TPathStatReq& req) { + auto node = NYT::TNode() + ("Cluster", cluster) + ("Extended", extended); + + NYT::TNode pathNode; + NYT::TNodeBuilder builder(&pathNode); + NYT::Serialize(req.Path(), &builder); + auto path = NYT::TNode() + ("Path", pathNode) + ("IsTemp", req.IsTemp()) + ("IsAnonymous", req.IsAnonymous()) + ("Epoch", req.Epoch()); + + node("Path", path); + return MakeHash(NYT::NodeToCanonicalYsonString(node)); + } + + static TString SerializePathStat(const TPathStatResult& stat, ui32 index) { + Y_ENSURE(index < stat.DataSize.size()); + Y_ENSURE(index < stat.Extended.size()); + auto xNode = NYT::TNode(); + if (!stat.Extended[index].Defined()) { + xNode = NYT::TNode::CreateEntity(); + } else { + auto dataWeightMap = NYT::TNode::CreateMap(); + for (const auto& d : stat.Extended[index]->DataWeight) { + dataWeightMap(d.first, d.second); + } + + auto uniqCountsMap = NYT::TNode::CreateMap(); + for (const auto& e : stat.Extended[index]->EstimatedUniqueCounts) { + uniqCountsMap(e.first, e.second); + } + + xNode = NYT::TNode() + ("DataWeight", dataWeightMap) + ("EstimatedUniqueCounts", uniqCountsMap); + } + + auto node = NYT::TNode() + ("DataSize", stat.DataSize[index]) + ("Extended", xNode); + + return NYT::NodeToYsonString(node, NYT::NYson::EYsonFormat::Binary); + } + + static void DeserializePathStat(const NYT::TNode& node, TPathStatResult& stat, ui32 index) { + Y_ENSURE(index < stat.DataSize.size()); + Y_ENSURE(index < stat.Extended.size()); + stat.DataSize[index] = node["DataSize"].AsUint64(); + stat.Extended[index] = Nothing(); + const auto& x = node["Extended"]; + if (!x.IsEntity()) { + auto& xValue = stat.Extended[index]; + xValue.ConstructInPlace(); + for (const auto& d : x["DataWeight"].AsMap()) { + xValue->DataWeight[d.first] = d.second.AsInt64(); + } + + for (const auto& e : x["EstimatedUniqueCounts"].AsMap()) { + xValue->EstimatedUniqueCounts[e.first] = e.second.AsUint64(); + } + } + } + NThreading::TFuture<TPathStatResult> PathStat(TPathStatOptions&& options) final { if (QContext_.CanRead()) { - throw yexception() << "Can't replay PathStat"; + TPathStatResult res; + res.DataSize.resize(options.Paths().size(), 0); + res.Extended.resize(options.Paths().size()); + + for (ui32 index = 0; index < options.Paths().size(); ++index) { + const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]); + auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync(); + if (!item) { + throw yexception() << "Missing replay data"; + } + + auto valueNode = NYT::NodeFromYsonString(TStringBuf(item->Value)); + DeserializePathStat(valueNode, res, index); + } + + res.SetSuccess(); + return NThreading::MakeFuture<TPathStatResult>(res); } - return Inner_->PathStat(std::move(options)); + auto optionsDup = options; + return Inner_->PathStat(std::move(options)) + .Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TPathStatResult>& future) { + if (!qContext.CanWrite() || future.HasException()) { + return; + } + + const auto& res = future.GetValueSync(); + if (!res.Success()) { + return; + } + + for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) { + const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]); + auto value = SerializePathStat(res, index); + qContext.GetWriter()->Put({YtGateway_PathStat, key}, value).GetValueSync(); + } + }); } TPathStatResult TryPathStat(TPathStatOptions&& options) final { if (QContext_.CanRead()) { - throw yexception() << "Can't replay TryPathStat"; + TPathStatResult res; + res.DataSize.resize(options.Paths().size(), 0); + res.Extended.resize(options.Paths().size()); + + for (ui32 index = 0; index < options.Paths().size(); ++index) { + const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]); + auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync(); + if (!item) { + return res; + } + + auto valueNode = NYT::NodeFromYsonString(TStringBuf(item->Value)); + DeserializePathStat(valueNode, res, index); + } + + res.SetSuccess(); + return res; } return Inner_->TryPathStat(std::move(options)); |