Details on data integration from/via S3


#1

We just upgraded to 7.8.4.3-1, here are some questions.

  1. Is the automated data integration via Tibco now also going via S3? (We discovered already-integrated files in “s3://” when we looked directly into S3 for the first time.)
  2. If the answer to 1. is “yes”:
    1. Is everything backwards compatible? We are investigating a problem related to the use of the + character in file names; it might be due to S3.
    2. What else has changed?
  3. sync*:
    1. What is the story behind the sync operation on SourceFile?
    2. Why do we have to do it before processing a file in S3?
  4. In one test, I created a file list (files in S3) on a FileSourceCollection (for one type CT) and called syncFile on them. I would like to call process on selected files that have not been integrated yet (even if my list for the given type CT contains both old and new files). I called SourceFile.processAll() once and got files for other types in DataLoadUploadLog and DataLoadProcessLog too. Perhaps it is SourceFile.processBatch that I should call on the synced files.
    1. Is sync making note of whether a file has been integrated (I could not see that by inspection)?
    2. Do we have to manually move away the files in S3 that we have integrated (they do not seem to be moved after calling process), like we do in SFTP?

Thanks


#2
  1. Yes and no. Tibco still ingests files from SFTP and delivers them to C3 via HTTP. However, C3 persists the files received from Tibco in S3.
  2. Yes, it is supposed to be fully backwards compatible. I believe that there is a regression with + that is being worked on.
  3. SourceFile is a type in C3 that stores meta information about files in the file store (S3). Without a record in SourceFile, the C3 platform is not aware of the file's existence and will not know to process it. Hence we use syncAll (invoked in a variety of ways) to populate SourceFile with information about what is in S3.
  4.
    1. processAll() with no parameters will process everything the platform is aware of. As you said, you want processBatch() if you are working with a specific list.
    2. No, we should not move files anywhere; the intent is for S3 to be a read-only location that is never altered in any way. Since SourceFile tracks which files were processed, it should not be necessary to manipulate the files themselves.
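
For reference, a minimal sketch of that flow (the collection name is just an example, and the exact spec shapes should be checked against the type docs):

    // Register the files of one collection as SourceFile records, then process only those.
    var fsc   = FileSourceCollection.get('CanonicalSmartCSeller');              // example collection
    var files = fsc.listFiles()._a;                                             // files currently sitting in S3
    var sfs   = _.map(files, function(f) { return SourceFile.syncFile(f); });   // create/return SourceFile records
    SourceFile.processBatch(sfs, { async: false });                             // process only this batch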

#3

Thanks! Regarding 3, I suppose the meta information is in the field status (or perhaps fileInfo). How can I “roll back” the status if I want to reload/reintegrate a file into C3?


#4

I ran some tests: when I run SourceFile.processAll() a few times, the objects in DataLoadUploadLog and DataLoadProcessLog are updated each time.
I am not sure how to ensure that a SourceFile that has already been integrated will not be reintegrated. If I do:

var fsc = FileSourceCollection.get('CanonicalSmartCSeller');
var fs  = fsc.listFiles()._a;                                        // files currently in S3 for this collection
var sfs = _.map(fs, function(f) { return SourceFile.syncFile(f); }); // second time: [null, null]

the last line works the first time only, so after that I can use SourceFile.fetch() instead.
If sf = sfs[0], then SourceFile.processBatch([sf]) will do the processing as many times as it is called. I suppose it is syncFile where the exactly-once semantics lives.
Please let me know if I misunderstood something, and how to roll back a file for reintegration so that syncFile returns a SourceFile again.


#5

Hmm, it seems that removing the persisted SourceFile object does the trick (rollback): sync* will create these objects in the DB and return them only if they are not already there.
However, syncAll() in my case returns 42 FileSystemBatchJobs but produces only 19 SourceFile objects. EDIT: there are 19 canonical files uploaded and 42 FileSourceCollection objects in total.
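
Roughly what that trick looks like (a sketch rather than the exact commands I ran; the filter and status value are placeholders):

    // Drop the persisted SourceFile so that sync* will recreate (and return) it.
    var sf = SourceFile.fetch({ filter: 'status == "completed"' }).objs[0];  // placeholder: pick the file to redo
    SourceFile.remove(sf);                                                   // the exact removal call may differ
    // After this, syncFile/syncAll returns a SourceFile for that file again.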


#6

Next, I noticed that the upload path I used was wrong, so I tried to delete the uploaded files. You said read-only; is there a way to remove the uploaded files at all?
Further, I ran FileSourceCollection.clearCollection() and removed all 42 objects. Then I provisioned twice, with and without -r, but FileSourceCollection.fetchCount() is still 0 (indeed, no objects). All of this is on a local docker server, v7.8.4.3-1. How do I recreate the FileSourceCollection data? BTW, wc -l seed/FileSourceCollection/FileSourceCollection.json returns 22, not 42.


#7

You can use SourceFile.resume* to reload a file.
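
For example (a sketch only; the exact resume variant and its signature are in the SourceFile type doc):

    // Reset a processed SourceFile so it can be reloaded.
    var sf = SourceFile.get('some-source-file-id');  // hypothetical id
    SourceFile.resume(sf);                           // or the batch/all variant, per the type doc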


#8

If you want to sync a specific FileSourceCollection, you should use FileSourceCollection.sync().
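
For example (sketch, using the collection name from earlier in the thread):

    // Sync just this one collection instead of everything:
    FileSourceCollection.get('CanonicalSmartCSeller').sync();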


#9

Thanks! I now see the doc for resume: “Updates the status of SourceFile to initial and #sync them”. Who would guess that from the name; we are used to pause/resume. How about an alias, something like redo or retry?


#10

As for removing the uploaded files: FileSourceCollection can archive files using the processInbox method, but I don’t think that’s what you are looking for here. I believe you want to use the FileSystem APIs to remove those files, deleteFiles specifically.

Also, provisioning does not affect seed data. You will need to use the SeedData APIs to undo the user changes.
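
Roughly (a sketch; the url is a placeholder and the exact FileSystem call shape should be verified against the type doc before running it):

    // Remove files that were uploaded to the wrong S3 path.
    var badUrl = 's3://.../wrong/upload/path/';  // hypothetical location of the bad upload
    FileSystem.inst().deleteFiles(badUrl);       // deleteFiles as mentioned above; inst() and the signature are assumptions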


#11

Thanks. I am also trying c3 prov data.
EDIT: c3 prov data does not recreate the removed FileSourceCollection objects; I even used -f .../FileSourceCollection.csv.
I found FileSourceCollection.createSeedData(), but I would need to create the objects myself. Is there a way to reload the csv file?


#12

You can add a filter by status to processAll().

Yes, it is completely fine to remove a file if you know it is bad. It is not actually read-only, and deleteFiles() will work. What I meant is that we’d rather not manipulate the files we got from the customer: no modifying, no reshuffling into subfolders. This keeps conversations with the customer fact-based: “here is exactly what you sent us”.
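
For example (a sketch; whether processAll takes a plain filter string or a spec, and which status values apply, should be checked against the SourceFile doc):

    // Only process SourceFiles that have not been integrated yet.
    SourceFile.processAll('status == "initial"');  // "initial" as in the resume doc quoted above; the spec shape may differ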


#13

Eventually, I recreated the docker image :slight_smile: and got all 19 objects after provisioning.


#14

Now, a question about SourceFile.processBatch and its async param.
We load some referential data in a specific order, and it works when we curl directly to the import API, even with S3 behind it. However, when I target the file API instead and only copy the files to S3, and then invoke SourceFile.processBatch in the same multi-phase style on all files of a phase, I get an error:

Rejected:1:Error while transforming: {Transformation Type:TransformCanonicalSmartCMeterToServicePointMeterAsset, Target Type:ServicePointMeterAsset, errorMsg:wrapped org.mozilla.javascript.JavaScriptException: Error: the denormParents field (AssetHierarchyDenorm) has not yet been correctly denormalized for the FixedAsset with id 's00313-b001-gen001' 

even though I can see in DataLoadProcessLog that the above FixedAsset was created before the error. Do I need to add some more pausing, if { async: false } as the second argument to SourceFile.processBatch() is not enough?


#15

I would recommend a review of the overall design here. One of C3’s core design principles is that integration should never rely on sequencing. We expect files to arrive out of order and be processed out of order; however, at the end everything should converge to a deterministic, consistent state.


#16

Thanks, we will try to fix that. At the same time, if we sequence the calls to processBatch in the same way as when we use the import API, why do we get that error?


#17

@garrynigel thoughts? The error does look odd.


#18

If you’re using chunking, then a process call, even with async: false, will only wait for chunking to complete before returning; the data integration after that happens asynchronously.

If you want the full file load to complete before the call returns, you might not need chunking (jmsDisabled = true).
As Yaro mentioned, relying on sequencing for data integration is a bad idea, and I concur; it could be a timing issue.
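
If you want to try turning off chunking for a collection, something like this might work (a sketch only; whether jmsDisabled has to be merged onto the persisted FileSourceCollection record, rather than just set with putField, is an assumption I have not verified):

    // Persist jmsDisabled on the collection before syncing/processing.
    var fsc = FileSourceCollection.get('CanonicalSmartCSeller');
    fsc.putField('jmsDisabled', true);
    fsc.merge();                                   // or FileSourceCollection.merge(fsc); exact persistence call may differ
    SourceFile.processBatch(SourceFile.syncFileBatch(fsc.listFiles()._a), { async: false });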


#19

Regarding 1, it seems that the files from Tibco that C3 persists end up in the same place that FileSourceCollection maps to. Perhaps the two (Tibco-sourced files and FileSourceCollection/file API uploads) should be separated; is there a recommended way? Also, are those persisted Tibco files kept only temporarily?


#20

Thanks, @garrynigel! A few more details, please: if I understand correctly, I need to set jmsDisabled to true somewhere in order to make processBatch with async: false wait for the completion of processing. I found the field jmsDisabled on FileSourceCollection but am not sure how to add it to the following code that imports data:

    _.each(phases, function(phase) { // phase is a list of canonical type names
        var files = _.reduce(
            phase,
            function(acc, ctp) {
                return acc.concat(
                    _.filter(
                        FileSourceCollection.get(ctp).
                            putField('jmsDisabled', true). // does not seem to help: the result is the same
                            listFiles()._a,
                        siteFilter));
            },
            []);
        if (files.length)
            SourceFile.processBatch(
                SourceFile.syncFileBatch(files),
                { async: false });
    });

Perhaps I need to update the FileSourceCollection object whose jmsDisabled I set to true above?