diff options
author | arotchev <arotchev@yandex-team.ru> | 2022-02-10 16:52:16 +0300 |
---|---|---|
committer | Daniil Cherednik <dcherednik@yandex-team.ru> | 2022-02-10 16:52:16 +0300 |
commit | 2e1daf96e8e32937eb55bc8c8322b6c310a9cef5 (patch) | |
tree | 5af582ebc783a8e831852502368e53d995e40f02 | |
parent | bc9502dd9565b1d3084b1871eba035befe00d7e5 (diff) | |
download | ydb-2e1daf96e8e32937eb55bc8c8322b6c310a9cef5.tar.gz |
Restoring authorship annotation for <arotchev@yandex-team.ru>. Commit 1 of 2.
27 files changed, 280 insertions, 280 deletions
diff --git a/ydb/library/yql/core/yql_user_data_storage.cpp b/ydb/library/yql/core/yql_user_data_storage.cpp index 1a9d725d274..e5da5463642 100644 --- a/ydb/library/yql/core/yql_user_data_storage.cpp +++ b/ydb/library/yql/core/yql_user_data_storage.cpp @@ -122,13 +122,13 @@ TString TUserDataStorage::MakeFolderName(const TStringBuf& name) { return fullName; } -TString TUserDataStorage::MakeRelativeName(const TStringBuf& name) { - if (name.StartsWith(HomePath)) { - return TString(name.substr(HomePath.size())); - } - return name.StartsWith(Sep) ? TString(name.substr(1)) : TString(name); -} - +TString TUserDataStorage::MakeRelativeName(const TStringBuf& name) { + if (name.StartsWith(HomePath)) { + return TString(name.substr(HomePath.size())); + } + return name.StartsWith(Sep) ? TString(name.substr(1)) : TString(name); +} + TUserDataKey TUserDataStorage::ComposeUserDataKey(const TStringBuf& name) { auto fullName = MakeFullName(name); return TUserDataKey::File(fullName); @@ -139,13 +139,13 @@ bool TUserDataStorage::ContainsUserDataFolder(const TStringBuf& name) const { } TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> TUserDataStorage::FindUserDataFolder(const TStringBuf& name, ui32 maxFileCount) const { - return FindUserDataFolder(UserData_,name,maxFileCount); -} - -TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> TUserDataStorage::FindUserDataFolder(const TUserDataTable& userData, const TStringBuf& name, ui32 maxFileCount) { + return FindUserDataFolder(UserData_,name,maxFileCount); +} + +TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> TUserDataStorage::FindUserDataFolder(const TUserDataTable& userData, const TStringBuf& name, ui32 maxFileCount) { auto fullName = MakeFolderName(name); TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> res; - EnumFolderContent(userData, fullName, maxFileCount, [&](auto& key, auto& block) { + EnumFolderContent(userData, fullName, maxFileCount, [&](auto& key, auto& block) { if (!res) { res.ConstructInPlace(); } diff --git a/ydb/library/yql/core/yql_user_data_storage.h b/ydb/library/yql/core/yql_user_data_storage.h index f1575cfffef..b2867419b47 100644 --- a/ydb/library/yql/core/yql_user_data_storage.h +++ b/ydb/library/yql/core/yql_user_data_storage.h @@ -38,15 +38,15 @@ public: bool ContainsUserDataFolder(const TStringBuf& name) const; TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> FindUserDataFolder(const TStringBuf& name, ui32 maxFileCount = ~0u) const; - static TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> FindUserDataFolder(const TUserDataTable& userData, const TStringBuf& name, ui32 maxFileCount = ~0u); - + static TMaybe<std::map<TUserDataKey, const TUserDataBlock*>> FindUserDataFolder(const TUserDataTable& userData, const TStringBuf& name, ui32 maxFileCount = ~0u); + void FillUserDataTokens(); void TryFillUserDataToken(TUserDataBlock& block) const; std::map<TString, const TUserDataBlock*> GetDirectoryContent(const TStringBuf& path, ui32 maxFileCount = ~0u) const; static TString MakeFullName(const TStringBuf& name); static TString MakeFolderName(const TStringBuf& name); static TUserDataKey ComposeUserDataKey(const TStringBuf& name); - static TString MakeRelativeName(const TStringBuf& name); + static TString MakeRelativeName(const TStringBuf& name); TVector<TString> GetLibraries() const; // working with frozen files diff --git a/ydb/library/yql/dq/common/dq_common.h b/ydb/library/yql/dq/common/dq_common.h index 71cc7be6cb8..a2a5bfac076 100644 --- a/ydb/library/yql/dq/common/dq_common.h +++ b/ydb/library/yql/dq/common/dq_common.h @@ -23,8 +23,8 @@ struct TBaseDqResManEvents { ES_CLUSTER_STATUS, ES_CLUSTER_STATUS_RESPONSE, - ES_IS_READY, - ES_IS_READY_RESPONSE, + ES_IS_READY, + ES_IS_READY_RESPONSE, ES_JOB_STOP, ES_JOB_STOP_RESPONSE, diff --git a/ydb/library/yql/providers/common/metrics/metrics_registry.cpp b/ydb/library/yql/providers/common/metrics/metrics_registry.cpp index 1fb7bdcbba7..c93bbf9bbee 100644 --- a/ydb/library/yql/providers/common/metrics/metrics_registry.cpp +++ b/ydb/library/yql/providers/common/metrics/metrics_registry.cpp @@ -185,11 +185,11 @@ public: } void MergeSnapshot(const NProto::TMetricsRegistrySnapshot& snapshot) override { - MergeFromGroupProto( - snapshot.HasMergeToRoot() - ? GetSensorsRootGroup().Get() - : Sensors_.Get(), - snapshot.GetRootGroup(), + MergeFromGroupProto( + snapshot.HasMergeToRoot() + ? GetSensorsRootGroup().Get() + : Sensors_.Get(), + snapshot.GetRootGroup(), snapshot.HasDontIncrement() ? snapshot.GetDontIncrement() : false); @@ -203,10 +203,10 @@ public: // do nothing } - TSensorsGroupPtr GetSensors() override { - return Sensors_.Get(); - } - + TSensorsGroupPtr GetSensors() override { + return Sensors_.Get(); + } + private: TSensorCounterPtr GetCounter( const TString& labelName, diff --git a/ydb/library/yql/providers/common/metrics/metrics_registry.h b/ydb/library/yql/providers/common/metrics/metrics_registry.h index d344ba9dd98..f6042f7bed4 100644 --- a/ydb/library/yql/providers/common/metrics/metrics_registry.h +++ b/ydb/library/yql/providers/common/metrics/metrics_registry.h @@ -2,7 +2,7 @@ #include "sensors_group.h" -#include <util/generic/yexception.h> +#include <util/generic/yexception.h> namespace NYql { @@ -49,10 +49,10 @@ struct IMetricsRegistry: public TThrRefBase { const TString& userName) const = 0; virtual void Flush() = 0; - - virtual TSensorsGroupPtr GetSensors() { - ythrow yexception() << "Not implemented"; - } + + virtual TSensorsGroupPtr GetSensors() { + ythrow yexception() << "Not implemented"; + } }; diff --git a/ydb/library/yql/providers/common/metrics/protos/metrics_registry.proto b/ydb/library/yql/providers/common/metrics/protos/metrics_registry.proto index 07144462582..2d19be90f86 100644 --- a/ydb/library/yql/providers/common/metrics/protos/metrics_registry.proto +++ b/ydb/library/yql/providers/common/metrics/protos/metrics_registry.proto @@ -27,5 +27,5 @@ message TCounterGroup { message TMetricsRegistrySnapshot { optional TCounterGroup RootGroup = 1; optional bool DontIncrement = 2; - optional bool MergeToRoot = 3; + optional bool MergeToRoot = 3; } diff --git a/ydb/library/yql/providers/common/metrics/sensors_group.h b/ydb/library/yql/providers/common/metrics/sensors_group.h index 956a0c6fe22..d2b5c0bb56d 100644 --- a/ydb/library/yql/providers/common/metrics/sensors_group.h +++ b/ydb/library/yql/providers/common/metrics/sensors_group.h @@ -9,7 +9,7 @@ namespace NSensorComponent { static const TString kWorkerServer = "worker_server"; static const TString kDataServer = "data_server"; static const TString kInspectorClient = "inspector_client"; - static const TString kDq = "dq"; + static const TString kDq = "dq"; } // namspace NSensorComponent diff --git a/ydb/library/yql/providers/dq/actors/executer_actor.cpp b/ydb/library/yql/providers/dq/actors/executer_actor.cpp index 91f1caee748..acedfc27f45 100644 --- a/ydb/library/yql/providers/dq/actors/executer_actor.cpp +++ b/ydb/library/yql/providers/dq/actors/executer_actor.cpp @@ -48,7 +48,7 @@ public: : TRichActor<TDqExecuter>(&TDqExecuter::Handler) , GwmActorId(gwmActorId) , PrinterId(printerId) - , Settings(settings) + , Settings(settings) , TraceId(traceId) , Username(username) , Counters(counters) // root, component=dq @@ -444,7 +444,7 @@ private: NActors::TActorId GwmActorId; NActors::TActorId PrinterId; - TDqConfiguration::TPtr Settings; + TDqConfiguration::TPtr Settings; NActors::TActorId ControlId; NActors::TActorId ResultId; diff --git a/ydb/library/yql/providers/dq/api/grpc/api.proto b/ydb/library/yql/providers/dq/api/grpc/api.proto index ef620dca4f4..171c27a95a9 100644 --- a/ydb/library/yql/providers/dq/api/grpc/api.proto +++ b/ydb/library/yql/providers/dq/api/grpc/api.proto @@ -18,7 +18,7 @@ service DqService { rpc JobStop (JobStopRequest) returns (JobStopResponse); rpc GetMaster (GetMasterRequest) returns (GetMasterResponse); rpc ConfigureFailureInjector (ConfigureFailureInjectorRequest) returns (ConfigureFailureInjectorResponse); - rpc IsReady (IsReadyRequest) returns (IsReadyResponse); + rpc IsReady (IsReadyRequest) returns (IsReadyResponse); rpc Routes (RoutesRequest) returns (RoutesResponse); rpc Benchmark (BenchmarkRequest) returns (BenchmarkResponse); } diff --git a/ydb/library/yql/providers/dq/api/protos/dqs.proto b/ydb/library/yql/providers/dq/api/protos/dqs.proto index 09aecfcf923..bd0162d3b4a 100644 --- a/ydb/library/yql/providers/dq/api/protos/dqs.proto +++ b/ydb/library/yql/providers/dq/api/protos/dqs.proto @@ -118,15 +118,15 @@ message TEvQueryStatusResponse { Yql.DqsProto.QueryStatusResponse Response = 1; } -message TEvIsReady { - Yql.DqsProto.IsReadyRequest Request = 1; - bool IsForwarded = 2; -} - -message TEvIsReadyResponse { - bool IsReady = 1; -} - +message TEvIsReady { + Yql.DqsProto.IsReadyRequest Request = 1; + bool IsForwarded = 2; +} + +message TEvIsReadyResponse { + bool IsReady = 1; +} + message TEvRoutesRequest { } diff --git a/ydb/library/yql/providers/dq/api/protos/service.proto b/ydb/library/yql/providers/dq/api/protos/service.proto index b91351e5d12..d8de79cbf6d 100644 --- a/ydb/library/yql/providers/dq/api/protos/service.proto +++ b/ydb/library/yql/providers/dq/api/protos/service.proto @@ -318,14 +318,14 @@ message ConfigureFailureInjectorRequest { message ConfigureFailureInjectorResponse { bool success = 1; } - -message IsReadyRequest { - repeated TFile Files = 1; -} - -message IsReadyResponse { - bool IsReady = 1; -} + +message IsReadyRequest { + repeated TFile Files = 1; +} + +message IsReadyResponse { + bool IsReady = 1; +} message RoutesRequest { uint32 NodeId = 1; diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp index f9ba89a33d9..1e1561815d8 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.cpp @@ -15,7 +15,7 @@ TDqConfiguration::TDqConfiguration() { REGISTER_SETTING(*this, MaxRetries); REGISTER_SETTING(*this, MaxNetworkRetries); REGISTER_SETTING(*this, RetryBackoffMs); - REGISTER_SETTING(*this, CollectCoreDumps); + REGISTER_SETTING(*this, CollectCoreDumps); REGISTER_SETTING(*this, FallbackPolicy); REGISTER_SETTING(*this, PullRequestTimeoutMs); REGISTER_SETTING(*this, PingTimeoutMs); diff --git a/ydb/library/yql/providers/dq/common/yql_dq_settings.h b/ydb/library/yql/providers/dq/common/yql_dq_settings.h index 3c37e4d48fd..d65e58873dc 100644 --- a/ydb/library/yql/providers/dq/common/yql_dq_settings.h +++ b/ydb/library/yql/providers/dq/common/yql_dq_settings.h @@ -41,7 +41,7 @@ struct TDqSettings { NCommon::TConfSetting<int, false> MaxRetries; NCommon::TConfSetting<int, false> MaxNetworkRetries; NCommon::TConfSetting<ui64, false> RetryBackoffMs; - NCommon::TConfSetting<bool, false> CollectCoreDumps; + NCommon::TConfSetting<bool, false> CollectCoreDumps; NCommon::TConfSetting<TString, false> FallbackPolicy; NCommon::TConfSetting<ui64, false> PullRequestTimeoutMs; NCommon::TConfSetting<ui64, false> PingTimeoutMs; diff --git a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp index 8759f442219..802eba7b2f6 100644 --- a/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp +++ b/ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp @@ -335,39 +335,39 @@ private: const auto& name = callableType->GetNameStr(); if (name == TStringBuf("FolderPath")) { - const TString folderName(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); - auto blocks = TUserDataStorage::FindUserDataFolder(files, folderName); - MKQL_ENSURE(blocks, "Folder not found: " << folderName); - for ( const auto& b : *blocks) { - auto block = b.second; - auto filePath = block->FrozenFile->GetPath().GetPath(); - auto fullFileName = localRun ? filePath : TUserDataStorage::MakeRelativeName(b.first.Alias()); - YQL_LOG(DEBUG) << "Path resolve " << filePath << "|"<< fullFileName; - // validate - switch (block->Type) { - case EUserDataType::URL: - case EUserDataType::PATH: - case EUserDataType::RAW_INLINE_DATA: { - break; - } - default: - YQL_ENSURE(false, "Unknown block type " << block->Type); - } - // filePath, fileName, md5 - auto f = IDqGateway::TFileResource(); - f.SetLocalPath(filePath); - f.SetName(fullFileName); - f.SetObjectId(block->FrozenFile->GetMd5()); + const TString folderName(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); + auto blocks = TUserDataStorage::FindUserDataFolder(files, folderName); + MKQL_ENSURE(blocks, "Folder not found: " << folderName); + for ( const auto& b : *blocks) { + auto block = b.second; + auto filePath = block->FrozenFile->GetPath().GetPath(); + auto fullFileName = localRun ? filePath : TUserDataStorage::MakeRelativeName(b.first.Alias()); + YQL_LOG(DEBUG) << "Path resolve " << filePath << "|"<< fullFileName; + // validate + switch (block->Type) { + case EUserDataType::URL: + case EUserDataType::PATH: + case EUserDataType::RAW_INLINE_DATA: { + break; + } + default: + YQL_ENSURE(false, "Unknown block type " << block->Type); + } + // filePath, fileName, md5 + auto f = IDqGateway::TFileResource(); + f.SetLocalPath(filePath); + f.SetName(fullFileName); + f.SetObjectId(block->FrozenFile->GetMd5()); f.SetSize(block->FrozenFile->GetSize()); - f.SetObjectType(IDqGateway::TFileResource::EUSER_FILE); + f.SetObjectType(IDqGateway::TFileResource::EUSER_FILE); uploadList->emplace(f); - } - const TProgramBuilder pgmBuilder(typeEnv, *State->FunctionRegistry); - auto result = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(folderName); - result.Freeze(); - if (result.GetNode() != node) { - callable.SetResult(result, typeEnv); - } + } + const TProgramBuilder pgmBuilder(typeEnv, *State->FunctionRegistry); + auto result = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::String>(folderName); + result.Freeze(); + if (result.GetNode() != node) { + callable.SetResult(result, typeEnv); + } } else if (name == TStringBuf("FileContent") || name == TStringBuf("FilePath")) { const TString fileName(AS_VALUE(TDataLiteral, callable.GetInput(0))->AsValue().AsStringRef()); @@ -375,7 +375,7 @@ private: MKQL_ENSURE(block, "File not found: " << fileName); auto filePath = block->FrozenFile->GetPath().GetPath(); - auto fullFileName = localRun ? filePath : fileName; + auto fullFileName = localRun ? filePath : fileName; const TProgramBuilder pgmBuilder(typeEnv, *State->FunctionRegistry); TRuntimeNode result; diff --git a/ydb/library/yql/providers/dq/provider/ya.make b/ydb/library/yql/providers/dq/provider/ya.make index bdb3547266c..28d814d8bdb 100644 --- a/ydb/library/yql/providers/dq/provider/ya.make +++ b/ydb/library/yql/providers/dq/provider/ya.make @@ -3,8 +3,8 @@ LIBRARY() OWNER(g:yql) SRCS( - yql_dq_control.cpp - yql_dq_control.h + yql_dq_control.cpp + yql_dq_control.h yql_dq_datasink_type_ann.cpp yql_dq_datasink_type_ann.h yql_dq_datasource_type_ann.cpp diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp index 1a0053350a6..65d18a3d392 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_control.cpp @@ -1,4 +1,4 @@ -#include "yql_dq_control.h" +#include "yql_dq_control.h" #include <ydb/library/yql/providers/dq/api/grpc/api.grpc.pb.h> #include <ydb/library/yql/providers/dq/config/config.pb.h> @@ -9,134 +9,134 @@ #include <library/cpp/grpc/client/grpc_client_low.h> -#include <library/cpp/svnversion/svnversion.h> - +#include <library/cpp/svnversion/svnversion.h> + #include <util/memory/blob.h> #include <util/string/builder.h> #include <util/system/file.h> namespace NYql { - -using TFileResource = Yql::DqsProto::TFile; - + +using TFileResource = Yql::DqsProto::TFile; + const TString DqStrippedSuffied = ".s"; -class TDqControl : public IDqControl { - +class TDqControl : public IDqControl { + public: - TDqControl(const NGrpc::TGRpcClientConfig &grpcConf, int threads, const TVector<TFileResource> &files) - : GrpcClient(threads) - , Service(GrpcClient.CreateGRpcServiceConnection<Yql::DqsProto::DqService>(grpcConf)) - , Files(files) + TDqControl(const NGrpc::TGRpcClientConfig &grpcConf, int threads, const TVector<TFileResource> &files) + : GrpcClient(threads) + , Service(GrpcClient.CreateGRpcServiceConnection<Yql::DqsProto::DqService>(grpcConf)) + , Files(files) { } - // after call, forking process in not allowed - bool IsReady(const TMap<TString, TString>& additinalFiles) override { - Yql::DqsProto::IsReadyRequest request; - for (const auto& file : Files) { - *request.AddFiles() = file; + // after call, forking process in not allowed + bool IsReady(const TMap<TString, TString>& additinalFiles) override { + Yql::DqsProto::IsReadyRequest request; + for (const auto& file : Files) { + *request.AddFiles() = file; } for (const auto& [path, objectId] : additinalFiles){ - TFileResource r; - r.SetLocalPath(path); + TFileResource r; + r.SetLocalPath(path); r.SetObjectType(Yql::DqsProto::TFile::EUDF_FILE); r.SetObjectId(objectId); r.SetSize(TFile(path, OpenExisting | RdOnly).GetLength()); - *request.AddFiles() = r; - } - - auto promise = NThreading::NewPromise<bool>(); - auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::IsReadyResponse&& resp) mutable { - Y_UNUSED(resp); - - promise.SetValue(status.Ok() && resp.GetIsReady()); + *request.AddFiles() = r; + } + + auto promise = NThreading::NewPromise<bool>(); + auto callback = [promise](NGrpc::TGrpcStatus&& status, Yql::DqsProto::IsReadyResponse&& resp) mutable { + Y_UNUSED(resp); + + promise.SetValue(status.Ok() && resp.GetIsReady()); }; NGrpc::TCallMeta meta; meta.Timeout = TDuration::Seconds(1); - Service->DoRequest<Yql::DqsProto::IsReadyRequest, Yql::DqsProto::IsReadyResponse>( + Service->DoRequest<Yql::DqsProto::IsReadyRequest, Yql::DqsProto::IsReadyResponse>( request, callback, &Yql::DqsProto::DqService::Stub::AsyncIsReady, meta); - - try { - return promise.GetFuture().GetValueSync(); - } catch (...) { - YQL_LOG(INFO) << "DqControl IsReady Exception " << CurrentExceptionMessage(); - return false; - } + + try { + return promise.GetFuture().GetValueSync(); + } catch (...) { + YQL_LOG(INFO) << "DqControl IsReady Exception " << CurrentExceptionMessage(); + return false; + } } private: - NGrpc::TGRpcClientLow GrpcClient; - std::unique_ptr<NGrpc::TServiceConnection<Yql::DqsProto::DqService>> Service; - const TVector<TFileResource> &Files; -}; - - -class TDqControlFactory : public IDqControlFactory { - -public: - TDqControlFactory( - const TString &host, - int port, - int threads, - const TMap<TString, TString>& udfs, - const TString& vanillaLitePath, + NGrpc::TGRpcClientLow GrpcClient; + std::unique_ptr<NGrpc::TServiceConnection<Yql::DqsProto::DqService>> Service; + const TVector<TFileResource> &Files; +}; + + +class TDqControlFactory : public IDqControlFactory { + +public: + TDqControlFactory( + const TString &host, + int port, + int threads, + const TMap<TString, TString>& udfs, + const TString& vanillaLitePath, const TString& vanillaLiteMd5, const THashSet<TString> &filter, bool enableStrip, const TFileStoragePtr& fileStorage - ) - : Threads(threads) - , GrpcConf(TStringBuilder() << host << ":" << port) - , IndexedUdfFilter(filter) + ) + : Threads(threads) + , GrpcConf(TStringBuilder() << host << ":" << port) + , IndexedUdfFilter(filter) , EnableStrip(enableStrip) , FileStorage(fileStorage) - { - if (!vanillaLitePath.empty()) { + { + if (!vanillaLitePath.empty()) { TString path = vanillaLitePath; TString objectId = GetProgramCommitId(); TString newPath, newObjectId; std::tie(newPath, newObjectId) = GetPathAndObjectId(path, objectId, vanillaLiteMd5); - TFileResource vanillaLite; + TFileResource vanillaLite; vanillaLite.SetLocalPath(newPath); - vanillaLite.SetName(vanillaLitePath.substr(vanillaLitePath.rfind('/') + 1)); + vanillaLite.SetName(vanillaLitePath.substr(vanillaLitePath.rfind('/') + 1)); vanillaLite.SetObjectType(Yql::DqsProto::TFile::EEXE_FILE); vanillaLite.SetObjectId(newObjectId); vanillaLite.SetSize(TFile(newPath, OpenExisting | RdOnly).GetLength()); - Files.push_back(vanillaLite); - } - + Files.push_back(vanillaLite); + } + for (const auto& [path, objectId] : udfs){ YQL_LOG(DEBUG) << "DQ control, adding file: " << path << " with objectId " << objectId; TString newPath, newObjectId; std::tie(newPath, newObjectId) = GetPathAndObjectId(path, objectId, objectId); YQL_LOG(DEBUG) << "DQ control, rewrite path/objectId: " << newPath << ", " << newObjectId; - TFileResource r; + TFileResource r; r.SetLocalPath(newPath); r.SetObjectType(Yql::DqsProto::TFile::EUDF_FILE); r.SetObjectId(newObjectId); r.SetSize(TFile(newPath, OpenExisting | RdOnly).GetLength()); - Files.push_back(r); - } - } - - IDqControlPtr GetControl() override { - return new TDqControl(GrpcConf, Threads, Files); - } - - const THashSet<TString>& GetIndexedUdfFilter() override { - return IndexedUdfFilter; - } - + Files.push_back(r); + } + } + + IDqControlPtr GetControl() override { + return new TDqControl(GrpcConf, Threads, Files); + } + + const THashSet<TString>& GetIndexedUdfFilter() override { + return IndexedUdfFilter; + } + bool StripEnabled() const override { return EnableStrip; } -private: +private: std::tuple<TString, TString> GetPathAndObjectId(const TString& path, const TString& objectId, const TString& md5 = {}) { if (!EnableStrip) { return std::make_tuple(path, objectId); @@ -150,17 +150,17 @@ private: return std::make_tuple(fileLink->GetPath(), objectId + DqStrippedSuffied); } - int Threads; - TVector<TFileResource> Files; + int Threads; + TVector<TFileResource> Files; NGrpc::TGRpcClientConfig GrpcConf; - THashSet<TString> IndexedUdfFilter; + THashSet<TString> IndexedUdfFilter; THashMap<TString, TFileLinkPtr> FileLinks; bool EnableStrip; const TFileStoragePtr FileStorage; }; IDqControlFactoryPtr CreateDqControlFactory(const NProto::TDqConfig& config, const TMap<TString, TString>& udfs, const TFileStoragePtr& fileStorage) { - THashSet<TString> indexedUdfFilter(config.GetControl().GetIndexedUdfsToWarmup().begin(), config.GetControl().GetIndexedUdfsToWarmup().end()); + THashSet<TString> indexedUdfFilter(config.GetControl().GetIndexedUdfsToWarmup().begin(), config.GetControl().GetIndexedUdfsToWarmup().end()); return new TDqControlFactory( "localhost", config.GetPort(), diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_control.h b/ydb/library/yql/providers/dq/provider/yql_dq_control.h index d6b58eacd75..72fe7aa3d13 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_control.h +++ b/ydb/library/yql/providers/dq/provider/yql_dq_control.h @@ -1,7 +1,7 @@ #pragma once #include <util/generic/ptr.h> -#include <util/generic/map.h> +#include <util/generic/map.h> #include <ydb/library/yql/core/file_storage/file_storage.h> @@ -11,28 +11,28 @@ namespace NProto { class TDqConfig; } -class IDqControl : public TThrRefBase { +class IDqControl : public TThrRefBase { public: - using TPtr = TIntrusivePtr<IDqControl>; + using TPtr = TIntrusivePtr<IDqControl>; - virtual bool IsReady(const TMap<TString, TString>& udfs = TMap<TString, TString>())= 0; -}; + virtual bool IsReady(const TMap<TString, TString>& udfs = TMap<TString, TString>())= 0; +}; -using IDqControlPtr = TIntrusivePtr<IDqControl>; +using IDqControlPtr = TIntrusivePtr<IDqControl>; -class IDqControlFactory : public TThrRefBase { -public: - using TPtr = TIntrusivePtr<IDqControlFactory>; +class IDqControlFactory : public TThrRefBase { +public: + using TPtr = TIntrusivePtr<IDqControlFactory>; - virtual IDqControlPtr GetControl() = 0; - virtual const THashSet<TString>& GetIndexedUdfFilter() = 0; + virtual IDqControlPtr GetControl() = 0; + virtual const THashSet<TString>& GetIndexedUdfFilter() = 0; virtual bool StripEnabled() const = 0; -}; - -using IDqControlFactoryPtr = TIntrusivePtr<IDqControlFactory>; - +}; + +using IDqControlFactoryPtr = TIntrusivePtr<IDqControlFactory>; + IDqControlFactoryPtr CreateDqControlFactory(const NProto::TDqConfig& config, const TMap<TString, TString>& udfs, const TFileStoragePtr& fileStorage); - + extern const TString DqStrippedSuffied; } // namespace NYql diff --git a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp index 9b41920b177..335e37cd806 100644 --- a/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp +++ b/ydb/library/yql/providers/dq/runtime/task_command_executor.cpp @@ -29,7 +29,7 @@ #include <util/stream/file.h> #include <util/stream/pipe.h> -#ifdef _unix_ +#ifdef _unix_ #include <sys/resource.h> #endif @@ -680,15 +680,15 @@ public: } } - void DontCollectDumps() { -#ifdef _unix_ - rlimit limit = {0, 0}; - if (setrlimit(RLIMIT_CORE, &limit) != 0) { - Cerr << "Failed to set rlimit " << Endl; - } -#endif - } - + void DontCollectDumps() { +#ifdef _unix_ + rlimit limit = {0, 0}; + if (setrlimit(RLIMIT_CORE, &limit) != 0) { + Cerr << "Failed to set rlimit " << Endl; + } +#endif + } + template<typename T> void Prepare(const NDqProto::TDqTask& task, const T& taskMeta, TPipedOutput& output) { NYql::NDqProto::TPrepareResponse result; diff --git a/ydb/library/yql/providers/dq/service/grpc_service.cpp b/ydb/library/yql/providers/dq/service/grpc_service.cpp index 65a9e500d9d..0d95c5a73aa 100644 --- a/ydb/library/yql/providers/dq/service/grpc_service.cpp +++ b/ydb/library/yql/providers/dq/service/grpc_service.cpp @@ -720,29 +720,29 @@ namespace NYql::NDqs { ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, requestEvent.Release())); }); - - ADD_REQUEST(IsReady, IsReadyRequest, IsReadyResponse, { - auto* request = dynamic_cast<const Yql::DqsProto::IsReadyRequest*>(ctx->GetRequest()); - Y_VERIFY(!!request); - - auto ev = MakeHolder<TEvIsReady>(*request); - - auto callback = MakeHolder<TRichActorFutureCallback<TEvIsReadyResponse>>( - [ctx] (TAutoPtr<TEventHandle<TEvIsReadyResponse>>& event) mutable { - Yql::DqsProto::IsReadyResponse result; - result.SetIsReady(event->Get()->Record.GetIsReady()); - ctx->Reply(&result, Ydb::StatusIds::SUCCESS); - }, - [ctx] () mutable { - YQL_LOG(DEBUG) << "IsReadyForRevision failed"; - ctx->ReplyError(grpc::UNAVAILABLE, "Error"); - }, - TDuration::MilliSeconds(2000)); - - TActorId callbackId = ActorSystem.Register(callback.Release()); - - ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release())); - }); + + ADD_REQUEST(IsReady, IsReadyRequest, IsReadyResponse, { + auto* request = dynamic_cast<const Yql::DqsProto::IsReadyRequest*>(ctx->GetRequest()); + Y_VERIFY(!!request); + + auto ev = MakeHolder<TEvIsReady>(*request); + + auto callback = MakeHolder<TRichActorFutureCallback<TEvIsReadyResponse>>( + [ctx] (TAutoPtr<TEventHandle<TEvIsReadyResponse>>& event) mutable { + Yql::DqsProto::IsReadyResponse result; + result.SetIsReady(event->Get()->Record.GetIsReady()); + ctx->Reply(&result, Ydb::StatusIds::SUCCESS); + }, + [ctx] () mutable { + YQL_LOG(DEBUG) << "IsReadyForRevision failed"; + ctx->ReplyError(grpc::UNAVAILABLE, "Error"); + }, + TDuration::MilliSeconds(2000)); + + TActorId callbackId = ActorSystem.Register(callback.Release()); + + ActorSystem.Send(new IEventHandle(MakeWorkerManagerActorID(ActorSystem.NodeId), callbackId, ev.Release())); + }); ADD_REQUEST(Routes, RoutesRequest, RoutesResponse, { auto* request = dynamic_cast<const Yql::DqsProto::RoutesRequest*>(ctx->GetRequest()); diff --git a/ydb/library/yql/providers/dq/service/service_node.cpp b/ydb/library/yql/providers/dq/service/service_node.cpp index cda24ec6cf7..f4205824b6d 100644 --- a/ydb/library/yql/providers/dq/service/service_node.cpp +++ b/ydb/library/yql/providers/dq/service/service_node.cpp @@ -8,7 +8,7 @@ #include <library/cpp/grpc/server/actors/logger.h> -#include <utility> +#include <utility> namespace NYql { using namespace NActors; @@ -82,7 +82,7 @@ namespace NYql { IMetricsRegistryPtr metricsRegistry) : Config(config) , Threads(threads) - , MetricsRegistry(std::move(metricsRegistry)) + , MetricsRegistry(std::move(metricsRegistry)) { std::tie(Setup, LogSettings) = BuildActorSetup( Config.NodeId, @@ -158,9 +158,9 @@ namespace NYql { (static_cast<TDqsGrpcService*>(Service.Get()))->Stop().Wait(timeout); Server->Stop(); - for (auto id : ActorIds) { - ActorSystem->Send(id, new NActors::TEvents::TEvPoison); - } + for (auto id : ActorIds) { + ActorSystem->Send(id, new NActors::TEvents::TEvPoison); + } ActorSystem->Stop(); } } // namespace NYql diff --git a/ydb/library/yql/providers/dq/service/service_node.h b/ydb/library/yql/providers/dq/service/service_node.h index 32f904901dc..5b0606a3275 100644 --- a/ydb/library/yql/providers/dq/service/service_node.h +++ b/ydb/library/yql/providers/dq/service/service_node.h @@ -37,7 +37,7 @@ namespace NYql { private: NDqs::TServiceNodeConfig Config; ui32 Threads; - IMetricsRegistryPtr MetricsRegistry; + IMetricsRegistryPtr MetricsRegistry; THolder<NActors::TActorSystemSetup> Setup; TIntrusivePtr<NActors::NLog::TSettings> LogSettings; THolder<NActors::TActorSystem> ActorSystem; diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp index 9044dc47e30..0f0545e7340 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.cpp @@ -100,11 +100,11 @@ namespace NYql::NDqs { Record.SetIsForwarded(false); } - TEvIsReady::TEvIsReady(const Yql::DqsProto::IsReadyRequest &request) { - *Record.MutableRequest() = request; - Record.SetIsForwarded(false); - } - + TEvIsReady::TEvIsReady(const Yql::DqsProto::IsReadyRequest &request) { + *Record.MutableRequest() = request; + Record.SetIsForwarded(false); + } + TEvConfigureFailureInjectorRequest::TEvConfigureFailureInjectorRequest(const Yql::DqsProto::ConfigureFailureInjectorRequest& request) { *Record.MutableRequest() = request; } diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/events.h b/ydb/library/yql/providers/dq/worker_manager/interface/events.h index a2d0c9f5409..26394d40382 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/events.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/events.h @@ -102,19 +102,19 @@ using TDqResManEvents = NDq::TBaseDqResManEvents<NActors::TEvents::EEventSpace:: TEvQueryStatusResponse() = default; }; - struct TEvIsReady - : NActors::TEventPB<TEvIsReady, NYql::NDqProto::TEvIsReady, TDqResManEvents::ES_IS_READY> { - - TEvIsReady() = default; - TEvIsReady(const Yql::DqsProto::IsReadyRequest& request); - }; - - struct TEvIsReadyResponse - : NActors::TEventPB<TEvIsReadyResponse, NYql::NDqProto::TEvIsReadyResponse, TDqResManEvents::ES_IS_READY_RESPONSE> { - - TEvIsReadyResponse() = default; - }; - + struct TEvIsReady + : NActors::TEventPB<TEvIsReady, NYql::NDqProto::TEvIsReady, TDqResManEvents::ES_IS_READY> { + + TEvIsReady() = default; + TEvIsReady(const Yql::DqsProto::IsReadyRequest& request); + }; + + struct TEvIsReadyResponse + : NActors::TEventPB<TEvIsReadyResponse, NYql::NDqProto::TEvIsReadyResponse, TDqResManEvents::ES_IS_READY_RESPONSE> { + + TEvIsReadyResponse() = default; + }; + struct TEvRoutesRequest : NActors::TEventPB<TEvRoutesRequest, NYql::NDqProto::TEvRoutesRequest, TDqResManEvents::ES_ROUTES> { diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp index 634b25fb075..864730bdc04 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp +++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.cpp @@ -236,12 +236,12 @@ namespace NYql::NDqs { } } - bool TWorkerInfo::AddToDownloadList(const TString& key, const TFileResource& value) { + bool TWorkerInfo::AddToDownloadList(const TString& key, const TFileResource& value) { if (!Resources.contains(key) && DownloadList.insert({key, value}).second) { *CurrentDownloadsSum += 1; - return true; + return true; } - return false; + return false; } const THashMap<TString, TWorkerInfo::TFileResource>& TWorkerInfo::GetDownloadList() { diff --git a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h index 5281a844f4c..e129b78b9f2 100644 --- a/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h +++ b/ydb/library/yql/providers/dq/worker_manager/interface/worker_info.h @@ -150,7 +150,7 @@ struct TWorkerInfo: public TThrRefBase { void AddToDownloadList(const THashMap<TString, TFileResource>& downloadList); - bool AddToDownloadList(const TString& key, const TFileResource& value); + bool AddToDownloadList(const TString& key, const TFileResource& value); const THashMap<TString, TFileResource>& GetDownloadList(); diff --git a/ydb/library/yql/sql/v1/sql.cpp b/ydb/library/yql/sql/v1/sql.cpp index ab5a8647885..6538ea98434 100644 --- a/ydb/library/yql/sql/v1/sql.cpp +++ b/ydb/library/yql/sql/v1/sql.cpp @@ -9618,30 +9618,30 @@ TNodePtr TSqlQuery::PragmaStatement(const TRule_pragma_stmt& stmt, bool& success success = true; return {}; } else if (normalizedPragma == "strict") { - if (values.size() == 0U) { - Ctx.PragmaYsonStrict = true; - success = true; + if (values.size() == 0U) { + Ctx.PragmaYsonStrict = true; + success = true; } else if (values.size() == 1U && values.front().GetLiteral() && TryFromString(*values.front().GetLiteral(), Ctx.PragmaYsonStrict)) { - success = true; - } else { - Error() << "Expected 'true', 'false' or no parameter for: " << pragma; - Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue"); - } + success = true; + } else { + Error() << "Expected 'true', 'false' or no parameter for: " << pragma; + Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue"); + } return {}; } else if (normalizedPragma == "disablestrict") { - if (values.size() == 0U) { - Ctx.PragmaYsonStrict = false; - success = true; - return {}; - } - bool pragmaYsonDisableStrict; + if (values.size() == 0U) { + Ctx.PragmaYsonStrict = false; + success = true; + return {}; + } + bool pragmaYsonDisableStrict; if (values.size() == 1U && values.front().GetLiteral() && TryFromString(*values.front().GetLiteral(), pragmaYsonDisableStrict)) { - Ctx.PragmaYsonStrict = !pragmaYsonDisableStrict; - success = true; - } else { - Error() << "Expected 'true', 'false' or no parameter for: " << pragma; - Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue"); - } + Ctx.PragmaYsonStrict = !pragmaYsonDisableStrict; + success = true; + } else { + Error() << "Expected 'true', 'false' or no parameter for: " << pragma; + Ctx.IncrementMonCounter("sql_errors", "BadPragmaValue"); + } return {}; } else { Error() << "Unknown pragma: '" << pragma << "'"; diff --git a/ydb/library/yql/sql/v1/sql_ut.cpp b/ydb/library/yql/sql/v1/sql_ut.cpp index 85b6e0dfcd3..f54c35cb16e 100644 --- a/ydb/library/yql/sql/v1/sql_ut.cpp +++ b/ydb/library/yql/sql/v1/sql_ut.cpp @@ -1280,16 +1280,16 @@ Y_UNIT_TEST_SUITE(SqlParsingOnly) { "REDUCE Input ON key using all $func(UUU::VVV(TableRow()));"; UNIT_ASSERT(SqlToYql(req).IsOk()); } - - Y_UNIT_TEST(YsonDisableStrict) { - UNIT_ASSERT(SqlToYql("pragma yson.DisableStrict = \"false\";").IsOk()); - UNIT_ASSERT(SqlToYql("pragma yson.DisableStrict;").IsOk()); - } - - Y_UNIT_TEST(YsonStrict) { - UNIT_ASSERT(SqlToYql("pragma yson.Strict = \"false\";").IsOk()); - UNIT_ASSERT(SqlToYql("pragma yson.Strict;").IsOk()); - } + + Y_UNIT_TEST(YsonDisableStrict) { + UNIT_ASSERT(SqlToYql("pragma yson.DisableStrict = \"false\";").IsOk()); + UNIT_ASSERT(SqlToYql("pragma yson.DisableStrict;").IsOk()); + } + + Y_UNIT_TEST(YsonStrict) { + UNIT_ASSERT(SqlToYql("pragma yson.Strict = \"false\";").IsOk()); + UNIT_ASSERT(SqlToYql("pragma yson.Strict;").IsOk()); + } Y_UNIT_TEST(JoinByTuple) { auto req = "use plato;\n" @@ -2782,12 +2782,12 @@ select FormatType($f()); UNIT_ASSERT(!res.Root); UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:6:1: Error: DISCARD within UNION ALL is only allowed before first subquery\n"); } - - Y_UNIT_TEST(YsonStrictInvalidPragma) { - auto res = SqlToYql("pragma yson.Strict = \"wrong\";"); - UNIT_ASSERT(!res.Root); + + Y_UNIT_TEST(YsonStrictInvalidPragma) { + auto res = SqlToYql("pragma yson.Strict = \"wrong\";"); + UNIT_ASSERT(!res.Root); UNIT_ASSERT_NO_DIFF(Err2Str(res), "<main>:1:22: Error: Expected 'true', 'false' or no parameter for: Strict\n"); - } + } Y_UNIT_TEST(WarnTableNameInSomeContexts) { UNIT_ASSERT(SqlToYql("use plato; select TableName() from Input;").IsOk()); |