Useful Workflow example


#1

Could someone share (the skeleton of) a Workflow script example that has some interesting bits?
For example, it is not clear whether a job passed to WorkflowMapReduceStep.create should already be persisted. Then there are the various “previous references”: it takes time to work out what the documentation means by them.

Thanks in advance


#2

If you are running locally, the help topic for workflows shows examples (the batch job example should be relevant for map reduce):

http://localhost:8080/api/1/foundation/prod/documentation/topic/workflow


#3

For some reason, I am getting an “unauthorized” error.


#4

FWIW, here is a skeleton that passes a sanity test[1]:

function createNightWork(id) {
    id = id || 'Scheduler_NightWork';
    return Workflow.createSequenceWorkflow(
        id,
        'NightWork',
        [
            createMeasurementImportStep(id),
            createMeasurementCleanStep(id),
            createPMandRCReportStep(id),
            createEvalMetricsMemoStep(id)
        ]
    );
}

function createMeasurementImportStep(id) {
    return WorkflowImportStep.make({
        workflow: id,
        name: 'MeasurementImport',
        spec: { import: true },
        fileListField: null
    });
}

function createMeasurementCleanStep(id) {
    var now = new DateTime();
    return WorkflowMapReduceStep.make({
        workflow: id,
        name: 'MeasurementCleaning',
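        mapReduceJob: MeasurementCleaningJob.create({
            // Format to 'yyyy-MM-dd' and re-parse to drop the time of day,
            // so the window runs from yesterday's date to today's date.
            startDate: new DateTime(now.plusDays(-1).toString('yyyy-MM-dd')),
            endDate: new DateTime(now.toString('yyyy-MM-dd')),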
            targetType: PointPhysicalMeasurementSeries,
            include: 'indexMax,differential,asset,name,unitConstraint,' +
                'treatment,multiplier,measurementType,interval,interpolator',
            filter: Filter.eq('asset.profile', 'GGAZ').and.
                eq('measurementType', 'volume')
        })
    });
}

function createPMandRCReportStep(id) {
    var now = new DateTime();
    var start = now.plusDays(-1);
    var ids = _.pluck(Facility.fetch({
        filter: Filter.eq('facilityType', 'SITE')
    }).objs, 'id');
    var prev = PointMeasurementReportHeader.fetch({
        order: 'descending(end)',
        limit: 1
    }).objs[0];
    var pmrh = PointMeasurementReportHeader.create({
        start: start,
        end: now,
        prev: prev ? { id: prev.id } : null
    });
    return WorkflowMapReduceStep.make({
        workflow: id,
        name: 'PointMeasurementAndRawConsumptionReports',
        mapReduceJob: MeasurementReportJob.create({
            startIns: start,
            endIns: now,
            header: pmrh,
            targetType: Facility,
            include: 'id, startIns, endIns, header, targetType, include, filter',
            filter: Filter.intersects('id', ids)
        })
    });
}

function createEvalMetricsMemoStep(id) {
    var now = new DateTime();
    var start = now.plusDays(-1);
    var emasm = EvalMetricsAtSpec_M.make({
        evalAt: now,
        srcType: 'Facility',
        start: start,
        end: now,
        interval: 'DAY'
    });
    emasm.id = emasm.hash();
    // Note: emas is built here but not referenced again in this function.
    var emas = EvalMetricsAtSpec.make({
        ids: ['s00313'],
        expressions: ['Sum_IPR_Ratio_NI_Heating'],
        memo: emasm,
        read_memo: {}
    });
    var spec = MetricCachingJob.make({ // adds write_memo: true
        emasm: emasm,
        expressions: 'Sum_IPR_Ratio_NI_Heating'
    });
    return WorkflowMapReduceStep.make({
        workflow: id,
        name: 'MemoedEvalMetrics',
        mapReduceJob: MetricCachingJob.create(spec)
    });
}

Should something be coded differently?

[1] There is a strange error at the end, when calling

WorkflowStepRun.removeAll(
    Loggs.strIP('step.id == {stId}', { stId: st.id }));

(strIP interpolates its first argument, a string, using its second argument, a map):

AfterAll ActionError: wrapped RuntimeException: Field 'myWf_MeasurementCleaning' does not exist in expression: 'meta.tenantTagId == 27 && ((startsWith(typeIdent, 'STEP')) && (step.id == myWf_MeasurementCleaning))' for type 'WorkflowStepRun'.

If the WorkflowStepRun has not been created yet, shouldn’t removeAll just return 0?

Thanks


#5

It looks like the quotes are missing, so myWf_MeasurementCleaning is treated as a variable rather than a string. If the expression ended with

&& (step.id == 'myWf_MeasurementCleaning'))

it would likely work.


#6

Exactly, it should be 'step.id == "{stId}"'.
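
To see the difference concretely, here is a minimal sketch in plain JavaScript. The interpolate function is a hypothetical stand-in for Loggs.strIP, whose actual implementation is not shown in this thread; it only assumes that {key} placeholders are replaced with values from the map.

```javascript
// Hypothetical stand-in for Loggs.strIP (an assumption, not the real helper):
// replaces each {key} in the template with the matching value from params.
function interpolate(template, params) {
    return template.replace(/\{(\w+)\}/g, function (match, key) {
        return Object.prototype.hasOwnProperty.call(params, key)
            ? String(params[key])
            : match; // leave unknown placeholders untouched
    });
}

var stepId = 'myWf_MeasurementCleaning';

// Unquoted placeholder: the id ends up looking like a field name.
var unquoted = interpolate('step.id == {stId}', { stId: stepId });

// Quoted placeholder: the id ends up as a string literal.
var quoted = interpolate('step.id == "{stId}"', { stId: stepId });

console.log(unquoted); // step.id == myWf_MeasurementCleaning
console.log(quoted);   // step.id == "myWf_MeasurementCleaning"
```

The first result matches the expression in the error message above, which is why the id was looked up as a field of WorkflowStepRun.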


#7

I am not sure why Loggs.strIP is necessary here, but the Filter helper avoids such bugs:

Filter.eq('step.id', st.id)

#8

Agreed, I used it there out of habit. However, removeAll does not work with Filter, only with plain strings.


#9

Hmm, perhaps it does work with Filter; I will try. (I saw removeAll(Filter.... in old code, so it should work.)


#10

Can you try navigating to it from the console/help menu instead?


#11

I found it. I had thought it was something else; that example is included in the server.
I would like to know whether, for example, WorkflowImportStep is useful; until then, I will write an action.

Thanks


#12

WorkflowImportStep is useful if you can specify the import spec when creating the step and a previous step provides the FileList to import.


#13

Thanks. I tried to tweak the client code around it by creating a fake WorkflowExportStep, but I lacked information on how to make one. In the meantime, I am close to having a WorkflowActionStep that integrates all files of a given canonical type in S3. I am trying to find out how to mark what has already been integrated.