aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorInnokentii Mokin <innokentii@ydb.tech>2024-11-12 14:01:22 +0300
committerGitHub <noreply@github.com>2024-11-12 14:01:22 +0300
commit868a62aae0b39369d21c71dd3bc710b4b0d21783 (patch)
tree9fef426e7dc336e02998763bbe3327acbc12fdea
parent667a30d8c2d6c3c312cb3f81db1e07790bc99483 (diff)
downloadydb-868a62aae0b39369d21c71dd3bc710b4b0d21783.tar.gz
[SchemeShard] Add required path creation (#11445)
-rw-r--r--ydb/core/tx/schemeshard/generated/codegen/main.cpp77
-rw-r--r--ydb/core/tx/schemeshard/generated/codegen/ya.make10
-rw-r--r--ydb/core/tx/schemeshard/generated/dispatch_op.h.in20
-rw-r--r--ydb/core/tx/schemeshard/generated/traits.h293
-rw-r--r--ydb/core/tx/schemeshard/generated/ya.make25
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__init.cpp1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.cpp526
-rw-r--r--ydb/core/tx/schemeshard/schemeshard__operation.h4
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_impl.h8
-rw-r--r--ydb/core/tx/schemeshard/ya.make1
10 files changed, 705 insertions, 260 deletions
diff --git a/ydb/core/tx/schemeshard/generated/codegen/main.cpp b/ydb/core/tx/schemeshard/generated/codegen/main.cpp
new file mode 100644
index 0000000000..9630facd86
--- /dev/null
+++ b/ydb/core/tx/schemeshard/generated/codegen/main.cpp
@@ -0,0 +1,77 @@
+#include <util/stream/file.h>
+#include <util/stream/output.h>
+#include <util/string/builder.h>
+#include <util/system/src_location.h>
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+#include <google/protobuf/descriptor.pb.h>
+
+#include <jinja2cpp/template_env.h>
+#include <jinja2cpp/template.h>
+#include <jinja2cpp/value.h>
+#include <jinja2cpp/reflected_value.h>
+#include <cstdint>
+#include <string>
+#include <vector>
+#include <iostream>
+
+std::string replace(
+ const std::string& str,
+ const std::string& from,
+ const std::string& to)
+{
+ std::string result;
+
+ size_t pos = str.find(from);
+ result.append(str, 0, pos);
+ result.append(to);
+ result.append(str, pos + from.size(), std::string::npos);
+
+ return result;
+}
+
+int main(int argc, char** argv) {
+ if (argc < 3) {
+ Cerr << "Usage: " << argv[0] << " INPUT OUTPUT ..." << Endl;
+ return 1;
+ }
+
+ std::vector<std::string> opTypes;
+ const auto* d = NKikimrSchemeOp::EOperationType_descriptor();
+ for (int i = 0; i < d->value_count(); ++i) {
+ const auto* v = d->value(i);
+ if (v) {
+ auto name = v->full_name();
+ name = replace(name, ".", "::");
+ opTypes.emplace_back(name);
+ }
+ }
+
+ jinja2::TemplateEnv env;
+ env.AddGlobal("generator", jinja2::Reflect(std::string(__SOURCE_FILE__)));
+ env.AddGlobal("opTypes", jinja2::Reflect(opTypes));
+
+ for (int i = 1; i < argc; i += 2) {
+ if (!(i + 1 < argc)) {
+ Cerr << "ERROR: missing output for " << argv[i] << Endl;
+ return 1;
+ }
+
+ jinja2::Template t(&env);
+ auto loaded = t.Load(TFileInput(argv[i]).ReadAll(), argv[i]);
+ if (!loaded) {
+ Cerr << "ERROR: " << loaded.error().ToString() << Endl;
+ return 1;
+ }
+
+ auto rendered = t.RenderAsString({});
+ if (!rendered) {
+ Cerr << "ERROR: " << rendered.error().ToString() << Endl;
+ return 1;
+ }
+
+ TFileOutput(argv[i + 1]).Write(rendered.value());
+ Cout << "Generated " << argv[i + 1] << " from " << argv[i] << Endl;
+ }
+
+ return 0;
+}
diff --git a/ydb/core/tx/schemeshard/generated/codegen/ya.make b/ydb/core/tx/schemeshard/generated/codegen/ya.make
new file mode 100644
index 0000000000..d8b1a352e1
--- /dev/null
+++ b/ydb/core/tx/schemeshard/generated/codegen/ya.make
@@ -0,0 +1,10 @@
+PROGRAM()
+
+SRCS(main.cpp)
+
+PEERDIR(
+ contrib/libs/jinja2cpp
+ ydb/core/protos
+)
+
+END()
diff --git a/ydb/core/tx/schemeshard/generated/dispatch_op.h.in b/ydb/core/tx/schemeshard/generated/dispatch_op.h.in
new file mode 100644
index 0000000000..32ef9596a5
--- /dev/null
+++ b/ydb/core/tx/schemeshard/generated/dispatch_op.h.in
@@ -0,0 +1,20 @@
+// Auto-generated by {{ generator }}, do not modify.
+#pragma once
+
+#include <ydb/core/tx/schemeshard/generated/traits.h>
+
+namespace NKikimr::NSchemeShard::NGenerated {
+
+template <class TFn>
+constexpr auto DispatchOp(const TTxTransaction& tx, TFn fn) {
+ switch (tx.GetOperationType()) {
+{% for opType in opTypes %}
+ case {{ opType }}:
+ return fn(TSchemeTxTraits<{{ opType }}>{});
+{% endfor %}
+ default:
+ return fn(TSchemeTxTraitsFallback{});
+ }
+}
+
+} // namespace NKikimr::NSchemeShard::NGenerated
diff --git a/ydb/core/tx/schemeshard/generated/traits.h b/ydb/core/tx/schemeshard/generated/traits.h
new file mode 100644
index 0000000000..e848c89146
--- /dev/null
+++ b/ydb/core/tx/schemeshard/generated/traits.h
@@ -0,0 +1,293 @@
+#pragma once
+
+#include <util/generic/string.h>
+#include <util/generic/map.h>
+
+#include <optional>
+
+#include <ydb/core/protos/flat_scheme_op.pb.h>
+
+namespace NKikimr::NSchemeShard::NGenerated {
+
+using TTxTransaction = NKikimrSchemeOp::TModifyScheme;
+
+struct TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ Y_UNUSED(tx);
+ return std::nullopt;
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ Y_UNUSED(tx, name);
+ return false;
+ }
+
+ static THashMap<TString, THashSet<TString>> GetRequiredPaths(const TTxTransaction& tx) {
+ Y_UNUSED(tx);
+ return {};
+ }
+
+ constexpr inline static bool CreateDirsFromName = false;
+
+ constexpr inline static bool CreateAdditionalDirs = false;
+};
+
+template <NKikimrSchemeOp::EOperationType opType>
+struct TSchemeTxTraits : public TSchemeTxTraitsFallback {};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpMkDir> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetMkDir().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableMkDir()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ if (tx.GetCreateTable().HasCopyFromTable()) {
+ return std::nullopt;
+ }
+ return tx.GetCreateTable().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateTable()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreatePersQueueGroup().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreatePersQueueGroup()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetSubDomain().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableSubDomain()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetSubDomain().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableSubDomain()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateRtmrVolume().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateRtmrVolume()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateBlockStoreVolume().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateBlockStoreVolume()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateFileStore().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateFileStore()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetKesus().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableKesus()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateSolomonVolume().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateSolomonVolume()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateIndexedTable().GetTableDescription().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateIndexedTable()->MutableTableDescription()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateColumnStore().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateColumnStore()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateColumnTable().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateColumnTable()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateExternalTable().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateExternalTable()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateExternalDataSource().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateExternalDataSource()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateView> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateView().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateView()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateResourcePool().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateResourcePool()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+template <>
+struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection> : public TSchemeTxTraitsFallback {
+ static std::optional<TString> GetTargetName(const TTxTransaction& tx) {
+ return tx.GetCreateBackupCollection().GetName();
+ }
+
+ static bool SetName(TTxTransaction& tx, const TString& name) {
+ tx.MutableCreateBackupCollection()->SetName(name);
+ return true;
+ }
+
+ constexpr inline static bool CreateDirsFromName = true;
+};
+
+} // namespace NKikimr::NSchemeShard::NGenerated
diff --git a/ydb/core/tx/schemeshard/generated/ya.make b/ydb/core/tx/schemeshard/generated/ya.make
new file mode 100644
index 0000000000..2ffb6a008c
--- /dev/null
+++ b/ydb/core/tx/schemeshard/generated/ya.make
@@ -0,0 +1,25 @@
+LIBRARY()
+
+PEERDIR(
+ ydb/core/protos
+)
+
+RUN_PROGRAM(
+ ydb/core/tx/schemeshard/generated/codegen
+ dispatch_op.h.in
+ dispatch_op.h
+ IN dispatch_op.h.in
+ OUT dispatch_op.h
+ OUTPUT_INCLUDES
+ ydb/core/tx/schemeshard/generated/traits.h
+)
+
+SRCS(
+ traits.h
+)
+
+END()
+
+RECURSE(
+ codegen
+)
diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp
index f0b61571e3..e8143a224b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__init.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp
@@ -3591,6 +3591,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {
Y_ABORT_UNLESS(operationId.GetSubTxId() == operation->Parts.size());
TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges};
ISubOperation::TPtr part = operation->RestorePart(txState.TxType, txState.State, context);
+ ++(operation->PreparedParts);
operation->AddPart(part);
if (!txInFlightRowset.Next())
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
index 68188671e1..fdf50ff7f1 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp
@@ -8,6 +8,7 @@
#include "schemeshard_audit_log.h"
#include "schemeshard_impl.h"
+#include <ydb/core/tx/schemeshard/generated/dispatch_op.h>
#include <ydb/core/tablet/tablet_exception.h>
#include <ydb/core/tablet_flat/flat_cxx_database.h>
#include <ydb/core/tablet_flat/tablet_flat_executor.h>
@@ -87,6 +88,94 @@ NKikimrScheme::TEvModifySchemeTransaction GetRecordForPrint(const NKikimrScheme:
return recordForPrint;
}
+bool TSchemeShard::ProcessOperationParts(
+ const TVector<ISubOperation::TPtr>& parts,
+ const TTxId& txId,
+ const NKikimrScheme::TEvModifySchemeTransaction& record,
+ bool prevProposeUndoSafe,
+ TOperation::TPtr& operation,
+ THolder<TProposeResponse>& response,
+ TOperationContext& context)
+{
+ auto selfId = SelfTabletId();
+ const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;
+
+ if (parts.size() > 1) {
+ // allow altering impl index tables as part of consistent operation
+ context.IsAllowedPrivateTables = true;
+ }
+
+ for (auto& part : parts) {
+ TString errStr;
+ if (!context.SS->CheckInFlightLimit(part->GetTransaction().GetOperationType(), errStr)) {
+ response.Reset(new TProposeResponse(NKikimrScheme::StatusResourceExhausted, ui64(txId), ui64(selfId)));
+ response->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
+ } else {
+ response = part->Propose(owner, context);
+ }
+
+ Y_ABORT_UNLESS(response);
+
+ LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "IgniteOperation"
+ << ", opId: " << operation->NextPartId()
+ << ", propose status:" << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
+ << ", reason: " << response->Record.GetReason()
+ << ", at schemeshard: " << selfId);
+
+ if (response->IsDone()) {
+ operation->AddPart(part); //at ApplyOnExecute parts is erased
+ context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
+ } else if (response->IsConditionalAccepted()) {
+ //happens on retries, we answer like AlreadyExist or StatusSuccess with error message and do nothing in operation
+ operation->AddPart(part); //at ApplyOnExecute parts is erased
+ context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
+ } else if (response->IsAccepted()) {
+ operation->AddPart(part);
+ //context.OnComplete.ActivateTx(partOpId) ///TODO maybe it is good idea
+ } else {
+ if (!operation->Parts.empty()) {
+ LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Abort operation: IgniteOperation fail to propose a part"
+ << ", opId: " << part->GetOperationId()
+ << ", at schemeshard: " << selfId
+ << ", already accepted parts: " << operation->Parts.size()
+ << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
+ << ", with reason: " << response->Record.GetReason()
+ << ", tx message: " << SecureDebugString(record));
+ }
+
+ Y_VERIFY_S(context.IsUndoChangesSafe(),
+ "Operation is aborted and all changes should be reverted"
+ << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
+ << ", opId: " << part->GetOperationId()
+ << ", at schemeshard: " << selfId
+ << ", already accepted parts: " << operation->Parts.size()
+ << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
+ << ", with reason: " << response->Record.GetReason()
+ << ", tx message: " << SecureDebugString(record));
+
+ AbortOperationPropose(txId, context);
+
+ return false;
+ }
+
+ // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
+ if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) {
+ prevProposeUndoSafe = false;
+
+ LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
+ "Operation part proposed ok, but propose itself is undo unsafe"
+ << ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType())
+ << ", opId: " << part->GetOperationId()
+ << ", at schemeshard: " << selfId
+ );
+ }
+ }
+
+ return true;
+}
+
THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request, TOperationContext& context) {
THolder<TProposeResponse> response = nullptr;
@@ -124,6 +213,11 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
}
TVector<TTxTransaction> transactions;
+ TVector<TTxTransaction> generatedTransactions;
+
+ // # Phase One
+ // Generate MkDir transactions based on object name.
+
for (const auto& transaction : record.GetTransaction()) {
auto splitResult = operation->SplitIntoTransactions(transaction, context);
if (splitResult.Status != NKikimrScheme::StatusSuccess) {
@@ -132,89 +226,39 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request
return std::move(response);
}
- std::move(splitResult.Transactions.begin(), splitResult.Transactions.end(), std::back_inserter(transactions));
+ std::move(splitResult.Transactions.begin(), splitResult.Transactions.end(), std::back_inserter(generatedTransactions));
+ if (splitResult.Transaction) {
+ transactions.push_back(*splitResult.Transaction);
+ }
}
- const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;
+ //
+ Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose
bool prevProposeUndoSafe = true;
- Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose
+ // # Phase Two
+ // For generated MkDirs parts are constructed and proposed.
+ // It is done to simplify checks in dependent (splitted) transactions
- for (const auto& transaction : transactions) {
+ for (const auto& transaction : generatedTransactions) {
auto parts = operation->ConstructParts(transaction, context);
+ operation->PreparedParts += parts.size();
- if (parts.size() > 1) {
- // allow altering impl index tables as part of consistent operation
- context.IsAllowedPrivateTables = true;
+ if (!ProcessOperationParts(parts, txId, record, prevProposeUndoSafe, operation, response, context)) {
+ return std::move(response);
}
+ }
- for (auto& part : parts) {
- TString errStr;
- if (!context.SS->CheckInFlightLimit(part->GetTransaction().GetOperationType(), errStr)) {
- response.Reset(new TProposeResponse(NKikimrScheme::StatusResourceExhausted, ui64(txId), ui64(selfId)));
- response->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
- } else {
- response = part->Propose(owner, context);
- }
-
- Y_ABORT_UNLESS(response);
-
- LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "IgniteOperation"
- << ", opId: " << operation->NextPartId()
- << ", propose status:" << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
- << ", reason: " << response->Record.GetReason()
- << ", at schemeshard: " << selfId);
-
- if (response->IsDone()) {
- operation->AddPart(part); //at ApplyOnExecute parts is erased
- context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
- } else if (response->IsConditionalAccepted()) {
- //happens on retries, we answer like AlreadyExist or StatusSuccess with error message and do nothing in operation
- operation->AddPart(part); //at ApplyOnExecute parts is erased
- context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
- } else if (response->IsAccepted()) {
- operation->AddPart(part);
- //context.OnComplete.ActivateTx(partOpId) ///TODO maybe it is good idea
- } else {
- if (!operation->Parts.empty()) {
- LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Abort operation: IgniteOperation fail to propose a part"
- << ", opId: " << part->GetOperationId()
- << ", at schemeshard: " << selfId
- << ", already accepted parts: " << operation->Parts.size()
- << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
- << ", with reason: " << response->Record.GetReason()
- << ", tx message: " << SecureDebugString(record));
- }
-
- Y_VERIFY_S(context.IsUndoChangesSafe(),
- "Operation is aborted and all changes should be reverted"
- << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
- << ", opId: " << part->GetOperationId()
- << ", at schemeshard: " << selfId
- << ", already accepted parts: " << operation->Parts.size()
- << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
- << ", with reason: " << response->Record.GetReason()
- << ", tx message: " << SecureDebugString(record));
-
- AbortOperationPropose(txId, context);
-
- return std::move(response);
- }
+ // # Phase Three
+ // For all initial transactions parts are constructed and proposed
- // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction.
- if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) {
- prevProposeUndoSafe = false;
+ for (const auto& transaction : transactions) {
+ auto parts = operation->ConstructParts(transaction, context);
+ operation->PreparedParts += parts.size();
- LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Operation part proposed ok, but propose itself is undo unsafe"
- << ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType())
- << ", opId: " << part->GetOperationId()
- << ", at schemeshard: " << selfId
- );
- }
+ if (!ProcessOperationParts(parts, txId, record, prevProposeUndoSafe, operation, response, context)) {
+ return std::move(response);
}
}
@@ -743,194 +787,14 @@ TOperation::TConsumeQuotaResult TOperation::ConsumeQuota(const TTxTransaction& t
return result;
}
-TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTxTransaction& tx, const TOperationContext& context) {
- TSplitTransactionsResult result;
-
- const TPath parentPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
- {
- TPath::TChecker checks = parentPath.Check();
- checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
- .IsResolved()
- .NotDeleted()
- .NotUnderDeleting()
- .IsCommonSensePath()
- .IsLikeDirectory();
-
- if (!checks) {
- result.Transactions.push_back(tx);
- return result;
- }
- }
-
- TString targetName;
-
- switch (tx.GetOperationType()) {
- case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
- targetName = tx.GetMkDir().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
- if (tx.GetCreateTable().HasCopyFromTable()) {
- result.Transactions.push_back(tx);
- return result;
- }
- targetName = tx.GetCreateTable().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
- targetName = tx.GetCreatePersQueueGroup().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
- targetName = tx.GetSubDomain().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
- targetName = tx.GetCreateRtmrVolume().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
- targetName = tx.GetCreateBlockStoreVolume().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
- targetName = tx.GetCreateFileStore().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
- targetName = tx.GetKesus().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
- targetName = tx.GetCreateSolomonVolume().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
- targetName = tx.GetCreateIndexedTable().GetTableDescription().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
- targetName = tx.GetCreateColumnStore().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
- targetName = tx.GetCreateColumnTable().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
- targetName = tx.GetCreateExternalTable().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource:
- targetName = tx.GetCreateExternalDataSource().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateView:
- targetName = tx.GetCreateView().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool:
- targetName = tx.GetCreateResourcePool().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection:
- targetName = tx.GetCreateBackupCollection().GetName();
- break;
- default:
- result.Transactions.push_back(tx);
- return result;
- }
-
- if (!targetName || targetName.StartsWith('/') || targetName.EndsWith('/')) {
- result.Transactions.push_back(tx);
- return result;
- }
-
- TPath path = TPath::Resolve(JoinPath(tx.GetWorkingDir(), targetName), context.SS);
- {
- TPath::TChecker checks = path.Check();
- checks.IsAtLocalSchemeShard();
-
- bool exists = false;
- if (path.IsResolved()) {
- checks.IsResolved();
- exists = !path.IsDeleted();
- } else {
- checks
- .NotEmpty()
- .NotResolved();
- }
-
- if (checks && !exists) {
- checks.IsValidLeafName();
- }
-
- if (!checks) {
- result.Status = checks.GetStatus();
- result.Reason = checks.GetError();
- result.Transactions.push_back(tx);
- return result;
- }
-
- const TString name = path.LeafName();
- path.Rise();
-
- TTxTransaction create(tx);
- create.SetWorkingDir(path.PathString());
- create.SetFailOnExist(tx.GetFailOnExist());
-
- switch (tx.GetOperationType()) {
- case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
- create.MutableMkDir()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
- create.MutableCreateTable()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
- create.MutableCreatePersQueueGroup()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
- create.MutableSubDomain()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
- create.MutableCreateRtmrVolume()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
- create.MutableCreateBlockStoreVolume()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
- create.MutableCreateFileStore()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
- create.MutableKesus()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
- create.MutableCreateSolomonVolume()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
- create.MutableCreateIndexedTable()->MutableTableDescription()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
- create.MutableCreateColumnStore()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
- create.MutableCreateColumnTable()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
- create.MutableCreateExternalTable()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource:
- create.MutableCreateExternalDataSource()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateView:
- create.MutableCreateView()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool:
- create.MutableCreateResourcePool()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection:
- create.MutableCreateBackupCollection()->SetName(name);
- break;
- default:
- Y_ABORT("Invariant violation");
- }
-
- result.Transactions.push_back(create);
+bool CreateDirs(const TTxTransaction& tx, const TPath& parentPath, TPath path, THashSet<TString>& createdPaths, TOperation::TSplitTransactionsResult& result) {
+ auto initialSize = result.Transactions.size();
- if (exists) {
- return result;
+ while (path != parentPath) {
+ if (createdPaths.contains(path.PathString())) {
+ continue;
}
- }
- while (path != parentPath) {
TPath::TChecker checks = path.Check();
checks
.NotUnderDomainUpgrade()
@@ -966,10 +830,11 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx
result.Status = checks.GetStatus();
result.Reason = checks.GetError();
result.Transactions.clear();
- result.Transactions.push_back(tx);
- return result;
+ result.Transaction = tx;
+ return false;
}
+ createdPaths.emplace(path.PathString());
const TString name = path.LeafName();
path.Rise();
@@ -982,7 +847,150 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx
result.Transactions.push_back(mkdir);
}
- Reverse(result.Transactions.begin(), result.Transactions.end());
+ Reverse(result.Transactions.begin() + initialSize, result.Transactions.end());
+
+ return true;
+}
+
+// # Generates additional MkDirs for transactions
+TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTxTransaction& tx, const TOperationContext& context) {
+ using namespace NGenerated;
+
+ TSplitTransactionsResult result;
+ THashSet<TString> createdPaths;
+
+ // # Generates MkDirs based on WorkingDir and path
+ // WorkingDir | TxType.ObjectName
+ // /Root/some_dir | other_dir/another_dir/object_name
+ // ^---------^----------^
+ // MkDir('/Root/some_dir', 'other_dir') and MkDir('/Root/some_dir/other_dir', 'another_dir') will be generated
+ // tx.WorkingDir will be changed to '/Root/some_dir/other_dir/another_dir'
+ // tx.TxType.ObjectName will be changed to 'object_name'
+ if (DispatchOp(tx, [&](auto traits) { return traits.CreateDirsFromName; })) {
+ TString targetName;
+
+ if (!DispatchOp(tx, [&](auto traits) {
+ auto name = traits.GetTargetName(tx);
+ if (name) {
+ targetName = *name;
+ return true;
+ }
+ return false;
+ }))
+ {
+ result.Transaction = tx;
+ return result;
+ }
+
+ if (!targetName || targetName.StartsWith('/') || targetName.EndsWith('/')) {
+ result.Transaction = tx;
+ return result;
+ }
+
+ const TPath parentPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
+ {
+ TPath::TChecker checks = parentPath.Check();
+ checks
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsCommonSensePath()
+ .IsLikeDirectory();
+
+ if (!checks) {
+ result.Transaction = tx;
+ return result;
+ }
+ }
+
+ TPath path = TPath::Resolve(JoinPath(tx.GetWorkingDir(), targetName), context.SS);
+ {
+ TPath::TChecker checks = path.Check();
+ checks.IsAtLocalSchemeShard();
+
+ bool exists = false;
+ if (path.IsResolved()) {
+ checks.IsResolved();
+ exists = !path.IsDeleted();
+ } else {
+ checks
+ .NotEmpty()
+ .NotResolved();
+ }
+
+ if (checks && !exists) {
+ checks.IsValidLeafName();
+ }
+
+ if (!checks) {
+ result.Status = checks.GetStatus();
+ result.Reason = checks.GetError();
+ result.Transaction = tx;
+ return result;
+ }
+
+ const TString name = path.LeafName();
+ path.Rise();
+
+ TTxTransaction create(tx);
+ create.SetWorkingDir(path.PathString());
+ create.SetFailOnExist(tx.GetFailOnExist());
+
+ if (!DispatchOp(tx, [&](auto traits) {
+ return traits.SetName(create, name);
+ }))
+ {
+ Y_ABORT("Invariant violation");
+ }
+
+ result.Transaction = create;
+
+ if (exists) {
+ return result;
+ }
+ }
+
+ if (!CreateDirs(tx, parentPath, path, createdPaths, result)) {
+ return result;
+ }
+ }
+
+ // # Generates MkDirs based on transaction-specific requirements
+ if (DispatchOp(tx, [&](auto traits) { return traits.CreateAdditionalDirs; })) {
+ for (const auto& [parentPathStr, pathStrs] : DispatchOp(tx, [&](auto traits) { return traits.GetRequiredPaths(tx); })) {
+ const TPath parentPath = TPath::Resolve(parentPathStr, context.SS);
+ {
+ TPath::TChecker checks = parentPath.Check();
+ checks
+ .NotUnderDomainUpgrade()
+ .IsAtLocalSchemeShard()
+ .IsResolved()
+ .NotDeleted()
+ .NotUnderDeleting()
+ .IsCommonSensePath()
+ .IsLikeDirectory();
+
+ if (!checks) {
+ result.Transaction = tx;
+ return result;
+ }
+ }
+
+ for (const auto& pathStr : pathStrs) {
+ TPath path = TPath::Resolve(JoinPath(parentPathStr, pathStr), context.SS);
+ if (!CreateDirs(tx, parentPath, path, createdPaths, result)) {
+ return result;
+ }
+ }
+ }
+ }
+
+ if (!result.Transaction) {
+ result.Transaction = tx;
+ }
+
return result;
}
diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.h b/ydb/core/tx/schemeshard/schemeshard__operation.h
index 45c132d56f..5d22bd498b 100644
--- a/ydb/core/tx/schemeshard/schemeshard__operation.h
+++ b/ydb/core/tx/schemeshard/schemeshard__operation.h
@@ -11,6 +11,7 @@ struct TOperation: TSimpleRefCount<TOperation> {
using TPtr = TIntrusivePtr<TOperation>;
const TTxId TxId;
+ ui32 PreparedParts = 0;
TVector<ISubOperation::TPtr> Parts;
THashSet<TActorId> Subscribers;
@@ -67,6 +68,7 @@ struct TOperation: TSimpleRefCount<TOperation> {
NKikimrScheme::EStatus Status = NKikimrScheme::StatusSuccess;
TString Reason;
TVector<TTxTransaction> Transactions;
+ std::optional<TTxTransaction> Transaction;
};
TOperation(TTxId txId)
@@ -144,7 +146,7 @@ struct TOperation: TSimpleRefCount<TOperation> {
}
TOperationId NextPartId() const {
- return TOperationId(TxId, TSubTxId(Parts.size()));
+ return TOperationId(TxId, TSubTxId(PreparedParts));
}
};
diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h
index a7a3efb69d..eadd7af21c 100644
--- a/ydb/core/tx/schemeshard/schemeshard_impl.h
+++ b/ydb/core/tx/schemeshard/schemeshard_impl.h
@@ -378,6 +378,14 @@ public:
NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})};
THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context);
+ bool ProcessOperationParts(
+ const TVector<ISubOperation::TPtr>& parts,
+ const TTxId& txId,
+ const NKikimrScheme::TEvModifySchemeTransaction& record,
+ bool prevProposeUndoSafe,
+ TOperation::TPtr& operation,
+ THolder<TProposeResponse>& response,
+ TOperationContext& context);
void AbortOperationPropose(const TTxId txId, TOperationContext& context);
THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId,
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index 5d77353826..4ae1ca2832 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -283,6 +283,7 @@ PEERDIR(
ydb/core/tx/datashard
ydb/core/tx/schemeshard/backup
ydb/core/tx/schemeshard/common
+ ydb/core/tx/schemeshard/generated
ydb/core/tx/schemeshard/olap
ydb/core/tx/scheme_board
ydb/core/tx/tx_allocator_client