aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorArtem Zuikov <chertus@gmail.com>2022-07-06 20:24:14 +0300
committerArtem Zuikov <chertus@gmail.com>2022-07-06 20:24:14 +0300
commitb2ec03de03af07aefcce3360eb3a2995d4fac12f (patch)
tree516c012b9788cf560ebb6bf686526259712bae6d
parent043d2bac9c33c1cc375da81492ad8a316bf0f2ae (diff)
downloadydb-b2ec03de03af07aefcce3360eb3a2995d4fac12f.tar.gz
KIKIMR-13157: check DB quota in ColumnTables upserts
ref:8cc5d31abf1bc5be595824c4edaf925831c51286
-rw-r--r--ydb/core/grpc_services/base/base.h29
-rw-r--r--ydb/core/grpc_services/grpc_request_proxy.cpp3
-rw-r--r--ydb/core/grpc_services/local_rpc/local_rpc.h12
-rw-r--r--ydb/core/grpc_services/rpc_load_rows.cpp24
-rw-r--r--ydb/core/grpc_services/rpc_long_tx.cpp24
-rw-r--r--ydb/core/kqp/executer/kqp_table_resolver.cpp4
-rw-r--r--ydb/core/kqp/kqp_metadata_loader.cpp6
-rw-r--r--ydb/core/kqp/ut/kqp_olap_ut.cpp4
-rw-r--r--ydb/core/sys_view/common/schema.cpp20
-rw-r--r--ydb/core/sys_view/common/schema.h2
-rw-r--r--ydb/core/testlib/test_client.cpp6
-rw-r--r--ydb/core/testlib/test_client.h12
-rw-r--r--ydb/core/tx/scheme_board/cache.cpp48
-rw-r--r--ydb/core/tx/scheme_cache/scheme_cache.h6
-rw-r--r--ydb/core/tx/tx_proxy/resolvereq.cpp2
-rw-r--r--ydb/core/tx/tx_proxy/resolvereq.h2
-rw-r--r--ydb/core/tx/tx_proxy/snapshotreq.cpp10
-rw-r--r--ydb/core/tx/tx_proxy/upload_rows_common_impl.h38
-rw-r--r--ydb/public/api/protos/ydb_common.proto4
-rw-r--r--ydb/services/ydb/ydb_common_ut.h2
-rw-r--r--ydb/services/ydb/ydb_olapstore_ut.cpp2
21 files changed, 164 insertions, 96 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h
index 62113c5c70..e45beccd1a 100644
--- a/ydb/core/grpc_services/base/base.h
+++ b/ydb/core/grpc_services/base/base.h
@@ -269,6 +269,8 @@ public:
virtual void RaiseIssues(const NYql::TIssues& issues) = 0;
virtual TMaybe<TString> GetTraceId() const = 0;
virtual const TString& GetRequestName() const = 0;
+ virtual void SetDiskQuotaExceeded(bool disk) = 0;
+ virtual bool GetDiskQuotaExceeded() const = 0;
};
class TRespHookCtx : public TThrRefBase {
@@ -353,7 +355,6 @@ public:
// Pass request for next processing
virtual void Pass(const IFacilityProvider& facility) = 0;
-
};
// Request context
@@ -476,6 +477,13 @@ public:
return TMaybe<TString>{};
}
+ void SetDiskQuotaExceeded(bool) override {
+ }
+
+ bool GetDiskQuotaExceeded() const override {
+ return false;
+ }
+
void ReplyWithRpcStatus(grpc::StatusCode, const TString&) override {
Y_FAIL("Unimplemented");
}
@@ -707,6 +715,13 @@ public:
return ToMaybe(Ctx_->GetPeerMetaValues(key));
}
+ void SetDiskQuotaExceeded(bool) override {
+ }
+
+ bool GetDiskQuotaExceeded() const override {
+ return false;
+ }
+
void RefreshToken(const TString& token, const TActorContext& ctx, TActorId id) {
NGRpcService::RefreshToken(token, GetDatabaseName().GetOrElse(""), ctx, id);
}
@@ -907,6 +922,17 @@ public:
return ToMaybe(Ctx_->GetPeerMetaValues(key));
}
+ void SetDiskQuotaExceeded(bool disk) override {
+ if (!QuotaExceeded) {
+ QuotaExceeded = google::protobuf::Arena::CreateMessage<Ydb::QuotaExceeded>(GetArena());
+ }
+ QuotaExceeded->set_disk(disk);
+ }
+
+ bool GetDiskQuotaExceeded() const override {
+ return QuotaExceeded ? QuotaExceeded->disk() : false;
+ }
+
bool Validate(TString&) override {
return true;
}
@@ -1066,6 +1092,7 @@ private:
TString InternalToken_;
NYql::TIssueManager IssueManager;
Ydb::CostInfo* CostInfo = nullptr;
+ Ydb::QuotaExceeded* QuotaExceeded = nullptr;
ui64 Ru = 0;
TRespHook RespHook;
TMaybe<NRpcService::TRlPath> RlPath;
diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp
index 04b7a3f6f2..d16e1c5284 100644
--- a/ydb/core/grpc_services/grpc_request_proxy.cpp
+++ b/ydb/core/grpc_services/grpc_request_proxy.cpp
@@ -248,6 +248,9 @@ private:
return;
}
}
+ if (domain.GetDomainState().GetDiskQuotaExceeded()) {
+ requestBaseCtx->SetDiskQuotaExceeded(true);
+ }
} else {
Counters->IncDatabaseUnavailableCounter();
auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, "database unavailable");
diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h
index da29f6ae9e..a7c3d378fa 100644
--- a/ydb/core/grpc_services/local_rpc/local_rpc.h
+++ b/ydb/core/grpc_services/local_rpc/local_rpc.h
@@ -181,6 +181,17 @@ public:
CostInfo->set_consumed_units(consumed_units);
}
+ void SetDiskQuotaExceeded(bool disk) override {
+ if (!QuotaExceeded) {
+ QuotaExceeded = std::make_unique<Ydb::QuotaExceeded>();
+ }
+ QuotaExceeded->set_disk(disk);
+ }
+
+ bool GetDiskQuotaExceeded() const override {
+ return QuotaExceeded ? QuotaExceeded->disk() : false;
+ }
+
TMaybe<NRpcService::TRlPath> GetRlPath() const override {
return Nothing();
}
@@ -201,6 +212,7 @@ private:
NYql::TIssueManager IssueManager;
google::protobuf::Arena Arena;
std::unique_ptr<Ydb::CostInfo> CostInfo;
+ std::unique_ptr<Ydb::QuotaExceeded> QuotaExceeded;
};
template<typename TRpc>
diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp
index 23c163f6f6..be234b7eff 100644
--- a/ydb/core/grpc_services/rpc_load_rows.cpp
+++ b/ydb/core/grpc_services/rpc_load_rows.cpp
@@ -185,8 +185,8 @@ const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) {
class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
- explicit TUploadRowsRPCPublic(IRequestOpCtx* request)
- : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()))
+ explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded)
+ : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded)
, Request(request)
{}
@@ -448,8 +448,8 @@ private:
class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> {
using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>;
public:
- explicit TUploadColumnsRPCPublic(IRequestOpCtx* request)
- : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()))
+ explicit TUploadColumnsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded)
+ : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded)
, Request(request)
{}
@@ -664,23 +664,27 @@ private:
};
void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) {
+ bool diskQuotaExceeded = p->GetDiskQuotaExceeded();
+
if (GetProtoRequest(p.get())->has_arrow_batch_settings()) {
- TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release()));
+ TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded));
} else if (GetProtoRequest(p.get())->has_csv_settings()) {
- TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release()));
+ TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded));
} else {
- TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(p.release()));
+ TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded));
}
}
template<>
IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) {
+ bool diskQuotaExceeded = msg->GetDiskQuotaExceeded();
+
if (GetProtoRequest(msg)->has_arrow_batch_settings()) {
- return new TUploadColumnsRPCPublic(msg);
+ return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded);
} else if (GetProtoRequest(msg)->has_csv_settings()) {
- return new TUploadColumnsRPCPublic(msg);
+ return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded);
} else {
- return new TUploadRowsRPCPublic(msg);
+ return new TUploadRowsRPCPublic(msg, diskQuotaExceeded);
}
}
diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp
index f7b68622ed..a404de7cf3 100644
--- a/ydb/core/grpc_services/rpc_long_tx.cpp
+++ b/ydb/core/grpc_services/rpc_long_tx.cpp
@@ -378,16 +378,16 @@ protected:
}
}
- if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindOlapTable) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an olap table");
+ if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
+ return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an column table");
}
- if (!entry.OlapTableInfo || !entry.OlapTableInfo->Description.HasSharding()
- || !entry.OlapTableInfo->Description.HasSchema()) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Olap table expected");
+ if (!entry.ColumnTableInfo || !entry.ColumnTableInfo->Description.HasSharding()
+ || !entry.ColumnTableInfo->Description.HasSchema()) {
+ return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Column table expected");
}
- const auto& description = entry.OlapTableInfo->Description;
+ const auto& description = entry.ColumnTableInfo->Description;
const auto& schema = description.GetSchema();
const auto& sharding = description.GetSharding();
@@ -397,7 +397,7 @@ protected:
if (!schema.HasEngine() || schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_NONE ||
(schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES && !sharding.HasHashSharding())) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Wrong olap table configuration");
+ return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Wrong column table configuration");
}
ui64 tableId = entry.TableId.PathId.LocalPathId;
@@ -827,13 +827,13 @@ private:
}
}
- if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindOlapTable) {
- return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an olap table");
+ if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
+ return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an column table");
}
- Y_VERIFY(entry.OlapTableInfo);
- Y_VERIFY(entry.OlapTableInfo->Description.HasSharding());
- const auto& sharding = entry.OlapTableInfo->Description.GetSharding();
+ Y_VERIFY(entry.ColumnTableInfo);
+ Y_VERIFY(entry.ColumnTableInfo->Description.HasSharding());
+ const auto& sharding = entry.ColumnTableInfo->Description.GetSharding();
TableId = entry.TableId.PathId.LocalPathId;
for (ui64 shardId : sharding.GetColumnShards()) {
diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp
index e8aba64987..65345d3dc6 100644
--- a/ydb/core/kqp/executer/kqp_table_resolver.cpp
+++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp
@@ -81,8 +81,8 @@ private:
return;
}
- if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindOlapTable) {
- YQL_ENSURE(entry.OlapTableInfo || entry.OlapStoreInfo);
+ if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
+ YQL_ENSURE(entry.ColumnTableInfo || entry.OlapStoreInfo);
// NOTE: entry.SysViewInfo might not be empty for OLAP stats virtual tables
table->TableKind = ETableKind::Olap;
} else if (entry.TableId.IsSystemView()) {
diff --git a/ydb/core/kqp/kqp_metadata_loader.cpp b/ydb/core/kqp/kqp_metadata_loader.cpp
index 167b8df3c4..79d5c5b771 100644
--- a/ydb/core/kqp/kqp_metadata_loader.cpp
+++ b/ydb/core/kqp/kqp_metadata_loader.cpp
@@ -109,7 +109,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
return ResultFromError<TResult>(ToString(entry.Status));
}
- YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindOlapTable);
+ YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindColumnTable);
TTableMetadataResult result;
result.SetSuccess();
@@ -121,7 +121,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
tableMeta->SchemaVersion = entry.TableId.SchemaVersion;
if (!tableMeta->SysView.empty()) {
- if (entry.Kind == EKind::KindOlapTable) {
+ if (entry.Kind == EKind::KindColumnTable) {
// NOTE: OLAP sys views for stats are themselves represented by OLAP tables
tableMeta->Kind = NYql::EKikimrTableKind::Olap;
} else {
@@ -133,7 +133,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache
tableMeta->Kind = NYql::EKikimrTableKind::Datashard;
break;
- case EKind::KindOlapTable:
+ case EKind::KindColumnTable:
tableMeta->Kind = NYql::EKikimrTableKind::Olap;
break;
diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp
index e3a2125196..4b87a9273a 100644
--- a/ydb/core/kqp/ut/kqp_olap_ut.cpp
+++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp
@@ -457,7 +457,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
}
)");
- legacyClient.CreateOlapTable("/Root/olapStore", R"(
+ legacyClient.CreateColumnTable("/Root/olapStore", R"(
Name: "OlapParametersTable"
ColumnShardCount: 1
)");
@@ -1262,7 +1262,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
if (pushEnabled) {
qBuilder << R"(PRAGMA Kikimr.KqpPushOlapProcess = "true";)" << Endl;
}
-
+
qBuilder << R"(PRAGMA Kikimr.OptEnablePredicateExtract = "false";)" << Endl;
qBuilder << "SELECT `timestamp` FROM `/Root/olapStore/olapTable` WHERE ";
qBuilder << predicate;
diff --git a/ydb/core/sys_view/common/schema.cpp b/ydb/core/sys_view/common/schema.cpp
index d45af9c9d4..88f852e168 100644
--- a/ydb/core/sys_view/common/schema.cpp
+++ b/ydb/core/sys_view/common/schema.cpp
@@ -40,7 +40,7 @@ public:
if (!DomainSystemViews.contains(maybeSystemViewName) &&
!SubDomainSystemViews.contains(maybeSystemViewName) &&
!OlapStoreSystemViews.contains(maybeSystemViewName) &&
- !OlapTableSystemViews.contains(maybeSystemViewName))
+ !ColumnTableSystemViews.contains(maybeSystemViewName))
{
return false;
}
@@ -64,8 +64,8 @@ public:
case ETarget::OlapStore:
view = OlapStoreSystemViews.FindPtr(viewName);
break;
- case ETarget::OlapTable:
- view = OlapTableSystemViews.FindPtr(viewName);
+ case ETarget::ColumnTable:
+ view = ColumnTableSystemViews.FindPtr(viewName);
break;
}
return view ? TMaybe<TSchema>(*view) : Nothing();
@@ -92,9 +92,9 @@ public:
result.push_back(name);
}
break;
- case ETarget::OlapTable:
- result.reserve(OlapTableSystemViews.size());
- for (const auto& [name, _] : OlapTableSystemViews) {
+ case ETarget::ColumnTable:
+ result.reserve(ColumnTableSystemViews.size());
+ for (const auto& [name, _] : ColumnTableSystemViews) {
result.push_back(name);
}
break;
@@ -185,8 +185,8 @@ private:
}
template <typename Table>
- void RegisterOlapTableSystemView(const TStringBuf& name) {
- TSchemaFiller<Table>::Fill(OlapTableSystemViews[name]);
+ void RegisterColumnTableSystemView(const TStringBuf& name) {
+ TSchemaFiller<Table>::Fill(ColumnTableSystemViews[name]);
}
void RegisterSystemViews() {
@@ -215,7 +215,7 @@ private:
RegisterSystemView<Schema::QueryMetrics>(QueryMetricsName);
RegisterOlapStoreSystemView<Schema::PrimaryIndexStats>(StorePrimaryIndexStatsName);
- RegisterOlapTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName);
+ RegisterColumnTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName);
RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName);
RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName);
@@ -225,7 +225,7 @@ private:
THashMap<TString, TSchema> DomainSystemViews;
THashMap<TString, TSchema> SubDomainSystemViews;
THashMap<TString, TSchema> OlapStoreSystemViews;
- THashMap<TString, TSchema> OlapTableSystemViews;
+ THashMap<TString, TSchema> ColumnTableSystemViews;
};
ISystemViewResolver* CreateSystemViewResolver() {
diff --git a/ydb/core/sys_view/common/schema.h b/ydb/core/sys_view/common/schema.h
index cf7830073f..779d907721 100644
--- a/ydb/core/sys_view/common/schema.h
+++ b/ydb/core/sys_view/common/schema.h
@@ -464,7 +464,7 @@ public:
Domain,
SubDomain,
OlapStore,
- OlapTable
+ ColumnTable
};
struct TSystemViewPath {
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 8164409a78..41fa8a4312 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -1475,14 +1475,14 @@ namespace Tests {
return (NMsgBusProxy::EResponseStatus)response.GetStatus();
}
- NMsgBusProxy::EResponseStatus TClient::CreateOlapTable(const TString& parent, const TString& scheme) {
+ NMsgBusProxy::EResponseStatus TClient::CreateColumnTable(const TString& parent, const TString& scheme) {
NKikimrSchemeOp::TColumnTableDescription table;
bool parseOk = ::google::protobuf::TextFormat::ParseFromString(scheme, &table);
UNIT_ASSERT(parseOk);
- return CreateOlapTable(parent, table);
+ return CreateColumnTable(parent, table);
}
- NMsgBusProxy::EResponseStatus TClient::CreateOlapTable(const TString& parent,
+ NMsgBusProxy::EResponseStatus TClient::CreateColumnTable(const TString& parent,
const NKikimrSchemeOp::TColumnTableDescription& table) {
auto request = std::make_unique<NMsgBusProxy::TBusSchemeOperation>();
auto* op = request->Record.MutableTransaction()->MutableModifyScheme();
diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h
index e2143be72a..5c788dab40 100644
--- a/ydb/core/testlib/test_client.h
+++ b/ydb/core/testlib/test_client.h
@@ -394,8 +394,16 @@ namespace Tests {
NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const TString& scheme);
NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const NKikimrSchemeOp::TColumnStoreDescription& store);
- NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const TString& scheme);
- NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const NKikimrSchemeOp::TColumnTableDescription& table);
+ NMsgBusProxy::EResponseStatus CreateColumnTable(const TString& parent, const TString& scheme);
+ NMsgBusProxy::EResponseStatus CreateColumnTable(const TString& parent, const NKikimrSchemeOp::TColumnTableDescription& table);
+#if 1 // legacy names
+ NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const TString& scheme) {
+ return CreateColumnTable(parent, scheme);
+ }
+ NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const NKikimrSchemeOp::TColumnTableDescription& table) {
+ return CreateColumnTable(parent, table);
+ }
+#endif
NMsgBusProxy::EResponseStatus CreateSolomon(const TString& parent, const TString& name, ui32 parts = 4, ui32 channelProfile = 0);
NMsgBusProxy::EResponseStatus StoreTableBackup(const TString& parent, const NKikimrSchemeOp::TBackupTask& task);
NMsgBusProxy::EResponseStatus DeleteTopic(const TString& parent, const TString& name);
diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp
index 6a871495f0..f2169e07b8 100644
--- a/ydb/core/tx/scheme_board/cache.cpp
+++ b/ydb/core/tx/scheme_board/cache.cpp
@@ -220,7 +220,7 @@ namespace {
entry.SolomonVolumeInfo.Drop();
entry.PQGroupInfo.Drop();
entry.OlapStoreInfo.Drop();
- entry.OlapTableInfo.Drop();
+ entry.ColumnTableInfo.Drop();
entry.CdcStreamInfo.Drop();
entry.SequenceInfo.Drop();
entry.ReplicationInfo.Drop();
@@ -730,7 +730,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
SolomonVolumeInfo.Drop();
PQGroupInfo.Drop();
OlapStoreInfo.Drop();
- OlapTableInfo.Drop();
+ ColumnTableInfo.Drop();
CdcStreamInfo.Drop();
SequenceInfo.Drop();
ReplicationInfo.Drop();
@@ -796,7 +796,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}
}
- void FillTableInfoFromOlapTable(const NKikimrSchemeOp::TPathDescription& pathDesc) {
+ void FillTableInfoFromColumnTable(const NKikimrSchemeOp::TPathDescription& pathDesc) {
const auto& desc = pathDesc.GetColumnTableDescription();
THashMap<TString, ui32> nameToId;
@@ -1149,7 +1149,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
DESCRIPTION_PART(SolomonVolumeInfo);
DESCRIPTION_PART(PQGroupInfo);
DESCRIPTION_PART(OlapStoreInfo);
- DESCRIPTION_PART(OlapTableInfo);
+ DESCRIPTION_PART(ColumnTableInfo);
DESCRIPTION_PART(CdcStreamInfo);
DESCRIPTION_PART(SequenceInfo);
DESCRIPTION_PART(ReplicationInfo);
@@ -1403,14 +1403,14 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
FillInfo(Kind, OlapStoreInfo, std::move(*pathDesc.MutableColumnStoreDescription()));
break;
case NKikimrSchemeOp::EPathTypeColumnTable:
- Kind = TNavigate::KindOlapTable;
+ Kind = TNavigate::KindColumnTable;
if (Created) {
- FillTableInfoFromOlapTable(pathDesc);
+ FillTableInfoFromColumnTable(pathDesc);
}
- FillInfo(Kind, OlapTableInfo, std::move(*pathDesc.MutableColumnTableDescription()));
- if (OlapTableInfo->Description.HasColumnStorePathId()) {
- auto& p = OlapTableInfo->Description.GetColumnStorePathId();
- OlapTableInfo->OlapStoreId = TTableId(p.GetOwnerId(), p.GetLocalId());
+ FillInfo(Kind, ColumnTableInfo, std::move(*pathDesc.MutableColumnTableDescription()));
+ if (ColumnTableInfo->Description.HasColumnStorePathId()) {
+ auto& p = ColumnTableInfo->Description.GetColumnStorePathId();
+ ColumnTableInfo->OlapStoreId = TTableId(p.GetOwnerId(), p.GetLocalId());
}
break;
case NKikimrSchemeOp::EPathTypeTableIndex:
@@ -1491,7 +1491,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindOlapStore);
break;
case NKikimrSchemeOp::EPathTypeColumnTable:
- ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindOlapTable);
+ ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindColumnTable);
break;
case NKikimrSchemeOp::EPathTypeRtmrVolume:
ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindRtmr);
@@ -1600,10 +1600,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
entry.Kind = TNavigate::KindTable;
if (target == NSysView::ISystemViewResolver::ETarget::OlapStore ||
- target == NSysView::ISystemViewResolver::ETarget::OlapTable)
+ target == NSysView::ISystemViewResolver::ETarget::ColumnTable)
{
// OLAP sys views are represented by OLAP tables
- entry.Kind =TNavigate::KindOlapTable;
+ entry.Kind =TNavigate::KindColumnTable;
}
entry.Columns = std::move(schema->Columns);
@@ -1654,17 +1654,17 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapStore);
entry.OlapStoreInfo = OlapStoreInfo;
return;
- } else if (Kind == TNavigate::KindOlapTable) {
- FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapTable);
+ } else if (Kind == TNavigate::KindColumnTable) {
+ FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::ColumnTable);
entry.OlapStoreInfo = OlapStoreInfo;
- entry.OlapTableInfo = OlapTableInfo;
+ entry.ColumnTableInfo = ColumnTableInfo;
return;
}
return SetError(context, entry, TNavigate::EStatus::PathErrorUnknown);
}
- const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindOlapTable;
+ const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable;
if (entry.Operation == TNavigate::OpTable && !isTable) {
return SetError(context, entry, TNavigate::EStatus::PathNotTable);
}
@@ -1720,7 +1720,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
entry.SolomonVolumeInfo = SolomonVolumeInfo;
entry.PQGroupInfo = PQGroupInfo;
entry.OlapStoreInfo = OlapStoreInfo;
- entry.OlapTableInfo = OlapTableInfo;
+ entry.ColumnTableInfo = ColumnTableInfo;
entry.CdcStreamInfo = CdcStreamInfo;
entry.SequenceInfo = SequenceInfo;
entry.ReplicationInfo = ReplicationInfo;
@@ -1858,11 +1858,11 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
}
keyDesc.Partitioning = std::move(partitions);
return;
- } else if (Kind == TNavigate::KindOlapTable) {
- FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapTable);
+ } else if (Kind == TNavigate::KindColumnTable) {
+ FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::ColumnTable);
// Add all shards of the OLAP table
auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
- for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) {
+ for (ui64 columnShard : ColumnTableInfo->Description.GetSharding().GetColumnShards()) {
partitions->push_back(TKeyDesc::TPartitionInfo(columnShard));
partitions->back().Range = TKeyDesc::TPartitionRangeInfo();
}
@@ -1900,10 +1900,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
++context->Request->ErrorCount;
}
}
- } else if (OlapTableInfo) {
+ } else if (ColumnTableInfo) {
// TODO: return proper partitioning info (KIKIMR-11069)
auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>();
- for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) {
+ for (ui64 columnShard : ColumnTableInfo->Description.GetSharding().GetColumnShards()) {
partitions->push_back(TKeyDesc::TPartitionInfo(columnShard));
partitions->back().Range = TKeyDesc::TPartitionRangeInfo();
}
@@ -1985,7 +1985,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> {
// OlapStore specific
TIntrusivePtr<TNavigate::TOlapStoreInfo> OlapStoreInfo;
- TIntrusivePtr<TNavigate::TOlapTableInfo> OlapTableInfo;
+ TIntrusivePtr<TNavigate::TColumnTableInfo> ColumnTableInfo;
// CDC specific
TIntrusivePtr<TNavigate::TCdcStreamInfo> CdcStreamInfo;
diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h
index 3e86fba52a..47d64574d5 100644
--- a/ydb/core/tx/scheme_cache/scheme_cache.h
+++ b/ydb/core/tx/scheme_cache/scheme_cache.h
@@ -120,7 +120,7 @@ struct TSchemeCacheNavigate {
KindExtSubdomain = 9,
KindIndex = 10,
KindOlapStore = 11,
- KindOlapTable = 12,
+ KindColumnTable = 12,
KindCdcStream = 13,
KindSequence = 14,
KindReplication = 15,
@@ -180,7 +180,7 @@ struct TSchemeCacheNavigate {
NKikimrSchemeOp::TColumnStoreDescription Description;
};
- struct TOlapTableInfo : public TAtomicRefCount<TOlapTableInfo> {
+ struct TColumnTableInfo : public TAtomicRefCount<TColumnTableInfo> {
EKind Kind = KindUnknown;
NKikimrSchemeOp::TColumnTableDescription Description;
TTableId OlapStoreId;
@@ -247,7 +247,7 @@ struct TSchemeCacheNavigate {
TIntrusiveConstPtr<TKesusInfo> KesusInfo;
TIntrusiveConstPtr<TSolomonVolumeInfo> SolomonVolumeInfo;
TIntrusiveConstPtr<TOlapStoreInfo> OlapStoreInfo;
- TIntrusiveConstPtr<TOlapTableInfo> OlapTableInfo;
+ TIntrusiveConstPtr<TColumnTableInfo> ColumnTableInfo;
TIntrusiveConstPtr<TCdcStreamInfo> CdcStreamInfo;
TIntrusiveConstPtr<TSequenceInfo> SequenceInfo;
TIntrusiveConstPtr<TReplicationInfo> ReplicationInfo;
diff --git a/ydb/core/tx/tx_proxy/resolvereq.cpp b/ydb/core/tx/tx_proxy/resolvereq.cpp
index a660c82140..d01f795e49 100644
--- a/ydb/core/tx/tx_proxy/resolvereq.cpp
+++ b/ydb/core/tx/tx_proxy/resolvereq.cpp
@@ -180,7 +180,7 @@ namespace {
auto& entry = resp->ResultSet[index];
table.TableId = entry.TableId;
- table.IsOlapTable = (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindOlapTable);
+ table.IsColumnTable = (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable);
TVector<NScheme::TTypeId> keyColumnTypes(entry.Columns.size());
TVector<TKeyDesc::TColumnOp> columns(entry.Columns.size());
diff --git a/ydb/core/tx/tx_proxy/resolvereq.h b/ydb/core/tx/tx_proxy/resolvereq.h
index 66e365c5ce..65a2fc08f7 100644
--- a/ydb/core/tx/tx_proxy/resolvereq.h
+++ b/ydb/core/tx/tx_proxy/resolvereq.h
@@ -20,7 +20,7 @@ namespace NTxProxy {
TSerializedCellVec ToValues;
THolder<TKeyDesc> KeyDescription;
NSchemeCache::TDomainInfo::TPtr DomainInfo;
- bool IsOlapTable = false;
+ bool IsColumnTable = false;
};
using TResolveTableResponses = TVector<TResolveTableResponse>;
diff --git a/ydb/core/tx/tx_proxy/snapshotreq.cpp b/ydb/core/tx/tx_proxy/snapshotreq.cpp
index 28fc4dd779..727888fd74 100644
--- a/ydb/core/tx/tx_proxy/snapshotreq.cpp
+++ b/ydb/core/tx/tx_proxy/snapshotreq.cpp
@@ -269,7 +269,7 @@ public:
TxProxyMon->TxPrepareResolveHgram->Collect((WallClockResolved - WallClockResolveStarted).MicroSeconds());
- bool hasOlapTable = false;
+ bool hasColumnTable = false;
for (const auto& entry : msg->Tables) {
// N.B. we create all keys as a read operation
ui32 access = 0;
@@ -291,9 +291,9 @@ public:
continue;
}
- if (entry.IsOlapTable) {
+ if (entry.IsColumnTable) {
// OLAP tables don't create snapshots explicitly
- hasOlapTable = true;
+ hasColumnTable = true;
continue;
}
@@ -335,7 +335,7 @@ public:
}
if (PerShardStates.empty()) {
- if (!hasOlapTable) {
+ if (!hasColumnTable) {
// No real (OLTP or OLAP) tables in the request so we can use current time as a fake PlanStep
PlanStep = ctx.Now().MilliSeconds();
@@ -1315,7 +1315,7 @@ public:
continue;
}
- if (entry.IsOlapTable) {
+ if (entry.IsColumnTable) {
// OLAP tables don't create snapshots explicitly
continue;
}
diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
index e123b55c41..9fa2810447 100644
--- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
+++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h
@@ -152,6 +152,7 @@ protected:
bool WriteToTableShadow = false;
bool AllowWriteToPrivateTable = false;
+ bool DiskQuotaExceeded = false;
std::shared_ptr<arrow::RecordBatch> Batch;
float RuCost = 0.0;
@@ -161,7 +162,7 @@ public:
return DerivedActivityType;
}
- explicit TUploadRowsBase(TDuration timeout = TDuration::Max())
+ explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false)
: TBase()
, SchemeCache(MakeSchemeCacheID())
, LeaderPipeCache(MakePipePeNodeCacheID(false))
@@ -169,6 +170,7 @@ public:
, WaitingResolveReply(false)
, Finished(false)
, Status(Ydb::StatusIds::SUCCESS)
+ , DiskQuotaExceeded(diskQuotaExceeded)
{}
void Bootstrap(const NActors::TActorContext& ctx) {
@@ -464,7 +466,9 @@ private:
const NSchemeCache::TSchemeCacheNavigate& request = *ev->Get()->Request;
Y_VERIFY(request.ResultSet.size() == 1);
- switch (request.ResultSet.front().Status) {
+ const NSchemeCache::TSchemeCacheNavigate::TEntry& entry = request.ResultSet.front();
+
+ switch (entry.Status) {
case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok:
break;
case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError:
@@ -481,17 +485,23 @@ private:
return ReplyWithError(Ydb::StatusIds::GENERIC_ERROR, Sprintf("Unknown error on table '%s'", GetTable().c_str()), ctx);
}
- TableKind = request.ResultSet.front().Kind;
- bool isOlapTable = (TableKind == NSchemeCache::TSchemeCacheNavigate::KindOlapTable);
+ TableKind = entry.Kind;
+ bool isColumnTable = (TableKind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable);
- if (request.ResultSet.front().TableId.IsSystemView()) {
+ if (entry.TableId.IsSystemView()) {
return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
Sprintf("Table '%s' is a system view. Bulk upsert is not supported.", GetTable().c_str()), ctx);
}
+ // TODO: fast fail for all tables?
+ if (isColumnTable && DiskQuotaExceeded) {
+ return ReplyWithError(Ydb::StatusIds::UNAVAILABLE,
+ "Cannot perform writes: database is out of disk space", ctx);
+ }
+
ResolveNamesResult.reset(ev->Get()->Request.Release());
- bool makeYdbSchema = isOlapTable || (GetSourceType() != EUploadSource::ProtoValues);
+ bool makeYdbSchema = isColumnTable || (GetSourceType() != EUploadSource::ProtoValues);
TString errorMessage;
if (!BuildSchema(ctx, errorMessage, makeYdbSchema)) {
return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, errorMessage, ctx);
@@ -504,7 +514,7 @@ private:
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
}
- if (isOlapTable) {
+ if (isColumnTable) {
// TUploadRowsRPCPublic::ExtractBatch() - converted JsonDocument, DynNumbers, ...
if (!ExtractBatch(errorMessage)) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
@@ -517,7 +527,7 @@ private:
case EUploadSource::ArrowBatch:
case EUploadSource::CSV:
{
- if (isOlapTable) {
+ if (isColumnTable) {
// TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ...
if (!ExtractBatch(errorMessage)) {
return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx);
@@ -557,15 +567,15 @@ private:
if (TableKind == NSchemeCache::TSchemeCacheNavigate::KindTable) {
ResolveShards(ctx);
- } else if (isOlapTable) {
- WriteToOlapTable(ctx);
+ } else if (isColumnTable) {
+ WriteToColumnTable(ctx);
} else {
return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR,
Sprintf("Table '%s': Bulk upsert is not supported for this table kind.", GetTable().c_str()), ctx);
}
}
- void WriteToOlapTable(const NActors::TActorContext& ctx) {
+ void WriteToColumnTable(const NActors::TActorContext& ctx) {
TString accessCheckError;
if (!CheckAccess(accessCheckError)) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, accessCheckError, ctx);
@@ -663,17 +673,17 @@ private:
auto& entry = ResolveNamesResult->ResultSet[0];
- if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindOlapTable) {
+ if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) {
ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an olap table", ctx);
return {};
}
- if (!entry.OlapTableInfo || !entry.OlapTableInfo->Description.HasSchema()) {
+ if (!entry.ColumnTableInfo || !entry.ColumnTableInfo->Description.HasSchema()) {
ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Olap table expected", ctx);
return {};
}
- const auto& description = entry.OlapTableInfo->Description;
+ const auto& description = entry.ColumnTableInfo->Description;
const auto& schema = description.GetSchema();
#if 1 // TODO: do we need this restriction?
diff --git a/ydb/public/api/protos/ydb_common.proto b/ydb/public/api/protos/ydb_common.proto
index a63e202f82..2ecb33fb34 100644
--- a/ydb/public/api/protos/ydb_common.proto
+++ b/ydb/public/api/protos/ydb_common.proto
@@ -17,3 +17,7 @@ message CostInfo {
// Total amount of request units (RU), consumed by the operation.
double consumed_units = 1;
}
+
+message QuotaExceeded {
+ bool disk = 1;
+}
diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h
index 05ba1cc386..3a18893037 100644
--- a/ydb/services/ydb/ydb_common_ut.h
+++ b/ydb/services/ydb/ydb_common_ut.h
@@ -235,7 +235,7 @@ struct TTestOlap {
annoyingClient.SetSecurityToken("root@builtin");
NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr);
UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK);
- status = annoyingClient.CreateOlapTable("/Root", Sprintf(R"(
+ status = annoyingClient.CreateColumnTable("/Root", Sprintf(R"(
Name: "%s/%s"
ColumnShardCount : %d
Sharding {
diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp
index e07d9459f1..f48aee4899 100644
--- a/ydb/services/ydb/ydb_olapstore_ut.cpp
+++ b/ydb/services/ydb/ydb_olapstore_ut.cpp
@@ -76,7 +76,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) {
NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr);
UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK);
- status = annoyingClient.CreateOlapTable("/Root/OlapStore", Sprintf(R"(
+ status = annoyingClient.CreateColumnTable("/Root/OlapStore", Sprintf(R"(
Name: "%s"
ColumnShardCount : %d
Sharding {