about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Nikita Vasilev <ns-vasilev@ydb.tech> 2024-01-30 13:07:20 +0300
committer GitHub <noreply@github.com> 2024-01-30 13:07:20 +0300
commit 97103370d3942b7301dfd3d042df52dcf0bc15b5 (patch)
tree 79b56959ee1c14838ebf7aed631c85c05f7a6e19
parent 1975739200613f4a943b6249c31e3ac9b777f20f (diff)
download ydb-97103370d3942b7301dfd3d042df52dcf0bc15b5.tar.gz
Fixes for replace select (#1333)
-rw-r--r-- ydb/core/base/appdata_fwd.h 1
-rw-r--r-- ydb/core/formats/arrow/arrow_batch_builder.cpp 10
-rw-r--r-- ydb/core/formats/arrow/arrow_batch_builder.h 1
-rw-r--r-- ydb/core/kqp/compile_service/kqp_compile_service.cpp 2
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_data_executer.cpp 28
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer.h 3
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.cpp 9
-rw-r--r-- ydb/core/kqp/executer_actor/kqp_executer_impl.h 3
-rw-r--r-- ydb/core/kqp/runtime/kqp_write_actor.cpp 162
-rw-r--r-- ydb/core/kqp/session_actor/kqp_session_actor.cpp 3
-rw-r--r-- ydb/core/kqp/ut/olap/kqp_olap_ut.cpp 182
-rw-r--r-- ydb/core/protos/table_service_config.proto 1
12 files changed, 308 insertions, 97 deletions
diff --git a/ydb/core/base/appdata_fwd.h b/ydb/core/base/appdata_fwd.h
index 329183c5af..33422b430b 100644
--- a/ydb/core/base/appdata_fwd.h
+++ b/ydb/core/base/appdata_fwd.h
@@ -233,6 +233,7 @@ struct TAppData {
bool EnableMvccSnapshotWithLegacyDomainRoot = false;
bool UsePartitionStatsCollectorForTests = false;
bool DisableCdcAutoSwitchingToReadyStateForTests = false;
+ bool EnableOlapSink = false;
TVector<TString> AdministrationAllowedSIDs; // users/groups which are allowed to perform administrative tasks
TVector<TString> DefaultUserSIDs;
TString AllAuthenticatedUsers = "all-users@well-known";
diff --git a/ydb/core/formats/arrow/arrow_batch_builder.cpp b/ydb/core/formats/arrow/arrow_batch_builder.cpp
index 3e3fd04865..76c1942155 100644
--- a/ydb/core/formats/arrow/arrow_batch_builder.cpp
+++ b/ydb/core/formats/arrow/arrow_batch_builder.cpp
@@ -241,6 +241,16 @@ void TArrowBatchBuilder::AddRow(const TConstArrayRef<TCell>& key, const TConstAr
}
}
+// Appends a single row to the batch. The cells are taken in order, so cell N
+// is written to builder column N.
+void TArrowBatchBuilder::AddRow(const TConstArrayRef<TCell>& row) {
+    ++NumRows;
+
+    size_t columnNo = 0;
+    for (const auto& cell : row) {
+        AppendCell(cell, columnNo);
+        ++columnNo;
+    }
+}
+
void TArrowBatchBuilder::ReserveData(ui32 columnNo, size_t size) {
if (!BatchBuilder || columnNo >= (ui32)BatchBuilder->num_fields()) {
return;
diff --git a/ydb/core/formats/arrow/arrow_batch_builder.h b/ydb/core/formats/arrow/arrow_batch_builder.h
index 38c30d1cc0..2d4e1f3ed6 100644
--- a/ydb/core/formats/arrow/arrow_batch_builder.h
+++ b/ydb/core/formats/arrow/arrow_batch_builder.h
@@ -161,6 +161,7 @@ public:
void AddRow(const NKikimr::TDbTupleRef& key, const NKikimr::TDbTupleRef& value) override;
void AddRow(const TConstArrayRef<TCell>& key, const TConstArrayRef<TCell>& value);
+ void AddRow(const TConstArrayRef<TCell>& row);
// You have to call it before Start()
void Reserve(size_t numRows) {
diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
index 34b3074d9b..b0da00f2e8 100644
--- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp
+++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp
@@ -477,6 +477,7 @@ private:
bool enableSequences = TableServiceConfig.GetEnableSequences();
bool enableColumnsWithDefault = TableServiceConfig.GetEnableColumnsWithDefault();
+ bool enableOlapSink = TableServiceConfig.GetEnableOlapSink();
auto mkqlHeavyLimit = TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit();
@@ -501,6 +502,7 @@ private:
TableServiceConfig.GetIndexAutoChooseMode() != indexAutoChooser ||
TableServiceConfig.GetEnableSequences() != enableSequences ||
TableServiceConfig.GetEnableColumnsWithDefault() != enableColumnsWithDefault ||
+ TableServiceConfig.GetEnableOlapSink() != enableOlapSink ||
TableServiceConfig.GetExtractPredicateRangesLimit() != rangesLimit ||
TableServiceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit() != mkqlHeavyLimit) {
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index ea44de20d6..32ca975492 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -127,12 +127,14 @@ public:
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion,
const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
- const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
+ const TActorId& creator, TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
+ const bool enableOlapSink)
: TBase(std::move(request), database, userToken, counters, executerRetriesConfig, chanTransportVersion, aggregation,
maximalSecretsSnapshotWaitTime, userRequestContext, TWilsonKqp::DataExecuter, "DataExecuter"
)
, AsyncIoFactory(std::move(asyncIoFactory))
, StreamResult(streamResult)
+ , EnableOlapSink(enableOlapSink)
{
Target = creator;
@@ -1661,11 +1663,21 @@ private:
}
for (const auto &tableOp : stage.GetTableOps()) {
- if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange
- && tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kUpsertRows) {
+ if (tableOp.GetTypeCase() != NKqpProto::TKqpPhyTableOperation::kReadOlapRange) {
return true;
}
}
+
+ return false;
+ }
+
+ bool HasOlapSink(const NKqpProto::TKqpPhyStage& stage) { // Tells whether any sink of the stage has type kInternalSink.
+ for (const auto& sink : stage.GetSinks()) {
+ if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
+ return true;
+ }
+ }
+
return false;
}
@@ -1698,7 +1710,8 @@ private:
}
}
- if (stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage)) {
+ // Sink-based writes are refused only while the OLAP sink feature flag is off;
+ // with the flag on, a stage that has an internal sink is allowed through.
+ if ((stageInfo.Meta.IsOlap() && HasDmlOperationOnOlap(tx.Body->GetType(), stage))
+ || (!EnableOlapSink && HasOlapSink(stage))) {
auto error = TStringBuilder() << "Data manipulation queries do not support column shard tables.";
LOG_E(error);
ReplyErrorAndDie(Ydb::StatusIds::PRECONDITION_FAILED,
@@ -2201,7 +2214,7 @@ private:
}
}
- const bool singlePartitionOptAllowed = !UnknownAffectedShardCount && !HasExternalSources && (DatashardTxs.size() == 0);
+ const bool singlePartitionOptAllowed = !HasOlapTable && !UnknownAffectedShardCount && !HasExternalSources && (DatashardTxs.size() == 0);
const bool useDataQueryPool = !(HasExternalSources && DatashardTxs.size() == 0);
const bool localComputeTasks = !((HasExternalSources || HasOlapTable || HasDatashardSourceScan) && DatashardTxs.size() == 0);
@@ -2405,6 +2418,7 @@ private:
private:
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
bool StreamResult = false;
+ bool EnableOlapSink = false;
bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
@@ -2446,10 +2460,10 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
TKqpRequestCounters::TPtr counters, bool streamResult, const NKikimrConfig::TTableServiceConfig::TAggregationConfig& aggregation,
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
- TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext, const bool enableOlapSink)
{
return new TKqpDataExecuter(std::move(request), database, userToken, counters, streamResult, executerRetriesConfig,
- std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ std::move(asyncIoFactory), chanTransportVersion, aggregation, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
}
} // namespace NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_executer.h b/ydb/core/kqp/executer_actor/kqp_executer.h
index 6d4cef595b..739f16d040 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer.h
@@ -91,7 +91,8 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
- TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
+ const bool enableOlapSink);
IActor* CreateKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target,
const TMaybe<TString>& requestType, const TString& database,
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
index d6ad7a20a1..f43e2ad845 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.cpp
@@ -83,12 +83,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory, TPreparedQueryHolder::TConstPtr preparedQuery,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
- TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
+ const bool enableOlapSink)
{
if (request.Transactions.empty()) {
// commit-only or rollback-only data transaction
YQL_ENSURE(request.LocksOp == ELocksOp::Commit || request.LocksOp == ELocksOp::Rollback);
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
}
TMaybe<NKqpProto::TKqpPhyTx::EType> txsType;
@@ -104,13 +105,13 @@ IActor* CreateKqpExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TSt
switch (*txsType) {
case NKqpProto::TKqpPhyTx::TYPE_COMPUTE:
case NKqpProto::TKqpPhyTx::TYPE_DATA:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, false, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
case NKqpProto::TKqpPhyTx::TYPE_SCAN:
return CreateKqpScanExecuter(std::move(request), database, userToken, counters, aggregation, executerRetriesConfig, preparedQuery, chanTransportVersion, maximalSecretsSnapshotWaitTime, userRequestContext);
case NKqpProto::TKqpPhyTx::TYPE_GENERIC:
- return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext);
+ return CreateKqpDataExecuter(std::move(request), database, userToken, counters, true, aggregation, executerRetriesConfig, std::move(asyncIoFactory), chanTransportVersion, creator, maximalSecretsSnapshotWaitTime, userRequestContext, enableOlapSink);
default:
YQL_ENSURE(false, "Unsupported physical tx type: " << (ui32)*txsType);
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index 227120f6a4..cd767f1ce4 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -1769,7 +1769,8 @@ IActor* CreateKqpDataExecuter(IKqpGateway::TExecPhysicalRequest&& request, const
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig& executerRetriesConfig,
NYql::NDq::IDqAsyncIoFactory::TPtr asyncIoFactory,
const NKikimrConfig::TTableServiceConfig::EChannelTransportVersion chanTransportVersion, const TActorId& creator,
- TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext);
+ TDuration maximalSecretsSnapshotWaitTime, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
+ const bool enableOlapSink);
IActor* CreateKqpScanExecuter(IKqpGateway::TExecPhysicalRequest&& request, const TString& database,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TKqpRequestCounters::TPtr counters,
diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp
index bb5260bbf9..c6217022ae 100644
--- a/ydb/core/kqp/runtime/kqp_write_actor.cpp
+++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp
@@ -6,6 +6,7 @@
#include <ydb/core/engine/minikql/minikql_engine_host.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/kqp/common/kqp_yql.h>
+#include <ydb/core/scheme/scheme_types_proto.h>
#include <ydb/core/tx/data_events/events.h>
#include <ydb/core/tx/data_events/payload_helper.h>
#include <ydb/core/tx/data_events/shards_splitter.h>
@@ -19,7 +20,7 @@
namespace {
struct TWriteActorBackoffSettings {
- TDuration StartRetryDelay = TDuration::MilliSeconds(100);
+ TDuration StartRetryDelay = TDuration::MilliSeconds(150);
TDuration MaxRetryDelay = TDuration::Seconds(5);
double UnsertaintyRatio = 0.5;
double Multiplier = 2.0;
@@ -81,13 +82,13 @@ public:
, Counters(counters)
, TypeEnv(args.TypeEnv)
, TxId(args.TxId)
- , TableId(Settings.GetTable().GetOwnerId(), Settings.GetTable().GetTableId())
+ , TableId(
+ Settings.GetTable().GetOwnerId(),
+ Settings.GetTable().GetTableId(),
+ Settings.GetTable().GetVersion())
{
YQL_ENSURE(std::holds_alternative<ui64>(TxId));
EgressStats.Level = args.StatsLevel;
-
- BuildColumns();
- PrepareBatchBuilder();
}
void Bootstrap() {
@@ -114,7 +115,9 @@ private:
}
i64 GetFreeSpace() const final { // Remaining byte budget; the lowest i64 value is reported while SchemeEntry is not set yet.
- return MemoryLimit - (MemoryInFlight + BatchBuilder->Bytes());
+ return SchemeEntry
+ ? MemoryLimit - (MemoryInFlight + BatchBuilder->Bytes())
+ : std::numeric_limits<i64>::min(); // Can't use zero here because compute can use overcommit!
}
void SendData(NMiniKQL::TUnboxedValueBatch&& data, i64, const TMaybe<NYql::NDqProto::TCheckpoint>&, bool finished) final {
@@ -128,18 +131,19 @@ private:
}
void AddToInputQueue(NMiniKQL::TUnboxedValueBatch&& data) { // Converts every incoming row to cells and appends it to BatchBuilder.
+ YQL_ENSURE(SchemeEntry);
+ YQL_ENSURE(BatchBuilder);
+
TVector<TCell> cells(Columns.size()); // NOTE(review): indexed below by table-schema position; assumes Settings lists every schema column - TODO confirm
data.ForEachRow([&](const auto& row) {
for (size_t index = 0; index < Columns.size(); ++index) {
- cells[index] = MakeCell(
+ cells[SendIndexToWriteIndexMapping[index]] = MakeCell( // store the value at the column's position in the resolved table schema
Columns[index].PType,
row.GetElement(index),
TypeEnv,
/* copy */ false);
}
- BatchBuilder->AddRow(
- TConstArrayRef<TCell>{cells.begin(), cells.begin() + KeyColumns.size()},
- TConstArrayRef<TCell>{cells.begin() + KeyColumns.size(), cells.end()});
+ BatchBuilder->AddRow(TConstArrayRef<TCell>{cells.begin(), cells.end()}); // the whole row is passed as one span, no key/value split
});
}
@@ -176,16 +180,21 @@ private:
RuntimeError(TStringBuilder() << "Failed to get table: "
<< TableId << "'", NYql::NDqProto::StatusIds::SCHEME_ERROR);
}
-
auto& resultSet = ev->Get()->Request->ResultSet;
YQL_ENSURE(resultSet.size() == 1);
SchemeEntry = resultSet[0];
YQL_ENSURE(SchemeEntry->Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable);
- CA_LOG_D("Resolved TableId=" << TableId);
+ CA_LOG_D("Resolved TableId=" << TableId << " (" << SchemeEntry->TableId.PathId.ToString() << " " << SchemeEntry->TableId.SchemaVersion << ")");
- ProcessRows();
+ if (SchemeEntry->TableId.SchemaVersion != TableId.SchemaVersion) {
+ RuntimeError(TStringBuilder() << "Schema was updated.", NYql::NDqProto::StatusIds::SCHEME_ERROR);
+ }
+
+ Prepare();
+
+ Callbacks->ResumeExecution();
}
void Handle(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
@@ -224,9 +233,13 @@ private:
} else if (ev->Get()->GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED) {
CA_LOG_D("Got completed result TxId=" << ev->Get()->Record.GetTxId() << ", TabletId=" << ev->Get()->Record.GetOrigin());
auto& batchesQueue = InFlightBatches.at(ev->Get()->Record.GetOrigin());
- YQL_ENSURE(!batchesQueue.empty());
- if (ev->Get()->Record.GetTxId() == batchesQueue.front().TxId) {
+ if (!batchesQueue.empty()
+ && (ev->Get()->Record.GetTxId() == batchesQueue.front().TxId
+ || std::find(
+ std::begin(batchesQueue.front().OldTxIds),
+ std::end(batchesQueue.front().OldTxIds),
+ ev->Get()->Record.GetTxId()) != std::end(batchesQueue.front().OldTxIds))) {
const bool needToResume = (GetFreeSpace() <= 0);
EgressStats.Bytes += batchesQueue.front().Data.size();
@@ -247,7 +260,7 @@ private:
void ProcessRows() {
SplitBatchByShards();
- SendNewBatchesToShards();
+ SendBatchesToShards();
if (Finished && SchemeEntry && IsInFlightBatchesEmpty()) {
Callbacks->OnAsyncOutputFinished(GetOutputIndex());
@@ -290,9 +303,9 @@ private:
}
}
- void SendNewBatchesToShards() {
+ void SendBatchesToShards() {
for (auto& [shardId, batches] : InFlightBatches) {
- if (!batches.empty() && batches.front().SendAttempts == 0) {
+ if (!batches.empty() && batches.front().TxId == 0) {
if (const auto txId = AllocateTxId(); txId) {
batches.front().TxId = *txId;
SendRequestShard(shardId);
@@ -331,7 +344,7 @@ private:
Settings.GetTable().GetTableId(),
Settings.GetTable().GetVersion() + 1 // TODO: SchemeShard returns wrong version.
},
- ColumnIds,
+ WriteColumnIds,
payloadIndex,
NKikimrDataEvents::FORMAT_ARROW);
@@ -348,6 +361,19 @@ private:
++inFlightBatch.SendAttempts;
}
+    // Prepares the oldest in-flight batch of the shard for another send attempt.
+    // The current TxId (if any) is moved to OldTxIds, so a write result that still
+    // carries it can be matched to the batch, and TxId is reset to 0, which is the
+    // value SendBatchesToShards looks for when picking batches to send.
+    // Nothing happens when the shard has no in-flight batches.
+    void RetryShard(const ui64 shardId) {
+        const auto shardIt = InFlightBatches.find(shardId);
+        if (shardIt == InFlightBatches.end() || shardIt->second.empty()) {
+            return;
+        }
+        CA_LOG_D("Retry ShardID=" << shardId);
+        auto& headBatch = shardIt->second.front();
+        if (headBatch.TxId != 0) {
+            headBatch.OldTxIds.push_back(headBatch.TxId);
+            headBatch.TxId = 0;
+        }
+        RequestNewTxId();
+    }
+
void Handle(TEvPrivate::TEvShardRequestTimeout::TPtr& ev) {
if (!InFlightBatches.contains(ev->Get()->ShardId)) {
return;
@@ -359,7 +385,7 @@ private:
return;
}
- SendRequestShard(ev->Get()->ShardId);
+ RetryShard(ev->Get()->ShardId);
}
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev) {
@@ -367,7 +393,7 @@ private:
if (!InFlightBatches.contains(ev->Get()->TabletId)) {
return;
}
- SendRequestShard(ev->Get()->TabletId);
+ RetryShard(ev->Get()->TabletId);
}
void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
@@ -383,8 +409,8 @@ private:
if (FreeTxIds.empty()) {
return std::nullopt;
}
- const ui64 result = FreeTxIds.back();
- FreeTxIds.pop_back();
+ const ui64 result = FreeTxIds.front();
+ FreeTxIds.pop_front();
return result;
}
@@ -401,52 +427,55 @@ private:
}
void PassAway() override {
+ Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); // sent before the base PassAway(); presumably releases this actor's pipe-cache links - TODO confirm the meaning of 0
TActorBootstrapped<TKqpWriteActor>::PassAway();
}
- void BuildColumns() {
- KeyColumns.reserve(Settings.KeyColumnsSize());
- i32 number = 0;
- for (const auto& column : Settings.GetKeyColumns()) {
- KeyColumns.emplace_back(
- column.GetName(),
- column.GetId(),
- NScheme::TTypeInfo {
- static_cast<NScheme::TTypeId>(column.GetTypeId()),
- column.GetTypeId() == NScheme::NTypeIds::Pg
- ? NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId())
- : nullptr
- },
- column.GetTypeInfo().GetPgTypeMod(),
- number++
- );
+ void Prepare() {
+ YQL_ENSURE(SchemeEntry);
+ std::vector<std::pair<TString, NScheme::TTypeInfo>> batchBuilderColumns;
+ THashMap<ui32, ui32> writeColumnIdToIndex;
+ {
+ batchBuilderColumns.reserve(Settings.ColumnsSize());
+ WriteColumnIds.reserve(Settings.ColumnsSize());
+ if (!SchemeEntry->ColumnTableInfo) {
+ RuntimeError("Expected column table.", NYql::NDqProto::StatusIds::SCHEME_ERROR);
+ }
+ if (!SchemeEntry->ColumnTableInfo->Description.HasSchema()) {
+ RuntimeError("Unknown schema for column table.", NYql::NDqProto::StatusIds::SCHEME_ERROR);
+ }
+ i32 number = 0;
+ for (const auto& column : SchemeEntry->ColumnTableInfo->Description.GetSchema().GetColumns()) {
+ Y_ABORT_UNLESS(column.HasTypeId());
+ auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(column.GetTypeId(),
+ column.HasTypeInfo() ? &column.GetTypeInfo() : nullptr);
+ batchBuilderColumns.emplace_back(column.GetName(), typeInfoMod.TypeInfo);
+ WriteColumnIds.push_back(column.GetId());
+ writeColumnIdToIndex[column.GetId()] = number++;
+ }
}
- ColumnIds.reserve(Settings.ColumnsSize());
- Columns.reserve(Settings.ColumnsSize());
- number = 0;
- for (const auto& column : Settings.GetColumns()) {
- ColumnIds.push_back(column.GetId());
- Columns.emplace_back(
- column.GetName(),
- column.GetId(),
- NScheme::TTypeInfo {
- static_cast<NScheme::TTypeId>(column.GetTypeId()),
- column.GetTypeId() == NScheme::NTypeIds::Pg
- ? NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId())
- : nullptr
- },
- column.GetTypeInfo().GetPgTypeMod(),
- number++
- );
+ {
+ SendIndexToWriteIndexMapping.resize(Settings.ColumnsSize());
+ Columns.reserve(Settings.ColumnsSize());
+ i32 number = 0;
+ for (const auto& column : Settings.GetColumns()) {
+ SendIndexToWriteIndexMapping[number] = writeColumnIdToIndex.at(column.GetId());
+ Columns.emplace_back(
+ column.GetName(),
+ column.GetId(),
+ NScheme::TTypeInfo {
+ static_cast<NScheme::TTypeId>(column.GetTypeId()),
+ column.GetTypeId() == NScheme::NTypeIds::Pg
+ ? NPg::TypeDescFromPgTypeId(column.GetTypeInfo().GetPgTypeId())
+ : nullptr
+ },
+ column.GetTypeInfo().GetPgTypeMod(),
+ number++
+ );
+ }
}
- }
- void PrepareBatchBuilder() {
- std::vector<std::pair<TString, NScheme::TTypeInfo>> columns;
- for (const auto& column : Columns) {
- columns.emplace_back(column.Name, column.PType);
- }
std::set<std::string> notNullColumns;
for (const auto& column : Settings.GetColumns()) {
if (column.GetNotNull()) {
@@ -457,7 +486,7 @@ private:
BatchBuilder = std::make_unique<NArrow::TArrowBatchBuilder>(arrow::Compression::UNCOMPRESSED, notNullColumns);
TString err;
- if (!BatchBuilder->Start(columns, 0, 0, err)) {
+ if (!BatchBuilder->Start(batchBuilderColumns, 0, 0, err)) {
RuntimeError("Failed to start batch builder: " + err, NYql::NDqProto::StatusIds::PRECONDITION_FAILED);
}
}
@@ -498,9 +527,9 @@ private:
const NYql::NDq::TTxId TxId;
const TTableId TableId;
- TVector<TSysTables::TTableColumnInfo> KeyColumns;
TVector<TSysTables::TTableColumnInfo> Columns;
- std::vector<ui32> ColumnIds;
+ TVector<ui32> SendIndexToWriteIndexMapping;
+ std::vector<ui32> WriteColumnIds;
std::optional<NSchemeCache::TSchemeCacheNavigate::TEntry> SchemeEntry;
@@ -510,6 +539,7 @@ private:
TString Data;
ui32 SendAttempts = 0;
ui64 TxId = 0;
+ TVector<ui64> OldTxIds;
};
THashMap<ui64, std::deque<TInFlightBatch>> InFlightBatches;
bool Finished = false;
@@ -517,7 +547,7 @@ private:
const i64 MemoryLimit = kInFlightMemoryLimitPerActor;
i64 MemoryInFlight = 0;
- std::vector<ui64> FreeTxIds;
+ std::deque<ui64> FreeTxIds;
};
void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr<TKqpCounters> counters) {
diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
index 1ff33679fd..fe2a661c3c 100644
--- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp
+++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp
@@ -1084,7 +1084,8 @@ public:
QueryState ? QueryState->UserToken : TIntrusiveConstPtr<NACLib::TUserToken>(),
RequestCounters, Settings.TableService.GetAggregationConfig(), Settings.TableService.GetExecuterRetriesConfig(),
AsyncIoFactory, QueryState ? QueryState->PreparedQuery : nullptr, Settings.TableService.GetChannelTransportVersion(), SelfId(), 2 * TDuration::Seconds(MetadataProviderConfig.GetRefreshPeriodSeconds()),
- QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId));
+ QueryState ? QueryState->UserRequestContext : MakeIntrusive<TUserRequestContext>("", Settings.Database, SessionId),
+ Settings.TableService.GetEnableOlapSink());
auto exId = RegisterWithSameMailbox(executerActor);
LOG_D("Created new KQP executer: " << exId << " isRollback: " << isRollback);
diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
index c5de31b63b..9f9a4701b6 100644
--- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
@@ -5507,13 +5507,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
const TString query = R"(
- CREATE TABLE `/Root/DataShard` (
+ CREATE TABLE `/Root/DataShard1` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32 NOT NULL,
@@ -5521,7 +5522,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
)
WITH (UNIFORM_PARTITIONS = 2, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 2);
- CREATE TABLE `/Root/ColumnShard` (
+ CREATE TABLE `/Root/ColumnShard1` (
Col1 Uint64 NOT NULL,
Col2 String,
Col3 Int32 NOT NULL,
@@ -5553,7 +5554,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto client = kikimr.GetQueryClient();
auto prepareResult = client.ExecuteQuery(R"(
- REPLACE INTO `/Root/DataShard` (Col1, Col2, Col3) VALUES
+ REPLACE INTO `/Root/DataShard1` (Col1, Col2, Col3) VALUES
(1u, "test1", 10), (2u, "test2", 11), (3u, "test3", 12), (4u, NULL, 13);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
@@ -5561,14 +5562,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
{
// row -> column
const TString sql = R"(
- REPLACE INTO `/Root/ColumnShard`
- SELECT * FROM `/Root/DataShard`
+ REPLACE INTO `/Root/ColumnShard1`
+ SELECT * FROM `/Root/DataShard1`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
auto it = client.StreamExecuteQuery(R"(
- SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2, Col3;
+ SELECT * FROM `/Root/ColumnShard1` ORDER BY Col1, Col2, Col3;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
@@ -5578,14 +5579,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
{
// Missing Nullable column
const TString sql = R"(
- REPLACE INTO `/Root/ColumnShard`
- SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/DataShard`
+ REPLACE INTO `/Root/ColumnShard1`
+ SELECT 10u + Col1 AS Col1, 100 + Col3 AS Col3 FROM `/Root/DataShard1`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
auto it = client.StreamExecuteQuery(R"(
- SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2, Col3;
+ SELECT * FROM `/Root/ColumnShard1` ORDER BY Col1, Col2, Col3;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
@@ -5598,7 +5599,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
// column -> column
const TString sql = R"(
REPLACE INTO `/Root/ColumnShard2`
- SELECT * FROM `/Root/ColumnShard`
+ SELECT * FROM `/Root/ColumnShard1`
)";
auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
@@ -5633,10 +5634,64 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
}
+ Y_UNIT_TEST(OlapReplace_FromSelectLarge) { // Copies 10000 rows from one column table to another via REPLACE INTO ... SELECT and checks the resulting row count.
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+
+ TTestHelper testHelper(settings);
+
+ TKikimrRunner& kikimr = testHelper.GetKikimr();
+ testHelper.GetRuntime().GetAppData(0).EnableOlapSink = true; // the test switches the EnableOlapSink app-data flag on
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+
+ TVector<TTestHelper::TColumnSchema> schema = { // both tables share this two-column, non-nullable schema
+ TTestHelper::TColumnSchema().SetName("Col1").SetType(NScheme::NTypeIds::Int64).SetNullable(false),
+ TTestHelper::TColumnSchema().SetName("Col2").SetType(NScheme::NTypeIds::Int32).SetNullable(false),
+ };
+
+ TTestHelper::TColumnTable testTable1; // source table
+ testTable1.SetName("/Root/ColumnShard1").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema);
+ testHelper.CreateTable(testTable1);
+
+ TTestHelper::TColumnTable testTable2; // destination table
+ testTable2.SetName("/Root/ColumnShard2").SetPrimaryKey({ "Col1" }).SetSharding({ "Col1" }).SetSchema(schema);
+ testHelper.CreateTable(testTable2);
+
+ {
+ TTestHelper::TUpdatesBuilder tableInserter(testTable1.GetArrowSchema(schema));
+ for (size_t index = 0; index < 10000; ++index) { // fill the source: Col1 = index, Col2 = index * 10
+ tableInserter.AddRow().Add(index).Add(index * 10);
+ }
+ testHelper.BulkUpsert(testTable1, tableInserter);
+ }
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession(); // NOTE(review): `session` is not referenced again in this test
+ auto client = kikimr.GetQueryClient();
+
+ {
+ const TString sql = R"(
+ REPLACE INTO `/Root/ColumnShard2`
+ SELECT * FROM `/Root/ColumnShard1`
+ )";
+ auto insertResult = client.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
+ UNIT_ASSERT_C(insertResult.IsSuccess(), insertResult.GetIssues().ToString());
+
+ auto it = client.StreamExecuteQuery(R"(
+ SELECT COUNT(*) FROM `/Root/ColumnShard2`;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
+ TString output = StreamResultToYson(it);
+ CompareYson( // the destination must hold exactly as many rows as were upserted into the source
+ output,
+ R"([[10000u]])");
+ }
+ }
+
Y_UNIT_TEST(OlapReplace_Simple) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
@@ -5645,13 +5700,13 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
const TString query = R"(
CREATE TABLE `/Root/ColumnShard` (
Col1 Uint64 NOT NULL,
- Col2 String NOT NULL,
- Col3 Int32,
+ Col2 Int32,
Col4 String,
- PRIMARY KEY (Col1, Col2)
+ Col3 String NOT NULL,
+ PRIMARY KEY (Col1, Col3)
)
PARTITION BY HASH(Col1)
- WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
)";
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
@@ -5661,7 +5716,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto client = kikimr.GetQueryClient();
{
auto prepareResult = client.ExecuteQuery(R"(
- REPLACE INTO `/Root/ColumnShard` (Col2, Col4, Col3, Col1) VALUES
+ REPLACE INTO `/Root/ColumnShard` (Col3, Col4, Col2, Col1) VALUES
("test100", "100", 1000, 100u);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
@@ -5669,14 +5724,14 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
{
auto prepareResult = client.ExecuteQuery(R"(
- REPLACE INTO `/Root/ColumnShard` (Col1, Col2, Col3, Col4) VALUES
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col3, Col2, Col4) VALUES
(1u, "test1", 10, "1"), (2u, "test2", NULL, "2"), (3u, "test3", 12, NULL), (4u, "test4", NULL, NULL);
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
}
auto it = client.StreamExecuteQuery(R"(
- SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2, Col3, Col4;
+ SELECT Col1, Col3, Col2, Col4 FROM `/Root/ColumnShard` ORDER BY Col1, Col3, Col2, Col4;
)", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
TString output = StreamResultToYson(it);
@@ -5687,6 +5742,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
auto settings = TKikimrSettings()
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true;
Tests::NCommon::TLoggerInit(kikimr).Initialize();
TTableWithNullsHelper(kikimr).CreateTableWithNulls();
@@ -5735,6 +5791,98 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
CompareYson(output, R"([])");
}
}
+
+ Y_UNIT_TEST(OlapReplace_Duplicates) { // REPLACE of repeated rows with the same key must leave a single row in the column table.
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = true; // Set the EnableOlapSink flag on AppData(0) before any query is issued.
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+
+ const TString query = R"(
+ CREATE TABLE `/Root/ColumnShard` (
+ Col1 Uint64 NOT NULL,
+ Col2 Int32,
+ PRIMARY KEY (Col1)
+ )
+ PARTITION BY HASH(Col1)
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 10);
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync(); // Create the column table (Col1 is the only key column) through a scheme query.
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto client = kikimr.GetQueryClient();
+ { // First write: the same (Col1, Col2) row is listed twice inside one REPLACE statement.
+ auto prepareResult = client.ExecuteQuery(R"(
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES
+ (100u, 1000), (100u, 1000);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
+ }
+
+ { // Second write: the identical statement is run again, now against a table that already holds key 100.
+ auto prepareResult = client.ExecuteQuery(R"(
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES
+ (100u, 1000), (100u, 1000);
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString());
+ }
+
+ auto it = client.StreamExecuteQuery(R"(
+ SELECT * FROM `/Root/ColumnShard` ORDER BY Col1, Col2;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); // The read-back of the whole table must succeed.
+ TString output = StreamResultToYson(it);
+ CompareYson(output, R"([[100u;[1000]]])"); // Exactly one row is expected for key 100 although four copies were written; [1000] is presumably the optional wrapping of nullable Col2 (confirm).
+ }
+
+ Y_UNIT_TEST(OlapReplace_DisableOlapSink) { // With EnableOlapSink off, REPLACE into a column table must fail and the table must stay empty.
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(false);
+ TKikimrRunner kikimr(settings);
+ kikimr.GetTestServer().GetRuntime()->GetAppData(0).EnableOlapSink = false; // Explicitly clear the flag on AppData(0); this is the only setup difference from the sibling OlapReplace_* tests.
+ Tests::NCommon::TLoggerInit(kikimr).Initialize();
+ TTableWithNullsHelper(kikimr).CreateTableWithNulls();
+
+ auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
+
+ const TString query = R"(
+ CREATE TABLE `/Root/ColumnShard` (
+ Col1 Uint64 NOT NULL,
+ Col2 Int32,
+ PRIMARY KEY (Col1)
+ )
+ PARTITION BY HASH(Col1)
+ WITH (STORE = COLUMN, AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 16);
+ )";
+
+ auto result = session.ExecuteSchemeQuery(query).GetValueSync(); // Table creation itself is still expected to succeed with the flag off.
+ UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+
+ auto client = kikimr.GetQueryClient();
+ { // Attempt a single-row REPLACE; it is expected to be rejected.
+ auto prepareResult = client.ExecuteQuery(R"(
+ REPLACE INTO `/Root/ColumnShard` (Col1, Col2) VALUES (1u, 1)
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT(!prepareResult.IsSuccess()); // The query must not succeed.
+ UNIT_ASSERT_C( // The reported issues must contain the specific rejection message below.
+ prepareResult.GetIssues().ToString().Contains("Data manipulation queries do not support column shard tables."),
+ prepareResult.GetIssues().ToString());
+ }
+
+ { // Verify that the rejected write left no rows behind.
+ auto it = client.StreamExecuteQuery(R"(
+ SELECT * FROM `/Root/ColumnShard`;
+ )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString());
+ TString output = StreamResultToYson(it);
+ CompareYson(output, R"([])"); // An empty result set is expected.
+ }
+ }
}
} // namespace NKqp
diff --git a/ydb/core/protos/table_service_config.proto b/ydb/core/protos/table_service_config.proto
index 5ca397d677..54aa9c0d83 100644
--- a/ydb/core/protos/table_service_config.proto
+++ b/ydb/core/protos/table_service_config.proto
@@ -272,4 +272,5 @@ message TTableServiceConfig {
optional bool EnablePgConstsToParams = 53 [default = false];
optional uint64 ExtractPredicateRangesLimit = 54 [default = 10000];
+ optional bool EnableOlapSink = 55 [default = false];
};