aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorivanmorozov333 <ivanmorozov@ydb.tech>2024-09-29 07:11:51 +0300
committerGitHub <noreply@github.com>2024-09-29 07:11:51 +0300
commite8a4121de1a15f1baef5c2ef6300817b6abcf44c (patch)
tree1e15d1a4f1e0056b396c3ad6fae579b92e02c7c2
parent06c6c6616b5a7ed6174bfe16d24228b741e4242e (diff)
downloadydb-e8a4121de1a15f1baef5c2ef6300817b6abcf44c.tar.gz
accessors actualization (#9857)
-rw-r--r--ydb/core/formats/arrow/accessor/abstract/constructor.h32
-rw-r--r--ydb/core/formats/arrow/accessor/plain/constructor.cpp11
-rw-r--r--ydb/core/formats/arrow/accessor/plain/constructor.h7
-rw-r--r--ydb/core/formats/arrow/accessor/sparsed/accessor.h5
-rw-r--r--ydb/core/formats/arrow/accessor/sparsed/constructor.cpp6
-rw-r--r--ydb/core/formats/arrow/accessor/sparsed/constructor.h7
-rw-r--r--ydb/core/formats/arrow/save_load/loader.cpp21
-rw-r--r--ydb/core/formats/arrow/save_load/loader.h13
-rw-r--r--ydb/core/kqp/ut/olap/sparsed_ut.cpp55
-rw-r--r--ydb/core/tx/columnshard/engines/scheme/column/info.cpp8
10 files changed, 144 insertions, 21 deletions
diff --git a/ydb/core/formats/arrow/accessor/abstract/constructor.h b/ydb/core/formats/arrow/accessor/abstract/constructor.h
index aa99260e097..8cbef70a2a4 100644
--- a/ydb/core/formats/arrow/accessor/abstract/constructor.h
+++ b/ydb/core/formats/arrow/accessor/abstract/constructor.h
@@ -25,10 +25,23 @@ private:
virtual TString DoDebugString() const {
return "";
}
+ virtual bool DoIsEqualWithSameTypeTo(const IConstructor& item) const = 0;
+ virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
+ const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const = 0;
public:
virtual ~IConstructor() = default;
+ std::shared_ptr<arrow::RecordBatch> Construct(
+ const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
+ AFL_VERIFY(columnData);
+ return DoConstruct(columnData, externalInfo);
+ }
+
+ bool IsEqualWithSameTypeTo(const IConstructor& item) const {
+ return DoIsEqualWithSameTypeTo(item);
+ }
+
TString DebugString() const {
return TStringBuilder() << GetClassName() << ":" << DoDebugString();
}
@@ -65,10 +78,27 @@ public:
class TConstructorContainer: public NBackgroundTasks::TInterfaceProtoContainer<IConstructor> {
private:
using TBase = NBackgroundTasks::TInterfaceProtoContainer<IConstructor>;
-
public:
using TBase::TBase;
+ bool IsEqualTo(const TConstructorContainer& item) const {
+ if (!GetObjectPtr() && !item.GetObjectPtr()) {
+ return true;
+ } else if (!!GetObjectPtr() && !!item.GetObjectPtr()) {
+ if (GetObjectPtr()->GetClassName() != item.GetObjectPtr()->GetClassName()) {
+ return false;
+ }
+ return GetObjectPtr()->IsEqualWithSameTypeTo(*item.GetObjectPtr());
+ } else {
+ return false;
+ }
+ }
+
+ std::shared_ptr<arrow::RecordBatch> Construct(const std::shared_ptr<IChunkedArray>& batch, const TChunkConstructionData& externalInfo) const {
+ AFL_VERIFY(!!GetObjectPtr());
+ return GetObjectPtr()->Construct(batch, externalInfo);
+ }
+
static TConstructorContainer GetDefaultConstructor();
};
diff --git a/ydb/core/formats/arrow/accessor/plain/constructor.cpp b/ydb/core/formats/arrow/accessor/plain/constructor.cpp
index 3ecf41502b3..2a2b4657596 100644
--- a/ydb/core/formats/arrow/accessor/plain/constructor.cpp
+++ b/ydb/core/formats/arrow/accessor/plain/constructor.cpp
@@ -2,8 +2,11 @@
#include "constructor.h"
#include <ydb/library/formats/arrow/accessor/abstract/accessor.h>
+#include <ydb/library/formats/arrow/arrow_helpers.h>
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
+
#include <contrib/libs/apache/arrow/cpp/src/arrow/record_batch.h>
+#include <contrib/libs/apache/arrow/cpp/src/arrow/table.h>
namespace NKikimr::NArrow::NAccessor::NPlain {
@@ -30,4 +33,12 @@ std::shared_ptr<arrow::Schema> TConstructor::DoGetExpectedSchema(const std::shar
return std::make_shared<arrow::Schema>(arrow::FieldVector({ resultColumn }));
}
+std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
+ const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
+ auto chunked = columnData->GetChunkedArray();
+ auto schema = std::make_shared<arrow::Schema>(arrow::FieldVector({ std::make_shared<arrow::Field>("val", externalInfo.GetColumnType()) }));
+ auto table = arrow::Table::Make(schema, { chunked }, columnData->GetRecordsCount());
+ return NArrow::ToBatch(table, true);
+}
+
} // namespace NKikimr::NArrow::NAccessor::NPlain
diff --git a/ydb/core/formats/arrow/accessor/plain/constructor.h b/ydb/core/formats/arrow/accessor/plain/constructor.h
index 57c366689eb..243c1e47dec 100644
--- a/ydb/core/formats/arrow/accessor/plain/constructor.h
+++ b/ydb/core/formats/arrow/accessor/plain/constructor.h
@@ -12,6 +12,13 @@ public:
private:
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());
+
+ virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
+ return true;
+ }
+
+ virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
+ const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override;
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;
diff --git a/ydb/core/formats/arrow/accessor/sparsed/accessor.h b/ydb/core/formats/arrow/accessor/sparsed/accessor.h
index 04022496223..07a1a2465ce 100644
--- a/ydb/core/formats/arrow/accessor/sparsed/accessor.h
+++ b/ydb/core/formats/arrow/accessor/sparsed/accessor.h
@@ -153,6 +153,11 @@ public:
return chunk.GetScalar(index - chunk.GetStartPosition());
}
+ std::shared_ptr<arrow::RecordBatch> GetRecordBatchVerified() const {
+ AFL_VERIFY(Records.size() == 1)("size", Records.size());
+ return Records.front().GetRecords();
+ }
+
const TSparsedArrayChunk& GetSparsedChunk(const ui64 position) const {
const auto pred = [](const ui64 position, const TSparsedArrayChunk& item) {
return position < item.GetStartPosition();
diff --git a/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp b/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp
index e3f45cd7532..2962636c367 100644
--- a/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp
+++ b/ydb/core/formats/arrow/accessor/sparsed/constructor.cpp
@@ -31,4 +31,10 @@ bool TConstructor::DoDeserializeFromProto(const NKikimrArrowAccessorProto::TCons
return true;
}
+std::shared_ptr<arrow::RecordBatch> TConstructor::DoConstruct(
+ const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const {
+ NArrow::NAccessor::TSparsedArray sparsed(*columnData, externalInfo.GetDefaultValue());
+ return sparsed.GetRecordBatchVerified();
+}
+
} // namespace NKikimr::NArrow::NAccessor::NSparsed
diff --git a/ydb/core/formats/arrow/accessor/sparsed/constructor.h b/ydb/core/formats/arrow/accessor/sparsed/constructor.h
index 0ccf5efdd70..a85b8918eaf 100644
--- a/ydb/core/formats/arrow/accessor/sparsed/constructor.h
+++ b/ydb/core/formats/arrow/accessor/sparsed/constructor.h
@@ -12,6 +12,13 @@ public:
private:
static inline auto Registrator = TFactory::TRegistrator<TConstructor>(GetClassNameStatic());
+
+ virtual bool DoIsEqualWithSameTypeTo(const IConstructor& /*item*/) const override {
+ return true;
+ }
+ virtual std::shared_ptr<arrow::RecordBatch> DoConstruct(
+ const std::shared_ptr<IChunkedArray>& columnData, const TChunkConstructionData& externalInfo) const override;
+
virtual TConclusion<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> DoConstruct(
const std::shared_ptr<arrow::RecordBatch>& originalData, const TChunkConstructionData& externalInfo) const override;
virtual NKikimrArrowAccessorProto::TConstructor DoSerializeToProto() const override;
diff --git a/ydb/core/formats/arrow/save_load/loader.cpp b/ydb/core/formats/arrow/save_load/loader.cpp
index c9328f751d4..33685100bd1 100644
--- a/ydb/core/formats/arrow/save_load/loader.cpp
+++ b/ydb/core/formats/arrow/save_load/loader.cpp
@@ -51,9 +51,13 @@ std::shared_ptr<arrow::RecordBatch> TColumnLoader::ApplyRawVerified(const TStrin
return TStatusValidator::GetValid(Apply(data));
}
+TChunkConstructionData TColumnLoader::BuildAccessorContext(const ui32 recordsCount) const {
+ return TChunkConstructionData(recordsCount, DefaultValue, ResultField->type());
+}
+
std::shared_ptr<IChunkedArray> TColumnLoader::ApplyVerified(const TString& dataStr, const ui32 recordsCount) const {
auto data = TStatusValidator::GetValid(Apply(dataStr));
- return BuildAccessor(data, TChunkConstructionData(recordsCount, DefaultValue, ResultField->type()));
+ return BuildAccessor(data, BuildAccessorContext(recordsCount));
}
std::shared_ptr<IChunkedArray> TColumnLoader::BuildAccessor(
@@ -65,4 +69,19 @@ std::shared_ptr<NKikimr::NArrow::NAccessor::IChunkedArray> TColumnLoader::BuildD
return AccessorConstructor->ConstructDefault(TChunkConstructionData(recordsCount, DefaultValue, ResultField->type())).DetachResult();
}
+bool TColumnLoader::IsEqualTo(const TColumnLoader& item) const {
+ if (!!Transformer != !!item.Transformer) {
+ return false;
+ } else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
+ return false;
+ }
+ if (!Serializer.IsEqualTo(item.Serializer)) {
+ return false;
+ }
+ if (!AccessorConstructor.IsEqualTo(item.AccessorConstructor)) {
+ return false;
+ }
+ return true;
+}
+
} // namespace NKikimr::NArrow::NAccessor
diff --git a/ydb/core/formats/arrow/save_load/loader.h b/ydb/core/formats/arrow/save_load/loader.h
index 2d3119ac3fa..eb5a2740ef9 100644
--- a/ydb/core/formats/arrow/save_load/loader.h
+++ b/ydb/core/formats/arrow/save_load/loader.h
@@ -25,17 +25,7 @@ private:
public:
std::shared_ptr<IChunkedArray> BuildDefaultAccessor(const ui32 recordsCount) const;
- bool IsEqualTo(const TColumnLoader& item) const {
- if (!!Transformer != !!item.Transformer) {
- return false;
- } else if (!!Transformer && !Transformer->IsEqualTo(*item.Transformer)) {
- return false;
- }
- if (!Serializer.IsEqualTo(item.Serializer)) {
- return false;
- }
- return true;
- }
+ bool IsEqualTo(const TColumnLoader& item) const;
TString DebugString() const;
@@ -49,6 +39,7 @@ public:
const std::shared_ptr<arrow::Field>& GetField() const;
+ TChunkConstructionData BuildAccessorContext(const ui32 recordsCount) const;
std::shared_ptr<IChunkedArray> ApplyVerified(const TString& data, const ui32 expectedRecordsCount) const;
std::shared_ptr<arrow::RecordBatch> ApplyRawVerified(const TString& data) const;
};
diff --git a/ydb/core/kqp/ut/olap/sparsed_ut.cpp b/ydb/core/kqp/ut/olap/sparsed_ut.cpp
index 73b75f2cc53..bdd895e8e24 100644
--- a/ydb/core/kqp/ut/olap/sparsed_ut.cpp
+++ b/ydb/core/kqp/ut/olap/sparsed_ut.cpp
@@ -8,6 +8,7 @@
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/wrappers/fake_storage.h>
+#include <ydb/core/tx/columnshard/blobs_action/common/const.h>
namespace NKikimr::NKqp {
@@ -21,13 +22,12 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
const TString StoreName;
ui32 MultiColumnRepCount = 100;
static const ui32 SKIP_GROUPS = 7;
- const TVector<TString> FIELD_NAMES{"utf", "int", "uint", "float", "double"};
+ const TVector<TString> FIELD_NAMES{ "utf", "int", "uint", "float", "double" };
public:
TSparsedDataTest(const TString& storeName)
: Kikimr(Settings)
, CSController(NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard<NKikimr::NYDBTest::NColumnShard::TController>())
- , StoreName(storeName)
- {
+ , StoreName(storeName) {
}
@@ -79,7 +79,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
Fill(&counts[0], &counts[FIELD_NAMES.size() * groupsCount], 0);
- for (auto& row: rows) {
+ for (auto& row : rows) {
auto incCounts = [&](ui32 i, const TString& column) {
if (*NYdb::TValueParser(row.at(column)).GetOptionalBool()) {
counts[i]++;
@@ -94,7 +94,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
incCounts(ind++, "def_float" + grStr);
incCounts(ind++, "def_double" + grStr);
}
- }
+ }
}
void CheckAllFieldsTable(bool firstCall, ui32 countExpectation, ui32* defCountStart) {
@@ -169,7 +169,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
NArrow::NConstruction::TStringPoolFiller sPool(1000, 52, "abcde", frq);
helper.FillTable(sPool, shiftKff, 10000);
},
- [&](bool firstCall) {
+ [&](bool firstCall) {
CheckTable("field", "'abcde'", firstCall, countExpectation, defCountStart);
});
}
@@ -181,7 +181,7 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
TTypedLocalHelper helper("Utf8", Kikimr);
helper.FillMultiColumnTable(MultiColumnRepCount, shiftKff, 10000);
},
- [&](bool firstCall) {
+ [&](bool firstCall) {
CheckAllFieldsTable(firstCall, countExpectation, defCountStart);
});
}
@@ -302,6 +302,47 @@ Y_UNIT_TEST_SUITE(KqpOlapSparsed) {
TSparsedDataTest test("");
test.Execute();
}
+
+ Y_UNIT_TEST(AccessorActualization) {
+ auto settings = TKikimrSettings().SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+
+ auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();
+ csController->SetOverridePeriodicWakeupActivationPeriod(TDuration::Seconds(1));
+ csController->SetOverrideLagForCompactionBeforeTierings(TDuration::Seconds(1));
+ csController->SetOverrideReduceMemoryIntervalLimit(1LLU << 30);
+
+ TLocalHelper helper(kikimr);
+ helper.SetOptionalStorageId(NOlap::NBlobOperations::TGlobal::DefaultStorageId);
+ helper.CreateTestOlapTable();
+ auto tableClient = kikimr.GetTableClient();
+
+ auto session = tableClient.CreateSession().GetValueSync().GetSession();
+ Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").SetPriority(NActors::NLog::PRI_DEBUG).Initialize();
+
+ WriteTestData(kikimr, "/Root/olapStore/olapTable", 1000000, 300000000, 10000);
+ csController->WaitIndexation(TDuration::Seconds(3));
+
+ {
+ auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=uid, `DATA_ACCESSOR_CONSTRUCTOR.CLASS_NAME`=`SPARSED`)").GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+
+ {
+ auto result = session.ExecuteSchemeQuery("ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=UPSERT_OPTIONS, SCHEME_NEED_ACTUALIZATION=`true`)").GetValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+ }
+ csController->WaitActualization(TDuration::Seconds(5));
+ {
+ auto it = tableClient.StreamExecuteScanQuery(R"(
+ --!syntax_v1
+ SELECT count(uid) FROM `/Root/olapStore/olapTable`
+ )").GetValueSync();
+ UNIT_ASSERT_C(it.IsSuccess(), it.GetIssues().ToString());
+ Cerr << StreamResultToYson(it) << Endl;
+ }
+ }
+
}
} // namespace
diff --git a/ydb/core/tx/columnshard/engines/scheme/column/info.cpp b/ydb/core/tx/columnshard/engines/scheme/column/info.cpp
index f139bc63bcd..c8e95fdb1db 100644
--- a/ydb/core/tx/columnshard/engines/scheme/column/info.cpp
+++ b/ydb/core/tx/columnshard/engines/scheme/column/info.cpp
@@ -91,7 +91,13 @@ std::vector<std::shared_ptr<NKikimr::NOlap::IPortionDataChunk>> TSimpleColumnInf
}
std::vector<std::shared_ptr<IPortionDataChunk>> result;
for (auto&& s : source) {
- auto data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData());
+ std::shared_ptr<arrow::RecordBatch> data;
+ if (!DataAccessorConstructor.IsEqualTo(sourceColumnFeatures.DataAccessorConstructor)) {
+ auto chunkedArray = sourceColumnFeatures.Loader->ApplyVerified(s->GetData(), s->GetRecordsCountVerified());
+ data = DataAccessorConstructor.Construct(chunkedArray, Loader->BuildAccessorContext(s->GetRecordsCountVerified()));
+ } else {
+ data = sourceColumnFeatures.Loader->ApplyRawVerified(s->GetData());
+ }
result.emplace_back(s->CopyWithAnotherBlob(GetColumnSaver().Apply(data), *this));
}
return result;