How best to store EvalMetricsResult to Cassandra?


#1

As you know, EvalMetricsResult is a double map id -> expr -> ts, and I would like to store it in Cassandra so that I can easily fetch back the ts for any combination of (id, expr). Are there recommended patterns for mapping the type system's maps to Cassandra that are applicable to this case? (I assume just creating an entity type which mixes Persistable&lt;EvalMetricsResult&gt; is not the recommended way, but I may be wrong.)
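
For concreteness, a minimal sketch of the lookup I want to support (the id and expression values are illustrative):

// EvalMetricsResult is a double map: id -> expr -> timeseries,
// and the goal is a cheap single (id, expr) lookup after reading back.
var ts = emr.result['apartment1']['AverageConsumption'];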

Thanks


#2

You can add a new type like:

@db(datastore='cassandra')
entity type PersistedEvalMetricsResult schema name 'psrevalmtrc' {
    res: EvalMetricsResult
}


#3

Thanks, that looks great if a single entity type is enough to persist everything. We would then need to optimize for reading individual (id, expr) pairs. (I have already written some code that also persists the accompanying EvalMetricsSpec (not using mixes).)


#4

Btw, this is going to be a very expensive operation. Why do you need to store the result? We should make sure the evaluation is fast enough. Also, you should use the individual simple metric cache for storing simple metric results.


#5

We are reimplementing some legacy metrics which also take a long time. We are currently at 1-2 minutes (@bachr optimized them down from about 1 h; he may fill in details). The requirement is 3 s, which we think is not doable, since the metrics are computed over a whole site at once. The legacy application also uses stored results.


#6

Yes, that's why I am asking how best to break an EvalMetricsResult into individual timeseries for each (id, expression) pair. We're using this pair as the Cassandra partitionKeyField, we use another field to point back to the PersistedEvalMetricsResult (for each individual persisted timeseries), and we have some more utilities…


#7

I managed to persist an EvalMetricsResult into both Postgres and Cassandra, but I am not yet able to read it back. I am not sure how to get past the following error:

Unable to execute sql SELECT T.ID, T.VERSION
FROM C3_2_EMRM T
WHERE T.TENANT_TAG_ID=:v1 AND (T.ID=:v2 AND 
   EXISTS(SELECT 1 FROM C3_2_EMRTSM M
   WHERE M.TENANT_TAG_ID=:v1 AND M.EMRM_R=T.ID AND (M.PARTITION_S IN (:v3))))
ORDER BY T.ID
LIMIT 2002
ERROR: relation "c3_2_emrtsm" does not exist
  Position: 108

when calling EvalMetricsResult_M.read(emas) where emas is an EvalMetricsAtSpec.

(I did

DbAdmin.executeQuery(
    "SELECT table_schema || '.' || table_name " +
    "FROM information_schema.tables " +
    "WHERE table_type = 'BASE TABLE' " +
    "AND table_schema NOT IN ('pg_catalog', 'information_schema')",
    2000)

but I could not find much of interest.)

The read function first computes the partition field values of the EvalMetricsResultTS_M rows (stored in Cassandra) so that it can fill out the ts array of timeseries in the final EvalMetricsResult value.

function read(emas) {
    // Fetch the memo wrapper together with only those timeseries rows
    // whose partition key matches a requested (id, expression) pair.
    var emrm = EvalMetricsResult_M.fetch({
        include: 'memo, ts',
        filter: Filter.eq('id', emas.memo.hash()).
            and.intersects(
                'ts.partition',
                partitions(emas.ids, emas.expressions))
    }).objs[0];
    log.info('## emrm: {}', JSON.stringify(emrm, null, 2));
    if (emrm)
        return toEvalMetricsResult(emrm);
    else
        return null;
}
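
For reference, a sketch of the partitions helper (the exact key scheme here, '<id>|<expr>', is illustrative):

// Builds one partition key string per requested (id, expression) pair,
// assuming keys of the form '<id>|<expr>'.
function partitions(ids, expressions) {
    var keys = [];
    _.each(ids, function(id) {
        _.each(expressions, function(expr) {
            keys.push(id + '|' + expr);
        });
    });
    return keys;
}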

The types involved are as follows (those with the suffix _M are persisted for memoization purposes):

entity type EvalMetricsResult_M schema name 'EMRM' {
    id: string // for get, === memo.hash()
    fhash: string // for filter, === memo.filterHash()
    memo: EvalMetricsAtSpec_M
    ts: [EvalMetricsResultTS_M](emrm) // for ungrouping
    read: function(emas: EvalMetricsAtSpec): EvalMetricsResult js server
    filter: function(emas: EvalMetricsAtSpec): [EvalMetricsResult_M] js server
    write: function(emr: EvalMetricsResult, emas: EvalMetricsAtSpec): ObjList js server
}

@db(compactType=true,
    datastore='cassandra',
    partitionKeyField='partition',
    persistenceOrder='start',
    persistDuplicates=false,
    shortId=true,
    shortIdReservationRange=100000)
entity type EvalMetricsResultTS_M schema name 'EMRTSM' {
    partition: string
    emsId: string
    emsExpr: string
    emrm: EvalMetricsResult_M
    start: datetime
    end: datetime
    interval: string
    data: [double]
    estimates: [byte]
    missing: [byte]
    timeZone: string
    unit: Unit
}

type EvalMetricsAtSpec {
    memo: EvalMetricsAtSpec_M
    ids: set<string>
    expressions: set<string>
    bindings: map<string, any>
    read_memo: EvalMetricsAtReadMemo
    write_memo: boolean
    toEvalMetricsSpec: member function(): EvalMetricsSpec js server
}

Am I missing some annotations or something else? Perhaps I should put EvalMetricsResult_M into Cassandra, as @rohit.sureka suggested.

Thanks

EDIT: If I add @db(datastore='cassandra') above EvalMetricsResult_M, there's no more error, but null is returned because of the intersects('ts.partition', [<existing partition field value>]) condition.


#8

@AlexBakic As I mentioned before, it is a REALLY bad idea to store the entire EvalMetricsResult. I feel you are spending your time and energy in an area where it shouldn't be spent.

The right approach should be to:

  1. Find which simple metrics are slow
  2. Cache those simple metrics by setting the “cache” field on the metric (see the sketch below)
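
A minimal sketch of what that looks like in a SimpleMetric json file (the metric definition here is illustrative; only the cache block matters):

{
  "id": "AverageConsumption_Apartment",
  "name": "AverageConsumption",
  "srcType": "Apartment",
  "path": "meters.measurements",
  "expression": "avg(avg(normalized.data.consumption))",
  "cache": {
    "intervals": ["DAY", "MONTH"],
    "monthsInPast": 12
  }
}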

#9

We're doing that for the "default mode", but there are also requirements for checking old results, e.g. from a month earlier. We need to provide evaluation results for each day and keep them available (or recomputable), ignoring new data that may affect the old results (some values are manually corrected, extrapolation is replaced by interpolation, etc.). If I understand correctly, the cache is for the latest data only, but that covers only a small part here. Of course, the requirements pose technical challenges: for example, how do we produce a result in under 3 s for a period a month ago when the evaluation normally takes minutes? The last time we looked, it was pure CPU, no DB work (e.g., the consumption of an apartment is weighted by coefficients that are computed across all the other apartments on a site).

We tried putting "cache": { "intervals": ["DAY", "MONTH"], "monthsInPast": 12 } in the json files, but then it messed up our tests (test objects triggered some invalidations, we suppose). Is there a more dynamic way?

Even if it is a bad idea (usually, memoization is good), is there a way to complete that fetch? We are almost done.

Thanks


#10

If you're adamant about storing metric results, why not implement an analytic that fires periodically and persists the results to your data store of choice? You're going to need to implement a bunch of logic to trigger the evaluation, functionality that is already available via stream analytics.

Storing the EvalMetricsResult object seems strange and will only introduce complexity when/if the data needs to be read. Store it as another time series rather than in an object structure designed for internal use.
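
For example, something along these lines, with one row per point instead of arrays inside an internal object (a rough sketch; the names are illustrative, and in practice you would mix in the platform's timed-data types so that metrics and fetch can consume the rows directly):

entity type MetricResultPoint schema name 'MTRCRSLTPNT' {
    source: string      // the id the metric was evaluated on
    expression: string  // the metric expression
    timestamp: datetime // point timestamp
    value: double       // point value
}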


#11

The type EvalMetricsResultTS_M stores one timeseries, and the type EvalMetricsResult_M is just a wrapper. The real EvalMetricsResult is created in memory upon reading all the requested timeseries. The customer plans to run cron jobs to persist results every night, as data arrives mostly once a day.
Not everything needs to be stored (the _M types do not have all the fields), but enough to keep things sane when there are different EvalMetricsSpecs and evaluations in the present vs. the past, e.g. when the consumption of one apartment a month ago needs to be shown as proof (without reconstructing the whole EvalMetricsResult, only the timeseries for some id-expression pairs). So, it is just glue around computed timeseries.


#12

Here is the function that reconstructs a partial EvalMetricsResult in memory, after fetching the requested features (id-expression pairs):

function toEvalMetricsResult(emrm) {
    // Rebuild the double map id -> expr -> timeseries from the fetched rows.
    var res = {};
    _.each(emrm.ts, function(ts) {
        if (!res[ts.emsId])
            res[ts.emsId] = {};
        res[ts.emsId][ts.emsExpr] = NormTimeseriesDouble.make({
            m_start: ts.start,
            m_end: ts.end,
            m_interval: ts.interval,
            m_data: ts.data,
            m_estimates: ts.estimates,
            m_missing: ts.missing,
            m_timeZone: ts.timeZone,
            m_unit: ts.unit
        });
    });
    return EvalMetricsResult.make({ result: res });
}
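
For symmetry, the write direction (declared on EvalMetricsResult_M in #7 but not shown) would flatten the double map into one EvalMetricsResultTS_M row per (id, expression) pair. A minimal sketch, assuming the '<id>|<expr>' scheme from the partitions() sketch above and that the m_* fields of each timeseries are readable:

function write(emr, emas) {
    var rows = [];
    _.each(emr.result, function(byExpr, id) {
        _.each(byExpr, function(ts, expr) {
            rows.push({
                partition: id + '|' + expr, // must match partitions()
                emsId: id,
                emsExpr: expr,
                emrm: { id: emas.memo.hash() }, // back-reference to the wrapper
                start: ts.m_start,
                end: ts.m_end,
                interval: ts.m_interval,
                data: ts.m_data,
                estimates: ts.m_estimates,
                missing: ts.m_missing,
                timeZone: ts.m_timeZone,
                unit: ts.m_unit
            });
        });
    });
    // Persist the batch (the EvalMetricsResult_M wrapper itself is
    // persisted separately); mergeBatch is assumed available here.
    return EvalMetricsResultTS_M.mergeBatch(rows);
}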

#13

Again, you're reinventing code that already exists and introducing unnecessary glue code. If you store the data as a traditional timeseries, you can then leverage platform functions (metrics, fetch, etc.) to access and explore the data. If you follow your current path, you need to write a custom function just to access the data, plus additional wrappers to perform any further handling (e.g., time series analysis) if necessary.

The approach you’re taking seems to introduce unnecessary complexity on both storage and evaluation.


#14

Could you give me some pointers to the platform functions for storing the results of evalMetrics in the database so that I can find them later based on the EvalMetricsSpec used to produce them? I will then try to simplify the code and take the other requirements (latest/past, some/all timeseries) into account.

Thanks


#15

As a workaround, I did two fetches and connected the results in JavaScript:

function read(emas) {
    // First get the wrapper by its memo hash, then fetch only the
    // matching timeseries rows and attach them manually.
    var emrm = EvalMetricsResult_M.get(emas.memo.hash(), 'memo');
    if (emrm) {
        emrm.ts = EvalMetricsResultTS_M.fetch({
            filter: Filter.intersects(
                'partition', partitions(emas.ids, emas.expressions)).
                and.eq('emrm.id', emas.memo.hash())
        }).objs;
        return toEvalMetricsResult(emrm);
    } else return null;
}

So the API TimeMachine.evalMetrics unifies:

  • the default mode, where we work with the latest data, including automatic invalidations, caching, etc. and we can also memoize results
  • the time machine mode, where we work with data in the past, ignoring invalidations after a given time point, and memoization instead of caching

If the fields memo.evalAt and write_memo are left unspecified, it behaves like the regular evalMetrics call.
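
For illustration (a sketch only; the ids, expressions, and the evalAt value are made up, and the memo construction is simplified):

// Default mode: no memo.evalAt and no write_memo, so this behaves
// like a plain evalMetrics call on the latest data.
var live = TimeMachine.evalMetrics(EvalMetricsAtSpec.make({
    ids: ['apartment1'],
    expressions: ['AverageConsumption']
}));

// Time machine mode: evaluate as of a past point, ignoring later
// invalidations, and memoize the result for fast re-reads.
var past = TimeMachine.evalMetrics(EvalMetricsAtSpec.make({
    memo: EvalMetricsAtSpec_M.make({ evalAt: '2019-06-01T00:00:00' }),
    ids: ['apartment1'],
    expressions: ['AverageConsumption'],
    write_memo: true
}));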


#16

@rohit.sureka, @scottk
We have the same needs on another project, where we call rollupMetrics on a set of ~400 sources with cached SimpleMetrics, but it takes 20 s.
On a dashboard that contains several tiles, this can lead to an overall loading time of more than 1 minute, which is unacceptable to the customer.
Is it possible to implement this mechanism in the platform? Caching evalMetrics/rollupMetrics results seems to be a recurring need.


#17

@NabilKoroghli Good idea on storing the results of rollupMetrics in a cache. I can work on that for v7.9.