diff options
author | Innokentii Mokin <innokentii@ydb.tech> | 2024-11-12 14:01:22 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-11-12 14:01:22 +0300 |
commit | 868a62aae0b39369d21c71dd3bc710b4b0d21783 (patch) | |
tree | 9fef426e7dc336e02998763bbe3327acbc12fdea | |
parent | 667a30d8c2d6c3c312cb3f81db1e07790bc99483 (diff) | |
download | ydb-868a62aae0b39369d21c71dd3bc710b4b0d21783.tar.gz |
[SchemeShard] Add required path creation (#11445)
-rw-r--r-- | ydb/core/tx/schemeshard/generated/codegen/main.cpp | 77 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/generated/codegen/ya.make | 10 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/generated/dispatch_op.h.in | 20 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/generated/traits.h | 293 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/generated/ya.make | 25 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__init.cpp | 1 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation.cpp | 526 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard__operation.h | 4 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/schemeshard_impl.h | 8 | ||||
-rw-r--r-- | ydb/core/tx/schemeshard/ya.make | 1 |
10 files changed, 705 insertions, 260 deletions
diff --git a/ydb/core/tx/schemeshard/generated/codegen/main.cpp b/ydb/core/tx/schemeshard/generated/codegen/main.cpp new file mode 100644 index 0000000000..9630facd86 --- /dev/null +++ b/ydb/core/tx/schemeshard/generated/codegen/main.cpp @@ -0,0 +1,77 @@ +#include <util/stream/file.h> +#include <util/stream/output.h> +#include <util/string/builder.h> +#include <util/system/src_location.h> +#include <ydb/core/protos/flat_scheme_op.pb.h> +#include <google/protobuf/descriptor.pb.h> + +#include <jinja2cpp/template_env.h> +#include <jinja2cpp/template.h> +#include <jinja2cpp/value.h> +#include <jinja2cpp/reflected_value.h> +#include <cstdint> +#include <string> +#include <vector> +#include <iostream> + +std::string replace( + const std::string& str, + const std::string& from, + const std::string& to) +{ + std::string result; + + size_t pos = str.find(from); + result.append(str, 0, pos); + result.append(to); + result.append(str, pos + from.size(), std::string::npos); + + return result; +} + +int main(int argc, char** argv) { + if (argc < 3) { + Cerr << "Usage: " << argv[0] << " INPUT OUTPUT ..." << Endl; + return 1; + } + + std::vector<std::string> opTypes; + const auto* d = NKikimrSchemeOp::EOperationType_descriptor(); + for (int i = 0; i < d->value_count(); ++i) { + const auto* v = d->value(i); + if (v) { + auto name = v->full_name(); + name = replace(name, ".", "::"); + opTypes.emplace_back(name); + } + } + + jinja2::TemplateEnv env; + env.AddGlobal("generator", jinja2::Reflect(std::string(__SOURCE_FILE__))); + env.AddGlobal("opTypes", jinja2::Reflect(opTypes)); + + for (int i = 1; i < argc; i += 2) { + if (!(i + 1 < argc)) { + Cerr << "ERROR: missing output for " << argv[i] << Endl; + return 1; + } + + jinja2::Template t(&env); + auto loaded = t.Load(TFileInput(argv[i]).ReadAll(), argv[i]); + if (!loaded) { + Cerr << "ERROR: " << loaded.error().ToString() << Endl; + return 1; + } + + auto rendered = t.RenderAsString({}); + if (!rendered) { + Cerr << "ERROR: " << rendered.error().ToString() << Endl; + return 1; + } + + TFileOutput(argv[i + 1]).Write(rendered.value()); + Cout << "Generated " << argv[i + 1] << " from " << argv[i] << Endl; + } + + return 0; +} diff --git a/ydb/core/tx/schemeshard/generated/codegen/ya.make b/ydb/core/tx/schemeshard/generated/codegen/ya.make new file mode 100644 index 0000000000..d8b1a352e1 --- /dev/null +++ b/ydb/core/tx/schemeshard/generated/codegen/ya.make @@ -0,0 +1,10 @@ +PROGRAM() + +SRCS(main.cpp) + +PEERDIR( + contrib/libs/jinja2cpp + ydb/core/protos +) + +END() diff --git a/ydb/core/tx/schemeshard/generated/dispatch_op.h.in b/ydb/core/tx/schemeshard/generated/dispatch_op.h.in new file mode 100644 index 0000000000..32ef9596a5 --- /dev/null +++ b/ydb/core/tx/schemeshard/generated/dispatch_op.h.in @@ -0,0 +1,20 @@ +// Auto-generated by {{ generator }}, do not modify. +#pragma once + +#include <ydb/core/tx/schemeshard/generated/traits.h> + +namespace NKikimr::NSchemeShard::NGenerated { + +template <class TFn> +constexpr auto DispatchOp(const TTxTransaction& tx, TFn fn) { + switch (tx.GetOperationType()) { +{% for opType in opTypes %} + case {{ opType }}: + return fn(TSchemeTxTraits<{{ opType }}>{}); +{% endfor %} + default: + return fn(TSchemeTxTraitsFallback{}); + } +} + +} // namespace NKikimr::NSchemeShard::NGenerated diff --git a/ydb/core/tx/schemeshard/generated/traits.h b/ydb/core/tx/schemeshard/generated/traits.h new file mode 100644 index 0000000000..e848c89146 --- /dev/null +++ b/ydb/core/tx/schemeshard/generated/traits.h @@ -0,0 +1,293 @@ +#pragma once + +#include <util/generic/string.h> +#include <util/generic/map.h> + +#include <optional> + +#include <ydb/core/protos/flat_scheme_op.pb.h> + +namespace NKikimr::NSchemeShard::NGenerated { + +using TTxTransaction = NKikimrSchemeOp::TModifyScheme; + +struct TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + Y_UNUSED(tx); + return std::nullopt; + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + Y_UNUSED(tx, name); + return false; + } + + static THashMap<TString, THashSet<TString>> GetRequiredPaths(const TTxTransaction& tx) { + Y_UNUSED(tx); + return {}; + } + + constexpr inline static bool CreateDirsFromName = false; + + constexpr inline static bool CreateAdditionalDirs = false; +}; + +template <NKikimrSchemeOp::EOperationType opType> +struct TSchemeTxTraits : public TSchemeTxTraitsFallback {}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpMkDir> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetMkDir().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableMkDir()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + if (tx.GetCreateTable().HasCopyFromTable()) { + return std::nullopt; + } + return tx.GetCreateTable().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateTable()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreatePersQueueGroup().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreatePersQueueGroup()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetSubDomain().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableSubDomain()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetSubDomain().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableSubDomain()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateRtmrVolume().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateRtmrVolume()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateBlockStoreVolume().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateBlockStoreVolume()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateFileStore().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateFileStore()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetKesus().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableKesus()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateSolomonVolume().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateSolomonVolume()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateIndexedTable().GetTableDescription().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateIndexedTable()->MutableTableDescription()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateColumnStore().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateColumnStore()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateColumnTable().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateColumnTable()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateExternalTable().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateExternalTable()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateExternalDataSource().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateExternalDataSource()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateView> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateView().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateView()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateResourcePool().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateResourcePool()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +template <> +struct TSchemeTxTraits<NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection> : public TSchemeTxTraitsFallback { + static std::optional<TString> GetTargetName(const TTxTransaction& tx) { + return tx.GetCreateBackupCollection().GetName(); + } + + static bool SetName(TTxTransaction& tx, const TString& name) { + tx.MutableCreateBackupCollection()->SetName(name); + return true; + } + + constexpr inline static bool CreateDirsFromName = true; +}; + +} // namespace NKikimr::NSchemeShard::NGenerated diff --git a/ydb/core/tx/schemeshard/generated/ya.make b/ydb/core/tx/schemeshard/generated/ya.make new file mode 100644 index 0000000000..2ffb6a008c --- /dev/null +++ b/ydb/core/tx/schemeshard/generated/ya.make @@ -0,0 +1,25 @@ +LIBRARY() + +PEERDIR( + ydb/core/protos +) + +RUN_PROGRAM( + ydb/core/tx/schemeshard/generated/codegen + dispatch_op.h.in + dispatch_op.h + IN dispatch_op.h.in + OUT dispatch_op.h + OUTPUT_INCLUDES + ydb/core/tx/schemeshard/generated/traits.h +) + +SRCS( + traits.h +) + +END() + +RECURSE( + codegen +) diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index f0b61571e3..e8143a224b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -3591,6 +3591,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> { Y_ABORT_UNLESS(operationId.GetSubTxId() == operation->Parts.size()); TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges}; ISubOperation::TPtr part = operation->RestorePart(txState.TxType, txState.State, context); + ++(operation->PreparedParts); operation->AddPart(part); if (!txInFlightRowset.Next()) diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.cpp b/ydb/core/tx/schemeshard/schemeshard__operation.cpp index 68188671e1..fdf50ff7f1 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__operation.cpp @@ -8,6 +8,7 @@ #include "schemeshard_audit_log.h" #include "schemeshard_impl.h" +#include <ydb/core/tx/schemeshard/generated/dispatch_op.h> #include <ydb/core/tablet/tablet_exception.h> #include <ydb/core/tablet_flat/flat_cxx_database.h> #include <ydb/core/tablet_flat/tablet_flat_executor.h> @@ -87,6 +88,94 @@ NKikimrScheme::TEvModifySchemeTransaction GetRecordForPrint(const NKikimrScheme: return recordForPrint; } +bool TSchemeShard::ProcessOperationParts( + const TVector<ISubOperation::TPtr>& parts, + const TTxId& txId, + const NKikimrScheme::TEvModifySchemeTransaction& record, + bool prevProposeUndoSafe, + TOperation::TPtr& operation, + THolder<TProposeResponse>& response, + TOperationContext& context) +{ + auto selfId = SelfTabletId(); + const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT; + + if (parts.size() > 1) { + // allow altering impl index tables as part of consistent operation + context.IsAllowedPrivateTables = true; + } + + for (auto& part : parts) { + TString errStr; + if (!context.SS->CheckInFlightLimit(part->GetTransaction().GetOperationType(), errStr)) { + response.Reset(new TProposeResponse(NKikimrScheme::StatusResourceExhausted, ui64(txId), ui64(selfId))); + response->SetError(NKikimrScheme::StatusResourceExhausted, errStr); + } else { + response = part->Propose(owner, context); + } + + Y_ABORT_UNLESS(response); + + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "IgniteOperation" + << ", opId: " << operation->NextPartId() + << ", propose status:" << NKikimrScheme::EStatus_Name(response->Record.GetStatus()) + << ", reason: " << response->Record.GetReason() + << ", at schemeshard: " << selfId); + + if (response->IsDone()) { + operation->AddPart(part); //at ApplyOnExecute parts is erased + context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure + } else if (response->IsConditionalAccepted()) { + //happens on retries, we answer like AlreadyExist or StatusSuccess with error message and do nothing in operation + operation->AddPart(part); //at ApplyOnExecute parts is erased + context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure + } else if (response->IsAccepted()) { + operation->AddPart(part); + //context.OnComplete.ActivateTx(partOpId) ///TODO maybe it is good idea + } else { + if (!operation->Parts.empty()) { + LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Abort operation: IgniteOperation fail to propose a part" + << ", opId: " << part->GetOperationId() + << ", at schemeshard: " << selfId + << ", already accepted parts: " << operation->Parts.size() + << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus()) + << ", with reason: " << response->Record.GetReason() + << ", tx message: " << SecureDebugString(record)); + } + + Y_VERIFY_S(context.IsUndoChangesSafe(), + "Operation is aborted and all changes should be reverted" + << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done" + << ", opId: " << part->GetOperationId() + << ", at schemeshard: " << selfId + << ", already accepted parts: " << operation->Parts.size() + << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus()) + << ", with reason: " << response->Record.GetReason() + << ", tx message: " << SecureDebugString(record)); + + AbortOperationPropose(txId, context); + + return false; + } + + // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction. + if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) { + prevProposeUndoSafe = false; + + LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, + "Operation part proposed ok, but propose itself is undo unsafe" + << ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType()) + << ", opId: " << part->GetOperationId() + << ", at schemeshard: " << selfId + ); + } + } + + return true; +} + THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request, TOperationContext& context) { THolder<TProposeResponse> response = nullptr; @@ -124,6 +213,11 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request } TVector<TTxTransaction> transactions; + TVector<TTxTransaction> generatedTransactions; + + // # Phase One + // Generate MkDir transactions based on object name. + for (const auto& transaction : record.GetTransaction()) { auto splitResult = operation->SplitIntoTransactions(transaction, context); if (splitResult.Status != NKikimrScheme::StatusSuccess) { @@ -132,89 +226,39 @@ THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request return std::move(response); } - std::move(splitResult.Transactions.begin(), splitResult.Transactions.end(), std::back_inserter(transactions)); + std::move(splitResult.Transactions.begin(), splitResult.Transactions.end(), std::back_inserter(generatedTransactions)); + if (splitResult.Transaction) { + transactions.push_back(*splitResult.Transaction); + } } - const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT; + // + Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose bool prevProposeUndoSafe = true; - Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose + // # Phase Two + // For generated MkDirs parts are constructed and proposed. + // It is done to simplify checks in dependent (splitted) transactions - for (const auto& transaction : transactions) { + for (const auto& transaction : generatedTransactions) { auto parts = operation->ConstructParts(transaction, context); + operation->PreparedParts += parts.size(); - if (parts.size() > 1) { - // allow altering impl index tables as part of consistent operation - context.IsAllowedPrivateTables = true; + if (!ProcessOperationParts(parts, txId, record, prevProposeUndoSafe, operation, response, context)) { + return std::move(response); } + } - for (auto& part : parts) { - TString errStr; - if (!context.SS->CheckInFlightLimit(part->GetTransaction().GetOperationType(), errStr)) { - response.Reset(new TProposeResponse(NKikimrScheme::StatusResourceExhausted, ui64(txId), ui64(selfId))); - response->SetError(NKikimrScheme::StatusResourceExhausted, errStr); - } else { - response = part->Propose(owner, context); - } - - Y_ABORT_UNLESS(response); - - LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "IgniteOperation" - << ", opId: " << operation->NextPartId() - << ", propose status:" << NKikimrScheme::EStatus_Name(response->Record.GetStatus()) - << ", reason: " << response->Record.GetReason() - << ", at schemeshard: " << selfId); - - if (response->IsDone()) { - operation->AddPart(part); //at ApplyOnExecute parts is erased - context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure - } else if (response->IsConditionalAccepted()) { - //happens on retries, we answer like AlreadyExist or StatusSuccess with error message and do nothing in operation - operation->AddPart(part); //at ApplyOnExecute parts is erased - context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure - } else if (response->IsAccepted()) { - operation->AddPart(part); - //context.OnComplete.ActivateTx(partOpId) ///TODO maybe it is good idea - } else { - if (!operation->Parts.empty()) { - LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Abort operation: IgniteOperation fail to propose a part" - << ", opId: " << part->GetOperationId() - << ", at schemeshard: " << selfId - << ", already accepted parts: " << operation->Parts.size() - << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus()) - << ", with reason: " << response->Record.GetReason() - << ", tx message: " << SecureDebugString(record)); - } - - Y_VERIFY_S(context.IsUndoChangesSafe(), - "Operation is aborted and all changes should be reverted" - << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done" - << ", opId: " << part->GetOperationId() - << ", at schemeshard: " << selfId - << ", already accepted parts: " << operation->Parts.size() - << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus()) - << ", with reason: " << response->Record.GetReason() - << ", tx message: " << SecureDebugString(record)); - - AbortOperationPropose(txId, context); - - return std::move(response); - } + // # Phase Three + // For all initial transactions parts are constructed and proposed - // Check suboperations for undo safety. Log first unsafe suboperation in the schema transaction. - if (prevProposeUndoSafe && !context.IsUndoChangesSafe()) { - prevProposeUndoSafe = false; + for (const auto& transaction : transactions) { + auto parts = operation->ConstructParts(transaction, context); + operation->PreparedParts += parts.size(); - LOG_WARN_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, - "Operation part proposed ok, but propose itself is undo unsafe" - << ", suboperation type: " << NKikimrSchemeOp::EOperationType_Name(part->GetTransaction().GetOperationType()) - << ", opId: " << part->GetOperationId() - << ", at schemeshard: " << selfId - ); - } + if (!ProcessOperationParts(parts, txId, record, prevProposeUndoSafe, operation, response, context)) { + return std::move(response); } } @@ -743,194 +787,14 @@ TOperation::TConsumeQuotaResult TOperation::ConsumeQuota(const TTxTransaction& t return result; } -TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTxTransaction& tx, const TOperationContext& context) { - TSplitTransactionsResult result; - - const TPath parentPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); - { - TPath::TChecker checks = parentPath.Check(); - checks - .NotUnderDomainUpgrade() - .IsAtLocalSchemeShard() - .IsResolved() - .NotDeleted() - .NotUnderDeleting() - .IsCommonSensePath() - .IsLikeDirectory(); - - if (!checks) { - result.Transactions.push_back(tx); - return result; - } - } - - TString targetName; - - switch (tx.GetOperationType()) { - case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir: - targetName = tx.GetMkDir().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable: - if (tx.GetCreateTable().HasCopyFromTable()) { - result.Transactions.push_back(tx); - return result; - } - targetName = tx.GetCreateTable().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup: - targetName = tx.GetCreatePersQueueGroup().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain: - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain: - targetName = tx.GetSubDomain().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume: - targetName = tx.GetCreateRtmrVolume().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume: - targetName = tx.GetCreateBlockStoreVolume().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore: - targetName = tx.GetCreateFileStore().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus: - targetName = tx.GetKesus().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume: - targetName = tx.GetCreateSolomonVolume().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable: - targetName = tx.GetCreateIndexedTable().GetTableDescription().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore: - targetName = tx.GetCreateColumnStore().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable: - targetName = tx.GetCreateColumnTable().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable: - targetName = tx.GetCreateExternalTable().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: - targetName = tx.GetCreateExternalDataSource().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateView: - targetName = tx.GetCreateView().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool: - targetName = tx.GetCreateResourcePool().GetName(); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection: - targetName = tx.GetCreateBackupCollection().GetName(); - break; - default: - result.Transactions.push_back(tx); - return result; - } - - if (!targetName || targetName.StartsWith('/') || targetName.EndsWith('/')) { - result.Transactions.push_back(tx); - return result; - } - - TPath path = TPath::Resolve(JoinPath(tx.GetWorkingDir(), targetName), context.SS); - { - TPath::TChecker checks = path.Check(); - checks.IsAtLocalSchemeShard(); - - bool exists = false; - if (path.IsResolved()) { - checks.IsResolved(); - exists = !path.IsDeleted(); - } else { - checks - .NotEmpty() - .NotResolved(); - } - - if (checks && !exists) { - checks.IsValidLeafName(); - } - - if (!checks) { - result.Status = checks.GetStatus(); - result.Reason = checks.GetError(); - result.Transactions.push_back(tx); - return result; - } - - const TString name = path.LeafName(); - path.Rise(); - - TTxTransaction create(tx); - create.SetWorkingDir(path.PathString()); - create.SetFailOnExist(tx.GetFailOnExist()); - - switch (tx.GetOperationType()) { - case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir: - create.MutableMkDir()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable: - create.MutableCreateTable()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup: - create.MutableCreatePersQueueGroup()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain: - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain: - create.MutableSubDomain()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume: - create.MutableCreateRtmrVolume()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume: - create.MutableCreateBlockStoreVolume()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore: - create.MutableCreateFileStore()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus: - create.MutableKesus()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume: - create.MutableCreateSolomonVolume()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable: - create.MutableCreateIndexedTable()->MutableTableDescription()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore: - create.MutableCreateColumnStore()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable: - create.MutableCreateColumnTable()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable: - create.MutableCreateExternalTable()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalDataSource: - create.MutableCreateExternalDataSource()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateView: - create.MutableCreateView()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateResourcePool: - create.MutableCreateResourcePool()->SetName(name); - break; - case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBackupCollection: - create.MutableCreateBackupCollection()->SetName(name); - break; - default: - Y_ABORT("Invariant violation"); - } - - result.Transactions.push_back(create); +bool CreateDirs(const TTxTransaction& tx, const TPath& parentPath, TPath path, THashSet<TString>& createdPaths, TOperation::TSplitTransactionsResult& result) { + auto initialSize = result.Transactions.size(); - if (exists) { - return result; + while (path != parentPath) { + if (createdPaths.contains(path.PathString())) { + continue; } - } - while (path != parentPath) { TPath::TChecker checks = path.Check(); checks .NotUnderDomainUpgrade() @@ -966,10 +830,11 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx result.Status = checks.GetStatus(); result.Reason = checks.GetError(); result.Transactions.clear(); - result.Transactions.push_back(tx); - return result; + result.Transaction = tx; + return false; } + createdPaths.emplace(path.PathString()); const TString name = path.LeafName(); path.Rise(); @@ -982,7 +847,150 @@ TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTx result.Transactions.push_back(mkdir); } - Reverse(result.Transactions.begin(), result.Transactions.end()); + Reverse(result.Transactions.begin() + initialSize, result.Transactions.end()); + + return true; +} + +// # Generates additional MkDirs for transactions +TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTxTransaction& tx, const TOperationContext& context) { + using namespace NGenerated; + + TSplitTransactionsResult result; + THashSet<TString> createdPaths; + + // # Generates MkDirs based on WorkingDir and path + // WorkingDir | TxType.ObjectName + // /Root/some_dir | other_dir/another_dir/object_name + // ^---------^----------^ + // MkDir('/Root/some_dir', 'other_dir') and MkDir('/Root/some_dir/other_dir', 'another_dir') will be generated + // tx.WorkingDir will be changed to '/Root/some_dir/other_dir/another_dir' + // tx.TxType.ObjectName will be changed to 'object_name' + if (DispatchOp(tx, [&](auto traits) { return traits.CreateDirsFromName; })) { + TString targetName; + + if (!DispatchOp(tx, [&](auto traits) { + auto name = traits.GetTargetName(tx); + if (name) { + targetName = *name; + return true; + } + return false; + })) + { + result.Transaction = tx; + return result; + } + + if (!targetName || targetName.StartsWith('/') || targetName.EndsWith('/')) { + result.Transaction = tx; + return result; + } + + const TPath parentPath = TPath::Resolve(tx.GetWorkingDir(), context.SS); + { + TPath::TChecker checks = parentPath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsLikeDirectory(); + + if (!checks) { + result.Transaction = tx; + return result; + } + } + + TPath path = TPath::Resolve(JoinPath(tx.GetWorkingDir(), targetName), context.SS); + { + TPath::TChecker checks = path.Check(); + checks.IsAtLocalSchemeShard(); + + bool exists = false; + if (path.IsResolved()) { + checks.IsResolved(); + exists = !path.IsDeleted(); + } else { + checks + .NotEmpty() + .NotResolved(); + } + + if (checks && !exists) { + checks.IsValidLeafName(); + } + + if (!checks) { + result.Status = checks.GetStatus(); + result.Reason = checks.GetError(); + result.Transaction = tx; + return result; + } + + const TString name = path.LeafName(); + path.Rise(); + + TTxTransaction create(tx); + create.SetWorkingDir(path.PathString()); + create.SetFailOnExist(tx.GetFailOnExist()); + + if (!DispatchOp(tx, [&](auto traits) { + return traits.SetName(create, name); + })) + { + Y_ABORT("Invariant violation"); + } + + result.Transaction = create; + + if (exists) { + return result; + } + } + + if (!CreateDirs(tx, parentPath, path, createdPaths, result)) { + return result; + } + } + + // # Generates MkDirs based on transaction-specific requirements + if (DispatchOp(tx, [&](auto traits) { return traits.CreateAdditionalDirs; })) { + for (const auto& [parentPathStr, pathStrs] : DispatchOp(tx, [&](auto traits) { return traits.GetRequiredPaths(tx); })) { + const TPath parentPath = TPath::Resolve(parentPathStr, context.SS); + { + TPath::TChecker checks = parentPath.Check(); + checks + .NotUnderDomainUpgrade() + .IsAtLocalSchemeShard() + .IsResolved() + .NotDeleted() + .NotUnderDeleting() + .IsCommonSensePath() + .IsLikeDirectory(); + + if (!checks) { + result.Transaction = tx; + return result; + } + } + + for (const auto& pathStr : pathStrs) { + TPath path = TPath::Resolve(JoinPath(parentPathStr, pathStr), context.SS); + if (!CreateDirs(tx, parentPath, path, createdPaths, result)) { + return result; + } + } + } + } + + if (!result.Transaction) { + result.Transaction = tx; + } + return result; } diff --git a/ydb/core/tx/schemeshard/schemeshard__operation.h b/ydb/core/tx/schemeshard/schemeshard__operation.h index 45c132d56f..5d22bd498b 100644 --- a/ydb/core/tx/schemeshard/schemeshard__operation.h +++ b/ydb/core/tx/schemeshard/schemeshard__operation.h @@ -11,6 +11,7 @@ struct TOperation: TSimpleRefCount<TOperation> { using TPtr = TIntrusivePtr<TOperation>; const TTxId TxId; + ui32 PreparedParts = 0; TVector<ISubOperation::TPtr> Parts; THashSet<TActorId> Subscribers; @@ -67,6 +68,7 @@ struct TOperation: TSimpleRefCount<TOperation> { NKikimrScheme::EStatus Status = NKikimrScheme::StatusSuccess; TString Reason; TVector<TTxTransaction> Transactions; + std::optional<TTxTransaction> Transaction; }; TOperation(TTxId txId) @@ -144,7 +146,7 @@ struct TOperation: TSimpleRefCount<TOperation> { } TOperationId NextPartId() const { - return TOperationId(TxId, TSubTxId(Parts.size())); + return TOperationId(TxId, TSubTxId(PreparedParts)); } }; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index a7a3efb69d..eadd7af21c 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -378,6 +378,14 @@ public: NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})}; THolder<TProposeResponse> IgniteOperation(TProposeRequest& request, TOperationContext& context); + bool ProcessOperationParts( + const TVector<ISubOperation::TPtr>& parts, + const TTxId& txId, + const NKikimrScheme::TEvModifySchemeTransaction& record, + bool prevProposeUndoSafe, + TOperation::TPtr& operation, + THolder<TProposeResponse>& response, + TOperationContext& context); void AbortOperationPropose(const TTxId txId, TOperationContext& context); THolder<TEvDataShard::TEvProposeTransaction> MakeDataShardProposal(const TPathId& pathId, const TOperationId& opId, diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 5d77353826..4ae1ca2832 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -283,6 +283,7 @@ PEERDIR( ydb/core/tx/datashard ydb/core/tx/schemeshard/backup ydb/core/tx/schemeshard/common + ydb/core/tx/schemeshard/generated ydb/core/tx/schemeshard/olap ydb/core/tx/scheme_board ydb/core/tx/tx_allocator_client |