Details on data integration from/via S3

#21

I cannot get a file to reload even with status "initial" (the fsc is persisted with jmsDisabled: true).


#22

FileSourceCollection.get(ctp).
    putField('jmsDisabled', true).

You want to get that FSC into a variable, use setAt() to set the attribute value, then merge() it back into C3.
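
For example, a sketch of that sequence (ctp is the FileSourceCollection id, as in the snippet above; the exact setAt() signature is an assumption here):

var fsc = FileSourceCollection.get(ctp);
fsc.setAt('jmsDisabled', true);   // or set the field directly: fsc.jmsDisabled = true
fsc.merge();                      // persist the updated FSC back into C3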


#23

OK, so jmsDisabled is a field on FileSourceCollection, not on the individual files obtained via listFiles()._a. (It is not intuitive; it seems that synchronization happens via the safeUrl() method, without a direct dependency from FileSourceCollection to S3File to SourceFile.) However, there is another problem, probably related to backwards compatibility. If I use the new style:

fsc = FileSourceCollection.get('CanonicalSmartCSeller');
fs = fsc.listFiles()._a[0];
sf = SourceFile.syncFile(fs); // returns a `SourceFile` only if created, otherwise null, so:
sf = SourceFile.get('s3://engie--stage-engiegridops3/fs/c3/c3/inbox/CanonicalSmartCSeller/inbox/CanonicalSmartCSeller_s00313_20180618.csv');
sf.resume(); // status becomes "initial"
sf.process(); // not passing the `async` argument, processing works

Now “old style”, trying to process synchronously:

fsc.jmsDisabled = true;
fsc.merge();
fsc = fsc.get();
VLog.strIP('{x}', {x:fsc});
==>
"{
  "type": "FileSourceCollection",
  "meta": {
    "tenantTagId": 33,
    "tenant": "engie-vertuoz",
    "tag": "test2",
    "created": "2019-01-24T10:51:32.000Z",
    "createdBy": "authorizer",
    "updated": "2019-02-04T15:31:49.000Z",
    "updatedBy": "aleksandar.bakic@c3iot.com",
    "timestamp": "2019-02-04T15:31:49.000Z"
  },
  "id": "CanonicalSmartCSeller",
  "version": 4,
  "name": "CanonicalSmartCSeller",
  "source": {
    "typeName": "CanonicalSmartCSeller"
  },
  "sourceSystem": {
    "id": "Legacy"
  },
  "oldSyncMode": "MANUAL",
  "syncMode": "MANUAL",
  "jmsDisabled": true,
  "typeIdent": "FILE",
  "userUpdatedFields": [
    "jmsDisabled"
  ]
}"
sf = SourceFile.get('s3://engie--stage-engiegridops3/fs/c3/c3/inbox/CanonicalSmartCSeller/inbox/CanonicalSmartCSeller_s00313_20180618.csv'); // status is "completed"
sf.resume(); // status is "initial"
sf.process(); // regardless of the `async` argument, status becomes "rejected"
==>
"[Data parsing fails. See log., Invalid value path 'B2B;COMMERCIALISATEUR;EC;ec;s00313<*>' at 35 for type CanonicalSmartCSeller]"

I believe this is related to the legacy header, but am not sure how to fix it.


#24

Is there any way to use the same csv file in both the old (no chunking, sync) and new (chunking, async) integration styles?

If not, is there a way to wait until a file is processed in the new (chunking, async) style but without busy-waiting? (I see the following used in TestApi:

function sleepImpl(seconds) {
  // busy-wait: spin until the requested number of seconds has elapsed
  var start = new DateTime().getMillis();
  while ((new DateTime().getMillis() - start) < seconds * 1000) {}
}

but we'd prefer something less wasteful.)
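
For reference, the kind of polling we would rather avoid looks roughly like this (waitForProcessed is just an illustrative name; the status values are the ones seen in the posts above, and we assume the status field is readable on the fetched SourceFile):

function waitForProcessed(url, timeoutSeconds) {
  var waited = 0;
  while (waited < timeoutSeconds) {
    var sf = SourceFile.get(url);
    if (sf.status === 'completed' || sf.status === 'rejected') {
      return sf.status;
    }
    sleepImpl(5);   // busy-waits for 5 seconds, burning CPU the whole time
    waited += 5;
  }
  return 'timeout';
}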

Thanks


#25

There is an afterProcess lambda on FileSourceCollection that you can provide, but it will be called after every individual chunk:

   /**
    * Triggers the action in the lambda function after each individual abstract content
    * is processed. It is currently implemented only for FileSourceCollection.
    * The source collection may be used to obtain any parameters necessary for the
    * "after-processing" of the content.
    * @param sourceCollection
    *    the source collection from which parameters are needed for after-processing
    * @param content
    *    the AbstractContent object that may be used in after-processing
    */
   afterProcess: Lambda<function (sourceCollection: SourceCollection)> schema name 'APRO'
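
A rough sketch of providing it on your FileSourceCollection (the putField/merge pattern is the one suggested earlier in this thread; the lambda body is only a placeholder, and whether a second content argument is actually passed depends on the platform version):

var fsc = FileSourceCollection.get('CanonicalSmartCSeller');
fsc.putField('afterProcess', {
  language: "javascript",
  implementation: function f(sourceCollection, content) {
    // runs after each individual chunk is processed, not once per file;
    // `content` is the AbstractContent described in the doc above
  }
});
fsc.merge();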

#26

Thanks; I do not see how to block-wait for those callbacks.
In the meantime I will take another stab at WorkflowExportStep/WorkflowImportStep with S3 (not sure whether I should use an S3File or a SourceFile list; I will try both); the doc for the latter says "A workflow step that performs an import and waits for its completion."

PS. The measurements being loaded can all be processed together; there are no dependencies among them. But we cannot start processing before they are all integrated (that is why we use a Workflow). The timing is tight, so we cannot use a CronJob.


#27

It is possible that the error disappears when AsyncProcessingDisabled is false. If so, could you please explain what could be going on?
Thanks

EDIT. We did things in the following order:

  1. asked the ops to clean cache on the cluster
  2. set AsyncProcessingDisabled to false
  3. loaded canonicals up to the one resulting in MeterAsset, and it worked fine
  4. set AsyncProcessingDisabled to true
  5. loaded canonicals like above but for another site, and we got the error
  6. set AsyncProcessingDisabled to false
  7. reloaded the previous canonical file for the same site, and still got the error
  8. loaded canonicals like above but for a third site, and it worked fine

#28

@rohit.sureka, when you say afterProcess will be called after every chunk, how does it work when I write a function that moves the file to the archive folder, like below:

afterProcess: {
  language: "javascript",
  implementation: function f(CanonicalAccountAttribute, file) {
    var destFile = file.url.replace('inbox', 'archive');
    file.move(destFile);
  }
}

Does this move each individual chunk or the entire file?


#29

@venkata.paruchuri, in the above example of the afterProcess lambda function, what exactly does the "file" parameter consist of? From the definition of the afterProcess lambda it says it is the abstract content.

var sourceFile = SourceFile.get(file.id) doesn’t seem to be working.
Is there a way I can get the SourceFile instance for this file?

I tried this:

function f(sc, file) {
  var sourceFile = SourceFile.fetch({filter: "sourceCollectionId == '" + sc.id + "'"}).objs[0];
}

but I am not sure it always gets a consistent result when there are multiple SourceFile entries for this SourceCollection.
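
A sketch of narrowing that fetch with the file's url, assuming the SourceFile id is the s3 url (as in the SourceFile.get('s3://...') calls earlier in this thread) and that the filter syntax accepts &&:

function f(sc, file) {
  var result = SourceFile.fetch({
    filter: "sourceCollectionId == '" + sc.id + "' && id == '" + file.url + "'"
  });
  var sourceFile = result.objs && result.objs[0];  // undefined if nothing matches
}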


#30

The "file" parameter takes the instance of the file on which the lambda is working, so you do not have to fetch the SourceFile object separately.

When you say SourceFile.get(file.id) does not seem to be working, what is happening? The get method takes the id of a SourceFile instance.


#31

Thanks Venkat! We just learned that the content parameter was removed in 7.9, which is why it isn't working for me. We were told it will be fixed in the next upgrade.
