页面树结构
转至元数据结尾
转至元数据起始

正在查看旧版本。 查看 当前版本.

与当前比较 查看页面历史

« 前一个 版本 6 下一个 »

开发环境要求


  1. JDK:jdk1.8
  2. 开发工具:eclipse
  3. java节点模板工程(通过思迈特项目实施人员获取)

开发

  1. 导入自定义java节点模板项目

    打开eclipse---》File—》import

    选择Existing Maven Projects

    导入项目后,目录结构展现如下

    注意: 如果eclipse默认的JRE不是1.8版本,请修改成1.8版本

  2. 节点代码样例(SplitNodeExample)

    1.  

      package smartbix.datamining.engine.execute.node.preprocess;
      import org.apache.spark.sql.Dataset;
      import org.apache.spark.sql.Row;
      import smartbix.datamining.engine.domain.NodeDefine;
      import smartbix.datamining.engine.execute.FlowContext;
      import smartbix.datamining.engine.execute.event.DatasetEvent;
      import smartbix.datamining.engine.execute.exception.CheckedException;
      import smartbix.datamining.engine.execute.node.GenericNode;
      import smartbix.datamining.engine.execute.node.NodeType;
      import smartbix.datamining.engine.execute.node.utils.CheckMessage;
      import smartbix.datamining.engine.execute.node.utils.ConfigValidator;
      /**
       * 拆分节点样例
       *
       * @since 2018-06-12
       */
      @NodeType("SPLIT_EXAMPLE")
      public class SplitNodeExample extends GenericNode {
      	/**
      	 * 
      	 */
      	private static final long serialVersionUID = -8034217739405888768L;
      	public SplitNodeExample(NodeDefine nodeDefine) {
      		super(nodeDefine);
      	}
      	@Override
      	public void execute(FlowContext flowContext) {
      		DatasetEvent inputDatasetEvent = (DatasetEvent) getInputData(flowContext, 0);
      		Dataset<Row> dataset = inputDatasetEvent.getDataset();
      		double dataset1Ratio = nodeDefine.getConfigItemDoubleValue("datasetRatio", 0.7);
      		double[] ratios = { dataset1Ratio, 1 - dataset1Ratio };
      		Dataset<Row>[] randomSplit = dataset.randomSplit(ratios);
      		DatasetEvent outputDatasetEvent1 = new DatasetEvent(randomSplit[0], inputDatasetEvent);
      		DatasetEvent outputDatasetEvent2 = new DatasetEvent(randomSplit[1], inputDatasetEvent);
      		finish(flowContext, outputDatasetEvent1, outputDatasetEvent2);
      	}
      	/**
      	 * 检验配置
      	 */
      	@Override
      	public void checkConfig(FlowContext flowContext) {
      		if (ConfigValidator.isBank(nodeDefine, "datasetRatio")) {
      			throw new CheckedException(CheckMessage.Required.getMessage("占比"));
      		}
      	}
      }

      节点开发指引:

    • 继承GenericNode 类【强制】

    • 节点类所在的包,必须在smartbix.datamining包下面【强制】
    • 添加NodeType注解,并在注解中指明节点类型【强制】

    • 实现只有NodeDefine参数的构造函数【强制】

    • 在execute方法内,编写节点的逻辑代码,必须在execute方法最后调用finish方法【强制】

    • 在checkConfig方法内,实现对节点配置参数检验

    • 获取节点输入事件样例

      DatasetEvent inputDatasetEvent = (DatasetEvent) getInputData(flowContext, 0);
    • 获取配置项值样例

      double dataset1Ratio = nodeDefine.getConfigItemDoubleValue("datasetRatio", 0.7);


    • Dataset类如下方法慎用(数据量大时,可能会造成内存溢出):
      Dataset.collect()
      Dataset.collectAsList()
      Dataset.head(int n)
      Dataset.take(int n)
      Dataset.takeAsList(int n)
      Dataset.toLocalIterator()

  3. 节点定义json样例(SplitNodeExample.json)


    {
      "alias": "拆分",
      "name": "SPLIT_EXAMPLE",
      "configs": [{
        "name": "datasetRatio",
        "lable": "数据集占比",
        "type": "int",
        "tip": "范围是[0,1]的数",
        "value": 0.7
      }],
      "inputs": [{"id":"input_0","order": 0, "types": ["DATASET"]}],
      "outputs": [{"id":"output_0","order": 0, "types": ["DATASET"]}, {"id":"output_1","order": 1, "types": ["DATASET"]}],
      "type": "SPLIT_EXAMPLE",
      "path": "/数据预处理"
    }

    节点定义属性说明


    属性名

    必选

    类型

    描述

    name
    String

    节点名称

    aliasString

    节点别名,在界面上显示的名称

    configs

    该节点相关配置项
    inputs

    节点的输入点,可以配置多个,配置了多少个,节点的顶端就显示多少个点
    outputs

    节点输出点,可以配置多个,配置了多少个,节点的底端就显示多少个点
    type

    节点类型,注意:这里节点类型要跟java类中NodeType注解内容一致
    path
    节点路径,在节点树中所在的路径
  4. 节点单元测试样例(SplitNodeExampleTest)


     
    package smartbix.datamining.engine.execute.node.preprocess;
    
    import java.io.File;
    import java.text.DecimalFormat;
    
    import org.apache.commons.io.FileUtils;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;
    
    import com.google.gson.Gson;
    
    import smartbix.datamining.engine.domain.NodeDefine;
    import smartbix.datamining.engine.execute.event.DatasetEvent;
    import smartbix.datamining.engine.execute.flow.SingleNodeFlowContext;
    import smartbix.datamining.engine.execute.node.NodeClassFactory;
    import smartbix.datamining.engine.execute.node.NodeFactory;
    import smartbix.datamining.engine.util.Configuration;
    
    public class SplitNodeExampleTest {
    
    	SplitNodeExample splitNodeExample;
    
    	SingleNodeFlowContext flowContext;
    
    	@Before
    	public void before() throws Exception {
    		// 初始化flowContext
    		String master = "local";
    		SparkSession sparkSession = SparkSession.builder().master(master).appName("SmartBi-Mining").getOrCreate();
    		String url = Configuration.getFile("data" + File.separator + "Credit_Loan_Data.csv").getAbsolutePath();
    		Dataset<Row> dataset = sparkSession.read().format("csv").option("sep", ",").option("inferSchema", "true")
    				.option("header", "true").load(url);
    		DatasetEvent datasetEvent = new DatasetEvent(dataset);
    		flowContext = new SingleNodeFlowContext(sparkSession);
    		flowContext.putNodeInputEvent("input_0", datasetEvent);
    		// 初始化节点
    		NodeDefine nodeDefine = readNodeDefine();
    		NodeClassFactory.init();
    		splitNodeExample = (SplitNodeExample)NodeFactory.create(nodeDefine);
    	}
    
    	@Test
    	public void testExecute() {
    		splitNodeExample.execute(flowContext);
    		DatasetEvent datasetEvent0 = (DatasetEvent) flowContext.getNodeOuputEvent("output_0");
    		DatasetEvent datasetEvent1 = (DatasetEvent) flowContext.getNodeOuputEvent("output_1");
    		long count0 = datasetEvent0.getDataset().count();
    		long count1 = datasetEvent1.getDataset().count();
    		float ratio = (float) count0 / (count0 + count1);
    		DecimalFormat decimalFormat = new DecimalFormat("0.0");
    		Assert.assertEquals("0.7", decimalFormat.format(ratio));
    	}
    
    	/**
    	 * 读取节点定义
    	 */
    	public NodeDefine readNodeDefine() throws Exception {
    		String defineFileDirPath = "src/test/java/smartbix/datamining/engine/execute/node/preprocess/";
    		File defineFile = new File(defineFileDirPath, "SplitNodeExample.json");
    		String nodeDefineJson = FileUtils.readFileToString(defineFile);
    		Gson gson = new Gson();
    		return (NodeDefine) gson.fromJson(nodeDefineJson, NodeDefine.class);
    	}
    
    }

部署

1. 打包

运行项目中pom.xml文件进行打包

右键pom.xml--Run As–Maven build...
 

在Goals 填入clean package, 然后点击Run,便进行打包

打包后,会在target目录下生成jar包

拷贝节点定义json文件到jar相同目录下,然后把jar包跟json文件压缩成zip包

至此,NodeExample.zip 打出来的最终包


2. 部署包到系统

登录到smartbi,到系统运维—数据挖掘配置—引擎设置---上传自定义java扩展包

上传成功后,到ETL 节点树中就可以找到扩展的节点


拖到图中执行,如果能正常执行表示部署成功

  • 无标签