Useful Workflow example


#1

Could someone share (the skeleton of) a Workflow script example that has some interesting bits?
For example, it is not clear whether a job passed to WorkflowMapReduceStep.create should already be persisted. The same goes for the various “previous” references; it takes time to work out what the documentation means.

Thanks in advance


#2

If running locally, access the help for this topic at:

http://localhost:8080/api/1/foundation/prod/documentation/topic/workflow

It shows examples; the batch job example should be relevant for map-reduce.


#3

For some reason, I am getting an “unauthorized” error.


#4

FWIW, here is a skeleton that passes a sanity test[1]:

function createNightWork(id) {
    id = id || 'Scheduler_NightWork';
    return Workflow.createSequenceWorkflow(
        id,
        'NightWork',
        [
            createMeasurementImportStep(id),
            createMeasurementCleanStep(id),
            createPMandRCReportStep(id),
            createEvalMetricsMemoStep(id)
        ]
    );
}

function createMeasurementImportStep(id) {
    return WorkflowImportStep.make({
        workflow: id,
        name: 'MeasurementImport',
        spec: { import: true },
        fileListField: null
    });
}

function createMeasurementCleanStep(id) {
    var now = new DateTime();
    return WorkflowMapReduceStep.make({
        workflow: id,
        name: 'MeasurementCleaning',
        mapReduceJob: MeasurementCleaningJob.create({
            startDate: new DateTime(now.plusDays(-1).toString('yyyy-MM-dd')),
            endDate: new DateTime(now.toString('yyyy-MM-dd')),
            targetType: PointPhysicalMeasurementSeries,
            include: 'indexMax,differential,asset,name,unitConstraint,' +
                'treatment,multiplier,measurementType,interval,interpolator',
            filter: Filter.eq('asset.profile', 'GGAZ').and.
                eq('measurementType', 'volume')
        })
    });
}

function createPMandRCReportStep(id) {
    var now = new DateTime();
    var start = now.plusDays(-1);
    var ids = _.pluck(Facility.fetch({
        filter: Filter.eq('facilityType', 'SITE')
    }).objs, 'id');
    var prev = PointMeasurementReportHeader.fetch({
        order: 'descending(end)',
        limit: 1
    }).objs[0];
    var pmrh = PointMeasurementReportHeader.create({
        start: start,
        end: now,
        prev: prev ? { id: prev.id } : null
    });
    return WorkflowMapReduceStep.make({
        workflow: id,
        name: 'PointMeasurementAndRawConsumptionReports',
        mapReduceJob: MeasurementReportJob.create({
            startIns: start,
            endIns: now,
            header: pmrh,
            targetType: Facility,
            include: 'id, startIns, endIns, header, targetType, include, filter',
            filter: Filter.intersects('id', ids)
        })
    });
}

function createEvalMetricsMemoStep(id) {
    var now = new DateTime();
    var start = now.plusDays(-1);
    var emasm = EvalMetricsAtSpec_M.make({
        evalAt: now,
        srcType: 'Facility',
        start: start,
        end: now,
        interval: 'DAY'
    });
    emasm.id = emasm.hash();
    var emas = EvalMetricsAtSpec.make({
        ids: ['s00313'],
        expressions: ['Sum_IPR_Ratio_NI_Heating'],
        memo: emasm,
        read_memo: {}
    });
    var spec = MetricCachingJob.make({ // adds write_memo: true
        emasm: emasm,
        expressions: 'Sum_IPR_Ratio_NI_Heating'
    });
    return WorkflowMapReduceStep.make({
        workflow: id,
        name: 'MemoedEvalMetrics',
        mapReduceJob: MetricCachingJob.create(spec)
    });
}
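
As an aside on the date handling above: the `toString('yyyy-MM-dd')` round-trip truncates the timestamps to day boundaries, so the cleaning job always covers the last full day. The same window can be sketched in plain JavaScript (an illustration of the idea only, not the platform's DateTime API):

```javascript
// Sketch: compute a [yesterday-midnight, today-midnight) window in UTC.
// Plain-JS illustration; the script above uses the platform's DateTime.
function dayWindow(now) {
    // Truncate "now" to midnight UTC.
    var end = new Date(Date.UTC(
        now.getUTCFullYear(), now.getUTCMonth(), now.getUTCDate()));
    // Subtract exactly one day for the window start.
    var start = new Date(end.getTime() - 24 * 60 * 60 * 1000);
    return { start: start, end: end };
}
```

For example, dayWindow(new Date('2019-02-05T13:10:46Z')) yields the window from 2019-02-04T00:00:00Z to 2019-02-05T00:00:00Z.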

Should something be coded differently?

[1] There is a strange error in the end, when trying

WorkflowStepRun.removeAll(
    Loggs.strIP('step.id == {stId}', { stId: st.id }));

(the function strIP interpolates its first string argument using the second map argument):

AfterAll ActionError: wrapped RuntimeException: Field 'myWf_MeasurementCleaning' does not exist in expression: 'meta.tenantTagId == 27 && ((startsWith(typeIdent, 'STEP')) && (step.id == myWf_MeasurementCleaning))' for type 'WorkflowStepRun'.

If the WorkflowStepRun has not been created yet, shouldn’t removeAll just return 0?

Thanks


#5

It looks like the quotes are missing, so myWf_MeasurementCleaning is treated as a variable. If it were

&& (step.id == 'myWf_MeasurementCleaning'))

it would likely work.


#6

Exactly, it should be 'step.id == "{stId}"'.
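
To make the failure mode concrete, here is a minimal strIP-style interpolator (a hypothetical re-implementation for illustration, not the actual Loggs/VLog API), showing why the unquoted form produces an expression in which the id is parsed as a field reference:

```javascript
// Hypothetical re-implementation of a strIP-style interpolator:
// replaces {key} placeholders with values from the map.
function strIP(template, map) {
    return template.replace(/\{(\w+)\}/g, function(_, key) {
        return map[key];
    });
}

// Unquoted: the id becomes a bare identifier in the filter expression,
// which the expression parser then treats as a field name.
var bad = strIP('step.id == {stId}', { stId: 'myWf_MeasurementCleaning' });
// -> step.id == myWf_MeasurementCleaning

// Quoted: the id becomes a string literal, as the parser expects.
var good = strIP('step.id == "{stId}"', { stId: 'myWf_MeasurementCleaning' });
// -> step.id == "myWf_MeasurementCleaning"
```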


#7

Not sure why Loggs.strIP is necessary here, but the Filter helper helps avoid such bugs:

Filter.eq('step.id', st.id)
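
The reason a Filter-style helper avoids the bug is that it quotes and escapes values itself, so callers never hand-assemble expression strings. A toy sketch of that idea (hypothetical, not the platform's Filter implementation):

```javascript
// Toy sketch of a Filter.eq-style builder. Not the real Filter API:
// it only illustrates why value quoting belongs in the helper.
var FilterSketch = {
    eq: function(field, value) {
        var literal = typeof value === 'string'
            ? '"' + value.replace(/"/g, '\\"') + '"' // quote + escape strings
            : String(value);                         // numbers etc. as-is
        return field + ' == ' + literal;
    }
};
```

With this, FilterSketch.eq('step.id', st.id) always emits the id as a quoted string literal, so the "treated as a variable" error above cannot occur.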

#8

Agreed, I used it there out of inertia. However, removeAll does not work with Filter, only with plain strings.


#9

Hmm, perhaps it does work with Filter; I will try it (I saw removeAll(Filter... in old code, so that should work).


#10

Can you just try to navigate from the console/help menu?


#11

I found it; I had thought it was something else, but that example is included in the server.
I would like to know whether, for example, WorkflowImportStep is useful; until then, I will write an action.

Thanks


#12

WorkflowImportStep is useful if you can specify the import spec when creating the step and a previous step provides the FileList to import.


#13

Thanks. I tried to tweak the client code around it by creating a fake WorkflowExportStep, but I lacked information on how to make one. In the meantime, I am close to having a WorkflowActionStep that integrates all files of a given canonical type from S3; I am trying to find out how to mark what has already been integrated.


#14

Here is another attempt (a continuation of Details on data integration from/via S3). I figured out that a file list is a FileList object.

The new relevant Scheduler.js code is:

function createNightWork(spec) {
    spec.workflowId = spec.workflowId || 'Scheduler_NightWork';
    return Workflow.createSequenceWorkflow(
        spec.workflowId,
        'NightWork',
        [
            createMeasurementExportStep(spec),
            createMeasurementImportStep(spec),
            createMeasurementCleanStep(spec),
            createMandCReportStep(spec)
        ]
    );
}

function createMeasurementExportStep(spec) {
    var fsc = FileSourceCollection.get('CanonicalSmartCMeasurement');
    return WorkflowExportMeasurementStep.make({
        workflow: spec.workflowId,
        name: 'MeasurementExport',
        spec: BatchExportSpec.make({
            targetType: CanonicalSmartCMeasurement
        }),
        exportedFiles: FileList.make({
            urls: _.map(
                _.filter(
                    fsc.listFiles()._a,
                    function(f) { return f.url.includes('test'); }),
                function(f) { return f.url; })})
    });
}

function createMeasurementImportStep(spec) {
    return WorkflowImportStep.make({
        workflow: spec.workflowId,
        name: 'MeasurementImport',
        spec: BatchImportSpec.make({
            targetType: CanonicalSmartCMeasurement
        }),
        fileListField: 'exportedFiles'
    });
}

and the new WorkflowExportMeasurementStep.c3typ is:

entity type WorkflowExportMeasurementStep mixes WorkflowExportStep {

    exportedFiles: FileList schema suffix 'EXPFLS'
}

Now I test at the console:

wf = Scheduler.createNightWork({ workflowId: 'impWf' })
wfs = wf.start()

and after a couple of minutes, I get:

VLog.strIP('{x}', { x: WorkflowStepRun.fetch({filter:'startsWith(id,"impWf")'}) })
14:12:15.130 "{
  "type": "FetchResult<WorkflowStepRun>",
  "objs": [
    {
      "type": "WorkflowStepRun",
      "step": {
        "id": "impWf_MeasurementExport"
      },
      "status": {
        "status": "initial"
      },
      "workflowRun": {
        "id": "impWf_9df26a9b-53a5-4429-8571-b31f20006987"
      },
      "started": "2019-02-05T13:10:46.000Z",
      "id": "impWf_MeasurementExport_c6f03b9d-389a-43d6-9b81-6239a74090bf",
      "version": 1,
      "typeIdent": "STEP:BATCH:EXPORT"
    }
  ],
  "count": 1,
  "hasMore": false
}"

and

VLog.strIP('{x}', { x: WorkflowRun.fetch({filter:'startsWith(id,"impWf")'}) })
14:12:26.790 "{
  "type": "FetchResult<WorkflowRun>",
  "objs": [
    {
      "type": "WorkflowRun",
      "workflow": {
        "id": "impWf"
      },
      "currentSteps": {
        "impWf_MeasurementExport": {
          "type": "WorkflowStep",
          "id": "impWf_MeasurementExport"
        }
      },
      "status": {
        "errors": [
          "wrapped NullPointerException\n\n    at c3.service.exportimport.ExportMethods.validateSpec (ExportMethods.java:304)"
        ],
        "started": "2019-02-05T13:10:47.000Z",
        "startedby": "aleksandar.bakic@c3iot.com",
        "status": "failing"
      },
      "started": "2019-02-05T13:10:46.000Z",
      "id": "impWf_9df26a9b-53a5-4429-8571-b31f20006987",
      "version": 4
    }
  ],
  "count": 1,
  "hasMore": false
}"

I “fixed” some errors before this one. For this one, I searched the doc for validateSpec and got two results: TransformValidateSpec and MetadataValidateSpec, but those do not ring a bell.

What is missing?

Thanks

PS. More info at https://engie-splunk.c3-e.com/en-GB/app/search/search?q=search%20host%3Dstage-engiegridops3-app*%20a_rid%3D"5073.128078614"&display.page.search.mode=verbose&dispatch.sample_ratio=1&earliest=1549372210&latest=1549372290&sid=1549373929.603351


#15

I am back to WorkflowActionStep:

function importMeasurements() {
    try {
        log.info('## importMeasurements start');
        var fsc = FileSourceCollection.get('CanonicalSmartCMeasurement');
        var sfs = _.filter(
            _.map(fsc.listFiles()._a,
                  function(f) { return SourceFile.syncFile(f); }));
        log.info(VLog.strIP('## importMeasurements sfs:{x}', { x: sfs }));
        _.each(sfs, function(sf) { // TODO parallelize
            try {
                CanonicalSmartCMeasurement.importData({ data: sf.toFile() });
            } catch (e) {
                log.error(
                    VLog.strIP('## importMeasurements error:{x}', { x: e }));
            }
        });
        log.info('## importMeasurements end');
    } catch (e) {
        log.info(VLog.strIP('## importMeasurements caught:{x}', { x: e }));
    } finally {
        log.info('## importMeasurements finally');
    }
}

and

function createMeasurementImportStep(spec) {
    return WorkflowActionStep.make({
        workflow: spec.workflowId,
        name: 'MeasurementImport',
        action: {
            typeName: 'Scheduler',
            actionName: 'importMeasurements'
        }
    });
}
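
One pattern worth noting in importMeasurements above: underscore's `_.filter` called without a predicate keeps only truthy elements, so `_.filter(_.map(files, syncFile))` drops every file for which syncFile returned nothing. The same idea in plain JavaScript (an underscore-free sketch; `syncAll` and its arguments are illustrative names, not platform API):

```javascript
// Sketch: map each file through a sync function, then drop falsy results.
// Equivalent in spirit to _.filter(_.map(files, syncFile)), which relies
// on underscore's default identity predicate.
function syncAll(files, syncFile) {
    return files
        .map(syncFile)
        .filter(function(sf) { return Boolean(sf); }); // keep only truthy
}
```

So if syncFile returns null for files that are already synced, only the newly synced entries remain in the result.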

Sometimes the function importMeasurements gets executed, as in https://engie-splunk.c3-e.com/en-GB/app/search/search?q=search%20host%3Dstage-engiegridops3-app*%20MeasurementImport%20OR%20importMeasurements&display.page.search.mode=verbose&dispatch.sample_ratio=1&earliest=1549472150&latest=1549472180&sid=1549472314.606403 . Most of the time, however, I cannot find its logs.

As for the log ## importMeasurements sfs:[]: sfs should contain one SourceFile, which I can obtain at the console.

Finally, when using importData, nothing gets logged, unlike when using process (which logs into DataLoadProcessLog). Is there a trick to getting persisted logs?

Thanks


#16

After adding this line:

        log.info(VLog.strIP('## importMeasurements fsc:{x}', { x: _.pluck(fsc.listFiles()._a, 'url') }));

Splunk outputs:

## importMeasurements fsc:[]

whereas the same line in the console outputs 161 urls. (Not a typo: I copy-pasted the line into the console and tested it.)


#17

@AlexBakic It’s not possible to debug this via the forum. If you have a reproducible case (including a c3base branch with your types) with repro instructions, I suggest filing a ticket.


#18

OK, I will try that later because for the moment:

  1. requirements have been relaxed; we will use JMS and avoid the parsing problem (due to the missing header)
  2. I found out that fsc.listFiles()._a does not work inside a Workflow, whereas plain fsc.listFiles() does

Finally, a three-step Workflow (two WorkflowMapReduceSteps and one WorkflowActionStep) works, without waiting for JMS processing to finish.
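
For readers trying to picture what a sequence workflow does, the control flow can be modeled generically as running each step in order and stopping at the first failure, much as the failing export step above prevented the later steps from running. This is a plain-JS sketch of the concept only, not the platform's Workflow engine:

```javascript
// Conceptual sketch of a sequence workflow: run steps in order,
// record a per-step status, and stop at the first failing step.
function runSequence(steps) {
    var run = { status: 'completed', stepRuns: [] };
    for (var i = 0; i < steps.length; i++) {
        try {
            steps[i].run();
            run.stepRuns.push({ name: steps[i].name, status: 'completed' });
        } catch (e) {
            run.stepRuns.push({ name: steps[i].name, status: 'failed' });
            run.status = 'failing';
            break; // later steps never start
        }
    }
    return run;
}
```

Under this model, a run whose second step throws ends up with status 'failing', two step runs recorded, and the remaining steps never started, which matches the shape of the WorkflowRun/WorkflowStepRun fetch results shown earlier in the thread.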