#include "spack_v1.h"
#include "varint.h"
#include "compression.h"

#include <library/cpp/monlib/encode/buffered/string_pool.h>
#include <library/cpp/monlib/exception/exception.h>
#include <library/cpp/monlib/metrics/histogram_collector.h>
#include <library/cpp/monlib/metrics/metric.h>

#include <util/generic/yexception.h>
#include <util/generic/buffer.h>
#include <util/generic/size_literals.h>
#include <util/stream/format.h>

#ifndef _little_endian_
#error Unsupported platform
#endif

namespace NMonitoring {
    namespace {
#define DECODE_ENSURE(COND, ...) MONLIB_ENSURE_EX(COND, TSpackDecodeError() << __VA_ARGS__)

        constexpr ui64 LABEL_SIZE_LIMIT = 128_MB;

        ///////////////////////////////////////////////////////////////////////
        // TDecoderSpackV1
        ///////////////////////////////////////////////////////////////////////
        class TDecoderSpackV1 {
        public:
            TDecoderSpackV1(IInputStream* in, TStringBuf metricNameLabel)
                : In_(in)
                , MetricNameLabel_(metricNameLabel)
            {
            }

            void Decode(IMetricConsumer* c) {
                c->OnStreamBegin();

                // (1) read header
                size_t readBytes = In_->Read(&Header_, sizeof(Header_));
                DECODE_ENSURE(readBytes == sizeof(Header_), "not enough data in input stream to read header");

                ui8 version = ((Header_.Version >> 8) & 0xff);
                DECODE_ENSURE(version == 1, "versions mismatch (expected: 1, got: " << +version << ')');

                DECODE_ENSURE(Header_.HeaderSize >= sizeof(Header_), "invalid header size");
                if (size_t skipBytes = Header_.HeaderSize - sizeof(Header_)) {
                    DECODE_ENSURE(In_->Skip(skipBytes) == skipBytes, "input stream unexpectedly ended");
                }

                if (Header_.MetricCount == 0) {
                    // emulate empty stream
                    c->OnStreamEnd();
                    return;
                }

                // if compression enabled all below reads must go throught decompressor
                auto compressedIn = CompressedInput(In_, DecodeCompression(Header_.Compression));
                if (compressedIn) {
                    In_ = compressedIn.Get();
                }

                TimePrecision_ = DecodeTimePrecision(Header_.TimePrecision);

                const ui64 labelSizeTotal = ui64(Header_.LabelNamesSize) + Header_.LabelValuesSize;

                DECODE_ENSURE(labelSizeTotal <= LABEL_SIZE_LIMIT, "Label names & values size of " << HumanReadableSize(labelSizeTotal, SF_BYTES)
                    << " exceeds the limit which is " << HumanReadableSize(LABEL_SIZE_LIMIT, SF_BYTES));

                // (2) read string pools
                TVector<char> namesBuf(Header_.LabelNamesSize);
                readBytes = In_->Load(namesBuf.data(), namesBuf.size());
                DECODE_ENSURE(readBytes == Header_.LabelNamesSize, "not enough data to read label names pool");
                TStringPool labelNames(namesBuf.data(), namesBuf.size());

                TVector<char> valuesBuf(Header_.LabelValuesSize);
                readBytes = In_->Load(valuesBuf.data(), valuesBuf.size());
                DECODE_ENSURE(readBytes == Header_.LabelValuesSize, "not enough data to read label values pool");
                TStringPool labelValues(valuesBuf.data(), valuesBuf.size());

                // (3) read common time
                c->OnCommonTime(ReadTime());

                // (4) read common labels
                if (ui32 commonLabelsCount = ReadVarint()) {
                    c->OnLabelsBegin();
                    ReadLabels(labelNames, labelValues, commonLabelsCount, c);
                    c->OnLabelsEnd();
                }

                // (5) read metrics
                ReadMetrics(labelNames, labelValues, c);
                c->OnStreamEnd();
            }

        private:
            void ReadMetrics(
                    const TStringPool& labelNames,
                    const TStringPool& labelValues,
                    IMetricConsumer* c)
            {
                for (ui32 i = 0; i < Header_.MetricCount; i++) {
                    // (5.1) types byte
                    ui8 typesByte = ReadFixed<ui8>();
                    EMetricType metricType = DecodeMetricType(typesByte >> 2);
                    EValueType valueType = DecodeValueType(typesByte & 0x03);

                    c->OnMetricBegin(metricType);

                    // TODO: use it
                    ReadFixed<ui8>(); // skip flags byte

                    auto metricNameValueIndex = std::numeric_limits<ui32>::max();
                    if (Header_.Version >= SV1_02) {
                        metricNameValueIndex = ReadVarint();
                    }

                    // (5.2) labels
                    ui32 labelsCount = ReadVarint();
                    DECODE_ENSURE(Header_.Version >= SV1_02 || labelsCount > 0, "metric #" << i << " has no labels");
                    c->OnLabelsBegin();
                    if (Header_.Version >= SV1_02) {
                        c->OnLabel(MetricNameLabel_, labelValues.Get(metricNameValueIndex));
                    }
                    ReadLabels(labelNames, labelValues, labelsCount, c);
                    c->OnLabelsEnd();

                    // (5.3) values
                    switch (valueType) {
                        case EValueType::NONE:
                            break;
                        case EValueType::ONE_WITHOUT_TS:
                            ReadValue(metricType, TInstant::Zero(), c);
                            break;
                        case EValueType::ONE_WITH_TS: {
                            TInstant time = ReadTime();
                            ReadValue(metricType, time, c);
                            break;
                        }
                        case EValueType::MANY_WITH_TS: {
                            ui32 pointsCount = ReadVarint();
                            for (ui32 i = 0; i < pointsCount; i++) {
                                TInstant time = ReadTime();
                                ReadValue(metricType, time, c);
                            }
                            break;
                        }
                    }

                    c->OnMetricEnd();
                }
            }

            void ReadValue(EMetricType metricType, TInstant time, IMetricConsumer* c) {
                switch (metricType) {
                case EMetricType::GAUGE:
                    c->OnDouble(time, ReadFixed<double>());
                    break;

                case EMetricType::IGAUGE:
                    c->OnInt64(time, ReadFixed<i64>());
                    break;

                case EMetricType::COUNTER:
                case EMetricType::RATE:
                    c->OnUint64(time, ReadFixed<ui64>());
                    break;

                case EMetricType::DSUMMARY:
                    c->OnSummaryDouble(time, ReadSummaryDouble());
                    break;

                case EMetricType::HIST:
                case EMetricType::HIST_RATE:
                    c->OnHistogram(time, ReadHistogram());
                    break;

                case EMetricType::LOGHIST:
                    c->OnLogHistogram(time, ReadLogHistogram());
                    break;

                default:
                    throw TSpackDecodeError() << "Unsupported metric type: " << metricType;
                }
            }

            ISummaryDoubleSnapshotPtr ReadSummaryDouble() {
                ui64 count = ReadFixed<ui64>();
                double sum = ReadFixed<double>();
                double min = ReadFixed<double>();
                double max = ReadFixed<double>();
                double last = ReadFixed<double>();
                return MakeIntrusive<TSummaryDoubleSnapshot>(sum, min, max, last, count);
            }

            TLogHistogramSnapshotPtr ReadLogHistogram() {
                double base = ReadFixed<double>();
                ui64 zerosCount = ReadFixed<ui64>();
                int startPower = static_cast<int>(ReadVarint());
                ui32 count = ReadVarint();
                // see https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/stockpile_client/points.cpp?rev=r8593154#L31
                // and https://a.yandex-team.ru/arc/trunk/arcadia/infra/yasm/common/points/hgram/normal/normal.h?rev=r8268697#L9
                // TODO: share this constant value
                Y_ENSURE(count <= 100u, "more than 100 buckets in log histogram: " << count);
                TVector<double> buckets;
                buckets.reserve(count);
                for (ui32 i = 0; i < count; ++i) {
                    buckets.emplace_back(ReadFixed<double>());
                }
                return MakeIntrusive<TLogHistogramSnapshot>(base, zerosCount, startPower, std::move(buckets));
            }

            IHistogramSnapshotPtr ReadHistogram() {
                ui32 bucketsCount = ReadVarint();
                auto s = TExplicitHistogramSnapshot::New(bucketsCount);

                if (SV1_00 == Header_.Version) { // v1.0
                    for (ui32 i = 0; i < bucketsCount; i++) {
                        i64 bound = ReadFixed<i64>();
                        double doubleBound = (bound != Max<i64>())
                                ? static_cast<double>(bound)
                                : Max<double>();

                        (*s)[i].first = doubleBound;
                    }
                } else {
                    for (ui32 i = 0; i < bucketsCount; i++) {
                        double doubleBound = ReadFixed<double>();
                        (*s)[i].first = doubleBound;
                    }
                }


                // values
                for (ui32 i = 0; i < bucketsCount; i++) {
                    (*s)[i].second = ReadFixed<ui64>();
                }
                return s;
            }

            void ReadLabels(
                const TStringPool& labelNames,
                const TStringPool& labelValues,
                ui32 count,
                IMetricConsumer* c)
            {
                for (ui32 i = 0; i < count; i++) {
                    auto nameIdx = ReadVarint();
                    auto valueIdx = ReadVarint();
                    c->OnLabel(labelNames.Get(nameIdx), labelValues.Get(valueIdx));
                }
            }

            TInstant ReadTime() {
                switch (TimePrecision_) {
                    case ETimePrecision::SECONDS:
                        return TInstant::Seconds(ReadFixed<ui32>());
                    case ETimePrecision::MILLIS:
                        return TInstant::MilliSeconds(ReadFixed<ui64>());
                }
                Y_FAIL("invalid time precision");
            }

            template <typename T>
            inline T ReadFixed() {
                T value;
                size_t readBytes = In_->Load(&value, sizeof(T));
                DECODE_ENSURE(readBytes == sizeof(T), "no enough data to read " << TypeName<T>());
                return value;
            }

            inline ui32 ReadVarint() {
                return ReadVarUInt32(In_);
            }

        private:
            IInputStream* In_;
            TString MetricNameLabel_;
            ETimePrecision TimePrecision_;
            TSpackHeader Header_;
        }; // class TDecoderSpackV1

#undef DECODE_ENSURE
    } // namespace

    EValueType DecodeValueType(ui8 byte) {
        EValueType result;
        if (!TryDecodeValueType(byte, &result)) {
            throw TSpackDecodeError() << "unknown value type: " << byte;
        }
        return result;
    }

    bool TryDecodeValueType(ui8 byte, EValueType* result) {
        if (byte == EncodeValueType(EValueType::NONE)) {
            if (result) {
                *result = EValueType::NONE;
            }
            return true;
        } else if (byte == EncodeValueType(EValueType::ONE_WITHOUT_TS)) {
            if (result) {
                *result = EValueType::ONE_WITHOUT_TS;
            }
            return true;
        } else if (byte == EncodeValueType(EValueType::ONE_WITH_TS)) {
            if (result) {
                *result = EValueType::ONE_WITH_TS;
            }
            return true;
        } else if (byte == EncodeValueType(EValueType::MANY_WITH_TS)) {
            if (result) {
                *result = EValueType::MANY_WITH_TS;
            }
            return true;
        } else {
            return false;
        }
    }

    ETimePrecision DecodeTimePrecision(ui8 byte) {
        ETimePrecision result;
        if (!TryDecodeTimePrecision(byte, &result)) {
            throw TSpackDecodeError() << "unknown time precision: " << byte;
        }
        return result;
    }

    bool TryDecodeTimePrecision(ui8 byte, ETimePrecision* result) {
        if (byte == EncodeTimePrecision(ETimePrecision::SECONDS)) {
            if (result) {
                *result = ETimePrecision::SECONDS;
            }
            return true;
        } else if (byte == EncodeTimePrecision(ETimePrecision::MILLIS)) {
            if (result) {
                *result = ETimePrecision::MILLIS;
            }
            return true;
        } else {
            return false;
        }
    }

    EMetricType DecodeMetricType(ui8 byte) {
        EMetricType result;
        if (!TryDecodeMetricType(byte, &result)) {
            throw TSpackDecodeError() << "unknown metric type: " << byte;
        }
        return result;
    }

    bool TryDecodeMetricType(ui8 byte, EMetricType* result) {
        if (byte == EncodeMetricType(EMetricType::GAUGE)) {
            if (result) {
                *result = EMetricType::GAUGE;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::COUNTER)) {
            if (result) {
                *result = EMetricType::COUNTER;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::RATE)) {
            if (result) {
                *result = EMetricType::RATE;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::IGAUGE)) {
            if (result) {
                *result = EMetricType::IGAUGE;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::HIST)) {
            if (result) {
                *result = EMetricType::HIST;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::HIST_RATE)) {
            if (result) {
                *result = EMetricType::HIST_RATE;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::DSUMMARY)) {
            if (result) {
                *result = EMetricType::DSUMMARY;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::LOGHIST)) {
            if (result) {
                *result = EMetricType::LOGHIST;
            }
            return true;
        } else if (byte == EncodeMetricType(EMetricType::UNKNOWN)) {
            if (result) {
                *result = EMetricType::UNKNOWN;
            }
            return true;
        } else {
            return false;
        }
    }

    ui8 EncodeCompression(ECompression c) noexcept {
        switch (c) {
            case ECompression::IDENTITY:
                return 0x00;
            case ECompression::ZLIB:
                return 0x01;
            case ECompression::ZSTD:
                return 0x02;
            case ECompression::LZ4:
                return 0x03;
            case ECompression::UNKNOWN:
                return Max<ui8>();
        }
        Y_FAIL(); // for GCC
    }

    ECompression DecodeCompression(ui8 byte) {
        ECompression result;
        if (!TryDecodeCompression(byte, &result)) {
            throw TSpackDecodeError() << "unknown compression alg: " << byte;
        }
        return result;
    }

    bool TryDecodeCompression(ui8 byte, ECompression* result) {
        if (byte == EncodeCompression(ECompression::IDENTITY)) {
            if (result) {
                *result = ECompression::IDENTITY;
            }
            return true;
        } else if (byte == EncodeCompression(ECompression::ZLIB)) {
            if (result) {
                *result = ECompression::ZLIB;
            }
            return true;
        } else if (byte == EncodeCompression(ECompression::ZSTD)) {
            if (result) {
                *result = ECompression::ZSTD;
            }
            return true;
        } else if (byte == EncodeCompression(ECompression::LZ4)) {
            if (result) {
                *result = ECompression::LZ4;
            }
            return true;
        } else {
            return false;
        }
    }

    void DecodeSpackV1(IInputStream* in, IMetricConsumer* c, TStringBuf metricNameLabel) {
        TDecoderSpackV1 decoder(in, metricNameLabel);
        decoder.Decode(c);
    }

}