aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgvit <gvit@ydb.tech>2023-07-26 20:53:38 +0300
committergvit <gvit@ydb.tech>2023-07-26 20:53:38 +0300
commit4a6b7d9f8b16bcaad55e6ede1fcdc196e700b0d2 (patch)
treeef489fc6e353b18d6f447c5ed211509fd4bbe5f4
parent681be68af347d7732a3eb87ccc76251d90c1f822 (diff)
downloadydb-4a6b7d9f8b16bcaad55e6ede1fcdc196e700b0d2.tar.gz
fix output channel initialization KIKIMR-18855
-rw-r--r--ydb/library/yql/dq/runtime/dq_output_channel.cpp28
1 files changed, 16 insertions, 12 deletions
diff --git a/ydb/library/yql/dq/runtime/dq_output_channel.cpp b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
index f426885ce8..ce877846d9 100644
--- a/ydb/library/yql/dq/runtime/dq_output_channel.cpp
+++ b/ydb/library/yql/dq/runtime/dq_output_channel.cpp
@@ -51,7 +51,8 @@ template<bool FastPack>
class TDqOutputChannel : public IDqOutputChannel {
public:
TDqOutputChannel(ui64 channelId, NMiniKQL::TType* outputType,
- const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc)
+ const NMiniKQL::THolderFactory& holderFactory, const TDqOutputChannelSettings& settings, const TLogFunc& logFunc,
+ NDqProto::EDataTransportVersion transportVersion)
: ChannelId(channelId)
, OutputType(outputType)
, BasicStats(ChannelId)
@@ -60,7 +61,7 @@ public:
, Width(OutputType->IsMulti() ? static_cast<NMiniKQL::TMultiType*>(OutputType)->GetElementsCount() : 1u)
, Storage(settings.ChannelStorage)
, HolderFactory(holderFactory)
- , TransportVersion(settings.TransportVersion)
+ , TransportVersion(transportVersion)
, MaxStoredBytes(settings.MaxStoredBytes)
, MaxChunkBytes(settings.MaxChunkBytes)
, ChunkSizeLimit(settings.ChunkSizeLimit)
@@ -376,16 +377,19 @@ IDqOutputChannel::TPtr CreateDqOutputChannel(ui64 channelId, NKikimr::NMiniKQL::
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TDqOutputChannelSettings& settings, const TLogFunc& logFunc)
{
- if (settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0 ||
- settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0 ||
- settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED)
- {
- return new TDqOutputChannel<false>(channelId, outputType, holderFactory, settings, logFunc);
- } else {
- YQL_ENSURE(settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0 ||
- settings.TransportVersion == NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0,
- "Unsupported transport version " << (ui32)settings.TransportVersion);
- return new TDqOutputChannel<true>(channelId, outputType, holderFactory, settings, logFunc);
+ auto transportVersion = settings.TransportVersion;
+ switch(transportVersion) {
+ case NDqProto::EDataTransportVersion::DATA_TRANSPORT_VERSION_UNSPECIFIED:
+ transportVersion = NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0;
+ [[fallthrough]];
+ case NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_PICKLE_1_0:
+ case NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_PICKLE_1_0:
+ return new TDqOutputChannel<false>(channelId, outputType, holderFactory, settings, logFunc, transportVersion);
+ case NDqProto::EDataTransportVersion::DATA_TRANSPORT_UV_FAST_PICKLE_1_0:
+ case NDqProto::EDataTransportVersion::DATA_TRANSPORT_OOB_FAST_PICKLE_1_0:
+ return new TDqOutputChannel<true>(channelId, outputType, holderFactory, settings, logFunc, transportVersion);
+ default:
+ YQL_ENSURE(false, "Unsupported transport version " << (ui32)transportVersion);
}
}