aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorrobot-piglet <robot-piglet@yandex-team.com>2024-01-25 23:49:45 +0300
committerrobot-piglet <robot-piglet@yandex-team.com>2024-01-25 23:59:53 +0300
commite69ee2703dc7857bd4916a1855103268e6b476a1 (patch)
tree309713807dbaf66cdc017dc897e785f7a6d8b4e0
parenta87a6225bc70a7ec058857e1a5dc6c423610d249 (diff)
downloadydb-e69ee2703dc7857bd4916a1855103268e6b476a1.tar.gz
Intermediate changes
-rw-r--r--yt/yt/client/driver/command.cpp1
-rw-r--r--yt/yt/client/driver/driver.cpp12
-rw-r--r--yt/yt/client/driver/flow_commands.cpp286
-rw-r--r--yt/yt/client/driver/flow_commands.h130
-rw-r--r--yt/yt/flow/lib/client/public.h4
5 files changed, 408 insertions, 25 deletions
diff --git a/yt/yt/client/driver/command.cpp b/yt/yt/client/driver/command.cpp
index 8a94593e8c..cd9cf53ad7 100644
--- a/yt/yt/client/driver/command.cpp
+++ b/yt/yt/client/driver/command.cpp
@@ -16,7 +16,6 @@ using namespace NConcurrency;
////////////////////////////////////////////////////////////////////////////////
-
void ProduceOutput(
ICommandContextPtr context,
const std::function<void(IYsonConsumer*)>& producer)
diff --git a/yt/yt/client/driver/driver.cpp b/yt/yt/client/driver/driver.cpp
index a9a8bd685e..b68aff58e7 100644
--- a/yt/yt/client/driver/driver.cpp
+++ b/yt/yt/client/driver/driver.cpp
@@ -362,9 +362,15 @@ public:
REGISTER_ALL(TGetBundleConfigCommand, "get_bundle_config", Null, Structured, false, false);
REGISTER_ALL(TSetBundleConfigCommand, "set_bundle_config", Structured, Null, false, false);
- REGISTER_ALL(TStartPipelineCommand, "start_pipeline", Null, Structured, false, false);
- REGISTER_ALL(TStopPipelineCommand, "stop_pipeline", Null, Structured, false, false);
- REGISTER_ALL(TPausePipelineCommand, "pause_pipeline", Null, Structured, false, false);
+ REGISTER (TGetPipelineSpecCommand, "get_pipeline_spec", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TSetPipelineSpecCommand, "set_pipeline_spec", Structured, Null, true, false, ApiVersion4);
+ REGISTER (TRemovePipelineDynamicSpecCommand, "remove_pipeline_spec", Null, Null, true, false, ApiVersion4);
+ REGISTER (TGetPipelineDynamicSpecCommand, "get_pipeline_dynamic_spec", Null, Structured, true, false, ApiVersion4);
+ REGISTER (TSetPipelineDynamicSpecCommand, "set_pipeline_dynamic_spec", Structured, Null, true, false, ApiVersion4);
+ REGISTER (TRemovePipelineDynamicSpecCommand, "remove_pipeline_dynamic_spec", Null, Null, true, false, ApiVersion4);
+ REGISTER (TStartPipelineCommand, "start_pipeline", Null, Structured, false, false, ApiVersion4);
+ REGISTER (TStopPipelineCommand, "stop_pipeline", Null, Structured, false, false, ApiVersion4);
+ REGISTER (TPausePipelineCommand, "pause_pipeline", Null, Structured, false, false, ApiVersion4);
if (Config_->EnableInternalCommands) {
REGISTER_ALL(TReadHunksCommand, "read_hunks", Null, Structured, false, true );
diff --git a/yt/yt/client/driver/flow_commands.cpp b/yt/yt/client/driver/flow_commands.cpp
index c0543f4ad2..4b5d84bfd5 100644
--- a/yt/yt/client/driver/flow_commands.cpp
+++ b/yt/yt/client/driver/flow_commands.cpp
@@ -1,36 +1,286 @@
#include "flow_commands.h"
+#include <yt/yt/core/ytree/fluent.h>
+#include <yt/yt/core/ytree/ypath_client.h>
+
namespace NYT::NDriver {
using namespace NConcurrency;
+using namespace NYTree;
+using namespace NYPath;
+using namespace NYson;
+using namespace NApi;
////////////////////////////////////////////////////////////////////////////////
-void TStartPipelineCommand::Register(TRegistrar registrar)
+namespace {
+
+void ExecuteGetPipelineSpecCommand(
+ const ICommandContextPtr& context,
+ const TYPath& specPath,
+ const auto& specGetterOptions,
+ auto specGetter)
{
- registrar.Parameter("pipeline_path", &TThis::PipelinePath);
+ auto client = context->GetClient();
+ auto result = WaitFor(specGetter(client, specGetterOptions))
+ .ValueOrThrow();
+
+ auto spec = SyncYPathGet(ConvertToNode(result.Spec), specPath);
+
+ ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("spec").Value(spec)
+ .Item("version").Value(result.Version)
+ .EndMap();
+ });
}
-void TStartPipelineCommand::DoExecute(ICommandContextPtr context)
+void ExecuteSetPipelineSpecCommand(
+ const ICommandContextPtr& context,
+ const TYPath& specPath,
+ const auto& specGetterOptions,
+ auto specGetter,
+ const auto& specSetterOptions,
+ auto specSetter)
{
auto client = context->GetClient();
- WaitFor(client->StartPipeline(PipelinePath))
- .ThrowOnError();
+ auto spec = context->ConsumeInputValue();
- ProduceEmptyOutput(context);
+ auto setResult = [&] {
+ if (specPath.empty()) {
+ return WaitFor(specSetter(client, spec, specSetterOptions))
+ .ValueOrThrow();
+ } else {
+ auto getResult = WaitFor(specGetter(client, specGetterOptions))
+ .ValueOrThrow();
+
+ if (specSetterOptions.ExpectedVersion && getResult.Version != *specSetterOptions.ExpectedVersion) {
+ THROW_ERROR_EXCEPTION(
+ NFlow::EErrorCode::SpecVersionMismatch,
+ "Spec version mismatch: expected %v, got %v",
+ *specSetterOptions.ExpectedVersion,
+ getResult.Version);
+ }
+
+ auto fullSpec = ConvertToNode(getResult.Spec);
+ SyncYPathSet(fullSpec, specPath, spec);
+
+ auto adjustedOptions = specSetterOptions;
+ adjustedOptions.ExpectedVersion = getResult.Version;
+ return WaitFor(specSetter(client, ConvertToYsonString(fullSpec), adjustedOptions))
+ .ValueOrThrow();
+ }
+ }();
+
+ ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("version").Value(setResult.Version)
+ .EndMap();
+ });
+}
+
+void ExecuteRemovePipelineSpecCommand(
+ const ICommandContextPtr& context,
+ const TYPath& specPath,
+ const auto& specGetterOptions,
+ auto specGetter,
+ const auto& specSetterOptions,
+ auto specSetter)
+{
+ auto client = context->GetClient();
+ auto spec = context->ConsumeInputValue();
+
+ auto getResult = WaitFor(specGetter(client, specGetterOptions))
+ .ValueOrThrow();
+
+ if (specSetterOptions.ExpectedVersion && getResult.Version != *specSetterOptions.ExpectedVersion) {
+ THROW_ERROR_EXCEPTION(
+ NFlow::EErrorCode::SpecVersionMismatch,
+ "Spec version mismatch: expected %v, got %v",
+ *specSetterOptions.ExpectedVersion,
+ getResult.Version);
+ }
+
+ auto fullSpec = ConvertToNode(getResult.Spec);
+ SyncYPathRemove(fullSpec, specPath);
+
+ auto adjustedOptions = specSetterOptions;
+ adjustedOptions.ExpectedVersion = getResult.Version;
+ auto setResult = WaitFor(specSetter(client, ConvertToYsonString(fullSpec), adjustedOptions))
+ .ValueOrThrow();
+
+ ProduceOutput(context, [&] (NYson::IYsonConsumer* consumer) {
+ BuildYsonFluently(consumer)
+ .BeginMap()
+ .Item("version").Value(setResult.Version)
+ .EndMap();
+ });
}
+} // namespace
+
////////////////////////////////////////////////////////////////////////////////
-void TStopPipelineCommand::Register(TRegistrar registrar)
+void TPipelineCommandBase::Register(TRegistrar registrar)
{
registrar.Parameter("pipeline_path", &TThis::PipelinePath);
}
-void TStopPipelineCommand::DoExecute(ICommandContextPtr context)
+////////////////////////////////////////////////////////////////////////////////
+
+void TGetPipelineSpecCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("spec_path", &TThis::SpecPath)
+ .Default();
+}
+
+void TGetPipelineSpecCommand::DoExecute(ICommandContextPtr context)
+{
+ ExecuteGetPipelineSpecCommand(
+ context,
+ SpecPath,
+ Options,
+ [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TSetPipelineSpecCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("spec_path", &TThis::SpecPath)
+ .Default();
+ registrar.ParameterWithUniversalAccessor<bool>(
+ "force",
+ [] (TThis* command) -> auto& {
+ return command->Options.Force;
+ })
+ .Optional(/*init*/ false);
+ registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+ "expected_version",
+ [] (TThis* command) -> auto& {
+ return command->Options.ExpectedVersion;
+ })
+ .Optional(/*init*/ false);
+}
+
+void TSetPipelineSpecCommand::DoExecute(ICommandContextPtr context)
+{
+ ExecuteSetPipelineSpecCommand(
+ context,
+ SpecPath,
+ TGetPipelineSpecOptions(),
+ [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); },
+ Options,
+ [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TRemovePipelineSpecCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("spec_path", &TThis::SpecPath);
+ registrar.ParameterWithUniversalAccessor<bool>(
+ "force",
+ [] (TThis* command) -> auto& {
+ return command->Options.Force;
+ })
+ .Optional(/*init*/ false);
+ registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+ "expected_version",
+ [] (TThis* command) -> auto& {
+ return command->Options.ExpectedVersion;
+ })
+ .Optional(/*init*/ false);
+}
+
+void TRemovePipelineSpecCommand::DoExecute(ICommandContextPtr context)
+{
+ ExecuteRemovePipelineSpecCommand(
+ context,
+ SpecPath,
+ TGetPipelineSpecOptions(),
+ [&] (const auto& client, const auto& options) { return client->GetPipelineSpec(PipelinePath, options); },
+ Options,
+ [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TGetPipelineDynamicSpecCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("spec_path", &TThis::SpecPath)
+ .Default();
+}
+
+void TGetPipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context)
+{
+ ExecuteGetPipelineSpecCommand(
+ context,
+ SpecPath,
+ Options,
+ [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TSetPipelineDynamicSpecCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("spec_path", &TThis::SpecPath)
+ .Default();
+ registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+ "expected_version",
+ [] (TThis* command) -> auto& {
+ return command->Options.ExpectedVersion;
+ })
+ .Optional(/*init*/ false);
+}
+
+void TSetPipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context)
+{
+ ExecuteSetPipelineSpecCommand(
+ context,
+ SpecPath,
+ TGetPipelineDynamicSpecOptions(),
+ [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); },
+ Options,
+ [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineDynamicSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TRemovePipelineDynamicSpecCommand::Register(TRegistrar registrar)
+{
+ registrar.Parameter("spec_path", &TThis::SpecPath);
+ registrar.ParameterWithUniversalAccessor<std::optional<NFlow::TVersion>>(
+ "expected_version",
+ [] (TThis* command) -> auto& {
+ return command->Options.ExpectedVersion;
+ })
+ .Optional(/*init*/ false);
+}
+
+void TRemovePipelineDynamicSpecCommand::DoExecute(ICommandContextPtr context)
+{
+ ExecuteRemovePipelineSpecCommand(
+ context,
+ SpecPath,
+ TGetPipelineDynamicSpecOptions(),
+ [&] (const auto& client, const auto& options) { return client->GetPipelineDynamicSpec(PipelinePath, options); },
+ Options,
+ [&] (const auto& client, const auto& spec, const auto& options) { return client->SetPipelineDynamicSpec(PipelinePath, spec, options); });
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+void TStartPipelineCommand::Register(TRegistrar /*registrar*/)
+{ }
+
+void TStartPipelineCommand::DoExecute(ICommandContextPtr context)
{
auto client = context->GetClient();
- WaitFor(client->StopPipeline(PipelinePath))
+ WaitFor(client->StartPipeline(PipelinePath, Options))
.ThrowOnError();
ProduceEmptyOutput(context);
@@ -38,15 +288,27 @@ void TStopPipelineCommand::DoExecute(ICommandContextPtr context)
////////////////////////////////////////////////////////////////////////////////
-void TPausePipelineCommand::Register(TRegistrar registrar)
+void TStopPipelineCommand::Register(TRegistrar /*registrar*/)
+{ }
+
+void TStopPipelineCommand::DoExecute(ICommandContextPtr context)
{
- registrar.Parameter("pipeline_path", &TThis::PipelinePath);
+ auto client = context->GetClient();
+ WaitFor(client->StopPipeline(PipelinePath, Options))
+ .ThrowOnError();
+
+ ProduceEmptyOutput(context);
}
+////////////////////////////////////////////////////////////////////////////////
+
+void TPausePipelineCommand::Register(TRegistrar /*registrar*/)
+{ }
+
void TPausePipelineCommand::DoExecute(ICommandContextPtr context)
{
auto client = context->GetClient();
- WaitFor(client->PausePipeline(PipelinePath))
+ WaitFor(client->PausePipeline(PipelinePath, Options))
.ThrowOnError();
ProduceEmptyOutput(context);
diff --git a/yt/yt/client/driver/flow_commands.h b/yt/yt/client/driver/flow_commands.h
index c6beb43dda..dd798adfcb 100644
--- a/yt/yt/client/driver/flow_commands.h
+++ b/yt/yt/client/driver/flow_commands.h
@@ -8,8 +8,125 @@ namespace NYT::NDriver {
////////////////////////////////////////////////////////////////////////////////
+class TPipelineCommandBase
+ : public virtual NYTree::TYsonStructLite
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TPipelineCommandBase);
+
+ static void Register(TRegistrar registrar);
+
+protected:
+ NYPath::TYPath PipelinePath;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TGetPipelineSpecCommand
+ : public TTypedCommand<NApi::TGetPipelineSpecOptions>
+ , public TPipelineCommandBase
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TGetPipelineSpecCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TYPath SpecPath;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSetPipelineSpecCommand
+ : public TTypedCommand<NApi::TSetPipelineSpecOptions>
+ , public TPipelineCommandBase
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TSetPipelineSpecCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TYPath SpecPath;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRemovePipelineSpecCommand
+ : public TTypedCommand<NApi::TSetPipelineSpecOptions>
+ , public TPipelineCommandBase
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TRemovePipelineSpecCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TYPath SpecPath;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TGetPipelineDynamicSpecCommand
+ : public TTypedCommand<NApi::TGetPipelineDynamicSpecOptions>
+ , public TPipelineCommandBase
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TGetPipelineDynamicSpecCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TYPath SpecPath;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TSetPipelineDynamicSpecCommand
+ : public TTypedCommand<NApi::TSetPipelineDynamicSpecOptions>
+ , public TPipelineCommandBase
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TSetPipelineDynamicSpecCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TYPath SpecPath;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
+class TRemovePipelineDynamicSpecCommand
+ : public TTypedCommand<NApi::TSetPipelineDynamicSpecOptions>
+ , public TPipelineCommandBase
+{
+public:
+ REGISTER_YSON_STRUCT_LITE(TRemovePipelineDynamicSpecCommand);
+
+ static void Register(TRegistrar registrar);
+
+private:
+ NYPath::TYPath SpecPath;
+
+ void DoExecute(ICommandContextPtr context) override;
+};
+
+////////////////////////////////////////////////////////////////////////////////
+
class TStartPipelineCommand
: public TTypedCommand<NApi::TStartPipelineOptions>
+ , public TPipelineCommandBase
{
public:
REGISTER_YSON_STRUCT_LITE(TStartPipelineCommand);
@@ -17,8 +134,6 @@ public:
static void Register(TRegistrar registrar);
private:
- NYPath::TYPath PipelinePath;
-
void DoExecute(ICommandContextPtr context) override;
};
@@ -26,6 +141,7 @@ private:
class TStopPipelineCommand
: public TTypedCommand<NApi::TStopPipelineOptions>
+ , public TPipelineCommandBase
{
public:
REGISTER_YSON_STRUCT_LITE(TStopPipelineCommand);
@@ -33,15 +149,14 @@ public:
static void Register(TRegistrar registrar);
private:
- NYPath::TYPath PipelinePath;
-
void DoExecute(ICommandContextPtr context) override;
};
////////////////////////////////////////////////////////////////////////////////
class TPausePipelineCommand
- : public TTypedCommand<NApi::TStopPipelineOptions>
+ : public TTypedCommand<NApi::TPausePipelineOptions>
+ , public TPipelineCommandBase
{
public:
REGISTER_YSON_STRUCT_LITE(TPausePipelineCommand);
@@ -49,8 +164,6 @@ public:
static void Register(TRegistrar registrar);
private:
- NYPath::TYPath PipelinePath;
-
void DoExecute(ICommandContextPtr context) override;
};
@@ -58,6 +171,7 @@ private:
class TGetPipelineStatusCommand
: public TTypedCommand<NApi::TGetPipelineStatusOptions>
+ , public TPipelineCommandBase
{
public:
REGISTER_YSON_STRUCT_LITE(TGetPipelineStatusCommand);
@@ -65,8 +179,6 @@ public:
static void Register(TRegistrar registrar);
private:
- NYPath::TYPath PipelinePath;
-
void DoExecute(ICommandContextPtr context) override;
};
diff --git a/yt/yt/flow/lib/client/public.h b/yt/yt/flow/lib/client/public.h
index c813f0d5f6..e1812eaa08 100644
--- a/yt/yt/flow/lib/client/public.h
+++ b/yt/yt/flow/lib/client/public.h
@@ -27,6 +27,10 @@ DEFINE_ENUM(EPipelineState,
((Completed) (6))
);
+YT_DEFINE_ERROR_ENUM(
+ ((SpecVersionMismatch) (3300))
+);
+
YT_DEFINE_STRONG_TYPEDEF(TVersion, i64);
////////////////////////////////////////////////////////////////////////////////