1.参考 "自定义java节点开发流程" 编写相关json 及java 代码。
2.参考附件文档 "明道云节点部署文档" 第二点,否则web端无法显示节点。
3.如果是dockers部署,相关jar包及json 文件要放到引擎容器及tomcat 容器相关配置目录映射的物理机目录,然后重启引擎和tomcat 容器即可。
打开eclipse -> File -> import。
选择Existing Projects into Workspace,导入项目后,目录结构展现如下。
注意: 如果eclipse默认的JRE不是1.8版本,请修改成1.8版本
package smartbix.datamining.engine.execute.node.preprocess; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import smartbix.datamining.engine.domain.NodeDefine; import smartbix.datamining.engine.execute.FlowContext; import smartbix.datamining.engine.execute.event.DatasetEvent; import smartbix.datamining.engine.execute.exception.CheckedException; import smartbix.datamining.engine.execute.node.GenericNode; import smartbix.datamining.engine.execute.node.NodeType; import smartbix.datamining.engine.execute.node.utils.CheckMessage; import smartbix.datamining.engine.execute.node.utils.ConfigValidator; /** * 拆分节点样例 * * @since 2018-06-12 */ @NodeType("SPLIT_EXAMPLE") public class SplitNodeExample extends GenericNode { /** * */ private static final long serialVersionUID = -8034217739405888768L; public SplitNodeExample(NodeDefine nodeDefine) { super(nodeDefine); } @Override public void execute(FlowContext flowContext) { DatasetEvent inputDatasetEvent = (DatasetEvent) getInputData(flowContext, 0); Dataset<Row> dataset = inputDatasetEvent.getDataset(); double dataset1Ratio = nodeDefine.getConfigItemDoubleValue("datasetRatio", 0.7); double[] ratios = { dataset1Ratio, 1 - dataset1Ratio }; Dataset<Row>[] randomSplit = dataset.randomSplit(ratios); DatasetEvent outputDatasetEvent1 = new DatasetEvent(randomSplit[0], inputDatasetEvent); DatasetEvent outputDatasetEvent2 = new DatasetEvent(randomSplit[1], inputDatasetEvent); finish(flowContext, outputDatasetEvent1, outputDatasetEvent2); } /** * 检验配置 */ @Override public void checkConfig() { if (ConfigValidator.isBank(nodeDefine, "datasetRatio")) { throw new CheckedException(CheckMessage.Required.getMessage("占比")); } } } |
节点开发指引:
继承GenericNode 类【强制】
添加NodeType注解,并在注解中指明节点类型【强制】
实现只有NodeDefine参数的构造函数【强制】
在execute方法内,编写节点的逻辑代码,必须在execute方法最后调用finish方法【强制】
在checkConfig方法内,实现对节点配置参数检验
获取节点输入事件样例
DatasetEvent inputDatasetEvent = (DatasetEvent) getInputData(flowContext, 0);
获取配置项值样例
double dataset1Ratio = nodeDefine.getConfigItemDoubleValue("datasetRatio", 0.7);
{ "alias": "拆分", "name": "SPLIT_EXAMPLE", "configs": [{ "name": "datasetRatio", "lable": "数据集占比", "type": "int", "tip": "范围是[0,1]的数", "value": 0.7 }], "inputs": [{"id":"input_0","order": 0, "types": ["DATASET"]}], "outputs": [{"id":"output_0","order": 0, "types": ["DATASET"]}, {"id":"output_1","order": 1, "types": ["DATASET"]}], "type": "SPLIT_EXAMPLE", "path": "/数据预处理" } |
属性名 | 必选 | 类型 | 描述 |
---|---|---|---|
name | 是 | String | 节点名称。 |
alias | 是 | String | 节点别名,在界面上显示的名称。 |
configs | 是 | 该节点相关配置项。 | |
inputs | 是 | 节点的输入点,可以配置多个,配置了多少个,节点的顶端就显示多少个点。 | |
outputs | 是 | 节点输出点,可以配置多个,配置了多少个,节点的底端就显示多少个点。 | |
type | 是 | 节点类型,注意:这里节点类型要跟java类中NodeType注解内容一致。 | |
path | 是 | 节点路径,在节点树中所在的路径。 |
package smartbix.datamining.engine.execute.node.preprocess; import java.io.File; import java.text.DecimalFormat; import org.apache.commons.io.FileUtils; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import com.google.gson.Gson; import smartbix.datamining.engine.domain.NodeDefine; import smartbix.datamining.engine.execute.event.DatasetEvent; import smartbix.datamining.engine.execute.flow.SingleNodeFlowContext; import smartbix.datamining.engine.execute.node.NodeClassFactory; import smartbix.datamining.engine.execute.node.NodeFactory; import smartbix.datamining.engine.util.Configuration; public class SplitNodeExampleTest { SplitNodeExample splitNodeExample; SingleNodeFlowContext flowContext; @Before public void before() throws Exception { // 初始化flowContext String master = "local"; SparkSession sparkSession = SparkSession.builder().master(master).appName("SmartBi-Mining").getOrCreate(); String url = Configuration.getFile("data" + File.separator + "Credit_Loan_Data.csv").getAbsolutePath(); Dataset<Row> dataset = sparkSession.read().format("csv").option("sep", ",").option("inferSchema", "true") .option("header", "true").load(url); DatasetEvent datasetEvent = new DatasetEvent(dataset); flowContext = new SingleNodeFlowContext(sparkSession); flowContext.putNodeInputEvent("input_0", datasetEvent); // 初始化节点 NodeDefine nodeDefine = readNodeDefine(); NodeClassFactory.init(); splitNodeExample = (SplitNodeExample)NodeFactory.create(nodeDefine); } @Test public void testExecute() { splitNodeExample.execute(flowContext); DatasetEvent datasetEvent0 = (DatasetEvent) flowContext.getNodeOuputEvent("output_0"); DatasetEvent datasetEvent1 = (DatasetEvent) flowContext.getNodeOuputEvent("output_1"); long count0 = datasetEvent0.getDataset().count(); long count1 = datasetEvent1.getDataset().count(); float ratio = (float) count0 / (count0 + count1); DecimalFormat decimalFormat = new DecimalFormat("0.0"); Assert.assertEquals("0.7", decimalFormat.format(ratio)); } /** * 读取节点定义 */ public NodeDefine readNodeDefine() throws Exception { String defineFileDirPath = SplitNodeExampleTest.class.getResource("").getPath(); File defineFile = new File(defineFileDirPath, "SplitNodeExample.json"); String nodeDefineJson = FileUtils.readFileToString(defineFile); Gson gson = new Gson(); return (NodeDefine) gson.fromJson(nodeDefineJson, NodeDefine.class); } } |
拷贝的路径在系统运维 -> 系统选项 -> 机器学习配置 -> 节点路径中指定。
拷贝打包生成jar包到引擎端。
拷贝的路径在引擎conf目录下plugins/java中 (注意,conf目录路径可以在启动引擎命令参数中指定)。