aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authordcherednik <dcherednik@ydb.tech>2023-08-18 17:14:33 +0300
committerdcherednik <dcherednik@ydb.tech>2023-08-18 19:11:14 +0300
commit1cbfd34a55732f7b1d407986b45e40853f01f2c2 (patch)
tree5786127461bee5550ef95ce51bf5217c2b061a2c
parent89efb41db3e4ce655dac0b6b3da2fb32616fa98c (diff)
downloadydb-1cbfd34a55732f7b1d407986b45e40853f01f2c2.tar.gz
Move out schemeshard_build_index_tx_base code from *.h file
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp406
-rw-r--r--ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h413
-rw-r--r--ydb/core/tx/schemeshard/ya.make1
7 files changed, 432 insertions, 392 deletions
diff --git a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt
index ccf5144f2c..a1d3433ee0 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.darwin-x86_64.txt
@@ -258,6 +258,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
index 0843f2fe66..f399397e85 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux-aarch64.txt
@@ -259,6 +259,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp
diff --git a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt
index 0843f2fe66..f399397e85 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.linux-x86_64.txt
@@ -259,6 +259,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp
diff --git a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt
index 85c19cfc15..35bbced865 100644
--- a/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/tx/schemeshard/CMakeLists.windows-x86_64.txt
@@ -258,6 +258,7 @@ target_sources(core-tx-schemeshard PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import_flow_proposals.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_import.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__cancel.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__forget.cpp
${CMAKE_SOURCE_DIR}/ydb/core/tx/schemeshard/schemeshard_build_index__list.cpp
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
new file mode 100644
index 0000000000..5324d4c8ae
--- /dev/null
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.cpp
@@ -0,0 +1,406 @@
+#include "schemeshard_build_index_tx_base.h"
+
+#include "schemeshard_impl.h"
+#include "schemeshard_identificators.h"
+#include "schemeshard_billing_helpers.h"
+#include "schemeshard_build_index_helpers.h"
+
+#include "schemeshard__operation_side_effects.h"
+
+#include <ydb/core/metering/metering.h>
+#include <ydb/core/tablet_flat/tablet_flat_executor.h>
+
+namespace NKikimr {
+namespace NSchemeShard {
+
+void TSchemeShard::TIndexBuilder::TTxBase::ApplyState(NTabletFlatExecutor::TTransactionContext& txc) {
+ for (auto& rec: StateChanges) {
+ TIndexBuildId buildId;
+ TIndexBuildInfo::EState state;
+ std::tie(buildId, state) = rec;
+
+ Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId);
+ auto buildInfo = Self->IndexBuilds.at(buildId);
+ LOG_I("Change state from " << buildInfo->State << " to " << state);
+ buildInfo->State = state;
+
+ NIceDb::TNiceDb db(txc.DB);
+ Self->PersistBuildIndexState(db, buildInfo);
+ }
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) {
+ SideEffects.ApplyOnExecute(Self, txc, ctx);
+ ApplyState(txc);
+ ApplyBill(txc, ctx);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::ApplyOnComplete(const TActorContext& ctx) {
+ SideEffects.ApplyOnComplete(Self, ctx);
+ ApplySchedule(ctx);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::ApplySchedule(const TActorContext& ctx) {
+ for (const auto& rec: ToScheduleBilling) {
+ ctx.Schedule(
+ std::get<1>(rec),
+ new TEvPrivate::TEvIndexBuildingMakeABill(
+ ui64(std::get<0>(rec)),
+ ctx.Now()));
+ }
+}
+
+ui64 TSchemeShard::TIndexBuilder::TTxBase::RequestUnits(const TBillingStats& stats) {
+ return TRUCalculator::ReadTable(stats.GetBytes())
+ + TRUCalculator::BulkUpsert(stats.GetBytes(), stats.GetRows());
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::RoundPeriod(TInstant& start, TInstant& end) {
+ if (start.Hours() == end.Hours()) {
+ return; // that is OK
+ }
+
+ TInstant hourEnd = TInstant::Hours(end.Hours());
+
+ if (end - hourEnd >= TDuration::Seconds(1)) {
+ start = hourEnd;
+ return;
+ }
+
+ if (hourEnd - start >= TDuration::Seconds(2)) {
+ end = hourEnd - TDuration::Seconds(1);
+ return;
+ }
+
+ start = hourEnd - TDuration::Seconds(2);
+ end = hourEnd - TDuration::Seconds(1);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx)
+{
+ for (const auto& rec: ToBill) {
+ const auto& buildId = std::get<0>(rec);
+ auto startPeriod = std::get<1>(rec);
+ auto endPeriod = std::get<2>(rec);
+
+ if (!startPeriod && !endPeriod) {
+ startPeriod = endPeriod = ctx.Now();
+ }
+
+ RoundPeriod(startPeriod, endPeriod);
+
+ Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId);
+ auto buildInfo = Self->IndexBuilds.at(buildId);
+
+ TBillingStats toBill = buildInfo->Processed - buildInfo->Billed;
+ if (!toBill) {
+ continue;
+ }
+
+ TPath domain = TPath::Init(buildInfo->DomainPathId, Self);
+ TPathElement::TPtr pathEl = domain.Base();
+
+ TString cloud_id;
+ if (pathEl->UserAttrs->Attrs.contains("cloud_id")) {
+ cloud_id = pathEl->UserAttrs->Attrs.at("cloud_id");
+ }
+ TString folder_id;
+ if (pathEl->UserAttrs->Attrs.contains("folder_id")) {
+ folder_id = pathEl->UserAttrs->Attrs.at("folder_id");
+ }
+ TString database_id;
+ if (pathEl->UserAttrs->Attrs.contains("database_id")) {
+ database_id = pathEl->UserAttrs->Attrs.at("database_id");
+ }
+
+ if (!cloud_id || !folder_id || !database_id) {
+ LOG_N("ApplyBill: unable to make a bill, neither cloud_id and nor folder_id nor database_id have found in user attributes at the domain"
+ << ", build index operation: " << buildId
+ << ", domain: " << domain.PathString()
+ << ", domainId: " << buildInfo->DomainPathId
+ << ", tableId: " << buildInfo->TablePathId
+ << ", not billed usage: " << toBill);
+ continue;
+ }
+
+ if (!Self->IsServerlessDomain(domain)) {
+ LOG_N("ApplyBill: unable to make a bill, domain is not a serverless db"
+ << ", build index operation: " << buildId
+ << ", domain: " << domain.PathString()
+ << ", domainId: " << buildInfo->DomainPathId
+ << ", IsDomainSchemeShard: " << Self->IsDomainSchemeShard
+ << ", ParentDomainId: " << Self->ParentDomainId
+ << ", ResourcesDomainId: " << domain.DomainInfo()->GetResourcesDomainId()
+ << ", not billed usage: " << toBill);
+ continue;
+ }
+
+ NIceDb::TNiceDb db(txc.DB);
+
+ buildInfo->Billed += toBill;
+ Self->PersistBuildIndexBilling(db, buildInfo);
+
+ ui64 requestUnits = RequestUnits(toBill);
+
+ TString id = TStringBuilder()
+ << buildId << "-"
+ << buildInfo->TablePathId.OwnerId << "-" << buildInfo->TablePathId.LocalPathId << "-"
+ << buildInfo->Billed.GetRows() << "-" << buildInfo->Billed.GetBytes() << "-"
+ << buildInfo->Processed.GetRows() << "-" << buildInfo->Processed.GetBytes();
+
+ const TString billRecord = TBillRecord()
+ .Id(id)
+ .CloudId(cloud_id)
+ .FolderId(folder_id)
+ .ResourceId(database_id)
+ .SourceWt(ctx.Now())
+ .Usage(TBillRecord::RequestUnits(requestUnits, startPeriod, endPeriod))
+ .ToString();
+
+ LOG_D("ApplyBill: made a bill"
+ << ", buildInfo: " << *buildInfo
+ << ", record: '" << billRecord << "'");
+
+ auto request = MakeHolder<NMetering::TEvMetering::TEvWriteMeteringJson>(std::move(billRecord));
+ // send message at Complete stage
+ Send(NMetering::MakeMeteringServiceID(), std::move(request));
+ }
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::Send(TActorId dst, THolder<IEventBase> message, ui32 flags, ui64 cookie) {
+ SideEffects.Send(dst, message.Release(), cookie, flags);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::ChangeState(TIndexBuildId id, TIndexBuildInfo::EState state) {
+ StateChanges.push_back(TChangeStateRec(id, state));
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::Progress(TIndexBuildId id) {
+ SideEffects.ToProgress(id);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo::TPtr indexInfo) {
+ index.SetId(ui64(indexInfo->Id));
+ if (indexInfo->Issue) {
+ AddIssue(index.MutableIssues(), indexInfo->Issue);
+ }
+
+ for (const auto& item: indexInfo->Shards) {
+ const TShardIdx& shardIdx = item.first;
+ const TIndexBuildInfo::TShardStatus& status = item.second;
+
+ if (status.Status != NKikimrTxDataShard::TEvBuildIndexProgressResponse::INPROGRESS) {
+ if (status.UploadStatus != Ydb::StatusIds::SUCCESS) {
+ if (status.DebugMessage) {
+ AddIssue(index.MutableIssues(), status.ToString(shardIdx));
+ }
+ }
+ }
+ }
+
+ switch (indexInfo->State) {
+ case TIndexBuildInfo::EState::Locking:
+ case TIndexBuildInfo::EState::GatheringStatistics:
+ case TIndexBuildInfo::EState::Initiating:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_PREPARING);
+ index.SetProgress(0.0);
+ break;
+ case TIndexBuildInfo::EState::Filling:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_TRANSFERING_DATA);
+ index.SetProgress(indexInfo->CalcProgressPercent());
+ break;
+ case TIndexBuildInfo::EState::Applying:
+ case TIndexBuildInfo::EState::Unlocking:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_APPLYING);
+ index.SetProgress(100.0);
+ break;
+ case TIndexBuildInfo::EState::Done:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_DONE);
+ index.SetProgress(100.0);
+ break;
+ case TIndexBuildInfo::EState::Cancellation_Applying:
+ case TIndexBuildInfo::EState::Cancellation_Unlocking:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLATION);
+ index.SetProgress(0.0);
+ break;
+ case TIndexBuildInfo::EState::Cancelled:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLED);
+ index.SetProgress(0.0);
+ break;
+ case TIndexBuildInfo::EState::Rejection_Applying:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION);
+ index.SetProgress(0.0);
+ break;
+ case TIndexBuildInfo::EState::Rejection_Unlocking:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION);
+ index.SetProgress(0.0);
+ break;
+ case TIndexBuildInfo::EState::Rejected:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTED);
+ index.SetProgress(0.0);
+ break;
+ case TIndexBuildInfo::EState::Invalid:
+ index.SetState(Ydb::Table::IndexBuildState::STATE_UNSPECIFIED);
+ break;
+ }
+
+ Fill(*index.MutableSettings(), indexInfo);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr indexInfo) {
+ TPath table = TPath::Init(indexInfo->TablePathId, Self);
+ settings.set_source_path(table.PathString());
+
+ Ydb::Table::TableIndex& index = *settings.mutable_index();
+ index.set_name(indexInfo->IndexName);
+
+ *index.mutable_index_columns() = {
+ indexInfo->IndexColumns.begin(),
+ indexInfo->IndexColumns.end()
+ };
+
+ *index.mutable_data_columns() = {
+ indexInfo->DataColumns.begin(),
+ indexInfo->DataColumns.end()
+ };
+
+ switch (indexInfo->IndexType) {
+ case NKikimrSchemeOp::EIndexType::EIndexTypeGlobal:
+ *index.mutable_global_index() = Ydb::Table::GlobalIndex();
+ break;
+ case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync:
+ *index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex();
+ break;
+ case NKikimrSchemeOp::EIndexType::EIndexTypeInvalid:
+ Y_FAIL("Unreachable");
+ };
+
+ settings.set_max_batch_bytes(indexInfo->Limits.MaxBatchBytes);
+ settings.set_max_batch_rows(indexInfo->Limits.MaxBatchRows);
+ settings.set_max_shards_in_flight(indexInfo->Limits.MaxShards);
+ settings.set_max_retries_upload_batch(indexInfo->Limits.MaxRetries);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::AddIssue(::google::protobuf::RepeatedPtrField<::Ydb::Issue::IssueMessage>* issues,
+ const TString& message,
+ NYql::TSeverityIds::ESeverityId severity)
+{
+ auto& issue = *issues->Add();
+ issue.set_severity(severity);
+ issue.set_message(message);
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::SendNotificationsIfFinished(TIndexBuildInfo::TPtr indexInfo) {
+ if (!indexInfo->IsFinished()) {
+ return;
+ }
+
+ LOG_T("TIndexBuildInfo SendNotifications: "
+ << ": id# " << indexInfo->Id
+ << ", subscribers count# " << indexInfo->Subscribers.size());
+
+ TSet<TActorId> toAnswer;
+ toAnswer.swap(indexInfo->Subscribers);
+ for (auto& actorId: toAnswer) {
+ Send(actorId, MakeHolder<TEvSchemeShard::TEvNotifyTxCompletionResult>(ui64(indexInfo->Id)));
+ }
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::EraseBuildInfo(const TIndexBuildInfo::TPtr indexBuildInfo) {
+ Self->IndexBuilds.erase(indexBuildInfo->Id);
+ Self->IndexBuildsByUid.erase(indexBuildInfo->Uid);
+
+ Self->TxIdToIndexBuilds.erase(indexBuildInfo->LockTxId);
+ Self->TxIdToIndexBuilds.erase(indexBuildInfo->InitiateTxId);
+ Self->TxIdToIndexBuilds.erase(indexBuildInfo->ApplyTxId);
+ Self->TxIdToIndexBuilds.erase(indexBuildInfo->UnlockTxId);
+}
+
+Ydb::StatusIds::StatusCode TSchemeShard::TIndexBuilder::TTxBase::TranslateStatusCode(NKikimrScheme::EStatus status) {
+ switch (status) {
+ case NKikimrScheme::StatusSuccess:
+ case NKikimrScheme::StatusAccepted:
+ case NKikimrScheme::StatusAlreadyExists:
+ return Ydb::StatusIds::SUCCESS;
+
+ case NKikimrScheme::StatusPathDoesNotExist:
+ case NKikimrScheme::StatusPathIsNotDirectory:
+ case NKikimrScheme::StatusSchemeError:
+ case NKikimrScheme::StatusNameConflict:
+ case NKikimrScheme::StatusInvalidParameter:
+ case NKikimrScheme::StatusRedirectDomain:
+ return Ydb::StatusIds::BAD_REQUEST;
+
+ case NKikimrScheme::StatusMultipleModifications:
+ case NKikimrScheme::StatusQuotaExceeded:
+ return Ydb::StatusIds::OVERLOADED;
+
+ case NKikimrScheme::StatusReadOnly:
+ case NKikimrScheme::StatusPreconditionFailed:
+ case NKikimrScheme::StatusResourceExhausted: //TODO: find better YDB status for it
+ return Ydb::StatusIds::PRECONDITION_FAILED;
+
+ case NKikimrScheme::StatusAccessDenied:
+ return Ydb::StatusIds::UNAUTHORIZED;
+ case NKikimrScheme::StatusNotAvailable:
+ case NKikimrScheme::StatusTxIdNotExists:
+ case NKikimrScheme::StatusTxIsNotCancellable:
+ case NKikimrScheme::StatusReserved18:
+ case NKikimrScheme::StatusReserved19:
+ Y_FAIL("unreachable");
+ }
+
+ return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::Bill(const TIndexBuildInfo::TPtr& indexBuildInfo,
+ TInstant startPeriod, TInstant endPeriod)
+{
+ ToBill.push_back(TToBill(indexBuildInfo->Id, std::move(startPeriod), std::move(endPeriod)));
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::AskToScheduleBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) {
+ if (indexBuildInfo->BillingEventIsScheduled) {
+ return;
+ }
+
+ if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) {
+ return;
+ }
+
+ indexBuildInfo->BillingEventIsScheduled = true;
+
+ ToScheduleBilling.push_back(TBillingEventSchedule(indexBuildInfo->Id, indexBuildInfo->ReBillPeriod));
+}
+
+bool TSchemeShard::TIndexBuilder::TTxBase::GotScheduledBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) {
+ if (!indexBuildInfo->BillingEventIsScheduled) {
+ return false;
+ }
+
+ if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) {
+ return false;
+ }
+
+ indexBuildInfo->BillingEventIsScheduled = false;
+
+ return true;
+}
+
+bool TSchemeShard::TIndexBuilder::TTxBase::Execute(TTransactionContext& txc, const TActorContext& ctx) {
+ if (!DoExecute(txc, ctx)) {
+ return false;
+ }
+
+ ApplyOnExecute(txc, ctx);
+ return true;
+}
+
+void TSchemeShard::TIndexBuilder::TTxBase::Complete(const TActorContext& ctx) {
+ DoComplete(ctx);
+
+ ApplyOnComplete(ctx);
+}
+
+} // NSchemeShard
+} // NKikimr
diff --git a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
index e9f4adb574..4542a8c7d4 100644
--- a/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
+++ b/ydb/core/tx/schemeshard/schemeshard_build_index_tx_base.h
@@ -1,14 +1,6 @@
#pragma once
#include "schemeshard_impl.h"
-#include "schemeshard_identificators.h"
-#include "schemeshard_billing_helpers.h"
-#include "schemeshard_build_index_helpers.h"
-
-#include "schemeshard__operation_part.h" // TSideEffects, make separate file
-
-#include <ydb/core/metering/metering.h>
-#include <ydb/core/tablet_flat/tablet_flat_executor.h>
namespace NKikimr {
namespace NSchemeShard {
@@ -24,379 +16,29 @@ private:
using TToBill = std::tuple<TIndexBuildId, TInstant, TInstant>;
TDeque<TToBill> ToBill;
- void ApplyState(NTabletFlatExecutor::TTransactionContext& txc) {
- for (auto& rec: StateChanges) {
- TIndexBuildId buildId;
- TIndexBuildInfo::EState state;
- std::tie(buildId, state) = rec;
-
- Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId);
- auto buildInfo = Self->IndexBuilds.at(buildId);
- LOG_I("Change state from " << buildInfo->State << " to " << state);
- buildInfo->State = state;
-
- NIceDb::TNiceDb db(txc.DB);
- Self->PersistBuildIndexState(db, buildInfo);
- }
- }
-
- void ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) {
- SideEffects.ApplyOnExecute(Self, txc, ctx);
- ApplyState(txc);
- ApplyBill(txc, ctx);
- }
-
- void ApplyOnComplete(const TActorContext& ctx) {
- SideEffects.ApplyOnComplete(Self, ctx);
- ApplySchedule(ctx);
- }
-
- void ApplySchedule(const TActorContext& ctx) {
- for (const auto& rec: ToScheduleBilling) {
- ctx.Schedule(
- std::get<1>(rec),
- new TEvPrivate::TEvIndexBuildingMakeABill(
- ui64(std::get<0>(rec)),
- ctx.Now()));
- }
- }
-
- ui64 RequestUnits(const TBillingStats& stats) {
- return TRUCalculator::ReadTable(stats.GetBytes())
- + TRUCalculator::BulkUpsert(stats.GetBytes(), stats.GetRows());
- }
-
- void RoundPeriod(TInstant& start, TInstant& end) {
- if (start.Hours() == end.Hours()) {
- return; // that is OK
- }
-
- TInstant hourEnd = TInstant::Hours(end.Hours());
-
- if (end - hourEnd >= TDuration::Seconds(1)) {
- start = hourEnd;
- return;
- }
-
- if (hourEnd - start >= TDuration::Seconds(2)) {
- end = hourEnd - TDuration::Seconds(1);
- return;
- }
-
- start = hourEnd - TDuration::Seconds(2);
- end = hourEnd - TDuration::Seconds(1);
- }
-
- void ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx)
- {
- for (const auto& rec: ToBill) {
- const auto& buildId = std::get<0>(rec);
- auto startPeriod = std::get<1>(rec);
- auto endPeriod = std::get<2>(rec);
-
- if (!startPeriod && !endPeriod) {
- startPeriod = endPeriod = ctx.Now();
- }
-
- RoundPeriod(startPeriod, endPeriod);
-
- Y_VERIFY_S(Self->IndexBuilds.contains(buildId), "IndexBuilds has no " << buildId);
- auto buildInfo = Self->IndexBuilds.at(buildId);
-
- TBillingStats toBill = buildInfo->Processed - buildInfo->Billed;
- if (!toBill) {
- continue;
- }
-
- TPath domain = TPath::Init(buildInfo->DomainPathId, Self);
- TPathElement::TPtr pathEl = domain.Base();
-
- TString cloud_id;
- if (pathEl->UserAttrs->Attrs.contains("cloud_id")) {
- cloud_id = pathEl->UserAttrs->Attrs.at("cloud_id");
- }
- TString folder_id;
- if (pathEl->UserAttrs->Attrs.contains("folder_id")) {
- folder_id = pathEl->UserAttrs->Attrs.at("folder_id");
- }
- TString database_id;
- if (pathEl->UserAttrs->Attrs.contains("database_id")) {
- database_id = pathEl->UserAttrs->Attrs.at("database_id");
- }
-
- if (!cloud_id || !folder_id || !database_id) {
- LOG_N("ApplyBill: unable to make a bill, neither cloud_id and nor folder_id nor database_id have found in user attributes at the domain"
- << ", build index operation: " << buildId
- << ", domain: " << domain.PathString()
- << ", domainId: " << buildInfo->DomainPathId
- << ", tableId: " << buildInfo->TablePathId
- << ", not billed usage: " << toBill);
- continue;
- }
-
- if (!Self->IsServerlessDomain(domain)) {
- LOG_N("ApplyBill: unable to make a bill, domain is not a serverless db"
- << ", build index operation: " << buildId
- << ", domain: " << domain.PathString()
- << ", domainId: " << buildInfo->DomainPathId
- << ", IsDomainSchemeShard: " << Self->IsDomainSchemeShard
- << ", ParentDomainId: " << Self->ParentDomainId
- << ", ResourcesDomainId: " << domain.DomainInfo()->GetResourcesDomainId()
- << ", not billed usage: " << toBill);
- continue;
- }
-
- NIceDb::TNiceDb db(txc.DB);
-
- buildInfo->Billed += toBill;
- Self->PersistBuildIndexBilling(db, buildInfo);
-
- ui64 requestUnits = RequestUnits(toBill);
-
- TString id = TStringBuilder()
- << buildId << "-"
- << buildInfo->TablePathId.OwnerId << "-" << buildInfo->TablePathId.LocalPathId << "-"
- << buildInfo->Billed.GetRows() << "-" << buildInfo->Billed.GetBytes() << "-"
- << buildInfo->Processed.GetRows() << "-" << buildInfo->Processed.GetBytes();
-
- const TString billRecord = TBillRecord()
- .Id(id)
- .CloudId(cloud_id)
- .FolderId(folder_id)
- .ResourceId(database_id)
- .SourceWt(ctx.Now())
- .Usage(TBillRecord::RequestUnits(requestUnits, startPeriod, endPeriod))
- .ToString();
-
- LOG_D("ApplyBill: made a bill"
- << ", buildInfo: " << *buildInfo
- << ", record: '" << billRecord << "'");
-
- auto request = MakeHolder<NMetering::TEvMetering::TEvWriteMeteringJson>(std::move(billRecord));
- // send message at Complete stage
- Send(NMetering::MakeMeteringServiceID(), std::move(request));
- }
- }
+ void ApplyState(NTabletFlatExecutor::TTransactionContext& txc);
+ void ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx);
+ void ApplyOnComplete(const TActorContext& ctx);
+ void ApplySchedule(const TActorContext& ctx);
+ ui64 RequestUnits(const TBillingStats& stats);
+ void RoundPeriod(TInstant& start, TInstant& end);
+ void ApplyBill(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx);
protected:
- void Send(TActorId dst, THolder<IEventBase> message, ui32 flags = 0, ui64 cookie = 0) {
- SideEffects.Send(dst, message.Release(), cookie, flags);
- }
-
- void ChangeState(TIndexBuildId id, TIndexBuildInfo::EState state) {
- StateChanges.push_back(TChangeStateRec(id, state));
- }
-
- void Progress(TIndexBuildId id) {
- SideEffects.ToProgress(id);
- }
-
- void Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo::TPtr indexInfo) {
- index.SetId(ui64(indexInfo->Id));
- if (indexInfo->Issue) {
- AddIssue(index.MutableIssues(), indexInfo->Issue);
- }
-
- for (const auto& item: indexInfo->Shards) {
- const TShardIdx& shardIdx = item.first;
- const TIndexBuildInfo::TShardStatus& status = item.second;
-
- if (status.Status != NKikimrTxDataShard::TEvBuildIndexProgressResponse::INPROGRESS) {
- if (status.UploadStatus != Ydb::StatusIds::SUCCESS) {
- if (status.DebugMessage) {
- AddIssue(index.MutableIssues(), status.ToString(shardIdx));
- }
- }
- }
- }
-
- switch (indexInfo->State) {
- case TIndexBuildInfo::EState::Locking:
- case TIndexBuildInfo::EState::GatheringStatistics:
- case TIndexBuildInfo::EState::Initiating:
- index.SetState(Ydb::Table::IndexBuildState::STATE_PREPARING);
- index.SetProgress(0.0);
- break;
- case TIndexBuildInfo::EState::Filling:
- index.SetState(Ydb::Table::IndexBuildState::STATE_TRANSFERING_DATA);
- index.SetProgress(indexInfo->CalcProgressPercent());
- break;
- case TIndexBuildInfo::EState::Applying:
- case TIndexBuildInfo::EState::Unlocking:
- index.SetState(Ydb::Table::IndexBuildState::STATE_APPLYING);
- index.SetProgress(100.0);
- break;
- case TIndexBuildInfo::EState::Done:
- index.SetState(Ydb::Table::IndexBuildState::STATE_DONE);
- index.SetProgress(100.0);
- break;
- case TIndexBuildInfo::EState::Cancellation_Applying:
- case TIndexBuildInfo::EState::Cancellation_Unlocking:
- index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLATION);
- index.SetProgress(0.0);
- break;
- case TIndexBuildInfo::EState::Cancelled:
- index.SetState(Ydb::Table::IndexBuildState::STATE_CANCELLED);
- index.SetProgress(0.0);
- break;
- case TIndexBuildInfo::EState::Rejection_Applying:
- index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION);
- index.SetProgress(0.0);
- break;
- case TIndexBuildInfo::EState::Rejection_Unlocking:
- index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTION);
- index.SetProgress(0.0);
- break;
- case TIndexBuildInfo::EState::Rejected:
- index.SetState(Ydb::Table::IndexBuildState::STATE_REJECTED);
- index.SetProgress(0.0);
- break;
- case TIndexBuildInfo::EState::Invalid:
- index.SetState(Ydb::Table::IndexBuildState::STATE_UNSPECIFIED);
- break;
- }
-
- Fill(*index.MutableSettings(), indexInfo);
- }
-
- void Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr indexInfo) {
- TPath table = TPath::Init(indexInfo->TablePathId, Self);
- settings.set_source_path(table.PathString());
-
- Ydb::Table::TableIndex& index = *settings.mutable_index();
- index.set_name(indexInfo->IndexName);
-
- *index.mutable_index_columns() = {
- indexInfo->IndexColumns.begin(),
- indexInfo->IndexColumns.end()
- };
-
- *index.mutable_data_columns() = {
- indexInfo->DataColumns.begin(),
- indexInfo->DataColumns.end()
- };
-
- switch (indexInfo->IndexType) {
- case NKikimrSchemeOp::EIndexType::EIndexTypeGlobal:
- *index.mutable_global_index() = Ydb::Table::GlobalIndex();
- break;
- case NKikimrSchemeOp::EIndexType::EIndexTypeGlobalAsync:
- *index.mutable_global_async_index() = Ydb::Table::GlobalAsyncIndex();
- break;
- case NKikimrSchemeOp::EIndexType::EIndexTypeInvalid:
- Y_FAIL("Unreachable");
- };
-
- settings.set_max_batch_bytes(indexInfo->Limits.MaxBatchBytes);
- settings.set_max_batch_rows(indexInfo->Limits.MaxBatchRows);
- settings.set_max_shards_in_flight(indexInfo->Limits.MaxShards);
- settings.set_max_retries_upload_batch(indexInfo->Limits.MaxRetries);
- }
-
+ void Send(TActorId dst, THolder<IEventBase> message, ui32 flags = 0, ui64 cookie = 0);
+ void ChangeState(TIndexBuildId id, TIndexBuildInfo::EState state);
+ void Progress(TIndexBuildId id);
+ void Fill(NKikimrIndexBuilder::TIndexBuild& index, const TIndexBuildInfo::TPtr indexInfo);
+ void Fill(NKikimrIndexBuilder::TIndexBuildSettings& settings, const TIndexBuildInfo::TPtr indexInfo);
void AddIssue(::google::protobuf::RepeatedPtrField< ::Ydb::Issue::IssueMessage>* issues,
const TString& message,
- NYql::TSeverityIds::ESeverityId severity = NYql::TSeverityIds::S_ERROR)
- {
- auto& issue = *issues->Add();
- issue.set_severity(severity);
- issue.set_message(message);
- }
-
- void SendNotificationsIfFinished(TIndexBuildInfo::TPtr indexInfo) {
- if (!indexInfo->IsFinished()) {
- return;
- }
-
- LOG_T("TIndexBuildInfo SendNotifications: "
- << ": id# " << indexInfo->Id
- << ", subscribers count# " << indexInfo->Subscribers.size());
-
- TSet<TActorId> toAnswer;
- toAnswer.swap(indexInfo->Subscribers);
- for (auto& actorId: toAnswer) {
- Send(actorId, MakeHolder<TEvSchemeShard::TEvNotifyTxCompletionResult>(ui64(indexInfo->Id)));
- }
- }
-
- void EraseBuildInfo(const TIndexBuildInfo::TPtr indexBuildInfo) {
- Self->IndexBuilds.erase(indexBuildInfo->Id);
- Self->IndexBuildsByUid.erase(indexBuildInfo->Uid);
-
- Self->TxIdToIndexBuilds.erase(indexBuildInfo->LockTxId);
- Self->TxIdToIndexBuilds.erase(indexBuildInfo->InitiateTxId);
- Self->TxIdToIndexBuilds.erase(indexBuildInfo->ApplyTxId);
- Self->TxIdToIndexBuilds.erase(indexBuildInfo->UnlockTxId);
- }
-
- Ydb::StatusIds::StatusCode TranslateStatusCode(NKikimrScheme::EStatus status) {
- switch (status) {
- case NKikimrScheme::StatusSuccess:
- case NKikimrScheme::StatusAccepted:
- case NKikimrScheme::StatusAlreadyExists:
- return Ydb::StatusIds::SUCCESS;
-
- case NKikimrScheme::StatusPathDoesNotExist:
- case NKikimrScheme::StatusPathIsNotDirectory:
- case NKikimrScheme::StatusSchemeError:
- case NKikimrScheme::StatusNameConflict:
- case NKikimrScheme::StatusInvalidParameter:
- case NKikimrScheme::StatusRedirectDomain:
- return Ydb::StatusIds::BAD_REQUEST;
-
- case NKikimrScheme::StatusMultipleModifications:
- case NKikimrScheme::StatusQuotaExceeded:
- return Ydb::StatusIds::OVERLOADED;
-
- case NKikimrScheme::StatusReadOnly:
- case NKikimrScheme::StatusPreconditionFailed:
- case NKikimrScheme::StatusResourceExhausted: //TODO: find better YDB status for it
- return Ydb::StatusIds::PRECONDITION_FAILED;
-
- case NKikimrScheme::StatusAccessDenied:
- return Ydb::StatusIds::UNAUTHORIZED;
- case NKikimrScheme::StatusNotAvailable:
- case NKikimrScheme::StatusTxIdNotExists:
- case NKikimrScheme::StatusTxIsNotCancellable:
- case NKikimrScheme::StatusReserved18:
- case NKikimrScheme::StatusReserved19:
- Y_FAIL("unreachable");
- }
-
- return Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
- }
-
- void Bill(const TIndexBuildInfo::TPtr& indexBuildInfo, TInstant startPeriod = TInstant::Zero(), TInstant endPeriod = TInstant::Zero()) {
-
- ToBill.push_back(TToBill(indexBuildInfo->Id, std::move(startPeriod), std::move(endPeriod)));
- }
-
- void AskToScheduleBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) {
- if (indexBuildInfo->BillingEventIsScheduled) {
- return;
- }
-
- if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) {
- return;
- }
-
- indexBuildInfo->BillingEventIsScheduled = true;
-
- ToScheduleBilling.push_back(TBillingEventSchedule(indexBuildInfo->Id, indexBuildInfo->ReBillPeriod));
- }
-
- bool GotScheduledBilling(const TIndexBuildInfo::TPtr& indexBuildInfo) {
- if (!indexBuildInfo->BillingEventIsScheduled) {
- return false;
- }
-
- if (indexBuildInfo->State != TIndexBuildInfo::EState::Filling) {
- return false;
- }
-
- indexBuildInfo->BillingEventIsScheduled = false;
-
- return true;
- }
+ NYql::TSeverityIds::ESeverityId severity = NYql::TSeverityIds::S_ERROR);
+ void SendNotificationsIfFinished(TIndexBuildInfo::TPtr indexInfo);
+ void EraseBuildInfo(const TIndexBuildInfo::TPtr indexBuildInfo);
+ Ydb::StatusIds::StatusCode TranslateStatusCode(NKikimrScheme::EStatus status);
+ void Bill(const TIndexBuildInfo::TPtr& indexBuildInfo, TInstant startPeriod = TInstant::Zero(), TInstant endPeriod = TInstant::Zero());
+ void AskToScheduleBilling(const TIndexBuildInfo::TPtr& indexBuildInfo);
+ bool GotScheduledBilling(const TIndexBuildInfo::TPtr& indexBuildInfo);
public:
explicit TTxBase(TSelf* self)
@@ -408,21 +50,8 @@ public:
virtual bool DoExecute(TTransactionContext& txc, const TActorContext& ctx) = 0;
virtual void DoComplete(const TActorContext& ctx) = 0;
- bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
- if (!DoExecute(txc, ctx)) {
- return false;
- }
-
- ApplyOnExecute(txc, ctx);
- return true;
- }
-
- void Complete(const TActorContext& ctx) override {
- DoComplete(ctx);
-
- ApplyOnComplete(ctx);
- }
-
+ bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
+ void Complete(const TActorContext& ctx) override;
};
} // NSchemeShard
diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make
index 0e0c5e8d3b..1d458745bd 100644
--- a/ydb/core/tx/schemeshard/ya.make
+++ b/ydb/core/tx/schemeshard/ya.make
@@ -213,6 +213,7 @@ SRCS(
schemeshard_import_flow_proposals.cpp
schemeshard_import.cpp
schemeshard_build_index.cpp
+ schemeshard_build_index_tx_base.cpp
schemeshard_build_index__cancel.cpp
schemeshard_build_index__forget.cpp
schemeshard_build_index__list.cpp