diff options
author | robot-piglet <robot-piglet@yandex-team.com> | 2024-01-25 23:49:45 +0300 |
---|---|---|
committer | robot-piglet <robot-piglet@yandex-team.com> | 2024-01-25 23:59:53 +0300 |
commit | e69ee2703dc7857bd4916a1855103268e6b476a1 (patch) | |
tree | 309713807dbaf66cdc017dc897e785f7a6d8b4e0 | |
parent | a87a6225bc70a7ec058857e1a5dc6c423610d249 (diff) | |
download | ydb-e69ee2703dc7857bd4916a1855103268e6b476a1.tar.gz |
Intermediate changes
-rw-r--r-- | yt/yt/client/driver/command.cpp | 1 | ||||
-rw-r--r-- | yt/yt/client/driver/driver.cpp | 12 | ||||
-rw-r--r-- | yt/yt/client/driver/flow_commands.cpp | 286 | ||||
-rw-r--r-- | yt/yt/client/driver/flow_commands.h | 130 | ||||
-rw-r--r-- | yt/yt/flow/lib/client/public.h | 4 |
5 files changed, 408 insertions, 25 deletions
diff --git a/yt/yt/client/driver/command.cpp b/yt/yt/client/driver/command.cpp index 8a94593e8c..cd9cf53ad7 100644 --- a/yt/yt/client/driver/command.cpp +++ b/yt/yt/client/driver/command.cpp @@ -16,7 +16,6 @@ using namespace NConcurrency; //////////////////////////////////////////////////////////////////////////////// - void ProduceOutput( ICommandContextPtr context, const std::function<void(IYsonConsumer*)>& producer) diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp index a9a8bd685e..b68aff58e7 100644 --- a/yt/yt/client/driver/driver.cpp +++ b/yt/yt/client/driver/driver.cpp @@ -362,9 +362,15 @@ public: REGISTER_ALL(TGetBundleConfigCommand, "get_bundle_config", Null, Structured, false, false); REGISTER_ALL(TSetBundleConfigCommand, "set_bundle_config", Structured, Null, false, false); - REGISTER_ALL(TStartPipelineCommand, "start_pipeline", Null, Structured, false, false); - REGISTER_ALL(TStopPipelineCommand, "stop_pipeline", Null, Structured, false, false); - REGISTER_ALL(TPausePipelineCommand, "pause_pipeline", Null, Structured, false, false); + REGISTER (TGetPipelineSpecCommand, "get_pipeline_spec", Null, Structured, true, false, ApiVersion4); + REGISTER (TSetPipelineSpecCommand, "set_pipeline_spec", Structured, Null, true, false, ApiVersion4); + REGISTER (TRemovePipelineDynamicSpecCommand, "remove_pipeline_spec", Null, Null, true, false, ApiVersion4); + REGISTER (TGetPipelineDynamicSpecCommand, "get_pipeline_dynamic_spec", Null, Structured, true, false, ApiVersion4); + REGISTER (TSetPipelineDynamicSpecCommand, "set_pipeline_dynamic_spec", Structured, Null, true, false, ApiVersion4); + REGISTER (TRemovePipelineDynamicSpecCommand, "remove_pipeline_dynamic_spec", Null, Null, true, false, ApiVersion4); + REGISTER (TStartPipelineCommand, "start_pipeline", Null, Structured, false, false, ApiVersion4); + REGISTER (TStopPipelineCommand, "stop_pipeline", Null, Structured, false, false, ApiVersion4); + REGISTER (TPausePipelineCommand, "pause_pipeline", Null, Structured, false, false, ApiVersion4); if (Config_->EnableInternalCommands) { REGISTER_ALL(TReadHunksCommand, "read_hunks", Null, Structured, false, true ); diff --git a/yt/yt/client/driver/flow_commands.cpp b/yt/yt/client/driver/flow_commands.cpp index c0543f4ad2..4b5d84bfd5 100644 --- a/yt/yt/client/driver/flow_commands.cpp +++ b/yt/yt/client/driver/flow_commands.cpp @@ -1,36 +1,286 @@ #include "flow_commands.h" +#include <yt/yt/core/ytree/fluent.h> +#include <yt/yt/core/ytree/ypath_client.h> + namespace NYT::NDriver { using namespace NConcurrency; +using namespace NYTree; +using namespace NYPath; +using namespace NYson; +using namespace NApi; //////////////////////////////////////////////////////////////////////////////// -void TStartPipelineCommand::Register(TRegistrar registrar) +namespace { + +void ExecuteGetPipelineSpecCommand( + const ICommandContextPtr& context, + const TYPath& specPath, + const auto& specGetterOptions, + auto specGetter) { - registrar.Parameter("pipeline_path", &TThis::PipelinePath); + auto client = context->GetClient(); + auto result = WaitFor(specGetter(client, specGetterOptions)) + .ValueOrThrow(); + + auto spec = SyncYPathGet(ConvertToNode(result.Spec), specPath); + + ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) { + BuildYsonFluently(consumer) + .BeginMap() + .Item("spec").Value(spec) + .Item("version").Value(result.Version) + .EndMap(); + }); } -void TStartPipelineCommand::DoExecute(ICommandContextPtr context) +void ExecuteSetPipelineSpecCommand( + const ICommandContextPtr& context, + const TYPath& specPath, + const auto& specGetterOptions, + auto specGetter, + const auto& specSetterOptions, + auto specSetter) { auto client = context->GetClient(); - WaitFor(client->StartPipeline(PipelinePath)) - .ThrowOnError(); + auto spec = context->ConsumeInputValue(); - ProduceEmptyOutput(context); + auto setResult = [&] { + if (specPath.empty()) { + return WaitFor(specSetter(client, spec, specSetterOptions)) + .ValueOrThrow(); + } else { + auto getResult = WaitFor(specGetter(client, specGetterOptions)) + .ValueOrThrow(); + + if (specSetterOptions.ExpectedVersion && getResult.Version != *specSetterOptions.ExpectedVersion) { + THROW_ERROR_EXCEPTION( + NFlow::EErrorCode::SpecVersionMismatch, + "Spec version mismatch: expected %v, got %v", + *specSetterOptions.ExpectedVersion, + getResult.Version); + } + + auto fullSpec = ConvertToNode(getResult.Spec); + SyncYPathSet(fullSpec, specPath, spec); + + auto adjustedOptions = specSetterOptions; + adjustedOptions.ExpectedVersion = getResult.Version; + return WaitFor(specSetter(client, ConvertToYsonString(fullSpec), adjustedOptions)) + .ValueOrThrow(); + } + }(); + + ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) { + BuildYsonFluently(consumer) + .BeginMap() + .Item("version").Value(setResult.Version) + .EndMap(); + }); +} + +void ExecuteRemovePipelineSpecCommand( + const ICommandContextPtr& context, + const TYPath& specPath, + const auto& specGetterOptions, + auto specGetter, + const auto& specSetterOptions, + auto specSetter) +{ + auto client = context->GetClient(); + auto spec = context->ConsumeInputValue(); + + auto getResult = WaitFor(specGetter(client, specGetterOptions)) + .ValueOrThrow(); + + if (specSetterOptions.ExpectedVersion && getResult.Version != *specSetterOptions.ExpectedVersion) { + THROW_ERROR_EXCEPTION( + NFlow::EErrorCode::SpecVersionMismatch, + "Spec version mismatch: expected %v, got %v", + *specSetterOptions.ExpectedVersion, + getResult.Version); + } + + auto fullSpec = ConvertToNode(getResult.Spec); + SyncYPathRemove(fullSpec, specPath); + + auto adjustedOptions = specSetterOptions; + adjustedOptions.ExpectedVersion = getResult.Version; + auto setResult = WaitFor(specSetter(client, ConvertToYsonString(fullSpec), adjustedOptions)) + .ValueOrThrow(); + + ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) { + BuildYsonFluently(consumer) + .BeginMap() + .Item("version").Value(setResult.Version) + .EndMap(); + }); } +} // namespace + //////////////////////////////////////////////////////////////////////////////// -void TStopPipelineCommand::Register(TRegistrar registrar) +void TPipelineCommandBase::Register(TRegistrar registrar) { registrar.Parameter("pipeline_path", &TThis::PipelinePath); } -void TStopPipelineCommand::DoExecute(ICommandContextPtr context) +//////////////////////////////////////////////////////////////////////////////// + +void TGetPipelineSpecCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("spec_path", &TThis::SpecPath) + .Default(); +} + +void TGetPipelineSpecCommand::DoExecute(ICommandContextPtr context) +{ + ExecuteGetPipelineSpecCommand( + context, + SpecPath, + Options, + [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TSetPipelineSpecCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("spec_path", &TThis::SpecPath) + .Default(); + registrar.ParameterWithUniversalAccessor<bool>( + "force", + [] (TThis* command) -> auto& { + return command->Options.Force; + }) + .Optional(/*init*/ false); + registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>( + "expected_version", + [] (TThis* command) -> auto& { + return command->Options.ExpectedVersion; + }) + .Optional(/*init*/ false); +} + +void TSetPipelineSpecCommand::DoExecute(ICommandContextPtr context) +{ + ExecuteSetPipelineSpecCommand( + context, + SpecPath, + TGetPipelineSpecOptions(), + [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); }, + Options, + [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineSpec(PipelinePath, spec, options); }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TRemovePipelineSpecCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("spec_path", &TThis::SpecPath); + registrar.ParameterWithUniversalAccessor<bool>( + "force", + [] (TThis* command) -> auto& { + return command->Options.Force; + }) + .Optional(/*init*/ false); + registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>( + "expected_version", + [] (TThis* command) -> auto& { + return command->Options.ExpectedVersion; + }) + .Optional(/*init*/ false); +} + +void TRemovePipelineSpecCommand::DoExecute(ICommandContextPtr context) +{ + ExecuteRemovePipelineSpecCommand( + context, + SpecPath, + TGetPipelineSpecOptions(), + [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); }, + Options, + [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineSpec(PipelinePath, spec, options); }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TGetPipelineDynamicSpecCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("spec_path", &TThis::SpecPath) + .Default(); +} + +void TGetPipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context) +{ + ExecuteGetPipelineSpecCommand( + context, + SpecPath, + Options, + [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TSetPipelineDynamicSpecCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("spec_path", &TThis::SpecPath) + .Default(); + registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>( + "expected_version", + [] (TThis* command) -> auto& { + return command->Options.ExpectedVersion; + }) + .Optional(/*init*/ false); +} + +void TSetPipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context) +{ + ExecuteSetPipelineSpecCommand( + context, + SpecPath, + TGetPipelineDynamicSpecOptions(), + [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); }, + Options, + [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineDynamicSpec(PipelinePath, spec, options); }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TRemovePipelineDynamicSpecCommand::Register(TRegistrar registrar) +{ + registrar.Parameter("spec_path", &TThis::SpecPath); + registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>( + "expected_version", + [] (TThis* command) -> auto& { + return command->Options.ExpectedVersion; + }) + .Optional(/*init*/ false); +} + +void TRemovePipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context) +{ + ExecuteRemovePipelineSpecCommand( + context, + SpecPath, + TGetPipelineDynamicSpecOptions(), + [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); }, + Options, + [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineDynamicSpec(PipelinePath, spec, options); }); +} + +//////////////////////////////////////////////////////////////////////////////// + +void TStartPipelineCommand::Register(TRegistrar /*registrar*/) +{ } + +void TStartPipelineCommand::DoExecute(ICommandContextPtr context) { auto client = context->GetClient(); - WaitFor(client->StopPipeline(PipelinePath)) + WaitFor(client->StartPipeline(PipelinePath, Options)) .ThrowOnError(); ProduceEmptyOutput(context); @@ -38,15 +288,27 @@ void TStopPipelineCommand::DoExecute(ICommandContextPtr context) //////////////////////////////////////////////////////////////////////////////// -void TPausePipelineCommand::Register(TRegistrar registrar) +void TStopPipelineCommand::Register(TRegistrar /*registrar*/) +{ } + +void TStopPipelineCommand::DoExecute(ICommandContextPtr context) { - registrar.Parameter("pipeline_path", &TThis::PipelinePath); + auto client = context->GetClient(); + WaitFor(client->StopPipeline(PipelinePath, Options)) + .ThrowOnError(); + + ProduceEmptyOutput(context); } +//////////////////////////////////////////////////////////////////////////////// + +void TPausePipelineCommand::Register(TRegistrar /*registrar*/) +{ } + void TPausePipelineCommand::DoExecute(ICommandContextPtr context) { auto client = context->GetClient(); - WaitFor(client->PausePipeline(PipelinePath)) + WaitFor(client->PausePipeline(PipelinePath, Options)) .ThrowOnError(); ProduceEmptyOutput(context); diff --git a/yt/yt/client/driver/flow_commands.h b/yt/yt/client/driver/flow_commands.h index c6beb43dda..dd798adfcb 100644 --- a/yt/yt/client/driver/flow_commands.h +++ b/yt/yt/client/driver/flow_commands.h @@ -8,8 +8,125 @@ namespace NYT::NDriver { //////////////////////////////////////////////////////////////////////////////// +class TPipelineCommandBase + : public virtual NYTree::TYsonStructLite +{ +public: + REGISTER_YSON_STRUCT_LITE(TPipelineCommandBase); + + static void Register(TRegistrar registrar); + +protected: + NYPath::TYPath PipelinePath; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TGetPipelineSpecCommand + : public TTypedCommand<NApi::TGetPipelineSpecOptions> + , public TPipelineCommandBase +{ +public: + REGISTER_YSON_STRUCT_LITE(TGetPipelineSpecCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TYPath SpecPath; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TSetPipelineSpecCommand + : public TTypedCommand<NApi::TSetPipelineSpecOptions> + , public TPipelineCommandBase +{ +public: + REGISTER_YSON_STRUCT_LITE(TSetPipelineSpecCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TYPath SpecPath; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TRemovePipelineSpecCommand + : public TTypedCommand<NApi::TSetPipelineSpecOptions> + , public TPipelineCommandBase +{ +public: + REGISTER_YSON_STRUCT_LITE(TRemovePipelineSpecCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TYPath SpecPath; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TGetPipelineDynamicSpecCommand + : public TTypedCommand<NApi::TGetPipelineDynamicSpecOptions> + , public TPipelineCommandBase +{ +public: + REGISTER_YSON_STRUCT_LITE(TGetPipelineDynamicSpecCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TYPath SpecPath; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TSetPipelineDynamicSpecCommand + : public TTypedCommand<NApi::TSetPipelineDynamicSpecOptions> + , public TPipelineCommandBase +{ +public: + REGISTER_YSON_STRUCT_LITE(TSetPipelineDynamicSpecCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TYPath SpecPath; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + +class TRemovePipelineDynamicSpecCommand + : public TTypedCommand<NApi::TSetPipelineDynamicSpecOptions> + , public TPipelineCommandBase +{ +public: + REGISTER_YSON_STRUCT_LITE(TRemovePipelineDynamicSpecCommand); + + static void Register(TRegistrar registrar); + +private: + NYPath::TYPath SpecPath; + + void DoExecute(ICommandContextPtr context) override; +}; + +//////////////////////////////////////////////////////////////////////////////// + class TStartPipelineCommand : public TTypedCommand<NApi::TStartPipelineOptions> + , public TPipelineCommandBase { public: REGISTER_YSON_STRUCT_LITE(TStartPipelineCommand); @@ -17,8 +134,6 @@ public: static void Register(TRegistrar registrar); private: - NYPath::TYPath PipelinePath; - void DoExecute(ICommandContextPtr context) override; }; @@ -26,6 +141,7 @@ private: class TStopPipelineCommand : public TTypedCommand<NApi::TStopPipelineOptions> + , public TPipelineCommandBase { public: REGISTER_YSON_STRUCT_LITE(TStopPipelineCommand); @@ -33,15 +149,14 @@ public: static void Register(TRegistrar registrar); private: - NYPath::TYPath PipelinePath; - void DoExecute(ICommandContextPtr context) override; }; //////////////////////////////////////////////////////////////////////////////// class TPausePipelineCommand - : public TTypedCommand<NApi::TStopPipelineOptions> + : public TTypedCommand<NApi::TPausePipelineOptions> + , public TPipelineCommandBase { public: REGISTER_YSON_STRUCT_LITE(TPausePipelineCommand); @@ -49,8 +164,6 @@ public: static void Register(TRegistrar registrar); private: - NYPath::TYPath PipelinePath; - void DoExecute(ICommandContextPtr context) override; }; @@ -58,6 +171,7 @@ private: class TGetPipelineStatusCommand : public TTypedCommand<NApi::TGetPipelineStatusOptions> + , public TPipelineCommandBase { public: REGISTER_YSON_STRUCT_LITE(TGetPipelineStatusCommand); @@ -65,8 +179,6 @@ public: static void Register(TRegistrar registrar); private: - NYPath::TYPath PipelinePath; - void DoExecute(ICommandContextPtr context) override; }; diff --git a/yt/yt/flow/lib/client/public.h b/yt/yt/flow/lib/client/public.h index c813f0d5f6..e1812eaa08 100644 --- a/yt/yt/flow/lib/client/public.h +++ b/yt/yt/flow/lib/client/public.h @@ -27,6 +27,10 @@ DEFINE_ENUM(EPipelineState, ((Completed) (6)) ); +YT_DEFINE_ERROR_ENUM( + ((SpecVersionMismatch) (3300)) +); + YT_DEFINE_STRONG_TYPEDEF(TVersion, i64); //////////////////////////////////////////////////////////////////////////////// |