aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author: Vitalii Gridnev <gridnevvvit@gmail.com> 2024-07-01 21:18:50 +0300
committer: GitHub <noreply@github.com> 2024-07-01 21:18:50 +0300
commit: d82c9ad5595510629fea2aa1f037fd18ed7ae4db (patch)
tree: 2119b2d2a8a01f3abd8f6a53d99ab75b98d9739e
parent: a48b1893c9e422dbf27589a70c2f4fe5fa8404a8 (diff)
download: ydb-d82c9ad5595510629fea2aa1f037fd18ed7ae4db.tar.gz
fix saveload logic & support loading custom udfs (#6144)
-rw-r--r-- ydb/tools/query_replay_yt/main.cpp 55
-rw-r--r-- ydb/tools/query_replay_yt/query_compiler.cpp 13
-rw-r--r-- ydb/tools/query_replay_yt/query_replay.cpp 3
-rw-r--r-- ydb/tools/query_replay_yt/query_replay.h 1
4 files changed, 63 insertions, 9 deletions
diff --git a/ydb/tools/query_replay_yt/main.cpp b/ydb/tools/query_replay_yt/main.cpp
index 55b9a09260..c9237e84a2 100644
--- a/ydb/tools/query_replay_yt/main.cpp
+++ b/ydb/tools/query_replay_yt/main.cpp
@@ -13,8 +13,28 @@
#include <yt/cpp/mapreduce/interface/logging/logger.h>
+#include <util/string/split.h>
+
using namespace NActors;
+TVector<std::pair<TString, TString>> GetJobFiles(TVector<TString> udfs) { // Builds one (path as given, last non-empty "/"-separated piece) pair per entry of udfs, in input order.
+ TVector<std::pair<TString, TString>> result;
+
+ for(const TString& udf: udfs) {
+ TVector<TString> splitResult;
+ Split(udf.data(), "/", splitResult); // break the path on "/"; presumably the Split from <util/string/split.h> included above - TODO confirm which overload and its handling of empty tokens
+ while(!splitResult.empty() && splitResult.back().empty()) { // discard empty pieces at the tail (presumably produced by trailing "/" characters)
+ splitResult.pop_back();
+ }
+
+ Y_ENSURE(!splitResult.empty()); // at least one non-empty piece must remain; an empty or all-"/" path is rejected here
+
+ result.push_back(std::make_pair(udf, splitResult.back())); // first: original path unchanged; second: its final remaining piece (used elsewhere in this diff as the in-job file name)
+ }
+
+ return result;
+}
+
class TQueryReplayMapper
: public NYT::IMapper<NYT::TTableReader<NYT::TNode>, NYT::TTableWriter<NYT::TNode>>
{
@@ -25,7 +45,8 @@ class TQueryReplayMapper
TIntrusivePtr<NKikimr::NMiniKQL::IMutableFunctionRegistry> FunctionRegistry;
TIntrusivePtr<NKikimr::NKqp::TModuleResolverState> ModuleResolverState;
- TQueryReplayConfig Config;
+ TVector<TString> UdfFiles;
+ ui32 ActorSystemThreadsCount = 5;
TString GetFailReason(const TQueryReplayEvents::TCheckQueryPlanStatus& status) {
switch (status) {
@@ -58,16 +79,30 @@ class TQueryReplayMapper
public:
TQueryReplayMapper() = default;
- TQueryReplayMapper(const TQueryReplayConfig& config) : Config(config) {
- }
+
+ Y_SAVELOAD_JOB(UdfFiles, ActorSystemThreadsCount);
+
+ TQueryReplayMapper(TVector<TString> udfFiles, ui32 actorSystemThreadsCount)
+ : UdfFiles(udfFiles)
+ , ActorSystemThreadsCount(actorSystemThreadsCount)
+ {}
void Start(NYT::TTableWriter<NYT::TNode>*) override {
TypeRegistry.Reset(new NKikimr::NScheme::TKikimrTypeRegistry());
FunctionRegistry.Reset(NKikimr::NMiniKQL::CreateFunctionRegistry(NKikimr::NMiniKQL::CreateBuiltinRegistry())->Clone());
NKikimr::NMiniKQL::FillStaticModules(*FunctionRegistry);
+ NKikimr::NMiniKQL::TUdfModuleRemappings remappings;
+ THashSet<TString> usedUdfPaths;
+
+ for(const auto& [_, udfPath]: GetJobFiles(UdfFiles)) {
+ if (usedUdfPaths.insert(udfPath).second) {
+ FunctionRegistry->LoadUdfs(udfPath, remappings, 0);
+ }
+ }
+
AppData.Reset(new NKikimr::TAppData(0, 0, 0, 0, {}, TypeRegistry.Get(), FunctionRegistry.Get(), nullptr, nullptr));
AppData->Counters = MakeIntrusive<NMonitoring::TDynamicCounters>(new NMonitoring::TDynamicCounters());
- auto setup = BuildActorSystemSetup(Config.ActorSystemThreadsCount);
+ auto setup = BuildActorSystemSetup(ActorSystemThreadsCount);
ActorSystem.Reset(new TActorSystem(setup, AppData.Get()));
ActorSystem->Start();
ActorSystem->Register(NKikimr::NKqp::CreateKqpResourceManagerActor({}, nullptr));
@@ -164,9 +199,17 @@ int main(int argc, const char** argv) {
NYT::TMapOperationSpec spec;
spec.AddInput<NYT::TNode>(config.SrcPath);
spec.AddOutput<NYT::TNode>(NYT::TRichYPath(config.DstPath).Schema(OutputSchema()));
- spec.MapperSpec(NYT::TUserJobSpec().MemoryLimit(5_GB));
- client->Map(spec, new TQueryReplayMapper(config));
+ auto userJobSpec = NYT::TUserJobSpec();
+ userJobSpec.MemoryLimit(1_GB);
+
+ for(const auto& [udf, udfInJob]: GetJobFiles(config.UdfFiles)) {
+ userJobSpec.AddLocalFile(udf, NYT::TAddLocalFileOptions().PathInJob(udfInJob));
+ }
+
+ spec.MapperSpec(userJobSpec);
+
+ client->Map(spec, new TQueryReplayMapper(config.UdfFiles, config.ActorSystemThreadsCount));
return EXIT_SUCCESS;
}
diff --git a/ydb/tools/query_replay_yt/query_compiler.cpp b/ydb/tools/query_replay_yt/query_compiler.cpp
index b5d69560a8..64cd4aecfd 100644
--- a/ydb/tools/query_replay_yt/query_compiler.cpp
+++ b/ydb/tools/query_replay_yt/query_compiler.cpp
@@ -221,6 +221,7 @@ public:
, Config(MakeIntrusive<TKikimrConfiguration>())
, FunctionRegistry(functionRegistry)
{
+ Config->EnableKqpScanQueryStreamLookup = true;
}
void Bootstrap() {
@@ -278,8 +279,16 @@ private:
case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_SCRIPT:
AsyncCompileResult = KqpHost->PrepareGenericScript(Query->Text, prepareSettings);
break;
- case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY:
- case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY:
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY: {
+ prepareSettings.ConcurrentResults = false;
+ AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr);
+ break;
+ }
+ case NKikimrKqp::QUERY_TYPE_SQL_GENERIC_CONCURRENT_QUERY: {
+ AsyncCompileResult = KqpHost->PrepareGenericQuery(Query->Text, prepareSettings, nullptr);
+ break;
+ }
+
default:
YQL_ENSURE(false, "Unexpected query type: " << Query->Settings.QueryType);
}
diff --git a/ydb/tools/query_replay_yt/query_replay.cpp b/ydb/tools/query_replay_yt/query_replay.cpp
index 1cbfedc057..ddc1b4417f 100644
--- a/ydb/tools/query_replay_yt/query_replay.cpp
+++ b/ydb/tools/query_replay_yt/query_replay.cpp
@@ -17,7 +17,8 @@ void TQueryReplayConfig::ParseConfig(int argc, const char** argv) {
opts.AddLongOption("cluster", "YT cluster").StoreResult(&Cluster).Required();
opts.AddLongOption("src-path", "Source table path").StoreResult(&SrcPath).Required();
opts.AddLongOption("dst-path", "Target table path").StoreResult(&DstPath).Required();
- opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&DstPath);
+ opts.AddLongOption("threads", "Number of ActorSystem threads").StoreResult(&ActorSystemThreadsCount);
+ opts.AddLongOption("udf-file", "UDFS to load").AppendTo(&UdfFiles);
NLastGetopt::TOptsParseResult parseResult(&opts, argc, argv);
}
diff --git a/ydb/tools/query_replay_yt/query_replay.h b/ydb/tools/query_replay_yt/query_replay.h
index 062a966925..10cfff2837 100644
--- a/ydb/tools/query_replay_yt/query_replay.h
+++ b/ydb/tools/query_replay_yt/query_replay.h
@@ -18,6 +18,7 @@ struct TQueryReplayConfig {
TString SrcPath;
TString DstPath;
ui32 ActorSystemThreadsCount = 5;
+ TVector<TString> UdfFiles;
void ParseConfig(int argc, const char** argv);
};