diff options
author | a-romanov <[email protected]> | 2022-03-05 11:07:01 +0300 |
---|---|---|
committer | a-romanov <[email protected]> | 2022-03-05 11:07:01 +0300 |
commit | 764d7eeeac4d68d0c28028efd1bc1969c6fe08fd (patch) | |
tree | beac595ff61b2944cd21f645d609d264f294f1b7 | |
parent | cfca477dd2974305d9848eb52d85ed6f067f655a (diff) |
Drop unused code.
ref:83edb53bfa57e6e1df50807657cd9bf6528c17b4
-rw-r--r-- | ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_mkql_compiler.cpp | 101 | ||||
-rw-r--r-- | ydb/library/yql/providers/ydb/provider/yql_ydb_mkql_compiler.cpp | 104 |
2 files changed, 1 insertions, 204 deletions
diff --git a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_mkql_compiler.cpp b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_mkql_compiler.cpp index 5a380898e8a..4cbe1890262 100644 --- a/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_mkql_compiler.cpp +++ b/ydb/library/yql/providers/clickhouse/provider/yql_clickhouse_mkql_compiler.cpp @@ -33,93 +33,6 @@ TRuntimeNode BuildNativeParseCall(TRuntimeNode input, TType* inputItemType, TTyp }); } -TRuntimeNode BuildClickHouseInputCall( - TType* outputType, - TType* itemType, - const TString& cluster, - TStringBuf table, - TStringBuf timezone, - const TClickHouseState::TPtr& state, - NCommon::TMkqlBuildContext& ctx) -{ - TStringBuf db, dbTable; - if (!table.TrySplit(".", db, dbTable)) { - db = "default"; - dbTable = table; - } - - auto structType = AS_TYPE(TStructType, itemType); - auto endpoint = state->Configuration->Endpoints.FindPtr(cluster); - Y_ENSURE(endpoint); - - const auto host = endpoint->first; - const auto colonPos = host.rfind(':'); - YQL_ENSURE(colonPos != TString::npos, "Missing port: " << host); - - const auto hostWithoutPort = host.substr(0, colonPos); - const auto port = FromString<ui16>(host.substr(colonPos + 1)); - const bool secure = endpoint->second; - - TString typeConfig; - TStringOutput stream(typeConfig); - NJson::TJsonWriter writer(&stream, NJson::TJsonWriterConfig()); - - writer.OpenMap(); - - writer.Write("db", db); - writer.Write("table", dbTable); - writer.Write("secure", secure); - writer.Write("host", hostWithoutPort); - writer.Write("port", port); - writer.Write("token", TString("cluster:default_") + cluster); - - writer.OpenArray("columns"); - for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - writer.Write(structType->GetMemberName(i)); - } - writer.CloseArray(); - - writer.CloseMap(); - writer.Flush(); - - const auto voidType = ctx.ProgramBuilder.NewVoid().GetStaticType(); - TCallableTypeBuilder callbackTypeBuilder(ctx.ProgramBuilder.GetTypeEnvironment(), "", voidType); - const auto callbackType = callbackTypeBuilder.Build(); - - const auto strType = ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id); - const auto argsType = ctx.ProgramBuilder.NewTupleType({ strType }); - const auto userType = ctx.ProgramBuilder.NewTupleType({ argsType }); - const auto retType = ctx.ProgramBuilder.NewStreamType(itemType); - - TCallableTypeBuilder funcTypeBuilder(ctx.ProgramBuilder.GetTypeEnvironment(), "UDF", retType); - funcTypeBuilder.Add(strType); - - const auto funcType = funcTypeBuilder.Build(); - - TCallableBuilder callbackBuilder(ctx.ProgramBuilder.GetTypeEnvironment(), "DqNotify", callbackType); - auto callback = TRuntimeNode(callbackBuilder.Build(), false); - - auto flow = ctx.ProgramBuilder.ToFlow( - ctx.ProgramBuilder.Apply( - ctx.ProgramBuilder.TypedUdf("ClickHouse.remoteSource", funcType, - ctx.ProgramBuilder.NewVoid(), userType, typeConfig), - TArrayRef<const TRuntimeNode>({ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(timezone)}) - )); - - if (!AS_TYPE(TFlowType, outputType)->GetItemType()->IsSameType(*itemType)) { - flow = ctx.ProgramBuilder.ExpandMap(flow, - [&](TRuntimeNode item) { - TRuntimeNode::TList fields; - fields.reserve(structType->GetMembersCount()); - auto i = 0U; - std::generate_n(std::back_inserter(fields), structType->GetMembersCount(), [&](){ return ctx.ProgramBuilder.Member(item, structType->GetMemberName(i++)); }); - return fields; - }); - } - - return flow; -} - } void RegisterDqClickHouseMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TClickHouseState::TPtr& state) { @@ -134,20 +47,6 @@ void RegisterDqClickHouseMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compi return TRuntimeNode(); }); - - compiler.ChainCallable(TDqReadWideWrap::CallableName(), - [state](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { - if (const auto wrapper = TDqReadWrapBase(&node); wrapper.Input().Maybe<TClReadTable>().IsValid()) { - const auto clRead = wrapper.Input().Cast<TClReadTable>(); - const auto readType = clRead.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); - const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *GetSeqItemType(readType), ctx.ProgramBuilder); - const auto cluster = clRead.DataSource().Cluster().StringValue(); - const auto outputType = NCommon::BuildType(wrapper.Ref(), *wrapper.Ref().GetTypeAnn(), ctx.ProgramBuilder); - return BuildClickHouseInputCall(outputType, inputItemType, cluster, clRead.Table(), clRead.Timezone(), state, ctx); - } - - return TRuntimeNode(); - }); } } diff --git a/ydb/library/yql/providers/ydb/provider/yql_ydb_mkql_compiler.cpp b/ydb/library/yql/providers/ydb/provider/yql_ydb_mkql_compiler.cpp index b76d3b00078..1d56916bec1 100644 --- a/ydb/library/yql/providers/ydb/provider/yql_ydb_mkql_compiler.cpp +++ b/ydb/library/yql/providers/ydb/provider/yql_ydb_mkql_compiler.cpp @@ -33,65 +33,9 @@ TRuntimeNode BuildYdbParseCall(TRuntimeNode input, TType* itemType, NCommon::TMk }); } -template<bool Async> -TRuntimeNode BuildYdbInputCall( - const TType* outputType, - TType* itemType, - const std::string_view& database, - const std::string_view& endpoint, - bool secure, - const std::string_view& token, - const std::string_view& table, - const std::string_view& snapshot, - const std::vector<NUdf::TDataTypeId>& keysTypes, - TRuntimeNode limit, - NCommon::TMkqlBuildContext& ctx) -{ - const auto streamType = ctx.ProgramBuilder.NewStreamType(ctx.ProgramBuilder.NewDataType(NUdf::TDataType<char*>::Id)); - const auto structType = static_cast<const TStructType*>(itemType); - - TRuntimeNode::TList columns; - columns.reserve(structType->GetMembersCount()); - auto i = 0U; - std::generate_n(std::back_inserter(columns), structType->GetMembersCount(), [&](){ return ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(structType->GetMemberName(i++)); }); - - TRuntimeNode::TList keys; - keys.reserve(keysTypes.size()); - std::transform(keysTypes.cbegin(), keysTypes.cend(), std::back_inserter(keys), [&ctx](const NUdf::TDataTypeId id){ return ctx.ProgramBuilder.NewDataLiteral(id); }); - - TCallableBuilder scan(ctx.ProgramBuilder.GetTypeEnvironment(), Async ? "KikScanAsync" : "KikScan", streamType); - - scan.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(table)); - scan.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(database)); - scan.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(endpoint)); - scan.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(token)); - scan.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>(snapshot)); - scan.Add(ctx.ProgramBuilder.NewTuple(columns)); - scan.Add(ctx.ProgramBuilder.NewTuple(keys)); - scan.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("")); - scan.Add(ctx.ProgramBuilder.NewDataLiteral<NUdf::EDataSlot::String>("")); - scan.Add(limit); - scan.Add(ctx.ProgramBuilder.NewDataLiteral(secure)); - - auto flow = ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFromYdb", {}, itemType), {TRuntimeNode(scan.Build(), false)})); - - if (!AS_TYPE(TFlowType, outputType)->GetItemType()->IsSameType(*itemType)) { - flow = ctx.ProgramBuilder.ExpandMap(flow, - [&](TRuntimeNode item) { - TRuntimeNode::TList fields; - fields.reserve(structType->GetMembersCount()); - auto j = 0U; - std::generate_n(std::back_inserter(fields), structType->GetMembersCount(), [&](){ return ctx.ProgramBuilder.Member(item, structType->GetMemberName(j++)); }); - return fields; - }); - } - - return flow; -} - } -void RegisterDqYdbMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TYdbState::TPtr& state) { +void RegisterDqYdbMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, const TYdbState::TPtr&) { compiler.ChainCallable(TDqSourceWideWrap::CallableName(), [](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { if (const auto wrapper = TDqSourceWideWrap(&node); wrapper.DataSource().Category().Value() == YdbProviderName) { @@ -102,52 +46,6 @@ void RegisterDqYdbMkqlCompilers(NCommon::TMkqlCallableCompilerBase& compiler, co return TRuntimeNode(); }); - - compiler.ChainCallable(TDqReadWideWrap::CallableName(), - [state](const TExprNode& node, NCommon::TMkqlBuildContext& ctx) { - if (const auto wrapper = TDqReadWrapBase(&node); wrapper.Input().Maybe<TYdbReadTable>().IsValid()) { - const auto read = wrapper.Input().Cast<TYdbReadTable>(); - const auto cluster = read.DataSource().Cluster().StringValue(); - const auto readType = read.Ref().GetTypeAnn()->Cast<TTupleExprType>()->GetItems().back(); - const auto inputItemType = NCommon::BuildType(wrapper.Input().Ref(), *GetSeqItemType(readType), ctx.ProgramBuilder); - const auto outputType = NCommon::BuildType(wrapper.Ref(), *wrapper.Ref().GetTypeAnn(), ctx.ProgramBuilder); - const auto limit = read.LimitHint() ? - ctx.ProgramBuilder.NewOptional(MkqlBuildExpr(read.LimitHint().Cast().Ref(), ctx)): - ctx.ProgramBuilder.NewEmptyOptionalDataLiteral(NUdf::TDataType<ui64>::Id); - const auto& meta = state->Tables[std::make_pair(cluster, read.Table().StringValue())]; - const auto& config = state->Configuration->Clusters[cluster]; - if (meta.ReadAsync) - return BuildYdbInputCall<true>( - outputType, - inputItemType, - config.Database, - config.Endpoint, - config.Secure, - wrapper.Token().Name().Cast().Value(), - read.Table(), - meta.Snapshot.GetSnapshotId(), - meta.KeyTypes, - limit, - ctx - ); - else - return BuildYdbInputCall<false>( - outputType, - inputItemType, - config.Database, - config.Endpoint, - config.Secure, - wrapper.Token().Name().Cast().Value(), - read.Table(), - meta.Snapshot.GetSnapshotId(), - meta.KeyTypes, - limit, - ctx - ); - } - - return TRuntimeNode(); - }); } } |