Details on data integration from/via S3


#21

I cannot get a file to reload, even with status "initial" (the fsc is persisted with jmsDisabled: true):


#22

FileSourceCollection.get(ctp).
putField('jmsDisabled', true).

You want to get that FSC into a variable, use setAt() to set the attribute value, then merge() it back into C3.
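For reference, a minimal sketch of that flow (the collection id is the one used later in this thread; the exact setAt() argument order is assumed, not verified):

// Fetch the FSC into a variable, set the field, then persist it back to C3.
fsc = FileSourceCollection.get('CanonicalSmartCSeller');
fsc.setAt('jmsDisabled', true); // assumption: setAt(fieldName, value)
fsc.merge();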


#23

OK, so jmsDisabled is a field on FileSourceCollection, not on the individual files obtained via listFiles()._a. (It is not intuitive; it seems that synchronization happens via the safeUrl() method, without a direct dependency from FileSourceCollection to S3File to SourceFile.) However, there is another problem, probably related to backwards compatibility. If I use the new style:

fsc = FileSourceCollection.get('CanonicalSmartCSeller');
fs = fsc.listFiles()._a[0];
sf = SourceFile.syncFile(fs); // returns a `SourceFile` only if created, otherwise null, so:
sf = SourceFile.get('s3://engie--stage-engiegridops3/fs/c3/c3/inbox/CanonicalSmartCSeller/inbox/CanonicalSmartCSeller_s00313_20180618.csv');
sf.resume(); // status becomes "initial"
sf.process(); // not passing the `async` argument, processing works

Now “old style”, trying to process synchronously:

fsc.jmsDisabled = true;
fsc.merge();
fsc = fsc.get();
VLog.strIP('{x}', {x:fsc});
==>
"{
  "type": "FileSourceCollection",
  "meta": {
    "tenantTagId": 33,
    "tenant": "engie-vertuoz",
    "tag": "test2",
    "created": "2019-01-24T10:51:32.000Z",
    "createdBy": "authorizer",
    "updated": "2019-02-04T15:31:49.000Z",
    "updatedBy": "aleksandar.bakic@c3iot.com",
    "timestamp": "2019-02-04T15:31:49.000Z"
  },
  "id": "CanonicalSmartCSeller",
  "version": 4,
  "name": "CanonicalSmartCSeller",
  "source": {
    "typeName": "CanonicalSmartCSeller"
  },
  "sourceSystem": {
    "id": "Legacy"
  },
  "oldSyncMode": "MANUAL",
  "syncMode": "MANUAL",
  "jmsDisabled": true,
  "typeIdent": "FILE",
  "userUpdatedFields": [
    "jmsDisabled"
  ]
}"
sf = SourceFile.get('s3://engie--stage-engiegridops3/fs/c3/c3/inbox/CanonicalSmartCSeller/inbox/CanonicalSmartCSeller_s00313_20180618.csv'); // status is "completed"
sf.resume(); // status is "initial"
sf.process(); // regardless of the `async` argument, status becomes "rejected"
==>
"[Data parsing fails. See log., Invalid value path 'B2B;COMMERCIALISATEUR;EC;ec;s00313<*>' at 35 for type CanonicalSmartCSeller]"

I believe this is related to the legacy header, but I am not sure how to fix it.


#24

Is there any way to use the same CSV file in both the old (no chunking, sync) and new (chunking, async) integration styles?

If not, is there a way to wait until a file is processed in the new (chunking, async) style but without busy-waiting? (I see the following used in TestApi:

function sleepImpl(seconds) {
  // Busy-wait until the requested number of seconds has elapsed.
  var start = new DateTime().getMillis();
  while ((new DateTime().getMillis() - start) < seconds * 1000) {}
}

but we would prefer something less wasteful.)
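For concreteness, the kind of wait we have in mind would look roughly like the sketch below. It is hypothetical: it assumes the SourceFile status seen in the comments above can be read directly as a field, and it still relies on the TestApi sleepImpl between polls, so it only reduces the waste rather than removing it.

// Hypothetical polling helper: check the SourceFile status periodically
// instead of busy-waiting for the whole expected duration.
function waitUntilProcessed(url, timeoutSeconds) {
  var waited = 0;
  while (waited < timeoutSeconds) {
    var sf = SourceFile.get(url);
    if (sf && sf.status === 'completed') { // assumed status value, as seen above
      return true;
    }
    sleepImpl(5); // TestApi busy-wait shown above; 5-second poll interval
    waited += 5;
  }
  return false; // timed out
}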

Thanks


#25

There is an afterProcess lambda on FileSourceCollection that you can provide; note, however, that it will be called after every individual chunk:

   /**
    * Triggers the action in the lambda function after each individual abstract content
    * is processed. It is currently implemented only for FileSourceCollection.
    * The source collection may be used to obtain any parameters necessary for the
    * "after-processing" of the content.
    * @param sourceCollection
    *    the source collection from which parameters are needed for after-processing
    * @param content
    *    the AbstractContent object that may be used in after-processing
    */
   afterProcess: Lambda<function (sourceCollection: SourceCollection)> schema name 'APRO'
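By way of illustration, a rough, hypothetical sketch of attaching such a callback; only the setAt()/merge() persistence step is taken from earlier in this thread, and how a Lambda value is actually constructed and passed here is an assumption, not verified API:

// Hypothetical sketch: attach an afterProcess callback to the collection.
afterChunk = function (sourceCollection) {
  // Called once per processed chunk, not once per file (see doc comment above),
  // so any "all files done" bookkeeping must be aggregated across chunks.
};
fsc = FileSourceCollection.get('CanonicalSmartCSeller');
fsc.setAt('afterProcess', afterChunk); // assumption: a plain JS function is accepted as the Lambda value
fsc.merge();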

#26

Thanks; I do not see how to block-wait for those callbacks.
In the meantime, I will take another stab at WorkflowExportStep/WorkflowImportStep with S3 (not sure whether I should use an S3File or a SourceFile list; I will try both). The doc for the latter says: "A workflow step that performs an import and waits for its completion."

PS. The measurements being loaded can all be processed together; there are no dependencies among them. However, we cannot start processing before they are all integrated (which is why we use a Workflow). The timing is tight, so we cannot use a CronJob.


Useful Workflow example