diff options
author | Vitalii Gridnev <gridnevvvit@gmail.com> | 2024-07-01 21:18:50 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-01 21:18:50 +0300 |
commit | d82c9ad5595510629fea2aa1f037fd18ed7ae4db (patch) | |
tree | 2119b2d2a8a01f3abd8f6a53d99ab75b98d9739e | |
parent | a48b1893c9e422dbf27589a70c2f4fe5fa8404a8 (diff) | |
download | ydb-d82c9ad5595510629fea2aa1f037fd18ed7ae4db.tar.gz |
fix saveload logic & support loading custom udfs (#6144)
-rw-r--r-- | ydb/tools/query_replay_yt/main.cpp | 55 | ||||
-rw-r--r-- | ydb/tools/query_replay_yt/query_compiler.cpp | 13 | ||||
-rw-r--r-- | ydb/tools/query_replay_yt/query_replay.cpp | 3 | ||||
-rw-r--r-- | ydb/tools/query_replay_yt/query_replay.h | 1 |
4 files changed, 63 insertions, 9 deletions
diff --git a/ydb/tools/query_replay_yt/main.cpp b/ydb/tools/query_replay_yt/main.cpp index 55b9a09260..c9237e84a2 100644 --- a/ydb/tools/query_replay_yt/main.cpp +++ b/ydb/tools/query_replay_yt/main.cpp @@ -13,8 +13,28 @@ #include <yt/cpp/mapreduce/interface/logging/logger.h> +#include <util/string/split.h> + using namespace NActors; +TVector<std::pair<TString, TString>> GetJobFiles(TVector<TString> udfs) { + TVector<std::pair<TString, TString>> result; + + for(const TString& udf: udfs) { + TVector<TString> splitResult; + Split(udf.data(), "/", splitResult); + while(!splitResult.empty() && splitResult.back().empty()) { + splitResult.pop_back(); + } + + Y_ENSURE(!splitResult.empty()); + + result.push_back(std::make_pair(udf, splitResult.back())); + } + + return result; +} + class TQueryReplayMapper : public NYT::IMapper<NYT::TTableReader<NYT::TNode>, NYT::TTableWriter<NYT::TNode>> { @@ -25,7 +45,8 @@ class TQueryReplayMapper TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry; TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState; - TQueryReplayConfig Config; + TVector<TString> UdfFiles; + ui32 ActorSystemThreadsCount = 5; TString GetFailReason(const TQueryReplayEvents::TCheckQueryPlanStatus& status) { switch (status) { @@ -58,16 +79,30 @@ class TQueryReplayMapper public: TQueryReplayMapper() = default; - TQueryReplayMapper(const TQueryReplayConfig& config) : Config(config) { - } + + Y_SAVELOAD_JOB(UdfFiles, ActorSystemThreadsCount); + + TQueryReplayMapper(TVector<TString> udfFiles, ui32 actorSystemThreadsCount) + : UdfFiles(udfFiles) + , ActorSystemThreadsCount(actorSystemThreadsCount) + {} void Start(NYT::TTableWriter<NYT::TNode>*) override { TypeRegistry.Reset(new NKikimr::NScheme::TKikimrTypeRegistry()); FunctionRegistry.Reset(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone()); NKikimr::NMiniKQL::FillStaticModules(*FunctionRegistry); + NKikimr::NMiniKQL::TUdfModuleRemappings remappings; + THashSet<TString> usedUdfPaths; + + for(const auto& [_, udfPath]: GetJobFiles(UdfFiles)) { + if (usedUdfPaths.insert(udfPath).second) { + FunctionRegistry->LoadUdfs(udfPath, remappings, 0); + } + } + AppData.Reset(new NKikimr::TAppData(0, 0, 0, 0, {}, TypeRegistry.Get(), FunctionRegistry.Get(), nullptr, nullptr)); AppData->Counters = MakeIntrusive<NMonitoring::TDynamicCounters>(new NMonitoring::TDynamicCounters()); - auto setup = BuildActorSystemSetup(Config.ActorSystemThreadsCount); + auto setup = BuildActorSystemSetup(ActorSystemThreadsCount); ActorSystem.Reset(new TActorSystem(setup, AppData.Get())); ActorSystem->Start(); ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr)); @@ -164,9 +199,17 @@ int main(int argc, const char** argv) { NYT::TMapOperationSpec spec; spec.AddInput<NYT::TNode>(config.SrcPath); spec.AddOutput<NYT::TNode>(NYT::TRichYPath(config.DstPath).Schema(OutputSchema())); - spec.MapperSpec(NYT::TUserJobSpec().MemoryLimit(5_GB)); - client->Map(spec, new TQueryReplayMapper(config)); + auto userJobSpec = NYT::TUserJobSpec(); + userJobSpec.MemoryLimit(1_GB); + + for(const auto& [udf, udfInJob]: GetJobFiles(config.UdfFiles)) { + userJobSpec.AddLocalFile(udf, NYT::TAddLocalFileOptions().PathInJob(udfInJob)); + } + + spec.MapperSpec(userJobSpec); + + client->Map(spec, new TQueryReplayMapper(config.UdfFiles, config.ActorSystemThreadsCount)); return EXIT_SUCCESS; } diff --git a/ydb/tools/query_replay_yt/query_compiler.cpp b/ydb/tools/query_replay_yt/query_compiler.cpp index b5d69560a8..64cd4aecfd 100644 --- a/ydb/tools/query_replay_yt/query_compiler.cpp +++ b/ydb/tools/query_replay_yt/query_compiler.cpp @@ -221,6 +221,7 @@ public: , Config(MakeIntrusive<TKikimrConfiguration>()) , FunctionRegistry(functionRegistry) { + Config->EnableKqpScanQueryStreamLookup = true; } void Bootstrap() { @@ -278,8 +279,16 @@ private: case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT: AsyncCompileResult = KqpHost->PrepareGenericScript(Query->Text, prepareSettings); break; - case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: - case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: { + prepareSettings.ConcurrentResults = false; + AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr); + break; + } + case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: { + AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr); + break; + } + default: YQL_ENSURE(false, "Unexpected query type: " << Query->Settings.QueryType); } diff --git a/ydb/tools/query_replay_yt/query_replay.cpp b/ydb/tools/query_replay_yt/query_replay.cpp index 1cbfedc057..ddc1b4417f 100644 --- a/ydb/tools/query_replay_yt/query_replay.cpp +++ b/ydb/tools/query_replay_yt/query_replay.cpp @@ -17,7 +17,8 @@ void TQueryReplayConfig::ParseConfig(int argc, const char** argv) { opts.AddLongOption("cluster", "YT cluster").StoreResult(&Cluster).Required(); opts.AddLongOption("src-path", "Source table path").StoreResult(&SrcPath).Required(); opts.AddLongOption("dst-path", "Target table path").StoreResult(&DstPath).Required(); - opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&DstPath); + opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&ActorSystemThreadsCount); + opts.AddLongOption("udf-file", "UDFS to load").AppendTo(&UdfFiles); NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv); } diff --git a/ydb/tools/query_replay_yt/query_replay.h b/ydb/tools/query_replay_yt/query_replay.h index 062a966925..10cfff2837 100644 --- a/ydb/tools/query_replay_yt/query_replay.h +++ b/ydb/tools/query_replay_yt/query_replay.h @@ -18,6 +18,7 @@ struct TQueryReplayConfig { TString SrcPath; TString DstPath; ui32 ActorSystemThreadsCount = 5; + TVector<TString> UdfFiles; void ParseConfig(int argc, const char** argv); }; |