Building a custom normalizer on top of the default normalizer



I want to write a custom normalizer to further process my data before it is stored. However, I’m not intending to re-implement the 7 normalization steps from C3; I just wish to process the C3 normalization output.

The following approach doesn’t work (because Normalizer.normalize does not exist), but it shows what I’d like to do:

// MyNormalizer.js (mixes Normalizer)

function normalize(objs, spec) {
    var ts = Normalizer.normalize(objs, spec);
    // ...additional processing...
    return ts;
}
How can I achieve this? The 7.6 documentation has a link to custom-normalization.c3doc, but this page does not exist.




What is the logic you’re trying to accomplish? When you say “custom normalizer” to C3, that means you want to produce normalized data. Processing normalized data sounds like a Metric to me. You could also consider writing a custom expression engine function, as in this answer: Parametric metrics?



Hi Riley,

We get an index from a sensor every minute, but the sensor might get reset and start over from 0 (whereas the true index we are tracking is ever-increasing and is never reset). To improve performance we want to store a normalized series that filters out those resets (by doing a rolling sum of the rolling diff). I picture this as an extension of the normalization process, as it’s about storing clean data that we can actually use.

Using metrics (as done today) requires us to perform operations at the source-data grain (in our case, one minute), which is very slow and requires loading up to 526k measurements into memory for a year’s worth of data. Storing a true normalized series (with the guarantee that the reconstructed index is indeed increasing) would let us get rid of the minute grain that is hammering performance.

Other approaches are welcome, but our measurements flow in every minute and add up to months/years of recordings; therefore, storing the clean, reconstructed index (as this is the actual physical value we are interested in) seems essential.
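For illustration, the “rolling sum of the rolling diff” idea described above can be sketched in plain JavaScript. This is just the logic, not C3 platform code, and the function name is made up:

```javascript
// Reconstruct a monotonically increasing index from raw readings that
// may reset to 0. A negative diff is treated as a reset and dropped,
// so the reconstructed index keeps growing from the last good value.
// Illustrative sketch only — not C3 platform code.
function reconstructIndex(readings) {
  if (readings.length === 0) return [];
  var result = [readings[0]];
  for (var i = 1; i < readings.length; i++) {
    var diff = readings[i] - readings[i - 1];
    // Drop negative diffs (resets); keep positive growth.
    result.push(result[i - 1] + Math.max(diff, 0));
  }
  return result;
}

// Example: the meter resets after 30 (4 comes after a reset), but the
// reconstructed index never decreases.
console.log(reconstructIndex([10, 20, 30, 1, 4, 9]));
// → [10, 20, 30, 30, 33, 38]
```

A negative diff contributes nothing to the rolling sum, which is exactly the "filter out resets" guarantee described above.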



I would argue that the “true” measurement is the number that the meter reports, and that the index without a reset (also called a register) is a computed value.

That being said, your point is valid: you want to move a run-time operation (the rollingDiff) to a data-load-time operation to improve performance. I think this is not supported IN NORMALIZATION right now, but I will get more eyes on this.

What you probably want to do is set the ‘cache’ rules on the metric which computes the growing index. Let’s say you have a SimpleMetric called “ComputedIndexConsumption” which does the slow computation you wish to improve. Setting its “cache” field will result in the metric being computed once and then cached for future calls. See SimpleMetric.cache and the type SimpleMetricCache.



The reason I’m worried about this approach is that data is coming in every minute. Hence I’m afraid that the SimpleMetric cache will be invalidated as soon as I push a new measurement into Cassandra. I was hoping to implement incremental normalization to make sure not to recompute a year’s worth of minute-grain rollingDiff whenever a new value arrives (since all the previous values are still valid).

I need to access this data at least daily for all customers, and I’d like to avoid renormalizing / recomputing everything on each fetch (this metric won’t be called multiple times within a minute, but it will be called multiple times between new measurements). Is the SimpleMetric cache smart enough to reuse previous values?



Point of information: have you seen bad performance already? What numbers are you getting? We currently do this sort of math on 15-minute intervals (not one-minute) and we get good performance.



Currently, you can normalize the series on the fly (without persisting) using the “normalize” API on the time series header. This will output a map<string, Timeseries> mapping each tsField to its (normalized) Timeseries.

While you are focused on providing the most optimal solution (which is great and the right attitude), do we have enough data on what the current numbers look like with the approach that Riley suggested? The reason is, we might end up doing a lot of premature optimization which might not be worth your time. What Riley is suggesting should be a quick check, and that will give us enough data to go ahead with the custom normalizer.

Having said that, if you do decide to go ahead with writing the custom normalizer and DO NOT want the final normalized values for each tsField, but rather the results of individual steps in the normalization pipeline, you can use methods like:

  1. GrainDetector.detectGrain
  2. GSplitter.align
  3. HeaderType.normalizationUnit

etc., to get individual results.


In case it’s not clear: with Rohit’s suggestion you would NEED to reimplement all 7 steps of normalization. However, the current implementation of those steps just calls the APIs that Rohit listed, so you could implement the steps you don’t wish to customize by calling the same functions.

(I still recommend you start with the naive approach, adding more complex improvements as you find performance degrading.)

  1. Try a metric with no caching… if it’s slow, then…
  2. Try adding a cache spec to the metric… if it’s still slow, then…
  3. Try writing your own normalizer


Thank you both for your suggestions. We have performance issues with the v5 application currently deployed. The evalMetric takes 3 to 8 seconds for a year’s worth of data at the month interval, and we strongly suspect that having to evaluate it at the minute grain is causing this. It takes up to 30s when data is being loaded in parallel.

Therefore, we want to properly design our v7 application to avoid replicating suboptimal patterns. I understand that overriding normalization might not be the right way to do this − but if we want to benchmark it, do you have an exhaustive list of the individual steps you started listing, Rohit?

In order to obtain the consolidated index from the numbers the meter reports, we currently compute the rollingDiff and then filter out negative values, but that’s a bad approximation when data is missing (and it’s slow).

What we’d really want is something like this:
rollingDiff(eval('MINUTE', interpolate(rolling('SUM', rollingDiff(NormalizedIndex, 0.01), 'LINEAR', 'MISSING'))))
rolling(rollingDiff()) is used to get rid of the resets and reconstruct an increasing index, interpolate() to smooth the consumption when data is missing, and the outer rollingDiff to get back to the (approximated) consumption.
This is out of the question given our current performance issues, hence the need to come up with a better scheme. Maybe a custom expression engine function could help, but I don’t know whether we’d miss out on C3 optimizations by doing so.
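For reference, the interpolate(…, 'LINEAR', 'MISSING') step in that expression can be illustrated with a standalone sketch (plain JavaScript, outside the expression engine; the function name and the use of null for missing points are my own conventions, not C3’s):

```javascript
// Linearly interpolate missing (null) values between the nearest known
// neighbors, in the spirit of interpolate(..., 'LINEAR', 'MISSING').
// Leading/trailing gaps with no neighbor on one side are left as-is.
function interpolateMissing(series) {
  var out = series.slice();
  for (var i = 0; i < out.length; i++) {
    if (out[i] !== null) continue;
    var lo = i - 1;            // index of previous known value
    var hi = i;
    while (hi < out.length && out[hi] === null) hi++; // next known value
    if (lo < 0 || hi >= out.length) continue; // edge gap: cannot interpolate
    var step = (out[hi] - out[lo]) / (hi - lo);
    for (var j = lo + 1; j < hi; j++) {
      out[j] = out[lo] + step * (j - lo);
    }
    i = hi;
  }
  return out;
}

console.log(interpolateMissing([10, null, null, 16]));
// → [10, 12, 14, 16]
```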

Anyway, if we could store the reconstructed index as the normalized value, we could evaluate this timeseries at a 15- or 30-minute interval (currently impossible because we might miss a reset in the middle), and I suspect it would greatly reduce the response time of this evalMetric.

Further suggestions are welcome, meanwhile I’ll follow the highlighted steps :slight_smile:



This seems similar to RegisterNormalizer, where we track increasing register values with the resetThreshold and rollOverMax properties. Have you looked at that custom normalizer to see if we can re-use / extend it to meet your needs?

Also, you can look at the implementation of RegisterNormalizer to get an idea about how you can implement your own normalizer.



I have looked at it, but I figured it could not work in our case since we have resets (it is not a monotonically increasing timeseries, even though the actual underlying value is monotonically increasing). I’ll have another look to see if it could fit our needs.



The resets are controlled through the resetThreshold; for example, if your meter resets at a count of 1000 (going back to 0, then counting up again), you would set resetThreshold to 1000.
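To illustrate the rollover behavior described above (this is a plain-JavaScript sketch with a made-up function name, not the actual RegisterNormalizer implementation):

```javascript
// When a reading drops below its predecessor, assume the register
// wrapped around at rollOverMax and add the wrapped distance instead.
// Hypothetical sketch; in C3 this behavior is configured through the
// RegisterNormalizer resetThreshold / rollOverMax properties.
function unwrapRegister(readings, rollOverMax) {
  if (readings.length === 0) return [];
  var out = [readings[0]];
  for (var i = 1; i < readings.length; i++) {
    var diff = readings[i] - readings[i - 1];
    if (diff < 0) diff += rollOverMax; // reading wrapped past the max
    out.push(out[i - 1] + diff);
  }
  return out;
}

// Meter counts to 1000 then wraps: 998 -> 3 means 5 units consumed.
console.log(unwrapRegister([995, 998, 3, 10], 1000));
// → [995, 998, 1003, 1010]
```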



I’ve spent some time reviewing the RegisterNormalizer and it is exactly what we expect.

However, each time we add a measurement and run a metric on the series, the engine spends between 1 and 3 minutes normalizing (yes, even if I only add a single measurement). This can’t be used in production, so am I missing something here?

[Edit: what makes it absolutely unsuitable for prod is that once new data comes in, any evalMetrics on this series is blocking and lasts minutes − I wouldn’t care so much if normalization happened in the background, but it seems that the series can’t be referenced while a normalization is in progress]




How is it scaling? You said 1-3 minutes for 1 measurement. Is it 10-30 minutes for 10 measurements? (Seems unlikely.) I imagine there’s a bit of overhead to start the normalization process at all, but that once it gets going it’s pretty quick on a per-measurement basis.

What kind of latency is required for your use case? E.g. how many meters will you have, how much data will they send, and at what frequency? What is the usage pattern? Do users need to see the data in real time? Are time-critical alerts being generated? Real-time costs $$$, and often a few hours later is good enough.



We are integrating one-minute data every 15 minutes, i.e. 15 measurements per user every 15 minutes. Our tests are performed with 100 users (but the product will have a few tens of thousands of users).

Adding 15 samples to one user (whose series was previously normalized) and then calling an evalMetric triggers a 1-3 minute normalization (with the default normalizer and no interpolation it is between 10 and 20s). Once it’s done, the metric goes back to standard evaluation times… but 15 minutes later it happens again.

In our standard use case, the user will call an evalMetric on a series that has not been normalized for a few hours or days, and it seems that this request will last at least 10s (default normalizer) and up to a few minutes (register normalizer), which makes our service unusable in terms of UX.
I believe this might be because C3 normalizes a full month’s worth of data even though there are only a few new measurements?

Latency can be discussed (not a few hours, but 15 to 30 minutes can be a reasonable trade-off). I wouldn’t mind if the normalization happened in the background and the evalMetrics did not use the latest normalized data right away, but the problem I’m seeing here is that the normalization “freezes” the timeseries and makes it unavailable for evaluation.



I’m surprised, because normalization is SUPPOSED to happen in the background… I would ask @rohit.sureka to chime in: “is there a way they might have turned auto-normalization off?”



What I see in the action tree of Cluster.actionDump() is that my evalMetric action actually spawns a normalization, which in turn spawns a call to RegisterNormalizer.normalize.



Here’s what happens today:

  1. When new data is loaded, we mark the state of the normalized series as invalid (default mode) and don’t normalize the series right then. When a series is evaluated, the engine sees that the data is not normalized and triggers normalization to return fresh data to the user (which is what you are observing).

There are other modes in which this could potentially be avoided. You can turn on incremental normalization as following:

  1. Set tenant config as follows:

TenantConfig.upsert({id: "Normalize", value: "ALL"})

What this will do is, whenever new data is loaded, only the part of the data that has changed will be incrementally normalized. This is done by marking the normalization state as invalid and placing an entry in the NormalizationQueue. Do note that if the series is evaluated before the queue entry has been picked up, the series will be normalized at that time (causing the slowdown).

What we’re hearing is that it is OK for the user to get stale data on evaluation, with normalization triggered in the background. This is NOT implemented in the platform and could be considered a reasonable feature. Do file a ticket with the services team for this feature, and it will be incorporated into the product roadmap.



Thanks Rohit for the detailed explanation.
I thought incremental normalization was not implemented (the link to the incremental normalization topic in the docs is broken), but if it is, and if it only normalizes the new measurements, it will probably be a significant performance boost. Will test ASAP.

[Edit: however, I hope that when pushing 15 measurements to a single timeseries in a single pass, I won’t trigger 15 normalizations?]



Incremental normalization is implemented but NOT enabled by default. In order to enable it, we have to set the tenant config as mentioned above:

TenantConfig.upsert({id: "Normalize", value: "ALL"})