aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-03-21 17:32:52 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-03-21 17:32:52 +0300
commitcc6a2b1d8898f6908b30857eff17a9f860adb835 (patch)
tree9eed9e644b42d938f5dc5380f489f5fa1fb5e1ee
parent3f2a0c7d35ea5aef609e266ea0e770c383115dca (diff)
downloadydb-cc6a2b1d8898f6908b30857eff17a9f860adb835.tar.gz
YQ-727 S3 Source coro actor.
ref:a41532935edfdddaec5d5059dbaa1a0cec23f08a
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp209
1 files changed, 126 insertions, 83 deletions
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
index 32e88705f11..4c3dc66c270 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
@@ -38,7 +38,6 @@ struct TEvPrivate {
EvReadFinished,
EvReadError,
EvRetry,
- EvReadDone,
EvEnd
};
@@ -67,11 +66,10 @@ struct TEvPrivate {
explicit TEvRetryEvent(size_t pathIndex) : PathIndex(pathIndex) {}
const size_t PathIndex = 0;
};
-
- struct TEvReadDone : public TEventLocal<TEvReadDone, EvReadDone> {};
};
-} // namespace
+using TPath = std::tuple<TString, size_t>;
+using TPathList = std::vector<TPath>;
class TS3ReadActor : public TActorBootstrapped<TS3ReadActor>, public IDqSourceActor {
private:
@@ -106,9 +104,6 @@ private:
double Epsilon = 0.1;
};
public:
-using TPath = std::tuple<TString, size_t>;
-using TPathList = std::vector<TPath>;
-
TS3ReadActor(ui64 inputIndex,
IHTTPGateway::TPtr gateway,
const TString& url,
@@ -302,54 +297,51 @@ public:
case TEvPrivate::TEvReadFinished::EventType:
return false;
case TEvPrivate::TEvReadError::EventType:
+ Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
return false;
case TEvPrivate::TEvReadResult::EventType:
- value = NUdf::TUnboxedValue(MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result))));
+ value = MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result)));
return true;
default:
return false;
}
}
private:
- void Run() final {
- try {
- const auto randStub = CreateDeterministicRandomProvider(1);
- const auto timeStub = CreateDeterministicTimeProvider(10000000);
-
- const auto alloc = TypeEnv.BindAllocator();
- const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry);
+ void Run() final try {
+ const auto randStub = CreateDeterministicRandomProvider(1);
+ const auto timeStub = CreateDeterministicTimeProvider(10000000);
- TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String)));
+ const auto alloc = TypeEnv.BindAllocator();
+ const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry);
- const auto factory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
- return callable.GetType()->GetName() == "CoroStream" ?
- TCoroStreamWrapper::Make(callable, ctx, this) : GetBuiltinFactory()(callable, ctx);
- };
+ TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String)));
- TRuntimeNode stream(callableBuilder.Build(), false);
+ const auto factory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
+ return callable.GetType()->GetName() == "CoroStream" ?
+ TCoroStreamWrapper::Make(callable, ctx, this) : GetBuiltinFactory()(callable, ctx);
+ };
- const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr);
- const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType});
- const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream});
+ TRuntimeNode stream(callableBuilder.Build(), false);
- TExploringNodeVisitor explorer;
- explorer.Walk(root.GetNode(), TypeEnv);
- TComputationPatternOpts opts(TypeEnv.GetAllocator().Ref(), TypeEnv, factory, &FunctionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Single);
- const auto pattern = MakeComputationPattern(explorer, root, {}, opts);
- const auto graph = pattern->Clone(opts.ToComputationOptions(*randStub, *timeStub));
- const TBindTerminator bind(graph->GetTerminator());
+ const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb, Cerr);
+ const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType});
+ const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseFormat", {}, userType, Format), {stream});
- const auto output = graph->GetValue();
- for (NUdf::TUnboxedValue v; NUdf::EFetchStatus::Ok == output.Fetch(v);)
- Outputs->Data.emplace_back(std::move(v));
+ TExploringNodeVisitor explorer;
+ explorer.Walk(root.GetNode(), TypeEnv);
+ TComputationPatternOpts opts(TypeEnv.GetAllocator().Ref(), TypeEnv, factory, &FunctionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception, "OFF", EGraphPerProcess::Single);
+ const auto pattern = MakeComputationPattern(explorer, root, {}, opts);
+ const auto graph = pattern->Clone(opts.ToComputationOptions(*randStub, *timeStub));
+ const TBindTerminator bind(graph->GetTerminator());
- Outputs.reset();
- } catch (const yexception& err) {
- Cerr << __func__ << " exception " << err.what() << Endl;
- return;
- }
+ const auto output = graph->GetValue();
+ for (NUdf::TUnboxedValue v; NUdf::EFetchStatus::Ok == output.Fetch(v);)
+ Outputs->Data.emplace_back(std::move(v));
- Y_UNUSED(WaitForSpecificEvent<TEvPrivate::TEvReadDone>());
+ Outputs.reset();
+ } catch (const yexception& err) {
+ Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue(err.what())}, true));
+ return;
}
void ProcessUnexpectedEvent(TAutoPtr<IEventHandle>) final {
@@ -364,28 +356,93 @@ private:
TOutput::TPtr Outputs;
};
-class TS3ReadCoroActor : public TActorCoro, public IDqSourceActor {
+class TS3ReadCoroActor : public TActorCoro {
public:
- using TPath = std::tuple<TString, size_t>;
- using TPathList = std::vector<TPath>;
-
TS3ReadCoroActor(THolder<TS3ReadCoroImpl> impl,
+ IHTTPGateway::TPtr gateway,
+ const TString& url,
+ const IHTTPGateway::THeaders& headers,
+ const TString& path,
+ const std::size_t expectedSize,
+ TOutput::TPtr outputs)
+ : TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
+ , Gateway(std::move(gateway))
+ , Url(url)
+ , Headers(headers)
+ , Path(path)
+ , ExpectedSize(expectedSize)
+ , Outputs(std::move(outputs))
+ {}
+private:
+ static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data) {
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), 0)));
+ }
+
+ static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result) {
+ if (result)
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, 0)));
+ else
+ actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(0)));
+ }
+
+ TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) {
+ Gateway->Download(Url + Path,
+ Headers, ExpectedSize,
+ std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1),
+ std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1));
+
+ return TActorCoro::AfterRegister(self, parent);
+ }
+
+ static IHTTPGateway::THeaders MakeHeader(const TString& token) {
+ return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
+ }
+
+ const IHTTPGateway::TPtr Gateway;
+
+ const TString Url;
+ const IHTTPGateway::THeaders Headers;
+ const TString Path;
+ const std::size_t ExpectedSize;
+ TOutput::TPtr Outputs;
+};
+
+class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqSourceActor {
+public:
+ TS3StreamReadActor(
+ const TTypeEnvironment& typeEnv,
+ const IFunctionRegistry& functionRegistry,
ui64 inputIndex,
IHTTPGateway::TPtr gateway,
const TString& url,
const TString& token,
TPathList&& paths,
- TOutput::TPtr outputs)
- : TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
- , InputIndex(inputIndex)
+ TString format,
+ TString rowType,
+ const NActors::TActorId& computeActorId
+ ) : TypeEnv(typeEnv)
+ , FunctionRegistry(functionRegistry)
, Gateway(std::move(gateway))
+ , InputIndex(inputIndex)
+ , ComputeActorId(computeActorId)
, Url(url)
, Headers(MakeHeader(token))
, Paths(std::move(paths))
- , Outputs(std::move(outputs))
+ , Format(format)
+ , RowType(rowType)
+ , Outputs(std::make_shared<TOutput>())
{}
- static constexpr char ActorName[] = "S3_READ_ACTOR_CORO";
+ void Bootstrap() {
+ for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
+ const TPath& path = Paths[pathInd];
+ auto impl = MakeHolder<TS3ReadCoroImpl>(TypeEnv, FunctionRegistry, InputIndex, ComputeActorId, Paths.size(), Format, RowType, Outputs);
+ RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), Outputs).Release());
+ };
+ }
+
+ static constexpr char ActorName[] = "S3_READ_ACTOR";
+
private:
void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {}
void LoadState(const NDqProto::TSourceState&) final {}
@@ -393,60 +450,48 @@ private:
ui64 GetInputIndex() const final { return InputIndex; }
i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64) final {
+ if (!Outputs)
+ return 0LL;
+
i64 total = Outputs->Data.size();
std::move(Outputs->Data.begin(), Outputs->Data.end(), std::back_inserter(buffer));
Outputs->Data.clear();
if (Outputs.unique()) {
finished = true;
- Send(SelfId(), new TEvPrivate::TEvReadDone);
+ Outputs.reset();
}
return total;
}
- void PassAway() final {
-//Todo return TActorCoro::PassAway();
- }
-
-
- static void OnNewData(TActorSystem* actorSystem, TActorId selfId, IHTTPGateway::TContent&& data, size_t pathInd) {
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadResult(std::move(data), pathInd)));
- }
-
- static void OnDownloadFinished(TActorSystem* actorSystem, TActorId selfId, std::optional<TIssues> result, size_t pathInd) {
- if (result)
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadError(TIssues{*result}, pathInd)));
- else
- actorSystem->Send(new IEventHandle(selfId, TActorId(), new TEvPrivate::TEvReadFinished(pathInd)));
- }
-
- TAutoPtr<IEventHandle> AfterRegister(const TActorId& self, const TActorId& parent) {
- for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
- const auto& path = Paths[pathInd];
- Gateway->Download(Url + std::get<TString>(path),
- Headers, std::get<size_t>(path),
- std::bind(&TS3ReadCoroActor::OnNewData, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd),
- std::bind(&TS3ReadCoroActor::OnDownloadFinished, TActivationContext::ActorSystem(), self, std::placeholders::_1, pathInd));
- };
-
- return TActorCoro::AfterRegister(self, parent);
+ // IActor & IDqSourceActor
+ void PassAway() override { // Is called from Compute Actor
+ TActorBootstrapped<TS3StreamReadActor>::PassAway();
}
static IHTTPGateway::THeaders MakeHeader(const TString& token) {
return token.empty() ? IHTTPGateway::THeaders() : IHTTPGateway::THeaders{TString("X-YaCloud-SubjectToken:") += token};
}
-
- const ui64 InputIndex;
+private:
+ const TTypeEnvironment& TypeEnv;
+ const IFunctionRegistry& FunctionRegistry;
const IHTTPGateway::TPtr Gateway;
+ const ui64 InputIndex;
+ const NActors::TActorId ComputeActorId;
+
const TString Url;
const IHTTPGateway::THeaders Headers;
const TPathList Paths;
- const TOutput::TPtr Outputs;
+ const TString Format, RowType, Compression;
+
+ TOutput::TPtr Outputs;
};
+} // namespace
+
std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
const TTypeEnvironment& typeEnv,
const IFunctionRegistry& functionRegistry,
@@ -463,7 +508,7 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
for (auto i = 0; i < params.GetPath().size(); ++i)
map.emplace(params.GetPath().Get(i).GetPath(), params.GetPath().Get(i).GetSize());
- TS3ReadActor::TPathList paths;
+ TPathList paths;
paths.reserve(map.size());
if (const auto taskParamsIt = taskParams.find(S3ProviderName); taskParamsIt != taskParams.cend()) {
NS3::TRange range;
@@ -481,10 +526,8 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
const auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token);
const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
- if (params.HasFormat()) {
- auto outputs = std::make_shared<TOutput>();
- auto impl = MakeHolder<TS3ReadCoroImpl>(typeEnv, functionRegistry, inputIndex, computeActorId, paths.size(), params.GetFormat(), params.GetRowType(), outputs);
- const auto actor = new TS3ReadCoroActor(std::move(impl), inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), outputs);
+ if (params.HasFormat() && params.HasRowType()) {
+ const auto actor = new TS3StreamReadActor(typeEnv, functionRegistry, inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), params.GetFormat(), params.GetRowType(), computeActorId);
return {actor, actor};
} else {
const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig);