author    | ivanmorozov <[email protected]> | 2023-10-24 11:43:41 +0300
committer | ivanmorozov <[email protected]> | 2023-10-24 12:21:51 +0300
commit    | f19c91a9b704d458b93485a4df46957199b777f7 (patch)
tree      | c7f16e4dda938c31462a862f915ec71235ae4547
parent    | 1e2c03467416b388d4ba5520eb43098143058796 (diff)
KIKIMR-19825: use a splitting context to provide info about whether first/last keys are needed and which fields they contain
-rw-r--r-- | ydb/core/formats/arrow/permutations.cpp                             | 13 |
-rw-r--r-- | ydb/core/formats/arrow/size_calcer.cpp                              | 43 |
-rw-r--r-- | ydb/core/formats/arrow/size_calcer.h                                | 41 |
-rw-r--r-- | ydb/core/formats/arrow/special_keys.cpp                             |  2 |
-rw-r--r-- | ydb/core/formats/arrow/special_keys.h                               |  2 |
-rw-r--r-- | ydb/core/tx/columnshard/columnshard__write.cpp                      |  2 |
-rw-r--r-- | ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp |  2 |
-rw-r--r-- | ydb/core/tx/columnshard/operations/slice_builder.cpp                |  4 |
-rw-r--r-- | ydb/core/tx/columnshard/operations/slice_builder.h                  |  6 |
-rw-r--r-- | ydb/core/tx/columnshard/operations/write.cpp                        |  2 |
-rw-r--r-- | ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp         |  7 |
-rw-r--r-- | ydb/core/tx/ev_write/columnshard_splitter.h                         |  6 |
12 files changed, 87 insertions, 43 deletions
```diff
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index ef11ab2c498..a6e75019f7c 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -142,14 +142,23 @@ std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& s
     std::unique_ptr<arrow::ArrayBuilder> builder;
     TStatusValidator::Validate(arrow::MakeBuilder(arrow::default_memory_pool(), source->type(), &builder));
+    auto& builderImpl = static_cast<TBuilder&>(*builder);
+
+    if constexpr (arrow::has_string_view<typename TWrap::T>::value) {
+        ui64 sumByIndexes = 0;
+        for (auto&& idx : indexes) {
+            sumByIndexes += column.GetView(idx).size();
+        }
+        TStatusValidator::Validate(builderImpl.ReserveData(sumByIndexes));
+    }
+
     TStatusValidator::Validate(builder->Reserve(indexes.size()));
     {
-        auto& builderImpl = static_cast<TBuilder&>(*builder);
         const ui32 arraySize = column.length();
         for (auto&& i : indexes) {
             Y_ABORT_UNLESS(i < arraySize);
-            TStatusValidator::Validate(builderImpl.Append(column.GetView(i)));
+            builderImpl.UnsafeAppend(column.GetView(i));
         }
     }
```
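The hunk above swaps per-element `Append` (which re-checks capacity and returns a `Status` on every call) for a two-phase scheme: for variable-width (string-view) types it first sums the payload bytes over the selected indexes, reserves the value and offset buffers once, and then appends unchecked. A minimal standalone sketch of the same pattern in plain Arrow C++ — the helper name and the restriction to a string column are assumptions for illustration; the YDB code is templated over all builder types:

```cpp
#include <arrow/api.h>

#include <cstdint>
#include <vector>

// Sketch: copy selected rows of a string column, reserving exact capacity first.
arrow::Result<std::shared_ptr<arrow::Array>> CopyStringsByIndexes(
        const arrow::StringArray& column, const std::vector<uint32_t>& indexes) {
    arrow::StringBuilder builder;
    // Extra pass to compute the exact payload size of the variable-width values.
    int64_t totalBytes = 0;
    for (const uint32_t idx : indexes) {
        totalBytes += static_cast<int64_t>(column.GetView(idx).size());
    }
    ARROW_RETURN_NOT_OK(builder.ReserveData(totalBytes));                       // value buffer
    ARROW_RETURN_NOT_OK(builder.Reserve(static_cast<int64_t>(indexes.size()))); // offsets + validity
    // Capacity is now guaranteed, so UnsafeAppend skips the per-element capacity
    // and Status checks; each call reduces to an offset bump plus a memcpy.
    for (const uint32_t idx : indexes) {
        builder.UnsafeAppend(column.GetView(idx));
    }
    return builder.Finish();
}
```

`Reserve` sizes the offsets/validity buffers from the element count alone; `ReserveData` sizes the value buffer, which for variable-width types can only be known by scanning the payload first — hence the extra summation pass.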
```diff
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp
index 1e0acf4484e..6b35f8eeadb 100644
--- a/ydb/core/formats/arrow/size_calcer.cpp
+++ b/ydb/core/formats/arrow/size_calcer.cpp
@@ -8,11 +8,11 @@ namespace NKikimr::NArrow {
-TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 sizeLimit) {
+TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const TBatchSplitttingContext& context) {
     std::vector<TSerializedBatch> resultLocal;
     TString errorMessage;
-    if (GetBatchDataSize(batch) <= sizeLimit) {
-        if (!TSerializedBatch::BuildWithLimit(batch, sizeLimit, resultLocal, &errorMessage)) {
+    if (GetBatchDataSize(batch) <= context.GetSizeLimit()) {
+        if (!TSerializedBatch::BuildWithLimit(batch, context, resultLocal, &errorMessage)) {
             return TSplitBlobResult("full batch splitting: " + errorMessage);
         } else {
             return TSplitBlobResult(std::move(resultLocal));
@@ -26,14 +26,14 @@ TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatc
     ui32 startIdx = 0;
     for (ui32 i = 0; i < batch->num_rows(); ++i) {
         const ui32 rowSize = rowCalculator.GetRowBytesSize(i);
-        if (rowSize > sizeLimit) {
-            return TSplitBlobResult("there is row with size more then limit (" + ::ToString(sizeLimit) + ")");
+        if (rowSize > context.GetSizeLimit()) {
+            return TSplitBlobResult("there is row with size more then limit (" + ::ToString(context.GetSizeLimit()) + ")");
         }
-        if (rowCalculator.GetApproxSerializeSize(currentSize + rowSize) > sizeLimit) {
+        if (rowCalculator.GetApproxSerializeSize(currentSize + rowSize) > context.GetSizeLimit()) {
             if (!currentSize) {
-                return TSplitBlobResult("there is row with size + metadata more then limit (" + ::ToString(sizeLimit) + ")");
+                return TSplitBlobResult("there is row with size + metadata more then limit (" + ::ToString(context.GetSizeLimit()) + ")");
             }
-            if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, i - startIdx), sizeLimit, resultLocal, &errorMessage)) {
+            if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, i - startIdx), context, resultLocal, &errorMessage)) {
                 return TSplitBlobResult("cannot build blobs for batch slice (" + ::ToString(i - startIdx) + " rows): " + errorMessage);
             }
             currentSize = 0;
@@ -42,7 +42,7 @@ TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatc
         currentSize += rowSize;
     }
     if (currentSize) {
-        if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, batch->num_rows() - startIdx), sizeLimit, resultLocal, &errorMessage)) {
+        if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, batch->num_rows() - startIdx), context, resultLocal, &errorMessage)) {
             return TSplitBlobResult("cannot build blobs for last batch slice (" + ::ToString(batch->num_rows() - startIdx) + " rows): " + errorMessage);
         }
     }
@@ -214,15 +214,18 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
     return bytes;
 }
 
-NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch) {
-    TFirstLastSpecialKeys specialKeys(batch);
+NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
+    std::optional<TFirstLastSpecialKeys> specialKeys;
+    if (context.GetFieldsForSpecialKeys().size()) {
+        specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys());
+    }
     return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
 }
 
-bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) {
-    TSerializedBatch sb = TSerializedBatch::Build(batch);
+bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) {
+    TSerializedBatch sb = TSerializedBatch::Build(batch, context);
     const ui32 length = batch->num_rows();
-    if (sb.GetSize() <= sizeLimit) {
+    if (sb.GetSize() <= context.GetSizeLimit()) {
         sbL = std::move(sb);
         return true;
     } else if (length == 1) {
@@ -232,13 +235,13 @@ bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch,
         return false;
     } else {
         const ui32 delta = length / 2;
-        TSerializedBatch localSbL = TSerializedBatch::Build(batch->Slice(0, delta));
-        TSerializedBatch localSbR = TSerializedBatch::Build(batch->Slice(delta, length - delta));
-        if (localSbL.GetSize() > sizeLimit || localSbR.GetSize() > sizeLimit) {
+        TSerializedBatch localSbL = TSerializedBatch::Build(batch->Slice(0, delta), context);
+        TSerializedBatch localSbR = TSerializedBatch::Build(batch->Slice(delta, length - delta), context);
+        if (localSbL.GetSize() > context.GetSizeLimit() || localSbR.GetSize() > context.GetSizeLimit()) {
             if (errorMessage) {
                 *errorMessage = TStringBuilder() << "original batch too big: " << sb.GetSize() << " and after 2 parts split we have: " << localSbL.GetSize() << "(" << localSbL.GetRowsCount() << ")" << " / "
-                    << localSbR.GetSize() << "(" << localSbR.GetRowsCount() << ")" << " part sizes. Its unexpected for limit " << sizeLimit;
+                    << localSbR.GetSize() << "(" << localSbR.GetRowsCount() << ")" << " part sizes. Its unexpected for limit " << context.GetSizeLimit();
             }
             return false;
         }
@@ -248,10 +251,10 @@ bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch,
     }
 }
 
-bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::vector<TSerializedBatch>& result, TString* errorMessage) {
+bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::vector<TSerializedBatch>& result, TString* errorMessage) {
     std::optional<TSerializedBatch> sbL;
     std::optional<TSerializedBatch> sbR;
-    if (!TSerializedBatch::BuildWithLimit(batch, sizeLimit, sbL, sbR, errorMessage)) {
+    if (!TSerializedBatch::BuildWithLimit(batch, context, sbL, sbR, errorMessage)) {
         return false;
     }
     if (sbL) {
```

```diff
diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h
index 4d14c0cf5aa..259a93836ae 100644
--- a/ydb/core/formats/arrow/size_calcer.h
+++ b/ydb/core/formats/arrow/size_calcer.h
@@ -46,29 +46,54 @@ public:
     ui32 GetRowBytesSize(const ui32 row) const;
 };
 
+class TBatchSplitttingContext {
+private:
+    YDB_ACCESSOR(ui64, SizeLimit, 6 * 1024 * 1024);
+    YDB_ACCESSOR_DEF(std::vector<TString>, FieldsForSpecialKeys);
+public:
+    explicit TBatchSplitttingContext(const ui64 size)
+        : SizeLimit(size)
+    {
+
+    }
+
+    void SetFieldsForSpecialKeys(const std::shared_ptr<arrow::Schema>& schema) {
+        std::vector<TString> local;
+        for (auto&& i : schema->fields()) {
+            local.emplace_back(i->name());
+        }
+        std::swap(local, FieldsForSpecialKeys);
+    }
+};
+
 class TSerializedBatch {
 private:
     YDB_READONLY_DEF(TString, SchemaData);
     YDB_READONLY_DEF(TString, Data);
     YDB_READONLY(ui32, RowsCount, 0);
     YDB_READONLY(ui32, RawBytes, 0);
-    TFirstLastSpecialKeys SpecialKeys;
+    std::optional<TFirstLastSpecialKeys> SpecialKeys;
 public:
     size_t GetSize() const {
         return Data.size();
     }
 
-    const TFirstLastSpecialKeys& GetSpecialKeys() const {
-        return SpecialKeys;
+    const TFirstLastSpecialKeys& GetSpecialKeysSafe() const {
+        AFL_VERIFY(SpecialKeys);
+        return *SpecialKeys;
+    }
+
+    bool HasSpecialKeys() const {
+        return !!SpecialKeys;
     }
 
     TString DebugString() const;
 
-    static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::vector<TSerializedBatch>& result, TString* errorMessage);
-    static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage);
-    static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch);
+    static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::vector<TSerializedBatch>& result, TString* errorMessage);
+    static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage);
+    static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);
 
-    TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const TFirstLastSpecialKeys& specialKeys)
+    TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TFirstLastSpecialKeys>& specialKeys)
         : SchemaData(schemaData)
         , Data(data)
         , RowsCount(rowsCount)
@@ -101,7 +126,7 @@ public:
     }
 };
 
-TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 sizeLimit);
+TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const TBatchSplitttingContext& context);
 
 // Return size in bytes including size of bitmap mask
 ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
```
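`TBatchSplitttingContext` (the triple-`t` spelling is the actual identifier) bundles the blob size limit with an optional list of fields whose first/last values should be captured per produced blob. A call-site sketch mirroring the `slice_builder.cpp` change further down — `primaryKeySchema` and the error hook are assumptions, the remaining names come from this diff:

```cpp
// Build a context: size limit plus (optionally) the fields for first/last keys.
NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize());
context.SetFieldsForSpecialKeys(primaryKeySchema); // assumed: PK schema in scope

NArrow::TSplitBlobResult splitResult = NArrow::SplitByBlobSize(batch, context);
if (!splitResult) {
    // e.g. a single row plus metadata exceeded the limit
    ReportError(splitResult.GetErrorMessage()); // hypothetical error hook
}
```

Passing a context object instead of a bare `ui32 sizeLimit` lets call sites opt in to special-key collection without another signature change later.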
```diff
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
index 9e86893fb6e..83908f38ad1 100644
--- a/ydb/core/formats/arrow/special_keys.cpp
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -32,7 +32,7 @@ TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const {
     return NArrow::SerializeBatchNoCompression(Data);
 }
 
-TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames /*= {}*/) {
+TFirstLastSpecialKeys::TFirstLastSpecialKeys(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& columnNames /*= {}*/) {
     Y_ABORT_UNLESS(batch);
     Y_ABORT_UNLESS(batch->num_rows());
     std::shared_ptr<arrow::RecordBatch> keyBatch = batch;
```

```diff
diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h
index bfa4c7275ad..e777c5511c2 100644
--- a/ydb/core/formats/arrow/special_keys.h
+++ b/ydb/core/formats/arrow/special_keys.h
@@ -58,7 +58,7 @@ public:
     {
         Y_ABORT_UNLESS(Data->num_rows() == 1 || Data->num_rows() == 2);
     }
-    explicit TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {});
+    explicit TFirstLastSpecialKeys(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& columnNames = {});
 };
 
 class TMinMaxSpecialKeys: public TSpecialKeys {
```

```diff
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index b8e87f5d9b6..acf7fe151a1 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -177,7 +177,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
             << " at tablet " << TabletID());
         writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
         std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(),
-            StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData);
+            StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData, snapshotSchema->GetIndexInfo().GetReplaceKey());
         NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
     }
 }
```

```diff
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index ca7cbfe4a30..a02d99f18ba 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -14,7 +14,7 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const
 {
     for (auto&& bInfo : BlobsSplitted) {
         auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(bInfo.GetData(), Action));
-        BlobData.emplace_back(TBlobRange::FromBlobId(task.GetBlobId()), bInfo.GetSpecialKeys(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now());
+        BlobData.emplace_back(TBlobRange::FromBlobId(task.GetBlobId()), bInfo.GetSpecialKeysSafe(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now());
     }
     ResourceUsage.SourceMemorySize = WriteData.GetSize();
 }
```
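With `SpecialKeys` now `std::optional`, the accessor pair is `HasSpecialKeys()` plus a verifying `GetSpecialKeysSafe()`. The write controller above calls the safe getter directly because its batches are always split with the PK fields set; a consumer that cannot rely on that would check first — a sketch, with `serialized` assumed to be a `TSerializedBatch`:

```cpp
// Defensive consumer sketch for the new optional special-keys API.
if (serialized.HasSpecialKeys()) {
    // GetSpecialKeysSafe() AFL_VERIFYs presence, so the check above makes it safe.
    const NKikimr::NArrow::TFirstLastSpecialKeys& keys = serialized.GetSpecialKeysSafe();
    // ... use the first/last primary-key rows, e.g. as blob-range metadata ...
} else {
    // Batch was split without FieldsForSpecialKeys; no key metadata was captured.
}
```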
```diff
diff --git a/ydb/core/tx/columnshard/operations/slice_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder.cpp
index 14792dacbbf..ddcc19332c8 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder.cpp
@@ -19,7 +19,9 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::
         return {};
     }
 
-    auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize());
+    NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize());
+    context.SetFieldsForSpecialKeys(PrimaryKeySchema);
+    auto splitResult = NArrow::SplitByBlobSize(batch, context);
     if (!splitResult) {
         AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage());
         return {};
```

```diff
diff --git a/ydb/core/tx/columnshard/operations/slice_builder.h b/ydb/core/tx/columnshard/operations/slice_builder.h
index 35ab257f15d..efd26eacb9b 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder.h
+++ b/ydb/core/tx/columnshard/operations/slice_builder.h
@@ -13,7 +13,7 @@ private:
     const ui64 TabletId;
     const NActors::TActorId ParentActorId;
     std::optional<std::vector<NArrow::TSerializedBatch>> BuildSlices();
-
+    std::shared_ptr<arrow::Schema> PrimaryKeySchema;
 protected:
     virtual bool DoExecute() override;
 public:
@@ -21,11 +21,13 @@ public:
         return "Write::ConstructBlobs::Slices";
     }
 
-    TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, const std::shared_ptr<IBlobsWritingAction>& action, const NEvWrite::TWriteData& writeData)
+    TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, const std::shared_ptr<IBlobsWritingAction>& action,
+        const NEvWrite::TWriteData& writeData, const std::shared_ptr<arrow::Schema>& primaryKeySchema)
         : Action(action)
         , WriteData(writeData)
         , TabletId(tabletId)
         , ParentActorId(parentActorId)
+        , PrimaryKeySchema(primaryKeySchema)
     {
         Y_ABORT_UNLESS(Action);
     }
```

```diff
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index c8ed9946a32..7e023405b45 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -26,7 +26,7 @@ namespace NKikimr::NColumnShard {
     NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source);
     std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(owner.TabletID(), ctx.SelfID,
-        owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"), NEvWrite::TWriteData(writeMeta, data));
+        owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"), NEvWrite::TWriteData(writeMeta, data), owner.TablesManager.GetPrimaryIndex()->GetReplaceKey());
     NConveyor::TCompServiceOperator::SendTaskToExecute(task);
     Status = EOperationStatus::Started;
```

```diff
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 28a7809389c..d952eb745e6 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2685,9 +2685,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
         }
         Cerr << "compacted=" << sumCompactedRows << ";inserted=" << sumInsertedRows << ";expected=" << fullNumRows << ";" << Endl;
         RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
-        UNIT_ASSERT(sumCompactedRows + sumInsertedRows == fullNumRows);
-        UNIT_ASSERT(sumCompactedRows > sumInsertedRows);
-        UNIT_ASSERT(sumCompactedBytes > sumInsertedBytes);
+        UNIT_ASSERT(sumCompactedRows == fullNumRows);
+        UNIT_ASSERT(sumCompactedRows < sumCompactedBytes);
+        UNIT_ASSERT(sumInsertedRows == 0);
+        UNIT_ASSERT(sumInsertedBytes == 0);
     }
 }
```
```diff
diff --git a/ydb/core/tx/ev_write/columnshard_splitter.h b/ydb/core/tx/ev_write/columnshard_splitter.h
index 3c392bb74a0..264697973ad 100644
--- a/ydb/core/tx/ev_write/columnshard_splitter.h
+++ b/ydb/core/tx/ev_write/columnshard_splitter.h
@@ -117,7 +117,8 @@ private:
         TFullSplitData result(numShards);
 
         if (numShards == 1) {
-            NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+            NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+            NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(batch, context);
             if (!blobsSplitted) {
                 return TYdbConclusionStatus::Fail(Ydb::StatusIds::SCHEME_ERROR, "cannot split batch in according to limits: " + blobsSplitted.GetErrorMessage());
             }
@@ -145,7 +146,8 @@ private:
         THashMap<ui64, TString> out;
         for (size_t i = 0; i < sharded.size(); ++i) {
             if (sharded[i]) {
-                NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(sharded[i], NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+                NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+                NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(sharded[i], context);
                 if (!blobsSplitted) {
                     return TYdbConclusionStatus::Fail(Ydb::StatusIds::SCHEME_ERROR, "cannot split batch in according to limits: " + blobsSplitted.GetErrorMessage());
                 }
             }
```
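Both splitter call sites above build the context with 0.875 of the maximum blob size — presumably headroom for serialization overhead — and neither calls `SetFieldsForSpecialKeys`, so batches split on this path carry no first/last-key metadata and `HasSpecialKeys()` returns false for them. The two configurations side by side (a sketch; `primaryKeySchema` is assumed):

```cpp
// Column-shard write path: special keys requested from the primary-key schema.
NArrow::TBatchSplitttingContext withKeys(NColumnShard::TLimits::GetMaxBlobSize());
withKeys.SetFieldsForSpecialKeys(primaryKeySchema);

// ev_write path: reduced limit, no special-key fields -> SpecialKeys stays empty.
NArrow::TBatchSplitttingContext noKeys(NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
```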