diff options
author | Artem Zuikov <chertus@gmail.com> | 2022-07-06 20:24:14 +0300 |
---|---|---|
committer | Artem Zuikov <chertus@gmail.com> | 2022-07-06 20:24:14 +0300 |
commit | b2ec03de03af07aefcce3360eb3a2995d4fac12f (patch) | |
tree | 516c012b9788cf560ebb6bf686526259712bae6d | |
parent | 043d2bac9c33c1cc375da81492ad8a316bf0f2ae (diff) | |
download | ydb-b2ec03de03af07aefcce3360eb3a2995d4fac12f.tar.gz |
KIKIMR-13157: check DB quota in ColumnTables upserts
ref:8cc5d31abf1bc5be595824c4edaf925831c51286
21 files changed, 164 insertions, 96 deletions
diff --git a/ydb/core/grpc_services/base/base.h b/ydb/core/grpc_services/base/base.h index 62113c5c70..e45beccd1a 100644 --- a/ydb/core/grpc_services/base/base.h +++ b/ydb/core/grpc_services/base/base.h @@ -269,6 +269,8 @@ public: virtual void RaiseIssues(const NYql::TIssues& issues) = 0; virtual TMaybe<TString> GetTraceId() const = 0; virtual const TString& GetRequestName() const = 0; + virtual void SetDiskQuotaExceeded(bool disk) = 0; + virtual bool GetDiskQuotaExceeded() const = 0; }; class TRespHookCtx : public TThrRefBase { @@ -353,7 +355,6 @@ public: // Pass request for next processing virtual void Pass(const IFacilityProvider& facility) = 0; - }; // Request context @@ -476,6 +477,13 @@ public: return TMaybe<TString>{}; } + void SetDiskQuotaExceeded(bool) override { + } + + bool GetDiskQuotaExceeded() const override { + return false; + } + void ReplyWithRpcStatus(grpc::StatusCode, const TString&) override { Y_FAIL("Unimplemented"); } @@ -707,6 +715,13 @@ public: return ToMaybe(Ctx_->GetPeerMetaValues(key)); } + void SetDiskQuotaExceeded(bool) override { + } + + bool GetDiskQuotaExceeded() const override { + return false; + } + void RefreshToken(const TString& token, const TActorContext& ctx, TActorId id) { NGRpcService::RefreshToken(token, GetDatabaseName().GetOrElse(""), ctx, id); } @@ -907,6 +922,17 @@ public: return ToMaybe(Ctx_->GetPeerMetaValues(key)); } + void SetDiskQuotaExceeded(bool disk) override { + if (!QuotaExceeded) { + QuotaExceeded = google::protobuf::Arena::CreateMessage<Ydb::QuotaExceeded>(GetArena()); + } + QuotaExceeded->set_disk(disk); + } + + bool GetDiskQuotaExceeded() const override { + return QuotaExceeded ? QuotaExceeded->disk() : false; + } + bool Validate(TString&) override { return true; } @@ -1066,6 +1092,7 @@ private: TString InternalToken_; NYql::TIssueManager IssueManager; Ydb::CostInfo* CostInfo = nullptr; + Ydb::QuotaExceeded* QuotaExceeded = nullptr; ui64 Ru = 0; TRespHook RespHook; TMaybe<NRpcService::TRlPath> RlPath; diff --git a/ydb/core/grpc_services/grpc_request_proxy.cpp b/ydb/core/grpc_services/grpc_request_proxy.cpp index 04b7a3f6f2..d16e1c5284 100644 --- a/ydb/core/grpc_services/grpc_request_proxy.cpp +++ b/ydb/core/grpc_services/grpc_request_proxy.cpp @@ -248,6 +248,9 @@ private: return; } } + if (domain.GetDomainState().GetDiskQuotaExceeded()) { + requestBaseCtx->SetDiskQuotaExceeded(true); + } } else { Counters->IncDatabaseUnavailableCounter(); auto issue = MakeIssue(NKikimrIssues::TIssuesIds::YDB_DB_NOT_READY, "database unavailable"); diff --git a/ydb/core/grpc_services/local_rpc/local_rpc.h b/ydb/core/grpc_services/local_rpc/local_rpc.h index da29f6ae9e..a7c3d378fa 100644 --- a/ydb/core/grpc_services/local_rpc/local_rpc.h +++ b/ydb/core/grpc_services/local_rpc/local_rpc.h @@ -181,6 +181,17 @@ public: CostInfo->set_consumed_units(consumed_units); } + void SetDiskQuotaExceeded(bool disk) override { + if (!QuotaExceeded) { + QuotaExceeded = std::make_unique<Ydb::QuotaExceeded>(); + } + QuotaExceeded->set_disk(disk); + } + + bool GetDiskQuotaExceeded() const override { + return QuotaExceeded ? QuotaExceeded->disk() : false; + } + TMaybe<NRpcService::TRlPath> GetRlPath() const override { return Nothing(); } @@ -201,6 +212,7 @@ private: NYql::TIssueManager IssueManager; google::protobuf::Arena Arena; std::unique_ptr<Ydb::CostInfo> CostInfo; + std::unique_ptr<Ydb::QuotaExceeded> QuotaExceeded; }; template<typename TRpc> diff --git a/ydb/core/grpc_services/rpc_load_rows.cpp b/ydb/core/grpc_services/rpc_load_rows.cpp index 23c163f6f6..be234b7eff 100644 --- a/ydb/core/grpc_services/rpc_load_rows.cpp +++ b/ydb/core/grpc_services/rpc_load_rows.cpp @@ -185,8 +185,8 @@ const Ydb::Table::BulkUpsertRequest* GetProtoRequest(IRequestOpCtx* req) { class TUploadRowsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> { using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>; public: - explicit TUploadRowsRPCPublic(IRequestOpCtx* request) - : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout())) + explicit TUploadRowsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded) + : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded) , Request(request) {} @@ -448,8 +448,8 @@ private: class TUploadColumnsRPCPublic : public NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ> { using TBase = NTxProxy::TUploadRowsBase<NKikimrServices::TActivity::GRPC_REQ>; public: - explicit TUploadColumnsRPCPublic(IRequestOpCtx* request) - : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout())) + explicit TUploadColumnsRPCPublic(IRequestOpCtx* request, bool diskQuotaExceeded) + : TBase(GetDuration(GetProtoRequest(request)->operation_params().operation_timeout()), diskQuotaExceeded) , Request(request) {} @@ -664,23 +664,27 @@ private: }; void DoBulkUpsertRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider &) { + bool diskQuotaExceeded = p->GetDiskQuotaExceeded(); + if (GetProtoRequest(p.get())->has_arrow_batch_settings()) { - TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release())); + TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded)); } else if (GetProtoRequest(p.get())->has_csv_settings()) { - TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release())); + TActivationContext::AsActorContext().Register(new TUploadColumnsRPCPublic(p.release(), diskQuotaExceeded)); } else { - TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(p.release())); + TActivationContext::AsActorContext().Register(new TUploadRowsRPCPublic(p.release(), diskQuotaExceeded)); } } template<> IActor* TEvBulkUpsertRequest::CreateRpcActor(NKikimr::NGRpcService::IRequestOpCtx* msg) { + bool diskQuotaExceeded = msg->GetDiskQuotaExceeded(); + if (GetProtoRequest(msg)->has_arrow_batch_settings()) { - return new TUploadColumnsRPCPublic(msg); + return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded); } else if (GetProtoRequest(msg)->has_csv_settings()) { - return new TUploadColumnsRPCPublic(msg); + return new TUploadColumnsRPCPublic(msg, diskQuotaExceeded); } else { - return new TUploadRowsRPCPublic(msg); + return new TUploadRowsRPCPublic(msg, diskQuotaExceeded); } } diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index f7b68622ed..a404de7cf3 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -378,16 +378,16 @@ protected: } } - if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindOlapTable) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an olap table"); + if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { + return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an column table"); } - if (!entry.OlapTableInfo || !entry.OlapTableInfo->Description.HasSharding() - || !entry.OlapTableInfo->Description.HasSchema()) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Olap table expected"); + if (!entry.ColumnTableInfo || !entry.ColumnTableInfo->Description.HasSharding() + || !entry.ColumnTableInfo->Description.HasSchema()) { + return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Column table expected"); } - const auto& description = entry.OlapTableInfo->Description; + const auto& description = entry.ColumnTableInfo->Description; const auto& schema = description.GetSchema(); const auto& sharding = description.GetSharding(); @@ -397,7 +397,7 @@ protected: if (!schema.HasEngine() || schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_NONE || (schema.GetEngine() == NKikimrSchemeOp::COLUMN_ENGINE_REPLACING_TIMESERIES && !sharding.HasHashSharding())) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Wrong olap table configuration"); + return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "Wrong column table configuration"); } ui64 tableId = entry.TableId.PathId.LocalPathId; @@ -827,13 +827,13 @@ private: } } - if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindOlapTable) { - return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an olap table"); + if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { + return ReplyError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an column table"); } - Y_VERIFY(entry.OlapTableInfo); - Y_VERIFY(entry.OlapTableInfo->Description.HasSharding()); - const auto& sharding = entry.OlapTableInfo->Description.GetSharding(); + Y_VERIFY(entry.ColumnTableInfo); + Y_VERIFY(entry.ColumnTableInfo->Description.HasSharding()); + const auto& sharding = entry.ColumnTableInfo->Description.GetSharding(); TableId = entry.TableId.PathId.LocalPathId; for (ui64 shardId : sharding.GetColumnShards()) { diff --git a/ydb/core/kqp/executer/kqp_table_resolver.cpp b/ydb/core/kqp/executer/kqp_table_resolver.cpp index e8aba64987..65345d3dc6 100644 --- a/ydb/core/kqp/executer/kqp_table_resolver.cpp +++ b/ydb/core/kqp/executer/kqp_table_resolver.cpp @@ -81,8 +81,8 @@ private: return; } - if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindOlapTable) { - YQL_ENSURE(entry.OlapTableInfo || entry.OlapStoreInfo); + if (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { + YQL_ENSURE(entry.ColumnTableInfo || entry.OlapStoreInfo); // NOTE: entry.SysViewInfo might not be empty for OLAP stats virtual tables table->TableKind = ETableKind::Olap; } else if (entry.TableId.IsSystemView()) { diff --git a/ydb/core/kqp/kqp_metadata_loader.cpp b/ydb/core/kqp/kqp_metadata_loader.cpp index 167b8df3c4..79d5c5b771 100644 --- a/ydb/core/kqp/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/kqp_metadata_loader.cpp @@ -109,7 +109,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache return ResultFromError<TResult>(ToString(entry.Status)); } - YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindOlapTable); + YQL_ENSURE(entry.Kind == EKind::KindTable || entry.Kind == EKind::KindColumnTable); TTableMetadataResult result; result.SetSuccess(); @@ -121,7 +121,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache tableMeta->SchemaVersion = entry.TableId.SchemaVersion; if (!tableMeta->SysView.empty()) { - if (entry.Kind == EKind::KindOlapTable) { + if (entry.Kind == EKind::KindColumnTable) { // NOTE: OLAP sys views for stats are themselves represented by OLAP tables tableMeta->Kind = NYql::EKikimrTableKind::Olap; } else { @@ -133,7 +133,7 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache tableMeta->Kind = NYql::EKikimrTableKind::Datashard; break; - case EKind::KindOlapTable: + case EKind::KindColumnTable: tableMeta->Kind = NYql::EKikimrTableKind::Olap; break; diff --git a/ydb/core/kqp/ut/kqp_olap_ut.cpp b/ydb/core/kqp/ut/kqp_olap_ut.cpp index e3a2125196..4b87a9273a 100644 --- a/ydb/core/kqp/ut/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/kqp_olap_ut.cpp @@ -457,7 +457,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { } )"); - legacyClient.CreateOlapTable("/Root/olapStore", R"( + legacyClient.CreateColumnTable("/Root/olapStore", R"( Name: "OlapParametersTable" ColumnShardCount: 1 )"); @@ -1262,7 +1262,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { if (pushEnabled) { qBuilder << R"(PRAGMA Kikimr.KqpPushOlapProcess = "true";)" << Endl; } - + qBuilder << R"(PRAGMA Kikimr.OptEnablePredicateExtract = "false";)" << Endl; qBuilder << "SELECT `timestamp` FROM `/Root/olapStore/olapTable` WHERE "; qBuilder << predicate; diff --git a/ydb/core/sys_view/common/schema.cpp b/ydb/core/sys_view/common/schema.cpp index d45af9c9d4..88f852e168 100644 --- a/ydb/core/sys_view/common/schema.cpp +++ b/ydb/core/sys_view/common/schema.cpp @@ -40,7 +40,7 @@ public: if (!DomainSystemViews.contains(maybeSystemViewName) && !SubDomainSystemViews.contains(maybeSystemViewName) && !OlapStoreSystemViews.contains(maybeSystemViewName) && - !OlapTableSystemViews.contains(maybeSystemViewName)) + !ColumnTableSystemViews.contains(maybeSystemViewName)) { return false; } @@ -64,8 +64,8 @@ public: case ETarget::OlapStore: view = OlapStoreSystemViews.FindPtr(viewName); break; - case ETarget::OlapTable: - view = OlapTableSystemViews.FindPtr(viewName); + case ETarget::ColumnTable: + view = ColumnTableSystemViews.FindPtr(viewName); break; } return view ? TMaybe<TSchema>(*view) : Nothing(); @@ -92,9 +92,9 @@ public: result.push_back(name); } break; - case ETarget::OlapTable: - result.reserve(OlapTableSystemViews.size()); - for (const auto& [name, _] : OlapTableSystemViews) { + case ETarget::ColumnTable: + result.reserve(ColumnTableSystemViews.size()); + for (const auto& [name, _] : ColumnTableSystemViews) { result.push_back(name); } break; @@ -185,8 +185,8 @@ private: } template <typename Table> - void RegisterOlapTableSystemView(const TStringBuf& name) { - TSchemaFiller<Table>::Fill(OlapTableSystemViews[name]); + void RegisterColumnTableSystemView(const TStringBuf& name) { + TSchemaFiller<Table>::Fill(ColumnTableSystemViews[name]); } void RegisterSystemViews() { @@ -215,7 +215,7 @@ private: RegisterSystemView<Schema::QueryMetrics>(QueryMetricsName); RegisterOlapStoreSystemView<Schema::PrimaryIndexStats>(StorePrimaryIndexStatsName); - RegisterOlapTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName); + RegisterColumnTableSystemView<Schema::PrimaryIndexStats>(TablePrimaryIndexStatsName); RegisterSystemView<Schema::TopPartitions>(TopPartitions1MinuteName); RegisterSystemView<Schema::TopPartitions>(TopPartitions1HourName); @@ -225,7 +225,7 @@ private: THashMap<TString, TSchema> DomainSystemViews; THashMap<TString, TSchema> SubDomainSystemViews; THashMap<TString, TSchema> OlapStoreSystemViews; - THashMap<TString, TSchema> OlapTableSystemViews; + THashMap<TString, TSchema> ColumnTableSystemViews; }; ISystemViewResolver* CreateSystemViewResolver() { diff --git a/ydb/core/sys_view/common/schema.h b/ydb/core/sys_view/common/schema.h index cf7830073f..779d907721 100644 --- a/ydb/core/sys_view/common/schema.h +++ b/ydb/core/sys_view/common/schema.h @@ -464,7 +464,7 @@ public: Domain, SubDomain, OlapStore, - OlapTable + ColumnTable }; struct TSystemViewPath { diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 8164409a78..41fa8a4312 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1475,14 +1475,14 @@ namespace Tests { return (NMsgBusProxy::EResponseStatus)response.GetStatus(); } - NMsgBusProxy::EResponseStatus TClient::CreateOlapTable(const TString& parent, const TString& scheme) { + NMsgBusProxy::EResponseStatus TClient::CreateColumnTable(const TString& parent, const TString& scheme) { NKikimrSchemeOp::TColumnTableDescription table; bool parseOk = ::google::protobuf::TextFormat::ParseFromString(scheme, &table); UNIT_ASSERT(parseOk); - return CreateOlapTable(parent, table); + return CreateColumnTable(parent, table); } - NMsgBusProxy::EResponseStatus TClient::CreateOlapTable(const TString& parent, + NMsgBusProxy::EResponseStatus TClient::CreateColumnTable(const TString& parent, const NKikimrSchemeOp::TColumnTableDescription& table) { auto request = std::make_unique<NMsgBusProxy::TBusSchemeOperation>(); auto* op = request->Record.MutableTransaction()->MutableModifyScheme(); diff --git a/ydb/core/testlib/test_client.h b/ydb/core/testlib/test_client.h index e2143be72a..5c788dab40 100644 --- a/ydb/core/testlib/test_client.h +++ b/ydb/core/testlib/test_client.h @@ -394,8 +394,16 @@ namespace Tests { NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const TString& scheme); NMsgBusProxy::EResponseStatus CreateOlapStore(const TString& parent, const NKikimrSchemeOp::TColumnStoreDescription& store); - NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const TString& scheme); - NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const NKikimrSchemeOp::TColumnTableDescription& table); + NMsgBusProxy::EResponseStatus CreateColumnTable(const TString& parent, const TString& scheme); + NMsgBusProxy::EResponseStatus CreateColumnTable(const TString& parent, const NKikimrSchemeOp::TColumnTableDescription& table); +#if 1 // legacy names + NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const TString& scheme) { + return CreateColumnTable(parent, scheme); + } + NMsgBusProxy::EResponseStatus CreateOlapTable(const TString& parent, const NKikimrSchemeOp::TColumnTableDescription& table) { + return CreateColumnTable(parent, table); + } +#endif NMsgBusProxy::EResponseStatus CreateSolomon(const TString& parent, const TString& name, ui32 parts = 4, ui32 channelProfile = 0); NMsgBusProxy::EResponseStatus StoreTableBackup(const TString& parent, const NKikimrSchemeOp::TBackupTask& task); NMsgBusProxy::EResponseStatus DeleteTopic(const TString& parent, const TString& name); diff --git a/ydb/core/tx/scheme_board/cache.cpp b/ydb/core/tx/scheme_board/cache.cpp index 6a871495f0..f2169e07b8 100644 --- a/ydb/core/tx/scheme_board/cache.cpp +++ b/ydb/core/tx/scheme_board/cache.cpp @@ -220,7 +220,7 @@ namespace { entry.SolomonVolumeInfo.Drop(); entry.PQGroupInfo.Drop(); entry.OlapStoreInfo.Drop(); - entry.OlapTableInfo.Drop(); + entry.ColumnTableInfo.Drop(); entry.CdcStreamInfo.Drop(); entry.SequenceInfo.Drop(); entry.ReplicationInfo.Drop(); @@ -730,7 +730,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { SolomonVolumeInfo.Drop(); PQGroupInfo.Drop(); OlapStoreInfo.Drop(); - OlapTableInfo.Drop(); + ColumnTableInfo.Drop(); CdcStreamInfo.Drop(); SequenceInfo.Drop(); ReplicationInfo.Drop(); @@ -796,7 +796,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { } } - void FillTableInfoFromOlapTable(const NKikimrSchemeOp::TPathDescription& pathDesc) { + void FillTableInfoFromColumnTable(const NKikimrSchemeOp::TPathDescription& pathDesc) { const auto& desc = pathDesc.GetColumnTableDescription(); THashMap<TString, ui32> nameToId; @@ -1149,7 +1149,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { DESCRIPTION_PART(SolomonVolumeInfo); DESCRIPTION_PART(PQGroupInfo); DESCRIPTION_PART(OlapStoreInfo); - DESCRIPTION_PART(OlapTableInfo); + DESCRIPTION_PART(ColumnTableInfo); DESCRIPTION_PART(CdcStreamInfo); DESCRIPTION_PART(SequenceInfo); DESCRIPTION_PART(ReplicationInfo); @@ -1403,14 +1403,14 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { FillInfo(Kind, OlapStoreInfo, std::move(*pathDesc.MutableColumnStoreDescription())); break; case NKikimrSchemeOp::EPathTypeColumnTable: - Kind = TNavigate::KindOlapTable; + Kind = TNavigate::KindColumnTable; if (Created) { - FillTableInfoFromOlapTable(pathDesc); + FillTableInfoFromColumnTable(pathDesc); } - FillInfo(Kind, OlapTableInfo, std::move(*pathDesc.MutableColumnTableDescription())); - if (OlapTableInfo->Description.HasColumnStorePathId()) { - auto& p = OlapTableInfo->Description.GetColumnStorePathId(); - OlapTableInfo->OlapStoreId = TTableId(p.GetOwnerId(), p.GetLocalId()); + FillInfo(Kind, ColumnTableInfo, std::move(*pathDesc.MutableColumnTableDescription())); + if (ColumnTableInfo->Description.HasColumnStorePathId()) { + auto& p = ColumnTableInfo->Description.GetColumnStorePathId(); + ColumnTableInfo->OlapStoreId = TTableId(p.GetOwnerId(), p.GetLocalId()); } break; case NKikimrSchemeOp::EPathTypeTableIndex: @@ -1491,7 +1491,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindOlapStore); break; case NKikimrSchemeOp::EPathTypeColumnTable: - ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindOlapTable); + ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindColumnTable); break; case NKikimrSchemeOp::EPathTypeRtmrVolume: ListNodeEntry->Children.emplace_back(name, pathId, TNavigate::KindRtmr); @@ -1600,10 +1600,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { entry.Kind = TNavigate::KindTable; if (target == NSysView::ISystemViewResolver::ETarget::OlapStore || - target == NSysView::ISystemViewResolver::ETarget::OlapTable) + target == NSysView::ISystemViewResolver::ETarget::ColumnTable) { // OLAP sys views are represented by OLAP tables - entry.Kind =TNavigate::KindOlapTable; + entry.Kind =TNavigate::KindColumnTable; } entry.Columns = std::move(schema->Columns); @@ -1654,17 +1654,17 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapStore); entry.OlapStoreInfo = OlapStoreInfo; return; - } else if (Kind == TNavigate::KindOlapTable) { - FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapTable); + } else if (Kind == TNavigate::KindColumnTable) { + FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::ColumnTable); entry.OlapStoreInfo = OlapStoreInfo; - entry.OlapTableInfo = OlapTableInfo; + entry.ColumnTableInfo = ColumnTableInfo; return; } return SetError(context, entry, TNavigate::EStatus::PathErrorUnknown); } - const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindOlapTable; + const bool isTable = Kind == TNavigate::KindTable || Kind == TNavigate::KindColumnTable; if (entry.Operation == TNavigate::OpTable && !isTable) { return SetError(context, entry, TNavigate::EStatus::PathNotTable); } @@ -1720,7 +1720,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { entry.SolomonVolumeInfo = SolomonVolumeInfo; entry.PQGroupInfo = PQGroupInfo; entry.OlapStoreInfo = OlapStoreInfo; - entry.OlapTableInfo = OlapTableInfo; + entry.ColumnTableInfo = ColumnTableInfo; entry.CdcStreamInfo = CdcStreamInfo; entry.SequenceInfo = SequenceInfo; entry.ReplicationInfo = ReplicationInfo; @@ -1858,11 +1858,11 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { } keyDesc.Partitioning = std::move(partitions); return; - } else if (Kind == TNavigate::KindOlapTable) { - FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::OlapTable); + } else if (Kind == TNavigate::KindColumnTable) { + FillSystemViewEntry(context, entry, NSysView::ISystemViewResolver::ETarget::ColumnTable); // Add all shards of the OLAP table auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); - for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) { + for (ui64 columnShard : ColumnTableInfo->Description.GetSharding().GetColumnShards()) { partitions->push_back(TKeyDesc::TPartitionInfo(columnShard)); partitions->back().Range = TKeyDesc::TPartitionRangeInfo(); } @@ -1900,10 +1900,10 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { ++context->Request->ErrorCount; } } - } else if (OlapTableInfo) { + } else if (ColumnTableInfo) { // TODO: return proper partitioning info (KIKIMR-11069) auto partitions = std::make_shared<TVector<TKeyDesc::TPartitionInfo>>(); - for (ui64 columnShard : OlapTableInfo->Description.GetSharding().GetColumnShards()) { + for (ui64 columnShard : ColumnTableInfo->Description.GetSharding().GetColumnShards()) { partitions->push_back(TKeyDesc::TPartitionInfo(columnShard)); partitions->back().Range = TKeyDesc::TPartitionRangeInfo(); } @@ -1985,7 +1985,7 @@ class TSchemeCache: public TMonitorableActor<TSchemeCache> { // OlapStore specific TIntrusivePtr<TNavigate::TOlapStoreInfo> OlapStoreInfo; - TIntrusivePtr<TNavigate::TOlapTableInfo> OlapTableInfo; + TIntrusivePtr<TNavigate::TColumnTableInfo> ColumnTableInfo; // CDC specific TIntrusivePtr<TNavigate::TCdcStreamInfo> CdcStreamInfo; diff --git a/ydb/core/tx/scheme_cache/scheme_cache.h b/ydb/core/tx/scheme_cache/scheme_cache.h index 3e86fba52a..47d64574d5 100644 --- a/ydb/core/tx/scheme_cache/scheme_cache.h +++ b/ydb/core/tx/scheme_cache/scheme_cache.h @@ -120,7 +120,7 @@ struct TSchemeCacheNavigate { KindExtSubdomain = 9, KindIndex = 10, KindOlapStore = 11, - KindOlapTable = 12, + KindColumnTable = 12, KindCdcStream = 13, KindSequence = 14, KindReplication = 15, @@ -180,7 +180,7 @@ struct TSchemeCacheNavigate { NKikimrSchemeOp::TColumnStoreDescription Description; }; - struct TOlapTableInfo : public TAtomicRefCount<TOlapTableInfo> { + struct TColumnTableInfo : public TAtomicRefCount<TColumnTableInfo> { EKind Kind = KindUnknown; NKikimrSchemeOp::TColumnTableDescription Description; TTableId OlapStoreId; @@ -247,7 +247,7 @@ struct TSchemeCacheNavigate { TIntrusiveConstPtr<TKesusInfo> KesusInfo; TIntrusiveConstPtr<TSolomonVolumeInfo> SolomonVolumeInfo; TIntrusiveConstPtr<TOlapStoreInfo> OlapStoreInfo; - TIntrusiveConstPtr<TOlapTableInfo> OlapTableInfo; + TIntrusiveConstPtr<TColumnTableInfo> ColumnTableInfo; TIntrusiveConstPtr<TCdcStreamInfo> CdcStreamInfo; TIntrusiveConstPtr<TSequenceInfo> SequenceInfo; TIntrusiveConstPtr<TReplicationInfo> ReplicationInfo; diff --git a/ydb/core/tx/tx_proxy/resolvereq.cpp b/ydb/core/tx/tx_proxy/resolvereq.cpp index a660c82140..d01f795e49 100644 --- a/ydb/core/tx/tx_proxy/resolvereq.cpp +++ b/ydb/core/tx/tx_proxy/resolvereq.cpp @@ -180,7 +180,7 @@ namespace { auto& entry = resp->ResultSet[index]; table.TableId = entry.TableId; - table.IsOlapTable = (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindOlapTable); + table.IsColumnTable = (entry.Kind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable); TVector<NScheme::TTypeId> keyColumnTypes(entry.Columns.size()); TVector<TKeyDesc::TColumnOp> columns(entry.Columns.size()); diff --git a/ydb/core/tx/tx_proxy/resolvereq.h b/ydb/core/tx/tx_proxy/resolvereq.h index 66e365c5ce..65a2fc08f7 100644 --- a/ydb/core/tx/tx_proxy/resolvereq.h +++ b/ydb/core/tx/tx_proxy/resolvereq.h @@ -20,7 +20,7 @@ namespace NTxProxy { TSerializedCellVec ToValues; THolder<TKeyDesc> KeyDescription; NSchemeCache::TDomainInfo::TPtr DomainInfo; - bool IsOlapTable = false; + bool IsColumnTable = false; }; using TResolveTableResponses = TVector<TResolveTableResponse>; diff --git a/ydb/core/tx/tx_proxy/snapshotreq.cpp b/ydb/core/tx/tx_proxy/snapshotreq.cpp index 28fc4dd779..727888fd74 100644 --- a/ydb/core/tx/tx_proxy/snapshotreq.cpp +++ b/ydb/core/tx/tx_proxy/snapshotreq.cpp @@ -269,7 +269,7 @@ public: TxProxyMon->TxPrepareResolveHgram->Collect((WallClockResolved - WallClockResolveStarted).MicroSeconds()); - bool hasOlapTable = false; + bool hasColumnTable = false; for (const auto& entry : msg->Tables) { // N.B. we create all keys as a read operation ui32 access = 0; @@ -291,9 +291,9 @@ public: continue; } - if (entry.IsOlapTable) { + if (entry.IsColumnTable) { // OLAP tables don't create snapshots explicitly - hasOlapTable = true; + hasColumnTable = true; continue; } @@ -335,7 +335,7 @@ public: } if (PerShardStates.empty()) { - if (!hasOlapTable) { + if (!hasColumnTable) { // No real (OLTP or OLAP) tables in the request so we can use current time as a fake PlanStep PlanStep = ctx.Now().MilliSeconds(); @@ -1315,7 +1315,7 @@ public: continue; } - if (entry.IsOlapTable) { + if (entry.IsColumnTable) { // OLAP tables don't create snapshots explicitly continue; } diff --git a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h index e123b55c41..9fa2810447 100644 --- a/ydb/core/tx/tx_proxy/upload_rows_common_impl.h +++ b/ydb/core/tx/tx_proxy/upload_rows_common_impl.h @@ -152,6 +152,7 @@ protected: bool WriteToTableShadow = false; bool AllowWriteToPrivateTable = false; + bool DiskQuotaExceeded = false; std::shared_ptr<arrow::RecordBatch> Batch; float RuCost = 0.0; @@ -161,7 +162,7 @@ public: return DerivedActivityType; } - explicit TUploadRowsBase(TDuration timeout = TDuration::Max()) + explicit TUploadRowsBase(TDuration timeout = TDuration::Max(), bool diskQuotaExceeded = false) : TBase() , SchemeCache(MakeSchemeCacheID()) , LeaderPipeCache(MakePipePeNodeCacheID(false)) @@ -169,6 +170,7 @@ public: , WaitingResolveReply(false) , Finished(false) , Status(Ydb::StatusIds::SUCCESS) + , DiskQuotaExceeded(diskQuotaExceeded) {} void Bootstrap(const NActors::TActorContext& ctx) { @@ -464,7 +466,9 @@ private: const NSchemeCache::TSchemeCacheNavigate& request = *ev->Get()->Request; Y_VERIFY(request.ResultSet.size() == 1); - switch (request.ResultSet.front().Status) { + const NSchemeCache::TSchemeCacheNavigate::TEntry& entry = request.ResultSet.front(); + + switch (entry.Status) { case NSchemeCache::TSchemeCacheNavigate::EStatus::Ok: break; case NSchemeCache::TSchemeCacheNavigate::EStatus::LookupError: @@ -481,17 +485,23 @@ private: return ReplyWithError(Ydb::StatusIds::GENERIC_ERROR, Sprintf("Unknown error on table '%s'", GetTable().c_str()), ctx); } - TableKind = request.ResultSet.front().Kind; - bool isOlapTable = (TableKind == NSchemeCache::TSchemeCacheNavigate::KindOlapTable); + TableKind = entry.Kind; + bool isColumnTable = (TableKind == NSchemeCache::TSchemeCacheNavigate::KindColumnTable); - if (request.ResultSet.front().TableId.IsSystemView()) { + if (entry.TableId.IsSystemView()) { return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Sprintf("Table '%s' is a system view. Bulk upsert is not supported.", GetTable().c_str()), ctx); } + // TODO: fast fail for all tables? + if (isColumnTable && DiskQuotaExceeded) { + return ReplyWithError(Ydb::StatusIds::UNAVAILABLE, + "Cannot perform writes: database is out of disk space", ctx); + } + ResolveNamesResult.reset(ev->Get()->Request.Release()); - bool makeYdbSchema = isOlapTable || (GetSourceType() != EUploadSource::ProtoValues); + bool makeYdbSchema = isColumnTable || (GetSourceType() != EUploadSource::ProtoValues); TString errorMessage; if (!BuildSchema(ctx, errorMessage, makeYdbSchema)) { return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, errorMessage, ctx); @@ -504,7 +514,7 @@ private: return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); } - if (isOlapTable) { + if (isColumnTable) { // TUploadRowsRPCPublic::ExtractBatch() - converted JsonDocument, DynNumbers, ... if (!ExtractBatch(errorMessage)) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); @@ -517,7 +527,7 @@ private: case EUploadSource::ArrowBatch: case EUploadSource::CSV: { - if (isOlapTable) { + if (isColumnTable) { // TUploadColumnsRPCPublic::ExtractBatch() - NOT converted JsonDocument, DynNumbers, ... if (!ExtractBatch(errorMessage)) { return ReplyWithError(Ydb::StatusIds::BAD_REQUEST, errorMessage, ctx); @@ -557,15 +567,15 @@ private: if (TableKind == NSchemeCache::TSchemeCacheNavigate::KindTable) { ResolveShards(ctx); - } else if (isOlapTable) { - WriteToOlapTable(ctx); + } else if (isColumnTable) { + WriteToColumnTable(ctx); } else { return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, Sprintf("Table '%s': Bulk upsert is not supported for this table kind.", GetTable().c_str()), ctx); } } - void WriteToOlapTable(const NActors::TActorContext& ctx) { + void WriteToColumnTable(const NActors::TActorContext& ctx) { TString accessCheckError; if (!CheckAccess(accessCheckError)) { return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, accessCheckError, ctx); @@ -663,17 +673,17 @@ private: auto& entry = ResolveNamesResult->ResultSet[0]; - if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindOlapTable) { + if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindColumnTable) { ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "The specified path is not an olap table", ctx); return {}; } - if (!entry.OlapTableInfo || !entry.OlapTableInfo->Description.HasSchema()) { + if (!entry.ColumnTableInfo || !entry.ColumnTableInfo->Description.HasSchema()) { ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, "Olap table expected", ctx); return {}; } - const auto& description = entry.OlapTableInfo->Description; + const auto& description = entry.ColumnTableInfo->Description; const auto& schema = description.GetSchema(); #if 1 // TODO: do we need this restriction? diff --git a/ydb/public/api/protos/ydb_common.proto b/ydb/public/api/protos/ydb_common.proto index a63e202f82..2ecb33fb34 100644 --- a/ydb/public/api/protos/ydb_common.proto +++ b/ydb/public/api/protos/ydb_common.proto @@ -17,3 +17,7 @@ message CostInfo { // Total amount of request units (RU), consumed by the operation. double consumed_units = 1; } + +message QuotaExceeded { + bool disk = 1; +} diff --git a/ydb/services/ydb/ydb_common_ut.h b/ydb/services/ydb/ydb_common_ut.h index 05ba1cc386..3a18893037 100644 --- a/ydb/services/ydb/ydb_common_ut.h +++ b/ydb/services/ydb/ydb_common_ut.h @@ -235,7 +235,7 @@ struct TTestOlap { annoyingClient.SetSecurityToken("root@builtin"); NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr); UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK); - status = annoyingClient.CreateOlapTable("/Root", Sprintf(R"( + status = annoyingClient.CreateColumnTable("/Root", Sprintf(R"( Name: "%s/%s" ColumnShardCount : %d Sharding { diff --git a/ydb/services/ydb/ydb_olapstore_ut.cpp b/ydb/services/ydb/ydb_olapstore_ut.cpp index e07d9459f1..f48aee4899 100644 --- a/ydb/services/ydb/ydb_olapstore_ut.cpp +++ b/ydb/services/ydb/ydb_olapstore_ut.cpp @@ -76,7 +76,7 @@ Y_UNIT_TEST_SUITE(YdbOlapStore) { NMsgBusProxy::EResponseStatus status = annoyingClient.CreateOlapStore("/Root", tableDescr); UNIT_ASSERT_VALUES_EQUAL(status, NMsgBusProxy::EResponseStatus::MSTATUS_OK); - status = annoyingClient.CreateOlapTable("/Root/OlapStore", Sprintf(R"( + status = annoyingClient.CreateColumnTable("/Root/OlapStore", Sprintf(R"( Name: "%s" ColumnShardCount : %d Sharding { |