about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rw-r--r-- ydb/core/formats/arrow/arrow_filter.cpp 57
-rw-r--r-- ydb/core/formats/arrow/arrow_filter.h 5
-rw-r--r-- ydb/core/formats/arrow/program/collection.h 3
-rw-r--r-- ydb/core/formats/arrow/ut/ut_column_filter.cpp 21
-rw-r--r-- ydb/core/tx/columnshard/columnshard_private_events.h 2
-rw-r--r-- ydb/core/tx/columnshard/counters/scan.h 11
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h 5
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/common/description.h 6
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp 16
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h 35
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.cpp 17
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h 33
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make 12
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp 29
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h 26
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp 34
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h 25
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h 2
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h 19
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make 1
-rw-r--r-- ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp 1
22 files changed, 342 insertions, 19 deletions
diff --git a/ydb/core/formats/arrow/arrow_filter.cpp b/ydb/core/formats/arrow/arrow_filter.cpp
index 7be4883bea8..f7544701c2c 100644
--- a/ydb/core/formats/arrow/arrow_filter.cpp
+++ b/ydb/core/formats/arrow/arrow_filter.cpp
@@ -297,6 +297,9 @@ const std::vector<bool>& TColumnFilter::BuildSimpleFilter() const {
class TMergePolicyAnd {
private:
public:
+ static bool HasValue(const bool /*a*/, const bool /*b*/) { // Tells the merger whether an overlapping interval is emitted; AND emits all of them.
+ return true;
+ }
static bool Calc(const bool a, const bool b) {
return a && b;
}
@@ -307,11 +310,17 @@ public:
return TColumnFilter::BuildDenyFilter();
}
}
+ static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) { // Overload for the case where the first merge operand is the constant one.
+ return MergeWithSimple(filter, simpleValue); // operand order does not matter for AND, so forward to the (filter, value) overload
+ }
};
class TMergePolicyOr {
private:
public:
+ static bool HasValue(const bool /*a*/, const bool /*b*/) { // Tells the merger whether an overlapping interval is emitted; OR emits all of them.
+ return true;
+ }
static bool Calc(const bool a, const bool b) {
return a || b;
}
@@ -322,6 +331,43 @@ public:
return filter;
}
}
+ static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) { // Overload for the case where the first merge operand is the constant one.
+ return MergeWithSimple(filter, simpleValue); // operand order does not matter for OR, so forward to the (filter, value) overload
+ }
+};
+
+class TMergePolicyApplyFilter { // Merge policy used by TColumnFilter::ApplyFilterFrom; unlike AND/OR the two operands are not interchangeable.
+private:
+public:
+ static bool HasValue(const bool /*a*/, const bool b) { // An overlapping interval is emitted only where the second filter is true.
+ return b;
+ }
+ static bool Calc(const bool a, const bool /*b*/) { // Emitted positions carry the value of the first filter.
+ return a;
+ }
+ static TColumnFilter MergeWithSimple(const TColumnFilter& filter, const bool simpleValue) { // Called when the second operand is a constant filter.
+ if (simpleValue) {
+ return filter; // the second operand keeps every position, so the first filter is returned unchanged
+ } else {
+ return TColumnFilter::BuildDenyFilter();
+ }
+ }
+ static TColumnFilter MergeWithSimple(const bool simpleValue, const TColumnFilter& filter) { // Called when the first operand is a constant filter.
+ const auto count = filter.GetRecordsCount(); // NOTE(review): the interval merge only emits positions where the second filter is true; confirm GetRecordsCount() is the intended result length here
+ if (simpleValue) {
+ TColumnFilter result = TColumnFilter::BuildAllowFilter();
+ if (count) { // the count is dereferenced only when it is set
+ result.Add(true, *count);
+ }
+ return result;
+ } else {
+ TColumnFilter result = TColumnFilter::BuildDenyFilter();
+ if (count) {
+ result.Add(false, *count);
+ }
+ return result;
+ }
+ }
};
class TColumnFilter::TMergerImpl {
@@ -340,7 +386,7 @@ public:
if (Filter1.empty() && Filter2.empty()) {
return TColumnFilter(TMergePolicy::Calc(Filter1.DefaultFilterValue, Filter2.DefaultFilterValue));
} else if (Filter1.empty()) {
- return TMergePolicy::MergeWithSimple(Filter2, Filter1.DefaultFilterValue);
+ return TMergePolicy::MergeWithSimple(Filter1.DefaultFilterValue, Filter2);
} else if (Filter2.empty()) {
return TMergePolicy::MergeWithSimple(Filter1, Filter2.DefaultFilterValue);
} else {
@@ -359,7 +405,7 @@ public:
while (it1 != Filter1.Filter.cend() && it2 != Filter2.Filter.cend()) {
const ui32 delta = TColumnFilter::CrossSize(pos2, pos2 + *it2, pos1, pos1 + *it1);
- if (delta) {
+ if (delta && TMergePolicy::HasValue(curValue1, curValue2)) {
if (!count || curCurrent != TMergePolicy::Calc(curValue1, curValue2)) {
resultFilter.emplace_back(delta);
curCurrent = TMergePolicy::Calc(curValue1, curValue2);
@@ -405,10 +451,15 @@ TColumnFilter TColumnFilter::Or(const TColumnFilter& extFilter) const {
return TMergerImpl(*this, extFilter).Merge<TMergePolicyOr>();
}
+TColumnFilter TColumnFilter::ApplyFilterFrom(const TColumnFilter& extFilter) const { // Merges *this (first operand) with extFilter (second operand) under TMergePolicyApplyFilter.
+ ResetCaches();
+ return TMergerImpl(*this, extFilter).Merge<TMergePolicyApplyFilter>();
+}
+
TColumnFilter TColumnFilter::CombineSequentialAnd(const TColumnFilter& extFilter) const {
ResetCaches();
if (Filter.empty()) {
- return TMergePolicyAnd::MergeWithSimple(extFilter, DefaultFilterValue);
+ return TMergePolicyAnd::MergeWithSimple(DefaultFilterValue, extFilter);
} else if (extFilter.Filter.empty()) {
return TMergePolicyAnd::MergeWithSimple(*this, extFilter.DefaultFilterValue);
} else {
diff --git a/ydb/core/formats/arrow/arrow_filter.h b/ydb/core/formats/arrow/arrow_filter.h
index 9a135f5961a..4fae0884223 100644
--- a/ydb/core/formats/arrow/arrow_filter.h
+++ b/ydb/core/formats/arrow/arrow_filter.h
@@ -263,8 +263,9 @@ public:
return TColumnFilter(false);
}
- TColumnFilter And(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
- TColumnFilter Or(const TColumnFilter& extFilter) const Y_WARN_UNUSED_RESULT;
+ [[nodiscard]] TColumnFilter And(const TColumnFilter& extFilter) const;
+ [[nodiscard]] TColumnFilter Or(const TColumnFilter& extFilter) const;
+ [[nodiscard]] TColumnFilter ApplyFilterFrom(const TColumnFilter& filter) const;
class TApplyContext {
private:
diff --git a/ydb/core/formats/arrow/program/collection.h b/ydb/core/formats/arrow/program/collection.h
index 5d1f6a5b299..ccd62cc6be6 100644
--- a/ydb/core/formats/arrow/program/collection.h
+++ b/ydb/core/formats/arrow/program/collection.h
@@ -453,6 +453,9 @@ public:
}
void AddFilter(const TColumnFilter& filter) {
+ if (filter.IsTotalAllowFilter()) {
+ return;
+ }
if (!UseFilter) {
*Filter = Filter->And(filter);
} else {
diff --git a/ydb/core/formats/arrow/ut/ut_column_filter.cpp b/ydb/core/formats/arrow/ut/ut_column_filter.cpp
index 5994e8c45ce..11acbe16261 100644
--- a/ydb/core/formats/arrow/ut/ut_column_filter.cpp
+++ b/ydb/core/formats/arrow/ut/ut_column_filter.cpp
@@ -28,6 +28,27 @@ Y_UNIT_TEST_SUITE(ColumnFilter) {
UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", resultVec), "1,1,1,0,1,0,0");
}
+ Y_UNIT_TEST(ApplyFilterToFilter) { // Exercises ApplyFilterFrom in both directions on two 7-record filters.
+ TColumnFilter filter1({ true, true, true, false, true, true, false });
+ TColumnFilter filter2({ false, true, true, true, true, false, false });
+
+ {
+ auto result = filter1.ApplyFilterFrom(filter2); // filter2 is true at positions 1..4 (0-based)
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetRecordsCountVerified(), 4); // one record per true position of filter2
+ auto resultVec = result.BuildSimpleFilter();
+ UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", resultVec), "1,1,0,1"); // filter1's values at positions 1..4
+ }
+
+ {
+ auto result = filter2.ApplyFilterFrom(filter1); // filter1 is true at positions 0, 1, 2, 4, 5
+
+ UNIT_ASSERT_VALUES_EQUAL(result.GetRecordsCountVerified(), 5); // one record per true position of filter1
+ auto resultVec = result.BuildSimpleFilter();
+ UNIT_ASSERT_VALUES_EQUAL(JoinSeq(",", resultVec), "0,1,1,1,0"); // filter2's values at those positions
+ }
+ }
+
Y_UNIT_TEST(FilterSlice) {
TColumnFilter filter({ true, true, true, false, true, true, false });
{
diff --git a/ydb/core/tx/columnshard/columnshard_private_events.h b/ydb/core/tx/columnshard/columnshard_private_events.h
index be8b01f88be..779cfe83e53 100644
--- a/ydb/core/tx/columnshard/columnshard_private_events.h
+++ b/ydb/core/tx/columnshard/columnshard_private_events.h
@@ -69,6 +69,8 @@ struct TEvPrivate {
EvRemovePortionDataAccessor,
EvMetadataAccessorsInfo,
+ EvRequestFilter,
+
EvEnd
};
diff --git a/ydb/core/tx/columnshard/counters/scan.h b/ydb/core/tx/columnshard/counters/scan.h
index 9c5313f240f..f1cc245931f 100644
--- a/ydb/core/tx/columnshard/counters/scan.h
+++ b/ydb/core/tx/columnshard/counters/scan.h
@@ -354,6 +354,7 @@ private:
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount = std::make_shared<TAtomicCounter>();
std::shared_ptr<TAtomicCounter> ResultsForSourceCount = std::make_shared<TAtomicCounter>();
std::shared_ptr<TAtomicCounter> ResultsForReplyGuard = std::make_shared<TAtomicCounter>();
+ std::shared_ptr<TAtomicCounter> FilterFetchingGuard = std::make_shared<TAtomicCounter>();
std::shared_ptr<TAtomicCounter> TotalExecutionDurationUs = std::make_shared<TAtomicCounter>();
THashMap<ui32, std::shared_ptr<TAtomicCounter>> SkipNodesCount;
THashMap<ui32, std::shared_ptr<TAtomicCounter>> ExecuteNodesCount;
@@ -393,7 +394,8 @@ public:
<< "ReadTasksCount:" << ReadTasksCount->Val() << ";"
<< "ResourcesAllocationTasksCount:" << ResourcesAllocationTasksCount->Val() << ";"
<< "ResultsForSourceCount:" << ResultsForSourceCount->Val() << ";"
- << "ResultsForReplyGuard:" << ResultsForReplyGuard->Val() << ";";
+ << "ResultsForReplyGuard:" << ResultsForReplyGuard->Val() << ";"
+ << "FilterFetchingGuard:" << FilterFetchingGuard->Val() << ";";
}
TCounterGuard GetResultsForReplyGuard() const {
@@ -428,9 +430,14 @@ public:
return TCounterGuard(AssembleTasksCount);
}
+ TCounterGuard GetFilterFetchingGuard() const { // Returns a TCounterGuard built over the FilterFetchingGuard counter.
+ return TCounterGuard(FilterFetchingGuard);
+ }
+
bool InWaiting() const {
return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
- FetchAccessorsCount->Val() || ResultsForSourceCount->Val() || FetchBlobsCount->Val() || ResultsForReplyGuard->Val();
+ FetchAccessorsCount->Val() || ResultsForSourceCount->Val() || FetchBlobsCount->Val() || ResultsForReplyGuard->Val() ||
+ FilterFetchingGuard->Val(); // a non-zero filter-fetching counter also counts as waiting
}
const THashMap<ui32, std::shared_ptr<TAtomicCounter>>& GetSkipStats() const {
diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
index aac729f0a4a..b369cec4737 100644
--- a/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
+++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h
@@ -58,6 +58,7 @@ protected:
std::shared_ptr<ISnapshotSchema> ResultIndexSchema;
ui64 TxId = 0;
std::optional<ui64> LockId;
+ EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES;
public:
using TConstPtr = std::shared_ptr<const TReadMetadataBase>;
@@ -107,6 +108,10 @@ public:
return LockId;
}
+ EDeduplicationPolicy GetDeduplicationPolicy() const { // Accessor for the DeduplicationPolicy member.
+ return DeduplicationPolicy;
+ }
+
void OnReadFinished(NColumnShard::TColumnShard& owner) const {
DoOnReadFinished(owner);
}
diff --git a/ydb/core/tx/columnshard/engines/reader/common/description.h b/ydb/core/tx/columnshard/engines/reader/common/description.h
index 0572c4e8fc4..8c1edddea79 100644
--- a/ydb/core/tx/columnshard/engines/reader/common/description.h
+++ b/ydb/core/tx/columnshard/engines/reader/common/description.h
@@ -13,6 +13,11 @@ enum class ERequestSorting {
DESC /* "descending" */,
};
+enum class EDeduplicationPolicy { // Selects whether a read request asks for duplicate filtering.
+ ALLOW_DUPLICATES = 0, // value used as the default initializer by TReadDescription and TReadMetadataBase
+ PREVENT_DUPLICATES, // checked by the simple reader to add a TDuplicateFilter step and register the duplicate manager
+};
+
// Describes read/scan request
struct TReadDescription {
private:
@@ -35,6 +40,7 @@ public:
// operations with potentially different columns. We have to remove columns to support -Inf (Null) and +Inf.
std::shared_ptr<NOlap::TPKRangesFilter> PKRangesFilter;
NYql::NDqProto::EDqStatsMode StatsMode = NYql::NDqProto::EDqStatsMode::DQ_STATS_MODE_NONE;
+ EDeduplicationPolicy DeduplicationPolicy = EDeduplicationPolicy::ALLOW_DUPLICATES;
// List of columns
std::vector<ui32> ColumnIds;
diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp
index 1e3999e2ede..934a603e4df 100644
--- a/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.cpp
@@ -41,6 +41,7 @@ TConclusionStatus TReadMetadata::Init(
}
StatsMode = readDescription.StatsMode;
+ DeduplicationPolicy = readDescription.DeduplicationPolicy;
return TConclusionStatus::Success();
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp
new file mode 100644
index 00000000000..602e4aa26bb
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.cpp
@@ -0,0 +1,16 @@
+#include "events.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+TEvRequestFilter::TEvRequestFilter(const IDataSource& source, const std::shared_ptr<IFilterSubscriber>& subscriber) // Fills every request field from the given source; the subscriber pointer is stored as passed.
+ : MinPK(source.GetMinPK())
+ , MaxPK(source.GetMaxPK())
+ , SourceId(source.GetSourceId())
+ , RecordsCount(source.GetRecordsCount())
+ , MaxVersion(source.GetContext()->GetCommonContext()->GetReadMetadata()->GetRequestSnapshot()) // taken from the request snapshot of the read metadata
+ , Subscriber(subscriber) {
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
new file mode 100644
index 00000000000..6a386fdde4f
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h
@@ -0,0 +1,35 @@
+#pragma once
+
+#include <ydb/core/formats/arrow/arrow_filter.h>
+#include <ydb/core/tx/columnshard/columnshard_private_events.h>
+#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/source.h>
+
+#include <ydb/library/actors/core/event_local.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+class IDataSource;
+}
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class IFilterSubscriber { // Callback interface that TEvRequestFilter carries to the request handler.
+public:
+ virtual void OnFilterReady(NArrow::TColumnFilter&&) = 0; // receives the filter built for the request
+ virtual void OnFailure(const TString& reason) = 0; // receives a textual reason for the failure
+ virtual ~IFilterSubscriber() = default;
+};
+
+class TEvRequestFilter: public NActors::TEventLocal<TEvRequestFilter, NColumnShard::TEvPrivate::EvRequestFilter> { // Local event that requests a filter for one data source.
+private:
+ NArrow::TSimpleRow MinPK; // no accessor is declared for this field in the class
+ NArrow::TSimpleRow MaxPK; // no accessor is declared for this field in the class
+ YDB_READONLY_DEF(ui64, SourceId);
+ YDB_READONLY_DEF(ui64, RecordsCount);
+ TSnapshot MaxVersion; // no accessor is declared for this field in the class
+ YDB_READONLY_DEF(std::shared_ptr<IFilterSubscriber>, Subscriber);
+
+public:
+ TEvRequestFilter(const IDataSource& source, const std::shared_ptr<IFilterSubscriber>& subscriber);
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.cpp
new file mode 100644
index 00000000000..6a856b5b802
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.cpp
@@ -0,0 +1,17 @@
+#include "manager.h"
+
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h>
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+TDuplicateManager::TDuplicateManager(const TSpecialReadContext& /*context*/) // The context argument is currently unused.
+ : TActor(&TDuplicateManager::StateMain) {
+}
+
+void TDuplicateManager::Handle(const TEvRequestFilter::TPtr& ev) { // Answers every request with a filter that allows all requested records.
+ NArrow::TColumnFilter filter = NArrow::TColumnFilter::BuildAllowFilter();
+ filter.Add(true, ev->Get()->GetRecordsCount()); // only the records count of the request is consulted here
+ ev->Get()->GetSubscriber()->OnFilterReady(std::move(filter)); // the subscriber is invoked directly from this handler
+}
+
+} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h
new file mode 100644
index 00000000000..9b710f35925
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#include "events.h"
+
+#include <ydb/library/actors/core/actor_bootstrapped.h>
+
+namespace NKikimr::NOlap::NReader::NSimple {
+class TSpecialReadContext;
+}
+
+namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering {
+
+class TDuplicateManager: public NActors::TActor<TDuplicateManager> { // Actor that handles TEvRequestFilter and TEvPoison.
+private:
+ STATEFN(StateMain) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvRequestFilter, Handle);
+ hFunc(NActors::TEvents::TEvPoison, Handle);
+ default:
+ AFL_VERIFY(false)("unexpected_event", ev->GetTypeName()); // any other event type fails verification
+ }
+ }
+
+ void Handle(const TEvRequestFilter::TPtr&);
+ void Handle(const NActors::TEvents::TEvPoison::TPtr&) { // poison terminates the actor
+ PassAway();
+ }
+
+public:
+ TDuplicateManager(const TSpecialReadContext& context);
+};
+
+} // namespace NKikimr::NOlap::NReader::NSimple::NDuplicateFiltering
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
new file mode 100644
index 00000000000..3e972005540
--- /dev/null
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/ya.make
@@ -0,0 +1,12 @@
+LIBRARY()
+
+SRCS(
+ manager.cpp
+ events.cpp
+)
+
+PEERDIR(
+ ydb/core/tx/columnshard/engines/reader/common_reader/iterator
+)
+
+END()
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
index 59f3601bf55..59c560c9e98 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.cpp
@@ -2,6 +2,7 @@
#include "source.h"
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetch_steps.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/manager.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
namespace NKikimr::NOlap::NReader::NSimple {
@@ -42,14 +43,15 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
needShardingFilter = true;
}
}
+ const bool preventDuplicates = GetReadMetadata()->GetDeduplicationPolicy() == EDeduplicationPolicy::PREVENT_DUPLICATES;
{
auto& result = CacheFetchingScripts[needSnapshots ? 1 : 0][partialUsageByPK ? 1 : 0][useIndexes ? 1 : 0][needShardingFilter ? 1 : 0]
- [hasDeletions ? 1 : 0];
+ [hasDeletions ? 1 : 0][preventDuplicates ? 1 : 0];
if (result.NeedInitialization()) {
TGuard<TMutex> g(Mutex);
if (auto gInit = result.StartInitialization()) {
gInit->InitializationFinished(
- BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions));
+ BuildColumnsFetchingPlan(needSnapshots, partialUsageByPK, useIndexes, needShardingFilter, hasDeletions, preventDuplicates));
}
AFL_VERIFY(!result.NeedInitialization());
}
@@ -58,7 +60,7 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::DoGetColumnsFetchingPlan(c
}
std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt,
- const bool /*useIndexes*/, const bool needFilterSharding, const bool needFilterDeletion) const {
+ const bool /*useIndexes*/, const bool needFilterSharding, const bool needFilterDeletion, const bool preventDuplicates) const {
const bool partialUsageByPredicate = partialUsageByPredicateExt && GetPredicateColumns()->GetColumnsCount();
NCommon::TFetchingScriptBuilder acc(*this);
@@ -91,6 +93,9 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
acc.AddAssembleStep(*GetSpecColumns(), "SPEC", NArrow::NSSA::IMemoryCalculationPolicy::EStage::Filter, false);
acc.AddStep(std::make_shared<TSnapshotFilter>());
}
+ if (preventDuplicates) {
+ acc.AddStep(std::make_shared<TDuplicateFilter>());
+ }
const auto& chainProgram = GetReadMetadata()->GetProgram().GetChainVerified();
acc.AddStep(std::make_shared<NCommon::TProgramStep>(chainProgram));
}
@@ -99,8 +104,18 @@ std::shared_ptr<TFetchingScript> TSpecialReadContext::BuildColumnsFetchingPlan(c
return std::move(acc).Build();
}
-TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
- : TBase(commonContext) {
+void TSpecialReadContext::RegisterActors() { // Registers the duplicate manager actor when the read metadata requests PREVENT_DUPLICATES.
+ AFL_VERIFY(!DuplicatesManager); // the manager must not be registered yet
+ if (GetReadMetadata()->GetDeduplicationPolicy() == EDeduplicationPolicy::PREVENT_DUPLICATES) {
+ DuplicatesManager = NActors::TActivationContext::Register(new NDuplicateFiltering::TDuplicateManager(*this));
+ }
+}
+
+void TSpecialReadContext::UnregisterActors() { // Does nothing when no duplicate manager is registered.
+ if (DuplicatesManager) {
+ NActors::TActivationContext::AsActorContext().Send(DuplicatesManager, new NActors::TEvents::TEvPoison); // the manager passes away on TEvPoison
+ DuplicatesManager = TActorId(); // reset to a default-constructed id
+ }
}
TString TSpecialReadContext::ProfileDebugString() const {
@@ -109,8 +124,8 @@ TString TSpecialReadContext::ProfileDebugString() const {
return (val & (1 << pos)) ? 1 : 0;
};
- for (ui32 i = 0; i < (1 << 5); ++i) {
- auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)];
+ for (ui32 i = 0; i < (1 << 6); ++i) {
+ auto& script = CacheFetchingScripts[GetBit(i, 0)][GetBit(i, 1)][GetBit(i, 2)][GetBit(i, 3)][GetBit(i, 4)][GetBit(i, 5)];
if (script.HasScript()) {
sb << script.ProfileDebugString() << ";";
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
index c1859a3062d..cb9b07b9834 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/context.h
@@ -11,18 +11,22 @@
namespace NKikimr::NOlap::NReader::NSimple {
class IDataSource;
+class TSourceConstructor;
using TColumnsSet = NCommon::TColumnsSet;
using TColumnsSetIds = NCommon::TColumnsSetIds;
using EMemType = NCommon::EMemType;
using TFetchingScript = NCommon::TFetchingScript;
-class TSpecialReadContext: public NCommon::TSpecialReadContext {
+class TSpecialReadContext: public NCommon::TSpecialReadContext, TNonCopyable {
private:
using TBase = NCommon::TSpecialReadContext;
+ TActorId DuplicatesManager = TActorId();
+
+private:
std::shared_ptr<TFetchingScript> BuildColumnsFetchingPlan(const bool needSnapshots, const bool partialUsageByPredicateExt,
- const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion) const;
+ const bool useIndexes, const bool needFilterSharding, const bool needFilterDeletion, const bool needFilterDuplicates) const;
TMutex Mutex;
- std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;
+ std::array<std::array<std::array<std::array<std::array<std::array<NCommon::TFetchingScriptOwner, 2>, 2>, 2>, 2>, 2>, 2> CacheFetchingScripts;
std::shared_ptr<TFetchingScript> AskAccumulatorsScript;
virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<NCommon::IDataSource>& source) override;
@@ -30,7 +34,21 @@ private:
public:
virtual TString ProfileDebugString() const override;
- TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext);
+ void RegisterActors();
+ void UnregisterActors();
+
+ const TActorId& GetDuplicatesManagerVerified() const { // Returns the duplicate manager actor id.
+ AFL_VERIFY(DuplicatesManager); // fails verification when the id is unset
+ return DuplicatesManager;
+ }
+
+ TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext) // Only forwards the common context to the base class; no actor is registered here.
+ : TBase(commonContext) {
+ }
+
+ ~TSpecialReadContext() {
+ AFL_VERIFY(!DuplicatesManager); // the stored actor id must already be cleared when the context is destroyed
+ }
};
} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
index 23447a08d49..3db14fb1c9a 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.cpp
@@ -3,6 +3,7 @@
#include "source.h"
#include <ydb/core/tx/columnshard/engines/filter.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
@@ -203,4 +204,37 @@ TConclusion<bool> TPrepareResultStep::DoExecuteInplace(const std::shared_ptr<IDa
}
}
+void TDuplicateFilter::TFilterSubscriber::OnFilterReady(NArrow::TColumnFilter&& filter) { // Adds the received filter to the source's stage data and schedules a TStepAction for the stored cursor.
+ if (auto source = Source.lock()) { // nothing is done when the weakly referenced source is gone
+ AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "fetch_filter")("source", source->GetSourceId())("filter", filter.DebugString());
+ if (source->GetContext()->IsAborted()) { // an aborted read neither adds the filter nor schedules a task
+ return;
+ }
+ if (const std::shared_ptr<NArrow::TColumnFilter> appliedFilter = source->GetStageData().GetAppliedFilter()) {
+ filter = filter.ApplyFilterFrom(*appliedFilter); // keeps the received filter's values only at positions where the applied filter is true
+ }
+ source->MutableStageData().AddFilter(std::move(filter));
+ Step.Next(); // presumably moves the cursor past the current step - confirm against TFetchingScriptCursor
+ auto task = std::make_shared<TStepAction>(source, std::move(Step), source->GetContext()->GetCommonContext()->GetScanActorId(), false);
+ NConveyor::TScanServiceOperator::SendTaskToExecute(task, source->GetContext()->GetCommonContext()->GetConveyorProcessId());
+ }
+}
+
+void TDuplicateFilter::TFilterSubscriber::OnFailure(const TString& reason) { // Aborts the read through the common context, passing the reason along.
+ if (auto source = Source.lock()) { // nothing is done when the weakly referenced source is gone
+ source->GetContext()->GetCommonContext()->AbortWithError("cannot build duplicate filter: " + reason);
+ }
+}
+
+TDuplicateFilter::TFilterSubscriber::TFilterSubscriber(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) // Keeps a weak reference to the source and a copy of the cursor.
+ : Source(source)
+ , Step(step)
+ , TaskGuard(source->GetContext()->GetCommonContext()->GetCounters().GetFilterFetchingGuard()) { // the guard is a member, so it lives as long as the subscriber
+}
+
+TConclusion<bool> TDuplicateFilter::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const { // Starts the duplicate-filter request for the source with a new subscriber.
+ source->StartFetchingDuplicateFilter(std::make_shared<TFilterSubscriber>(source, step));
+ return false; // NOTE(review): presumably tells the script not to continue inline, since the subscriber schedules the following step - confirm against IFetchingStep
+}
+
} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
index d1dcc822b03..3b9da2b28e0 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/fetching.h
@@ -4,6 +4,7 @@
#include <ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/fetching.h>
+#include <ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates/events.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>
@@ -212,4 +213,28 @@ public:
}
};
+class TDuplicateFilter: public IFetchingStep { // Fetching step registered under the name "DUPLICATE".
+private:
+ using TBase = IFetchingStep;
+
+ class TFilterSubscriber: public NDuplicateFiltering::IFilterSubscriber { // Subscriber handed to the duplicate-filter request of one source.
+ private:
+ std::weak_ptr<IDataSource> Source; // weak reference to the data source
+ TFetchingScriptCursor Step;
+ NColumnShard::TCounterGuard TaskGuard; // guard obtained from the scan counters' GetFilterFetchingGuard()
+
+ virtual void OnFilterReady(NArrow::TColumnFilter&& filter) override;
+ virtual void OnFailure(const TString& reason) override;
+
+ public:
+ TFilterSubscriber(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step);
+ };
+
+public:
+ virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
+ TDuplicateFilter()
+ : TBase("DUPLICATE") {
+ }
+};
+
} // namespace NKikimr::NOlap::NReader::NSimple
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
index 19c8e14aced..5a64e9250e9 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/plain_read_data.h
@@ -19,6 +19,7 @@ private:
protected:
virtual TConclusionStatus DoStart() override {
+ SpecialReadContext->RegisterActors();
return Scanner->Start();
}
@@ -69,6 +70,7 @@ public:
if (SpecialReadContext->IsActive()) {
Abort("unexpected on destructor");
}
+ SpecialReadContext->UnregisterActors();
}
};
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
index 0146875d97a..d2a0d456433 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.h
@@ -192,7 +192,7 @@ public:
const TReplaceKeyAdapter& GetStart() const {
return Start;
}
- const TReplaceKeyAdapter GetFinish() const {
+ const TReplaceKeyAdapter& GetFinish() const {
return Finish;
}
@@ -250,6 +250,12 @@ public:
return DoStartFetchingAccessor(sourcePtr, step);
}
+ void StartFetchingDuplicateFilter(std::shared_ptr<NDuplicateFiltering::IFilterSubscriber>&& subscriber) { // Sends a TEvRequestFilter describing this source to the duplicate manager actor.
+ NActors::TActivationContext::AsActorContext().Send(
+ std::static_pointer_cast<TSpecialReadContext>(GetContext())->GetDuplicatesManagerVerified(), // fails verification if no manager is registered
+ new NDuplicateFiltering::TEvRequestFilter(*this, std::move(subscriber)));
+ }
+
virtual TInternalPathId GetPathId() const = 0;
virtual bool HasIndexes(const std::set<ui32>& indexIds) const = 0;
@@ -260,6 +266,9 @@ public:
virtual ui64 GetIndexRawBytes(const std::set<ui32>& indexIds) const = 0;
+ virtual NArrow::TSimpleRow GetMinPK() const = 0; // read by the TEvRequestFilter constructor
+ virtual NArrow::TSimpleRow GetMaxPK() const = 0; // read by the TEvRequestFilter constructor
+
void Abort() {
DoAbort();
}
@@ -430,6 +439,14 @@ public:
return GetStageData().GetPortionAccessor().GetIndexRawBytes(indexIds, false);
}
+ virtual NArrow::TSimpleRow GetMinPK() const override { // Taken from the portion metadata's IndexKeyStart().
+ return Portion->GetMeta().IndexKeyStart();
+ }
+
+ virtual NArrow::TSimpleRow GetMaxPK() const override { // Taken from the portion metadata's IndexKeyEnd().
+ return Portion->GetMeta().IndexKeyEnd();
+ }
+
const TPortionInfo& GetPortionInfo() const {
return *Portion;
}
diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
index baff60941f2..37599539e47 100644
--- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
+++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/ya.make
@@ -16,6 +16,7 @@ PEERDIR(
ydb/core/tx/columnshard/engines/reader/common_reader/iterator
ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/collections
ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/sync_points
+ ydb/core/tx/columnshard/engines/reader/simple_reader/duplicates
ydb/core/tx/conveyor/usage
ydb/core/tx/limiter/grouped_memory/usage
)
diff --git a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp
index 14c0cb9ef08..863407ef326 100644
--- a/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp
+++ b/ydb/core/tx/columnshard/engines/reader/transaction/tx_internal_scan.cpp
@@ -49,6 +49,7 @@ void TTxInternalScan::Complete(const TActorContext& ctx) {
read.PathId = request.GetPathId();
read.LockId = LockId;
read.ReadNothing = !Self->TablesManager.HasTable(read.PathId);
+ read.DeduplicationPolicy = EDeduplicationPolicy::PREVENT_DUPLICATES;
std::unique_ptr<IScannerConstructor> scannerConstructor(new NPlain::TIndexScannerConstructor(context));
read.ColumnIds = request.GetColumnIds();
if (request.RangesFilter) {