author      ivanmorozov <[email protected]>  2023-10-24 11:43:41 +0300
committer   ivanmorozov <[email protected]>  2023-10-24 12:21:51 +0300
commit      f19c91a9b704d458b93485a4df46957199b777f7 (patch)
tree        c7f16e4dda938c31462a862f915ec71235ae4547
parent      1e2c03467416b388d4ba5520eb43098143058796 (diff)
KIKIMR-19825: use a splitting context to provide info about whether first/last keys are necessary and which fields they contain
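
The change in one sentence: the blob-splitting entry points no longer take a bare size limit; they take a TBatchSplitttingContext carrying both the size limit and the list of fields from which first/last special keys are built, and key extraction is skipped entirely when that list is empty. A minimal sketch of the resulting call flow, pieced together from the hunks below (the batch variable and the 6 MiB limit are illustrative):

    // batch is a std::shared_ptr<arrow::RecordBatch> (illustrative)
    NKikimr::NArrow::TBatchSplitttingContext context(6 * 1024 * 1024);
    context.SetFieldsForSpecialKeys(batch->schema()); // opt in to first/last keys
    auto split = NKikimr::NArrow::SplitByBlobSize(batch, context);
    if (!split) {
        // split.GetErrorMessage() reports which limit was violated
    }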
-rw-r--r--  ydb/core/formats/arrow/permutations.cpp                              | 13
-rw-r--r--  ydb/core/formats/arrow/size_calcer.cpp                               | 43
-rw-r--r--  ydb/core/formats/arrow/size_calcer.h                                 | 41
-rw-r--r--  ydb/core/formats/arrow/special_keys.cpp                              |  2
-rw-r--r--  ydb/core/formats/arrow/special_keys.h                                |  2
-rw-r--r--  ydb/core/tx/columnshard/columnshard__write.cpp                       |  2
-rw-r--r--  ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp  |  2
-rw-r--r--  ydb/core/tx/columnshard/operations/slice_builder.cpp                 |  4
-rw-r--r--  ydb/core/tx/columnshard/operations/slice_builder.h                   |  6
-rw-r--r--  ydb/core/tx/columnshard/operations/write.cpp                         |  2
-rw-r--r--  ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp         |  7
-rw-r--r--  ydb/core/tx/ev_write/columnshard_splitter.h                          |  6
12 files changed, 87 insertions, 43 deletions
diff --git a/ydb/core/formats/arrow/permutations.cpp b/ydb/core/formats/arrow/permutations.cpp
index ef11ab2c498..a6e75019f7c 100644
--- a/ydb/core/formats/arrow/permutations.cpp
+++ b/ydb/core/formats/arrow/permutations.cpp
@@ -142,14 +142,23 @@ std::shared_ptr<arrow::Array> CopyRecords(const std::shared_ptr<arrow::Array>& s
std::unique_ptr<arrow::ArrayBuilder> builder;
TStatusValidator::Validate(arrow::MakeBuilder(arrow::default_memory_pool(), source->type(), &builder));
+ auto& builderImpl = static_cast<TBuilder&>(*builder);
+
+ if constexpr (arrow::has_string_view<typename TWrap::T>::value) {
+ ui64 sumByIndexes = 0;
+ for (auto&& idx : indexes) {
+ sumByIndexes += column.GetView(idx).size();
+ }
+ TStatusValidator::Validate(builderImpl.ReserveData(sumByIndexes));
+ }
+
TStatusValidator::Validate(builder->Reserve(indexes.size()));
{
- auto& builderImpl = static_cast<TBuilder&>(*builder);
const ui32 arraySize = column.length();
for (auto&& i : indexes) {
Y_ABORT_UNLESS(i < arraySize);
- TStatusValidator::Validate(builderImpl.Append(column.GetView(i)));
+ builderImpl.UnsafeAppend(column.GetView(i));
}
}
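
Besides the context plumbing, this hunk is a small copy-path optimization: for string-like columns it pre-computes the total byte length of the selected views, reserves the builder's value buffer once, and then appends with UnsafeAppend instead of the Status-returning Append per row. A self-contained sketch of the same pattern against plain Arrow C++ (the function and variable names are illustrative, not from this patch):

    #include <arrow/api.h>

    #include <cstdint>
    #include <memory>
    #include <vector>

    // Copy selected rows of a string array: reserve once, then append unchecked.
    arrow::Status CopyStringRows(const arrow::StringArray& column,
                                 const std::vector<uint32_t>& indexes,
                                 std::shared_ptr<arrow::Array>* out) {
        arrow::StringBuilder builder;
        int64_t totalBytes = 0;
        for (const auto idx : indexes) {
            totalBytes += column.GetView(idx).size(); // sum of view lengths
        }
        ARROW_RETURN_NOT_OK(builder.ReserveData(totalBytes));  // value buffer
        ARROW_RETURN_NOT_OK(builder.Reserve(indexes.size()));  // offsets + validity
        for (const auto idx : indexes) {
            builder.UnsafeAppend(column.GetView(idx)); // no per-row capacity check
        }
        return builder.Finish(out);
    }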
diff --git a/ydb/core/formats/arrow/size_calcer.cpp b/ydb/core/formats/arrow/size_calcer.cpp
index 1e0acf4484e..6b35f8eeadb 100644
--- a/ydb/core/formats/arrow/size_calcer.cpp
+++ b/ydb/core/formats/arrow/size_calcer.cpp
@@ -8,11 +8,11 @@
namespace NKikimr::NArrow {
-TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 sizeLimit) {
+TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const TBatchSplitttingContext& context) {
std::vector<TSerializedBatch> resultLocal;
TString errorMessage;
- if (GetBatchDataSize(batch) <= sizeLimit) {
- if (!TSerializedBatch::BuildWithLimit(batch, sizeLimit, resultLocal, &errorMessage)) {
+ if (GetBatchDataSize(batch) <= context.GetSizeLimit()) {
+ if (!TSerializedBatch::BuildWithLimit(batch, context, resultLocal, &errorMessage)) {
return TSplitBlobResult("full batch splitting: " + errorMessage);
} else {
return TSplitBlobResult(std::move(resultLocal));
@@ -26,14 +26,14 @@ TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batc
ui32 startIdx = 0;
for (ui32 i = 0; i < batch->num_rows(); ++i) {
const ui32 rowSize = rowCalculator.GetRowBytesSize(i);
- if (rowSize > sizeLimit) {
- return TSplitBlobResult("there is row with size more then limit (" + ::ToString(sizeLimit) + ")");
+ if (rowSize > context.GetSizeLimit()) {
+ return TSplitBlobResult("there is row with size more then limit (" + ::ToString(context.GetSizeLimit()) + ")");
}
- if (rowCalculator.GetApproxSerializeSize(currentSize + rowSize) > sizeLimit) {
+ if (rowCalculator.GetApproxSerializeSize(currentSize + rowSize) > context.GetSizeLimit()) {
if (!currentSize) {
- return TSplitBlobResult("there is row with size + metadata more then limit (" + ::ToString(sizeLimit) + ")");
+ return TSplitBlobResult("there is row with size + metadata more then limit (" + ::ToString(context.GetSizeLimit()) + ")");
}
- if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, i - startIdx), sizeLimit, resultLocal, &errorMessage)) {
+ if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, i - startIdx), context, resultLocal, &errorMessage)) {
return TSplitBlobResult("cannot build blobs for batch slice (" + ::ToString(i - startIdx) + " rows): " + errorMessage);
}
currentSize = 0;
@@ -42,7 +42,7 @@ TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batc
currentSize += rowSize;
}
if (currentSize) {
- if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, batch->num_rows() - startIdx), sizeLimit, resultLocal, &errorMessage)) {
+ if (!TSerializedBatch::BuildWithLimit(batch->Slice(startIdx, batch->num_rows() - startIdx), context, resultLocal, &errorMessage)) {
return TSplitBlobResult("cannot build blobs for last batch slice (" + ::ToString(batch->num_rows() - startIdx) + " rows): " + errorMessage);
}
}
@@ -214,15 +214,18 @@ ui64 GetArrayDataSize(const std::shared_ptr<arrow::Array>& column) {
return bytes;
}
-NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch) {
- TFirstLastSpecialKeys specialKeys(batch);
+NKikimr::NArrow::TSerializedBatch TSerializedBatch::Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context) {
+ std::optional<TFirstLastSpecialKeys> specialKeys;
+ if (context.GetFieldsForSpecialKeys().size()) {
+ specialKeys = TFirstLastSpecialKeys(batch, context.GetFieldsForSpecialKeys());
+ }
return TSerializedBatch(NArrow::SerializeSchema(*batch->schema()), NArrow::SerializeBatchNoCompression(batch), batch->num_rows(), NArrow::GetBatchDataSize(batch), specialKeys);
}
-bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) {
- TSerializedBatch sb = TSerializedBatch::Build(batch);
+bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage) {
+ TSerializedBatch sb = TSerializedBatch::Build(batch, context);
const ui32 length = batch->num_rows();
- if (sb.GetSize() <= sizeLimit) {
+ if (sb.GetSize() <= context.GetSizeLimit()) {
sbL = std::move(sb);
return true;
} else if (length == 1) {
@@ -232,13 +235,13 @@ bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch,
return false;
} else {
const ui32 delta = length / 2;
- TSerializedBatch localSbL = TSerializedBatch::Build(batch->Slice(0, delta));
- TSerializedBatch localSbR = TSerializedBatch::Build(batch->Slice(delta, length - delta));
- if (localSbL.GetSize() > sizeLimit || localSbR.GetSize() > sizeLimit) {
+ TSerializedBatch localSbL = TSerializedBatch::Build(batch->Slice(0, delta), context);
+ TSerializedBatch localSbR = TSerializedBatch::Build(batch->Slice(delta, length - delta), context);
+ if (localSbL.GetSize() > context.GetSizeLimit() || localSbR.GetSize() > context.GetSizeLimit()) {
if (errorMessage) {
*errorMessage = TStringBuilder() << "original batch too big: " << sb.GetSize() << " and after 2 parts split we have: "
<< localSbL.GetSize() << "(" << localSbL.GetRowsCount() << ")" << " / "
- << localSbR.GetSize() << "(" << localSbR.GetRowsCount() << ")" << " part sizes. Its unexpected for limit " << sizeLimit;
+ << localSbR.GetSize() << "(" << localSbR.GetRowsCount() << ")" << " part sizes. Its unexpected for limit " << context.GetSizeLimit();
}
return false;
}
@@ -248,10 +251,10 @@ bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch,
}
}
-bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::vector<TSerializedBatch>& result, TString* errorMessage) {
+bool TSerializedBatch::BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::vector<TSerializedBatch>& result, TString* errorMessage) {
std::optional<TSerializedBatch> sbL;
std::optional<TSerializedBatch> sbR;
- if (!TSerializedBatch::BuildWithLimit(batch, sizeLimit, sbL, sbR, errorMessage)) {
+ if (!TSerializedBatch::BuildWithLimit(batch, context, sbL, sbR, errorMessage)) {
return false;
}
if (sbL) {
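
All split entry points (Build, both BuildWithLimit overloads, SplitByBlobSize) now receive the context, so the special-keys decision is made once at the call site instead of being implied by a bare limit. A hedged caller-side sketch of the vector overload (batch and limit are illustrative):

    std::vector<NKikimr::NArrow::TSerializedBatch> chunks;
    TString errorMessage;
    NKikimr::NArrow::TBatchSplitttingContext context(4ull * 1024 * 1024);
    if (!NKikimr::NArrow::TSerializedBatch::BuildWithLimit(batch, context, chunks, &errorMessage)) {
        // fails when a single row, or a serialized half-batch, still exceeds context.GetSizeLimit()
    }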
diff --git a/ydb/core/formats/arrow/size_calcer.h b/ydb/core/formats/arrow/size_calcer.h
index 4d14c0cf5aa..259a93836ae 100644
--- a/ydb/core/formats/arrow/size_calcer.h
+++ b/ydb/core/formats/arrow/size_calcer.h
@@ -46,29 +46,54 @@ public:
ui32 GetRowBytesSize(const ui32 row) const;
};
+class TBatchSplitttingContext {
+private:
+ YDB_ACCESSOR(ui64, SizeLimit, 6 * 1024 * 1024);
+ YDB_ACCESSOR_DEF(std::vector<TString>, FieldsForSpecialKeys);
+public:
+ explicit TBatchSplitttingContext(const ui64 size)
+ : SizeLimit(size)
+ {
+
+ }
+
+ void SetFieldsForSpecialKeys(const std::shared_ptr<arrow::Schema>& schema) {
+ std::vector<TString> local;
+ for (auto&& i : schema->fields()) {
+ local.emplace_back(i->name());
+ }
+ std::swap(local, FieldsForSpecialKeys);
+ }
+};
+
class TSerializedBatch {
private:
YDB_READONLY_DEF(TString, SchemaData);
YDB_READONLY_DEF(TString, Data);
YDB_READONLY(ui32, RowsCount, 0);
YDB_READONLY(ui32, RawBytes, 0);
- TFirstLastSpecialKeys SpecialKeys;
+ std::optional<TFirstLastSpecialKeys> SpecialKeys;
public:
size_t GetSize() const {
return Data.size();
}
- const TFirstLastSpecialKeys& GetSpecialKeys() const {
- return SpecialKeys;
+ const TFirstLastSpecialKeys& GetSpecialKeysSafe() const {
+ AFL_VERIFY(SpecialKeys);
+ return *SpecialKeys;
+ }
+
+ bool HasSpecialKeys() const {
+ return !!SpecialKeys;
}
TString DebugString() const;
- static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::vector<TSerializedBatch>& result, TString* errorMessage);
- static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const ui32 sizeLimit, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage);
- static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch);
+ static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::vector<TSerializedBatch>& result, TString* errorMessage);
+ static bool BuildWithLimit(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context, std::optional<TSerializedBatch>& sbL, std::optional<TSerializedBatch>& sbR, TString* errorMessage);
+ static TSerializedBatch Build(std::shared_ptr<arrow::RecordBatch> batch, const TBatchSplitttingContext& context);
- TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const TFirstLastSpecialKeys& specialKeys)
+ TSerializedBatch(TString&& schemaData, TString&& data, const ui32 rowsCount, const ui32 rawBytes, const std::optional<TFirstLastSpecialKeys>& specialKeys)
: SchemaData(schemaData)
, Data(data)
, RowsCount(rowsCount)
@@ -101,7 +126,7 @@ public:
}
};
-TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const ui32 sizeLimit);
+TSplitBlobResult SplitByBlobSize(const std::shared_ptr<arrow::RecordBatch>& batch, const TBatchSplitttingContext& context);
// Return size in bytes including size of bitmap mask
ui64 GetBatchDataSize(const std::shared_ptr<arrow::RecordBatch>& batch);
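
Because SpecialKeys is now a std::optional, consumers must probe before dereferencing; GetSpecialKeysSafe() verifies presence (AFL_VERIFY) rather than returning a possibly-empty default. A minimal sketch of the intended consumer pattern (ConsumeBatch is a hypothetical name):

    void ConsumeBatch(const NKikimr::NArrow::TSerializedBatch& sb) {
        if (sb.HasSpecialKeys()) {
            const auto& keys = sb.GetSpecialKeysSafe(); // verified present
            // ... read the first/last key rows from keys ...
        } else {
            // batch was built without FieldsForSpecialKeys in the context
        }
    }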
diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp
index 9e86893fb6e..83908f38ad1 100644
--- a/ydb/core/formats/arrow/special_keys.cpp
+++ b/ydb/core/formats/arrow/special_keys.cpp
@@ -32,7 +32,7 @@ TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const {
return NArrow::SerializeBatchNoCompression(Data);
}
-TFirstLastSpecialKeys::TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames /*= {}*/) {
+TFirstLastSpecialKeys::TFirstLastSpecialKeys(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& columnNames /*= {}*/) {
Y_ABORT_UNLESS(batch);
Y_ABORT_UNLESS(batch->num_rows());
std::shared_ptr<arrow::RecordBatch> keyBatch = batch;
diff --git a/ydb/core/formats/arrow/special_keys.h b/ydb/core/formats/arrow/special_keys.h
index bfa4c7275ad..e777c5511c2 100644
--- a/ydb/core/formats/arrow/special_keys.h
+++ b/ydb/core/formats/arrow/special_keys.h
@@ -58,7 +58,7 @@ public:
{
Y_ABORT_UNLESS(Data->num_rows() == 1 || Data->num_rows() == 2);
}
- explicit TFirstLastSpecialKeys(std::shared_ptr<arrow::RecordBatch> batch, const std::vector<TString>& columnNames = {});
+ explicit TFirstLastSpecialKeys(const std::shared_ptr<arrow::RecordBatch>& batch, const std::vector<TString>& columnNames = {});
};
class TMinMaxSpecialKeys: public TSpecialKeys {
diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp
index b8e87f5d9b6..acf7fe151a1 100644
--- a/ydb/core/tx/columnshard/columnshard__write.cpp
+++ b/ydb/core/tx/columnshard/columnshard__write.cpp
@@ -177,7 +177,7 @@ void TColumnShard::Handle(TEvColumnShard::TEvWrite::TPtr& ev, const TActorContex
<< " at tablet " << TabletID());
writeData.MutableWriteMeta().SetWriteMiddle1StartInstant(TMonotonic::Now());
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(TabletID(), SelfId(),
- StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData);
+ StoragesManager->GetInsertOperator()->StartWritingAction("WRITING"), writeData, snapshotSchema->GetIndexInfo().GetReplaceKey());
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(task);
}
}
diff --git a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
index ca7cbfe4a30..a02d99f18ba 100644
--- a/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
+++ b/ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.cpp
@@ -14,7 +14,7 @@ TIndexedWriteController::TIndexedWriteController(const TActorId& dstActor, const
{
for (auto&& bInfo : BlobsSplitted) {
auto& task = AddWriteTask(TBlobWriteInfo::BuildWriteTask(bInfo.GetData(), Action));
- BlobData.emplace_back(TBlobRange::FromBlobId(task.GetBlobId()), bInfo.GetSpecialKeys(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now());
+ BlobData.emplace_back(TBlobRange::FromBlobId(task.GetBlobId()), bInfo.GetSpecialKeysSafe(), bInfo.GetRowsCount(), bInfo.GetRawBytes(), AppData()->TimeProvider->Now());
}
ResourceUsage.SourceMemorySize = WriteData.GetSize();
}
diff --git a/ydb/core/tx/columnshard/operations/slice_builder.cpp b/ydb/core/tx/columnshard/operations/slice_builder.cpp
index 14792dacbbf..ddcc19332c8 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder.cpp
+++ b/ydb/core/tx/columnshard/operations/slice_builder.cpp
@@ -19,7 +19,9 @@ std::optional<std::vector<NKikimr::NArrow::TSerializedBatch>> TBuildSlicesTask::
return {};
}
- auto splitResult = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize());
+ NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize());
+ context.SetFieldsForSpecialKeys(PrimaryKeySchema);
+ auto splitResult = NArrow::SplitByBlobSize(batch, context);
if (!splitResult) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", TStringBuilder() << "cannot split batch in according to limits: " + splitResult.GetErrorMessage());
return {};
diff --git a/ydb/core/tx/columnshard/operations/slice_builder.h b/ydb/core/tx/columnshard/operations/slice_builder.h
index 35ab257f15d..efd26eacb9b 100644
--- a/ydb/core/tx/columnshard/operations/slice_builder.h
+++ b/ydb/core/tx/columnshard/operations/slice_builder.h
@@ -13,7 +13,7 @@ private:
const ui64 TabletId;
const NActors::TActorId ParentActorId;
std::optional<std::vector<NArrow::TSerializedBatch>> BuildSlices();
-
+ std::shared_ptr<arrow::Schema> PrimaryKeySchema;
protected:
virtual bool DoExecute() override;
public:
@@ -21,11 +21,13 @@ public:
return "Write::ConstructBlobs::Slices";
}
- TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, const std::shared_ptr<IBlobsWritingAction>& action, const NEvWrite::TWriteData& writeData)
+ TBuildSlicesTask(const ui64 tabletId, const NActors::TActorId parentActorId, const std::shared_ptr<IBlobsWritingAction>& action,
+ const NEvWrite::TWriteData& writeData, const std::shared_ptr<arrow::Schema>& primaryKeySchema)
: Action(action)
, WriteData(writeData)
, TabletId(tabletId)
, ParentActorId(parentActorId)
+ , PrimaryKeySchema(primaryKeySchema)
{
Y_ABORT_UNLESS(Action);
}
diff --git a/ydb/core/tx/columnshard/operations/write.cpp b/ydb/core/tx/columnshard/operations/write.cpp
index c8ed9946a32..7e023405b45 100644
--- a/ydb/core/tx/columnshard/operations/write.cpp
+++ b/ydb/core/tx/columnshard/operations/write.cpp
@@ -26,7 +26,7 @@ namespace NKikimr::NColumnShard {
NEvWrite::TWriteMeta writeMeta((ui64)WriteId, tableId, source);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<NOlap::TBuildSlicesTask>(owner.TabletID(), ctx.SelfID,
- owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"), NEvWrite::TWriteData(writeMeta, data));
+ owner.StoragesManager->GetInsertOperator()->StartWritingAction("WRITING_OPERATOR"), NEvWrite::TWriteData(writeMeta, data), owner.TablesManager.GetPrimaryIndex()->GetReplaceKey());
NConveyor::TCompServiceOperator::SendTaskToExecute(task);
Status = EOperationStatus::Started;
diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
index 28a7809389c..d952eb745e6 100644
--- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
+++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp
@@ -2685,9 +2685,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) {
}
Cerr << "compacted=" << sumCompactedRows << ";inserted=" << sumInsertedRows << ";expected=" << fullNumRows << ";" << Endl;
RebootTablet(runtime, TTestTxConfig::TxTablet0, sender);
- UNIT_ASSERT(sumCompactedRows + sumInsertedRows == fullNumRows);
- UNIT_ASSERT(sumCompactedRows > sumInsertedRows);
- UNIT_ASSERT(sumCompactedBytes > sumInsertedBytes);
+ UNIT_ASSERT(sumCompactedRows == fullNumRows);
+ UNIT_ASSERT(sumCompactedRows < sumCompactedBytes);
+ UNIT_ASSERT(sumInsertedRows == 0);
+ UNIT_ASSERT(sumInsertedBytes == 0);
}
}
diff --git a/ydb/core/tx/ev_write/columnshard_splitter.h b/ydb/core/tx/ev_write/columnshard_splitter.h
index 3c392bb74a0..264697973ad 100644
--- a/ydb/core/tx/ev_write/columnshard_splitter.h
+++ b/ydb/core/tx/ev_write/columnshard_splitter.h
@@ -117,7 +117,8 @@ private:
TFullSplitData result(numShards);
if (numShards == 1) {
- NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(batch, NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+ NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+ NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(batch, context);
if (!blobsSplitted) {
return TYdbConclusionStatus::Fail(Ydb::StatusIds::SCHEME_ERROR, "cannot split batch in according to limits: " + blobsSplitted.GetErrorMessage());
}
@@ -145,7 +146,8 @@ private:
THashMap<ui64, TString> out;
for (size_t i = 0; i < sharded.size(); ++i) {
if (sharded[i]) {
- NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(sharded[i], NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+ NArrow::TBatchSplitttingContext context(NColumnShard::TLimits::GetMaxBlobSize() * 0.875);
+ NArrow::TSplitBlobResult blobsSplitted = NArrow::SplitByBlobSize(sharded[i], context);
if (!blobsSplitted) {
return TYdbConclusionStatus::Fail(Ydb::StatusIds::SCHEME_ERROR, "cannot split batch in according to limits: " + blobsSplitted.GetErrorMessage());
}