Developing a custom extractor module


Use the ETL Development Kit to develop and work with custom extractor modules. To produce data and perform extraction activities, you need to implement the abstract extract method. 

You also need to implement the following methods:

  • connect: To connect the extractor module to external sources.
  • disconnect: To close the open connections.



About the abstract methods

The methods are always called in the sequence connect-extract-disconnect. However, it is perfectly valid to perform the connection and disconnection inside the extract method: the code runs in the same way, and the separation into three phases exists only for code readability.
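
For instance, a minimal sketch of an extractor that opens and closes its connection inside extract might look like the following. It reuses the DBInterface and Conf calls shown in the later examples; as in those examples, the DBInterface import and field declarations are omitted, so treat it as an illustration rather than a complete class.

    package etl.extractor;

    import com.neptuny.cpit.etl.Conf;
    import com.neptuny.cpit.etl.DataSetList;
    import com.neptuny.cpit.etl.extractor.Extractor;

    public class InlineConnectExtractorE extends Extractor {

        @Override
        public void connect(Conf cf) throws Exception {
            //Only store the configuration; the actual connection is opened in extract()
            this.conf = cf;
        }

        @Override
        public DataSetList extract() throws Exception {
            //Open the connection here instead of in connect()
            //(this sketch assumes extract.database.driver is set; see Example 2 for the Oracle default)
            DBInterface dbif = new DBInterface();
            dbif.connect(conf.getProperty(Conf.PROP_EDBDRV),
                         conf.getProperty(Conf.PROP_EDBURL),
                         conf.getProperty(Conf.PROP_EDBUSR),
                         conf.getProperty(Conf.PROP_EDBPWD));
            try {
                DataSetList dsList = new DataSetList();
                //...query the source and populate the output datasets here...
                return dsList;
            } finally {
                //Close the connection here instead of in disconnect()
                dbif.disconnect();
            }
        }

        @Override
        public void disconnect() throws Exception {
            //Nothing to do: the connection is already closed in extract()
        }
    }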

The following example shows how to implement the extract method for a sample scenario:

Example

Consider a scenario where you need to extract the number of orders from the database of an Internet banking application, where orders are traced in the following ORDERS table:

Timestamp             OrderCode   Count
2023-01-01 10:00:00   A12         220
2023-01-01 10:00:00   A13         101
2023-01-01 10:00:00   B12         304
2023-01-01 10:00:00   B13         210
2023-01-01 10:00:00   C12         350

The ETL has to apply a complex decoding rule to OrderCode to obtain the required set of metrics. For instance:

  • If the timestamp is between "00:00:00" and "05:00:00", it implies that the bank teller is closed. Any operation during this period is performed by monitoring robots and is accounted as "TEST".
  • If the order code is "A12", the order is a Credit Card Payment.
  • If the order code starts with the letter "B", the order is an operation on STOCKS.
  • Otherwise, the order is accounted in the OTHER category.

Even though the table structure is very simple, the set of interpretation rules is complex and requires applying specific logic to the results of the SQL query. This is a typical situation in which a custom extractor helps.
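
As a preview of the business logic that the extract method applies later (see Example 3), the decoding rule could be written as a small helper method. The method name decodeOrder is hypothetical, and the timestamp is assumed to arrive in the yyyy-MM-dd HH:mm:ss format shown in the table above.

    //Hypothetical helper that maps a raw timestamp and order code to a category,
    //following the decoding rules listed above
    private String decodeOrder(String timestamp, String orderCode) {
        int hour = Integer.parseInt(timestamp.substring(11, 13));
        if (hour < 5) {
            return "TEST";                //between 00:00:00 and 05:00:00: monitoring robots
        } else if ("A12".equals(orderCode)) {
            return "CREDIT_CARD_PAYMENT"; //specific order code
        } else if (orderCode.startsWith("B")) {
            return "STOCKS";              //order codes starting with "B"
        } else {
            return "OTHER";               //everything else
        }
    }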


Getting information from a JDBC database

If you are connecting to a JDBC database, make sure that the JDBC driver file (.jar file) is stored in the <Installation_directory_of_Capacity_Optimization>/etl/libext directory on the ETL server that runs the ETL task.

If you are using a database other than Oracle or Microsoft SQL Server, you must explicitly specify the JDBC driver information when you are configuring the extractor. For more information, see Generic-Database-extractor-Java.
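
For illustration only, the connect method in Example 2 reads the connection settings from properties such as extract.database.url and extract.database.driver. A hypothetical set of values for a PostgreSQL source might look like the following; the exact place where you enter these values depends on how you configure the extractor.

    # Hypothetical extractor configuration values for a PostgreSQL source
    extract.database.driver=org.postgresql.Driver
    extract.database.url=jdbc:postgresql://dbhost:5432/banking
    extract.database.user=etl_user
    extract.database.password=********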


Editing the extractor code

Every new custom extractor module that you create in the ETL Development Kit uses a code template that contains some auto-generated code.

The following examples walk sequentially through the initial code that you need to write and the other operations that you can perform in an extractor.

  1. Write the initial extractor code. Define the extractor class and its connect, extract, and disconnect methods.

    Example 1: Initial Code
    package etl.extractor;

    import com.neptuny.cpit.etl.Conf;
    import com.neptuny.cpit.etl.DataSetList;
    import com.neptuny.cpit.etl.extractor.Extractor;

    public class MyExtractorE extends Extractor {

        @Override
        public void connect(Conf cf) throws Exception {
            //...connection stuff goes here...
        }

        @Override
        public DataSetList extract() throws Exception {
            //...extraction stuff goes here...
            return null;
        }

        @Override
        public void disconnect() throws Exception {
            //...disconnection stuff goes here...
        }
    }
  2. Connect to the database by using the connect method. Review the following example:

    Example 2: Connecting to the database
    /**
     * Connect to the data source.
     * @param cf configuration (Conf)
     */
    @Override
    public void connect(Conf cf) throws Exception {
        this.conf = cf;
        String dburl = conf.getProperty(Conf.PROP_EDBURL); //extract.database.url
        String dbusr = conf.getProperty(Conf.PROP_EDBUSR); //extract.database.user
        String dbpwd = conf.getProperty(Conf.PROP_EDBPWD); //extract.database.password
        dbif = new DBInterface();
        String drv = conf.getProperty(Conf.PROP_EDBDRV); //extract.database.driver
        if (drv == null || "".equals(drv)) {
            drv = DBInterface.ORADRV; //If not specified, use the Oracle JDBC driver by default
        }
        try {
            //Connecting to the database...
            this.dbif.connect(drv, dburl, dbusr, dbpwd);
        } catch (Exception e) {
            Log.err(e, ETLErrorCode.ETL_FAIL_WRONG_DB_CONN_PARAM);
            throw e;
        }
        Log.put("Connected to database " + dburl, 1);
    }

    In this example, an instance of the DBInterface class is used to create the connection, and the configuration and the connection are stored in the conf and dbif members so that they are available to the other methods.

  3. Prepare the output datasets. The extract method is used to perform the query and apply the business logic to the extracted data. Review the following example:

    Example 3: Preparing output datasets

    If you want to extract the Event count by set metric, with the subobject name populated with the order type, you can check the ETL Datasets view to see that this metric belongs to the WKLGEN (Generic Business Driver Metrics) dataset. Hence, the output dataset can be built by using the following code:


    @Override
    public DataSetList extract() throws Exception {
        DataSetList dsList = new DataSetList();

        if (dbif == null) return dsList;

        DataSet res = new DataSet("WKLGEN");
        this.getConf().getDefChecker().initializeColumns(res);

        Log.put("Extracting from data source...");
        dbif.prepare("SELECT timestamp,ordercode,count FROM orders ORDER BY timestamp");
        DataSet extractedSample = dbif.executeQuery();

        for (int i = 0; i < extractedSample.size(); i++) {
            String[] row = extractedSample.getRow(i);

            String ts      = row[0];
            String op      = row[1];
            String vol     = row[2];
            String hourStr = ts.substring(11, 13);
            int hour = Integer.parseInt(hourStr);

            //business logic here
            if (hour < 5) {
                op = "TEST";
            } else if ("A12".equals(op)) {
                op = "CREDIT_CARD_PAYMENT";
            } else if (op.startsWith("B")) {
                op = "STOCKS";
            } else {
                op = "OTHER";
            }

            //populate the dataset
            String[] datasetrow = res.newRow();
            res.fillRow("TS", ts, datasetrow);
            res.fillRow("WKLDNM", "Orders", datasetrow);
            res.fillRow("DURATION", "3600", datasetrow);
            res.fillRow("BYSET_EVENTS", vol, datasetrow);
            res.fillRow("SUBOBJNM", op, datasetrow);
            res.addRow(datasetrow);
        }

        Log.put("Extraction done. Extracted " + res.size());

        //add the populated dataset to the output list and return it
        dsList.add(res);
        return dsList;
    }

    This code example reloads the entire ORDERS table during each run.

  4. Implement an append operation. With parsers, the ETL automatically renames previously parsed files and calls the parse method only on new files. In the case of an extractor, you need to implement this behavior yourself.
    The logical choice is to use the timestamp as the lastcounter parameter and save it in the DSStatus object.

    Example 4: Manually implement an append operation
    //get data source status (for last counter)
    DSStatus status = this.getConf().getStatus();
    if (status == null) {
        status = new DSStatus();
        status.setLastCntr("ORDERS", "2023-01-01 00:00:00");
        this.getConf().insertStatus(status);
    }
    String lastCounter = status.getLastCntr("ORDERS");
    dbif.prepare("SELECT timestamp,ordercode,count FROM orders WHERE timestamp>? ORDER BY timestamp");
    dbif.setTSParam(1, lastCounter);
  5. Update the lastcounter parameter to the final value of the ts variable (the maximum timestamp value in the output dataset). A sketch of one way to compute this value follows the procedure.

    Example 5: Update the lastcounter parameter
    // update lastcounter with last extracted sample ts...
    status.setLastCntr("ORDERS", newLastCounter);

    status.setMsg("ORDERS", "Extract OK. " + res.size() + " records extracted");
    this.getConf().updateStatus(status);
  6. Implement the disconnect method to release the connection.

    Example 6: Implement the disconnect method
    /**
     * Disconnect from the data source.
     */
    @Override
    public void disconnect() throws Exception {
        if (dbif != null) {
            dbif.disconnect();
        }
    }
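
Example 5 refers to a newLastCounter variable that is not defined in the earlier listings. One way to obtain it, sketched against the loop in Example 3, is to track the last processed timestamp; because the query orders the rows by timestamp, the last value is also the maximum one.

    //Sketch: inside the extraction loop of Example 3, remember the last processed timestamp
    String newLastCounter = lastCounter;
    for (int i = 0; i < extractedSample.size(); i++) {
        String[] row = extractedSample.getRow(i);
        String ts = row[0];
        //...decode the order and populate the dataset as in Example 3...
        newLastCounter = ts; //rows are ordered by timestamp, so the last one is the maximum
    }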


 
