aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <111685085+ivanmorozov333@users.noreply.github.com>2024-02-14 10:14:35 +0300
committerGitHub <noreply@github.com>2024-02-14 10:14:35 +0300
commit188a9d421bf7ebb3211b22b2a27aa808d8f59237 (patch)
tree4f96e584e1845087e6031bbf6105e14814b96cb9
parent698fdc293a3686db35f2d1cb9ad2f202c7c051cb (diff)
downloadydb-188a9d421bf7ebb3211b22b2a27aa808d8f59237.tar.gz
fixes for TxWriteIndex volume control, policies of exceptions on searching, eviction tasks for portions usage (#1894)
-rw-r--r--ydb/core/formats/arrow/program.cpp12
-rw-r--r--ydb/core/tx/columnshard/blobs_action/tier/write.cpp9
-rw-r--r--ydb/core/tx/columnshard/blobs_action/tier/write.h4
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp2
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp7
-rw-r--r--ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h1
-rw-r--r--ydb/core/tx/columnshard/blobs_reader/actor.cpp2
-rw-r--r--ydb/core/tx/columnshard/columnshard__write.cpp60
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.cpp4
-rw-r--r--ydb/core/tx/columnshard/columnshard_impl.h19
-rw-r--r--ydb/core/tx/columnshard/common/limits.cpp4
-rw-r--r--ydb/core/tx/columnshard/common/limits.h9
-rw-r--r--ydb/core/tx/columnshard/common/snapshot.h8
-rw-r--r--ydb/core/tx/columnshard/common/ya.make1
-rw-r--r--ydb/core/tx/columnshard/counters/columnshard.cpp25
-rw-r--r--ydb/core/tx/columnshard/counters/columnshard.h82
-rw-r--r--ydb/core/tx/columnshard/counters/common/owner.h3
-rw-r--r--ydb/core/tx/columnshard/counters/ya.make2
-rw-r--r--ydb/core/tx/columnshard/engines/changes/ttl.cpp8
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.cpp10
-rw-r--r--ydb/core/tx/columnshard/engines/changes/with_appended.h5
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.cpp59
-rw-r--r--ydb/core/tx/columnshard/engines/column_engine_logs.h19
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.cpp2
-rw-r--r--ydb/core/tx/columnshard/engines/portions/meta.h4
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.cpp4
-rw-r--r--ydb/core/tx/columnshard/engines/portions/portion_info.h2
-rw-r--r--ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp6
-rw-r--r--ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h11
-rw-r--r--ydb/core/tx/columnshard/engines/storage/storage.cpp5
-rw-r--r--ydb/core/tx/columnshard/engines/storage/storage.h2
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp35
-rw-r--r--ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h19
-rw-r--r--ydb/core/tx/columnshard/engines/writer/write_controller.h13
-rw-r--r--ydb/core/tx/data_events/write_data.h3
35 files changed, 317 insertions, 144 deletions
diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp
index 02d3c43d825..a05ea71da08 100644
--- a/ydb/core/formats/arrow/program.cpp
+++ b/ydb/core/formats/arrow/program.cpp
@@ -139,18 +139,20 @@ public:
TKernelFunction(const TFunctionPtr kernelsFunction, arrow::compute::ExecContext* ctx)
: TBase(ctx)
, Function(kernelsFunction)
- {}
+ {
+ AFL_VERIFY(Function);
+ }
arrow::Result<arrow::Datum> Call(const TAssignObject& assign, const TDatumBatch& batch) const override {
auto arguments = TBase::BuildArgs(batch, assign.GetArguments());
if (!arguments) {
return arrow::Status::Invalid("Error parsing args.");
}
- try {
+// try {
return Function->Execute(*arguments, assign.GetOptions(), TBase::Ctx);
- } catch (const std::exception& ex) {
- return arrow::Status::ExecutionError(ex.what());
- }
+// } catch (const std::exception& ex) {
+// return arrow::Status::ExecutionError(ex.what());
+// }
}
};
diff --git a/ydb/core/tx/columnshard/blobs_action/tier/write.cpp b/ydb/core/tx/columnshard/blobs_action/tier/write.cpp
index fa54aa75fad..9620f8d37e5 100644
--- a/ydb/core/tx/columnshard/blobs_action/tier/write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/tier/write.cpp
@@ -24,7 +24,6 @@ void TWriteAction::DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& /*self*/,
for (auto&& i : GetBlobsForWrite()) {
dbBlobs.RemoveTierDraftBlobId(GetStorageId(), i.first);
dbBlobs.AddTierBlobToDelete(GetStorageId(), i.first);
- GCInfo->MutableBlobsToDelete().emplace_back(i.first);
}
}
}
@@ -41,4 +40,12 @@ NKikimr::NOlap::TUnifiedBlobId TWriteAction::AllocateNextBlobId(const TString& d
return TUnifiedBlobId(Max<ui32>(), TLogoBlobID(TabletId, now.GetValue() >> 32, now.GetValue() & Max<ui32>(), TLogoBlobID::MaxChannel, data.size(), AtomicIncrement(Counter) % TLogoBlobID::MaxCookie, 1));
}
+void TWriteAction::DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) {
+ if (!blobsWroteSuccessfully) {
+ for (auto&& i : GetBlobsForWrite()) {
+ GCInfo->MutableBlobsToDelete().emplace_back(i.first);
+ }
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/blobs_action/tier/write.h b/ydb/core/tx/columnshard/blobs_action/tier/write.h
index 9d65f971ce6..425cabc27d5 100644
--- a/ydb/core/tx/columnshard/blobs_action/tier/write.h
+++ b/ydb/core/tx/columnshard/blobs_action/tier/write.h
@@ -27,9 +27,7 @@ protected:
}
virtual void DoOnExecuteTxAfterWrite(NColumnShard::TColumnShard& self, NColumnShard::TBlobManagerDb& dbBlobs, const bool blobsWroteSuccessfully) override;
- virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool /*blobsWroteSuccessfully*/) override {
-
- }
+ virtual void DoOnCompleteTxAfterWrite(NColumnShard::TColumnShard& /*self*/, const bool blobsWroteSuccessfully) override;
public:
virtual bool NeedDraftTransaction() const override {
return true;
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
index 4d080a3f930..1701f5ae6a5 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
@@ -111,7 +111,7 @@ void TTxWrite::Complete(const TActorContext& ctx) {
for (ui32 i = 0; i < buffer.GetAggregations().size(); ++i) {
const auto& writeMeta = buffer.GetAggregations()[i]->GetWriteData()->GetWriteMeta();
ctx.Send(writeMeta.GetSource(), Results[i].release());
- Self->CSCounters.OnWriteTxComplete((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
+ Self->CSCounters.OnWriteTxComplete(now - writeMeta.GetWriteStartInstant());
Self->CSCounters.OnSuccessWriteResponse();
}
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
index cd48f921478..429696ac995 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.cpp
@@ -85,4 +85,11 @@ TTxWriteIndex::TTxWriteIndex(TColumnShard* self, TEvPrivate::TEvWriteIndex::TPtr
Y_ABORT_UNLESS(Ev && Ev->Get()->IndexChanges);
}
+void TTxWriteIndex::Describe(IOutputStream& out) const noexcept {
+ out << TypeName(*this);
+ if (Ev->Get()->IndexChanges) {
+ out << ": " << Ev->Get()->IndexChanges->DebugString();
+ }
+}
+
}
diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h
index 09922121d15..5c20461c402 100644
--- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h
+++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write_index.h
@@ -16,6 +16,7 @@ public:
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
void Complete(const TActorContext& ctx) override;
TTxType GetTxType() const override { return TXTYPE_WRITE_INDEX; }
+ virtual void Describe(IOutputStream& out) const noexcept override;
private:
diff --git a/ydb/core/tx/columnshard/blobs_reader/actor.cpp b/ydb/core/tx/columnshard/blobs_reader/actor.cpp
index 6112a82e263..0ba5d774d2a 100644
--- a/ydb/core/tx/columnshard/blobs_reader/actor.cpp
+++ b/ydb/core/tx/columnshard/blobs_reader/actor.cpp
@@ -15,7 +15,7 @@ void TActor::Handle(NBlobCache::TEvBlobCache::TEvReadBlobRangeResult::TPtr& ev)
bool aborted = false;
if (event.Status != NKikimrProto::EReplyStatus::OK) {
WaitingBlobsCount.Sub(Task->GetWaitingCount());
- if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob"))) {
+ if (!Task->AddError(event.BlobRange, IBlobsReadingAction::TErrorStatus::Fail(event.Status, "cannot get blob: " + event.Data.substr(1024)))) {
aborted = true;
}
} else {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index a842ac0cdf0..577d6112acf 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -1,6 +1,7 @@
#include "columnshard_impl.h"
#include "blobs_action/transaction/tx_write.h"
#include "blobs_action/transaction/tx_draft.h"
+#include "counters/columnshard.h"
#include "operations/slice_builder.h"
#include "operations/write_data.h"
@@ -21,9 +22,17 @@ void TColumnShard::OverloadWriteFail(const EOverloadStatus overloadReason, const
IncCounter(COUNTER_WRITE_OVERLOAD);
CSCounters.OnOverloadInsertTable(writeData.GetSize());
break;
- case EOverloadStatus::Shard:
+ case EOverloadStatus::ShardTxInFly:
IncCounter(COUNTER_WRITE_OVERLOAD);
- CSCounters.OnOverloadShard(writeData.GetSize());
+ CSCounters.OnOverloadShardTx(writeData.GetSize());
+ break;
+ case EOverloadStatus::ShardWritesInFly:
+ IncCounter(COUNTER_WRITE_OVERLOAD);
+ CSCounters.OnOverloadShardWrites(writeData.GetSize());
+ break;
+ case EOverloadStatus::ShardWritesSizeInFly:
+ IncCounter(COUNTER_WRITE_OVERLOAD);
+ CSCounters.OnOverloadShardWritesSize(writeData.GetSize());
break;
case EOverloadStatus::None:
Y_ABORT("invalid function usage");
@@ -45,8 +54,20 @@ TColumnShard::EOverloadStatus TColumnShard::CheckOverloaded(const ui64 tableId)
return EOverloadStatus::InsertTable;
}
- if (WritesMonitor.ShardOverloaded()) {
- return EOverloadStatus::Shard;
+ ui64 txLimit = Settings.OverloadTxInFlight;
+ ui64 writesLimit = Settings.OverloadWritesInFlight;
+ ui64 writesSizeLimit = Settings.OverloadWritesSizeInFlight;
+ if (txLimit && Executor()->GetStats().TxInFly > txLimit) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "tx_in_fly")("sum", Executor()->GetStats().TxInFly)("limit", txLimit);
+ return EOverloadStatus::ShardTxInFly;
+ }
+ if (writesLimit && WritesMonitor.GetWritesInFlight() > writesLimit) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_in_fly")("sum", WritesMonitor.GetWritesInFlight())("limit", writesLimit);
+ return EOverloadStatus::ShardWritesInFly;
+ }
+ if (writesSizeLimit && WritesMonitor.GetWritesSizeInFlight() > writesSizeLimit) {
+ AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "shard_overload")("reason", "writes_size_in_fly")("sum", WritesMonitor.GetWritesSizeInFlight())("limit", writesSizeLimit);
+ return EOverloadStatus::ShardWritesSizeInFly;
}
return EOverloadStatus::None;
}
@@ -57,7 +78,8 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto& putResult = ev->Get()->GetPutResult();
OnYellowChannels(putResult);
NOlap::TWritingBuffer& wBuffer = ev->Get()->MutableWritesBuffer();
- auto& baseAggregations = wBuffer.GetAggregations();
+ auto baseAggregations = wBuffer.GetAggregations();
+ wBuffer.InitReplyReceived(TMonotonic::Now());
auto wg = WritesMonitor.FinishWrite(wBuffer.GetSumSize(), wBuffer.GetAggregations().size());
@@ -70,13 +92,13 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR);
ctx.Send(writeMeta.GetSource(), result.release());
- CSCounters.OnFailedWriteResponse();
+ CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable);
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
continue;
}
if (putResult.GetPutStatus() != NKikimrProto::OK) {
- CSCounters.OnWritePutBlobsFail((TMonotonic::Now() - writeMeta.GetWriteStartInstant()).MilliSeconds());
+ CSCounters.OnWritePutBlobsFail(TMonotonic::Now() - writeMeta.GetWriteStartInstant());
IncCounter(COUNTER_WRITE_FAIL);
auto errCode = NKikimrTxColumnShard::EResultStatus::STORAGE_ERROR;
@@ -97,16 +119,17 @@ void TColumnShard::Handle(TEvPrivate::TEvWriteBlobsResult::TPtr& ev, const TActo
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), operation->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, "put data fails");
ctx.Send(writeMeta.GetSource(), result.release());
}
- CSCounters.OnFailedWriteResponse();
+ CSCounters.OnFailedWriteResponse(EWriteFailReason::PutBlob);
wBuffer.RemoveData(aggr, StoragesManager->GetInsertOperator());
} else {
const TMonotonic now = TMonotonic::Now();
- CSCounters.OnWritePutBlobsSuccess((now - writeMeta.GetWriteStartInstant()).MilliSeconds());
- CSCounters.OnWriteMiddle1PutBlobsSuccess((now - writeMeta.GetWriteMiddle1StartInstant()).MilliSeconds());
- CSCounters.OnWriteMiddle2PutBlobsSuccess((now - writeMeta.GetWriteMiddle2StartInstant()).MilliSeconds());
- CSCounters.OnWriteMiddle3PutBlobsSuccess((now - writeMeta.GetWriteMiddle3StartInstant()).MilliSeconds());
- CSCounters.OnWriteMiddle4PutBlobsSuccess((now - writeMeta.GetWriteMiddle4StartInstant()).MilliSeconds());
- CSCounters.OnWriteMiddle5PutBlobsSuccess((now - writeMeta.GetWriteMiddle5StartInstant()).MilliSeconds());
+ CSCounters.OnWritePutBlobsSuccess(now - writeMeta.GetWriteStartInstant());
+ CSCounters.OnWriteMiddle1PutBlobsSuccess(now - writeMeta.GetWriteMiddle1StartInstant());
+ CSCounters.OnWriteMiddle2PutBlobsSuccess(now - writeMeta.GetWriteMiddle2StartInstant());
+ CSCounters.OnWriteMiddle3PutBlobsSuccess(now - writeMeta.GetWriteMiddle3StartInstant());
+ CSCounters.OnWriteMiddle4PutBlobsSuccess(now - writeMeta.GetWriteMiddle4StartInstant());
+ CSCounters.OnWriteMiddle5PutBlobsSuccess(now - writeMeta.GetWriteMiddle5StartInstant());
+ CSCounters.OnWriteMiddle6PutBlobsSuccess(now - writeMeta.GetWriteMiddle6StartInstant());
LOG_S_DEBUG("Write (record) into pathId " << writeMeta.GetTableId()
<< (writeMeta.GetWriteId() ? (" writeId " + ToString(writeMeta.GetWriteId())).c_str() : "") << " at tablet " << TabletID());
@@ -139,18 +162,20 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
IncCounter(signalIndex);
ctx.Send(source, std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeMeta, NKikimrTxColumnShard::EResultStatus::ERROR));
- CSCounters.OnFailedWriteResponse();
return;
};
if (!AppDataVerified().ColumnShardConfig.GetWritingEnabled()) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_writing")("reason", "disabled");
+ CSCounters.OnFailedWriteResponse(EWriteFailReason::Disabled);
return returnFail(COUNTER_WRITE_FAIL);
}
if (!TablesManager.IsReadyForWrite(tableId)) {
LOG_S_NOTICE("Write (fail) into pathId:" << writeMeta.GetTableId() << (TablesManager.HasPrimaryIndex()? "": " no index")
<< " at tablet " << TabletID());
+
+ CSCounters.OnFailedWriteResponse(EWriteFailReason::NoTable);
return returnFail(COUNTER_WRITE_FAIL);
}
@@ -159,6 +184,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (!arrowData->ParseFromProto(record)) {
LOG_S_ERROR("Write (fail) " << record.GetData().size() << " bytes into pathId " << writeMeta.GetTableId()
<< " at tablet " << TabletID());
+ CSCounters.OnFailedWriteResponse(EWriteFailReason::IncorrectSchema);
return returnFail(COUNTER_WRITE_FAIL);
}
@@ -167,7 +193,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
if (overloadStatus != EOverloadStatus::None) {
std::unique_ptr<NActors::IEventBase> result = std::make_unique<TEvColumnShard::TEvWriteResult>(TabletID(), writeData.GetWriteMeta(), NKikimrTxColumnShard::EResultStatus::OVERLOADED);
OverloadWriteFail(overloadStatus, writeData, std::move(result), ctx);
- CSCounters.OnFailedWriteResponse();
+ CSCounters.OnFailedWriteResponse(EWriteFailReason::Overload);
} else {
if (ui64 writeId = (ui64)HasLongTxWrite(writeMeta.GetLongTxIdUnsafe(), writeMeta.GetWritePartId())) {
LOG_S_DEBUG("Write (duplicate) into pathId " << writeMeta.GetTableId()
@@ -179,7 +205,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
auto result = std::make_unique<TEvColumnShard::TEvWriteResult>(
TabletID(), writeMeta, writeId, NKikimrTxColumnShard::EResultStatus::SUCCESS);
ctx.Send(writeMeta.GetSource(), result.release());
- CSCounters.OnFailedWriteResponse();
+ CSCounters.OnFailedWriteResponse(EWriteFailReason::LongTxDuplication);
return;
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp
index 8ec5ef62099..86ba233e04f 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.cpp
+++ b/ydb/core/tx/columnshard/columnshard_impl.cpp
@@ -297,6 +297,9 @@ void TColumnShard::TryAbortWrites(NIceDb::TNiceDb& db, NOlap::TDbWrapper& dbTabl
failedAborts.push_back(writeId);
}
}
+ if (failedAborts.size()) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "failed_aborts")("count", failedAborts.size())("writes_count", writesToAbort.size());
+ }
for (auto& writeId : failedAborts) {
writesToAbort.erase(writeId);
}
@@ -813,6 +816,7 @@ void TColumnShard::SetupCleanupInsertTable() {
if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) {
return;
}
+ AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size());
Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext());
}
diff --git a/ydb/core/tx/columnshard/columnshard_impl.h b/ydb/core/tx/columnshard/columnshard_impl.h
index 6a94e4b5167..7382ea645cb 100644
--- a/ydb/core/tx/columnshard/columnshard_impl.h
+++ b/ydb/core/tx/columnshard/columnshard_impl.h
@@ -211,7 +211,9 @@ class TColumnShard
void OnTieringModified();
public:
enum class EOverloadStatus {
- Shard /* "shard" */,
+ ShardTxInFly /* "shard_tx" */,
+ ShardWritesInFly /* "shard_writes" */,
+ ShardWritesSizeInFly /* "shard_writes_size" */,
InsertTable /* "insert_table" */,
Disk /* "disk" */,
None /* "none" */
@@ -326,8 +328,8 @@ private:
class TWritesMonitor {
private:
TColumnShard& Owner;
- ui64 WritesInFlight = 0;
- ui64 WritesSizeInFlight = 0;
+ YDB_READONLY(ui64, WritesInFlight, 0);
+ YDB_READONLY(ui64, WritesSizeInFlight, 0);
public:
class TGuard: public TNonCopyable {
@@ -363,17 +365,8 @@ private:
return TGuard(*this);
}
- bool ShardOverloaded() const {
- ui64 txLimit = Owner.Settings.OverloadTxInFlight;
- ui64 writesLimit = Owner.Settings.OverloadWritesInFlight;
- ui64 writesSizeLimit = Owner.Settings.OverloadWritesSizeInFlight;
- return (txLimit && Owner.Executor()->GetStats().TxInFly > txLimit) ||
- (writesLimit && WritesInFlight > writesLimit) ||
- (writesSizeLimit && WritesSizeInFlight > writesSizeLimit);
- }
-
TString DebugString() const {
- return TStringBuilder() << "TWritesMonitor: inflight " << WritesInFlight << " (" << WritesSizeInFlight << " bytes)";
+ return TStringBuilder() << "{object=write_monitor;count=" << WritesInFlight << ";size=" << WritesSizeInFlight << "}";
}
private:
diff --git a/ydb/core/tx/columnshard/common/limits.cpp b/ydb/core/tx/columnshard/common/limits.cpp
new file mode 100644
index 00000000000..7bd45fa70d3
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/limits.cpp
@@ -0,0 +1,4 @@
+#include "limits.h"
+
+namespace NKikimr::NColumnShard {
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/common/limits.h b/ydb/core/tx/columnshard/common/limits.h
new file mode 100644
index 00000000000..04a5cb55128
--- /dev/null
+++ b/ydb/core/tx/columnshard/common/limits.h
@@ -0,0 +1,9 @@
+#pragma once
+#include <util/system/types.h>
+
+namespace NKikimr::NOlap {
+class TGlobalLimits {
+public:
+ static const inline ui64 TxWriteLimitBytes = 256 * 1024 * 1024;
+};
+} \ No newline at end of file
diff --git a/ydb/core/tx/columnshard/common/snapshot.h b/ydb/core/tx/columnshard/common/snapshot.h
index 942daa24e70..389310cd528 100644
--- a/ydb/core/tx/columnshard/common/snapshot.h
+++ b/ydb/core/tx/columnshard/common/snapshot.h
@@ -1,7 +1,9 @@
#pragma once
+#include <ydb/library/conclusion/status.h>
+
#include <util/stream/output.h>
#include <util/string/cast.h>
-#include <ydb/library/conclusion/status.h>
+#include <util/datetime/base.h>
namespace NKikimrColumnShardProto {
class TSnapshot;
@@ -20,6 +22,10 @@ public:
, TxId(txId) {
}
+ constexpr TInstant GetPlanInstant() const noexcept {
+ return TInstant::MilliSeconds(PlanStep);
+ }
+
constexpr ui64 GetPlanStep() const noexcept {
return PlanStep;
}
diff --git a/ydb/core/tx/columnshard/common/ya.make b/ydb/core/tx/columnshard/common/ya.make
index a1006f15143..a73340ea968 100644
--- a/ydb/core/tx/columnshard/common/ya.make
+++ b/ydb/core/tx/columnshard/common/ya.make
@@ -1,6 +1,7 @@
LIBRARY()
SRCS(
+ limits.h
reverse_accessor.cpp
scalars.cpp
snapshot.cpp
diff --git a/ydb/core/tx/columnshard/counters/columnshard.cpp b/ydb/core/tx/columnshard/counters/columnshard.cpp
index 989deae6b9b..e094fbbcd5a 100644
--- a/ydb/core/tx/columnshard/counters/columnshard.cpp
+++ b/ydb/core/tx/columnshard/counters/columnshard.cpp
@@ -2,6 +2,8 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
+#include <ydb/library/actors/core/log.h>
+
namespace NKikimr::NColumnShard {
TCSCounters::TCSCounters()
@@ -24,8 +26,12 @@ TCSCounters::TCSCounters()
OverloadInsertTableBytes = TBase::GetDeriviative("OverloadInsertTable/Bytes");
OverloadInsertTableCount = TBase::GetDeriviative("OverloadInsertTable/Count");
- OverloadShardBytes = TBase::GetDeriviative("OverloadShard/Bytes");
- OverloadShardCount = TBase::GetDeriviative("OverloadShard/Count");
+ OverloadShardTxBytes = TBase::GetDeriviative("OverloadShard/Tx/Bytes");
+ OverloadShardTxCount = TBase::GetDeriviative("OverloadShard/Tx/Count");
+ OverloadShardWritesBytes = TBase::GetDeriviative("OverloadShard/Writes/Bytes");
+ OverloadShardWritesCount = TBase::GetDeriviative("OverloadShard/Writes/Count");
+ OverloadShardWritesSizeBytes = TBase::GetDeriviative("OverloadShard/WritesSize/Bytes");
+ OverloadShardWritesSizeCount = TBase::GetDeriviative("OverloadShard/WritesSize/Count");
InternalCompactionGranuleBytes = TBase::GetValueAutoAggregationsClient("InternalCompaction/Bytes");
InternalCompactionGranulePortionsCount = TBase::GetValueAutoAggregationsClient("InternalCompaction/PortionsCount");
@@ -39,12 +45,25 @@ TCSCounters::TCSCounters()
HistogramSuccessWriteMiddle3PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle3PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramSuccessWriteMiddle4PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle4PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramSuccessWriteMiddle5PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle5PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
+ HistogramSuccessWriteMiddle6PutBlobsDurationMs = TBase::GetHistogram("SuccessWriteMiddle6PutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramFailedWritePutBlobsDurationMs = TBase::GetHistogram("FailedWritePutBlobsDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
HistogramWriteTxCompleteDurationMs = TBase::GetHistogram("WriteTxCompleteDurationMs", NMonitoring::ExponentialHistogram(18, 2, 5));
WritePutBlobsCount = TBase::GetValue("WritePutBlobs");
WriteRequests = TBase::GetValue("WriteRequests");
- FailedWriteRequests = TBase::GetDeriviative("FailedWriteRequests");
+
+ for (auto&& i : GetEnumAllValues<EWriteFailReason>()) {
+ auto sub = CreateSubGroup("reason", ::ToString(i));
+ FailedWriteRequests.emplace(i, sub.GetDeriviative("FailedWriteRequests"));
+ }
+
SuccessWriteRequests = TBase::GetDeriviative("SuccessWriteRequests");
}
+void TCSCounters::OnFailedWriteResponse(const EWriteFailReason reason) const {
+ WriteRequests->Sub(1);
+ auto it = FailedWriteRequests.find(reason);
+ AFL_VERIFY(it != FailedWriteRequests.end());
+ it->second->Add(1);
+}
+
}
diff --git a/ydb/core/tx/columnshard/counters/columnshard.h b/ydb/core/tx/columnshard/counters/columnshard.h
index ffe935c995c..2441ca6fef3 100644
--- a/ydb/core/tx/columnshard/counters/columnshard.h
+++ b/ydb/core/tx/columnshard/counters/columnshard.h
@@ -1,9 +1,21 @@
#pragma once
-#include <library/cpp/monlib/dynamic_counters/counters.h>
#include "common/owner.h"
+#include <library/cpp/monlib/dynamic_counters/counters.h>
+
+#include <util/generic/hash_set.h>
+
namespace NKikimr::NColumnShard {
+enum class EWriteFailReason {
+ Disabled /* "disabled" */,
+ PutBlob /* "put_blob" */,
+ LongTxDuplication /* "long_tx_duplication" */,
+ NoTable /* "no_table" */,
+ IncorrectSchema /* "incorrect_schema" */,
+ Overload /* "overload" */
+};
+
class TCSCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
@@ -24,8 +36,12 @@ private:
NMonitoring::TDynamicCounters::TCounterPtr OverloadInsertTableBytes;
NMonitoring::TDynamicCounters::TCounterPtr OverloadInsertTableCount;
- NMonitoring::TDynamicCounters::TCounterPtr OverloadShardBytes;
- NMonitoring::TDynamicCounters::TCounterPtr OverloadShardCount;
+ NMonitoring::TDynamicCounters::TCounterPtr OverloadShardTxBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr OverloadShardTxCount;
+ NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesCount;
+ NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeBytes;
+ NMonitoring::TDynamicCounters::TCounterPtr OverloadShardWritesSizeCount;
std::shared_ptr<TValueAggregationClient> InternalCompactionGranuleBytes;
std::shared_ptr<TValueAggregationClient> InternalCompactionGranulePortionsCount;
@@ -39,54 +55,56 @@ private:
NMonitoring::THistogramPtr HistogramSuccessWriteMiddle3PutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramSuccessWriteMiddle4PutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramSuccessWriteMiddle5PutBlobsDurationMs;
+ NMonitoring::THistogramPtr HistogramSuccessWriteMiddle6PutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramFailedWritePutBlobsDurationMs;
NMonitoring::THistogramPtr HistogramWriteTxCompleteDurationMs;
NMonitoring::TDynamicCounters::TCounterPtr WritePutBlobsCount;
NMonitoring::TDynamicCounters::TCounterPtr WriteRequests;
- NMonitoring::TDynamicCounters::TCounterPtr FailedWriteRequests;
+ THashMap<EWriteFailReason, NMonitoring::TDynamicCounters::TCounterPtr> FailedWriteRequests;
NMonitoring::TDynamicCounters::TCounterPtr SuccessWriteRequests;
public:
void OnStartWriteRequest() const {
WriteRequests->Add(1);
}
- void OnFailedWriteResponse() const {
- WriteRequests->Sub(1);
- FailedWriteRequests->Add(1);
- }
+ void OnFailedWriteResponse(const EWriteFailReason reason) const;
void OnSuccessWriteResponse() const {
WriteRequests->Sub(1);
SuccessWriteRequests->Add(1);
}
- void OnWritePutBlobsSuccess(const ui32 milliseconds) const {
- HistogramSuccessWritePutBlobsDurationMs->Collect(milliseconds);
+ void OnWritePutBlobsSuccess(const TDuration d) const {
+ HistogramSuccessWritePutBlobsDurationMs->Collect(d.MilliSeconds());
WritePutBlobsCount->Sub(1);
}
- void OnWriteMiddle1PutBlobsSuccess(const ui32 milliseconds) const {
- HistogramSuccessWriteMiddle1PutBlobsDurationMs->Collect(milliseconds);
+ void OnWriteMiddle1PutBlobsSuccess(const TDuration d) const {
+ HistogramSuccessWriteMiddle1PutBlobsDurationMs->Collect(d.MilliSeconds());
+ }
+
+ void OnWriteMiddle2PutBlobsSuccess(const TDuration d) const {
+ HistogramSuccessWriteMiddle2PutBlobsDurationMs->Collect(d.MilliSeconds());
}
- void OnWriteMiddle2PutBlobsSuccess(const ui32 milliseconds) const {
- HistogramSuccessWriteMiddle2PutBlobsDurationMs->Collect(milliseconds);
+ void OnWriteMiddle3PutBlobsSuccess(const TDuration d) const {
+ HistogramSuccessWriteMiddle3PutBlobsDurationMs->Collect(d.MilliSeconds());
}
- void OnWriteMiddle3PutBlobsSuccess(const ui32 milliseconds) const {
- HistogramSuccessWriteMiddle3PutBlobsDurationMs->Collect(milliseconds);
+ void OnWriteMiddle4PutBlobsSuccess(const TDuration d) const {
+ HistogramSuccessWriteMiddle4PutBlobsDurationMs->Collect(d.MilliSeconds());
}
- void OnWriteMiddle4PutBlobsSuccess(const ui32 milliseconds) const {
- HistogramSuccessWriteMiddle4PutBlobsDurationMs->Collect(milliseconds);
+ void OnWriteMiddle5PutBlobsSuccess(const TDuration d) const {
+ HistogramSuccessWriteMiddle5PutBlobsDurationMs->Collect(d.MilliSeconds());
}
- void OnWriteMiddle5PutBlobsSuccess(const ui32 milliseconds) const {
- HistogramSuccessWriteMiddle5PutBlobsDurationMs->Collect(milliseconds);
+ void OnWriteMiddle6PutBlobsSuccess(const TDuration d) const {
+ HistogramSuccessWriteMiddle6PutBlobsDurationMs->Collect(d.MilliSeconds());
}
- void OnWritePutBlobsFail(const ui32 milliseconds) const {
- HistogramFailedWritePutBlobsDurationMs->Collect(milliseconds);
+ void OnWritePutBlobsFail(const TDuration d) const {
+ HistogramFailedWritePutBlobsDurationMs->Collect(d.MilliSeconds());
WritePutBlobsCount->Sub(1);
}
@@ -94,8 +112,8 @@ public:
WritePutBlobsCount->Add(1);
}
- void OnWriteTxComplete(const ui32 milliseconds) const {
- HistogramWriteTxCompleteDurationMs->Collect(milliseconds);
+ void OnWriteTxComplete(const TDuration d) const {
+ HistogramWriteTxCompleteDurationMs->Collect(d.MilliSeconds());
}
void OnInternalCompactionInfo(const ui64 bytes, const ui32 portionsCount) const {
@@ -113,9 +131,19 @@ public:
OverloadInsertTableCount->Add(1);
}
- void OnOverloadShard(const ui64 size) const {
- OverloadShardBytes->Add(size);
- OverloadShardCount->Add(1);
+ void OnOverloadShardTx(const ui64 size) const {
+ OverloadShardTxBytes->Add(size);
+ OverloadShardTxCount->Add(1);
+ }
+
+ void OnOverloadShardWrites(const ui64 size) const {
+ OverloadShardWritesBytes->Add(size);
+ OverloadShardWritesCount->Add(1);
+ }
+
+ void OnOverloadShardWritesSize(const ui64 size) const {
+ OverloadShardWritesSizeBytes->Add(size);
+ OverloadShardWritesSizeCount->Add(1);
}
void SkipIndexationInputDueToSplitCompaction(const ui64 size) const {
diff --git a/ydb/core/tx/columnshard/counters/common/owner.h b/ydb/core/tx/columnshard/counters/common/owner.h
index 1ea4bd513b7..456699c80dc 100644
--- a/ydb/core/tx/columnshard/counters/common/owner.h
+++ b/ydb/core/tx/columnshard/counters/common/owner.h
@@ -50,6 +50,9 @@ public:
NMonitoring::TDynamicCounters::TCounterPtr GetDeriviative(const TString& name) const;
void DeepSubGroup(const TString& id, const TString& value);
void DeepSubGroup(const TString& componentName);
+ TCommonCountersOwner CreateSubGroup(const TString& param, const TString& value) const {
+ return TCommonCountersOwner(*this, param, value);
+ }
NMonitoring::THistogramPtr GetHistogram(const TString& name, NMonitoring::IHistogramCollectorPtr&& hCollector) const;
TCommonCountersOwner(const TString& module, TIntrusivePtr<::NMonitoring::TDynamicCounters> baseSignals = nullptr);
diff --git a/ydb/core/tx/columnshard/counters/ya.make b/ydb/core/tx/columnshard/counters/ya.make
index aed1f68de28..5a0dbc7c325 100644
--- a/ydb/core/tx/columnshard/counters/ya.make
+++ b/ydb/core/tx/columnshard/counters/ya.make
@@ -17,4 +17,6 @@ PEERDIR(
ydb/core/base
)
+GENERATE_ENUM_SERIALIZATION(columnshard.h)
+
END()
diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
index cfba0a99406..9a0ad102e54 100644
--- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp
@@ -9,13 +9,7 @@ namespace NKikimr::NOlap {
void TTTLColumnEngineChanges::DoDebugString(TStringOutput& out) const {
TBase::DoDebugString(out);
- if (PortionsToEvict.size()) {
- out << "eviction=(count=" << PortionsToEvict.size() << ";portions=[";
- for (auto& info : PortionsToEvict) {
- out << info.GetPortionInfo() << ";to=" << info.GetFeatures().TargetTierName << ";";
- }
- out << "];";
- }
+ out << "eviction=" << PortionsToEvict.size() << ";";
}
void TTTLColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
index 2ff9533af29..8b47d79ffb8 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp
@@ -6,16 +6,6 @@
namespace NKikimr::NOlap {
-void TChangesWithAppend::DoDebugString(TStringOutput& out) const {
- if (ui32 added = AppendedPortions.size()) {
- out << "portions_count:" << added << ";portions=(";
- for (auto& portionInfo : AppendedPortions) {
- out << portionInfo;
- }
- out << "); ";
- }
-}
-
void TChangesWithAppend::DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& /*context*/) {
for (auto& portionInfo : AppendedPortions) {
switch (portionInfo.GetPortionInfo().GetMeta().Produced) {
diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.h b/ydb/core/tx/columnshard/engines/changes/with_appended.h
index a398c61b005..779a3ab8a14 100644
--- a/ydb/core/tx/columnshard/engines/changes/with_appended.h
+++ b/ydb/core/tx/columnshard/engines/changes/with_appended.h
@@ -13,7 +13,6 @@ private:
protected:
TSplitSettings SplitSettings;
TSaverContext SaverContext;
- virtual void DoDebugString(TStringOutput& out) const override;
virtual void DoCompile(TFinalizationContext& context) override;
virtual bool DoApplyChanges(TColumnEngineForLogs& self, TApplyChangesContext& context) override;
virtual void DoWriteIndex(NColumnShard::TColumnShard& self, TWriteIndexContext& context) override;
@@ -24,6 +23,10 @@ protected:
std::vector<TPortionInfoWithBlobs> MakeAppendedPortions(const std::shared_ptr<arrow::RecordBatch> batch, const ui64 granule,
const TSnapshot& snapshot, const TGranuleMeta* granuleMeta, TConstructionContext& context) const;
+ virtual void DoDebugString(TStringOutput& out) const override {
+ out << "remove=" << PortionsToRemove.size() << ";append=" << AppendedPortions.size() << ";";
+ }
+
public:
const TSplitSettings& GetSplitSettings() const {
return SplitSettings;
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
index a0bc0dbeffa..17833bc5a6e 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp
@@ -1,6 +1,7 @@
#include "column_engine_logs.h"
#include "filter.h"
+#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/formats/arrow/one_batch_input_stream.h>
#include <ydb/core/formats/arrow/merging_sorted_input_stream.h>
@@ -238,7 +239,11 @@ std::shared_ptr<TInsertColumnEngineChanges> TColumnEngineForLogs::StartInsert(st
}
std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(const TCompactionLimits& limits, const THashSet<TPortionAddress>& busyPortions) noexcept {
- auto granule = GranulesStorage->GetGranuleForCompaction(Tables);
+ THashSet<ui64> busyGranuleIds;
+ for (auto&& i : busyPortions) {
+ busyGranuleIds.emplace(i.GetPathId());
+ }
+ auto granule = GranulesStorage->GetGranuleForCompaction(Tables, busyGranuleIds);
if (!granule) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "no granules for start compaction");
return nullptr;
@@ -253,14 +258,16 @@ std::shared_ptr<TColumnEngineChanges> TColumnEngineForLogs::StartCompaction(cons
}
std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(const TSnapshot& snapshot,
- THashSet<ui64>& pathsToDrop, ui32 maxRecords) noexcept {
+ THashSet<ui64>& pathsToDrop, ui32 /*maxRecords*/) noexcept {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size());
auto changes = std::make_shared<TCleanupColumnEngineChanges>(StoragesManager);
- ui32 affectedRecords = 0;
// Add all portions from dropped paths
THashSet<ui64> dropPortions;
THashSet<ui64> emptyPaths;
+ ui64 txSize = 0;
+ const ui64 txSizeLimit = TGlobalLimits::TxWriteLimitBytes / 4;
+ changes->NeedRepeat = false;
for (ui64 pathId : pathsToDrop) {
auto itTable = Tables.find(pathId);
if (itTable == Tables.end()) {
@@ -269,25 +276,21 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
}
for (auto& [portion, info] : itTable->second->GetPortions()) {
- affectedRecords += info->NumChunks();
- changes->PortionsToDrop.push_back(*info);
- dropPortions.insert(portion);
-
- if (affectedRecords > maxRecords) {
+ if (txSize + info->GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
+ txSize += info->GetTxVolume();
+ } else {
+ changes->NeedRepeat = true;
break;
}
+ changes->PortionsToDrop.push_back(*info);
+ dropPortions.insert(portion);
}
}
for (ui64 pathId : emptyPaths) {
pathsToDrop.erase(pathId);
}
- if (affectedRecords > maxRecords) {
- AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size());
- return changes;
- }
-
- while (CleanupPortions.size() && affectedRecords <= maxRecords) {
+ while (CleanupPortions.size() && !changes->NeedRepeat) {
auto it = CleanupPortions.begin();
if (it->first >= snapshot) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanupStop")("snapshot", snapshot.DebugString())("current_snapshot", it->first.DebugString());
@@ -295,12 +298,16 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
}
for (auto&& i : it->second) {
Y_ABORT_UNLESS(i.CheckForCleanup(snapshot));
- affectedRecords += i.NumChunks();
+ if (txSize + i.GetTxVolume() < txSizeLimit || changes->PortionsToDrop.empty()) {
+ txSize += i.GetTxVolume();
+ } else {
+ changes->NeedRepeat = true;
+ break;
+ }
changes->PortionsToDrop.push_back(i);
}
CleanupPortions.erase(it);
}
- changes->NeedRepeat = affectedRecords > maxRecords;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartCleanup")("portions_count", CleanupPortions.size())("portions_prepared", changes->PortionsToDrop.size());
if (changes->PortionsToDrop.empty()) {
@@ -312,7 +319,6 @@ std::shared_ptr<TCleanupColumnEngineChanges> TColumnEngineForLogs::StartCleanup(
TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering& ttl, TTieringProcessContext& context) const {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "ProcessTiering")("path_id", pathId)("ttl", ttl.GetDebugString());
- ui64 dropBlobs = 0;
auto& indexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
Y_ABORT_UNLESS(context.Changes->Tiering.emplace(pathId, ttl).second);
@@ -330,6 +336,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
auto ttlColumnNames = ttl.GetTtlColumns();
Y_ABORT_UNLESS(ttlColumnNames.size() == 1); // TODO: support different ttl columns
ui32 ttlColumnId = indexInfo.GetColumnId(*ttlColumnNames.begin());
+ const TInstant now = TInstant::Now();
for (auto& [portion, info] : itTable->second->GetPortions()) {
if (info->HasRemoveSnapshot()) {
continue;
@@ -339,8 +346,7 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
continue;
}
- context.AllowDrop = (dropBlobs <= TCompactionLimits::MAX_BLOBS_TO_DELETE);
- const bool tryEvictPortion = ttl.HasTiers() && context.HasMemoryForEviction();
+ const bool tryEvictPortion = ttl.HasTiers() && context.HasLimitsForEviction();
if (auto max = info->MaxValue(ttlColumnId)) {
bool keep = !expireTimestampOpt;
@@ -356,10 +362,15 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
}
}
- AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.AllowDrop);
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_result")("keep", keep)("tryEvictPortion", tryEvictPortion)("allowDrop", context.HasLimitsForTtl());
if (keep && tryEvictPortion) {
const TString currentTierName = info->GetMeta().GetTierName() ? info->GetMeta().GetTierName() : IStoragesManager::DefaultStorageId;
TString tierName = "";
+ const TInstant maxChangePortionInstant = info->RecordSnapshotMax().GetPlanInstant();
+ if (now - maxChangePortionInstant < TDuration::Minutes(60)) {
+ AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "skip_portion_to_evict")("reason", "too_fresh")("delta", now - maxChangePortionInstant);
+ continue;
+ }
for (auto& tierRef : ttl.GetOrderedTiers()) {
auto& tierInfo = tierRef.Get();
if (!indexInfo.AllowTtlOverColumn(tierInfo.GetEvictColumnName())) {
@@ -389,22 +400,22 @@ TDuration TColumnEngineForLogs::ProcessTiering(const ui64 pathId, const TTiering
if (currentTierName != tierName) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "tiering switch detected")("from", currentTierName)("to", tierName);
context.Changes->AddPortionToEvict(*info, TPortionEvictionFeatures(tierName, pathId, StoragesManager->GetOperator(tierName)));
- context.AppPortionForCheckMemoryUsage(*info);
+ context.AppPortionForEvictionChecker(*info);
SignalCounters.OnPortionToEvict(info->BlobsBytes());
}
}
- if (!keep && context.AllowDrop) {
+ if (!keep && context.HasLimitsForTtl()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "portion_remove")("portion", info->DebugString());
- dropBlobs += info->NumBlobs();
AFL_VERIFY(context.Changes->PortionsToRemove.emplace(info->GetAddress(), *info).second);
SignalCounters.OnPortionToDrop(info->BlobsBytes());
+ context.AppPortionForTtlChecker(*info);
}
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "scalar_less_not_max");
SignalCounters.OnPortionNoBorder(info->BlobsBytes());
}
}
- if (dWaiting > TDuration::MilliSeconds(500) && (!context.HasMemoryForEviction() || !context.AllowDrop)) {
+ if (dWaiting > TDuration::MilliSeconds(500) && (!context.HasLimitsForEviction() || !context.HasLimitsForTtl())) {
dWaiting = TDuration::MilliSeconds(500);
}
Y_ABORT_UNLESS(!!dWaiting);
diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h
index 7cd28ac2be6..c281d9b653f 100644
--- a/ydb/core/tx/columnshard/engines/column_engine_logs.h
+++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h
@@ -3,6 +3,7 @@
#include "defs.h"
#include "column_engine.h"
#include <ydb/core/tx/columnshard/common/scalars.h>
+#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/counters/engine_logs.h>
#include <ydb/core/tx/columnshard/columnshard_ttl.h>
#include "scheme/tier_info.h"
@@ -87,21 +88,29 @@ private:
private:
const ui64 MemoryUsageLimit;
ui64 MemoryUsage = 0;
+ ui64 TxWriteVolume = 0;
std::shared_ptr<TColumnEngineChanges::IMemoryPredictor> MemoryPredictor;
public:
- bool AllowEviction = true;
- bool AllowDrop = true;
const TInstant Now;
std::shared_ptr<TTTLColumnEngineChanges> Changes;
std::map<ui64, TDuration> DurationsForced;
const THashSet<TPortionAddress>& BusyPortions;
- void AppPortionForCheckMemoryUsage(const TPortionInfo& info) {
+ void AppPortionForEvictionChecker(const TPortionInfo& info) {
MemoryUsage = MemoryPredictor->AddPortion(info);
+ TxWriteVolume += info.GetTxVolume();
}
- bool HasMemoryForEviction() const {
- return MemoryUsage < MemoryUsageLimit;
+ void AppPortionForTtlChecker(const TPortionInfo& info) {
+ TxWriteVolume += info.GetTxVolume();
+ }
+
+ bool HasLimitsForEviction() const {
+ return MemoryUsage < MemoryUsageLimit && TxWriteVolume < TGlobalLimits::TxWriteLimitBytes;
+ }
+
+ bool HasLimitsForTtl() const {
+ return TxWriteVolume < TGlobalLimits::TxWriteLimitBytes;
}
TTieringProcessContext(const ui64 memoryUsageLimit, std::shared_ptr<TTTLColumnEngineChanges> changes,
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.cpp b/ydb/core/tx/columnshard/engines/portions/meta.cpp
index 9f6a2c30823..fde6d7135c3 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/meta.cpp
@@ -69,7 +69,7 @@ bool TPortionMeta::DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortio
}
std::optional<NKikimrTxColumnShard::TIndexPortionMeta> TPortionMeta::SerializeToProto(const ui32 columnId, const ui32 chunk) const {
- if (columnId != FirstPkColumn || chunk != 0) {
+ if (!IsChunkWithPortionInfo(columnId, chunk)) {
return {};
}
diff --git a/ydb/core/tx/columnshard/engines/portions/meta.h b/ydb/core/tx/columnshard/engines/portions/meta.h
index 7acafc89a77..005021f7c45 100644
--- a/ydb/core/tx/columnshard/engines/portions/meta.h
+++ b/ydb/core/tx/columnshard/engines/portions/meta.h
@@ -26,6 +26,10 @@ public:
EProduced Produced{EProduced::UNSPECIFIED};
ui32 FirstPkColumn = 0;
+ bool IsChunkWithPortionInfo(const ui32 columnId, const ui32 chunkIdx) const {
+ return columnId == FirstPkColumn && chunkIdx == 0;
+ }
+
bool DeserializeFromProto(const NKikimrTxColumnShard::TIndexPortionMeta& portionMeta, const TIndexInfo& indexInfo);
std::optional<NKikimrTxColumnShard::TIndexPortionMeta> SerializeToProto(const ui32 columnId, const ui32 chunk) const;
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
index 3deb26a0a73..23b21564ab8 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp
@@ -306,6 +306,10 @@ std::vector<NKikimr::NOlap::TPortionInfo::TPage> TPortionInfo::BuildPages() cons
return pages;
}
+ui64 TPortionInfo::GetTxVolume() const {
+ return 1024 + Records.size() * 256 + Indexes.size() * 256;
+}
+
std::shared_ptr<arrow::ChunkedArray> TPortionInfo::TPreparedColumn::Assemble() const {
Y_ABORT_UNLESS(!Blobs.empty());
diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h
index 01850d31951..d5a052ae52c 100644
--- a/ydb/core/tx/columnshard/engines/portions/portion_info.h
+++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h
@@ -30,6 +30,8 @@ private:
YDB_READONLY_DEF(std::vector<TIndexChunk>, Indexes);
public:
+ ui64 GetTxVolume() const; // fake-correct method for determ volume on rewrite this portion in transaction progress
+
class TPage {
private:
YDB_READONLY_DEF(std::vector<const TColumnRecord*>, Records);
diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
index 96923fed2a2..7f38c7dfc00 100644
--- a/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/fetching.cpp
@@ -3,6 +3,8 @@
#include <ydb/core/tx/columnshard/engines/filter.h>
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
+#include <ydb/library/yql/minikql/mkql_terminator.h>
+
namespace NKikimr::NOlap::NPlainReader {
bool TStepAction::DoApply(IDataReader& /*owner*/) const {
@@ -14,6 +16,7 @@ bool TStepAction::DoApply(IDataReader& /*owner*/) const {
}
bool TStepAction::DoExecute() {
+ NMiniKQL::TThrowingBindTerminator bind;
while (Step) {
if (Source->IsEmptyData()) {
Source->Finalize();
@@ -58,6 +61,9 @@ bool TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source
}
bool TFilterProgramStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const std::shared_ptr<IFetchingStep>& /*step*/) const {
+ AFL_VERIFY(source);
+ AFL_VERIFY(Step);
+ AFL_VERIFY(source->GetStageData().GetTable());
auto filter = Step->BuildFilter(source->GetStageData().GetTable());
source->MutableStageData().AddFilter(filter);
return true;
diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
index b941510a4c7..9ce196f5a79 100644
--- a/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
+++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lbuckets/optimizer.h
@@ -1,11 +1,12 @@
#pragma once
#include "counters.h"
-#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
-#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
-#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
+#include <ydb/core/tx/columnshard/common/limits.h>
#include <ydb/core/tx/columnshard/engines/changes/general_compaction.h>
+#include <ydb/core/tx/columnshard/engines/changes/abstract/abstract.h>
+#include <ydb/core/tx/columnshard/engines/portions/portion_info.h>
+#include <ydb/core/tx/columnshard/engines/storage/optimizer/abstract/optimizer.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/library/accessor/accessor.h>
@@ -361,8 +362,12 @@ public:
std::vector<std::shared_ptr<TPortionInfo>> result;
std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges::IMemoryPredictor> predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor();
+ ui64 txSizeLimit = 0;
for (auto&& i : sorted) {
result.emplace_back(i);
+ if (txSizeLimit + i->GetTxVolume() > TGlobalLimits::TxWriteLimitBytes / 2) {
+ break;
+ }
if (predictor->AddPortion(*i) > sizeLimit && result.size() > 1) {
break;
}
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.cpp b/ydb/core/tx/columnshard/engines/storage/storage.cpp
index 7e6f7e81b53..c51c2bd6e16 100644
--- a/ydb/core/tx/columnshard/engines/storage/storage.cpp
+++ b/ydb/core/tx/columnshard/engines/storage/storage.cpp
@@ -9,12 +9,15 @@ void TGranulesStorage::UpdateGranuleInfo(const TGranuleMeta& granule) {
}
}
-std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) const {
+std::shared_ptr<NKikimr::NOlap::TGranuleMeta> TGranulesStorage::GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const THashSet<ui64>& busyGranuleIds) const {
const TInstant now = TInstant::Now();
std::optional<NStorageOptimizer::TOptimizationPriority> priority;
std::shared_ptr<TGranuleMeta> granule;
for (auto&& i : granules) {
i.second->ActualizeOptimizer(now);
+ if (busyGranuleIds.contains(i.first)) {
+ continue;
+ }
if (!priority || *priority < i.second->GetCompactionPriority()) {
priority = i.second->GetCompactionPriority();
granule = i.second;
diff --git a/ydb/core/tx/columnshard/engines/storage/storage.h b/ydb/core/tx/columnshard/engines/storage/storage.h
index 1fa7b524c88..7ddba8009f6 100644
--- a/ydb/core/tx/columnshard/engines/storage/storage.h
+++ b/ydb/core/tx/columnshard/engines/storage/storage.h
@@ -62,7 +62,7 @@ public:
return TModificationGuard(*this);
}
- std::shared_ptr<TGranuleMeta> GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules) const;
+ std::shared_ptr<TGranuleMeta> GetGranuleForCompaction(const THashMap<ui64, std::shared_ptr<TGranuleMeta>>& granules, const THashSet<ui64>& busyGranuleIds) const;
void UpdateGranuleInfo(const TGranuleMeta& granule);
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index 318e4889fe0..7b858a3e698 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -35,14 +35,45 @@ void TWideSerializedBatch::InitBlobId(const TUnifiedBlobId& id) {
void TWritingBuffer::InitReadyInstant(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
- aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle4StartInstant(instant);
+ aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
}
}
void TWritingBuffer::InitStartSending(const TMonotonic instant) {
for (auto&& aggr : Aggregations) {
- aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle5StartInstant(instant);
+ aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle4StartInstant(instant);
+ }
+}
+
+void TWritingBuffer::InitReplyReceived(const TMonotonic instant) {
+ for (auto&& aggr : Aggregations) {
+ aggr->GetWriteData()->MutableWriteMeta().SetWriteMiddle6StartInstant(instant);
+ }
+}
+
+std::vector<NKikimr::NOlap::TWritingBlob> TWritingBuffer::GroupIntoBlobs() {
+ std::vector<TWritingBlob> result;
+ TWritingBlob currentBlob;
+ ui64 sumSize = 0;
+ for (auto&& aggr : Aggregations) {
+ for (auto&& bInfo : aggr->MutableSplittedBlobs()) {
+ if (!currentBlob.AddData(bInfo)) {
+ result.emplace_back(std::move(currentBlob));
+ currentBlob = TWritingBlob();
+ AFL_VERIFY(currentBlob.AddData(bInfo));
+ }
+ sumSize += bInfo.GetSplittedBlobs().GetSize();
+ }
+ }
+ if (currentBlob.GetSize()) {
+ result.emplace_back(std::move(currentBlob));
+ }
+ if (result.size()) {
+ if (sumSize / result.size() < 4 * 1024 * 1024 && result.size() != 1) {
+ AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "error_splitting")("size", sumSize)("count", result.size());
+ }
}
+ return result;
}
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
index 7f977a17159..392d76fd10c 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h
@@ -160,24 +160,9 @@ public:
void InitReadyInstant(const TMonotonic instant);
void InitStartSending(const TMonotonic instant);
+ void InitReplyReceived(const TMonotonic instant);
- std::vector<TWritingBlob> GroupIntoBlobs() {
- std::vector<TWritingBlob> result;
- TWritingBlob currentBlob;
- for (auto&& aggr : Aggregations) {
- for (auto&& bInfo : aggr->MutableSplittedBlobs()) {
- if (!currentBlob.AddData(bInfo)) {
- result.emplace_back(std::move(currentBlob));
- currentBlob = TWritingBlob();
- AFL_VERIFY(currentBlob.AddData(bInfo));
- }
- }
- }
- if (currentBlob.GetSize()) {
- result.emplace_back(std::move(currentBlob));
- }
- return result;
- }
+ std::vector<TWritingBlob> GroupIntoBlobs();
};
class TIndexedWriteController : public NColumnShard::IWriteController, public NColumnShard::TMonitoringObjectsCounter<TIndexedWriteController, true> {
diff --git a/ydb/core/tx/columnshard/engines/writer/write_controller.h b/ydb/core/tx/columnshard/engines/writer/write_controller.h
index 554423c8886..f6d030528b2 100644
--- a/ydb/core/tx/columnshard/engines/writer/write_controller.h
+++ b/ydb/core/tx/columnshard/engines/writer/write_controller.h
@@ -47,6 +47,19 @@ protected:
return WriteTasks.back();
}
public:
+ TString DebugString() const {
+ TStringBuilder sb;
+ for (auto&& i : WritingActions) {
+ sb << i.second->GetStorageId() << ",";
+ }
+ ui64 size = 0;
+ for (auto&& i : WriteTasks) {
+ size += i.GetBlobId().BlobSize();
+ }
+
+ return TStringBuilder() << "size=" << size << ";count=" << WriteTasks.size() << ";actions=" << sb << ";";
+ }
+
void Abort() {
for (auto&& i : WritingActions) {
i.second->Abort();
diff --git a/ydb/core/tx/data_events/write_data.h b/ydb/core/tx/data_events/write_data.h
index 739d9b3e89c..d25c65e65c1 100644
--- a/ydb/core/tx/data_events/write_data.h
+++ b/ydb/core/tx/data_events/write_data.h
@@ -5,6 +5,7 @@
#include <ydb/library/accessor/accessor.h>
#include <ydb/library/actors/core/monotonic.h>
+#include <util/generic/guid.h>
namespace NKikimr::NOlap {
class IBlobsWritingAction;
@@ -31,12 +32,14 @@ class TWriteMeta {
YDB_ACCESSOR(ui64, WritePartId, 0);
YDB_ACCESSOR_DEF(TString, DedupId);
+ YDB_READONLY(TString, Id, TGUID::CreateTimebased().AsUuidString());
YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now());
YDB_ACCESSOR(TMonotonic, WriteMiddle1StartInstant, TMonotonic::Now());
YDB_ACCESSOR(TMonotonic, WriteMiddle2StartInstant, TMonotonic::Now());
YDB_ACCESSOR(TMonotonic, WriteMiddle3StartInstant, TMonotonic::Now());
YDB_ACCESSOR(TMonotonic, WriteMiddle4StartInstant, TMonotonic::Now());
YDB_ACCESSOR(TMonotonic, WriteMiddle5StartInstant, TMonotonic::Now());
+ YDB_ACCESSOR(TMonotonic, WriteMiddle6StartInstant, TMonotonic::Now());
public:
TWriteMeta(const ui64 writeId, const ui64 tableId, const NActors::TActorId& source)
: WriteId(writeId)