aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVitaly Stoyan <vvvv@ydb.tech>2024-10-02 21:08:27 +0300
committerGitHub <noreply@github.com>2024-10-02 18:08:27 +0000
commit2b39cf81ef772aea7cf7594ab27c2fd602dfa01e (patch)
tree1608d32bdb7ab546cd670addf245252b79153227
parent2d80271a267e91aedfb3a4c543669f957748ed9e (diff)
downloadydb-2b39cf81ef772aea7cf7594ab27c2fd602dfa01e.tar.gz
Replay PathStat in yt gateway & other fixes (#9996)
-rw-r--r--ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp4
-rw-r--r--ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp2
-rw-r--r--ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp124
3 files changed, 123 insertions, 7 deletions
diff --git a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp
index cccdec5781..4743833544 100644
--- a/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp
+++ b/ydb/library/yql/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.cpp
@@ -30,10 +30,10 @@ public:
TMaybe<TFilePathWithMd5> GetSystemModulePath(const TStringBuf& moduleName) const final {
if (QContext_.CanRead()) {
- ythrow yexception() << "can't replay GetSystemModulePath";
+ return MakeMaybe<TFilePathWithMd5>("", "");
}
- return Inner_-> GetSystemModulePath(moduleName);
+ return Inner_->GetSystemModulePath(moduleName);
}
bool LoadMetadata(const TVector<TImport*>& imports,
diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
index 3ea6273816..9408482053 100644
--- a/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
+++ b/ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp
@@ -185,7 +185,7 @@ public:
}
}
- if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq && !TypeCtx_.QContext.CanRead()) {
+ if (!hasError && HasMapJoin_ && !TypeCtx_.ForceDq) {
size_t dataSize = 0;
for (auto& [integration, nodes]: ReadsPerProvider_) {
TMaybe<ui64> size;
diff --git a/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp b/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
index 915c17a456..98ded5f032 100644
--- a/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
+++ b/ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp
@@ -24,6 +24,7 @@ const TString YtGateway_GetTableRange = "YtGateway_GetTableRange";
const TString YtGateway_GetFolder = "YtGateway_GetFolder";
const TString YtGateway_GetFolders = "YtGateway_GetFolders";
const TString YtGateway_ResolveLinks = "YtGateway_ResolveLinks";
+const TString YtGateway_PathStat = "YtGateway_PathStat";
TString MakeHash(const TString& str) {
SHA256_CTX sha;
@@ -98,7 +99,7 @@ public:
if (valueNode.HasKey("Ranges")) {
p.Ranges.ConstructInPlace();
- for (const auto& r : valueNode["Ranges"].AsString()) {
+ for (const auto& r : valueNode["Ranges"].AsList()) {
NYT::TReadRange range;
NYT::Deserialize(range, r);
p.Ranges->push_back(range);
@@ -697,17 +698,132 @@ public:
return Inner_->DropTrackables(std::move(options));
}
+ static TString MakePathStatKey(const TString& cluster, bool extended, const TPathStatReq& req) {
+ auto node = NYT::TNode()
+ ("Cluster", cluster)
+ ("Extended", extended);
+
+ NYT::TNode pathNode;
+ NYT::TNodeBuilder builder(&pathNode);
+ NYT::Serialize(req.Path(), &builder);
+ auto path = NYT::TNode()
+ ("Path", pathNode)
+ ("IsTemp", req.IsTemp())
+ ("IsAnonymous", req.IsAnonymous())
+ ("Epoch", req.Epoch());
+
+ node("Path", path);
+ return MakeHash(NYT::NodeToCanonicalYsonString(node));
+ }
+
+ static TString SerializePathStat(const TPathStatResult& stat, ui32 index) {
+ Y_ENSURE(index < stat.DataSize.size());
+ Y_ENSURE(index < stat.Extended.size());
+ auto xNode = NYT::TNode();
+ if (!stat.Extended[index].Defined()) {
+ xNode = NYT::TNode::CreateEntity();
+ } else {
+ auto dataWeightMap = NYT::TNode::CreateMap();
+ for (const auto& d : stat.Extended[index]->DataWeight) {
+ dataWeightMap(d.first, d.second);
+ }
+
+ auto uniqCountsMap = NYT::TNode::CreateMap();
+ for (const auto& e : stat.Extended[index]->EstimatedUniqueCounts) {
+ uniqCountsMap(e.first, e.second);
+ }
+
+ xNode = NYT::TNode()
+ ("DataWeight", dataWeightMap)
+ ("EstimatedUniqueCounts", uniqCountsMap);
+ }
+
+ auto node = NYT::TNode()
+ ("DataSize", stat.DataSize[index])
+ ("Extended", xNode);
+
+ return NYT::NodeToYsonString(node, NYT::NYson::EYsonFormat::Binary);
+ }
+
+ static void DeserializePathStat(const NYT::TNode& node, TPathStatResult& stat, ui32 index) {
+ Y_ENSURE(index < stat.DataSize.size());
+ Y_ENSURE(index < stat.Extended.size());
+ stat.DataSize[index] = node["DataSize"].AsUint64();
+ stat.Extended[index] = Nothing();
+ const auto& x = node["Extended"];
+ if (!x.IsEntity()) {
+ auto& xValue = stat.Extended[index];
+ xValue.ConstructInPlace();
+ for (const auto& d : x["DataWeight"].AsMap()) {
+ xValue->DataWeight[d.first] = d.second.AsInt64();
+ }
+
+ for (const auto& e : x["EstimatedUniqueCounts"].AsMap()) {
+ xValue->EstimatedUniqueCounts[e.first] = e.second.AsUint64();
+ }
+ }
+ }
+
NThreading::TFuture<TPathStatResult> PathStat(TPathStatOptions&& options) final {
if (QContext_.CanRead()) {
- throw yexception() << "Can't replay PathStat";
+ TPathStatResult res;
+ res.DataSize.resize(options.Paths().size(), 0);
+ res.Extended.resize(options.Paths().size());
+
+ for (ui32 index = 0; index < options.Paths().size(); ++index) {
+ const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
+ auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync();
+ if (!item) {
+ throw yexception() << "Missing replay data";
+ }
+
+ auto valueNode = NYT::NodeFromYsonString(TStringBuf(item->Value));
+ DeserializePathStat(valueNode, res, index);
+ }
+
+ res.SetSuccess();
+ return NThreading::MakeFuture<TPathStatResult>(res);
}
- return Inner_->PathStat(std::move(options));
+ auto optionsDup = options;
+ return Inner_->PathStat(std::move(options))
+ .Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TPathStatResult>& future) {
+ if (!qContext.CanWrite() || future.HasException()) {
+ return;
+ }
+
+ const auto& res = future.GetValueSync();
+ if (!res.Success()) {
+ return;
+ }
+
+ for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
+ const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
+ auto value = SerializePathStat(res, index);
+ qContext.GetWriter()->Put({YtGateway_PathStat, key}, value).GetValueSync();
+ }
+ });
}
TPathStatResult TryPathStat(TPathStatOptions&& options) final {
if (QContext_.CanRead()) {
- throw yexception() << "Can't replay TryPathStat";
+ TPathStatResult res;
+ res.DataSize.resize(options.Paths().size(), 0);
+ res.Extended.resize(options.Paths().size());
+
+ for (ui32 index = 0; index < options.Paths().size(); ++index) {
+ const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
+ auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync();
+ if (!item) {
+ return res;
+ }
+
+ auto valueNode = NYT::NodeFromYsonString(TStringBuf(item->Value));
+ DeserializePathStat(valueNode, res, index);
+ }
+
+ res.SetSuccess();
+ return res;
}
return Inner_->TryPathStat(std::move(options));