diff options
author | Maxim Kovalev <maxkovalev@ydb.tech> | 2024-11-09 00:11:28 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-09 00:11:28 +0300 |
commit | 4999b7ed2bd9dd0742ca43ccbef6140eceffde1d (patch) | |
tree | 8ba2799a613d1526773f5b8f8a023d316faec25c | |
parent | 3c5b82ea9f599fcafd21b2cab7fbf4403fc3856c (diff) | |
download | ydb-4999b7ed2bd9dd0742ca43ccbef6140eceffde1d.tar.gz |
YQL: Load binaries from YT binary cache (#11295)
6 files changed, 98 insertions, 25 deletions
diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp index d9845aada6f..731c413f732 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.cpp @@ -391,6 +391,7 @@ TYtConfiguration::TYtConfiguration(TTypeAnnotationContext& typeCtx) REGISTER_SETTING(*this, LLVMNodeCountLimit); REGISTER_SETTING(*this, SamplingIoBlockSize); REGISTER_SETTING(*this, BinaryTmpFolder); + REGISTER_SETTING(*this, BinaryCacheFolder); REGISTER_SETTING(*this, BinaryExpirationInterval); REGISTER_SETTING(*this, FolderInlineDataLimit); REGISTER_SETTING(*this, FolderInlineItemsLimit); diff --git a/ydb/library/yql/providers/yt/common/yql_yt_settings.h b/ydb/library/yql/providers/yt/common/yql_yt_settings.h index ef373ef33a5..3e3899ec6d2 100644 --- a/ydb/library/yql/providers/yt/common/yql_yt_settings.h +++ b/ydb/library/yql/providers/yt/common/yql_yt_settings.h @@ -101,6 +101,7 @@ struct TYtSettings { NCommon::TConfSetting<TString, false> DefaultCluster; NCommon::TConfSetting<TString, false> StaticPool; NCommon::TConfSetting<TString, false> BinaryTmpFolder; + NCommon::TConfSetting<TString, false> BinaryCacheFolder; NCommon::TConfSetting<TDuration, false> BinaryExpirationInterval; NCommon::TConfSetting<bool, false> IgnoreTypeV3; NCommon::TConfSetting<bool, false> _UseMultisetAttributes; diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp index 485fe970249..5e58b6436bc 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.cpp @@ -327,6 +327,40 @@ std::pair<TString, NYT::TTransactionId> TTransactionCache::TEntry::GetBinarySnap return std::make_pair(snapshotPath, snapshotTx->GetId()); } +TMaybe<std::pair<TString, NYT::TTransactionId>> TTransactionCache::TEntry::GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName) { + if (binaryCacheFolder.StartsWith(NYT::TConfig::Get()->Prefix)) { + binaryCacheFolder = binaryCacheFolder.substr(NYT::TConfig::Get()->Prefix.size()); + } + YQL_ENSURE(md5.size() > 4); + TString remotePath = TFsPath(binaryCacheFolder) / md5.substr(0, 2) / md5.substr(2, 2) / md5; + + ITransactionPtr snapshotTx; + with_lock(Lock_) { + if (!BinarySnapshotTx) { + BinarySnapshotTx = Client->StartTransaction(TStartTransactionOptions().Attributes(TransactionSpec)); + } + snapshotTx = BinarySnapshotTx; + if (auto p = BinarySnapshots.FindPtr(remotePath)) { + return std::make_pair(*p, snapshotTx->GetId()); + } + } + TString snapshotPath; + try { + NYT::ILockPtr fileLock = snapshotTx->Lock(remotePath, NYT::ELockMode::LM_SNAPSHOT); + snapshotPath = TStringBuilder() << '#' << GetGuidAsString(fileLock->GetLockedNodeId()); + } catch (const TErrorResponse& e) { + YQL_CLOG(WARN, ProviderYt) << "Can't load binary for \"" << fileName << "\" from BinaryCacheFolder: " << e.what(); + return Nothing(); + } + with_lock(Lock_) { + BinarySnapshots[remotePath] = snapshotPath; + } + YQL_CLOG(DEBUG, ProviderYt) << "Snapshot \"" + << fileName << "\" -> \"" << remotePath << "\" -> " + << snapshotPath << ", tx=" << GetGuidAsString(snapshotTx->GetId()); + return std::make_pair(snapshotPath, snapshotTx->GetId()); +} + void TTransactionCache::TEntry::CreateDefaultTmpFolder() { if (DefaultTmpFolder) { Client->Create(DefaultTmpFolder, NYT::NT_MAP, NYT::TCreateOptions().Recursive(true).IgnoreExisting(true)); diff --git a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h index 945902416dd..089db4c5d0f 100644 --- a/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h +++ b/ydb/library/yql/providers/yt/gateway/lib/transaction_cache.h @@ -119,6 +119,7 @@ public: void UpdateColumnarStat(NYT::TRichYPath ytPath, const NYT::TTableColumnarStatistics& columnStat, bool extended = false); std::pair<TString, NYT::TTransactionId> GetBinarySnapshot(TString remoteTmpFolder, const TString& md5, const TString& localPath, TDuration expirationInterval); + TMaybe<std::pair<TString, NYT::TTransactionId>> GetBinarySnapshotFromCache(TString binaryCacheFolder, const TString& md5, const TString& fileName); void CreateDefaultTmpFolder(); diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp index 66fb9081743..26846821b4b 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_spec.cpp @@ -558,14 +558,7 @@ void FillUserJobSpecImpl(NYT::TUserJobSpec& spec, } } - const TString binTmpFolder = settings->BinaryTmpFolder.Get().GetOrElse(TString()); - if (!localRun && binTmpFolder) { - const TDuration binExpiration = settings->BinaryExpirationInterval.Get().GetOrElse(TDuration()); - TTransactionCache::TEntry::TPtr entry = execCtx.GetOrCreateEntry(settings); - TString bin = mrJobBin.empty() ? GetPersistentExecPath() : mrJobBin; - const auto binSize = TFileStat(bin).Size; - YQL_ENSURE(binSize != 0); - + if (!localRun) { if (mrJobBin.empty()) { mrJobBinMd5 = GetPersistentExecPathMd5(); } else if (!mrJobBinMd5) { @@ -576,10 +569,33 @@ void FillUserJobSpecImpl(NYT::TUserJobSpec& spec, mrJobBinMd5 = MD5::File(mrJobBin); } } + } - auto mrJobSnapshot = entry->GetBinarySnapshot(binTmpFolder, *mrJobBinMd5, bin, binExpiration); - spec.JobBinaryCypressPath(mrJobSnapshot.first, mrJobSnapshot.second); + const TString binTmpFolder = settings->BinaryTmpFolder.Get().GetOrElse(TString()); + const TString binCacheFolder = settings->BinaryCacheFolder.Get().GetOrElse(TString()); + if (!localRun && (binTmpFolder || binCacheFolder)) { + TString bin = mrJobBin.empty() ? GetPersistentExecPath() : mrJobBin; + const auto binSize = TFileStat(bin).Size; + YQL_ENSURE(binSize != 0); fileMemUsage += binSize; + TTransactionCache::TEntry::TPtr entry = execCtx.GetOrCreateEntry(settings); + bool useBinCache = false; + if (binCacheFolder) { + if (auto snapshot = entry->GetBinarySnapshotFromCache(binCacheFolder, *mrJobBinMd5, "mrjob")) { + spec.JobBinaryCypressPath(snapshot->first, snapshot->second); + useBinCache = true; + } + } + if (!useBinCache) { + if (binTmpFolder) { + const TDuration binExpiration = settings->BinaryExpirationInterval.Get().GetOrElse(TDuration()); + auto mrJobSnapshot = entry->GetBinarySnapshot(binTmpFolder, *mrJobBinMd5, bin, binExpiration); + spec.JobBinaryCypressPath(mrJobSnapshot.first, mrJobSnapshot.second); + } else if (!mrJobBin.empty()) { + spec.JobBinaryLocalPath(mrJobBin, mrJobBinMd5); + } + + } } else if (!mrJobBin.empty()) { const auto binSize = TFileStat(mrJobBin).Size; diff --git a/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp b/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp index dbb366d2a55..0188682fdd0 100644 --- a/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp +++ b/ydb/library/yql/providers/yt/gateway/native/yql_yt_transform.cpp @@ -444,23 +444,43 @@ void TGatewayTransformer::ApplyUserJobSpec(NYT::TUserJobSpec& spec, bool localRu spec.AddLocalFile(file.first, opts); } const TString binTmpFolder = Settings_->BinaryTmpFolder.Get().GetOrElse(TString()); - if (localRun || !binTmpFolder) { - for (auto& file: *DeferredUdfFiles_) { - TAddLocalFileOptions opts; - if (!fakeChecksum && file.second.Hash) { - opts.MD5CheckSum(file.second.Hash); + const TString binCacheFolder = Settings_->BinaryCacheFolder.Get().GetOrElse(TString()); + if (!localRun && binCacheFolder) { + auto udfFiles = std::move(*DeferredUdfFiles_); + TTransactionCache::TEntry::TPtr entry = GetEntry(); + for (auto& file: udfFiles) { + YQL_ENSURE(!file.second.Hash.Empty()); + if (auto snapshot = entry->GetBinarySnapshotFromCache(binCacheFolder, file.second.Hash, file.first)) { + spec.AddFile(TRichYPath(snapshot->first).TransactionId(snapshot->second) + .FileName(TFsPath(file.first) + .GetName()) + .Executable(true) + .BypassArtifactCache(file.second.BypassArtifactCache)); + } else { + DeferredUdfFiles_->push_back(file); } - YQL_ENSURE(TFileStat(file.first).Size != 0); - opts.BypassArtifactCache(file.second.BypassArtifactCache); - spec.AddLocalFile(file.first, opts); } - } else { - const TDuration binExpiration = Settings_->BinaryExpirationInterval.Get().GetOrElse(TDuration()); - auto entry = GetEntry(); - for (auto& file: *DeferredUdfFiles_) { - YQL_ENSURE(TFileStat(file.first).Size != 0); - auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second.Hash, file.first, binExpiration); - spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true).BypassArtifactCache(file.second.BypassArtifactCache)); + } + if (!DeferredUdfFiles_->empty()) { + if (localRun || !binTmpFolder) { + for (auto& file: *DeferredUdfFiles_) { + TAddLocalFileOptions opts; + if (!fakeChecksum && file.second.Hash) { + opts.MD5CheckSum(file.second.Hash); + } + YQL_ENSURE(TFileStat(file.first).Size != 0); + opts.BypassArtifactCache(file.second.BypassArtifactCache); + spec.AddLocalFile(file.first, opts); + } + } else { + const TDuration binExpiration = Settings_->BinaryExpirationInterval.Get().GetOrElse(TDuration()); + auto entry = GetEntry(); + for (auto& file: *DeferredUdfFiles_) { + YQL_ENSURE(TFileStat(file.first).Size != 0); + YQL_ENSURE(!file.second.Hash.Empty()); + auto snapshot = entry->GetBinarySnapshot(binTmpFolder, file.second.Hash, file.first, binExpiration); + spec.AddFile(TRichYPath(snapshot.first).TransactionId(snapshot.second).FileName(TFsPath(file.first).GetName()).Executable(true).BypassArtifactCache(file.second.BypassArtifactCache)); + } } } RemoteFiles_->clear(); |