diff options
author | chertus <azuikov@ydb.tech> | 2023-03-21 18:07:21 +0300 |
---|---|---|
committer | chertus <azuikov@ydb.tech> | 2023-03-21 18:07:21 +0300 |
commit | 2c6e3dd02e91c6fe15024fdc9cfb45e32c85ff3c (patch) | |
tree | d60eec4a5bc5d98a509e5f868f49312372583d39 | |
parent | cdee2b4033184293e7cb7a14321fe9f6a1cec7f2 (diff) | |
download | ydb-2c6e3dd02e91c6fe15024fdc9cfb45e32c85ff3c.tar.gz |
sliced version of deduplication
12 files changed, 306 insertions, 236 deletions
diff --git a/ydb/core/formats/arrow_helpers.cpp b/ydb/core/formats/arrow_helpers.cpp index c2fe9828f55..362c66a33cc 100644 --- a/ydb/core/formats/arrow_helpers.cpp +++ b/ydb/core/formats/arrow_helpers.cpp @@ -475,25 +475,51 @@ std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std:: std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const std::shared_ptr<TSortDescription>& description, - size_t maxBatchRows, ui64 limit) { + size_t maxBatchRows) { Y_VERIFY(maxBatchRows); ui64 numRows = 0; - TVector<NArrow::IInputStream::TPtr> streams; + std::vector<NArrow::IInputStream::TPtr> streams; + streams.reserve(batches.size()); for (auto& batch : batches) { - numRows += batch->num_rows(); - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); + if (batch->num_rows()) { + numRows += batch->num_rows(); + streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); + } } std::vector<std::shared_ptr<arrow::RecordBatch>> out; out.reserve(numRows / maxBatchRows + 1); - auto mergeStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, maxBatchRows, limit); + auto mergeStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, maxBatchRows); while (std::shared_ptr<arrow::RecordBatch> batch = mergeStream->Read()) { out.push_back(batch); } return out; } +std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, + const std::shared_ptr<TSortDescription>& description, + size_t maxBatchRows) { + Y_VERIFY(!description->Reverse); + + std::vector<NArrow::IInputStream::TPtr> streams; + streams.reserve(batches.size()); + for (auto& batch : batches) { + if (batch->num_rows()) { + streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); + } + } + + std::vector<std::shared_ptr<arrow::RecordBatch>> out; + out.reserve(streams.size()); + + auto dedupStream = std::make_shared<NArrow::TMergingSortedInputStream>(streams, description, maxBatchRows, true); + while (std::shared_ptr<arrow::RecordBatch> batch = dedupStream->Read()) { + out.push_back(batch); + } + return out; +} + // Check if the pertumation doesn't reoder anything bool IsNoOp(const arrow::UInt64Array& permutation) { for (i64 i = 0; i < permutation.length(); ++i) { diff --git a/ydb/core/formats/arrow_helpers.h b/ydb/core/formats/arrow_helpers.h index b3cff213787..8895d4bf92a 100644 --- a/ydb/core/formats/arrow_helpers.h +++ b/ydb/core/formats/arrow_helpers.h @@ -85,7 +85,10 @@ std::shared_ptr<arrow::RecordBatch> CombineSortedBatches(const std::vector<std:: const std::shared_ptr<TSortDescription>& description); std::vector<std::shared_ptr<arrow::RecordBatch>> MergeSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, const std::shared_ptr<TSortDescription>& description, - size_t maxBatchRows, ui64 limit = 0); + size_t maxBatchRows); +std::vector<std::shared_ptr<arrow::RecordBatch>> SliceSortedBatches(const std::vector<std::shared_ptr<arrow::RecordBatch>>& batches, + const std::shared_ptr<TSortDescription>& description, + size_t maxBatchRows = 0); std::vector<std::shared_ptr<arrow::RecordBatch>> ShardingSplit(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<ui32>& sharding, ui32 numShards); diff --git a/ydb/core/formats/merging_sorted_input_stream.cpp b/ydb/core/formats/merging_sorted_input_stream.cpp index 5517cd0de59..8a5ee53b52b 100644 --- a/ydb/core/formats/merging_sorted_input_stream.cpp +++ b/ydb/core/formats/merging_sorted_input_stream.cpp @@ -8,17 +8,104 @@ namespace NKikimr::NArrow { +class TRowsBuffer : public IRowsBuffer { +public: + using TBuilders = std::vector<std::unique_ptr<arrow::ArrayBuilder>>; + + static constexpr const size_t BUFFER_SIZE = 256; + + TRowsBuffer(TBuilders& columns, size_t maxRows) + : Columns(columns) + , MaxRows(maxRows) + { + Rows.reserve(BUFFER_SIZE); + } + + bool AddRow(const TSortCursor& cursor) override { + Rows.emplace_back(cursor->all_columns, cursor->getRow()); + if (Rows.size() >= BUFFER_SIZE) { + Flush(); + } + ++AddedRows; + return true; + } + + void Flush() override { + if (Rows.empty()) { + return; + } + for (size_t i = 0; i < Columns.size(); ++i) { + arrow::ArrayBuilder& builder = *Columns[i]; + for (auto& [srcColumn, rowPosition] : Rows) { + Append(builder, *srcColumn->at(i), rowPosition); + } + } + Rows.clear(); + } + + bool Limit() const override { + return MaxRows && (AddedRows >= MaxRows); + } + +private: + TBuilders& Columns; + std::vector<std::pair<const TArrayVec*, size_t>> Rows; + size_t MaxRows = 0; + size_t AddedRows = 0; +}; + +class TSlicedRowsBuffer : public IRowsBuffer { +public: + TSlicedRowsBuffer(size_t maxRows) + : MaxRows(maxRows) + {} + + bool AddRow(const TSortCursor& cursor) override { + if (!Batch) { + Batch = cursor->current_batch; + Offset = cursor->getRow(); + } + if (Batch.get() != cursor->current_batch.get()) { + // append from another batch + return false; + } else if (cursor->getRow() != (Offset + AddedRows)) { + // append from the same batch with data hole + return false; + } + ++AddedRows; + return true; + } + + void Flush() override { + } + + bool Limit() const override { + return MaxRows && (AddedRows >= MaxRows); + } + + std::shared_ptr<arrow::RecordBatch> GetBatch() { + if (Batch) { + return Batch->Slice(Offset, AddedRows); + } + return {}; + } + +private: + std::shared_ptr<arrow::RecordBatch> Batch; + size_t Offset = 0; + size_t MaxRows = 0; + size_t AddedRows = 0; +}; + TMergingSortedInputStream::TMergingSortedInputStream(const std::vector<IInputStream::TPtr>& inputs, std::shared_ptr<TSortDescription> description, - size_t maxBatchRows, - ui64 limit) + size_t maxBatchRows, bool slice) : Description(description) , MaxBatchSize(maxBatchRows) - , Limit(limit) + , SliceSources(slice) , SourceBatches(inputs.size()) , Cursors(inputs.size()) { - Y_VERIFY(MaxBatchSize); Children.insert(Children.end(), inputs.begin(), inputs.end()); Header = Children.at(0)->Schema(); } @@ -41,7 +128,7 @@ void TMergingSortedInputStream::Init() { const size_t rows = batch->num_rows(); if (ExpectedBatchSize < rows) { - ExpectedBatchSize = std::min(rows, MaxBatchSize); + ExpectedBatchSize = MaxBatchSize ? std::min(rows, MaxBatchSize) : rows; } Cursors[i] = TSortCursorImpl(batch, Description, i); @@ -52,7 +139,7 @@ void TMergingSortedInputStream::Init() { /// Let's check that all source blocks have the same structure. for (const auto& batch : SourceBatches) { if (batch) { - Y_VERIFY(batch->schema()->Equals(*Header)); + Y_VERIFY_DEBUG(batch->schema()->Equals(*Header)); } } } @@ -62,24 +149,33 @@ std::shared_ptr<arrow::RecordBatch> TMergingSortedInputStream::ReadImpl() { return {}; } - if (Children.size() == 1 && !Description->Replace() && !Limit) { + if (Children.size() == 1 && !Description->Replace()) { return Children[0]->Read(); } if (First) { Init(); } - auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize); - if (builders.empty()) { - return {}; - } - Y_VERIFY(builders.size() == (size_t)Header->num_fields()); - Merge(builders, Queue); + if (SliceSources) { + Y_VERIFY_DEBUG(!Description->Reverse); + TSlicedRowsBuffer rowsBuffer(MaxBatchSize); + Merge(rowsBuffer, Queue); + return rowsBuffer.GetBatch(); + } else { + auto builders = NArrow::MakeBuilders(Header, ExpectedBatchSize); + if (builders.empty()) { + return {}; + } + + Y_VERIFY(builders.size() == (size_t)Header->num_fields()); + TRowsBuffer rowsBuffer(builders, MaxBatchSize); + Merge(rowsBuffer, Queue); - auto arrays = NArrow::Finish(std::move(builders)); - Y_VERIFY(arrays.size()); - return arrow::RecordBatch::Make(Header, arrays[0]->length(), arrays); + auto arrays = NArrow::Finish(std::move(builders)); + Y_VERIFY(arrays.size()); + return arrow::RecordBatch::Make(Header, arrays[0]->length(), arrays); + } } /// Get the next block from the corresponding source, if there is one. @@ -97,9 +193,8 @@ void TMergingSortedInputStream::FetchNextBatch(const TSortCursor& current, TSort } if (batch->num_rows()) { -#if 1 - Y_VERIFY(batch->schema()->Equals(*Header)); -#endif + Y_VERIFY_DEBUG(batch->schema()->Equals(*Header)); + Cursors[order].Reset(batch); queue.ReplaceTop(TSortCursor(&Cursors[order], Description->NotNull)); break; @@ -107,66 +202,43 @@ void TMergingSortedInputStream::FetchNextBatch(const TSortCursor& current, TSort } } -static void AppendResetRows(TMergingSortedInputStream::TBuilders& columns, - std::vector<std::pair<const TArrayVec*, size_t>>& rows) { - if (rows.empty()) { - return; - } - for (size_t i = 0; i < columns.size(); ++i) { - arrow::ArrayBuilder& builder = *columns[i]; - for (auto& [srcColumn, rowPosition] : rows) { - Append(builder, *srcColumn->at(i), rowPosition); - } - } - rows.clear(); -} - -/// Take rows in required order and put them into `mergedColumns`, +/// Take rows in required order and put them into `rowBuffer`, /// while the number of rows are no more than `max_block_size` -void TMergingSortedInputStream::Merge(TBuilders& mergedColumns, TSortingHeap& queue) { - static constexpr const size_t BUFFER_SIZE = 256; - std::vector<std::pair<const TArrayVec*, size_t>> rowsBuffer; - rowsBuffer.reserve(BUFFER_SIZE); - - for (size_t mergedRows = 0; queue.IsValid();) { - if (Limit && TotalMergedRows >= Limit) { - break; - } - if (mergedRows >= MaxBatchSize) { - AppendResetRows(mergedColumns, rowsBuffer); +void TMergingSortedInputStream::Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue) { + while (queue.IsValid()) { + if (rowsBuffer.Limit()) { + rowsBuffer.Flush(); return; } auto current = queue.Current(); - bool append = true; if (Description->Replace()) { auto key = std::make_shared<TReplaceKey>(current->replace_columns, current->getRow()); - if (PrevKey && *key == *PrevKey) { - append = false; - } - PrevKey = key; - } - - if (append) { - rowsBuffer.emplace_back(current->all_columns.get(), current->getRow()); - ++mergedRows; - ++TotalMergedRows; - } + bool isDup = (PrevKey && *key == *PrevKey); - if (rowsBuffer.size() == BUFFER_SIZE) { - AppendResetRows(mergedColumns, rowsBuffer); + if (isDup || rowsBuffer.AddRow(current)) { + PrevKey = key; + } else { + rowsBuffer.Flush(); + return; + } + } else { + if (!rowsBuffer.AddRow(current)) { + rowsBuffer.Flush(); + return; + } } if (!current->isLast()) { queue.Next(); } else { - AppendResetRows(mergedColumns, rowsBuffer); + rowsBuffer.Flush(); FetchNextBatch(current, queue); } } - AppendResetRows(mergedColumns, rowsBuffer); + rowsBuffer.Flush(); /// We have read all data. Ask children to cancel providing more data. Cancel(); diff --git a/ydb/core/formats/merging_sorted_input_stream.h b/ydb/core/formats/merging_sorted_input_stream.h index 1881663bbaf..e66493d57a2 100644 --- a/ydb/core/formats/merging_sorted_input_stream.h +++ b/ydb/core/formats/merging_sorted_input_stream.h @@ -8,15 +8,18 @@ namespace NKikimr::NArrow { +struct IRowsBuffer { + virtual bool AddRow(const TSortCursor& cursor) = 0; + virtual void Flush() = 0; + virtual bool Limit() const = 0; +}; + /// Merges several sorted streams into one sorted stream. class TMergingSortedInputStream : public IInputStream { public: - using TBuilders = std::vector<std::unique_ptr<arrow::ArrayBuilder>>; - TMergingSortedInputStream(const std::vector<IInputStream::TPtr>& inputs, std::shared_ptr<TSortDescription> description, - size_t maxBatchRows, - ui64 limit = 0); + size_t maxBatchRows, bool slice = false); std::shared_ptr<arrow::Schema> Schema() const override { return Header; } @@ -27,8 +30,7 @@ private: std::shared_ptr<arrow::Schema> Header; std::shared_ptr<TSortDescription> Description; const ui64 MaxBatchSize; - ui64 Limit; - ui64 TotalMergedRows = 0; + const bool SliceSources; bool First = true; bool Finished = false; ui64 ExpectedBatchSize = 0; /// May be smaller or equal to max_block_size. To do 'reserve' for columns. @@ -41,7 +43,7 @@ private: void Init(); void FetchNextBatch(const TSortCursor& current, TSortingHeap& queue); - void Merge(TBuilders& builders, TSortingHeap& queue); + void Merge(IRowsBuffer& rowsBuffer, TSortingHeap& queue); }; } diff --git a/ydb/core/formats/sort_cursor.h b/ydb/core/formats/sort_cursor.h index 0ff9803ec14..36e249c2a2e 100644 --- a/ydb/core/formats/sort_cursor.h +++ b/ydb/core/formats/sort_cursor.h @@ -45,10 +45,12 @@ struct TSortDescription { /// Cursor moves inside single block. It is used in priority queue. struct TSortCursorImpl { std::shared_ptr<TArrayVec> sort_columns; - std::shared_ptr<TArrayVec> all_columns; - std::shared_ptr<TArrayVec> replace_columns; std::shared_ptr<TSortDescription> desc; ui32 order = 0; // Number of cursor. It determines an order if comparing columns are equal. + // + std::shared_ptr<arrow::RecordBatch> current_batch; + const TArrayVec* all_columns; + std::shared_ptr<TArrayVec> replace_columns; TSortCursorImpl() = default; @@ -64,8 +66,9 @@ struct TSortCursorImpl { size_t LastRow() const { return Rows() - 1; } void Reset(std::shared_ptr<arrow::RecordBatch> batch) { + current_batch = batch; sort_columns = std::make_shared<TArrayVec>(ExtractColumns(batch, desc->SortingKey)->columns()); - all_columns = std::make_shared<TArrayVec>(batch->columns()); + all_columns = &batch->columns(); if (desc->ReplaceKey) { replace_columns = std::make_shared<TArrayVec>(ExtractColumns(batch, desc->ReplaceKey)->columns()); } diff --git a/ydb/core/formats/ut_arrow.cpp b/ydb/core/formats/ut_arrow.cpp index d49c04fd389..3554d8e7f22 100644 --- a/ydb/core/formats/ut_arrow.cpp +++ b/ydb/core/formats/ut_arrow.cpp @@ -692,25 +692,6 @@ Y_UNIT_TEST_SUITE(ArrowTest) { UNIT_ASSERT(NArrow::IsSorted(sorted[0], descr->SortingKey)); UNIT_ASSERT(NArrow::IsSorted(sorted[1], descr->SortingKey)); UNIT_ASSERT(RestoreOne(sorted[0], 499) <= RestoreOne(sorted[1], 0)); - - { // maxBatchSize = 1000, limit = 500 - TVector<NArrow::IInputStream::TPtr> streams; - for (auto& batch : batches) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - - NArrow::IInputStream::TPtr mergeStream = - std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 1000, 500); - - while (auto batch = mergeStream->Read()) { - sorted.emplace_back(batch); - } - } - - UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 3); - UNIT_ASSERT_VALUES_EQUAL(sorted[2]->num_rows(), 500); - UNIT_ASSERT(CheckSorted(sorted[2])); - UNIT_ASSERT_VALUES_EQUAL(RestoreOne(sorted[0], 499), RestoreOne(sorted[2], 499)); } Y_UNIT_TEST(MergingSortedInputStreamReversed) { @@ -751,25 +732,6 @@ Y_UNIT_TEST_SUITE(ArrowTest) { UNIT_ASSERT(NArrow::IsSorted(sorted[0], descr->SortingKey, true)); UNIT_ASSERT(NArrow::IsSorted(sorted[1], descr->SortingKey, true)); UNIT_ASSERT(RestoreOne(sorted[0], 499) >= RestoreOne(sorted[1], 0)); - - { // maxBatchSize = 1000, limit = 500 - TVector<NArrow::IInputStream::TPtr> streams; - for (auto& batch : batches) { - streams.push_back(std::make_shared<NArrow::TOneBatchInputStream>(batch)); - } - - NArrow::IInputStream::TPtr mergeStream = - std::make_shared<NArrow::TMergingSortedInputStream>(streams, descr, 1000, 500); - - while (auto batch = mergeStream->Read()) { - sorted.emplace_back(batch); - } - } - - UNIT_ASSERT_VALUES_EQUAL(sorted.size(), 3); - UNIT_ASSERT_VALUES_EQUAL(sorted[2]->num_rows(), 500); - UNIT_ASSERT(CheckSorted(sorted[2], true)); - UNIT_ASSERT_VALUES_EQUAL(RestoreOne(sorted[0], 499), RestoreOne(sorted[2], 499)); } Y_UNIT_TEST(MergingSortedInputStreamReplace) { diff --git a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp index 502d2ad8baa..0ddde19a5a4 100644 --- a/ydb/core/tx/columnshard/engines/indexed_read_data.cpp +++ b/ydb/core/tx/columnshard/engines/indexed_read_data.cpp @@ -84,22 +84,44 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> SpecialMergeSorted(const std::v continue; } - // The core of optimization: do not merge slice if it's alone in its key range + // Do not merge slice if it's alone in its key range if (slices.size() == 1) { auto batch = slices[0]; if (batchesToDedup.count(batch.get())) { +#if 1 + auto deduped = SliceSortedBatches({batch}, description); + for (auto& batch : deduped) { + if (batch && batch->num_rows()) { + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); + out.push_back(batch); + } + } +#else if (!NArrow::IsSortedAndUnique(batch, description->ReplaceKey)) { batch = NArrow::CombineSortedBatches({batch}, description); Y_VERIFY(batch); + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); + out.push_back(batch); } +#endif + } else { + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); + out.push_back(batch); } - Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); - out.push_back(batch); continue; } - +#if 1 + auto deduped = SliceSortedBatches(slices, description); + for (auto& batch : deduped) { + if (batch && batch->num_rows()) { + Y_VERIFY_DEBUG(NArrow::IsSortedAndUnique(batch, description->ReplaceKey)); + out.push_back(batch); + } + } +#else auto batch = NArrow::CombineSortedBatches(slices, description); out.push_back(batch); +#endif } return out; diff --git a/ydb/core/tx/columnshard/read_actor.cpp b/ydb/core/tx/columnshard/read_actor.cpp index 69e8dc37c20..11b053aea16 100644 --- a/ydb/core/tx/columnshard/read_actor.cpp +++ b/ydb/core/tx/columnshard/read_actor.cpp @@ -67,6 +67,8 @@ public: } auto ready = IndexedData.GetReadyResults(Max<i64>()); + LOG_S_TRACE("Ready results with " << ready.size() << " batches at tablet " << TabletId << " (read)"); + size_t next = 1; for (auto it = ready.begin(); it != ready.end(); ++it, ++next) { bool lastOne = Finished() && (next == ready.size()); diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index 4cfae6e9d4b..9bcdb62f9f0 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -19,7 +19,7 @@ using TTypeId = NScheme::TTypeId; using TTypeInfo = NScheme::TTypeInfo; template <typename TKey = ui64> -bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range, +bool DataHas(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range, bool requireUniq = false) { static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>; @@ -79,7 +79,7 @@ bool DataHas(const TVector<TString>& blobs, const TString& srtSchema, std::pair< } template <typename TKey = ui64> -bool DataHasOnly(const TVector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) { +bool DataHasOnly(const std::vector<TString>& blobs, const TString& srtSchema, std::pair<ui64, ui64> range) { static constexpr const bool isStrKey = std::is_same_v<TKey, std::string>; THashSet<TKey> keys; @@ -225,35 +225,51 @@ bool CheckOrdered(const TString& blob, const TString& srtSchema) { } ui64 prev{}; + TString strPrev; for (int i = 0; i < array->length(); ++i) { ui64 value{}; + TString strValue; NArrow::SwitchType(array->type_id(), [&](const auto& type) { using TWrap = std::decay_t<decltype(type)>; using TArray = typename arrow::TypeTraits<typename TWrap::T>::ArrayType; -#if 0 - if constexpr (isStrKey && arrow::has_string_view<typename TWrap::T>()) { - value = static_cast<const TArray&>(*array).GetView(i); - return true; - } -#endif - if constexpr (/*!isStrKey && */arrow::has_c_type<typename TWrap::T>()) { + + if constexpr (arrow::has_c_type<typename TWrap::T>()) { auto& column = static_cast<const TArray&>(*array); value = column.Value(i); return true; } + if constexpr (arrow::is_base_binary_type<typename TWrap::T>()) { + auto v = static_cast<const TArray&>(*array).GetView(i); + strValue = TString(v.data(), v.size()); + return true; + } + + Cerr << array->type()->ToString() << "\n"; UNIT_ASSERT(false); return false; }); - if (!i) { - prev = value; - continue; - } + if (arrow::is_base_binary_like(array->type_id())) { + if (!i) { + strPrev = strValue; + continue; + } - if (prev > value) { - Cerr << "Unordered: " << prev << " " << value << "\n"; - return false; + if (strPrev > strValue) { + Cerr << "Unordered: " << strPrev << " " << strValue << "\n"; + return false; + } + } else { + if (!i) { + prev = value; + continue; + } + + if (prev > value) { + Cerr << "Unordered: " << prev << " " << value << "\n"; + return false; + } } } return true; @@ -317,6 +333,38 @@ void SetupSchema(TTestBasicRuntime& runtime, TActorId& sender, ui64 pathId, SetupSchema(runtime, sender, pathId, table, codec); } +std::vector<TString> ReadManyResults(TTestBasicRuntime& runtime, TString& schema, + NKikimrTxColumnShard::TMetadata& meta, ui32 expected = 1000) { + std::vector<TString> readData; + TAutoPtr<IEventHandle> handle; + bool finished = false; + for (ui32 i = 0; i < expected; ++i) { + auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); + UNIT_ASSERT(event); + + auto& resRead = Proto(event); + UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); + UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), TTestTxConfig::TxTablet1); + UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); + UNIT_ASSERT(resRead.GetData().size() > 0); + //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); + //Cerr << "GOT BATCH " << resRead.GetBatch() << " data size " << resRead.GetData().size() << "\n"; + if (resRead.GetFinished()) { + expected = resRead.GetBatch() + 1; + meta = resRead.GetMeta(); + finished = true; + } + readData.push_back(resRead.GetData()); + + if (schema.empty()) { + schema = resRead.GetMeta().GetSchema(); + } + UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema)); + } + UNIT_ASSERT(finished); + return readData; +} + void TestWrite(const TestTableDescription& table) { TTestBasicRuntime runtime; TTester::Setup(runtime); @@ -459,21 +507,11 @@ void TestWriteReadDup() { if (planStep != initPlanStep) { ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, new TEvColumnShard::TEvRead(sender, metaShard, planStep-1, Max<ui64>(), tableId)); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); - - auto data = resRead.GetData(); - auto meta = resRead.GetMeta(); - UNIT_ASSERT(CheckColumns(data, meta, TTestSchema::ExtractNames(ydbSchema), numRows)); - UNIT_ASSERT(DataHas(TVector<TString>{data}, meta.GetSchema(), portion, true)); + TString schema; + NKikimrTxColumnShard::TMetadata meta; + std::vector<TString> readData = ReadManyResults(runtime, schema, meta); + UNIT_ASSERT(DataHas(readData, schema, portion, true)); } } } @@ -815,29 +853,11 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, new TEvColumnShard::TEvRead(sender, metaShard, 23, txId, tableId)); - TVector<TString> readData; + TString schema; - ui32 expected = 1000; - for (ui32 i = 0; i < expected; ++i) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); + NKikimrTxColumnShard::TMetadata meta; + std::vector<TString> readData = ReadManyResults(runtime, schema, meta); - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT(resRead.GetData().size() > 0); - //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - //UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - if (resRead.GetFinished()) { - expected = resRead.GetBatch() + 1; - } - readData.push_back(resRead.GetData()); - if (schema.empty()) { - schema = resRead.GetMeta().GetSchema(); - } - UNIT_ASSERT(CheckOrdered(resRead.GetData(), schema)); - } UNIT_ASSERT(DataHas(readData, schema, portion[0])); UNIT_ASSERT(DataHas(readData, schema, portion[1])); UNIT_ASSERT(DataHas(readData, schema, portion[2])); @@ -856,7 +876,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString new TEvColumnShard::TEvRead(sender, metaShard, 24, txId, tableId)); readData.clear(); schema.clear(); - expected = 1000; + ui32 expected = 1000; for (ui32 i = 0; i < expected; ++i) { auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); UNIT_ASSERT(event); @@ -1096,23 +1116,11 @@ void TestCompactionInGranuleImpl(bool reboots, Proto(read.get()).AddColumnNames("message"); ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - UNIT_ASSERT_EQUAL(resRead.GetFinished(), true); - UNIT_ASSERT(resRead.GetData().size() > 0); - UNIT_ASSERT(resRead.HasMeta()); - auto& meta = resRead.GetMeta(); - auto& schema = meta.GetSchema(); + TString schema; + NKikimrTxColumnShard::TMetadata meta; + std::vector<TString> readData = ReadManyResults(runtime, schema, meta); - TVector<TString> readData; - readData.push_back(resRead.GetData()); if (ydbPk[0].second == TTypeInfo(NTypeIds::String) || ydbPk[0].second == TTypeInfo(NTypeIds::Utf8)) { UNIT_ASSERT(DataHas<std::string>(readData, schema, triggerPortion, true)); UNIT_ASSERT(DataHas<std::string>(readData, schema, smallWrites, true)); @@ -2043,45 +2051,23 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, read.release()); } - ui32 expected = 0; - ui32 num = 0; - TVector<TString> readData; - while (!expected || num < expected) { - auto event = runtime.GrabEdgeEvent<TEvColumnShard::TEvReadResult>(handle); - UNIT_ASSERT(event); - - auto& resRead = Proto(event); - UNIT_ASSERT_EQUAL(resRead.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_EQUAL(resRead.GetTxInitiator(), metaShard); - UNIT_ASSERT_EQUAL(resRead.GetStatus(), NKikimrTxColumnShard::EResultStatus::SUCCESS); - UNIT_ASSERT(resRead.GetData().size() > 0); - //UNIT_ASSERT_EQUAL(resRead.GetBatch(), 0); - - readData.push_back(resRead.GetData()); - - if (resRead.GetFinished()) { - expected = resRead.GetBatch() + 1; - - UNIT_ASSERT(resRead.HasMeta()); - - auto& meta = resRead.GetMeta(); - schema = meta.GetSchema(); - - UNIT_ASSERT(meta.HasReadStats()); - auto& readStats = meta.GetReadStats(); - - UNIT_ASSERT(readStats.GetBeginTimestamp() > 0); - UNIT_ASSERT(readStats.GetDurationUsec() > 0); - UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0); - UNIT_ASSERT(readStats.GetIndexBatches() > 0); - //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO - UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message" - UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions - //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x); - } - - ++num; - UNIT_ASSERT(num < 100); + TString schema; + NKikimrTxColumnShard::TMetadata meta; + std::vector<TString> readData = ReadManyResults(runtime, schema, meta, 20000); + { + schema = meta.GetSchema(); + + UNIT_ASSERT(meta.HasReadStats()); + auto& readStats = meta.GetReadStats(); + + UNIT_ASSERT(readStats.GetBeginTimestamp() > 0); + UNIT_ASSERT(readStats.GetDurationUsec() > 0); + UNIT_ASSERT_VALUES_EQUAL(readStats.GetSelectedIndex(), 0); + UNIT_ASSERT(readStats.GetIndexBatches() > 0); + //UNIT_ASSERT_VALUES_EQUAL(readStats.GetNotIndexedBatches(), 0); // TODO + UNIT_ASSERT_VALUES_EQUAL(readStats.GetUsedColumns(), 7); // planStep, txId + 4 PK columns + "message" + UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexGranules(), 3); // got 2 split compactions + //UNIT_ASSERT_VALUES_EQUAL(readStats.GetIndexPortions(), x); } if (isStrPk0) { diff --git a/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_column_/queries-deterministic-results-3 b/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_column_/queries-deterministic-results-3 index 8527397e250..0637a088a01 100644 --- a/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_column_/queries-deterministic-results-3 +++ b/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_column_/queries-deterministic-results-3 @@ -1,5 +1 @@ -[ - { - "column0": "4.563964e+14" - } -]
\ No newline at end of file +[]
\ No newline at end of file diff --git a/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_row_/queries-deterministic-results-3 b/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_row_/queries-deterministic-results-3 index f4c06cfe3ec..0637a088a01 100644 --- a/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_row_/queries-deterministic-results-3 +++ b/ydb/tests/functional/clickbench/canondata/test.test_run_determentistic_row_/queries-deterministic-results-3 @@ -1,5 +1 @@ -[ - { - "column0": "-7.588116e+17" - } -]
\ No newline at end of file +[]
\ No newline at end of file diff --git a/ydb/tests/functional/clickbench/data/queries-deterministic.sql b/ydb/tests/functional/clickbench/data/queries-deterministic.sql index e22447198e3..bf87ac6b459 100644 --- a/ydb/tests/functional/clickbench/data/queries-deterministic.sql +++ b/ydb/tests/functional/clickbench/data/queries-deterministic.sql @@ -1,7 +1,7 @@ /*0*/ SELECT COUNT(*) FROM $data; /*1*/ SELECT COUNT(*) FROM $data WHERE AdvEngineID <> 0; /*2*/ SELECT SUM(AdvEngineID), COUNT(*), AVG(ResolutionWidth) FROM $data; -/*3*/ SELECT AVG(UserID) FROM $data; +--/*3*/ SELECT AVG(UserID) FROM $data; /*4*/ SELECT COUNT(DISTINCT UserID) FROM $data; /*5*/ SELECT COUNT(DISTINCT SearchPhrase) FROM $data; /*6*/ SELECT MIN(EventDate), MAX(EventDate) FROM $data; |