4. Implement the Extractor

Implement your own extractor following the guidance and sample extractor. For details, see Extractor Implementation Guidelines and Sample Extractor.

Extractor Implementation Guidelines

You can create your own extractor. Note the following guidelines when creating a new extractor.

To implement a DCS extractor, create an XXXExtractor class that extends the BaseExtractor class.

All the extractors are packaged in a .JAR that you copy and deploy. The .JAR is reloaded dynamically each time a new extraction is triggered, so you do not need to reboot BA. For deployment, the location of the .JAR must match the extractor path defined in the extractor_manifest.xml file; this path is relative to the extractor_manifest.xml file itself.

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<extractors>
    <extractor>
      <productName>SM</productName>                     
      <dataSourceType>MSSQL,ORACLE,DBDICT_MSSQL</dataSourceType>
      <productVersion>dbdict,7.11,9.2,9.3,9.4</productVersion>        
      <className>com.hp.itba.dwh.extractor.sm.SMExtractor</className>
      <extractorPath>extractor-sm</extractorPath>
      <extractorVersion>1.0</extractorVersion>
    </extractor>
    <extractor>
      …extractor2…
    </extractor>
</extractors>

You should define the extractor information in the manifest file of your own Content Pack (CP). The extractor .JAR file must be located relative to the manifest file of each Content Pack, $HPBA_HOME/ContentPacks/<CP_Name>/EXTRACTOR/extractor_manifest.xml. The DCS framework loads and parses the manifest file to get these items once the extraction is triggered. The extractor manifest may contain information for several extractors, and each extractor needs to specify the product name, data source type, product version, class name, extractor path, and extractor version. The product name and product version may have multiple values separated by commas. These values are case sensitive. The specified extractor class supports every combination (the Cartesian product) of the listed values.

The product version and type should follow the source model folder structure shown below.

Note: To make sure that the content pack can be activated and that the extractor is correctly found by the DCS framework, the data source type and product version must be fully aligned among the extractor_manifest.xml file, the dataSourceDef.xml file, and the source folder structure.

The following describes the basic API and workflow for implementation.

Sample Extractor

package com.hp.itba.dwh.extractor.test;

import java.util.List;
import java.util.Map;

import com.hp.btoe.security.crypto.ActiveCrypto;
import com.hp.itba.dwh.dcs.api.IDataSource;
import com.hp.itba.dwh.dcs.api.impl.BaseExtractor;
import com.hp.itba.dwh.dcs.bean.DcsEntity;
import com.hp.itba.dwh.dcs.bean.DcsEntityStatus;
import com.hp.itba.dwh.dcs.common.FlatFileWriter;
import com.hp.itba.dwh.dcs.common.StatusEnum;
import com.hp.itba.dwh.dcs.exception.ExtractorException;
import com.hp.itba.dwh.dcs.util.DateUtil;

public class SampleExtractor extends BaseExtractor {

    @Override
    public String getPlatformVersion() {
         return "10.10.0"; }
    @Override
	public boolean checkConnection() throws ExtractorException {
		logger.debug("start to check the connection for CP {}.", cpKey);
		IDataSource dataSource = metadata.getDataSource();
        List<String[]> properties = dataSource.getConnProperties();
        for (String[] prop : properties) {
            String value = prop[1];
            if ("true".equalsIgnoreCase(prop[2]) && value != null) {
                try {
                    // decrypt the the property value if necessary
                    value = ActiveCrypto.getDefault().decrypt(value);
                } catch (Exception e) {
                    throw new ExtractorException("fail to decrypt password!", e);
                }
            }
        }
        // setup the connection to the data source
        // ...
        logger.debug("end of checking the connection for CP {}.", cpKey);
        return true;
	 }

@Override
	protected void doExtract(String batchId, Map<String, String>
		lastModifiedDateMap, List<DcsEntity> entities) throws ExtractorException,InterruptedException{
		logger.debug("start to extract data for CP {}. batchId: {}", cpKey,batchId);
	// get data source connection information
		IDataSource dataSource = metadata.getDataSource();
		List<String[]> properties = dataSource.getConnProperties();
		for (String[] prop : properties) {
			String value = prop[1];
			if ("true".equalsIgnoreCase(prop[2]) && value != null) {
				try {
					// decrypt the the property value if necessary
					value = ActiveCrypto.getDefault().decrypt(value);
				} catch (Exception e) {
					throw new ExtractorException("fail to decrypt password!", e);
				}
			}
		}
		// setup the connection to the data source
		// ...
		// check whether user click the abort button. If aborted, the wholeextraction should be interrupted
		stopSign.checkState();
		try {
			for (DcsEntity entity : entities) {
				stopSign.checkState();
				DcsEntityStatus entityStatus = entity.getEntityStatus();
				// mark entity status as started
				entityStatus.setStartTime(DateUtil.getCurrentTime());
				entityStatus.setEntityStatus(StatusEnum.ENTITY_STARTED.getValue());
				statusHelper.flushStatus();
				// create the FlatFileWriter with header written
				FlatFileWriter writer = createFlatFileWriter(entity);
				List<com.hp.itba.dwh.common.cp.model.DMExtractionModelColumn>
				outputColumns = entity.getOutputColumns();
				int totalRowCount = 0;
				// for each line in the fetched data
				String[] values = new String[outputColumns.size()];
				// populate the values with the data fetched from the data source
				// ...
				writer.writeLine(values);
				// record the total count of rows extracted and mark the entity as complected
				entityStatus.setResultCount(totalRowCount);
				entityStatus.setEntityStatus(StatusEnum.ENTITY_COMPLETED.getValue());
				entityStatus.setEndTime(DateUtil.getCurrentTime());
				statusHelper.increaseSuccessEntityCount();
				statusHelper.flushStatus();
			}
		} catch (ExtractorException e) {
			throw e;
		} catch (Throwable ex) {
			logger.error("error occurs when extracting data!", ex);
               }
             }
            }