Editing the extractor code


Every new custom extractor module that you create in the Integration Studio starts from a code template that contains auto-generated code for your convenience.

To understand how an extractor works, review the following example:

  1. Initial code: The template defines the extractor class and its connect, extract, and disconnect methods.

    Example 1: Initial Code
    package etl.extractor;

    import com.neptuny.cpit.etl.Conf;
    import com.neptuny.cpit.etl.DataSetList;
    import com.neptuny.cpit.etl.extractor.Extractor;

    public class MyExtractorE extends Extractor {

        @Override
        public void connect(Conf cf) throws Exception {
            // ...connection stuff goes here...
        }

        @Override
        public DataSetList extract() throws Exception {
            // ...extraction stuff goes here...
            return null;
        }

        @Override
        public void disconnect() throws Exception {
            // ...disconnection stuff goes here...
        }
    }
  2. An important operation is connecting to the database using the connect method. The following example illustrates how to do this:

    Example 2: Connecting to the database
    /**
     * Connect to data source
     * @param cf configuration (Conf)
     */
    @Override
    public void connect(Conf cf) throws Exception {
        this.conf = cf;
        String dburl = conf.getProperty(Conf.PROP_EDBURL); // extract.database.url
        String dbusr = conf.getProperty(Conf.PROP_EDBUSR); // extract.database.user
        String dbpwd = conf.getProperty(Conf.PROP_EDBPWD); // extract.database.password
        this.dbif = new DBInterface();
        String drv = conf.getProperty(Conf.PROP_EDBDRV);   // extract.database.driver
        if (drv == null || "".equals(drv)) {
            drv = DBInterface.ORADRV; // If not specified, use ORACLE JDBC driver by default
        }
        try {
            // Connecting to database...
            this.dbif.connect(drv, dburl, dbusr, dbpwd);
        } catch (Exception e) {
            // Log the failure with the matching ETL error code, then rethrow
            Log.err(e, ETLErrorCode.ETL_FAIL_WRONG_DB_CONN_PARAM);
            throw e;
        }
        Log.put("Connected to database " + dburl, 1);
    }

    In this example, an instance of the DBInterface class creates the connection. The configuration and the database interface are stored in the conf and dbif members of the extractor so that the extract and disconnect methods can use them.

    Note

    The extract method is used to perform the query and apply the business logic to extracted data.
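
The property handling in Example 2 can be tried outside the ETL framework. The following is a minimal sketch that uses java.util.Properties in place of Conf. The class name ConnectionSettingsSketch, the helper resolveDriver, the sample URL and user, and the default driver class name are assumptions made for illustration and are not part of the product API. The property keys are the ones shown in the comments of Example 2.

```java
import java.util.Properties;

// Hypothetical stand-in for the property handling in Example 2.
// java.util.Properties replaces Conf; nothing here is product API.
class ConnectionSettingsSketch {

    // Assumed default; the actual value of DBInterface.ORADRV may differ.
    static final String DEFAULT_DRIVER = "oracle.jdbc.OracleDriver";

    // Returns the configured driver, or the default when the property
    // is missing or empty (the same check as in Example 2).
    static String resolveDriver(Properties props) {
        String drv = props.getProperty("extract.database.driver");
        if (drv == null || drv.isEmpty()) {
            return DEFAULT_DRIVER;
        }
        return drv;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Sample values, invented for this sketch.
        props.setProperty("extract.database.url", "jdbc:oracle:thin:@dbhost:1521:orcl");
        props.setProperty("extract.database.user", "etl_user");
        // No driver is configured, so the default is used.
        System.out.println("Driver: " + resolveDriver(props));
        System.out.println("URL: " + props.getProperty("extract.database.url"));
    }
}
```

Keeping the fallback in one small helper makes the default easy to check without opening a database connection.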

  3. After a connection is established, the extractor needs to prepare the output datasets. The following example illustrates how this is done:

    Example 3: Preparing output datasets

    Suppose that you want to extract the Event count by set metric, with the subobject name populated with the order type. The ETL Datasets view shows that this metric belongs to the WKLGEN (Generic Business Driver Metrics) dataset, so the output dataset can be built using the following code:

     

    @Override
    public DataSetList extract() throws Exception {
        DataSetList dsList = new DataSetList();

        // Nothing to extract if the connection was never created
        if (dbif == null) return dsList;

        DataSet res = new DataSet("WKLGEN");
        this.getConf().getDefChecker().initializeColumns(res);

        Log.put("Extracting from data source...");
        dbif.prepare("SELECT timestamp,ordercode,count FROM orders ORDER BY timestamp");
        DataSet extractedSample = dbif.executeQuery();

        for (int i = 0; i < extractedSample.size(); i++) {
            String[] row = extractedSample.getRow(i);

            String ts  = row[0];
            String op  = row[1];
            String vol = row[2];
            // Characters 11-12 of the timestamp string hold the hour
            String hourStr = ts.substring(11, 13);
            int hour = Integer.parseInt(hourStr);

            // business logic here
            if (hour < 5) {
                op = "TEST";
            } else if ("A12".equals(op)) {
                op = "CREDIT_CARD_PAYMENT";
            } else if (op.startsWith("B")) {
                op = "STOCKS";
            } else {
                op = "OTHER";
            }

            // populate the dataset
            String[] datasetrow = res.newRow();
            res.fillRow("TS", ts, datasetrow);
            res.fillRow("WKLDNM", "Orders", datasetrow);
            res.fillRow("DURATION", "3600", datasetrow);
            res.fillRow("BYSET_EVENTS", vol, datasetrow);
            res.fillRow("SUBOBJNM", op, datasetrow);
            res.addRow(datasetrow);
        }

        Log.put("Extraction done. Extracted " + res.size());
        // ...the method continues in the following steps...

    Note

    This code example reloads the entire ORDERS table during each run.
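
The business logic in Example 3 does not depend on the database, so it can be isolated and checked on its own. The following is a minimal sketch of that idea. The class name OrderTypeSketch and the method classify are hypothetical names introduced here, and the timestamp layout (hour at character positions 11 and 12, as in "2016-01-01 13:45:00") is the assumption that ts.substring(11,13) in Example 3 already relies on.

```java
// Hypothetical helper that isolates the business logic of Example 3
// so that it can be exercised without a database connection.
class OrderTypeSketch {

    // ts is assumed to look like "2016-01-01 13:45:00".
    // Returns the subobject name for the given timestamp and order code.
    static String classify(String ts, String orderCode) {
        int hour = Integer.parseInt(ts.substring(11, 13));
        if (hour < 5) {
            return "TEST";                  // early-morning rows are treated as tests
        } else if ("A12".equals(orderCode)) {
            return "CREDIT_CARD_PAYMENT";
        } else if (orderCode.startsWith("B")) {
            return "STOCKS";
        }
        return "OTHER";
    }

    public static void main(String[] args) {
        System.out.println(classify("2016-01-01 03:10:00", "A12")); // TEST
        System.out.println(classify("2016-01-01 10:00:00", "A12")); // CREDIT_CARD_PAYMENT
        System.out.println(classify("2016-01-01 10:00:00", "B07")); // STOCKS
        System.out.println(classify("2016-01-01 10:00:00", "C01")); // OTHER
    }
}
```

Note that the hour check comes first, so an A12 order recorded before 05:00 is reported as TEST, exactly as in the loop of Example 3.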

  4. The next step is to manually implement an append operation. With parsers, the ETL automatically renames previously parsed files and calls the parse method to parse only new files. For an extractor, you need to keep track of what has already been extracted yourself.
    A logical choice is to store the timestamp of the last extracted sample in the lastcounter parameter and save it in the DSStatus object. The following example illustrates the code to add before the query:

    Example 4: Manually implement an append operation
    // get data source status (for last counter)
    DSStatus status = this.getConf().getStatus();
    if (status == null) {
        // first run: start from a fixed initial timestamp
        status = new DSStatus();
        status.setLastCntr("ORDERS", "2016-01-01 00:00:00");
        this.getConf().insertStatus(status);
    }
    String lastCounter = status.getLastCntr("ORDERS");
    dbif.prepare("SELECT timestamp,ordercode,count FROM orders WHERE timestamp>? ORDER BY timestamp");
    dbif.setTSParam(1, lastCounter);
  5. Next, you need to update the lastcounter parameter to the final value of the ts variable (the maximum timestamp value in the output dataset, because the query orders the rows by timestamp). The following example illustrates how you can do this:

    Example 5: Update the _lastcounter_ parameter
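    // newLastCounter must hold the ts value of the last row processed in the
    // extraction loop (for example, assign newLastCounter = ts in each iteration)
    // update lastcounter with last extracted sample ts...
    status.setLastCntr("ORDERS", newLastCounter);

    status.setMsg("ORDERS", "Extract OK. " + res.size() + " records extracted");
    this.getConf().updateStatus(status);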
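
The append logic of steps 4 and 5 can be illustrated without the ETL framework. The following is a minimal in-memory sketch, assuming timestamps in the "yyyy-MM-dd HH:mm:ss" layout (which sort chronologically as plain strings). The class LastCounterSketch, its status map (a stand-in for DSStatus), and the method extractNew are hypothetical names introduced for illustration only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the lastcounter pattern.
class LastCounterSketch {

    // Stand-in for DSStatus: maps a counter name to its last value.
    final Map<String, String> status = new HashMap<>();

    // Returns the rows newer than the stored counter and advances the counter.
    // Each row is {timestamp, ordercode, count}.
    List<String[]> extractNew(List<String[]> sourceRows) {
        // First run: start from a fixed initial timestamp, as in Example 4.
        String lastCounter =
            status.computeIfAbsent("ORDERS", k -> "2016-01-01 00:00:00");
        String newLastCounter = lastCounter;
        List<String[]> result = new ArrayList<>();
        for (String[] row : sourceRows) {
            String ts = row[0];
            if (ts.compareTo(lastCounter) > 0) {       // same role as WHERE timestamp>?
                result.add(row);
                if (ts.compareTo(newLastCounter) > 0) {
                    newLastCounter = ts;               // track the maximum timestamp
                }
            }
        }
        status.put("ORDERS", newLastCounter);          // same role as setLastCntr
        return result;
    }

    public static void main(String[] args) {
        LastCounterSketch sketch = new LastCounterSketch();
        List<String[]> orders = new ArrayList<>();
        orders.add(new String[] {"2016-02-01 10:00:00", "A12", "5"});
        orders.add(new String[] {"2016-02-01 11:00:00", "B07", "3"});
        System.out.println(sketch.extractNew(orders).size()); // 2
        orders.add(new String[] {"2016-02-01 12:00:00", "C01", "1"});
        System.out.println(sketch.extractNew(orders).size()); // 1
        System.out.println(sketch.status.get("ORDERS"));      // 2016-02-01 12:00:00
    }
}
```

The second run returns only the row added after the first run, which is the behavior the lastcounter parameter is meant to provide.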
  6. Finally, implement the disconnect method to release the connection. The following example illustrates how to do this:

    Example 6: Implement the _disconnect_ method
    /**
     * Disconnect from data source
     */
    @Override
    public void disconnect() throws Exception {
        if (dbif != null) {
            dbif.disconnect();
        }
    }
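
The steps above imply a connect, extract, disconnect sequence. The following is a minimal sketch of that sequence, assuming that disconnect should run even when extract fails; how the ETL engine actually invokes these methods is not described here. SketchExtractor, LifecycleSketch, and runOnce are hypothetical names, and java.util.Properties and List<String[]> stand in for Conf and DataSetList.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical stand-in for the Extractor base class (not product API).
abstract class SketchExtractor {
    abstract void connect(Properties conf) throws Exception;
    abstract List<String[]> extract() throws Exception;
    abstract void disconnect() throws Exception;
}

class LifecycleSketch {

    // Assumed calling order: connect, extract, then disconnect.
    // The finally block releases the connection even if extract throws.
    static List<String[]> runOnce(SketchExtractor e, Properties conf) throws Exception {
        e.connect(conf);
        try {
            return e.extract();
        } finally {
            e.disconnect();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> calls = new ArrayList<>();
        // Demo extractor that only records which methods were called.
        SketchExtractor demo = new SketchExtractor() {
            @Override void connect(Properties conf) { calls.add("connect"); }
            @Override List<String[]> extract() { calls.add("extract"); return new ArrayList<>(); }
            @Override void disconnect() { calls.add("disconnect"); }
        };
        runOnce(demo, new Properties());
        System.out.println(calls); // [connect, extract, disconnect]
    }
}
```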

Where to go from here

Activating a custom extractor module

 
