aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorssmike <ssmike@ydb.tech>2023-01-31 03:05:48 +0300
committerssmike <ssmike@ydb.tech>2023-01-31 03:05:48 +0300
commit58f259d0f1f2997f29bab23cee45b6fdfb2792d8 (patch)
tree5b00f4fa96d738379b2540437b1a35d171c54610
parente1e1321e25787797767e2819e5eb3f06f3502733 (diff)
downloadydb-58f259d0f1f2997f29bab23cee45b6fdfb2792d8.tar.gz
prepare to enable readranges source in trunk
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h12
-rw-r--r--ydb/core/kqp/runtime/kqp_read_actor.cpp135
-rw-r--r--ydb/core/protos/tx_datashard.proto4
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h2
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h7
6 files changed, 113 insertions, 52 deletions
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index c85dc3882c..801de6ac37 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -1661,8 +1661,9 @@ private:
if (i64 msc = (i64) RequestControls.MaxShardCount; msc > 0) {
shardsLimit = std::min(shardsLimit, (ui32) msc);
}
- if (shardsLimit > 0 && datashardTasks.size() > shardsLimit) {
- LOG_W("Too many affected shards: datashardTasks=" << datashardTasks.size() << ", limit: " << shardsLimit);
+ size_t shards = datashardTasks.size() + remoteComputeTasks.size();
+ if (shardsLimit > 0 && shards > shardsLimit) {
+ LOG_W("Too many affected shards: datashardTasks=" << shards << ", limit: " << shardsLimit);
Counters->TxProxyMon->TxResultError->Inc();
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
YqlIssue({}, TIssuesIds::KIKIMR_PRECONDITION_FAILED, TStringBuilder()
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 34dea52606..6651813e1d 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -651,7 +651,7 @@ protected:
}
}
- void BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) {
+ size_t BuildScanTasksFromSource(TStageInfo& stageInfo, IKqpGateway::TKqpSnapshot snapshot, const TMaybe<ui64> lockTxId = {}) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
@@ -695,6 +695,12 @@ protected:
FillTableMeta(stageInfo, settings.MutableTable());
for (auto& keyColumn : keyTypes) {
+ auto columnType = NScheme::ProtoColumnTypeFromTypeInfo(keyColumn);
+ if (columnType.TypeInfo) {
+ *settings.AddKeyColumnTypeInfos() = *columnType.TypeInfo;
+ } else {
+ *settings.AddKeyColumnTypeInfos() = NKikimrProto::TTypeInfo();
+ }
settings.AddKeyColumnTypes(static_cast<ui32>(keyColumn.GetTypeId()));
}
@@ -725,6 +731,9 @@ protected:
settings.SetSorted(source.GetSorted());
settings.SetShardIdHint(shardId);
+ if (Stats) {
+ Stats->AffectedShards.insert(shardId);
+ }
ExtractItemsLimit(stageInfo, source.GetItemsLimit(), Request.TxAlloc->HolderFactory,
Request.TxAlloc->TypeEnv, itemsLimit, itemsLimitParamName, itemsLimitBytes, itemsLimitType);
@@ -744,6 +753,7 @@ protected:
taskSourceSettings->PackFrom(settings);
input.SourceType = NYql::KqpReadRangesSourceName;
}
+ return partitions.size();
}
protected:
diff --git a/ydb/core/kqp/runtime/kqp_read_actor.cpp b/ydb/core/kqp/runtime/kqp_read_actor.cpp
index 65a2a00801..09bd872c4d 100644
--- a/ydb/core/kqp/runtime/kqp_read_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_read_actor.cpp
@@ -31,6 +31,7 @@ bool IsDebugLogEnabled(const NActors::TActorSystem* actorSystem, NActors::NLog::
}
+
namespace NKikimr {
namespace NKqp {
@@ -43,6 +44,19 @@ using namespace NKikimr::NDataShard;
class TKqpReadActor : public TActorBootstrapped<TKqpReadActor>, public NYql::NDq::IDqComputeActorAsyncInput {
using TBase = TActorBootstrapped<TKqpReadActor>;
public:
+ struct TResult {
+ ui64 ShardId;
+ THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
+ TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch;
+ size_t ProcessedRows = 0;
+
+ TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult)
+ : ShardId(shardId)
+ , ReadResult(std::move(readResult))
+ {
+ }
+ };
+
struct TShardState : public TIntrusiveListItem<TShardState> {
TSmallVec<TSerializedTableRange> Ranges;
TSmallVec<TSerializedCellVec> Points;
@@ -261,8 +275,15 @@ public:
);
KeyColumnTypes.reserve(Settings.GetKeyColumnTypes().size());
- for (auto typeId : Settings.GetKeyColumnTypes()) {
- KeyColumnTypes.push_back(NScheme::TTypeInfo((NScheme::TTypeId)typeId));
+ for (size_t i = 0; i < Settings.KeyColumnTypesSize(); ++i) {
+ auto typeId = Settings.GetKeyColumnTypes(i);
+ KeyColumnTypes.push_back(
+ NScheme::TTypeInfo(
+ (NScheme::TTypeId)typeId,
+ (typeId == NScheme::NTypeIds::Pg) ?
+ NPg::TypeDescFromPgTypeId(
+ Settings.GetKeyColumnTypeInfos(i).GetPgTypeId()
+ ) : nullptr));
}
}
@@ -355,7 +376,7 @@ public:
TKeyDesc::TColumnOp op;
op.Column = column.GetId();
op.Operation = TKeyDesc::EColumnOperation::Read;
- op.ExpectedType = NScheme::TTypeInfo((NScheme::TTypeId)column.GetType());
+ op.ExpectedType = MakeTypeInfo(column);
columns.emplace_back(std::move(op));
}
@@ -561,7 +582,6 @@ public:
limit = EVREAD_MAX_ROWS;
}
if (limit == 0) {
- delete state;
return;
}
@@ -613,6 +633,7 @@ public:
CA_LOG_D(TStringBuilder() << "Send EvRead to shardId: " << state->TabletId << ", tablePath: " << Settings.GetTable().GetTablePath()
<< ", ranges: " << DebugPrintRanges(KeyColumnTypes, ev->Ranges, *AppData()->TypeRegistry)
+ << ", limit: " << limit
<< ", readId = " << id
<< " snapshot = (txid=" << Settings.GetSnapshot().GetTxId() << ",step=" << Settings.GetSnapshot().GetStep() << ")"
<< " lockTxId = " << Settings.GetLockTxId());
@@ -649,6 +670,7 @@ public:
Reads[id].RegisterMessage(*ev->Get());
+
RecievedRowCount += ev->Get()->GetRowsCount();
Results.push({Reads[id].Shard->TabletId, THolder<TEventHandle<TEvDataShard::TEvReadResult>>(ev.Release())});
CA_LOG_D(TStringBuilder() << "new data for read #" << id << " pushed");
@@ -679,8 +701,7 @@ public:
if (IsSystemColumn(Settings.GetColumns(resultColumnIndex).GetId())) {
rowStats.AllocatedBytes += sizeof(NUdf::TUnboxedValue);
} else {
- rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(
- row[columnIndex], NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType())));
+ rowStats.AddStatistics(NMiniKQL::GetUnboxedValueSize(row[columnIndex], MakeTypeInfo(Settings.GetColumns(resultColumnIndex))));
columnIndex += 1;
}
}
@@ -694,24 +715,21 @@ public:
return TypeEnv.BindAllocator();
}
- NMiniKQL::TBytesStatistics PackArrow(
- THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result,
- ui64 shardId,
- NKikimr::NMiniKQL::TUnboxedValueVector& batch)
- {
+ NMiniKQL::TBytesStatistics PackArrow(TResult& handle) {
+ auto& [shardId, result, batch, _] = handle;
NMiniKQL::TBytesStatistics stats;
bool hasResultColumns = false;
if (result->Get()->GetRowsCount() == 0) {
return {};
}
if (Settings.ColumnsSize() == 0) {
- batch.resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer());
+ batch->resize(result->Get()->GetRowsCount(), HolderFactory.GetEmptyContainer());
} else {
TVector<NUdf::TUnboxedValue*> editAccessors(result->Get()->GetRowsCount());
- batch.reserve(result->Get()->GetRowsCount());
+ batch->reserve(result->Get()->GetRowsCount());
for (ui64 rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
- batch.emplace_back(HolderFactory.CreateDirectArrayHolder(
+ batch->emplace_back(HolderFactory.CreateDirectArrayHolder(
Settings.columns_size(),
editAccessors[rowIndex])
);
@@ -743,29 +761,29 @@ public:
return stats;
}
- NMiniKQL::TBytesStatistics PackCells(
- THolder<TEventHandle<TEvDataShard::TEvReadResult>>& result,
- ui64 shardId,
- NKikimr::NMiniKQL::TUnboxedValueVector& batch)
- {
+ NMiniKQL::TBytesStatistics PackCells(TResult& handle) {
+ auto& [shardId, result, batch, _] = handle;
NMiniKQL::TBytesStatistics stats;
- batch.reserve(batch.size());
+ batch->reserve(batch->size());
for (size_t rowIndex = 0; rowIndex < result->Get()->GetRowsCount(); ++rowIndex) {
const auto& row = result->Get()->GetCells(rowIndex);
NUdf::TUnboxedValue* rowItems = nullptr;
- batch.emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems));
+ batch->emplace_back(HolderFactory.CreateDirectArrayHolder(Settings.ColumnsSize(), rowItems));
+ size_t rowSize = 0;
size_t columnIndex = 0;
for (size_t resultColumnIndex = 0; resultColumnIndex < Settings.ColumnsSize(); ++resultColumnIndex) {
auto tag = Settings.GetColumns(resultColumnIndex).GetId();
- auto type = NScheme::TTypeInfo((NScheme::TTypeId)Settings.GetColumns(resultColumnIndex).GetType());
+ auto type = MakeTypeInfo(Settings.GetColumns(resultColumnIndex));
if (IsSystemColumn(tag)) {
NMiniKQL::FillSystemColumn(rowItems[resultColumnIndex], shardId, tag, type);
} else {
rowItems[resultColumnIndex] = NMiniKQL::GetCellValue(row[columnIndex], type);
+ rowSize += row[columnIndex].Size();
columnIndex += 1;
}
}
- stats.AddStatistics(GetRowSize(rowItems));
+ stats.DataBytes += std::max(rowSize, (size_t)8);
+ stats.AllocatedBytes += GetRowSize(rowItems).AllocatedBytes;
}
return stats;
}
@@ -778,39 +796,40 @@ public:
{
ui64 bytes = 0;
while (!Results.empty()) {
- auto& [shardId, result, batch, processedRows] = Results.front();
- auto& msg = *result->Get();
+ auto& result = Results.front();
+ auto& batch = result.Batch;
+ auto& msg = *result.ReadResult->Get();
if (!batch.Defined()) {
batch.ConstructInPlace();
switch (msg.Record.GetResultFormat()) {
case NKikimrTxDataShard::EScanDataFormat::ARROW:
- PackArrow(result, shardId, *batch);
+ BytesStats.AddStatistics(PackArrow(result));
break;
case NKikimrTxDataShard::EScanDataFormat::UNSPECIFIED:
case NKikimrTxDataShard::EScanDataFormat::CELLVEC:
- PackCells(result, shardId, *batch);
+ BytesStats.AddStatistics(PackCells(result));
}
}
- auto id = result->Get()->Record.GetReadId();
+ auto id = result.ReadResult->Get()->Record.GetReadId();
if (!Reads[id]) {
Results.pop();
continue;
}
auto* state = Reads[id].Shard;
- for (; processedRows < batch->size(); ++processedRows) {
- NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[processedRows].GetElements());
+ for (; result.ProcessedRows < batch->size(); ++result.ProcessedRows) {
+ NMiniKQL::TBytesStatistics rowSize = GetRowSize((*batch)[result.ProcessedRows].GetElements());
if (static_cast<ui64>(freeSpace) < bytes + rowSize.AllocatedBytes) {
break;
}
- resultVector.push_back(std::move((*batch)[processedRows]));
+ resultVector.push_back(std::move((*batch)[result.ProcessedRows]));
ProcessedRowCount += 1;
bytes += rowSize.AllocatedBytes;
}
CA_LOG_D(TStringBuilder() << "returned " << resultVector.size() << " rows");
- if (batch->size() == processedRows) {
+ if (batch->size() == result.ProcessedRows) {
auto& record = msg.Record;
if (Reads[id].IsLastMessage(msg)) {
Reads[id].Reset();
@@ -835,7 +854,6 @@ public:
auto cancel = MakeHolder<TEvDataShard::TEvReadCancel>();
cancel->Record.SetReadId(id);
Send(MakePipePeNodeCacheID(false), new TEvPipeCache::TEvForward(cancel.Release(), state->TabletId), IEventHandle::FlagTrackDelivery);
- delete state;
Reads[id].Reset();
ResetReads++;
}
@@ -861,6 +879,29 @@ public:
return bytes;
}
+ void FillExtraStats(NDqProto::TDqTaskStats* stats, bool last) override {
+ if (last) {
+ NDqProto::TDqTableStats* tableStats = nullptr;
+ for (size_t i = 0; i < stats->TablesSize(); ++i) {
+ auto* table = stats->MutableTables(i);
+ if (table->GetTablePath() == Settings.GetTable().GetTablePath()) {
+ tableStats = table;
+ }
+ }
+ if (!tableStats) {
+ tableStats = stats->AddTables();
+ tableStats->SetTablePath(Settings.GetTable().GetTablePath());
+
+ }
+
+ //FIXME: use evread statistics after KIKIMR-16924
+ tableStats->SetReadRows(tableStats->GetReadRows() + RecievedRowCount);
+ tableStats->SetReadBytes(tableStats->GetReadBytes() + BytesStats.DataBytes);
+ tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + InFlightShards.Size());
+ }
+ }
+
+
void SaveState(const NYql::NDqProto::TCheckpoint&, NYql::NDqProto::TSourceState&) override {}
void CommitState(const NYql::NDqProto::TCheckpoint&) override {}
void LoadState(const NYql::NDqProto::TSourceState&) override {}
@@ -897,13 +938,25 @@ public:
return result;
}
+
+ NScheme::TTypeInfo MakeTypeInfo(const NKikimrTxDataShard::TKqpTransaction_TColumnMeta& info) {
+ auto typeId = info.GetType();
+ return NScheme::TTypeInfo(
+ (NScheme::TTypeId)typeId,
+ (typeId == NScheme::NTypeIds::Pg) ?
+ NPg::TypeDescFromPgTypeId(
+ info.GetTypeInfo().GetPgTypeId()
+ ) : nullptr);
+ }
+
private:
NKikimrTxDataShard::TKqpReadRangesSourceSettings Settings;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
- size_t RecievedRowCount = 0;
- size_t ProcessedRowCount = 0;
+ NMiniKQL::TBytesStatistics BytesStats;
+ ui64 RecievedRowCount = 0;
+ ui64 ProcessedRowCount = 0;
ui64 ResetReads = 0;
ui64 ReadId = 0;
TVector<TReadState> Reads;
@@ -915,18 +968,6 @@ private:
TShardQueue InFlightShards;
TShardQueue PendingShards;
- struct TResult {
- ui64 ShardId;
- THolder<TEventHandle<TEvDataShard::TEvReadResult>> ReadResult;
- TMaybe<NKikimr::NMiniKQL::TUnboxedValueVector> Batch;
- size_t ProcessedRows = 0;
-
- TResult(ui64 shardId, THolder<TEventHandle<TEvDataShard::TEvReadResult>> readResult)
- : ShardId(shardId)
- , ReadResult(std::move(readResult))
- {
- }
- };
TQueue<TResult> Results;
TVector<NKikimrTxDataShard::TLock> Locks;
diff --git a/ydb/core/protos/tx_datashard.proto b/ydb/core/protos/tx_datashard.proto
index 6d0de59aab..5d886a2b1a 100644
--- a/ydb/core/protos/tx_datashard.proto
+++ b/ydb/core/protos/tx_datashard.proto
@@ -255,8 +255,10 @@ message TKqpReadRangesSourceSettings {
repeated TKqpTransaction.TColumnMeta Columns = 4;
optional uint64 ItemsLimit = 5;
optional bool Reverse = 6;
- repeated string SkipNullKeys = 7;
+
repeated uint32 KeyColumnTypes = 8;
+ repeated NKikimrProto.TTypeInfo KeyColumnTypeInfos = 15;
+
optional EScanDataFormat DataFormat = 9;
optional NKikimrProto.TRowVersion Snapshot = 10;
optional uint64 ShardIdHint = 11;
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
index d7245f1632..19c3c1d612 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h
@@ -89,6 +89,8 @@ struct IDqComputeActorAsyncInput {
virtual TMaybe<google::protobuf::Any> ExtraData() { return {}; }
+ virtual void FillExtraStats(NDqProto::TDqTaskStats* /* stats */, bool /* finalized stats */) { }
+
// The same signature as IActor::PassAway().
// It is guaranted that this method will be called with bound MKQL allocator.
// So, it is the right place to destroy all internal UnboxedValues.
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index e51ae3ae14..64d3d11547 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -1917,10 +1917,11 @@ public:
THashMap<ui64, ui64> ingressBytesMap;
for (auto& [inputIndex, sourceInfo] : SourcesMap) {
- if (sourceInfo.AsyncInput) {
+ if (auto* source = sourceInfo.AsyncInput) {
auto ingressBytes = sourceInfo.AsyncInput->GetIngressBytes();
ingressBytesMap.emplace(inputIndex, ingressBytes);
Ingress[sourceInfo.Type] = Ingress.Value(sourceInfo.Type, 0) + ingressBytes;
+ source->FillExtraStats(protoTask, last);
}
}
FillTaskRunnerStats(Task.GetId(), Task.GetStageId(), *taskStats, protoTask, (bool) GetProfileStats(), ingressBytesMap);
@@ -1970,6 +1971,10 @@ public:
protoTransform->SetIngressBytes(ingressBytes);
protoTransform->SetMaxMemoryUsage(transformStats->MaxMemoryUsage);
+
+ if (auto* transform = transformInfo.AsyncInput) {
+ transform->FillExtraStats(protoTask, last);
+ }
}
}