aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvitalyisaev <vitalyisaev@ydb.tech>2023-10-20 15:59:05 +0300
committervitalyisaev <vitalyisaev@ydb.tech>2023-10-20 17:22:35 +0300
commit9eaf05cd7641e8b698f228b039e3bd3a11fea23a (patch)
tree20a5851974ede0fc9a90561118bdcbba4862be9e
parent86cae0f179c274076b9794121f25fb396c3a0bb2 (diff)
downloadydb-9eaf05cd7641e8b698f228b039e3bd3a11fea23a.tar.gz
YQ Connector:splitting data stream on the server side
1. Поток данных из SQL источника разбивается на блоки (pages). Основная логика в [этом файле](https://a.yandex-team.ru/review/4701354/files/1#file-ydb/library/yql/providers/generic/connector/app/server/streamer.go). Новый блок выделяется каждые 10000 строк (это поведение будет усовершенствовано в отдельном тикете). 2. Некоторые файлы перемещены из utils в paging для того, чтобы utils не превращался в свалку. 3. Удалены более неактуальные интеграционные тесты на ограничение количества прочитанных строк. 4. Опционально запускается HTTP-сервер с профилировщиком рантайма.
-rw-r--r--ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp15
-rw-r--r--ydb/library/yql/providers/generic/connector/app/config/server.pb.go349
-rw-r--r--ydb/library/yql/providers/generic/connector/app/config/server.proto31
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/cmd.go2
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/config.go57
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/launcher.go115
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go (renamed from ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go)7
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_factory.go (renamed from ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go)38
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/paging/doc.go3
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/paging/read_limiter.go (renamed from ydb/library/yql/providers/generic/connector/app/server/utils/read_limiter.go)9
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/paging/writer.go102
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/paging/ya.make15
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go5
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/service_connector.go (renamed from ydb/library/yql/providers/generic/connector/app/server/server.go)190
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/service_pprof.go63
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/streamer.go129
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/paging_writer.go97
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/ya.make4
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/ya.make8
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/error.cpp14
-rw-r--r--ydb/library/yql/providers/generic/connector/libcpp/error.h19
-rw-r--r--ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp4
22 files changed, 944 insertions, 332 deletions
diff --git a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
index 1683028b398..713e8e91bc5 100644
--- a/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
+++ b/ydb/library/yql/providers/generic/actors/yql_generic_read_actor.cpp
@@ -172,7 +172,7 @@ namespace NYql::NDq {
YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvListSplitsPart :: event handling started"
<< ": splits_size=" << response.splits().size();
- if (!NConnector::IsSuccess(response.error())) {
+ if (!NConnector::IsSuccess(response)) {
return NotifyComputeActorWithError(
TActivationContext::ActorSystem(),
ComputeActorId_,
@@ -266,7 +266,7 @@ namespace NYql::NDq {
YQL_CLOG(TRACE, ProviderGeneric) << "Handle :: EvReadSplitsPart :: event handling started"
<< ": part_size=" << response.arrow_ipc_streaming().size();
- if (!NConnector::IsSuccess(response.error())) {
+ if (!NConnector::IsSuccess(response)) {
return NotifyComputeActorWithError(
TActivationContext::ActorSystem(),
ComputeActorId_,
@@ -369,9 +369,16 @@ namespace NYql::NDq {
i64 /*freeSpace*/) final {
YQL_ENSURE(!buffer.IsWide(), "Wide stream is not supported");
+ YQL_CLOG(TRACE, ProviderGeneric) << "GetAsyncInputData :: start";
+
// Stream is finished
if (!LastReadSplitsResponse_) {
finished = ReadSplitsFinished_;
+
+ if (finished) {
+ YQL_CLOG(INFO, ProviderGeneric) << "GetAsyncInputData :: data reading finished";
+ }
+
return 0;
}
@@ -414,7 +421,6 @@ namespace NYql::NDq {
buffer.emplace_back(std::move(value));
// freeSpace -= size;
- finished = true;
LastReadSplitsResponse_ = std::nullopt;
// TODO: check it, because in S3 the generic cache clearing happens only when LastFileWasProcessed:
@@ -425,6 +431,9 @@ namespace NYql::NDq {
AwaitNextStreamItem<NConnector::IReadSplitsStreamIterator,
TEvPrivate::TEvReadSplitsPart,
TEvPrivate::TEvReadSplitsFinished>(ReadSplitsIterator_);
+ finished = false;
+
+ YQL_CLOG(TRACE, ProviderGeneric) << "GetAsyncInputData :: bytes obtained = " << total;
return total;
}
diff --git a/ydb/library/yql/providers/generic/connector/app/config/server.pb.go b/ydb/library/yql/providers/generic/connector/app/config/server.pb.go
index 9ac7186ed7b..b43cc8dc087 100644
--- a/ydb/library/yql/providers/generic/connector/app/config/server.pb.go
+++ b/ydb/library/yql/providers/generic/connector/app/config/server.pb.go
@@ -86,17 +86,22 @@ type TServerConfig struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
- // Network address server will be listening on
+ // TODO: remove it after YQ-2057
+ //
+ // Deprecated: Do not use.
Endpoint *common.TEndpoint `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
- // TLS settings.
- // Leave it empty for insecure connections.
- Tls *TServerTLSConfig `protobuf:"bytes,2,opt,name=tls,proto3" json:"tls,omitempty"`
+ // Deprecated: Do not use.
+ Tls *TServerTLSConfig `protobuf:"bytes,2,opt,name=tls,proto3" json:"tls,omitempty"`
+ ConnectorServer *TConnectorServerConfig `protobuf:"bytes,5,opt,name=connector_server,json=connectorServer,proto3" json:"connector_server,omitempty"`
// This is a rough restriction for YQ memory consumption until
// https://st.yandex-team.ru/YQ-2057 is implemented.
// Leave it empty if you want to avoid any memory limits.
ReadLimit *TServerReadLimit `protobuf:"bytes,3,opt,name=read_limit,json=readLimit,proto3" json:"read_limit,omitempty"`
// Logger config
Logger *TLoggerConfig `protobuf:"bytes,4,opt,name=logger,proto3" json:"logger,omitempty"`
+ // Go runtime profiler.
+ // Disabled if this part of config is empty.
+ PprofServer *TPprofServerConfig `protobuf:"bytes,6,opt,name=pprof_server,json=pprofServer,proto3" json:"pprof_server,omitempty"`
}
func (x *TServerConfig) Reset() {
@@ -131,6 +136,7 @@ func (*TServerConfig) Descriptor() ([]byte, []int) {
return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{0}
}
+// Deprecated: Do not use.
func (x *TServerConfig) GetEndpoint() *common.TEndpoint {
if x != nil {
return x.Endpoint
@@ -138,6 +144,7 @@ func (x *TServerConfig) GetEndpoint() *common.TEndpoint {
return nil
}
+// Deprecated: Do not use.
func (x *TServerConfig) GetTls() *TServerTLSConfig {
if x != nil {
return x.Tls
@@ -145,6 +152,13 @@ func (x *TServerConfig) GetTls() *TServerTLSConfig {
return nil
}
+func (x *TServerConfig) GetConnectorServer() *TConnectorServerConfig {
+ if x != nil {
+ return x.ConnectorServer
+ }
+ return nil
+}
+
func (x *TServerConfig) GetReadLimit() *TServerReadLimit {
if x != nil {
return x.ReadLimit
@@ -159,6 +173,72 @@ func (x *TServerConfig) GetLogger() *TLoggerConfig {
return nil
}
+func (x *TServerConfig) GetPprofServer() *TPprofServerConfig {
+ if x != nil {
+ return x.PprofServer
+ }
+ return nil
+}
+
+// TConnectorServerConfig - configuration of the main GRPC server
+type TConnectorServerConfig struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Network address server will be listening on
+ Endpoint *common.TEndpoint `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
+ // TLS settings.
+ // Leave it empty for insecure connections.
+ Tls *TServerTLSConfig `protobuf:"bytes,2,opt,name=tls,proto3" json:"tls,omitempty"`
+}
+
+func (x *TConnectorServerConfig) Reset() {
+ *x = TConnectorServerConfig{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[1]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *TConnectorServerConfig) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TConnectorServerConfig) ProtoMessage() {}
+
+func (x *TConnectorServerConfig) ProtoReflect() protoreflect.Message {
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[1]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use TConnectorServerConfig.ProtoReflect.Descriptor instead.
+func (*TConnectorServerConfig) Descriptor() ([]byte, []int) {
+ return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *TConnectorServerConfig) GetEndpoint() *common.TEndpoint {
+ if x != nil {
+ return x.Endpoint
+ }
+ return nil
+}
+
+func (x *TConnectorServerConfig) GetTls() *TServerTLSConfig {
+ if x != nil {
+ return x.Tls
+ }
+ return nil
+}
+
type TServerTLSConfig struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@@ -173,7 +253,7 @@ type TServerTLSConfig struct {
func (x *TServerTLSConfig) Reset() {
*x = TServerTLSConfig{}
if protoimpl.UnsafeEnabled {
- mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[1]
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -186,7 +266,7 @@ func (x *TServerTLSConfig) String() string {
func (*TServerTLSConfig) ProtoMessage() {}
func (x *TServerTLSConfig) ProtoReflect() protoreflect.Message {
- mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[1]
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -199,7 +279,7 @@ func (x *TServerTLSConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use TServerTLSConfig.ProtoReflect.Descriptor instead.
func (*TServerTLSConfig) Descriptor() ([]byte, []int) {
- return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{1}
+ return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{2}
}
func (x *TServerTLSConfig) GetKey() string {
@@ -229,7 +309,7 @@ type TServerReadLimit struct {
func (x *TServerReadLimit) Reset() {
*x = TServerReadLimit{}
if protoimpl.UnsafeEnabled {
- mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[2]
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -242,7 +322,7 @@ func (x *TServerReadLimit) String() string {
func (*TServerReadLimit) ProtoMessage() {}
func (x *TServerReadLimit) ProtoReflect() protoreflect.Message {
- mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[2]
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -255,7 +335,7 @@ func (x *TServerReadLimit) ProtoReflect() protoreflect.Message {
// Deprecated: Use TServerReadLimit.ProtoReflect.Descriptor instead.
func (*TServerReadLimit) Descriptor() ([]byte, []int) {
- return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{2}
+ return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{3}
}
func (x *TServerReadLimit) GetRows() uint64 {
@@ -280,7 +360,7 @@ type TLoggerConfig struct {
func (x *TLoggerConfig) Reset() {
*x = TLoggerConfig{}
if protoimpl.UnsafeEnabled {
- mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[3]
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
@@ -293,7 +373,7 @@ func (x *TLoggerConfig) String() string {
func (*TLoggerConfig) ProtoMessage() {}
func (x *TLoggerConfig) ProtoReflect() protoreflect.Message {
- mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[3]
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
@@ -306,7 +386,7 @@ func (x *TLoggerConfig) ProtoReflect() protoreflect.Message {
// Deprecated: Use TLoggerConfig.ProtoReflect.Descriptor instead.
func (*TLoggerConfig) Descriptor() ([]byte, []int) {
- return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{3}
+ return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{4}
}
func (x *TLoggerConfig) GetLogLevel() ELogLevel {
@@ -323,6 +403,65 @@ func (x *TLoggerConfig) GetEnableSqlQueryLogging() bool {
return false
}
+// TPprofServerConfig configures HTTP server delivering Go runtime profiler data
+type TPprofServerConfig struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ // Network address server will be listening on
+ Endpoint *common.TEndpoint `protobuf:"bytes,1,opt,name=endpoint,proto3" json:"endpoint,omitempty"`
+ // TLS settings.
+ // Leave it empty for insecure connections.
+ Tls *TServerTLSConfig `protobuf:"bytes,2,opt,name=tls,proto3" json:"tls,omitempty"`
+}
+
+func (x *TPprofServerConfig) Reset() {
+ *x = TPprofServerConfig{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[5]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *TPprofServerConfig) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*TPprofServerConfig) ProtoMessage() {}
+
+func (x *TPprofServerConfig) ProtoReflect() protoreflect.Message {
+ mi := &file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[5]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use TPprofServerConfig.ProtoReflect.Descriptor instead.
+func (*TPprofServerConfig) Descriptor() ([]byte, []int) {
+ return file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDescGZIP(), []int{5}
+}
+
+func (x *TPprofServerConfig) GetEndpoint() *common.TEndpoint {
+ if x != nil {
+ return x.Endpoint
+ }
+ return nil
+}
+
+func (x *TPprofServerConfig) GetTls() *TServerTLSConfig {
+ if x != nil {
+ return x.Tls
+ }
+ return nil
+}
+
var File_ydb_library_yql_providers_generic_connector_app_config_server_proto protoreflect.FileDescriptor
var file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDesc = []byte{
@@ -336,50 +475,80 @@ var file_ydb_library_yql_providers_generic_connector_app_config_server_proto_raw
0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65,
0x72, 0x69, 0x63, 0x2f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70,
0x69, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e,
- 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x99, 0x02, 0x0a, 0x0d, 0x54, 0x53, 0x65, 0x72,
- 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3b, 0x0a, 0x08, 0x65, 0x6e, 0x64,
+ 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd1, 0x03, 0x0a, 0x0d, 0x54, 0x53, 0x65, 0x72,
+ 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3f, 0x0a, 0x08, 0x65, 0x6e, 0x64,
0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x4e, 0x59,
0x71, 0x6c, 0x2e, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x4e, 0x41,
- 0x70, 0x69, 0x2e, 0x54, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x08, 0x65, 0x6e,
- 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x03, 0x74, 0x6c, 0x73, 0x18, 0x02, 0x20,
- 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65,
- 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e,
- 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67,
- 0x52, 0x03, 0x74, 0x6c, 0x73, 0x12, 0x4a, 0x0a, 0x0a, 0x72, 0x65, 0x61, 0x64, 0x5f, 0x6c, 0x69,
- 0x6d, 0x69, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c,
+ 0x70, 0x69, 0x2e, 0x54, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x42, 0x02, 0x18, 0x01,
+ 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x41, 0x0a, 0x03, 0x74, 0x6c,
+ 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43,
+ 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e,
+ 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f,
+ 0x6e, 0x66, 0x69, 0x67, 0x42, 0x02, 0x18, 0x01, 0x52, 0x03, 0x74, 0x6c, 0x73, 0x12, 0x5c, 0x0a,
+ 0x10, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65,
+ 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x31, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43,
+ 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e,
+ 0x66, 0x69, 0x67, 0x2e, 0x54, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x65,
+ 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0f, 0x63, 0x6f, 0x6e, 0x6e,
+ 0x65, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x4a, 0x0a, 0x0a, 0x72,
+ 0x65, 0x61, 0x64, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32,
+ 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72,
+ 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72,
+ 0x76, 0x65, 0x72, 0x52, 0x65, 0x61, 0x64, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x09, 0x72, 0x65,
+ 0x61, 0x64, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x40, 0x0a, 0x06, 0x6c, 0x6f, 0x67, 0x67, 0x65,
+ 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43,
+ 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e,
+ 0x66, 0x69, 0x67, 0x2e, 0x54, 0x4c, 0x6f, 0x67, 0x67, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69,
+ 0x67, 0x52, 0x06, 0x6c, 0x6f, 0x67, 0x67, 0x65, 0x72, 0x12, 0x50, 0x0a, 0x0c, 0x70, 0x70, 0x72,
+ 0x6f, 0x66, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32,
+ 0x2d, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72,
+ 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x50, 0x70, 0x72,
+ 0x6f, 0x66, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0b,
+ 0x70, 0x70, 0x72, 0x6f, 0x66, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x22, 0x94, 0x01, 0x0a, 0x16,
+ 0x54, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72,
+ 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x3b, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69,
+ 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e,
+ 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x4e, 0x41, 0x70, 0x69, 0x2e,
+ 0x54, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f,
+ 0x69, 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x03, 0x74, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b,
+ 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f,
+ 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65,
+ 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x74,
+ 0x6c, 0x73, 0x22, 0x3e, 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53,
+ 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02, 0x20,
+ 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x65, 0x72, 0x74,
+ 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x65, 0x72, 0x74, 0x4a, 0x04, 0x08, 0x01,
+ 0x10, 0x02, 0x22, 0x26, 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x61,
+ 0x64, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18, 0x01,
+ 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x8b, 0x01, 0x0a, 0x0d, 0x54,
+ 0x4c, 0x6f, 0x67, 0x67, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x41, 0x0a, 0x09,
+ 0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32,
+ 0x24, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72,
+ 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x45, 0x4c, 0x6f, 0x67,
+ 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12,
+ 0x37, 0x0a, 0x18, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x71, 0x6c, 0x5f, 0x71, 0x75,
+ 0x65, 0x72, 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28,
+ 0x08, 0x52, 0x15, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x71, 0x6c, 0x51, 0x75, 0x65, 0x72,
+ 0x79, 0x4c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x22, 0x90, 0x01, 0x0a, 0x12, 0x54, 0x50, 0x70,
+ 0x72, 0x6f, 0x66, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
+ 0x3b, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28,
+ 0x0b, 0x32, 0x1f, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x4e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63,
+ 0x74, 0x6f, 0x72, 0x2e, 0x4e, 0x41, 0x70, 0x69, 0x2e, 0x54, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69,
+ 0x6e, 0x74, 0x52, 0x08, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x12, 0x3d, 0x0a, 0x03,
+ 0x74, 0x6c, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2b, 0x2e, 0x4e, 0x59, 0x71, 0x6c,
0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43,
- 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65, 0x61,
- 0x64, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x09, 0x72, 0x65, 0x61, 0x64, 0x4c, 0x69, 0x6d, 0x69,
- 0x74, 0x12, 0x40, 0x0a, 0x06, 0x6c, 0x6f, 0x67, 0x67, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28,
- 0x0b, 0x32, 0x28, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74,
- 0x6f, 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x4c,
- 0x6f, 0x67, 0x67, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x06, 0x6c, 0x6f, 0x67,
- 0x67, 0x65, 0x72, 0x22, 0x3e, 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c,
- 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x02,
- 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x63, 0x65, 0x72,
- 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x63, 0x65, 0x72, 0x74, 0x4a, 0x04, 0x08,
- 0x01, 0x10, 0x02, 0x22, 0x26, 0x0a, 0x10, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x52, 0x65,
- 0x61, 0x64, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x18,
- 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x04, 0x72, 0x6f, 0x77, 0x73, 0x22, 0x8b, 0x01, 0x0a, 0x0d,
- 0x54, 0x4c, 0x6f, 0x67, 0x67, 0x65, 0x72, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x41, 0x0a,
- 0x09, 0x6c, 0x6f, 0x67, 0x5f, 0x6c, 0x65, 0x76, 0x65, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e,
- 0x32, 0x24, 0x2e, 0x4e, 0x59, 0x71, 0x6c, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f,
- 0x72, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x45, 0x4c, 0x6f,
- 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x52, 0x08, 0x6c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c,
- 0x12, 0x37, 0x0a, 0x18, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x73, 0x71, 0x6c, 0x5f, 0x71,
- 0x75, 0x65, 0x72, 0x79, 0x5f, 0x6c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, 0x01,
- 0x28, 0x08, 0x52, 0x15, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x53, 0x71, 0x6c, 0x51, 0x75, 0x65,
- 0x72, 0x79, 0x4c, 0x6f, 0x67, 0x67, 0x69, 0x6e, 0x67, 0x2a, 0x4b, 0x0a, 0x09, 0x45, 0x4c, 0x6f,
- 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x52, 0x41, 0x43, 0x45, 0x10,
- 0x00, 0x12, 0x09, 0x0a, 0x05, 0x44, 0x45, 0x42, 0x55, 0x47, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04,
- 0x49, 0x4e, 0x46, 0x4f, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x57, 0x41, 0x52, 0x4e, 0x10, 0x03,
- 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, 0x12, 0x09, 0x0a, 0x05, 0x46,
- 0x41, 0x54, 0x41, 0x4c, 0x10, 0x05, 0x42, 0x49, 0x5a, 0x47, 0x61, 0x2e, 0x79, 0x61, 0x6e, 0x64,
- 0x65, 0x78, 0x2d, 0x74, 0x65, 0x61, 0x6d, 0x2e, 0x72, 0x75, 0x2f, 0x79, 0x64, 0x62, 0x2f, 0x6c,
- 0x69, 0x62, 0x72, 0x61, 0x72, 0x79, 0x2f, 0x79, 0x71, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x76, 0x69,
- 0x64, 0x65, 0x72, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x2f, 0x63, 0x6f, 0x6e,
- 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69,
- 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+ 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x54, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x54, 0x4c, 0x53,
+ 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x03, 0x74, 0x6c, 0x73, 0x2a, 0x4b, 0x0a, 0x09, 0x45,
+ 0x4c, 0x6f, 0x67, 0x4c, 0x65, 0x76, 0x65, 0x6c, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x52, 0x41, 0x43,
+ 0x45, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x44, 0x45, 0x42, 0x55, 0x47, 0x10, 0x01, 0x12, 0x08,
+ 0x0a, 0x04, 0x49, 0x4e, 0x46, 0x4f, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x57, 0x41, 0x52, 0x4e,
+ 0x10, 0x03, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, 0x52, 0x10, 0x04, 0x12, 0x09, 0x0a,
+ 0x05, 0x46, 0x41, 0x54, 0x41, 0x4c, 0x10, 0x05, 0x42, 0x49, 0x5a, 0x47, 0x61, 0x2e, 0x79, 0x61,
+ 0x6e, 0x64, 0x65, 0x78, 0x2d, 0x74, 0x65, 0x61, 0x6d, 0x2e, 0x72, 0x75, 0x2f, 0x79, 0x64, 0x62,
+ 0x2f, 0x6c, 0x69, 0x62, 0x72, 0x61, 0x72, 0x79, 0x2f, 0x79, 0x71, 0x6c, 0x2f, 0x70, 0x72, 0x6f,
+ 0x76, 0x69, 0x64, 0x65, 0x72, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x2f, 0x63,
+ 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x63, 0x6f, 0x6e,
+ 0x66, 0x69, 0x67, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@@ -395,26 +564,34 @@ func file_ydb_library_yql_providers_generic_connector_app_config_server_proto_ra
}
var file_ydb_library_yql_providers_generic_connector_app_config_server_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
-var file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
+var file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes = make([]protoimpl.MessageInfo, 6)
var file_ydb_library_yql_providers_generic_connector_app_config_server_proto_goTypes = []interface{}{
- (ELogLevel)(0), // 0: NYql.Connector.App.Config.ELogLevel
- (*TServerConfig)(nil), // 1: NYql.Connector.App.Config.TServerConfig
- (*TServerTLSConfig)(nil), // 2: NYql.Connector.App.Config.TServerTLSConfig
- (*TServerReadLimit)(nil), // 3: NYql.Connector.App.Config.TServerReadLimit
- (*TLoggerConfig)(nil), // 4: NYql.Connector.App.Config.TLoggerConfig
- (*common.TEndpoint)(nil), // 5: NYql.NConnector.NApi.TEndpoint
+ (ELogLevel)(0), // 0: NYql.Connector.App.Config.ELogLevel
+ (*TServerConfig)(nil), // 1: NYql.Connector.App.Config.TServerConfig
+ (*TConnectorServerConfig)(nil), // 2: NYql.Connector.App.Config.TConnectorServerConfig
+ (*TServerTLSConfig)(nil), // 3: NYql.Connector.App.Config.TServerTLSConfig
+ (*TServerReadLimit)(nil), // 4: NYql.Connector.App.Config.TServerReadLimit
+ (*TLoggerConfig)(nil), // 5: NYql.Connector.App.Config.TLoggerConfig
+ (*TPprofServerConfig)(nil), // 6: NYql.Connector.App.Config.TPprofServerConfig
+ (*common.TEndpoint)(nil), // 7: NYql.NConnector.NApi.TEndpoint
}
var file_ydb_library_yql_providers_generic_connector_app_config_server_proto_depIdxs = []int32{
- 5, // 0: NYql.Connector.App.Config.TServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint
- 2, // 1: NYql.Connector.App.Config.TServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig
- 3, // 2: NYql.Connector.App.Config.TServerConfig.read_limit:type_name -> NYql.Connector.App.Config.TServerReadLimit
- 4, // 3: NYql.Connector.App.Config.TServerConfig.logger:type_name -> NYql.Connector.App.Config.TLoggerConfig
- 0, // 4: NYql.Connector.App.Config.TLoggerConfig.log_level:type_name -> NYql.Connector.App.Config.ELogLevel
- 5, // [5:5] is the sub-list for method output_type
- 5, // [5:5] is the sub-list for method input_type
- 5, // [5:5] is the sub-list for extension type_name
- 5, // [5:5] is the sub-list for extension extendee
- 0, // [0:5] is the sub-list for field type_name
+ 7, // 0: NYql.Connector.App.Config.TServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint
+ 3, // 1: NYql.Connector.App.Config.TServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig
+ 2, // 2: NYql.Connector.App.Config.TServerConfig.connector_server:type_name -> NYql.Connector.App.Config.TConnectorServerConfig
+ 4, // 3: NYql.Connector.App.Config.TServerConfig.read_limit:type_name -> NYql.Connector.App.Config.TServerReadLimit
+ 5, // 4: NYql.Connector.App.Config.TServerConfig.logger:type_name -> NYql.Connector.App.Config.TLoggerConfig
+ 6, // 5: NYql.Connector.App.Config.TServerConfig.pprof_server:type_name -> NYql.Connector.App.Config.TPprofServerConfig
+ 7, // 6: NYql.Connector.App.Config.TConnectorServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint
+ 3, // 7: NYql.Connector.App.Config.TConnectorServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig
+ 0, // 8: NYql.Connector.App.Config.TLoggerConfig.log_level:type_name -> NYql.Connector.App.Config.ELogLevel
+ 7, // 9: NYql.Connector.App.Config.TPprofServerConfig.endpoint:type_name -> NYql.NConnector.NApi.TEndpoint
+ 3, // 10: NYql.Connector.App.Config.TPprofServerConfig.tls:type_name -> NYql.Connector.App.Config.TServerTLSConfig
+ 11, // [11:11] is the sub-list for method output_type
+ 11, // [11:11] is the sub-list for method input_type
+ 11, // [11:11] is the sub-list for extension type_name
+ 11, // [11:11] is the sub-list for extension extendee
+ 0, // [0:11] is the sub-list for field type_name
}
func init() { file_ydb_library_yql_providers_generic_connector_app_config_server_proto_init() }
@@ -436,7 +613,7 @@ func file_ydb_library_yql_providers_generic_connector_app_config_server_proto_in
}
}
file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*TServerTLSConfig); i {
+ switch v := v.(*TConnectorServerConfig); i {
case 0:
return &v.state
case 1:
@@ -448,7 +625,7 @@ func file_ydb_library_yql_providers_generic_connector_app_config_server_proto_in
}
}
file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
- switch v := v.(*TServerReadLimit); i {
+ switch v := v.(*TServerTLSConfig); i {
case 0:
return &v.state
case 1:
@@ -460,6 +637,18 @@ func file_ydb_library_yql_providers_generic_connector_app_config_server_proto_in
}
}
file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*TServerReadLimit); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TLoggerConfig); i {
case 0:
return &v.state
@@ -471,6 +660,18 @@ func file_ydb_library_yql_providers_generic_connector_app_config_server_proto_in
return nil
}
}
+ file_ydb_library_yql_providers_generic_connector_app_config_server_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*TPprofServerConfig); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
}
type x struct{}
out := protoimpl.TypeBuilder{
@@ -478,7 +679,7 @@ func file_ydb_library_yql_providers_generic_connector_app_config_server_proto_in
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_ydb_library_yql_providers_generic_connector_app_config_server_proto_rawDesc,
NumEnums: 1,
- NumMessages: 4,
+ NumMessages: 6,
NumExtensions: 0,
NumServices: 0,
},
diff --git a/ydb/library/yql/providers/generic/connector/app/config/server.proto b/ydb/library/yql/providers/generic/connector/app/config/server.proto
index d179cc08e73..cedbc2a893d 100644
--- a/ydb/library/yql/providers/generic/connector/app/config/server.proto
+++ b/ydb/library/yql/providers/generic/connector/app/config/server.proto
@@ -7,17 +7,29 @@ option go_package = "github.com/ydb-platform/ydb/ydb/library/yql/providers/gener
// Connector server configuration
message TServerConfig {
- // Network address server will be listening on
- NYql.NConnector.NApi.TEndpoint endpoint = 1;
- // TLS settings.
- // Leave it empty for insecure connections.
- TServerTLSConfig tls = 2;
+ // TODO: remove it after YQ-2057
+ NYql.NConnector.NApi.TEndpoint endpoint = 1 [deprecated=true];
+ TServerTLSConfig tls = 2 [deprecated=true];
+
+ TConnectorServerConfig connector_server = 5;
// This is a rough restriction for YQ memory consumption until
// https://st.yandex-team.ru/YQ-2057 is implemented.
// Leave it empty if you want to avoid any memory limits.
TServerReadLimit read_limit = 3;
// Logger config
TLoggerConfig logger = 4;
+ // Go runtime profiler.
+ // Disabled if this part of config is empty.
+ TPprofServerConfig pprof_server = 6;
+}
+
+// TConnectorServerConfig - configuration of the main GRPC server
+message TConnectorServerConfig {
+ // Network address server will be listening on
+ NYql.NConnector.NApi.TEndpoint endpoint = 1;
+ // TLS settings.
+ // Leave it empty for insecure connections.
+ TServerTLSConfig tls = 2;
}
message TServerTLSConfig {
@@ -52,3 +64,12 @@ enum ELogLevel {
ERROR = 4;
FATAL = 5;
}
+
+// TPprofServerConfig configures HTTP server delivering Go runtime profiler data
+message TPprofServerConfig {
+ // Network address server will be listening on
+ NYql.NConnector.NApi.TEndpoint endpoint = 1;
+ // TLS settings.
+ // Leave it empty for insecure connections.
+ TServerTLSConfig tls = 2;
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/cmd.go b/ydb/library/yql/providers/generic/connector/app/server/cmd.go
index c6c603474a7..a5631ee0610 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/cmd.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/cmd.go
@@ -12,7 +12,7 @@ var Cmd = &cobra.Command{
Use: "server",
Short: "Connector server",
Run: func(cmd *cobra.Command, args []string) {
- if err := runServer(cmd, args); err != nil {
+ if err := run(cmd, args); err != nil {
fmt.Println(err)
os.Exit(1)
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/config.go b/ydb/library/yql/providers/generic/connector/app/server/config.go
index c6450020d15..e3dacbb6da8 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/config.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/config.go
@@ -12,16 +12,32 @@ import (
)
func validateServerConfig(c *config.TServerConfig) error {
- if err := validateEndpoint(c.Endpoint); err != nil {
- return fmt.Errorf("validate `Server`: %w", err)
+ if err := validateConnectorServerConfig(c.ConnectorServer); err != nil {
+ return fmt.Errorf("validate `connector_server`: %w", err)
}
- if err := validateServerTLSConfig(c.Tls); err != nil {
- return fmt.Errorf("validate `TLS`: %w", err)
+ if err := validateServerReadLimit(c.ReadLimit); err != nil {
+ return fmt.Errorf("validate `read_limit`: %w", err)
}
- if err := validateServerReadLimit(c.ReadLimit); err != nil {
- return fmt.Errorf("validate `ReadLimit`: %w", err)
+ if err := validatePprofServerConfig(c.PprofServer); err != nil {
+ return fmt.Errorf("validate `pprof_server`: %w", err)
+ }
+
+ return nil
+}
+
+func validateConnectorServerConfig(c *config.TConnectorServerConfig) error {
+ // TODO: make it required after YQ-2057
+ // if c == nil {
+ // return fmt.Errorf("required field is missing")
+ // }
+ if err := validateEndpoint(c.Endpoint); err != nil {
+ return fmt.Errorf("validate `endpoint`: %w", err)
+ }
+
+ if err := validateServerTLSConfig(c.Tls); err != nil {
+ return fmt.Errorf("validate `tls`: %w", err)
}
return nil
@@ -29,15 +45,15 @@ func validateServerConfig(c *config.TServerConfig) error {
func validateEndpoint(c *api_common.TEndpoint) error {
if c == nil {
- return fmt.Errorf("missing required field `Server`")
+ return fmt.Errorf("required field is missing")
}
if c.Host == "" {
- return fmt.Errorf("invalid value of field `Server.Host`: %v", c.Host)
+ return fmt.Errorf("invalid value of field `host`: %v", c.Host)
}
if c.Port == 0 || c.Port > math.MaxUint16 {
- return fmt.Errorf("invalid value of field `Server.Port`: %v", c.Port)
+ return fmt.Errorf("invalid value of field `port`: %v", c.Port)
}
return nil
@@ -50,11 +66,11 @@ func validateServerTLSConfig(c *config.TServerTLSConfig) error {
}
if err := fileMustExist(c.Key); err != nil {
- return fmt.Errorf("invalid value of field `TLS.Key`: %w", err)
+ return fmt.Errorf("invalid value of field `key`: %w", err)
}
if err := fileMustExist(c.Cert); err != nil {
- return fmt.Errorf("invalid value of field `TLS.Cert`: %w", err)
+ return fmt.Errorf("invalid value of field `cert`: %w", err)
}
return nil
@@ -68,7 +84,24 @@ func validateServerReadLimit(c *config.TServerReadLimit) error {
// but if it's not nil, one must set limits explicitly
if c.GetRows() == 0 {
- return fmt.Errorf("invalid value of field `ServerReadLimit.Rows`")
+ return fmt.Errorf("invalid value of field `rows`")
+ }
+
+ return nil
+}
+
+func validatePprofServerConfig(c *config.TPprofServerConfig) error {
+ if c == nil {
+ // It's OK to disable profiler
+ return nil
+ }
+
+ if err := validateEndpoint(c.Endpoint); err != nil {
+ return fmt.Errorf("validate `endpoint`: %w", err)
+ }
+
+ if err := validateServerTLSConfig(c.Tls); err != nil {
+ return fmt.Errorf("validate `tls`: %w", err)
}
return nil
diff --git a/ydb/library/yql/providers/generic/connector/app/server/launcher.go b/ydb/library/yql/providers/generic/connector/app/server/launcher.go
new file mode 100644
index 00000000000..3711c0dda3a
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/launcher.go
@@ -0,0 +1,115 @@
+package server
+
+import (
+ "fmt"
+ "os"
+ "os/signal"
+ "syscall"
+
+ "github.com/spf13/cobra"
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/config"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+)
+
+type service interface {
+ start() error
+ stop()
+}
+
+type launcher struct {
+ services map[string]service
+ logger log.Logger
+}
+
+func (l *launcher) start() <-chan error {
+ errChan := make(chan error, len(l.services))
+
+ for key := range l.services {
+ key := key
+ go func(key string) {
+ l.logger.Info("starting service", log.String("service", key))
+
+ // blocking call
+ errChan <- l.services[key].start()
+ }(key)
+ }
+
+ return errChan
+}
+
+func (l *launcher) stop() {
+ // TODO: make it concurrent
+ for key, s := range l.services {
+ l.logger.Info("stopping service", log.String("service", key))
+ s.stop()
+ }
+}
+
+const (
+ connectorServiceKey = "connector"
+ pprofServiceKey = "pprof"
+)
+
+func newLauncher(logger log.Logger, cfg *config.TServerConfig) (*launcher, error) {
+ l := &launcher{
+ services: make(map[string]service, 2),
+ logger: logger,
+ }
+
+ var err error
+
+ // init GRPC server
+ l.services[connectorServiceKey], err = newServiceConnector(
+ log.With(logger, log.String("service", connectorServiceKey)),
+ cfg)
+ if err != nil {
+ return nil, fmt.Errorf("new connector server: %w", err)
+ }
+
+ // init Pprof server
+ if cfg.PprofServer != nil {
+ l.services[pprofServiceKey] = newServicePprof(
+ log.With(logger, log.String("service", pprofServiceKey)),
+ cfg.PprofServer)
+ }
+
+ return l, nil
+}
+
+func run(cmd *cobra.Command, _ []string) error {
+ configPath, err := cmd.Flags().GetString(configFlag)
+ if err != nil {
+ return fmt.Errorf("get config flag: %v", err)
+ }
+
+ cfg, err := newConfigFromPath(configPath)
+ if err != nil {
+ return fmt.Errorf("new config: %w", err)
+ }
+
+ logger, err := utils.NewLoggerFromConfig(cfg.Logger)
+ if err != nil {
+ return fmt.Errorf("new logger from config: %w", err)
+ }
+
+ l, err := newLauncher(logger, cfg)
+ if err != nil {
+ return fmt.Errorf("new launcher: %w", err)
+ }
+
+ errChan := l.start()
+
+ signalChan := make(chan os.Signal, 1)
+ signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)
+
+ select {
+ case err := <-errChan:
+ logger.Error("service fatal error", log.Error(err))
+ case sig := <-signalChan:
+ logger.Info("interrupting signal", log.Any("value", sig))
+ l.stop()
+ }
+
+ return nil
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go
index 651d381383e..e5c86c71f1c 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_arrow_ipc_streaming.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_arrow_ipc_streaming.go
@@ -1,4 +1,4 @@
-package utils
+package paging
import (
"bytes"
@@ -9,6 +9,7 @@ import (
"github.com/apache/arrow/go/v13/arrow/ipc"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
@@ -17,7 +18,7 @@ type columnarBufferArrowIPCStreaming struct {
builders []array.Builder
readLimiter ReadLimiter
schema *arrow.Schema
- typeMapper TypeMapper
+ typeMapper utils.TypeMapper
ydbTypes []*Ydb.Type
}
@@ -88,7 +89,7 @@ type columnarBufferArrowIPCStreamingEmptyColumns struct {
arrowAllocator memory.Allocator
readLimiter ReadLimiter
schema *arrow.Schema
- typeMapper TypeMapper
+ typeMapper utils.TypeMapper
rowsAdded int64
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_factory.go
index 460387701f2..a1e2d380d8d 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/columnar_buffer_factory.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/columnar_buffer_factory.go
@@ -1,10 +1,11 @@
-package utils
+package paging
import (
"fmt"
"github.com/apache/arrow/go/v13/arrow/memory"
"github.com/ydb-platform/ydb/library/go/core/log"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
@@ -20,22 +21,21 @@ type ColumnarBuffer interface {
type ColumnarBufferFactory struct {
arrowAllocator memory.Allocator
readLimiterFactory *ReadLimiterFactory
+ logger log.Logger
+ format api_service_protos.TReadSplitsRequest_EFormat
+ selectWhat *api_service_protos.TSelect_TWhat
+ typeMapper utils.TypeMapper
}
-func (cbf *ColumnarBufferFactory) MakeBuffer(
- logger log.Logger,
- format api_service_protos.TReadSplitsRequest_EFormat,
- selectWhat *api_service_protos.TSelect_TWhat,
- typeMapper TypeMapper,
-) (ColumnarBuffer, error) {
- switch format {
+func (cbf *ColumnarBufferFactory) MakeBuffer() (ColumnarBuffer, error) {
+ switch cbf.format {
case api_service_protos.TReadSplitsRequest_ARROW_IPC_STREAMING:
- schema, builders, err := SelectWhatToArrow(selectWhat, cbf.arrowAllocator)
+ schema, builders, err := utils.SelectWhatToArrow(cbf.selectWhat, cbf.arrowAllocator)
if err != nil {
return nil, fmt.Errorf("convert Select.What to arrow.Schema: %w", err)
}
- ydbTypes, err := SelectWhatToYDBTypes(selectWhat)
+ ydbTypes, err := utils.SelectWhatToYDBTypes(cbf.selectWhat)
if err != nil {
return nil, fmt.Errorf("convert Select.What to Ydb.Types: %w", err)
}
@@ -44,8 +44,8 @@ func (cbf *ColumnarBufferFactory) MakeBuffer(
return &columnarBufferArrowIPCStreamingEmptyColumns{
arrowAllocator: cbf.arrowAllocator,
schema: schema,
- readLimiter: cbf.readLimiterFactory.MakeReadLimiter(logger),
- typeMapper: typeMapper,
+ readLimiter: cbf.readLimiterFactory.MakeReadLimiter(cbf.logger),
+ typeMapper: cbf.typeMapper,
rowsAdded: 0,
}, nil
}
@@ -54,21 +54,29 @@ func (cbf *ColumnarBufferFactory) MakeBuffer(
arrowAllocator: cbf.arrowAllocator,
schema: schema,
builders: builders,
- readLimiter: cbf.readLimiterFactory.MakeReadLimiter(logger),
- typeMapper: typeMapper,
+ readLimiter: cbf.readLimiterFactory.MakeReadLimiter(cbf.logger),
+ typeMapper: cbf.typeMapper,
ydbTypes: ydbTypes,
}, nil
default:
- return nil, fmt.Errorf("unknown format: %v", format)
+ return nil, fmt.Errorf("unknown format: %v", cbf.format)
}
}
func NewColumnarBufferFactory(
+ logger log.Logger,
arrowAllocator memory.Allocator,
readLimiterFactory *ReadLimiterFactory,
+ format api_service_protos.TReadSplitsRequest_EFormat,
+ selectWhat *api_service_protos.TSelect_TWhat,
+ typeMapper utils.TypeMapper,
) *ColumnarBufferFactory {
return &ColumnarBufferFactory{
+ logger: logger,
arrowAllocator: arrowAllocator,
readLimiterFactory: readLimiterFactory,
+ format: format,
+ selectWhat: selectWhat,
+ typeMapper: typeMapper,
}
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/doc.go b/ydb/library/yql/providers/generic/connector/app/server/paging/doc.go
new file mode 100644
index 00000000000..702d6255f02
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/doc.go
@@ -0,0 +1,3 @@
+// Package paging contains logic of splitting incoming data stream
+// into the separate pages or blocks.
+package paging
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/read_limiter.go b/ydb/library/yql/providers/generic/connector/app/server/paging/read_limiter.go
index 4175b6ec49e..765d387cce3 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/read_limiter.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/read_limiter.go
@@ -1,10 +1,11 @@
-package utils
+package paging
import (
"fmt"
"github.com/ydb-platform/ydb/library/go/core/log"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/config"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
)
// ReadLimiter helps to limitate amount of data returned by Connector server in every read request.
@@ -24,8 +25,10 @@ type readLimiterRows struct {
}
func (rl *readLimiterRows) AddRow() error {
- if rl.rowsRead == rl.rowsLimit {
- return fmt.Errorf("can read only %d line(s) from data source per request: %w", rl.rowsLimit, ErrReadLimitExceeded)
+ if rl.rowsRead >= rl.rowsLimit {
+ return fmt.Errorf("can read only %d line(s) from data source per request: %w",
+ rl.rowsLimit,
+ utils.ErrReadLimitExceeded)
}
rl.rowsRead++
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go b/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go
new file mode 100644
index 00000000000..c56c1ff44c0
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/writer.go
@@ -0,0 +1,102 @@
+package paging
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+)
+
+type Writer struct {
+ buffer ColumnarBuffer // accumulates data from rows
+ bufferQueue chan<- ColumnarBuffer // outgoing buffer queue
+ bufferFactory *ColumnarBufferFactory // creates new buffer
+ pagination *api_service_protos.TPagination // settings
+ rowsReceived uint64 // simple stats
+ logger log.Logger // annotated logger
+ operational bool // flag showing if it's ready to return data
+ ctx context.Context // client context
+}
+
+func (pw *Writer) AddRow(acceptors []any) error {
+ if !pw.operational {
+ return fmt.Errorf("paging writer is not operational")
+ }
+
+ if err := pw.buffer.AddRow(acceptors); err != nil {
+ return fmt.Errorf("acceptors to row set: %w", err)
+ }
+
+ if pw.isEnough() {
+ if err := pw.flush(); err != nil {
+ return fmt.Errorf("flush: %w", err)
+ }
+ }
+
+ pw.rowsReceived++
+
+ return nil
+}
+
+func (pw *Writer) isEnough() bool {
+ // TODO: implement pagination logic, check limits provided by client
+ return pw.rowsReceived%10000 == 0
+}
+
+func (pw *Writer) flush() error {
+ select {
+ case pw.bufferQueue <- pw.buffer:
+ case <-pw.ctx.Done():
+ return pw.ctx.Err()
+ }
+
+ var err error
+
+ pw.buffer, err = pw.bufferFactory.MakeBuffer()
+ if err != nil {
+ return fmt.Errorf("make buffer: %w", err)
+ }
+
+ return nil
+}
+
+func (pw *Writer) Finish() error {
+ if err := pw.flush(); err != nil {
+ return fmt.Errorf("flush: %w", err)
+ }
+
+ pw.operational = false
+
+ // notify reader about end of the stream
+ close(pw.bufferQueue)
+
+ return nil
+}
+
+func NewWriter(
+ ctx context.Context,
+ logger log.Logger,
+ bufferFactory *ColumnarBufferFactory,
+ bufferQueue chan<- ColumnarBuffer,
+ pagination *api_service_protos.TPagination,
+) (*Writer, error) {
+ if pagination != nil {
+ return nil, fmt.Errorf("pagination settings are not supported yet")
+ }
+
+ buffer, err := bufferFactory.MakeBuffer()
+ if err != nil {
+ return nil, fmt.Errorf("wrap buffer: %w", err)
+ }
+
+ return &Writer{
+ bufferFactory: bufferFactory,
+ bufferQueue: bufferQueue,
+ buffer: buffer,
+ logger: logger,
+ pagination: pagination,
+ operational: true,
+ ctx: ctx,
+ }, nil
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make b/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make
new file mode 100644
index 00000000000..03e448c0a17
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/paging/ya.make
@@ -0,0 +1,15 @@
+GO_LIBRARY()
+
+SRCS(
+ columnar_buffer_arrow_ipc_streaming.go
+ columnar_buffer_factory.go
+ doc.go
+ read_limiter.go
+ writer.go
+)
+
+GO_TEST_SRCS(
+ time_test.go
+)
+
+END()
diff --git a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
index cd4e747fcb3..3a5e81102de 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/rdbms/handler.go
@@ -8,6 +8,7 @@ import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
"github.com/ydb-platform/ydb/library/go/core/log"
api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/paging"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
)
@@ -24,7 +25,7 @@ type Handler interface {
logger log.Logger,
dataSourceInstance *api_common.TDataSourceInstance,
split *api_service_protos.TSplit,
- pagingWriter *utils.PagingWriter,
+ pagingWriter *paging.Writer,
) error
TypeMapper() utils.TypeMapper
@@ -90,7 +91,7 @@ func (h *handlerImpl[CONN]) ReadSplit(
logger log.Logger,
dataSourceInstance *api_common.TDataSourceInstance,
split *api_service_protos.TSplit,
- pagingWriter *utils.PagingWriter,
+ pagingWriter *paging.Writer,
) error {
conn, err := h.connectionManager.Make(ctx, logger, dataSourceInstance)
if err != nil {
diff --git a/ydb/library/yql/providers/generic/connector/app/server/server.go b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
index 0055934493e..991c87bebec 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/server.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/service_connector.go
@@ -6,9 +6,10 @@ import (
"net"
"github.com/apache/arrow/go/v13/arrow/memory"
- "github.com/spf13/cobra"
"github.com/ydb-platform/ydb/library/go/core/log"
+ api_common "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/api/common"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/config"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/paging"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
"github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
api_service "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service"
@@ -17,19 +18,22 @@ import (
"google.golang.org/grpc/credentials"
)
-type Server struct {
+type serviceConnector struct {
api_service.UnimplementedConnectorServer
- handlerFactory *rdbms.HandlerFactory
- columnarBufferFactory *utils.ColumnarBufferFactory
- cfg *config.TServerConfig
- logger log.Logger
+ handlerFactory *rdbms.HandlerFactory
+ memoryAllocator memory.Allocator
+ readLimiterFactory *paging.ReadLimiterFactory
+ cfg *config.TServerConfig
+ grpcServer *grpc.Server
+ listener net.Listener
+ logger log.Logger
}
-func (s *Server) ListTables(_ *api_service_protos.TListTablesRequest, _ api_service.Connector_ListTablesServer) error {
+func (s *serviceConnector) ListTables(_ *api_service_protos.TListTablesRequest, _ api_service.Connector_ListTablesServer) error {
return nil
}
-func (s *Server) DescribeTable(
+func (s *serviceConnector) DescribeTable(
ctx context.Context,
request *api_service_protos.TDescribeTableRequest,
) (*api_service_protos.TDescribeTableResponse, error) {
@@ -68,7 +72,7 @@ func (s *Server) DescribeTable(
return out, nil
}
-func (s *Server) ListSplits(request *api_service_protos.TListSplitsRequest, stream api_service.Connector_ListSplitsServer) error {
+func (s *serviceConnector) ListSplits(request *api_service_protos.TListSplitsRequest, stream api_service.Connector_ListSplitsServer) error {
logger := utils.AnnotateLogger(s.logger, "ListSplits", nil)
logger.Info("request handling started", log.Int("total selects", len(request.Selects)))
@@ -93,7 +97,7 @@ func (s *Server) ListSplits(request *api_service_protos.TListSplitsRequest, stre
return nil
}
-func (s *Server) doListSplitsHandleSelect(
+func (s *serviceConnector) doListSplitsHandleSelect(
stream api_service.Connector_ListSplitsServer,
slct *api_service_protos.TSelect,
totalSplits *int,
@@ -118,7 +122,7 @@ func (s *Server) doListSplitsHandleSelect(
return nil
}
-func (s *Server) doListSplitsResponse(
+func (s *serviceConnector) doListSplitsResponse(
logger log.Logger,
stream api_service.Connector_ListSplitsServer,
response *api_service_protos.TListSplitsResponse,
@@ -136,7 +140,7 @@ func (s *Server) doListSplitsResponse(
return nil
}
-func (s *Server) ReadSplits(request *api_service_protos.TReadSplitsRequest, stream api_service.Connector_ReadSplitsServer) error {
+func (s *serviceConnector) ReadSplits(request *api_service_protos.TReadSplitsRequest, stream api_service.Connector_ReadSplitsServer) error {
logger := utils.AnnotateLogger(s.logger, "ReadSplits", request.DataSourceInstance)
logger.Info("request handling started", log.Int("total_splits", len(request.Splits)))
@@ -154,10 +158,10 @@ func (s *Server) ReadSplits(request *api_service_protos.TReadSplitsRequest, stre
logger = log.With(logger, log.String("data source kind", request.DataSourceInstance.GetKind().String()))
- for i, split := range request.Splits {
- logger.Info("reading split", log.Int("split_ordered_num", i))
+ var totalBytes uint64
- err := s.readSplit(logger, stream, request, split)
+ for _, split := range request.Splits {
+ bytesInSplit, err := s.readSplit(logger, stream, request, split)
if err != nil {
logger.Error("request handling failed", log.Error(err))
@@ -169,143 +173,141 @@ func (s *Server) ReadSplits(request *api_service_protos.TReadSplitsRequest, stre
return err
}
}
+
+ totalBytes += bytesInSplit
}
- logger.Info("request handling finished")
+ logger.Info("request handling finished", log.UInt64("total_bytes", totalBytes))
return nil
}
-func (s *Server) readSplit(
+func (s *serviceConnector) readSplit(
logger log.Logger,
stream api_service.Connector_ReadSplitsServer,
request *api_service_protos.TReadSplitsRequest,
split *api_service_protos.TSplit,
-) error {
- logger.Debug("reading split", log.String("split", split.String()))
+) (uint64, error) {
+ logger.Debug("split reading started")
handler, err := s.handlerFactory.Make(logger, request.DataSourceInstance.Kind)
if err != nil {
- return fmt.Errorf("get handler: %w", err)
+ return 0, fmt.Errorf("get handler: %w", err)
}
- buf, err := s.columnarBufferFactory.MakeBuffer(logger, request.Format, split.Select.What, handler.TypeMapper())
- if err != nil {
- return fmt.Errorf("make buffer: %w", err)
- }
+ columnarBufferFactory := paging.NewColumnarBufferFactory(logger, s.memoryAllocator, s.readLimiterFactory, request.Format, split.Select.What, handler.TypeMapper())
- pagingWriter, err := utils.NewPagingWriter(
+ streamer, err := NewStreamer(
logger,
- buf,
stream,
- request.GetPagination(),
+ request,
+ split,
+ columnarBufferFactory,
+ handler,
)
if err != nil {
- return fmt.Errorf("new paging writer result set: %w", err)
+ return 0, fmt.Errorf("new streamer: %w", err)
}
- if err = handler.ReadSplit(stream.Context(), logger, request.GetDataSourceInstance(), split, pagingWriter); err != nil {
- return fmt.Errorf("read split: %w", err)
- }
-
- rowsReceived, err := pagingWriter.Finish()
+ totalBytesSent, err := streamer.Run()
if err != nil {
- return fmt.Errorf("finish paging writer: %w", err)
+ return 0, fmt.Errorf("run paging streamer: %w", err)
}
- logger.Debug("reading split finished", log.UInt64("rows_received", rowsReceived))
+ logger.Debug("split reading finished", log.UInt64("total_bytes_sent", totalBytesSent))
- return nil
+ return totalBytesSent, nil
}
-func (s *Server) run() error {
- endpoint := utils.EndpointToString(s.cfg.Endpoint)
-
- lis, err := net.Listen("tcp", endpoint)
- if err != nil {
- return fmt.Errorf("net listen: %w", err)
- }
-
- options, err := s.makeOptions()
- if err != nil {
- return fmt.Errorf(": %w", err)
- }
-
- grpcSrv := grpc.NewServer(options...)
-
- api_service.RegisterConnectorServer(grpcSrv, s)
-
- s.logger.Info("listener started", log.String("address", lis.Addr().String()))
+func (s *serviceConnector) start() error {
+ s.logger.Debug("starting GRPC server", log.String("address", s.listener.Addr().String()))
- if err := grpcSrv.Serve(lis); err != nil {
+ if err := s.grpcServer.Serve(s.listener); err != nil {
return fmt.Errorf("listener serve: %w", err)
}
return nil
}
-func (s *Server) makeOptions() ([]grpc.ServerOption, error) {
- var opts []grpc.ServerOption
+func makeGRPCOptions(logger log.Logger, cfg *config.TServerConfig) ([]grpc.ServerOption, error) {
+ var (
+ opts []grpc.ServerOption
+ tls *config.TServerTLSConfig
+ )
- if s.cfg.Tls != nil {
- s.logger.Info("server will use TLS connections")
+ // TODO: drop deprecated fields after YQ-2057
+ switch {
+ case cfg.GetConnectorServer().GetTls() != nil:
+ tls = cfg.GetConnectorServer().GetTls()
+ case cfg.GetTls() != nil:
+ tls = cfg.GetTls()
+ default:
+ logger.Warn("server will use insecure connections")
- s.logger.Info("reading key pair", log.String("cert", s.cfg.Tls.Cert), log.String("key", s.cfg.Tls.Key))
+ return opts, nil
+ }
- creds, err := credentials.NewServerTLSFromFile(s.cfg.Tls.Cert, s.cfg.Tls.Key)
- if err != nil {
- return nil, fmt.Errorf("new server TLS from file: %w", err)
- }
+ logger.Info("server will use TLS connections")
- opts = append(opts, grpc.Creds(creds))
- } else {
- s.logger.Warn("server will use insecure connections")
+ logger.Debug("reading key pair", log.String("cert", tls.Cert), log.String("key", tls.Key))
+
+ creds, err := credentials.NewServerTLSFromFile(tls.Cert, tls.Key)
+ if err != nil {
+ return nil, fmt.Errorf("new server TLS from file: %w", err)
}
+ opts = append(opts, grpc.Creds(creds))
+
return opts, nil
}
-func newServer(
+func (s *serviceConnector) stop() {
+ s.grpcServer.GracefulStop()
+}
+
+func newServiceConnector(
logger log.Logger,
cfg *config.TServerConfig,
-) (*Server, error) {
+) (service, error) {
queryLoggerFactory := utils.NewQueryLoggerFactory(cfg.Logger)
- return &Server{
- handlerFactory: rdbms.NewHandlerFactory(queryLoggerFactory),
- columnarBufferFactory: utils.NewColumnarBufferFactory(
- memory.DefaultAllocator,
- utils.NewReadLimiterFactory(cfg.ReadLimit),
- ),
- logger: logger,
- cfg: cfg,
- }, nil
-}
+ // TODO: drop deprecated fields after YQ-2057
+ var endpoint *api_common.TEndpoint
-func runServer(cmd *cobra.Command, _ []string) error {
- configPath, err := cmd.Flags().GetString(configFlag)
- if err != nil {
- return fmt.Errorf("get config flag: %v", err)
- }
+ switch {
+ case cfg.GetConnectorServer().GetEndpoint() != nil:
+ endpoint = cfg.ConnectorServer.GetEndpoint()
+ case cfg.GetEndpoint() != nil:
+ logger.Warn("Using deprecated field `endpoint` from config. Please update your config.")
- cfg, err := newConfigFromPath(configPath)
- if err != nil {
- return fmt.Errorf("new config: %w", err)
+ endpoint = cfg.GetEndpoint()
+ default:
+ return nil, fmt.Errorf("invalid config: no endpoint")
}
- logger, err := utils.NewLoggerFromConfig(cfg.Logger)
+ listener, err := net.Listen("tcp", utils.EndpointToString(endpoint))
if err != nil {
- return fmt.Errorf("new logger from config: %w", err)
+ return nil, fmt.Errorf("net listen: %w", err)
}
- srv, err := newServer(logger, cfg)
+ options, err := makeGRPCOptions(logger, cfg)
if err != nil {
- return fmt.Errorf("new server: %w", err)
+ return nil, fmt.Errorf("make GRPC options: %w", err)
}
- if err := srv.run(); err != nil {
- return fmt.Errorf("server run: %w", err)
+ grpcServer := grpc.NewServer(options...)
+
+ s := &serviceConnector{
+ handlerFactory: rdbms.NewHandlerFactory(queryLoggerFactory),
+ memoryAllocator: memory.DefaultAllocator,
+ readLimiterFactory: paging.NewReadLimiterFactory(cfg.ReadLimit),
+ logger: logger,
+ grpcServer: grpcServer,
+ listener: listener,
+ cfg: cfg,
}
- return nil
+ api_service.RegisterConnectorServer(grpcServer, s)
+
+ return s, nil
}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go b/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go
new file mode 100644
index 00000000000..7fcef47bd7f
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/service_pprof.go
@@ -0,0 +1,63 @@
+package server
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "net/http/pprof"
+ "time"
+
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/config"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+)
+
+type servicePprof struct {
+ httpServer *http.Server
+ mux *http.ServeMux
+ logger log.Logger
+}
+
+func (s *servicePprof) start() error {
+ s.logger.Debug("starting HTTP server", log.String("address", s.httpServer.Addr))
+
+ if err := s.httpServer.ListenAndServe(); err != nil {
+ return fmt.Errorf("http server listen and server: %w", err)
+ }
+
+ return nil
+}
+
+const shutdownTimeout = 5 * time.Second
+
+func (s *servicePprof) stop() {
+ ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
+ defer cancel()
+
+ err := s.httpServer.Shutdown(ctx)
+ if err != nil {
+ s.logger.Error("shutdown http server", log.Error(err))
+ }
+}
+
+func newServicePprof(logger log.Logger, cfg *config.TPprofServerConfig) service {
+ httpServer := &http.Server{
+ Addr: utils.EndpointToString(cfg.Endpoint),
+ }
+
+ mux := http.NewServeMux()
+ mux.HandleFunc("/debug/pprof/", pprof.Index)
+ mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
+ mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
+ mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
+ mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
+
+ // TODO: TLS
+ logger.Warn("server will use insecure connections")
+
+ return &servicePprof{
+ httpServer: httpServer,
+ logger: logger,
+ mux: mux,
+ }
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/streamer.go b/ydb/library/yql/providers/generic/connector/app/server/streamer.go
new file mode 100644
index 00000000000..053c4db5a4e
--- /dev/null
+++ b/ydb/library/yql/providers/generic/connector/app/server/streamer.go
@@ -0,0 +1,129 @@
+package server
+
+import (
+ "fmt"
+
+ "github.com/ydb-platform/ydb/library/go/core/log"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/paging"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/rdbms"
+ "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/app/server/utils"
+ api_service "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service"
+ api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
+ "golang.org/x/sync/errgroup"
+)
+
+type Streamer struct {
+ stream api_service.Connector_ReadSplitsServer
+ bufferQueue chan paging.ColumnarBuffer
+ request *api_service_protos.TReadSplitsRequest
+ handler rdbms.Handler
+ split *api_service_protos.TSplit
+ pagingWriter *paging.Writer
+ totalBytesSent uint64
+ logger log.Logger
+}
+
+func (s *Streamer) writeDataToStream() error {
+ for {
+ select {
+ case buffer, ok := <-s.bufferQueue:
+ if !ok {
+ return nil
+ }
+
+ if err := s.sendBufferToStream(buffer); err != nil {
+ return fmt.Errorf("send buffer to stream: %w", err)
+ }
+ case <-s.stream.Context().Done():
+ return s.stream.Context().Err()
+ }
+ }
+}
+
+func (s *Streamer) sendBufferToStream(buffer paging.ColumnarBuffer) error {
+ // buffer must be explicitly marked as unused,
+ // otherwise memory will leak
+ defer buffer.Release()
+
+ resp, err := buffer.ToResponse()
+ if err != nil {
+ return fmt.Errorf("buffer to response: %w", err)
+ }
+
+ utils.DumpReadSplitsResponse(s.logger, resp)
+
+ if err := s.stream.Send(resp); err != nil {
+ return fmt.Errorf("stream send: %w", err)
+ }
+
+ s.totalBytesSent += uint64(len(resp.GetArrowIpcStreaming()))
+
+ return nil
+}
+
+func (s *Streamer) readDataFromSource() error {
+ // run blocking read
+ err := s.handler.ReadSplit(
+ s.stream.Context(),
+ s.logger,
+ s.request.GetDataSourceInstance(),
+ s.split,
+ s.pagingWriter,
+ )
+
+ if err != nil {
+ return fmt.Errorf("read split: %w", err)
+ }
+
+ // when read is finished, dump data to stream
+ if err := s.pagingWriter.Finish(); err != nil {
+ return fmt.Errorf("finish paging writer: %w", err)
+ }
+
+ return nil
+}
+
+func (s *Streamer) Run() (uint64, error) {
+ var group errgroup.Group
+
+ group.Go(s.readDataFromSource)
+ group.Go(s.writeDataToStream)
+
+ if err := group.Wait(); err != nil {
+ return 0, fmt.Errorf("group wait: %w", err)
+ }
+
+ return s.totalBytesSent, nil
+}
+
+func NewStreamer(
+ logger log.Logger,
+ stream api_service.Connector_ReadSplitsServer,
+ request *api_service_protos.TReadSplitsRequest,
+ split *api_service_protos.TSplit,
+ columnarBufferFactory *paging.ColumnarBufferFactory,
+ handler rdbms.Handler,
+) (*Streamer, error) {
+ bufferQueue := make(chan paging.ColumnarBuffer, 10)
+
+ pagingWriter, err := paging.NewWriter(
+ stream.Context(),
+ logger,
+ columnarBufferFactory,
+ bufferQueue,
+ request.GetPagination(),
+ )
+ if err != nil {
+ return nil, fmt.Errorf("new paging writer: %w", err)
+ }
+
+ return &Streamer{
+ logger: logger,
+ stream: stream,
+ split: split,
+ request: request,
+ handler: handler,
+ pagingWriter: pagingWriter,
+ bufferQueue: bufferQueue,
+ }, nil
+}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/paging_writer.go b/ydb/library/yql/providers/generic/connector/app/server/utils/paging_writer.go
deleted file mode 100644
index c5f5328a71e..00000000000
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/paging_writer.go
+++ /dev/null
@@ -1,97 +0,0 @@
-package utils
-
-import (
- "fmt"
-
- "github.com/ydb-platform/ydb/library/go/core/log"
- api_service "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service"
- api_service_protos "github.com/ydb-platform/ydb/ydb/library/yql/providers/generic/connector/libgo/service/protos"
-)
-
-type PagingWriter struct {
- buffer ColumnarBuffer // row data accumulator
- stream api_service.Connector_ReadSplitsServer // outgoing data stream
- pagination *api_service_protos.TPagination // settings
- rowsReceived uint64 // simple stats
- logger log.Logger // annotated logger
- operational bool // flag showing if it's ready to return data
-}
-
-func (pw *PagingWriter) AddRow(acceptors []any) error {
- if !pw.operational {
- return fmt.Errorf("paging writer is not operational")
- }
-
- if err := pw.buffer.AddRow(acceptors); err != nil {
- return fmt.Errorf("acceptors to row set: %w", err)
- }
-
- if pw.isEnough() {
- if err := pw.flush(); err != nil {
- return fmt.Errorf("flush: %w", err)
- }
- }
-
- pw.rowsReceived++
-
- return nil
-}
-
-func (pw *PagingWriter) isEnough() bool {
- // TODO: implement pagination logic, check limits provided by client
- return false
-}
-
-func (pw *PagingWriter) flush() error {
- if pw.buffer == nil {
- return nil
- }
-
- response, err := pw.buffer.ToResponse()
- if err != nil {
- return fmt.Errorf("build response: %v", err)
- }
-
- response.Error = NewSuccess()
-
- DumpReadSplitsResponse(pw.logger, response)
-
- if err := pw.stream.Send(response); err != nil {
- return fmt.Errorf("send stream")
- }
-
- pw.buffer.Release()
-
- pw.buffer = nil
-
- return nil
-}
-
-func (pw *PagingWriter) Finish() (uint64, error) {
- if err := pw.flush(); err != nil {
- return 0, fmt.Errorf("flush: %w", err)
- }
-
- pw.operational = false
-
- return pw.rowsReceived, nil
-}
-
-func NewPagingWriter(
- logger log.Logger,
- buffer ColumnarBuffer,
- stream api_service.Connector_ReadSplitsServer,
- pagination *api_service_protos.TPagination,
-) (*PagingWriter, error) {
- if pagination != nil {
- return nil, fmt.Errorf("pagination settings are not supported yet")
- }
-
- return &PagingWriter{
- buffer: buffer,
- logger: logger,
- stream: stream,
- pagination: pagination,
- operational: true,
- }, nil
-}
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
index f675fae9243..5205e5a2d05 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/ya.make
@@ -2,18 +2,14 @@ GO_LIBRARY()
SRCS(
arrow_helpers.go
- columnar_buffer_arrow_ipc_streaming.go
- columnar_buffer_factory.go
connection_manager.go
converters.go
doc.go
endpoint.go
errors.go
logger.go
- paging_writer.go
protobuf.go
query_builder.go
- read_limiter.go
select_helpers.go
time.go
type_mapper.go
diff --git a/ydb/library/yql/providers/generic/connector/app/server/ya.make b/ydb/library/yql/providers/generic/connector/app/server/ya.make
index 7bd36d8d8f4..54f16efa3f8 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/ya.make
+++ b/ydb/library/yql/providers/generic/connector/app/server/ya.make
@@ -2,9 +2,12 @@ GO_LIBRARY()
SRCS(
cmd.go
- doc.go
config.go
- server.go
+ doc.go
+ launcher.go
+ service_connector.go
+ service_pprof.go
+ streamer.go
validate.go
)
@@ -12,6 +15,7 @@ END()
RECURSE(
clickhouse
+ paging
postgresql
rdbms
utils
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
index 062262ddc20..a465a7b2876 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
+++ b/ydb/library/yql/providers/generic/connector/libcpp/error.cpp
@@ -12,18 +12,6 @@ namespace NYql::NConnector {
return error;
}
- bool IsSuccess(const NApi::TError& error) {
- YQL_ENSURE(error.status() != Ydb::StatusIds_StatusCode::StatusIds_StatusCode_STATUS_CODE_UNSPECIFIED,
- "error status code is not initialized");
-
- auto ok = error.status() == Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS;
- if (ok) {
- YQL_ENSURE(error.issues_size() == 0, "request succeeded, but issues are not empty");
- }
-
- return ok;
- }
-
TIssues ErrorToIssues(const NApi::TError& error) {
TIssues issues;
issues.Reserve(error.get_arr_issues().size() + 1);
@@ -55,8 +43,6 @@ namespace NYql::NConnector {
}
void ErrorToExprCtx(const NApi::TError& error, TExprContext& ctx, const TPosition& position, const TString& summary) {
- YQL_ENSURE(!IsSuccess(error));
-
// add high-level error
TStringBuilder ss;
ss << summary << ": status=" << Ydb::StatusIds_StatusCode_Name(error.status()) << ", message=" << error.message();
diff --git a/ydb/library/yql/providers/generic/connector/libcpp/error.h b/ydb/library/yql/providers/generic/connector/libcpp/error.h
index 715742fc65c..35625947b83 100644
--- a/ydb/library/yql/providers/generic/connector/libcpp/error.h
+++ b/ydb/library/yql/providers/generic/connector/libcpp/error.h
@@ -10,7 +10,24 @@
namespace NYql::NConnector {
NApi::TError NewSuccess();
- bool IsSuccess(const NApi::TError& error);
+ template <typename TResponse>
+ bool IsSuccess(const TResponse& response) {
+ if (!response.has_error()) {
+ return true;
+ }
+
+ const NApi::TError& error = response.error();
+
+ YQL_ENSURE(error.status() != Ydb::StatusIds_StatusCode::StatusIds_StatusCode_STATUS_CODE_UNSPECIFIED,
+ "error status code is not initialized");
+
+ auto ok = error.status() == Ydb::StatusIds_StatusCode::StatusIds_StatusCode_SUCCESS;
+ if (ok) {
+ YQL_ENSURE(error.issues_size() == 0, "request succeeded, but issues are not empty");
+ }
+
+ return ok;
+ }
TIssues ErrorToIssues(const NApi::TError& error);
diff --git a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
index 412961976a1..b6c41cd3c5c 100644
--- a/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
+++ b/ydb/library/yql/providers/generic/provider/yql_generic_load_meta.cpp
@@ -220,8 +220,7 @@ namespace NYql {
const auto it = Results_.find(TGenericState::TTableAddress(clusterName, tableName));
if (Results_.cend() != it) {
const auto& response = it->second->Response;
- const auto& error = response.error();
- if (NConnector::IsSuccess(error)) {
+ if (NConnector::IsSuccess(response)) {
TGenericState::TTableMeta tableMeta;
tableMeta.Schema = response.schema();
tableMeta.DataSourceInstance = it->second->DataSourceInstance;
@@ -246,6 +245,7 @@ namespace NYql {
break;
}
} else {
+ const auto& error = response.error();
NConnector::ErrorToExprCtx(error, ctx, ctx.GetPosition(read.Pos()),
TStringBuilder()
<< "Loading metadata for table: " << clusterName << '.' << tableName);