1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
#pragma once
#include "yql_generic_settings.h"
#include <yql/essentials/core/yql_data_provider.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/library/yql/providers/generic/connector/libcpp/client.h>
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>
// Forward declaration only: this header refers to IFunctionRegistry solely
// through a pointer, so the full definition does not have to be included here.
namespace NKikimr::NMiniKQL {
class IFunctionRegistry;
} // namespace NKikimr::NMiniKQL
namespace NYql {
// Reference-counted (TThrRefBase) state object bundling everything declared below:
// configuration, database resolver, credentials factory, connector client and
// the per-table metadata registry. Instances are shared through TPtr.
struct TGenericState: public TThrRefBase {
    using TPtr = TIntrusivePtr<TGenericState>;
    using TTableAddress = std::pair<TString, TString>; // std::pair<clusterName, tableName>

    // Metadata stored per registered table.
    struct TTableMeta {
        const TStructExprType* ItemType = nullptr;
        TVector<TString> ColumnOrder;
        NYql::NConnector::NApi::TSchema Schema;
        NYql::TGenericDataSourceInstance DataSourceInstance;
    };

    // Result of a table lookup: (table metadata, issue).
    // NOTE(review): presumably exactly one of the two optionals is set -
    // confirm against the out-of-line GetTable implementation.
    using TGetTableResult = std::pair<std::optional<const TTableMeta*>, std::optional<TIssue>>;

    TGenericState() = delete;

    // Stores the supplied collaborators and initializes Configuration from the
    // gateway config.
    //
    // `types` is dereferenced in the constructor body (types->Credentials),
    // so it must not be null.
    //
    // Configuration is intentionally absent from the initializer list: it is
    // created by its default member initializer (see the member declaration),
    // which keeps a single source of truth for its initial value.
    TGenericState(
        TTypeAnnotationContext* types,
        const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
        const std::shared_ptr<IDatabaseAsyncResolver>& databaseResolver,
        const ISecuredServiceAccountCredentialsFactory::TPtr& credentialsFactory,
        const NConnector::IClient::TPtr& genericClient,
        const TGenericGatewayConfig& gatewayConfig)
        : Types(types)
        , FunctionRegistry(functionRegistry)
        , DatabaseResolver(databaseResolver)
        , CredentialsFactory(credentialsFactory)
        , GenericClient(genericClient)
    {
        // DatabaseAuth is declared before DatabaseResolver and is already
        // default-constructed at this point.
        Configuration->Init(gatewayConfig, databaseResolver, DatabaseAuth, types->Credentials);
    }

    // Table registry accessors; defined out of line.
    // NOTE(review): the TPosition overload presumably attaches the position
    // to the issue returned on failure - confirm against the implementation.
    void AddTable(const TStringBuf& clusterName, const TStringBuf& tableName, TTableMeta&& tableMeta);
    TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName) const;
    TGetTableResult GetTable(const TStringBuf& clusterName, const TStringBuf& tableName, const TPosition& position) const;

    // Non-owning pointer supplied by the caller.
    TTypeAnnotationContext* Types;

    // Always non-null: created here and filled in by Init() in the constructor.
    TGenericConfiguration::TPtr Configuration = MakeIntrusive<TGenericConfiguration>();

    // Non-owning pointer supplied by the caller.
    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry;

    // key - (database id, database type), value - credentials to access managed APIs
    IDatabaseAsyncResolver::TDatabaseAuthMap DatabaseAuth;
    std::shared_ptr<IDatabaseAsyncResolver> DatabaseResolver;

    // key - cluster name, value - TCredentialsProviderPtr
    // It's important to cache credentials providers, because they perform IO
    // (a synchronous call via the Token Accessor client) during construction.
    std::unordered_map<TString, NYdb::TCredentialsProviderPtr> CredentialProviders;

    ISecuredServiceAccountCredentialsFactory::TPtr CredentialsFactory;
    NConnector::IClient::TPtr GenericClient;

private:
    // Registered tables keyed by (clusterName, tableName).
    THashMap<TTableAddress, TTableMeta> Tables_;
};
} // namespace NYql
|