aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-03-18 03:14:32 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-03-18 03:14:32 +0300
commit972191bd72f3e29e1070f02cc43917a1fc8a81ee (patch)
tree130f9d8753b5b7fcd248481cb7fcfa4f4d2ab0cd
parent29742ca2d4486c34dd664eaf05731175fe2b30a7 (diff)
downloadydb-972191bd72f3e29e1070f02cc43917a1fc8a81ee.tar.gz
YQ-727 S3 Source coro actor first draft.
ref:e08f3219646c5a1216fd053eea1e60c477a113c0
-rw-r--r--ydb/library/yql/providers/s3/actors/CMakeLists.txt1
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp250
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp2
-rw-r--r--ydb/library/yql/providers/s3/proto/source.proto3
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp10
6 files changed, 259 insertions, 9 deletions
diff --git a/ydb/library/yql/providers/s3/actors/CMakeLists.txt b/ydb/library/yql/providers/s3/actors/CMakeLists.txt
index e95f3343a1..6e59be0cdd 100644
--- a/ydb/library/yql/providers/s3/actors/CMakeLists.txt
+++ b/ydb/library/yql/providers/s3/actors/CMakeLists.txt
@@ -16,6 +16,7 @@ target_link_libraries(providers-s3-actors PUBLIC
yutil
yql-minikql-computation
common-token_accessor-client
+ common-schema-mkql
yql-public-types
dq-actors-compute
providers-common-http_gateway
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index d2d0670654..32e88705f1 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -1,15 +1,26 @@
#include "yql_s3_read_actor.h"
#include <ydb/library/yql/minikql/mkql_string_util.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
+#include <ydb/library/yql/minikql/mkql_program_builder.h>
+#include <ydb/library/yql/minikql/invoke_builtins/mkql_builtins.h>
+#include <ydb/library/yql/minikql/mkql_function_registry.h>
+#include <ydb/library/yql/minikql/mkql_terminator.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/providers/common/schema/mkql/yql_mkql_schema.h>
#include <ydb/library/yql/utils/yql_panic.h>
+
#include <ydb/library/yql/providers/s3/proto/range.pb.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
+#include <library/cpp/actors/core/actor_coroutine.h>
#include <library/cpp/actors/core/events.h>
#include <library/cpp/actors/core/event_local.h>
#include <library/cpp/actors/core/hfunc.h>
+#include <util/generic/size_literals.h>
+
#include <queue>
namespace NYql::NDq {
@@ -24,8 +35,10 @@ struct TEvPrivate {
EvBegin = EventSpaceBegin(TEvents::ES_PRIVATE),
EvReadResult = EvBegin,
+ EvReadFinished,
EvReadError,
EvRetry,
+ EvReadDone,
EvEnd
};
@@ -34,20 +47,28 @@ struct TEvPrivate {
// Events
struct TEvReadResult : public TEventLocal<TEvReadResult, EvReadResult> {
- TEvReadResult(IHTTPGateway::TContent&& result): Result(std::move(result)) {}
+ TEvReadResult(IHTTPGateway::TContent&& result, size_t pathInd): Result(std::move(result)), PathIndex(pathInd) {}
IHTTPGateway::TContent Result;
+ const size_t PathIndex = 0;
+ };
+
+ struct TEvReadFinished : public TEventLocal<TEvReadFinished, EvReadFinished> {
+ TEvReadFinished(size_t pathInd) : PathIndex(pathInd) {}
+ const size_t PathIndex = 0;
};
struct TEvReadError : public TEventLocal<TEvReadError, EvReadError> {
- TEvReadError(TIssues&& error, size_t pathInd): Error(std::move(error)), PathIndex(pathInd) {}
- TIssues Error;
- size_t PathIndex = 0;
+ TEvReadError(TIssues&& error, size_t pathInd) : Error(std::move(error)), PathIndex(pathInd) {}
+ const TIssues Error;
+ const size_t PathIndex = 0;
};
struct TEvRetryEvent : public NActors::TEventLocal<TEvRetryEvent, EvRetry> {
explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {}
- size_t PathIndex = 0;
+ const size_t PathIndex = 0;
};
+
+ struct TEvReadDone : public TEventLocal<TEvReadDone, EvReadDone> {};
};
} // namespace
@@ -137,7 +158,7 @@ private:
static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TResult&& result, size_t pathInd) {
switch (result.index()) {
case 0U:
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result)))));
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::get<IHTTPGateway::TContent>(std::move(result)), pathInd)));
return;
case 1U:
actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(std::get<TIssues>(std::move(result)), pathInd)));
@@ -223,7 +244,212 @@ private:
ui32 MaxRetriesPerPath = 3;
};
+using namespace NKikimr::NMiniKQL;
+
+struct TOutput {
+ TUnboxedValueDeque Data;
+ using TPtr = std::shared_ptr<TOutput>;
+};
+
+class TS3ReadCoroImpl : public TActorCoroImpl {
+private:
+ class TCoroStreamWrapper: public TMutableComputationNode<TCoroStreamWrapper> {
+ using TBaseComputation = TMutableComputationNode<TCoroStreamWrapper>;
+ public:
+ class TStreamValue : public TComputationValue<TStreamValue> {
+ public:
+ using TBase = TComputationValue<TStreamValue>;
+
+ TStreamValue(TMemoryUsageInfo* memInfo, TS3ReadCoroImpl* coro)
+ : TBase(memInfo), Coro(coro)
+ {}
+
+ private:
+ NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) final {
+ return Coro->Next(value) ? NUdf::EFetchStatus::Ok : NUdf::EFetchStatus::Finish;
+ }
+
+ private:
+ TS3ReadCoroImpl *const Coro;
+ };
+
+ TCoroStreamWrapper(TComputationMutables& mutables, TS3ReadCoroImpl* coro)
+ : TBaseComputation(mutables), Coro(coro)
+ {}
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
+ return ctx.HolderFactory.Create<TStreamValue>(Coro);
+ }
+
+ static IComputationNode* Make(TCallable&, const TComputationNodeFactoryContext& ctx, TS3ReadCoroImpl* coro) {
+ return new TCoroStreamWrapper(ctx.Mutables, coro);
+ }
+ private:
+ void RegisterDependencies() const final {}
+ TS3ReadCoroImpl *const Coro;
+ };
+
+public:
+ TS3ReadCoroImpl(const TTypeEnvironment& typeEnv, const IFunctionRegistry& functionRegistry, ui64 inputIndex, const NActors::TActorId& computeActorId, ui64, TString format, TString rowType, TOutput::TPtr outputs)
+ : TActorCoroImpl(256_KB), TypeEnv(typeEnv), FunctionRegistry(functionRegistry), InputIndex(inputIndex), Format(std::move(format)), RowType(std::move(rowType)), ComputeActorId(computeActorId), Outputs(std::move(outputs))
+ {}
+
+ bool Next(NUdf::TUnboxedValue& value) {
+ TypeEnv.GetAllocator().Release();
+ const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
+ TypeEnv.GetAllocator().Acquire();
+ switch (const auto etype = ev->GetTypeRewrite()) {
+ case TEvPrivate::TEvReadFinished::EventType:
+ return false;
+ case TEvPrivate::TEvReadError::EventType:
+ return false;
+ case TEvPrivate::TEvReadResult::EventType:
+ value = NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result))));
+ return true;
+ default:
+ return false;
+ }
+ }
+private:
+ void Run() final {
+ try {
+ const auto randStub = CreateDeterministicRandomProvider(1);
+ const auto timeStub = CreateDeterministicTimeProvider(10000000);
+
+ const auto alloc = TypeEnv.BindAllocator();
+ const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry);
+
+ TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String)));
+
+ const auto factory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ return callable.GetType()->GetName() == "CoroStream" ?
+ TCoroStreamWrapper::Make(callable, ctx, this) : GetBuiltinFactory()(callable, ctx);
+ };
+
+ TRuntimeNode stream(callableBuilder.Build(), false);
+
+ const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr);
+ const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType});
+ const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream});
+
+ TExploringNodeVisitor explorer;
+ explorer.Walk(root.GetNode(), TypeEnv);
+ TComputationPatternOpts opts(TypeEnv.GetAllocator().Ref(), TypeEnv, factory, &FunctionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Single);
+ const auto pattern = MakeComputationPattern(explorer, root, {}, opts);
+ const auto graph = pattern->Clone(opts.ToComputationOptions(*randStub, *timeStub));
+ const TBindTerminator bind(graph->GetTerminator());
+
+ const auto output = graph->GetValue();
+ for (NUdf::TUnboxedValue v; NUdf::EFetchStatus::Ok == output.Fetch(v);)
+ Outputs->Data.emplace_back(std::move(v));
+
+ Outputs.reset();
+ } catch (const yexception& err) {
+ Cerr << __func__ << " exception " << err.what() << Endl;
+ return;
+ }
+
+ Y_UNUSED(WaitForSpecificEvent<TEvPrivate::TEvReadDone>());
+ }
+
+ void ProcessUnexpectedEvent(TAutoPtr<IEventHandle>) final {
+ Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue("Unexpected event")}, true));
+ }
+private:
+ const TTypeEnvironment& TypeEnv;
+ const IFunctionRegistry& FunctionRegistry;
+ const ui64 InputIndex;
+ const TString Format, RowType, Compression;
+ const NActors::TActorId ComputeActorId;
+ TOutput::TPtr Outputs;
+};
+
+class TS3ReadCoroActor : public TActorCoro, public IDqSourceActor {
+public:
+ using TPath = std::tuple<TString, size_t>;
+ using TPathList = std::vector<TPath>;
+
+ TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl,
+ ui64 inputIndex,
+ IHTTPGateway::TPtr gateway,
+ const TString& url,
+ const TString& token,
+ TPathList&& paths,
+ TOutput::TPtr outputs)
+ : TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
+ , InputIndex(inputIndex)
+ , Gateway(std::move(gateway))
+ , Url(url)
+ , Headers(MakeHeader(token))
+ , Paths(std::move(paths))
+ , Outputs(std::move(outputs))
+ {}
+
+ static constexpr char ActorName[] = "S3_READ_ACTOR_CORO";
+private:
+ void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {}
+ void LoadState(const NDqProto::TSourceState&) final {}
+ void CommitState(const NDqProto::TCheckpoint&) final {}
+ ui64 GetInputIndex() const final { return InputIndex; }
+
+ i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64) final {
+ i64 total = Outputs->Data.size();
+ std::move(Outputs->Data.begin(), Outputs->Data.end(), std::back_inserter(buffer));
+ Outputs->Data.clear();
+
+ if (Outputs.unique()) {
+ finished = true;
+ Send(SelfId(), new TEvPrivate::TEvReadDone);
+ }
+
+ return total;
+ }
+
+ void PassAway() final {
+//Todo return TActorCoro::PassAway();
+ }
+
+
+ static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data, size_t pathInd) {
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), pathInd)));
+ }
+
+ static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result, size_t pathInd) {
+ if (result)
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, pathInd)));
+ else
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(pathInd)));
+ }
+
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) {
+ for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
+ const auto& path = Paths[pathInd];
+ Gateway->Download(Url + std::get<TString>(path),
+ Headers, std::get<size_t>(path),
+ std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd),
+ std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd));
+ };
+
+ return TActorCoro::AfterRegister(self, parent);
+ }
+
+ static IHTTPGateway::THeaders MakeHeader(const TString& token) {
+ return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
+ }
+
+ const ui64 InputIndex;
+
+ const IHTTPGateway::TPtr Gateway;
+
+ const TString Url;
+ const IHTTPGateway::THeaders Headers;
+ const TPathList Paths;
+ const TOutput::TPtr Outputs;
+};
+
std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
+ const TTypeEnvironment& typeEnv,
+ const IFunctionRegistry& functionRegistry,
IHTTPGateway::TPtr gateway,
NS3::TSource&& params,
ui64 inputIndex,
@@ -254,8 +480,16 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
const auto token = secureParams.Value(params.GetToken(), TString{});
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
- const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig);
- return {actor, actor};
+
+ if (params.HasFormat()) {
+ auto outputs = std::make_shared<TOutput>();
+ auto impl = MakeHolder<TS3ReadCoroImpl>(typeEnv, functionRegistry, inputIndex, computeActorId, paths.size(), params.GetFormat(), params.GetRowType(), outputs);
+ const auto actor = new TS3ReadCoroActor(std::move(impl), inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), outputs);
+ return {actor, actor};
+ } else {
+ const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig);
+ return {actor, actor};
+ }
}
} // namespace NYql::NDq
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
index 685ddf7e41..1edfb68ddf 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
@@ -10,6 +10,8 @@
namespace NYql::NDq {
std::pair<NYql::NDq::IDqSourceActor*, NActors::IActor*> CreateS3ReadActor(
+ const NKikimr::NMiniKQL::TTypeEnvironment& typeEnv,
+ const NKikimr::NMiniKQL::IFunctionRegistry& functionRegistry,
IHTTPGateway::TPtr gateway,
NS3::TSource&& params,
ui64 inputIndex,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
index 5319a692a5..f24dbda547 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
@@ -12,7 +12,7 @@ void RegisterS3ReadActorFactory(
const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) {
factory.Register<NS3::TSource>("S3Source",
[credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceActorFactory::TArguments&& args) {
- return CreateS3ReadActor(gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig);
+ return CreateS3ReadActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig);
});
}
diff --git a/ydb/library/yql/providers/s3/proto/source.proto b/ydb/library/yql/providers/s3/proto/source.proto
index bb8786851d..73a89ec12d 100644
--- a/ydb/library/yql/providers/s3/proto/source.proto
+++ b/ydb/library/yql/providers/s3/proto/source.proto
@@ -12,4 +12,7 @@ message TSource {
string Url = 1;
string Token = 2;
repeated TPath Path = 3;
+ optional string RowType = 4;
+ optional string Format = 5;
+ optional string Compression = 6;
}
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 87a0b23234..3106467d79 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -3,6 +3,7 @@
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/providers/common/dq/yql_dq_integration_impl.h>
+#include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.h>
@@ -151,6 +152,15 @@ public:
p->SetSize(FromString<ui64>(paths.Item(i).Size().Value()));
}
+ if (const auto mayParseSettings = settings.Maybe<TS3ParseSettings>()) {
+ const auto parseSettings = mayParseSettings.Cast();
+ srcDesc.SetFormat(parseSettings.Format().StringValue().c_str());
+ srcDesc.SetRowType(NCommon::WriteTypeToYson(parseSettings.RowType().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType(), NYT::NYson::EYsonFormat::Text));
+
+ if (const auto compression = parseSettings.Compression())
+ srcDesc.SetCompression(compression.Cast().StringValue().c_str());
+ }
+
protoSettings.PackFrom(srcDesc);
sourceType = "S3Source";
}