TimeMachine: Evaluating metrics in the past


#1

(Following up on @bachr’s suggestion to open a separate topic from “Get normalized timeseries based off data that was available before a given date”.)

Here is a prototype of what we need in a customer project: reevaluate a metric “in the past,” i.e., ignore data points added after a given date. (This assumes data points can be added out of order.)

For measurements, we use dataVersion to store both:

  1. the “real” data version
  2. the approximate creation timestamp of the data point
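The post does not say how both values share one field. Below is a minimal sketch of one way it could work, assuming a hypothetical layout that packs the creation time (epoch milliseconds) above a small version counter. VERSION_SLOTS, encodeDataVersion and decodeDataVersion are illustrative names, not C3 APIs.

```javascript
// Hypothetical packing: dataVersion = createdAtMillis * VERSION_SLOTS + version.
// Assumes the "real" version always stays below VERSION_SLOTS.
const VERSION_SLOTS = 1000;

function encodeDataVersion(version, createdAtMillis) {
  if (!Number.isInteger(version) || version < 0 || version >= VERSION_SLOTS) {
    throw new RangeError('version must be an integer in [0, ' + VERSION_SLOTS + ')');
  }
  const packed = createdAtMillis * VERSION_SLOTS + version;
  // Current epoch millis times 1000 still fits in a double without precision loss.
  if (!Number.isSafeInteger(packed)) {
    throw new RangeError('packed dataVersion exceeds Number.MAX_SAFE_INTEGER');
  }
  return packed;
}

function decodeDataVersion(dataVersion) {
  const version = dataVersion % VERSION_SLOTS;
  // Subtracting first keeps the division exact.
  return { version: version, createdAtMillis: (dataVersion - version) / VERSION_SLOTS };
}

const dv = encodeDataVersion(3, Date.UTC(2019, 0, 15));
console.log(decodeDataVersion(dv)); // { version: 3, createdAtMillis: 1547510400000 }
```

With this layout, a later creation time always yields a larger dataVersion (as long as the version stays below VERSION_SLOTS), so ordering by dataVersion still follows creation time.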

We pass a timestamp in the past to a custom normalizer’s normalize function via the timeseries header, specifically by setting

  1. earliest to null
  2. latest to that timestamp

before the call. Using these two pieces of information (the creation timestamp in dataVersion, and latest), normalize removes data points that did not yet exist at the given timestamp.
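A minimal sketch of that pruning step, assuming a hypothetical dataVersion layout of creation time in epoch milliseconds times 1000 plus the version, and plain-object points and headers instead of real C3 types. creationMillis and pruneToAsOf are illustrative names. The sketch keeps only points created at or before latest when earliest is null, and leaves the points untouched otherwise.

```javascript
// Assumed shapes (not the C3 API): a point is { start, value, dataVersion },
// a header is { earliest, latest } with latest in epoch milliseconds.
const VERSION_SLOTS = 1000; // must match the hypothetical dataVersion packing

// Recover the approximate creation time packed into dataVersion.
function creationMillis(dataVersion) {
  const version = dataVersion % VERSION_SLOTS;
  return (dataVersion - version) / VERSION_SLOTS;
}

// earliest === null is the hint that latest is an "as of" cutoff;
// any other header leaves the points unchanged.
function pruneToAsOf(points, header) {
  if (header.earliest !== null || header.latest == null) {
    return points;
  }
  return points.filter(p => creationMillis(p.dataVersion) <= header.latest);
}

// A Jan 1 reading that was only recorded on Jan 3 (added out of order).
const jan1 = Date.UTC(2019, 0, 1);
const jan2 = Date.UTC(2019, 0, 2);
const jan3 = Date.UTC(2019, 0, 3);
const points = [
  { start: jan1, value: 10, dataVersion: jan1 * VERSION_SLOTS + 1 },
  { start: jan1, value: 12, dataVersion: jan3 * VERSION_SLOTS + 2 },
  { start: jan2, value: 11, dataVersion: jan2 * VERSION_SLOTS + 1 },
];
console.log(pruneToAsOf(points, { earliest: null, latest: jan2 }).map(p => p.value)); // [ 10, 11 ]
```

Evaluated as of Jan 2, the late Jan 1 correction (value 12) is dropped even though its start lies before the cutoff. This is why filtering on creation time, rather than on the point's own timestamp, is needed when data arrives out of order.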

We would like to

  1. get feedback, including on how to treat non-measurement data
  2. get suggestions on how to isolate a call to evalMetric[s]At from a concurrent normalization that uses the latest available data

Thanks

TimeMachine.c3typ

type TimeMachine {

    /**
     * Passes 'when' down to timeseries headers before normalization, and
     * invalidates last normalization
     * @param spec is the same as for {@link evalMetric}
     * @param when is the time in the past to normalize then-available data
     * @return spec
     */
    prepareNormalization: function(spec: EvalMetricSpec, when: datetime): EvalMetricSpec js server

    /**
     * Passes 'when' down to timeseries headers before normalization, and
     * invalidates last normalization
     * @param spec is the same as for {@link evalMetrics}
     * @param when is the time in the past to normalize then-available data
     * @return spec
     */
    prepareNormalizations: function(spec: EvalMetricsSpec, when: datetime): EvalMetricsSpec js server

    /**
     * Like {@link evalMetric} but pruning data points created after 'when'
     * @param obj identifies the source type on which to call {@link evalMetric}
     * @param spec is the same as for {@link evalMetric}
     * @param when is the time in the past to normalize then-available data
     * @return the result of calling {@link evalMetric} in the past
     */
    evalMetricAt: function(obj: MetricEvaluatable, spec: EvalMetricSpec, when: datetime): Timeseries js server

    /**
     * Like {@link evalMetrics} but pruning data points created after 'when'
     * @param obj identifies the source type on which to call {@link evalMetrics}
     * @param spec is the same as for {@link evalMetrics}
     * @param when is the time in the past to normalize then-available data
     * @return the result of calling {@link evalMetrics} in the past
     */
    evalMetricsAt: function(obj: MetricEvaluatable, spec: EvalMetricsSpec, when: datetime): EvalMetricsResult js server
}

TimeMachine.js

var log = C3.logger('c3.TimeMachine');

function collectLeaves(root, path) {
    if (root == null) {
        return []; // unset reference: nothing to collect ('in' would throw on null)
    }
    if (_.isEmpty(path)) {
        return [root];
    }
    var head = path[0];
    var tail = path.slice(1);
    if (head in root) {
        if (Array.isArray(root[head])) {
            return _.flatten(
                _.map(root[head], function(child) {
                    return collectLeaves(child, tail);
                }),
                true);
        } else {
            return collectLeaves(root[head], tail);
        }
    } else { // assume the segment is a filter and skip it (TODO: evaluate the filter)
        return collectLeaves(root, tail);
    }
}

function prepareNormalizations(spec, when) {
    var headers = [];
    var sms = _.flatten(
        _.map(spec.expressions, function(e) {
            return CompoundMetric.listSimpleMetrics(e);
        }),
        true);
    sms.forEach(function(sm) {
        if (!sm.srcType) return; // TODO should not happen
        var tp = c3Type(sm.srcType);
        spec.ids.forEach(function(id) {
            var srcs = tp.fetch({
                filter: Filter.eq('id', id),
                include: sm.path,
                limit: 1
            }).objs;
            if (_.isEmpty(srcs)) return;
            headers = headers.concat(
                collectLeaves(srcs[0], sm.path.split('.')));
        });
    });
    headers.forEach(function(h) {
        h = h.get();
        h.earliest = null; // hint to normalize() to use h.latest
        h.latest = when;
        h.upsert(); // TODO upsertBatch
    });
    PointPhysicalMeasurementSeries.refreshNormalization({
        ids: _.pluck(headers, 'id'),
        async: false
    });
    return spec;
}

function prepareNormalization(spec, when) {
    var spec1 = spec.clone();
    spec1.id = null;
    spec1.ids = [spec.id];
    spec1.expression = null;
    spec1.expressions = [spec.expression];
    prepareNormalizations(spec1, when);
    return spec;
}

function evalMetricsAt(obj, spec, when) {
    return obj.type().evalMetrics(prepareNormalizations(spec, when));
}

function evalMetricAt(obj, spec, when) {
    return obj.type().evalMetric(prepareNormalization(spec, when));
}
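To make the traversal in collectLeaves easier to review, here is a standalone sketch of the same logic for plain JavaScript objects, without underscore or C3 types. The facility/meters/series data is made up for illustration.

```javascript
// Plain-object re-statement of collectLeaves: walks `path` from `root`
// and returns every value reached at the end of the path.
function collectLeaves(root, path) {
  if (root == null) return [];              // unset reference: nothing to collect
  if (path.length === 0) return [root];     // end of path: root is a leaf
  if (typeof root !== 'object') return [];  // cannot descend into a primitive
  const [head, ...tail] = path;
  if (!(head in root)) {
    // Same as the original: an unknown segment is assumed to be a filter and skipped.
    return collectLeaves(root, tail);
  }
  const child = root[head];
  if (Array.isArray(child)) {
    // Flatten one level, like _.flatten(..., true).
    return child.reduce((acc, c) => acc.concat(collectLeaves(c, tail)), []);
  }
  return collectLeaves(child, tail);
}

// Made-up data: three meters, the last one without a series.
const facility = {
  id: 'F1',
  meters: [
    { id: 'M1', series: { id: 'S1' } },
    { id: 'M2', series: { id: 'S2' } },
    { id: 'M3' },
  ],
};
console.log(collectLeaves(facility, 'meters.series'.split('.')).map(o => o.id)); // [ 'S1', 'S2', 'M3' ]
```

Note the third result: because a missing field is handled like a filter segment and skipped, a meter without a series comes back as a leaf itself. If an unset reference is absent from the fetched object in the same way, prepareNormalizations could end up setting earliest/latest on a non-header object, so checking the leaf type may be worth considering.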
