Custom Pipeline

#1

Hi,
I’m trying to create a custom Pipeline in Python which has to:

  • apply PCA
  • do something with results using a custom function
  • apply gradient boosting

I did:

  • create a PCA pipeline

  • create GBoosting pipeline

  • create a type My_custom.c3typ in this way:
    entity type My_custom extends CustomPythonMLPipe type key 'SCPCAGB'

  • create My_custom.py:

        class My_custom:

            def __init__(self, preProcessPipeline=None, GBPipe=None):
                self.preProcessPipeline = preProcessPipeline
                self.GBPipe = GBPipe

            def train(self, input, targetOutput=None):
                prePipeline, GBPipe = retrieve_pipeline(self)
                trained_pipe = c3.MLSerialPipeline.train(this=prePipeline, input=input)
                pipeline_result = trained_pipe.process(input=input)
                c3.MLSerialPipeline.merge(this=trained_pipe)
                # number of PCA components, kept so process() can reuse it
                n = c3.MLSerialPipeline.hyperParams(this=prePipeline)['pca__n_components']['value']
                self.n = n
                trained_pipe = c3.MLSerialPipeline.train(this=GBPipe, input=obtain_listed_value(pipeline_result, n), targetOutput=targetOutput)
                c3.MLSerialPipeline.merge(this=trained_pipe)

            def process(self, input):
                prePipeline = self.preProcessPipeline
                GBPipe = self.GBPipe
                pipeline_result = c3.MLSerialPipeline.process(this=prePipeline, input=input)
                result = c3.MLSerialPipeline.process(this=GBPipe, input=obtain_listed_value(pipeline_result, self.n))
                return result

            def params(self, paramKeys=None):
                if paramKeys is None:
                    return {"preProcessPipeline": self.preProcessPipeline, "GBPipe": self.GBPipe}
                else:
                    all_param_keys = ["preProcessPipeline", "GBPipe"]
                    return {k: getattr(self, k) for k in paramKeys if (k in all_param_keys) and hasattr(self, k)}

            def with_params(self, params):
                for k, v in params.items():
                    setattr(self, k, v)
                return self


Results:

  • When I run the following command from the console:

     custom = My_custom.make(
                  name="MyPipe",
                  preProcessPipeline="prePipe_id",
                  GBPipe="GBPipe_id",
                  technique=MLTechnique.make()
               )      
    

It returns this error: "Write failed: Value is required for field technique in type My_custom"

What is my error?

Thank you

0 Likes

#2

I believe you need to make an actual technique. Try the following:

myTechnique = MLTechnique.make(name="mysupercooltechnique").create()

custom = My_custom.make(
             name="MyPipe",
             preProcessPipeline="prePipe_id",
             GBPipe="GBPipe_id",
             technique=myTechnique
         )

Also, you might want to check if a technique already exists. See:

MetadataStore.tag().typesThatMixin("MLTechnique")
0 Likes

#3

That doesn’t work.
I think the problem was in the logic, so I changed all my code to this:

import pandas as pd

class My_custom:

    def __init__(self, t_depth=20, freq=10):
        self.t_depth = t_depth
        self.freq = freq

    def train(self, input, targetOutput=None):
        df = c3.Dataset.toPandas(dataset=input)
        # do something on df
        self.trainingScores = df.to_json()
        return self

    def process(self, input):
        df = c3.Dataset.toPandas(dataset=input)
        # do something on df
        input = c3.Dataset.fromPython(pythonData=df)
        return input

    def params(self, paramKeys=None):
        if paramKeys is None:
            return {"t_depth": self.t_depth, "freq": self.freq}
        else:
            all_param_keys = ["t_depth", "freq"]
            return {k: getattr(self, k) for k in paramKeys if (k in all_param_keys) and hasattr(self, k)}

    def with_params(self, params):
        for k, v in params.items():
            setattr(self, k, v)
        return self

And I defined my pipeline in JSON in this way:

{
  "type": "MLSerialPipeline",
  "id": "prova1",
  "name": "prova1",
  "steps": [
    {
      "type": "MLDatasetDatasetStep",
      "name": "My_custom_name",
      "pipe": {
        "type": "My_custom",
        "name": "mypipe",
        "technique": {
          "type": "MLTechnique",
          "hyperParameters": {
            "t_depth": 20,
            "freq": 20
          }
        }
      }
    }
  ]
}

If I fetch this pipeline into a pipe variable and try pipe.train(ds), it returns the error: json.JSONEncoder.default(self, obj), TypeError: array([[ ....]]) is not JSON serializable

Does anyone know why?

Thank you

0 Likes

#4

I solved this error by changing the “env” in my_custom.c3typ; now the error is the following: 'MiniDataFrame' object has no attribute toJson. I think it comes from the function called here: df = c3.Dataset.toPandas(dataset=input)

0 Likes

#5

Trying to debug open-loop with you here, since I don’t know what c3 server version you are using, and I don’t have access to your server logs to confirm my hunch about what is going on.

I think the problem is that you are making calls to python inline functions, c3.Dataset.toPandas() and c3.Dataset.fromPython(), but if you look at the python runtime definition for CustomPythonMLPipe.train() and process(), you’ll see that it uses the “remote” connector instead of the “remote-types” connector needed for python inline functions. The details are explained in TutorialC3RemoteInlineFunctions.ipynb, which should show up by default in the “tutorials” folder if you start C3’s Jupyter service.

But to address your immediate issue, you could try to use convert_datasets_to_pandas_df(dataset), which should be available as a “mixed in python function” because your Type has PythonMLHelper as an ancestor (again, assuming you have the c3 server version after this was introduced… most likely yes). This was a way to share python code between Types that does not call back to C3. (We forgot to document that we added that function in PythonMLHelper, so just try it out to see if it exists. Will make a ticket to fix the documentation.)
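
Something like this might work (an untested sketch, assuming the helper is exposed as a plain function in your Type's Python scope via the PythonMLHelper mixin):

    # Sketch only: assumes convert_datasets_to_pandas_df is mixed in from
    # PythonMLHelper and callable directly, without going back to the C3 server.
    def train(self, input, targetOutput=None):
        df = convert_datasets_to_pandas_df(input)
        # do something on df
        return self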

To help you understand the error message you are getting, the JSON serialization error is because C3 remote python is trying to serialize the data to pass to C3. This is the default behavior of python inline functions (trying to execute a server-side python C3 action) when you use “remote” as a connector, not “remote-types”. The data right before one of the calls is a numpy array, which cannot be directly JSON serialized and passed back to the c3 server. (If most of this doesn’t make sense, don’t worry.)

You bring up a good point, that a lot of users may not know the subtle differences between remote and remote-types as connectors in the python runtime definition when writing a Type extending CustomPythonMLPipe (and want to use python inline functions, which are quite handy). We’ll make a ticket to change the python runtime definition used by CustomPythonMLPipe to use “remote-types” in a future version of c3server.

1 Like

#6

Hi Phoebus,
thank you for your explanation. I don’t think I have TutorialC3RemoteInlineFunctions.ipynb. In order to solve my issue I looked for the convert_datasets_to_pandas_df(dataset) function, but I don’t have it in my type. How can I call it in order to solve this issue?

0 Likes

#7

Just saw your second message, with the error 'MiniDataFrame' object has no attribute toJson. Looks like you found a good alternative to convert_datasets_to_pandas_df(dataset), by modifying the python runtime environment.

Looks like the documentation on CustomPythonMLPipe is wrong - the input to your python class’s train() (NOT CustomPythonMLPipe.train()) is a MiniDataFrame, not a C3 Dataset. A MiniDataFrame is a barebones version of a pandas DataFrame that you can index into and pass into scikit-learn (something defined in PythonMLHelper to avoid loading pandas and to speed up processing). But it is not a C3 Type.

So you may be able to avoid c3.Dataset.toPandas() altogether, and not bother with python inline functions.

To make your life easier, you should look at TutorialIntroMLPipeline.ipynb for an example of how to write a custom pipe (ExampleCustomPythonMLPipe at the bottom of the tutorial). Again, this is available in C3’s Jupyter service in the tutorials folder. You should ask your C3 contact how to get access to notebooks in C3’s Jupyter service if you don’t have access already.

By the way, you should not assign self.trainingScores = df.to_json(). CustomPythonMLPipe.train() will compute training scores using the metrics stored in CustomPythonMLPipe.scoringMetrics. If you really need to do custom manipulations to training scores, you would probably write an MLPipe from scratch, mixing in MLLeafPipe directly.

0 Likes

#8

The problem is that I need a DataFrame because I have to use some functions on it (e.g. pandas.shift()) which obviously require a pandas DataFrame.
Using only the MiniDataFrame, I cannot use these functions.

Do you know how I can obtain a pandas DataFrame in my train() function?

Thank you

0 Likes

#9

One possible path forward is to convert your MiniDataFrame to a pandas DataFrame in your code, using code like:

# assume the variable "input" is a MiniDataFrame
import pandas as pd

df = pd.DataFrame(data=input.data, index=input.index, columns=input.columns)
# Do stuff with df

To convert back to a MiniDataFrame (at the end of process), you would do:

# MiniDataFrame comes from PythonMLHelper, so it is already available in the pipe's runtime
output = MiniDataFrame(data=df.values, index=df.index, columns=df.columns)
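
Putting it together, a minimal, untested sketch of what your pipe could look like with that conversion, assuming both input and targetOutput arrive as MiniDataFrames:

    # Untested sketch: a custom pipe that converts MiniDataFrame <-> pandas DataFrame.
    import pandas as pd

    class My_custom:

        def __init__(self, t_depth=20, freq=10):
            self.t_depth = t_depth
            self.freq = freq

        def train(self, input, targetOutput=None):
            # "input" is a MiniDataFrame; convert it to a pandas DataFrame
            df = pd.DataFrame(data=input.data, index=input.index, columns=input.columns)
            # do something on df (e.g. df.shift(self.freq))
            return self

        def process(self, input):
            df = pd.DataFrame(data=input.data, index=input.index, columns=input.columns)
            # do something on df
            # convert back to a MiniDataFrame before returning
            return MiniDataFrame(data=df.values, index=df.index, columns=df.columns)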
0 Likes

#10

Great! It works!
Only one last question: as said before, I have a multi-step pipeline, and the output of training has to be the input for the train function of the following pipe (a normal sklearn pipeline).
How can I implement this functionality (a sort of fit_transform)?
In a custom pipeline the returned value of the train function is the object itself.

0 Likes

#11

Create a pipeline combining all 3 steps (PCA -> Custom -> Gradient Boosting), where the processingFunction of the first step is transform, and the last step is predict or predict_proba (your middle step is custom, so no need to specify anything).

When calling train the pipeline will do the following:

  1. call train on pipe1(PCA)
  2. call process on pipe1 (now trained)
  3. call train on pipe2 (custom) using output of step 2.
  4. call process on pipe2 using output of step 2.
  5. call train on pipe3 using output of step 4.
  6. call process on pipe3 using output of step 4.

So calling train will do the right thing.
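
For intuition, the same cascade can be written out with plain Python objects; this is just an analogy for what the pipeline does, not the actual C3 API:

    # Analogy only (not the C3 API): how a serial pipeline cascades train/process.
    def train_serial(pipes, X, y):
        data = X
        for pipe in pipes[:-1]:
            pipe.train(data)           # unsupervised / custom steps ignore the target
            data = pipe.process(data)  # the processed output feeds the next step
        pipes[-1].train(data, y)       # only the last, supervised step sees y
        return pipes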

Note: while we fully support what you are trying to accomplish, we support exclusively linear pipelines today (as opposed to all Directed Graphs); that limitation most commonly causes trouble when you have 2 supervised steps that need to be trained on different targets. In that case you have to rely on 2 independent pipelines.

1 Like

#12

Ok,
but how can I replicate a target variable from step 1 to step 3 without applying anything to it (neither pipe 1 nor pipe 2)?

The target for step 3 is defined with a metric which returns only 0 and 1; can I call it in the custom pipeline's process and add the column to the returned dataframe? (This has to be done only during training.)

0 Likes

#13

Not sure I understand… Is the target an output of pipe1, or is it a metric?

The signature of MLPipe shows that myMLPipe.train(X, y) takes an independent y, so it is not processed by the pipeline.

0 Likes

#14

No, the target is defined a priori, before calling the train function on pipe1; I need to replicate it from pipe1 to pipe3.

0 Likes

#15

Ok then, as mentioned, just call train on your pipeline (obviously don’t do it on each pipe, otherwise it defeats the purpose of a pipeline…); the target will only be used in the last step.
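
Concretely, something like the following (assuming pipe is your fetched MLSerialPipeline and X, y are the Dataset and target you prepared before the pipeline):

    # assuming "pipe" is the fetched MLSerialPipeline ("MyPipeLine")
    trained_pipe = pipe.train(input=X, targetOutput=y)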

0 Likes

#16

Ok, just to recap.

y = the target, defined before the pipeline by means of an EvalMetric;
X = the dataset, defined before the pipeline by means of EvalMetrics;

TRAIN PHASE = MyPipeLine:

  • step1 = pipe1 (PCA) needs only X
  • step2 = pipe2 (Custom) needs only X
  • step3 = pipe3 (GradientBoosting) needs both (X, y)

So if I run MyPipeLine.train(X, y), then y should be propagated through to step 3, right?

0 Likes

#17

Yes, have you tried?

0 Likes

#18

Not yet.
In the train function I need to store the targetOutput as a list in the custom pipeline's hyperParameters.
I wrote the following code:

        df_target = pd.DataFrame(data=targetOutput.data, index=targetOutput.index, columns=targetOutput.columns)
        target = list(df_target.values.flatten())
        self.n_one = sum(target)
        self.target = target
        self.n_zero = len(target) - self.n_one

but it returns this error:
ValueError: Found input variables with inconsistent numbers of samples: [17, 18]↵"↵JSON: { ...

Obviously I declared these parameters in the params() function.

How can I store a list in the hyperParameters?

0 Likes

#19

In machine learning, hyperparameters refer to the configuration of the hypothesis function, for example the number of trees, or their depth, in a random forest.
targetOutput is not a hyperparameter; it's data you use for training. I don't get what you are trying to do.

0 Likes

#20

I need to make the targetOutput given to the train function available in the process function.
Setting the hyperParameters n_one and n_zero works; the only problem is setting the hyperParameter target, which is a list.

So my question is: how can I store a list in a hyperParameter during the training phase?

0 Likes