aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlexander Avdonkin <aavdonkin@yandex.ru>2025-03-05 17:00:45 +0300
committerGitHub <noreply@github.com>2025-03-05 17:00:45 +0300
commit347babcacae409cceab8bb183cc98b110f971465 (patch)
tree8a53003b6b9320582e50bb95176e10e7394bf926
parent1d48d63269fb026ed127542eda87bd8860dcc0cb (diff)
downloadydb-347babcacae409cceab8bb183cc98b110f971465.tar.gz
Stop writing to column tables if quota is exceeded (#11580)
-rw-r--r--ydb/core/protos/counters_columnshard.proto2
-rw-r--r--ydb/core/protos/tx_columnshard.proto1
-rw-r--r--ydb/core/tx/columnshard/columnshard.h9
-rw-r--r--ydb/core/tx/columnshard/columnshard__init.cpp5
-rw-r--r--ydb/core/tx/columnshard/columnshard__propose_transaction.cpp10
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp24
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h10
-rw-r--r--ydb/core/tx/columnshard/columnshard_schema.h4
-rw-r--r--ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp120
-rw-r--r--ydb/core/tx/columnshard/columnshard_subdomain_path_id.h56
-rw-r--r--ydb/core/tx/columnshard/loading/stages.cpp10
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp26
-rw-r--r--ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h2
-rw-r--r--ydb/core/tx/columnshard/ya.make1
-rw-r--r--ydb/core/tx/datashard/datashard_subdomain_path_id.cpp1
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter_store.cpp4
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/alter_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/create_store.cpp5
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/create_table.cpp6
-rw-r--r--ydb/core/tx/schemeshard/olap/operations/drop_table.cpp4
-rw-r--r--ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp209
-rw-r--r--ydb/tests/functional/serverless/conftest.py9
-rw-r--r--ydb/tests/functional/serverless/test_serverless.py101
-rw-r--r--ydb/tests/library/harness/kikimr_config.py3
25 files changed, 606 insertions, 24 deletions
diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto
index 327a8c999f5..d1c7a6d5e68 100644
--- a/ydb/core/protos/counters_columnshard.proto
+++ b/ydb/core/protos/counters_columnshard.proto
@@ -205,4 +205,6 @@ enum ETxTypes {
TXTYPE_ASK_PORTION_METADATA = 38 [(TxTypeOpts) = {Name: "TxAskPortionMetadata"}];
TXTYPE_WRITE_PORTIONS_FINISHED = 39 [(TxTypeOpts) = {Name: "TxWritePortionsFinished"}];
TXTYPE_WRITE_PORTIONS_FAILED = 40 [(TxTypeOpts) = {Name: "TxWritePortionsFailed"}];
+ TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE = 41 [(TxTypeOpts) = {Name: "TxPersistSubDomainOutOfSpace"}];
+ TXTYPE_PERSIST_SUBDOMAIN_PATH_ID = 42 [(TxTypeOpts) = {Name: "TxPersistSubDomainPathId"}];
}
diff --git a/ydb/core/protos/tx_columnshard.proto b/ydb/core/protos/tx_columnshard.proto
index 657786fd450..a6dd75e9ded 100644
--- a/ydb/core/protos/tx_columnshard.proto
+++ b/ydb/core/protos/tx_columnshard.proto
@@ -162,6 +162,7 @@ message TEvProposeTransaction {
optional NKikimrSubDomains.TProcessingParams ProcessingParams = 6;
optional uint64 Flags = 7;
optional NKikimrTx.TMessageSeqNo SeqNo = 8;
+ optional uint64 SubDomainPathId = 9;
}
message TEvCheckPlannedTransaction {
diff --git a/ydb/core/tx/columnshard/columnshard.h b/ydb/core/tx/columnshard/columnshard.h
index 4f731c2dd12..53f1efccf3a 100644
--- a/ydb/core/tx/columnshard/columnshard.h
+++ b/ydb/core/tx/columnshard/columnshard.h
@@ -140,16 +140,19 @@ namespace TEvColumnShard {
}
TEvProposeTransaction(NKikimrTxColumnShard::ETransactionKind txKind, ui64 ssId, const TActorId& source,
- ui64 txId, TString txBody, const ui32 flags = 0)
+ ui64 txId, TString txBody, const ui32 flags, ui64 subDomainPathId)
: TEvProposeTransaction(txKind, source, txId, std::move(txBody), flags)
{
// Y_ABORT_UNLESS(txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA);
Record.SetSchemeShardId(ssId);
+ if (subDomainPathId != 0) {
+ Record.SetSubDomainPathId(subDomainPathId);
+ }
}
TEvProposeTransaction(NKikimrTxColumnShard::ETransactionKind txKind, ui64 ssId, const TActorId& source,
- ui64 txId, TString txBody, const TMessageSeqNo& seqNo, const NKikimrSubDomains::TProcessingParams& processingParams, const ui32 flags = 0)
- : TEvProposeTransaction(txKind, ssId, source, txId, std::move(txBody), flags)
+ ui64 txId, TString txBody, const TMessageSeqNo& seqNo, const NKikimrSubDomains::TProcessingParams& processingParams, const ui32 flags, ui64 subDomainPathId)
+ : TEvProposeTransaction(txKind, ssId, source, txId, std::move(txBody), flags, subDomainPathId)
{
Record.MutableProcessingParams()->CopyFrom(processingParams);
*Record.MutableSeqNo() = seqNo.SerializeToProto();
diff --git a/ydb/core/tx/columnshard/columnshard__init.cpp b/ydb/core/tx/columnshard/columnshard__init.cpp
index f45dbe14cfe..9190c7c7ba7 100644
--- a/ydb/core/tx/columnshard/columnshard__init.cpp
+++ b/ydb/core/tx/columnshard/columnshard__init.cpp
@@ -108,6 +108,11 @@ void TTxInit::Complete(const TActorContext& ctx) {
AFL_VERIFY(!Self->IsTxInitFinished);
Self->IsTxInitFinished = true;
Self->TrySwitchToWork(ctx);
+ if (Self->SpaceWatcher->SubDomainPathId) {
+ Self->SpaceWatcher->StartWatchingSubDomainPathId();
+ } else {
+ Self->SpaceWatcher->StartFindSubDomainPathId();
+ }
}
class TTxUpdateSchema: public TTransactionBase<TColumnShard> {
diff --git a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
index 55cff6c401f..1a097066844 100644
--- a/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
+++ b/ydb/core/tx/columnshard/columnshard__propose_transaction.cpp
@@ -54,6 +54,16 @@ public:
} else {
AFL_VERIFY(Self->CurrentSchemeShardId == record.GetSchemeShardId());
}
+ if (txKind == NKikimrTxColumnShard::TX_KIND_SCHEMA) {
+ if (record.HasSubDomainPathId()) {
+ ui64 subDomainPathId = record.GetSubDomainPathId();
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "propose")("subdomain_id", subDomainPathId);
+ Self->SpaceWatcher->PersistSubDomainPathId(subDomainPathId, txc);
+ Self->SpaceWatcher->StartWatchingSubDomainPathId();
+ } else {
+ Self->SpaceWatcher->StartFindSubDomainPathId();
+ }
+ }
}
std::optional<TMessageSeqNo> msgSeqNo;
if (Ev->Get()->Record.HasSeqNo()) {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index b869f48a7b0..7dee0672eec 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -203,24 +203,30 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
writeMeta.SetLongTxId(NLongTxService::TLongTxId::FromProto(record.GetLongTxId()));
writeMeta.SetWritePartId(record.GetWritePartId());
- const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason) {
+ const auto returnFail = [&](const NColumnShard::ECumulativeCounters signalIndex, const EWriteFailReason reason, NKikimrTxColumnShard::EResultStatus resultStatus) {
Counters.GetTabletCounters()->IncCounter(signalIndex);
- ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR));
+ ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, resultStatus));
Counters.GetCSCounters().OnFailedWriteResponse(reason);
return;
};
+ if (SpaceWatcher->SubDomainOutOfSpace && (!record.HasModificationType() || (record.GetModificationType() != NKikimrTxColumnShard::TEvWrite::OPERATION_DELETE))) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded");
+ Counters.GetTabletCounters()->IncCounter(COUNTER_OUT_OF_SPACE);
+ return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Overload, NKikimrTxColumnShard::EResultStatus::OVERLOADED);
+ }
+
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "skip_writing")("reason", "disabled");
- return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled);
+ return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::Disabled, NKikimrTxColumnShard::EResultStatus::ERROR);
}
if (!TablesManager.IsReadyForStartWrite(pathId, false)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex() ? "" : " no index")
<< " at tablet " << TabletID());
- return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable);
+ return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::NoTable, NKikimrTxColumnShard::EResultStatus::ERROR);
}
{
@@ -230,7 +236,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
.CheckWriteData();
if (status.IsFail()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "writing_fail_through_compaction")("reason", status.GetErrorMessage());
- return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria);
+ return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::CompactionCriteria, NKikimrTxColumnShard::EResultStatus::ERROR);
}
}
@@ -239,7 +245,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (!arrowData->ParseFromProto(record)) {
LOG_S_ERROR(
"Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId() << " at tablet " << TabletID());
- return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema);
+ return returnFail(COUNTER_WRITE_FAIL, EWriteFailReason::IncorrectSchema, NKikimrTxColumnShard::EResultStatus::ERROR);
}
NEvWrite::TWriteData writeData(writeMetaPtr, arrowData, snapshotSchema->GetIndexInfo().GetReplaceKey(),
@@ -564,7 +570,11 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}
- auto overloadStatus = CheckOverloadedImmediate(pathId);
+ const bool outOfSpace = SpaceWatcher->SubDomainOutOfSpace && (*mType != NEvWrite::EModificationType::Delete);
+ if (outOfSpace) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "quota_exceeded")("source", "dataevent");
+ }
+ auto overloadStatus = outOfSpace ? EOverloadStatus::Disk : CheckOverloadedImmediate(pathId);
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = NEvents::TDataEvents::TEvWriteResult::BuildError(
TabletID(), 0, NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED, "overload data error");
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index f8f6b922c1a..3af3f62a18d 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -97,6 +97,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, NormalizerController(StoragesManager, Counters.GetSubscribeCounters())
, SysLocks(this) {
AFL_VERIFY(TabletActivityImpl->Inc() == 1);
+ SpaceWatcher = new TSpaceWatcher(this);
+ SpaceWatcherId = TActorContext::AsActorContext().Register(SpaceWatcher);
}
void TColumnShard::OnDetach(const TActorContext& ctx) {
@@ -1158,6 +1160,7 @@ void TColumnShard::Die(const TActorContext& ctx) {
NTabletPipe::CloseAndForgetClient(SelfId(), StatsReportPipe);
UnregisterMediatorTimeCast();
NYDBTest::TControllers::GetColumnShardController()->OnTabletStopped(*this);
+ Send(SpaceWatcherId, new NActors::TEvents::TEvPoison);
IActor::Die(ctx);
}
@@ -1606,6 +1609,7 @@ void TColumnShard::Enqueue(STFUNC_SIG) {
HFunc(TEvPrivate::TEvTieringModified, HandleInit);
HFunc(TEvPrivate::TEvNormalizerResult, Handle);
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
default:
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "unexpected event in enqueue");
return NTabletFlatExecutor::TTabletExecutedFlat::Enqueue(ev);
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 6bd8acfc0da..3bb3665069c 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -2,6 +2,7 @@
#include "background_controller.h"
#include "columnshard.h"
#include "columnshard_private_events.h"
+#include "columnshard_subdomain_path_id.h"
#include "counters.h"
#include "defs.h"
#include "inflight_request_tracker.h"
@@ -39,6 +40,7 @@
#include <ydb/core/tx/tiering/common.h>
#include <ydb/core/tx/time_cast/time_cast.h>
#include <ydb/core/tx/tx_processing.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
#include <ydb/services/metadata/abstract/common.h>
#include <ydb/services/metadata/service.h>
@@ -182,6 +184,9 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
friend class TTxRemoveSharedBlobs;
friend class TTxFinishAsyncTransaction;
friend class TWaitEraseTablesTxSubscriber;
+ friend class TTxPersistSubDomainOutOfSpace;
+ friend class TTxPersistSubDomainPathId;
+ friend class TSpaceWatcher;
friend class NOlap::TCleanupPortionsColumnEngineChanges;
friend class NOlap::TCleanupTablesColumnEngineChanges;
@@ -295,8 +300,8 @@ class TColumnShard: public TActor<TColumnShard>, public NTabletFlatExecutor::TTa
void Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::TPtr& ev, const TActorContext& ctx);
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishToSource::TPtr& ev, const TActorContext& ctx);
void Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx);
-
void Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& ctx);
+ void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
void HandleInit(TEvPrivate::TEvTieringModified::TPtr& ev, const TActorContext&);
@@ -463,6 +468,7 @@ protected:
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishToSource, Handle);
HFunc(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator, Handle);
HFunc(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors, Handle);
+ HFunc(TEvTxProxySchemeCache::TEvWatchNotifyUpdated, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
@@ -552,6 +558,8 @@ private:
TLimits Limits;
NOlap::TNormalizationController NormalizerController;
NDataShard::TSysLocks SysLocks;
+ TSpaceWatcher* SpaceWatcher;
+ TActorId SpaceWatcherId;
void TryRegisterMediatorTimeCast();
void UnregisterMediatorTimeCast();
diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h
index e73f581ef90..2f7f93cf7cc 100644
--- a/ydb/core/tx/columnshard/columnshard_schema.h
+++ b/ydb/core/tx/columnshard/columnshard_schema.h
@@ -83,7 +83,9 @@ struct Schema : NIceDb::Schema {
LastCompletedTxId = 14,
LastNormalizerSequentialId = 15,
GCBarrierPreparationGen = 16,
- GCBarrierPreparationStep = 17
+ GCBarrierPreparationStep = 17,
+ SubDomainLocalPathId = 18,
+ SubDomainOutOfSpace = 19
};
enum class EInsertTableIds : ui8 {
diff --git a/ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp
new file mode 100644
index 00000000000..b11a35a4f0d
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.cpp
@@ -0,0 +1,120 @@
+#include "columnshard_impl.h"
+
+namespace NKikimr::NColumnShard {
+
+class TTxPersistSubDomainOutOfSpace : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
+public:
+ TTxPersistSubDomainOutOfSpace(TColumnShard* self, bool outOfSpace)
+ : TTransactionBase(self)
+ , OutOfSpace(outOfSpace)
+ { }
+
+ TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_OUT_OF_SPACE; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ NIceDb::TNiceDb db(txc.DB);
+
+ if (Self->SpaceWatcher->SubDomainOutOfSpace != OutOfSpace) {
+ Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainOutOfSpace, ui64(OutOfSpace ? 1 : 0));
+ Self->SpaceWatcher->SubDomainOutOfSpace = OutOfSpace;
+ }
+
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // nothing
+ }
+
+private:
+ const bool OutOfSpace;
+};
+
+class TTxPersistSubDomainPathId : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
+public:
+ TTxPersistSubDomainPathId(TColumnShard* self, ui64 localPathId)
+ : TTransactionBase(self)
+ , LocalPathId(localPathId)
+ { }
+
+ TTxType GetTxType() const override { return TXTYPE_PERSIST_SUBDOMAIN_PATH_ID; }
+
+ bool Execute(TTransactionContext& txc, const TActorContext&) override {
+ if (!Self->SpaceWatcher->SubDomainPathId) {
+ Self->SpaceWatcher->PersistSubDomainPathId(LocalPathId, txc);
+ Self->SpaceWatcher->StartWatchingSubDomainPathId();
+ }
+ return true;
+ }
+
+ void Complete(const TActorContext&) override {
+ // nothing
+ }
+
+private:
+ const ui64 LocalPathId;
+};
+
+void TSpaceWatcher::PersistSubDomainPathId(ui64 localPathId,
+ NTabletFlatExecutor::TTransactionContext &txc) {
+ SubDomainPathId = localPathId;
+ NIceDb::TNiceDb db(txc.DB);
+ Schema::SaveSpecialValue(db, Schema::EValueIds::SubDomainLocalPathId, localPathId);
+}
+
+void TSpaceWatcher::StopWatchingSubDomainPathId() {
+ if (WatchingSubDomainPathId) {
+ Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove());
+ WatchingSubDomainPathId.reset();
+ }
+}
+
+void TSpaceWatcher::StartWatchingSubDomainPathId() {
+ if (!SubDomainPathId) {
+ return;
+ }
+
+ if (!WatchingSubDomainPathId) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("started_watching_subdomain", *SubDomainPathId);
+ Self->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(TPathId(Self->CurrentSchemeShardId, *SubDomainPathId)));
+ WatchingSubDomainPathId = *SubDomainPathId;
+ }
+}
+
+void TSpaceWatcher::Handle(NActors::TEvents::TEvPoison::TPtr& , const TActorContext& ctx) {
+ Die(ctx);
+}
+
+void TColumnShard::Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx) {
+ const auto* msg = ev->Get();
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notify_subdomain", msg->PathId);
+ const bool outOfSpace = msg->Result->GetPathDescription()
+ .GetDomainDescription()
+ .GetDomainState()
+ .GetDiskQuotaExceeded();
+
+ Execute(new TTxPersistSubDomainOutOfSpace(this, outOfSpace), ctx);
+}
+
+static constexpr TDuration MaxFindSubDomainPathIdDelay = TDuration::Minutes(10);
+
+void TSpaceWatcher::StartFindSubDomainPathId(bool delayFirstRequest) {
+ if (!FindSubDomainPathIdActor &&
+ Self->CurrentSchemeShardId != 0 &&
+ (!SubDomainPathId))
+ {
+ FindSubDomainPathIdActor = Register(CreateFindSubDomainPathIdActor(SelfId(), Self->TabletID(), Self->CurrentSchemeShardId, delayFirstRequest, MaxFindSubDomainPathIdDelay));
+ }
+}
+
+
+void TSpaceWatcher::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) {
+ const auto* msg = ev->Get();
+ if (FindSubDomainPathIdActor == ev->Sender) {
+ FindSubDomainPathIdActor = { };
+ }
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "subdomain_found")("scheme_shard_id", msg->SchemeShardId)("local_path_id", msg->LocalPathId);
+ Self->Execute(new TTxPersistSubDomainPathId(Self, msg->LocalPathId), ctx);
+}
+
+}
diff --git a/ydb/core/tx/columnshard/columnshard_subdomain_path_id.h b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.h
new file mode 100644
index 00000000000..b313c483874
--- /dev/null
+++ b/ydb/core/tx/columnshard/columnshard_subdomain_path_id.h
@@ -0,0 +1,56 @@
+#pragma once
+
+#include <ydb/library/actors/core/actorid.h>
+#include <ydb/core/tx/scheme_cache/scheme_cache.h>
+
+namespace NKikimr::NColumnShard::NLoading {
+class TSpecialValuesInitializer;
+};
+
+namespace NKikimr::NColumnShard {
+
+class TSpaceWatcher : public TActorBootstrapped<TSpaceWatcher> {
+ TColumnShard* Self;
+ NActors::TActorId FindSubDomainPathIdActor;
+ std::optional<NKikimr::TLocalPathId> SubDomainPathId;
+ std::optional<NKikimr::TLocalPathId> WatchingSubDomainPathId;
+ bool SubDomainOutOfSpace = false;
+
+public:
+ friend class TColumnShard;
+ friend class TTxInit;
+ friend class TTxPersistSubDomainOutOfSpace;
+ friend class TTxPersistSubDomainPathId;
+ friend class NKikimr::NColumnShard::NLoading::TSpecialValuesInitializer;
+
+public:
+ TSpaceWatcher(TColumnShard* self)
+ : Self(self) {
+ }
+
+ void PersistSubDomainPathId(ui64 localPathId, NTabletFlatExecutor::TTransactionContext &txc);
+ void StopWatchingSubDomainPathId();
+ void StartWatchingSubDomainPathId();
+ void StartFindSubDomainPathId(bool delayFirstRequest = true);
+
+ void Bootstrap(const TActorContext& /*ctx*/) {
+ Become(&TThis::StateWork);
+ }
+
+ void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev, const TActorContext& ctx);
+ void Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext&);
+ void Handle(NActors::TEvents::TEvPoison::TPtr& ev, const TActorContext&);
+
+ STFUNC(StateWork) {
+ switch (ev->GetTypeRewrite()) {
+ HFunc(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound, Handle);
+ HFunc(NActors::TEvents::TEvPoison, Handle);
+ default:
+ LOG_S_WARN("TSpaceWatcher.StateWork at " << " unhandled event type: " << ev->GetTypeName()
+ << " event: " << ev->ToString());
+ break;
+ }
+ }
+};
+
+}
diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp
index 42176ac2aa1..866099ddd8b 100644
--- a/ydb/core/tx/columnshard/loading/stages.cpp
+++ b/ydb/core/tx/columnshard/loading/stages.cpp
@@ -173,6 +173,16 @@ bool TSpecialValuesInitializer::DoExecute(NTabletFlatExecutor::TTransactionConte
return false;
}
+ if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainLocalPathId, Self->SpaceWatcher->SubDomainPathId)) {
+ return false;
+ }
+
+ ui64 outOfSpace = 0;
+ if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::SubDomainOutOfSpace, outOfSpace)) {
+ return false;
+ }
+ Self->SpaceWatcher->SubDomainOutOfSpace = outOfSpace;
+
{
ui64 lastCompletedStep = 0;
ui64 lastCompletedTx = 0;
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
index 45efee22f28..ccde6657af3 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
@@ -56,7 +56,7 @@ void RefreshTiering(TTestBasicRuntime& runtime, const TActorId& sender) {
bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap) {
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
- NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.GetTxId(), txBody);
+ NKikimrTxColumnShard::TX_KIND_SCHEMA, 0, sender, snap.GetTxId(), txBody, 0, 0);
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, event.release());
auto ev = runtime.GrabEdgeEvent<TEvColumnShard::TEvProposeTransactionResult>(sender);
@@ -207,7 +207,8 @@ void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vec
ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, scan.release());
}
-void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& /* writeIds */, const ui64 lockId) {
+template<class Checker>
+void ProposeCommitCheck(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& /* writeIds */, const ui64 lockId, Checker&& checker) {
auto write = std::make_unique<NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
auto* lock = write->Record.MutableLocks()->AddLocks();
lock->SetLockId(lockId);
@@ -219,14 +220,31 @@ void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, u
UNIT_ASSERT(event);
auto& res = event->Record;
- AFL_VERIFY(res.GetTxId() == txId)("tx_id", txId)("res", res.GetTxId());
- UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
+ checker(res);
+}
+
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) {
+ ProposeCommitCheck(runtime, sender, shardId, txId, writeIds, lockId, [&](auto& res) {
+ AFL_VERIFY(res.GetTxId() == txId)("tx_id", txId)("res", res.GetTxId());
+ UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
+ });
+}
+
+void ProposeCommitFail(TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) {
+ ProposeCommitCheck(runtime, sender, shardId, txId, writeIds, lockId, [&](auto& res) {
+ UNIT_ASSERT_UNEQUAL(res.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
+ });
}
void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId) {
ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds, lockId);
}
+
+void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds) {
+ ProposeCommit(runtime, sender, TTestTxConfig::TxTablet0, txId, writeIds);
+}
+
void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, const TSet<ui64>& txIds) {
PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
}
diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
index 1be3d35827e..7ba31f9425d 100644
--- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
+++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
@@ -429,6 +429,8 @@ ui32 WaitWriteResult(TTestBasicRuntime& runtime, ui64 shardId, std::vector<ui64>
void ScanIndexStats(TTestBasicRuntime& runtime, TActorId& sender, const std::vector<ui64>& pathIds, NOlap::TSnapshot snap, ui64 scanId = 0);
+void ProposeCommitFail(
+ TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1);
void ProposeCommit(
TTestBasicRuntime& runtime, TActorId& sender, ui64 shardId, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1);
void ProposeCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector<ui64>& writeIds, const ui64 lockId = 1);
diff --git a/ydb/core/tx/columnshard/ya.make b/ydb/core/tx/columnshard/ya.make
index f1048afeef6..f945d8d4fa9 100644
--- a/ydb/core/tx/columnshard/ya.make
+++ b/ydb/core/tx/columnshard/ya.make
@@ -12,6 +12,7 @@ SRCS(
columnshard__propose_transaction.cpp
columnshard__scan.cpp
columnshard__statistics.cpp
+ columnshard_subdomain_path_id.cpp
columnshard__write.cpp
columnshard__write_index.cpp
columnshard.cpp
diff --git a/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp b/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
index ac973a45e0c..356d982e876 100644
--- a/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
+++ b/ydb/core/tx/datashard/datashard_subdomain_path_id.cpp
@@ -56,6 +56,7 @@ private:
};
void TDataShard::Handle(NSchemeShard::TEvSchemeShard::TEvSubDomainPathIdFound::TPtr& ev, const TActorContext& ctx) {
+ AFL_DEBUG(NKikimrServices::TX_DATASHARD)("event", "subdomain_found");
const auto* msg = ev->Get();
if (FindSubDomainPathIdActor == ev->Sender) {
diff --git a/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
index 668495bdc08..fd230cafb93 100644
--- a/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter_store.cpp
@@ -151,7 +151,9 @@ public:
context.Ctx.SelfID,
ui64(OperationId.GetTxId()),
columnShardTxBody, seqNo,
- context.SS->SelectProcessingParams(txState->TargetPathId));
+ context.SS->SelectProcessingParams(txState->TargetPathId),
+ 0,
+ 0);
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
} else {
diff --git a/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp
index 488bddfe846..377249c5555 100644
--- a/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/alter_table.cpp
@@ -63,7 +63,9 @@ public:
context.Ctx.SelfID,
ui64(OperationId.GetTxId()),
txShardString, seqNo,
- context.SS->SelectProcessingParams(txState->TargetPathId));
+ context.SS->SelectProcessingParams(txState->TargetPathId),
+ 0,
+ 0);
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
diff --git a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
index 218c476912b..60c6ab538be 100644
--- a/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/create_store.cpp
@@ -94,13 +94,16 @@ public:
TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID;
if (shard.TabletType == ETabletType::ColumnShard) {
+ const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId;
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
NKikimrTxColumnShard::TX_KIND_SCHEMA,
context.SS->TabletID(),
context.Ctx.SelfID,
ui64(OperationId.GetTxId()),
columnShardTxBody, seqNo,
- context.SS->SelectProcessingParams(txState->TargetPathId));
+ context.SS->SelectProcessingParams(txState->TargetPathId),
+ 0,
+ subDomainPathId);
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
} else {
diff --git a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
index 374ac2ed02d..01403b255aa 100644
--- a/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/create_table.cpp
@@ -300,14 +300,16 @@ public:
TTabletId tabletId = context.SS->ShardInfos[shard.Idx].TabletID;
if (shard.TabletType == ETabletType::ColumnShard) {
+ const ui64 subDomainPathId = context.SS->ResolvePathIdForDomain(txState->TargetPathId).LocalPathId;
auto event = std::make_unique<TEvColumnShard::TEvProposeTransaction>(
NKikimrTxColumnShard::TX_KIND_SCHEMA,
context.SS->TabletID(),
context.Ctx.SelfID,
ui64(OperationId.GetTxId()),
columnShardTxBody, seqNo,
- context.SS->SelectProcessingParams(txState->TargetPathId));
-
+ context.SS->SelectProcessingParams(txState->TargetPathId),
+ 0,
+ subDomainPathId);
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
} else {
LOG_ERROR_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, DebugHint() << " unexpected tablet type");
diff --git a/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
index 368f8a7dd0b..a4e474512b5 100644
--- a/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
+++ b/ydb/core/tx/schemeshard/olap/operations/drop_table.cpp
@@ -70,7 +70,9 @@ public:
context.Ctx.SelfID,
ui64(OperationId.GetTxId()),
columnShardTxBody, seqNo,
- context.SS->SelectProcessingParams(txState->TargetPathId));
+ context.SS->SelectProcessingParams(txState->TargetPathId),
+ 0,
+ 0);
context.OnComplete.BindMsgToPipe(OperationId, tabletId, shard.Idx, event.release());
}
diff --git a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
index 4ec3b6a0f07..9c5865ebfa3 100644
--- a/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
+++ b/ydb/core/tx/schemeshard/ut_olap/ut_olap.cpp
@@ -47,6 +47,58 @@ static const TVector<NArrow::NTest::TTestColumn> defaultYdbSchema = {
}}
+#define DEBUG_HINT (TStringBuilder() << "at line " << __LINE__)
+
+NLs::TCheckFunc LsCheckDiskQuotaExceeded(
+ bool expectExceeded = true,
+ const TString& debugHint = ""
+) {
+ return [=] (const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ auto& desc = record.GetPathDescription().GetDomainDescription();
+ UNIT_ASSERT_VALUES_EQUAL_C(
+ desc.GetDomainState().GetDiskQuotaExceeded(),
+ expectExceeded,
+ debugHint << ", subdomain's disk space usage:\n" << desc.GetDiskSpaceUsage().DebugString()
+ );
+ };
+}
+
+void CheckQuotaExceedance(TTestActorRuntime& runtime,
+ ui64 schemeShard,
+ const TString& pathToSubdomain,
+ bool expectExceeded,
+ const TString& debugHint = ""
+) {
+ TestDescribeResult(DescribePath(runtime, schemeShard, pathToSubdomain), {
+ LsCheckDiskQuotaExceeded(expectExceeded, debugHint)
+ });
+}
+
+NKikimrTxDataShard::TEvPeriodicTableStats WaitTableStats(TTestActorRuntime& runtime, ui64 columnShardId, ui64 minPartCount = 0) {
+ NKikimrTxDataShard::TEvPeriodicTableStats stats;
+ bool captured = false;
+
+ auto observer = runtime.AddObserver<TEvDataShard::TEvPeriodicTableStats>([&](const auto& event) {
+ const auto& record = event->Get()->Record;
+ if (record.GetDatashardId() == columnShardId && record.GetTableStats().GetPartCount() >= minPartCount) {
+ stats = record;
+ captured = true;
+ }
+ }
+ );
+
+ for (int i = 0; i < 5 && !captured; ++i) {
+ TDispatchOptions options;
+ options.CustomFinalCondition = [&]() { return captured; };
+ runtime.DispatchEvents(options, TDuration::Seconds(5));
+ }
+
+ observer.Remove();
+
+ UNIT_ASSERT(captured);
+
+ return stats;
+}
Y_UNIT_TEST_SUITE(TOlap) {
Y_UNIT_TEST(CreateStore) {
@@ -926,4 +978,161 @@ Y_UNIT_TEST_SUITE(TOlap) {
TestLs(runtime, "/MyRoot/OlapStore", false, NLs::PathExist);
}
+
+ Y_UNIT_TEST(StoreStatsQuota) {
+ TTestBasicRuntime runtime;
+
+ TTestEnvOptions opts;
+ opts.DisableStatsBatching(true);
+ opts.EnablePersistentPartitionStats(true);
+ opts.EnableTopicDiskSubDomainQuota(false);
+
+ TTestEnv env(runtime, opts);
+ runtime.SetLogPriority(NKikimrServices::TX_COLUMNSHARD, NActors::NLog::PRI_DEBUG);
+ runtime.UpdateCurrentTime(TInstant::Now() - TDuration::Seconds(600));
+
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
+ csController->DisableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
+
+ // disable stats batching
+ auto& appData = runtime.GetAppData();
+ appData.SchemeShardConfig.SetStatsBatchTimeoutMs(0);
+ appData.SchemeShardConfig.SetStatsMaxBatchSize(0);
+
+ // apply config via reboot
+ TActorId sender = runtime.AllocateEdgeActor();
+ GracefulRestartTablet(runtime, TTestTxConfig::SchemeShard, sender);
+
+ constexpr const char* databaseDescription = R"(
+ DatabaseQuotas {
+ data_size_hard_quota: 1000000
+ data_size_soft_quota: 900000
+ }
+ )";
+
+ ui64 txId = 100;
+
+ TestCreateSubDomain(runtime, ++txId, "/MyRoot", TStringBuilder() << R"(
+ Name: "SomeDatabase"
+ )" << databaseDescription
+ );
+
+ const TString& olapSchema = defaultStoreSchema;
+
+ TestCreateOlapStore(runtime, ++txId, "/MyRoot/SomeDatabase", olapSchema);
+ env.TestWaitNotification(runtime, txId);
+
+ TestLs(runtime, "/MyRoot/SomeDatabase/OlapStore", false, NLs::PathExist);
+ TestLsPathId(runtime, 3, NLs::PathStringEqual("/MyRoot/SomeDatabase/OlapStore"));
+
+ TString tableSchema = R"(
+ Name: "ColumnTable"
+ ColumnShardCount: 1
+ )";
+
+ TestCreateColumnTable(runtime, ++txId, "/MyRoot/SomeDatabase/OlapStore", tableSchema);
+ env.TestWaitNotification(runtime, txId);
+
+ ui64 pathId = 0;
+ ui64 shardId = 0;
+ ui64 planStep = 0;
+ auto checkFn = [&](const NKikimrScheme::TEvDescribeSchemeResult& record) {
+ auto& self = record.GetPathDescription().GetSelf();
+ pathId = self.GetPathId();
+ txId = self.GetCreateTxId() + 1;
+ planStep = self.GetCreateStep();
+ auto& sharding = record.GetPathDescription().GetColumnTableDescription().GetSharding();
+ UNIT_ASSERT_VALUES_EQUAL(sharding.ColumnShardsSize(), 1);
+ shardId = sharding.GetColumnShards()[0];
+ UNIT_ASSERT_VALUES_EQUAL(record.GetPath(), "/MyRoot/SomeDatabase/OlapStore/ColumnTable");
+ };
+
+ TestLsPathId(runtime, 4, checkFn);
+ UNIT_ASSERT(shardId);
+ UNIT_ASSERT(pathId);
+ UNIT_ASSERT(planStep);
+ {
+ auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase/OlapStore/ColumnTable", true, true);
+ Cerr << description.DebugString() << Endl;
+ auto& tabletStats = description.GetPathDescription().GetTableStats();
+
+ UNIT_ASSERT(description.GetPathDescription().HasTableStats());
+ UNIT_ASSERT_EQUAL(tabletStats.GetRowCount(), 0);
+ UNIT_ASSERT_EQUAL(tabletStats.GetDataSize(), 0);
+ }
+
+ CheckQuotaExceedance(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase", false, DEBUG_HINT);
+
+ ui32 rowsInBatch = 100000;
+ ui64 writeId = 0;
+ TString data;
+
+ { // Write data directly into shard
+ TActorId sender = runtime.AllocateEdgeActor();
+ data = NTxUT::MakeTestBlob({0, rowsInBatch}, defaultYdbSchema, {}, { "timestamp" });
+ TSet<ui64> txIds;
+ for (ui32 i = 0; i < 10; ++i) {
+ std::vector<ui64> writeIds;
+ ++txId;
+ NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, txId);
+ NTxUT::ProposeCommit(runtime, sender, shardId, txId, writeIds, txId);
+ txIds.insert(txId);
+ }
+
+ NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, txIds);
+
+ WaitTableStats(runtime, shardId);
+ CheckQuotaExceedance(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase", true, DEBUG_HINT);
+
+ // Check that writes will fail if quota is exceeded
+ std::vector<ui64> writeIds;
+ ++txId;
+ AFL_VERIFY(!NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Upsert, txId));
+ NTxUT::ProposeCommitFail(runtime, sender, shardId, txId, writeIds, txId);
+ }
+
+ csController->WaitIndexation(TDuration::Seconds(5));
+ {
+ auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase/OlapStore", true, true);
+ Cerr << description.DebugString() << Endl;
+ auto& tabletStats = description.GetPathDescription().GetTableStats();
+
+ UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetPartCount(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetRowUpdates(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetImmediateTxCompleted(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetPlannedTxCompleted(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetLastAccessTime(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetLastUpdateTime(), 0);
+ }
+
+ {
+ auto description = DescribePrivatePath(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase/OlapStore/ColumnTable", true, true);
+ Cerr << description.DebugString() << Endl;
+ auto& tabletStats = description.GetPathDescription().GetTableStats();
+
+ UNIT_ASSERT_GT(tabletStats.GetRowCount(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetDataSize(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetPartCount(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetLastAccessTime(), 0);
+ UNIT_ASSERT_GT(tabletStats.GetLastUpdateTime(), 0);
+ }
+
+ std::vector<ui64> writeIds;
+ TSet<ui64> txIds;
+ ++txId;
+ bool delResult = NTxUT::WriteData(runtime, sender, shardId, ++writeId, pathId, data, defaultYdbSchema, &writeIds, NEvWrite::EModificationType::Delete, txId);
+ Y_UNUSED(delResult);
+ NTxUT::ProposeCommit(runtime, sender, shardId, txId, writeIds, txId);
+ txIds.insert(txId);
+ NTxUT::PlanCommit(runtime, sender, shardId, ++planStep, txIds);
+
+ csController->EnableBackground(NKikimr::NYDBTest::ICSController::EBackground::Compaction);
+ csController->WaitCompactions(TDuration::Seconds(60));
+ WaitTableStats(runtime, shardId);
+ CheckQuotaExceedance(runtime, TTestTxConfig::SchemeShard, "/MyRoot/SomeDatabase", false, DEBUG_HINT);
+ }
}
diff --git a/ydb/tests/functional/serverless/conftest.py b/ydb/tests/functional/serverless/conftest.py
index d46197139f4..7840d125b91 100644
--- a/ydb/tests/functional/serverless/conftest.py
+++ b/ydb/tests/functional/serverless/conftest.py
@@ -118,6 +118,15 @@ def ydb_disk_quoted_serverless_db(ydb_cluster, ydb_root, ydb_hostel_db, ydb_safe
yield database_name
+@pytest.fixture(scope='function')
+def ydb_disk_small_quoted_serverless_db(ydb_cluster, ydb_root, ydb_hostel_db, ydb_safe_test_name):
+ database_name = os.path.join(ydb_root, "quoted_serverless", ydb_safe_test_name)
+ disk_quotas = {'hard': 6 * 1024 * 1024, 'soft': 3 * 1024 * 1024}
+
+ with ydb_serverless_db_ctx(ydb_cluster, database_name, ydb_hostel_db, disk_quotas=disk_quotas):
+ yield database_name
+
+
@contextlib.contextmanager
def ydb_serverless_db_with_exclusive_nodes_ctx(ydb_cluster, database, hostel_db, timeout_seconds=100):
logger.info("setup ydb_serverless_db_with_exclusive_nodes %s using hostel %s", database, hostel_db)
diff --git a/ydb/tests/functional/serverless/test_serverless.py b/ydb/tests/functional/serverless/test_serverless.py
index 4a06c621483..8345cfad6ad 100644
--- a/ydb/tests/functional/serverless/test_serverless.py
+++ b/ydb/tests/functional/serverless/test_serverless.py
@@ -6,6 +6,11 @@ import time
import copy
import pytest
import subprocess
+import datetime
+
+from ydb.retries import (
+ RetrySettings,
+)
from hamcrest import assert_that, contains_inanyorder, not_none, not_, only_contains, is_in
from tornado import gen
@@ -49,6 +54,9 @@ CLUSTER_CONFIG = dict(
datashard_config={
'keep_snapshot_timeout': 5000,
},
+ column_shard_config={
+ 'disabled_on_scheme_shard': False,
+ },
)
@@ -365,7 +373,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db,
commit_tx=True,
)
except ydb.Unavailable as e:
- if not ignore_out_of_space or 'OUT_OF_SPACE' not in str(e):
+ if not ignore_out_of_space or 'DISK_SPACE_EXHAUSTED' not in str(e):
raise
@restart_coro_on_bad_session
@@ -438,7 +446,7 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db,
# Writes should be denied when database moves into DiskQuotaExceeded state
time.sleep(1)
- with pytest.raises(ydb.Unavailable, match=r'.*OUT_OF_SPACE.*'):
+ with pytest.raises(ydb.Unavailable, match=r'.*DISK_SPACE_EXHAUSTED.*'):
IOLoop.current().run_sync(lambda: async_write_key(path, 0, 'test', ignore_out_of_space=False))
with pytest.raises(ydb.Unavailable, match=r'.*out of disk space.*'):
IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(0, 'test')]))
@@ -460,6 +468,95 @@ def test_database_with_disk_quotas(ydb_hostel_db, ydb_disk_quoted_serverless_db,
IOLoop.current().run_sync(lambda: async_write_key(path, 0, 'test', ignore_out_of_space=False))
+def test_database_with_column_disk_quotas(ydb_hostel_db, ydb_disk_small_quoted_serverless_db, ydb_endpoint, ydb_cluster):
+ logger.debug(
+ "test for serverless db %s over hostel db %s", ydb_disk_small_quoted_serverless_db, ydb_hostel_db
+ )
+
+ database = ydb_disk_small_quoted_serverless_db
+
+ driver_config = ydb.DriverConfig(
+ ydb_endpoint,
+ database
+ )
+ logger.info(" database is %s", database)
+
+ driver = ydb.Driver(driver_config)
+ driver.wait(120)
+
+ def create_table(session, path):
+ logger.debug("creating table %s", path)
+
+ session.execute_scheme(
+ f"""
+ CREATE TABLE `{path}` (
+ id Uint64 NOT NULL,
+ value_string Utf8,
+ PRIMARY KEY(id)
+ )
+ PARTITION BY HASH(id)
+ WITH (
+ STORE = COLUMN,
+ AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
+ TTL=interval("PT1M") on id as seconds
+ )
+ """
+ )
+
+ class BulkUpsertRow(object):
+ __slots__ = ('id', 'value_string')
+
+ def __init__(self, id, value_string):
+ self.id = id
+ self.value_string = value_string
+
+ @gen.coroutine
+ def async_bulk_upsert(path, rows):
+ column_types = ydb.BulkUpsertColumns() \
+ .add_column('id', ydb.OptionalType(ydb.PrimitiveType.Uint64)) \
+ .add_column('value_string', ydb.OptionalType(ydb.PrimitiveType.Utf8))
+ yield driver.table_client.async_bulk_upsert(path, rows, column_types)
+
+ driver.scheme_client.make_directory(os.path.join(database, "dirA0"))
+ with ydb.QuerySessionPool(driver) as qpool:
+ path = os.path.join(database, "dirA0", "table")
+ with ydb.SessionPool(driver) as pool:
+ pool.retry_operation_sync(create_table, None, path)
+
+ data = 'a' * 7000000
+ for start in range(0, 1):
+ IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(int(datetime.datetime.now().timestamp()), data)]))
+
+ for _ in range(120):
+ time.sleep(1)
+ described = ydb_cluster.client.describe(database, '')
+ logger.debug('database state after write_keys: %s', described)
+ if described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded:
+ break
+ else:
+ assert False, 'database did not move into DiskQuotaExceeded state'
+
+ # Writes should be denied when database moves into DiskQuotaExceeded state
+ time.sleep(1)
+ logger.debug("start insert")
+ with pytest.raises(ydb.issues.Overloaded, match=r'.*overload data error.*'):
+ qpool.execute_with_retries(
+ "UPSERT INTO `{}`(id, value_string) VALUES({}, 'xxx')".format(path, int(datetime.datetime.now().timestamp()) + 100),
+ retry_settings=RetrySettings(max_retries=0))
+ logger.debug("finish insert")
+ with pytest.raises(ydb.issues.Overloaded, match=r'.*System overloaded.*'):
+ IOLoop.current().run_sync(lambda: async_bulk_upsert(path, [BulkUpsertRow(0, 'test')]))
+
+ for _ in range(300):
+ time.sleep(1)
+ described = ydb_cluster.client.describe(database, '')
+ logger.debug('database state after erase_keys: %s', described)
+ if not described.PathDescription.DomainDescription.DomainState.DiskQuotaExceeded:
+ break
+ else:
+ assert False, 'database did not move out of DiskQuotaExceeded state'
+
+
def test_discovery(ydb_hostel_db, ydb_serverless_db, ydb_endpoint):
def list_endpoints(database):
logger.debug("List endpoints of %s", database)
diff --git a/ydb/tests/library/harness/kikimr_config.py b/ydb/tests/library/harness/kikimr_config.py
index 2b0653795ff..4c146a260c1 100644
--- a/ydb/tests/library/harness/kikimr_config.py
+++ b/ydb/tests/library/harness/kikimr_config.py
@@ -356,6 +356,9 @@ class KikimrConfigGenerator(object):
if columnshard_config:
self.yaml_config["column_shard_config"] = columnshard_config
+ if column_shard_config:
+ self.yaml_config["column_shard_config"] = column_shard_config
+
self.__build()
if self.grpc_ssl_enable: