diff options
author | Alexander Avdonkin <aavdonkin@yandex.ru> | 2025-03-05 17:00:45 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2025-03-05 17:00:45 +0300 |
commit | 347babcacae409cceab8bb183cc98b110f971465 (patch) | |
tree | 8a53003b6b9320582e50bb95176e10e7394bf926 | |
parent | 1d48d63269fb026ed127542eda87bd8860dcc0cb (diff) | |
download | ydb-347babcacae409cceab8bb183cc98b110f971465.tar.gz |
Stop writing to column tables if quota is exceeded (#11580)
25 files changed, 606 insertions, 24 deletions
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index 327a8c999f5..d1c7a6d5e68 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -205,4 +205,6 @@ enum ETxTypes { TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}]; TXTYPE_WRITE_PORTIONS_FINISHED = 39 [(TxTypeOpts) = {Name: "TxWritePortionsFinished"}]; TXTYPE_WRITE_PORTIONS_FAILED = 40 [(TxTypeOpts) = {Name: "TxWritePortionsFailed"}]; + TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE = 41 [(TxTypeOpts) = {Name: "TxPersistSubDomainOutOfSpace"}]; + TXTYPE_PERSIST_SUBDOMAIN_PATH_ID = 42 [(TxTypeOpts) = {Name: "TxPersistSubDomainPathId"}]; } diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto index 657786fd450..a6dd75e9ded 100644 --- a/ydb/core/protos/tx_columnshard.proto +++ b/ydb/core/protos/tx_columnshard.proto @@ -162,6 +162,7 @@ message TEvProposeTransaction { optional NKikimrSubDomains.TProcessingParams ProcessingParams = 6; optional uint64 Flags = 7; optional NKikimrTx.TMessageSeqNo SeqNo = 8; + optional uint64 SubDomainPathId = 9; } message TEvCheckPlannedTransaction { diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h index 4f731c2dd12..53f1efccf3a 100644 --- a/ydb/core/tx/columnshard/columnshard.h +++ b/ydb/core/tx/columnshard/columnshard.h @@ -140,16 +140,19 @@ namespace TEvColumnShard { } TEvProposeTransaction(NKikimrTxColumnShard::ETransactionKind txKind, ui64 ssId, const TActorId& source, - ui64 txId, TString txBody, const ui32 flags = 0) + ui64 txId, TString txBody, const ui32 flags, ui64 subDomainPathId) : TEvProposeTransaction(txKind, source, txId, std::move(txBody), flags) { // Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA); Record.SetSchemeShardId(ssId); + if (subDomainPathId != 0) { + Record.SetSubDomainPathId(subDomainPathId); + } } TEvProposeTransaction(NKikimrTxColumnShard::ETransactionKind txKind, ui64 ssId, const TActorId& source, - ui64 txId, TString txBody, const TMessageSeqNo& seqNo, const NKikimrSubDomains::TProcessingParams& processingParams, const ui32 flags = 0) - : TEvProposeTransaction(txKind, ssId, source, txId, std::move(txBody), flags) + ui64 txId, TString txBody, const TMessageSeqNo& seqNo, const NKikimrSubDomains::TProcessingParams& processingParams, const ui32 flags, ui64 subDomainPathId) + : TEvProposeTransaction(txKind, ssId, source, txId, std::move(txBody), flags, subDomainPathId) { Record.MutableProcessingParams()->CopyFrom(processingParams); *Record.MutableSeqNo() = seqNo.SerializeToProto(); diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp index f45dbe14cfe..9190c7c7ba7 100644 --- a/ydb/core/tx/columnshard/columnshard__init.cpp +++ b/ydb/core/tx/columnshard/columnshard__init.cpp @@ -108,6 +108,11 @@ void TTxInit::Complete(const TActorContext& ctx) { AFL_VERIFY(!Self->IsTxInitFinished); Self->IsTxInitFinished = true; Self->TrySwitchToWork(ctx); + if (Self->SpaceWatcher->SubDomainPathId) { + Self->SpaceWatcher->StartWatchingSubDomainPathId(); + } else { + Self->SpaceWatcher->StartFindSubDomainPathId(); + } } class TTxUpdateSchema: public TTransactionBase<TColumnShard> { diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp index 55cff6c401f..1a097066844 100644 --- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp +++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp @@ -54,6 +54,16 @@ public: } else { AFL_VERIFY(Self->CurrentSchemeShardId == record.GetSchemeShardId()); } + if (txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA) { + if (record.HasSubDomainPathId()) { + ui64 subDomainPathId = record.GetSubDomainPathId(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "propose")("subdomain_id", subDomainPathId); + Self->SpaceWatcher->PersistSubDomainPathId(subDomainPathId, txc); + Self->SpaceWatcher->StartWatchingSubDomainPathId(); + } else { + Self->SpaceWatcher->StartFindSubDomainPathId(); + } + } } std::optional<TMessageSeqNo> msgSeqNo; if (Ev->Get()->Record.HasSeqNo()) { diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index b869f48a7b0..7dee0672eec 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -203,24 +203,30 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId())); writeMeta.SetWritePartId(record.GetWritePartId()); - const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason) { + const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason, NKikimrTxColumnShard::EResultStatus resultStatus) { Counters.GetTabletCounters()->IncCounter(signalIndex); - ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR)); + ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, resultStatus)); Counters.GetCSCounters().OnFailedWriteResponse(reason); return; }; + if (SpaceWatcher->SubDomainOutOfSpace && (!record.HasModificationType() || (record.GetModificationType() != NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE))) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded"); + Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE); + return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Overload, NKikimrTxColumnShard::EResultStatus::OVERLOADED); + } + if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "skip_writing")("reason", "disabled"); - return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled); + return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled, NKikimrTxColumnShard::EResultStatus::ERROR); } if (!TablesManager.IsReadyForStartWrite(pathId, false)) { LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index") << " at tablet " << TabletID()); - return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable); + return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable, NKikimrTxColumnShard::EResultStatus::ERROR); } { @@ -230,7 +236,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex .CheckWriteData(); if (status.IsFail()) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "writing_fail_through_compaction")("reason", status.GetErrorMessage()); - return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria); + return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria, NKikimrTxColumnShard::EResultStatus::ERROR); } } @@ -239,7 +245,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex if (!arrowData->ParseFromProto(record)) { LOG_S_ERROR( "Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID()); - return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema); + return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema, NKikimrTxColumnShard::EResultStatus::ERROR); } NEvWrite::TWriteData writeData(writeMetaPtr, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(), @@ -564,7 +570,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor return; } - auto overloadStatus = CheckOverloadedImmediate(pathId); + const bool outOfSpace = SpaceWatcher->SubDomainOutOfSpace && (*mType != NEvWrite::EModificationType::Delete); + if (outOfSpace) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded")("source", "dataevent"); + } + auto overloadStatus = outOfSpace ? EOverloadStatus::Disk : CheckOverloadedImmediate(pathId); if (overloadStatus != EOverloadStatus::None) { std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError( TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error"); diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index f8f6b922c1a..3af3f62a18d 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -97,6 +97,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , NormalizerController(StoragesManager, Counters.GetSubscribeCounters()) , SysLocks(this) { AFL_VERIFY(TabletActivityImpl->Inc() == 1); + SpaceWatcher = new TSpaceWatcher(this); + SpaceWatcherId = TActorContext::AsActorContext().Register(SpaceWatcher); } void TColumnShard::OnDetach(const TActorContext& ctx) { @@ -1158,6 +1160,7 @@ void TColumnShard::Die(const TActorContext& ctx) { NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe); UnregisterMediatorTimeCast(); NYDBTest::TControllers::GetColumnShardController()->OnTabletStopped(*this); + Send(SpaceWatcherId, new NActors::TEvents::TEvPoison); IActor::Die(ctx); } @@ -1606,6 +1609,7 @@ void TColumnShard::Enqueue(STFUNC_SIG) { HFunc(TEvPrivate::TEvTieringModified, HandleInit); HFunc(TEvPrivate::TEvNormalizerResult, Handle); HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle); + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); default: AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected event in enqueue"); return NTabletFlatExecutor::TTabletExecutedFlat::Enqueue(ev); diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h index 6bd8acfc0da..3bb3665069c 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.h +++ b/ydb/core/tx/columnshard/columnshard_impl.h @@ -2,6 +2,7 @@ #include "background_controller.h" #include "columnshard.h" #include "columnshard_private_events.h" +#include "columnshard_subdomain_path_id.h" #include "counters.h" #include "defs.h" #include "inflight_request_tracker.h" @@ -39,6 +40,7 @@ #include <ydb/core/tx/tiering/common.h> #include <ydb/core/tx/time_cast/time_cast.h> #include <ydb/core/tx/tx_processing.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> #include <ydb/services/metadata/abstract/common.h> #include <ydb/services/metadata/service.h> @@ -182,6 +184,9 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa friend class TTxRemoveSharedBlobs; friend class TTxFinishAsyncTransaction; friend class TWaitEraseTablesTxSubscriber; + friend class TTxPersistSubDomainOutOfSpace; + friend class TTxPersistSubDomainPathId; + friend class TSpaceWatcher; friend class NOlap::TCleanupPortionsColumnEngineChanges; friend class NOlap::TCleanupTablesColumnEngineChanges; @@ -295,8 +300,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa void Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::TPtr& ev, const TActorContext& ctx); void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishToSource::TPtr& ev, const TActorContext& ctx); void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx); - void Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& ctx); + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx); void HandleInit(TEvPrivate::TEvTieringModified::TPtr& ev, const TActorContext&); @@ -463,6 +468,7 @@ protected: HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishToSource, Handle); HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator, Handle); HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle); + HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle); default: if (!HandleDefaultEvents(ev, SelfId())) { @@ -552,6 +558,8 @@ private: TLimits Limits; NOlap::TNormalizationController NormalizerController; NDataShard::TSysLocks SysLocks; + TSpaceWatcher* SpaceWatcher; + TActorId SpaceWatcherId; void TryRegisterMediatorTimeCast(); void UnregisterMediatorTimeCast(); diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index e73f581ef90..2f7f93cf7cc 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -83,7 +83,9 @@ struct Schema : NIceDb::Schema { LastCompletedTxId = 14, LastNormalizerSequentialId = 15, GCBarrierPreparationGen = 16, - GCBarrierPreparationStep = 17 + GCBarrierPreparationStep = 17, + SubDomainLocalPathId = 18, + SubDomainOutOfSpace = 19 }; enum class EInsertTableIds : ui8 { diff --git a/ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp new file mode 100644 index 00000000000..b11a35a4f0d --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp @@ -0,0 +1,120 @@ +#include "columnshard_impl.h" + +namespace NKikimr::NColumnShard { + +class TTxPersistSubDomainOutOfSpace : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { +public: + TTxPersistSubDomainOutOfSpace(TColumnShard* self, bool outOfSpace) + : TTransactionBase(self) + , OutOfSpace(outOfSpace) + { } + + TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + NIceDb::TNiceDb db(txc.DB); + + if (Self->SpaceWatcher->SubDomainOutOfSpace != OutOfSpace) { + Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOutOfSpace, ui64(OutOfSpace ? 1 : 0)); + Self->SpaceWatcher->SubDomainOutOfSpace = OutOfSpace; + } + + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } + +private: + const bool OutOfSpace; +}; + +class TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTransactionBase<TColumnShard> { +public: + TTxPersistSubDomainPathId(TColumnShard* self, ui64 localPathId) + : TTransactionBase(self) + , LocalPathId(localPathId) + { } + + TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_PATH_ID; } + + bool Execute(TTransactionContext& txc, const TActorContext&) override { + if (!Self->SpaceWatcher->SubDomainPathId) { + Self->SpaceWatcher->PersistSubDomainPathId(LocalPathId, txc); + Self->SpaceWatcher->StartWatchingSubDomainPathId(); + } + return true; + } + + void Complete(const TActorContext&) override { + // nothing + } + +private: + const ui64 LocalPathId; +}; + +void TSpaceWatcher::PersistSubDomainPathId(ui64 localPathId, + NTabletFlatExecutor::TTransactionContext &txc) { + SubDomainPathId = localPathId; + NIceDb::TNiceDb db(txc.DB); + Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainLocalPathId, localPathId); +} + +void TSpaceWatcher::StopWatchingSubDomainPathId() { + if (WatchingSubDomainPathId) { + Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove()); + WatchingSubDomainPathId.reset(); + } +} + +void TSpaceWatcher::StartWatchingSubDomainPathId() { + if (!SubDomainPathId) { + return; + } + + if (!WatchingSubDomainPathId) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("started_watching_subdomain", *SubDomainPathId); + Self->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(TPathId(Self->CurrentSchemeShardId, *SubDomainPathId))); + WatchingSubDomainPathId = *SubDomainPathId; + } +} + +void TSpaceWatcher::Handle(NActors::TEvents::TEvPoison::TPtr& , const TActorContext& ctx) { + Die(ctx); +} + +void TColumnShard::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notify_subdomain", msg->PathId); + const bool outOfSpace = msg->Result->GetPathDescription() + .GetDomainDescription() + .GetDomainState() + .GetDiskQuotaExceeded(); + + Execute(new TTxPersistSubDomainOutOfSpace(this, outOfSpace), ctx); +} + +static constexpr TDuration MaxFindSubDomainPathIdDelay = TDuration::Minutes(10); + +void TSpaceWatcher::StartFindSubDomainPathId(bool delayFirstRequest) { + if (!FindSubDomainPathIdActor && + Self->CurrentSchemeShardId != 0 && + (!SubDomainPathId)) + { + FindSubDomainPathIdActor = Register(CreateFindSubDomainPathIdActor(SelfId(), Self->TabletID(), Self->CurrentSchemeShardId, delayFirstRequest, MaxFindSubDomainPathIdDelay)); + } +} + + +void TSpaceWatcher::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) { + const auto* msg = ev->Get(); + if (FindSubDomainPathIdActor == ev->Sender) { + FindSubDomainPathIdActor = { }; + } + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "subdomain_found")("scheme_shard_id", msg->SchemeShardId)("local_path_id", msg->LocalPathId); + Self->Execute(new TTxPersistSubDomainPathId(Self, msg->LocalPathId), ctx); +} + +} diff --git a/ydb/core/tx/columnshard/columnshard_subdomain_path_id.h b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.h new file mode 100644 index 00000000000..b313c483874 --- /dev/null +++ b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.h @@ -0,0 +1,56 @@ +#pragma once + +#include <ydb/library/actors/core/actorid.h> +#include <ydb/core/tx/scheme_cache/scheme_cache.h> + +namespace NKikimr::NColumnShard::NLoading { +class TSpecialValuesInitializer; +}; + +namespace NKikimr::NColumnShard { + +class TSpaceWatcher : public TActorBootstrapped<TSpaceWatcher> { + TColumnShard* Self; + NActors::TActorId FindSubDomainPathIdActor; + std::optional<NKikimr::TLocalPathId> SubDomainPathId; + std::optional<NKikimr::TLocalPathId> WatchingSubDomainPathId; + bool SubDomainOutOfSpace = false; + +public: + friend class TColumnShard; + friend class TTxInit; + friend class TTxPersistSubDomainOutOfSpace; + friend class TTxPersistSubDomainPathId; + friend class NKikimr::NColumnShard::NLoading::TSpecialValuesInitializer; + +public: + TSpaceWatcher(TColumnShard* self) + : Self(self) { + } + + void PersistSubDomainPathId(ui64 localPathId, NTabletFlatExecutor::TTransactionContext &txc); + void StopWatchingSubDomainPathId(); + void StartWatchingSubDomainPathId(); + void StartFindSubDomainPathId(bool delayFirstRequest = true); + + void Bootstrap(const TActorContext& /*ctx*/) { + Become(&TThis::StateWork); + } + + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx); + void Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext&); + void Handle(NActors::TEvents::TEvPoison::TPtr& ev, const TActorContext&); + + STFUNC(StateWork) { + switch (ev->GetTypeRewrite()) { + HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle); + HFunc(NActors::TEvents::TEvPoison, Handle); + default: + LOG_S_WARN("TSpaceWatcher.StateWork at " << " unhandled event type: " << ev->GetTypeName() + << " event: " << ev->ToString()); + break; + } + } +}; + +} diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp index 42176ac2aa1..866099ddd8b 100644 --- a/ydb/core/tx/columnshard/loading/stages.cpp +++ b/ydb/core/tx/columnshard/loading/stages.cpp @@ -173,6 +173,16 @@ bool TSpecialValuesInitializer::DoExecute(NTabletFlatExecutor::TTransactionConte return false; } + if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainLocalPathId, Self->SpaceWatcher->SubDomainPathId)) { + return false; + } + + ui64 outOfSpace = 0; + if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainOutOfSpace, outOfSpace)) { + return false; + } + Self->SpaceWatcher->SubDomainOutOfSpace = outOfSpace; + { ui64 lastCompletedStep = 0; ui64 lastCompletedTx = 0; diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp index 45efee22f28..ccde6657af3 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp @@ -56,7 +56,7 @@ void RefreshTiering(TTestBasicRuntime& runtime, const TActorId& sender) { bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) { auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( - NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.GetTxId(), txBody); + NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.GetTxId(), txBody, 0, 0); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release()); auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender); @@ -207,7 +207,8 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vec ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release()); } -void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& /* writeIds */, const ui64 lockId) { +template<class Checker> +void ProposeCommitCheck(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& /* writeIds */, const ui64 lockId, Checker&& checker) { auto write = std::make_unique<NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE); auto* lock = write->Record.MutableLocks()->AddLocks(); lock->SetLockId(lockId); @@ -219,14 +220,31 @@ void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, u UNIT_ASSERT(event); auto& res = event->Record; - AFL_VERIFY(res.GetTxId() == txId)("tx_id", txId)("res", res.GetTxId()); - UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); + checker(res); +} + +void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) { + ProposeCommitCheck(runtime, sender, shardId, txId, writeIds, lockId, [&](auto& res) { + AFL_VERIFY(res.GetTxId() == txId)("tx_id", txId)("res", res.GetTxId()); + UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); + }); +} + +void ProposeCommitFail(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) { + ProposeCommitCheck(runtime, sender, shardId, txId, writeIds, lockId, [&](auto& res) { + UNIT_ASSERT_UNEQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); + }); } void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) { ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds, lockId); } + +void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) { + ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds); +} + void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) { PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds); } diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h index 1be3d35827e..7ba31f9425d 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h @@ -429,6 +429,8 @@ ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64> void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds, NOlap::TSnapshot snap, ui64 scanId = 0); +void ProposeCommitFail( + TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1); void ProposeCommit( TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1); void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1); diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make index f1048afeef6..f945d8d4fa9 100644 --- a/ydb/core/tx/columnshard/ya.make +++ b/ydb/core/tx/columnshard/ya.make @@ -12,6 +12,7 @@ SRCS( columnshard__propose_transaction.cpp columnshard__scan.cpp columnshard__statistics.cpp + columnshard_subdomain_path_id.cpp columnshard__write.cpp columnshard__write_index.cpp columnshard.cpp diff --git a/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp b/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp index ac973a45e0c..356d982e876 100644 --- a/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp +++ b/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp @@ -56,6 +56,7 @@ private: }; void TDataShard::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) { + AFL_DEBUG(NKikimrServices::TX_DATASHARD)("event", "subdomain_found"); const auto* msg = ev->Get(); if (FindSubDomainPathIdActor == ev->Sender) { diff --git a/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp index 668495bdc08..fd230cafb93 100644 --- a/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp @@ -151,7 +151,9 @@ public: context.Ctx.SelfID, ui64(OperationId.GetTxId()), columnShardTxBody, seqNo, - context.SS->SelectProcessingParams(txState->TargetPathId)); + context.SS->SelectProcessingParams(txState->TargetPathId), + 0, + 0); context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); } else { diff --git a/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp index 488bddfe846..377249c5555 100644 --- a/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp @@ -63,7 +63,9 @@ public: context.Ctx.SelfID, ui64(OperationId.GetTxId()), txShardString, seqNo, - context.SS->SelectProcessingParams(txState->TargetPathId)); + context.SS->SelectProcessingParams(txState->TargetPathId), + 0, + 0); context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); diff --git a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp index 218c476912b..60c6ab538be 100644 --- a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp @@ -94,13 +94,16 @@ public: TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID; if (shard.TabletType == ETabletType::ColumnShard) { + const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId; auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( NKikimrTxColumnShard::TX_KIND_SCHEMA, context.SS->TabletID(), context.Ctx.SelfID, ui64(OperationId.GetTxId()), columnShardTxBody, seqNo, - context.SS->SelectProcessingParams(txState->TargetPathId)); + context.SS->SelectProcessingParams(txState->TargetPathId), + 0, + subDomainPathId); context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); } else { diff --git a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp index 374ac2ed02d..01403b255aa 100644 --- a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp @@ -300,14 +300,16 @@ public: TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID; if (shard.TabletType == ETabletType::ColumnShard) { + const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId; auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>( NKikimrTxColumnShard::TX_KIND_SCHEMA, context.SS->TabletID(), context.Ctx.SelfID, ui64(OperationId.GetTxId()), columnShardTxBody, seqNo, - context.SS->SelectProcessingParams(txState->TargetPathId)); - + context.SS->SelectProcessingParams(txState->TargetPathId), + 0, + subDomainPathId); context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); } else { LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " unexpected tablet type"); diff --git a/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp index 368f8a7dd0b..a4e474512b5 100644 --- a/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp +++ b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp @@ -70,7 +70,9 @@ public: context.Ctx.SelfID, ui64(OperationId.GetTxId()), columnShardTxBody, seqNo, - context.SS->SelectProcessingParams(txState->TargetPathId)); + context.SS->SelectProcessingParams(txState->TargetPathId), + 0, + 0); context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release()); } diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp index 4ec3b6a0f07..9c5865ebfa3 100644 --- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp +++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp @@ -47,6 +47,58 @@ static const TVector<NArrow::NTest::TTestColumn> defaultYdbSchema = { }} +#define DEBUG_HINT (TStringBuilder() << "at line " << __LINE__) + +NLs::TCheckFunc LsCheckDiskQuotaExceeded( + bool expectExceeded = true, + const TString& debugHint = "" +) { + return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) { + auto& desc = record.GetPathDescription().GetDomainDescription(); + UNIT_ASSERT_VALUES_EQUAL_C( + desc.GetDomainState().GetDiskQuotaExceeded(), + expectExceeded, + debugHint << ", subdomain's disk space usage:\n" << desc.GetDiskSpaceUsage().DebugString() + ); + }; +} + +void CheckQuotaExceedance(TTestActorRuntime& runtime, + ui64 schemeShard, + const TString& pathToSubdomain, + bool expectExceeded, + const TString& debugHint = "" +) { + TestDescribeResult(DescribePath(runtime, schemeShard, pathToSubdomain), { + LsCheckDiskQuotaExceeded(expectExceeded, debugHint) + }); +} + +NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 columnShardId, ui64 minPartCount = 0) { + NKikimrTxDataShard::TEvPeriodicTableStats stats; + bool captured = false; + + auto observer = runtime.AddObserver<TEvDataShard::TEvPeriodicTableStats>([&](const auto& event) { + const auto& record = event->Get()->Record; + if (record.GetDatashardId() == columnShardId && record.GetTableStats().GetPartCount() >= minPartCount) { + stats = record; + captured = true; + } + } + ); + + for (int i = 0; i < 5 && !captured; ++i) { + TDispatchOptions options; + options.CustomFinalCondition = [&]() { return captured; }; + runtime.DispatchEvents(options, TDuration::Seconds(5)); + } + + observer.Remove(); + + UNIT_ASSERT(captured); + + return stats; +} Y_UNIT_TEST_SUITE(TOlap) { Y_UNIT_TEST(CreateStore) { @@ -926,4 +978,161 @@ Y_UNIT_TEST_SUITE(TOlap) { TestLs(runtime, "/MyRoot/OlapStore", false, NLs::PathExist); } + + Y_UNIT_TEST(StoreStatsQuota) { + TTestBasicRuntime runtime; + + TTestEnvOptions opts; + opts.DisableStatsBatching(true); + opts.EnablePersistentPartitionStats(true); + opts.EnableTopicDiskSubDomainQuota(false); + + TTestEnv env(runtime, opts); + runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG); + runtime.UpdateCurrentTime(TInstant::Now() - TDuration::Seconds(600)); + + auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>(); + csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1)); + csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1)); + csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); + + // disable stats batching + auto& appData = runtime.GetAppData(); + appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0); + appData.SchemeShardConfig.SetStatsMaxBatchSize(0); + + // apply config via reboot + TActorId sender = runtime.AllocateEdgeActor(); + GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender); + + constexpr const char* databaseDescription = R"( + DatabaseQuotas { + data_size_hard_quota: 1000000 + data_size_soft_quota: 900000 + } + )"; + + ui64 txId = 100; + + TestCreateSubDomain(runtime, ++txId, "/MyRoot", TStringBuilder() << R"( + Name: "SomeDatabase" + )" << databaseDescription + ); + + const TString& olapSchema = defaultStoreSchema; + + TestCreateOlapStore(runtime, ++txId, "/MyRoot/SomeDatabase", olapSchema); + env.TestWaitNotification(runtime, txId); + + TestLs(runtime, "/MyRoot/SomeDatabase/OlapStore", false, NLs::PathExist); + TestLsPathId(runtime, 3, NLs::PathStringEqual("/MyRoot/SomeDatabase/OlapStore")); + + TString tableSchema = R"( + Name: "ColumnTable" + ColumnShardCount: 1 + )"; + + TestCreateColumnTable(runtime, ++txId, "/MyRoot/SomeDatabase/OlapStore", tableSchema); + env.TestWaitNotification(runtime, txId); + + ui64 pathId = 0; + ui64 shardId = 0; + ui64 planStep = 0; + auto checkFn = [&](const NKikimrScheme::TEvDescribeSchemeResult& record) { + auto& self = record.GetPathDescription().GetSelf(); + pathId = self.GetPathId(); + txId = self.GetCreateTxId() + 1; + planStep = self.GetCreateStep(); + auto& sharding = record.GetPathDescription().GetColumnTableDescription().GetSharding(); + UNIT_ASSERT_VALUES_EQUAL(sharding.ColumnShardsSize(), 1); + shardId = sharding.GetColumnShards()[0]; + UNIT_ASSERT_VALUES_EQUAL(record.GetPath(), "/MyRoot/SomeDatabase/OlapStore/ColumnTable"); + }; + + TestLsPathId(runtime, 4, checkFn); + UNIT_ASSERT(shardId); + UNIT_ASSERT(pathId); + UNIT_ASSERT(planStep); + { + auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase/OlapStore/ColumnTable", true, true); + Cerr << description.DebugString() << Endl; + auto& tabletStats = description.GetPathDescription().GetTableStats(); + + UNIT_ASSERT(description.GetPathDescription().HasTableStats()); + UNIT_ASSERT_EQUAL(tabletStats.GetRowCount(), 0); + UNIT_ASSERT_EQUAL(tabletStats.GetDataSize(), 0); + } + + CheckQuotaExceedance(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase", false, DEBUG_HINT); + + ui32 rowsInBatch = 100000; + ui64 writeId = 0; + TString data; + + { // Write data directly into shard + TActorId sender = runtime.AllocateEdgeActor(); + data = NTxUT::MakeTestBlob({0, rowsInBatch}, defaultYdbSchema, {}, { "timestamp" }); + TSet<ui64> txIds; + for (ui32 i = 0; i < 10; ++i) { + std::vector<ui64> writeIds; + ++txId; + NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, txId); + NTxUT::ProposeCommit(runtime, sender, shardId, txId, writeIds, txId); + txIds.insert(txId); + } + + NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, txIds); + + WaitTableStats(runtime, shardId); + CheckQuotaExceedance(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase", true, DEBUG_HINT); + + // Check that writes will fail if quota is exceeded + std::vector<ui64> writeIds; + ++txId; + AFL_VERIFY(!NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, txId)); + NTxUT::ProposeCommitFail(runtime, sender, shardId, txId, writeIds, txId); + } + + csController->WaitIndexation(TDuration::Seconds(5)); + { + auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase/OlapStore", true, true); + Cerr << description.DebugString() << Endl; + auto& tabletStats = description.GetPathDescription().GetTableStats(); + + UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0); + UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0); + UNIT_ASSERT_GT(tabletStats.GetPartCount(), 0); + UNIT_ASSERT_GT(tabletStats.GetRowUpdates(), 0); + UNIT_ASSERT_GT(tabletStats.GetImmediateTxCompleted(), 0); + UNIT_ASSERT_GT(tabletStats.GetPlannedTxCompleted(), 0); + UNIT_ASSERT_GT(tabletStats.GetLastAccessTime(), 0); + UNIT_ASSERT_GT(tabletStats.GetLastUpdateTime(), 0); + } + + { + auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase/OlapStore/ColumnTable", true, true); + Cerr << description.DebugString() << Endl; + auto& tabletStats = description.GetPathDescription().GetTableStats(); + + UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0); + UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0); + UNIT_ASSERT_GT(tabletStats.GetPartCount(), 0); + UNIT_ASSERT_GT(tabletStats.GetLastAccessTime(), 0); + UNIT_ASSERT_GT(tabletStats.GetLastUpdateTime(), 0); + } + + std::vector<ui64> writeIds; + TSet<ui64> txIds; + ++txId; + bool delResult = NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Delete, txId); + Y_UNUSED(delResult); + NTxUT::ProposeCommit(runtime, sender, shardId, txId, writeIds, txId); + txIds.insert(txId); + NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, txIds); + + csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction); + csController->WaitCompactions(TDuration::Seconds(60)); + WaitTableStats(runtime, shardId); + CheckQuotaExceedance(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase", false, DEBUG_HINT); + } } diff --git a/ydb/tests/functional/serverless/conftest.py b/ydb/tests/functional/serverless/conftest.py index d46197139f4..7840d125b91 100644 --- a/ydb/tests/functional/serverless/conftest.py +++ b/ydb/tests/functional/serverless/conftest.py @@ -118,6 +118,15 @@ def ydb_disk_quoted_serverless_db(ydb_cluster, ydb_root, ydb_hostel_db, ydb_safe yield database_name +@pytest.fixture(scope='function') +def ydb_disk_small_quoted_serverless_db(ydb_cluster, ydb_root, ydb_hostel_db, ydb_safe_test_name): + database_name = os.path.join(ydb_root, "quoted_serverless", ydb_safe_test_name) + disk_quotas = {'hard': 6 * 1024 * 1024, 'soft': 3 * 1024 * 1024} + + with ydb_serverless_db_ctx(ydb_cluster, database_name, ydb_hostel_db, disk_quotas=disk_quotas): + yield database_name + + @contextlib.contextmanager def ydb_serverless_db_with_exclusive_nodes_ctx(ydb_cluster, database, hostel_db, timeout_seconds=100): logger.info("setup ydb_serverless_db_with_exclusive_nodes %s using hostel %s", database, hostel_db) diff --git a/ydb/tests/functional/serverless/test_serverless.py b/ydb/tests/functional/serverless/test_serverless.py index 4a06c621483..8345cfad6ad 100644 --- a/ydb/tests/functional/serverless/test_serverless.py +++ b/ydb/tests/functional/serverless/test_serverless.py @@ -6,6 +6,11 @@ import time import copy import pytest import subprocess +import datetime + +from ydb.retries import ( + RetrySettings, +) from hamcrest import assert_that, contains_inanyorder, not_none, not_, only_contains, is_in from tornado import gen @@ -49,6 +54,9 @@ CLUSTER_CONFIG = dict( datashard_config={ 'keep_snapshot_timeout': 5000, }, + column_shard_config={ + 'disabled_on_scheme_shard': False, + }, ) @@ -365,7 +373,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db, commit_tx=True, ) except ydb.Unavailable as e: - if not ignore_out_of_space or 'OUT_OF_SPACE' not in str(e): + if not ignore_out_of_space or 'DISK_SPACE_EXHAUSTED' not in str(e): raise @restart_coro_on_bad_session @@ -438,7 +446,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db, # Writes should be denied when database moves into DiskQuotaExceeded state time.sleep(1) - with pytest.raises(ydb.Unavailable, match=r'.*OUT_OF_SPACE.*'): + with pytest.raises(ydb.Unavailable, match=r'.*DISK_SPACE_EXHAUSTED.*'): IOLoop.current().run_sync(lambda: async_write_key(path, 0, 'test', ignore_out_of_space=False)) with pytest.raises(ydb.Unavailable, match=r'.*out of disk space.*'): IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(0, 'test')])) @@ -460,6 +468,95 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db, IOLoop.current().run_sync(lambda: async_write_key(path, 0, 'test', ignore_out_of_space=False)) +def test_database_with_column_disk_quotas(ydb_hostel_db, ydb_disk_small_quoted_serverless_db, ydb_endpoint, ydb_cluster): + logger.debug( + "test for serverless db %s over hostel db %s", ydb_disk_small_quoted_serverless_db, ydb_hostel_db + ) + + database = ydb_disk_small_quoted_serverless_db + + driver_config = ydb.DriverConfig( + ydb_endpoint, + database + ) + logger.info(" database is %s", database) + + driver = ydb.Driver(driver_config) + driver.wait(120) + + def create_table(session, path): + logger.debug("creating table %s", path) + + session.execute_scheme( + f""" + CREATE TABLE `{path}` ( + id Uint64 NOT NULL, + value_string Utf8, + PRIMARY KEY(id) + ) + PARTITION BY HASH(id) + WITH ( + STORE = COLUMN, + AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1, + TTL=interval("PT1M") on id as seconds + ) + """ + ) + + class BulkUpsertRow(object): + __slots__ = ('id', 'value_string') + + def __init__(self, id, value_string): + self.id = id + self.value_string = value_string + + @gen.coroutine + def async_bulk_upsert(path, rows): + column_types = ydb.BulkUpsertColumns() \ + .add_column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)) \ + .add_column('value_string', ydb.OptionalType(ydb.PrimitiveType.Utf8)) + yield driver.table_client.async_bulk_upsert(path, rows, column_types) + + driver.scheme_client.make_directory(os.path.join(database, "dirA0")) + with ydb.QuerySessionPool(driver) as qpool: + path = os.path.join(database, "dirA0", "table") + with ydb.SessionPool(driver) as pool: + pool.retry_operation_sync(create_table, None, path) + + data = 'a' * 7000000 + for start in range(0, 1): + IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(int(datetime.datetime.now().timestamp()), data)])) + + for _ in range(120): + time.sleep(1) + described = ydb_cluster.client.describe(database, '') + logger.debug('database state after write_keys: %s', described) + if described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded: + break + else: + assert False, 'database did not move into DiskQuotaExceeded state' + + # Writes should be denied when database moves into DiskQuotaExceeded state + time.sleep(1) + logger.debug("start insert") + with pytest.raises(ydb.issues.Overloaded, match=r'.*overload data error.*'): + qpool.execute_with_retries( + "UPSERT INTO `{}`(id, value_string) VALUES({}, 'xxx')".format(path, int(datetime.datetime.now().timestamp()) + 100), + retry_settings=RetrySettings(max_retries=0)) + logger.debug("finish insert") + with pytest.raises(ydb.issues.Overloaded, match=r'.*System overloaded.*'): + IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(0, 'test')])) + + for _ in range(300): + time.sleep(1) + described = ydb_cluster.client.describe(database, '') + logger.debug('database state after erase_keys: %s', described) + if not described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded: + break + else: + assert False, 'database did not move out of DiskQuotaExceeded state' + + def test_discovery(ydb_hostel_db, ydb_serverless_db, ydb_endpoint): def list_endpoints(database): logger.debug("List endpoints of %s", database) diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py index 2b0653795ff..4c146a260c1 100644 --- a/ydb/tests/library/harness/kikimr_config.py +++ b/ydb/tests/library/harness/kikimr_config.py @@ -356,6 +356,9 @@ class KikimrConfigGenerator(object): if columnshard_config: self.yaml_config["column_shard_config"] = columnshard_config + if column_shard_config: + self.yaml_config["column_shard_config"] = column_shard_config + self.__build() if self.grpc_ssl_enable: |