aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMaxim Kovalev <maxkovalev@ydb.tech>2024-11-09 00:11:28 +0300
committerGitHub <noreply@github.com>2024-11-09 00:11:28 +0300
commit4999b7ed2bd9dd0742ca43ccbef6140eceffde1d (patch)
tree8ba2799a613d1526773f5b8f8a023d316faec25c
parent3c5b82ea9f599fcafd21b2cab7fbf4403fc3856c (diff)
downloadydb-4999b7ed2bd9dd0742ca43ccbef6140eceffde1d.tar.gz
YQL: Load binaries from YT binary cache (#11295)
-rw-r--r--ydb/library/yql/providers/yt/common/yql_yt_settings.cpp1
-rw-r--r--ydb/library/yql/providers/yt/common/yql_yt_settings.h1
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp34
-rw-r--r--ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h1
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp36
-rw-r--r--ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp50
6 files changed, 98 insertions, 25 deletions
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
index d9845aada6f..731c413f732 100644
--- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
+++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp
@@ -391,6 +391,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx)
REGISTER_SETTING(*this, LLVMNodeCountLimit);
REGISTER_SETTING(*this, SamplingIoBlockSize);
REGISTER_SETTING(*this, BinaryTmpFolder);
+ REGISTER_SETTING(*this, BinaryCacheFolder);
REGISTER_SETTING(*this, BinaryExpirationInterval);
REGISTER_SETTING(*this, FolderInlineDataLimit);
REGISTER_SETTING(*this, FolderInlineItemsLimit);
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
index ef373ef33a5..3e3899ec6d2 100644
--- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h
+++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h
@@ -101,6 +101,7 @@ struct TYtSettings {
NCommon::TConfSetting<TString, false> DefaultCluster;
NCommon::TConfSetting<TString, false> StaticPool;
NCommon::TConfSetting<TString, false> BinaryTmpFolder;
+ NCommon::TConfSetting<TString, false> BinaryCacheFolder;
NCommon::TConfSetting<TDuration, false> BinaryExpirationInterval;
NCommon::TConfSetting<bool, false> IgnoreTypeV3;
NCommon::TConfSetting<bool, false> _UseMultisetAttributes;
diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
index 485fe970249..5e58b6436bc 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
+++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp
@@ -327,6 +327,40 @@ std::pair<TString, NYT::TTransactionId> TTransactionCache::TEntry::GetBinarySnap
return std::make_pair(snapshotPath, snapshotTx->GetId());
}
+TMaybe<std::pair<TString, NYT::TTransactionId>> TTransactionCache::TEntry::GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName) {
+ if (binaryCacheFolder.StartsWith(NYT::TConfig::Get()->Prefix)) {
+ binaryCacheFolder = binaryCacheFolder.substr(NYT::TConfig::Get()->Prefix.size());
+ }
+ YQL_ENSURE(md5.size() > 4);
+ TString remotePath = TFsPath(binaryCacheFolder) / md5.substr(0, 2) / md5.substr(2, 2) / md5;
+
+ ITransactionPtr snapshotTx;
+ with_lock(Lock_) {
+ if (!BinarySnapshotTx) {
+ BinarySnapshotTx = Client->StartTransaction(TStartTransactionOptions().Attributes(TransactionSpec));
+ }
+ snapshotTx = BinarySnapshotTx;
+ if (auto p = BinarySnapshots.FindPtr(remotePath)) {
+ return std::make_pair(*p, snapshotTx->GetId());
+ }
+ }
+ TString snapshotPath;
+ try {
+ NYT::ILockPtr fileLock = snapshotTx->Lock(remotePath, NYT::ELockMode::LM_SNAPSHOT);
+ snapshotPath = TStringBuilder() << '#' << GetGuidAsString(fileLock->GetLockedNodeId());
+ } catch (const TErrorResponse& e) {
+ YQL_CLOG(WARN, ProviderYt) << "Can't load binary for \"" << fileName << "\" from BinaryCacheFolder: " << e.what();
+ return Nothing();
+ }
+ with_lock(Lock_) {
+ BinarySnapshots[remotePath] = snapshotPath;
+ }
+ YQL_CLOG(DEBUG, ProviderYt) << "Snapshot \""
+ << fileName << "\" -> \"" << remotePath << "\" -> "
+ << snapshotPath << ", tx=" << GetGuidAsString(snapshotTx->GetId());
+ return std::make_pair(snapshotPath, snapshotTx->GetId());
+}
+
void TTransactionCache::TEntry::CreateDefaultTmpFolder() {
if (DefaultTmpFolder) {
Client->Create(DefaultTmpFolder, NYT::NT_MAP, NYT::TCreateOptions().Recursive(true).IgnoreExisting(true));
diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
index 945902416dd..089db4c5d0f 100644
--- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
+++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h
@@ -119,6 +119,7 @@ public:
void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false);
std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval);
+ TMaybe<std::pair<TString, NYT::TTransactionId>> GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName);
void CreateDefaultTmpFolder();
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp
index 66fb9081743..26846821b4b 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp
@@ -558,14 +558,7 @@ void FillUserJobSpecImpl(NYT::TUserJobSpec& spec,
}
}
- const TString binTmpFolder = settings->BinaryTmpFolder.Get().GetOrElse(TString());
- if (!localRun && binTmpFolder) {
- const TDuration binExpiration = settings->BinaryExpirationInterval.Get().GetOrElse(TDuration());
- TTransactionCache::TEntry::TPtr entry = execCtx.GetOrCreateEntry(settings);
- TString bin = mrJobBin.empty() ? GetPersistentExecPath() : mrJobBin;
- const auto binSize = TFileStat(bin).Size;
- YQL_ENSURE(binSize != 0);
-
+ if (!localRun) {
if (mrJobBin.empty()) {
mrJobBinMd5 = GetPersistentExecPathMd5();
} else if (!mrJobBinMd5) {
@@ -576,10 +569,33 @@ void FillUserJobSpecImpl(NYT::TUserJobSpec& spec,
mrJobBinMd5 = MD5::File(mrJobBin);
}
}
+ }
- auto mrJobSnapshot = entry->GetBinarySnapshot(binTmpFolder, *mrJobBinMd5, bin, binExpiration);
- spec.JobBinaryCypressPath(mrJobSnapshot.first, mrJobSnapshot.second);
+ const TString binTmpFolder = settings->BinaryTmpFolder.Get().GetOrElse(TString());
+ const TString binCacheFolder = settings->BinaryCacheFolder.Get().GetOrElse(TString());
+ if (!localRun && (binTmpFolder || binCacheFolder)) {
+ TString bin = mrJobBin.empty() ? GetPersistentExecPath() : mrJobBin;
+ const auto binSize = TFileStat(bin).Size;
+ YQL_ENSURE(binSize != 0);
fileMemUsage += binSize;
+ TTransactionCache::TEntry::TPtr entry = execCtx.GetOrCreateEntry(settings);
+ bool useBinCache = false;
+ if (binCacheFolder) {
+ if (auto snapshot = entry->GetBinarySnapshotFromCache(binCacheFolder, *mrJobBinMd5, "mrjob")) {
+ spec.JobBinaryCypressPath(snapshot->first, snapshot->second);
+ useBinCache = true;
+ }
+ }
+ if (!useBinCache) {
+ if (binTmpFolder) {
+ const TDuration binExpiration = settings->BinaryExpirationInterval.Get().GetOrElse(TDuration());
+ auto mrJobSnapshot = entry->GetBinarySnapshot(binTmpFolder, *mrJobBinMd5, bin, binExpiration);
+ spec.JobBinaryCypressPath(mrJobSnapshot.first, mrJobSnapshot.second);
+ } else if (!mrJobBin.empty()) {
+ spec.JobBinaryLocalPath(mrJobBin, mrJobBinMd5);
+ }
+
+ }
}
else if (!mrJobBin.empty()) {
const auto binSize = TFileStat(mrJobBin).Size;
diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp
index dbb366d2a55..0188682fdd0 100644
--- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp
+++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp
@@ -444,23 +444,43 @@ void TGatewayTransformer::ApplyUserJobSpec(NYT::TUserJobSpec& spec, bool localRu
spec.AddLocalFile(file.first, opts);
}
const TString binTmpFolder = Settings_->BinaryTmpFolder.Get().GetOrElse(TString());
- if (localRun || !binTmpFolder) {
- for (auto& file: *DeferredUdfFiles_) {
- TAddLocalFileOptions opts;
- if (!fakeChecksum && file.second.Hash) {
- opts.MD5CheckSum(file.second.Hash);
+ const TString binCacheFolder = Settings_->BinaryCacheFolder.Get().GetOrElse(TString());
+ if (!localRun && binCacheFolder) {
+ auto udfFiles = std::move(*DeferredUdfFiles_);
+ TTransactionCache::TEntry::TPtr entry = GetEntry();
+ for (auto& file: udfFiles) {
+ YQL_ENSURE(!file.second.Hash.Empty());
+ if (auto snapshot = entry->GetBinarySnapshotFromCache(binCacheFolder, file.second.Hash, file.first)) {
+ spec.AddFile(TRichYPath(snapshot->first).TransactionId(snapshot->second)
+ .FileName(TFsPath(file.first)
+ .GetName())
+ .Executable(true)
+ .BypassArtifactCache(file.second.BypassArtifactCache));
+ } else {
+ DeferredUdfFiles_->push_back(file);
}
- YQL_ENSURE(TFileStat(file.first).Size != 0);
- opts.BypassArtifactCache(file.second.BypassArtifactCache);
- spec.AddLocalFile(file.first, opts);
}
- } else {
- const TDuration binExpiration = Settings_->BinaryExpirationInterval.Get().GetOrElse(TDuration());
- auto entry = GetEntry();
- for (auto& file: *DeferredUdfFiles_) {
- YQL_ENSURE(TFileStat(file.first).Size != 0);
- auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second.Hash, file.first, binExpiration);
- spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true).BypassArtifactCache(file.second.BypassArtifactCache));
+ }
+ if (!DeferredUdfFiles_->empty()) {
+ if (localRun || !binTmpFolder) {
+ for (auto& file: *DeferredUdfFiles_) {
+ TAddLocalFileOptions opts;
+ if (!fakeChecksum && file.second.Hash) {
+ opts.MD5CheckSum(file.second.Hash);
+ }
+ YQL_ENSURE(TFileStat(file.first).Size != 0);
+ opts.BypassArtifactCache(file.second.BypassArtifactCache);
+ spec.AddLocalFile(file.first, opts);
+ }
+ } else {
+ const TDuration binExpiration = Settings_->BinaryExpirationInterval.Get().GetOrElse(TDuration());
+ auto entry = GetEntry();
+ for (auto& file: *DeferredUdfFiles_) {
+ YQL_ENSURE(TFileStat(file.first).Size != 0);
+ YQL_ENSURE(!file.second.Hash.Empty());
+ auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second.Hash, file.first, binExpiration);
+ spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true).BypassArtifactCache(file.second.BypassArtifactCache));
+ }
}
}
RemoteFiles_->clear();