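Development environment requirements
- JDK: JDK 1.8
- Development tool: Eclipse
- Java node template project (obtain it from Smartbi project implementation staff)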
Development
Import the custom Java node template project
Open Eclipse and choose File > Import.

Select Existing Maven Projects.
After the import, the project directory structure looks like this:

Note: if Eclipse's default JRE is not version 1.8, change it to 1.8 (Window > Preferences > Java > Installed JREs).
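If you are unsure which runtime Eclipse actually uses, a throwaway class like the following can confirm it. This is a minimal sketch and not part of the template project; the class name JreVersionCheck is hypothetical. Run it from inside the imported project: it reports the JRE that Eclipse launched it with, which is normally the JRE configured for the project.

```java
public class JreVersionCheck {
    // Reads the specification version of the JVM running this class.
    // On a Java 8 runtime this is "1.8"; on Java 9+ it is "9", "11", and so on.
    static String specVersion() {
        return System.getProperty("java.specification.version");
    }

    // True only for a Java 8 runtime.
    static boolean isJava8(String version) {
        return "1.8".equals(version);
    }

    public static void main(String[] args) {
        String version = specVersion();
        System.out.println("Running on Java specification version " + version);
        if (!isJava8(version)) {
            System.out.println("Warning: the node template project expects JDK 1.8");
        }
    }
}
```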
Node code sample (SplitNodeExample)
```java
package smartbix.datamining.engine.execute.node.preprocess;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import smartbix.datamining.engine.domain.NodeDefine;
import smartbix.datamining.engine.execute.FlowContext;
import smartbix.datamining.engine.execute.event.DatasetEvent;
import smartbix.datamining.engine.execute.exception.CheckedException;
import smartbix.datamining.engine.execute.node.GenericNode;
import smartbix.datamining.engine.execute.node.NodeType;
import smartbix.datamining.engine.execute.node.utils.CheckMessage;
import smartbix.datamining.engine.execute.node.utils.ConfigValidator;

/**
 * Sample split node: randomly splits the input dataset into two outputs.
 *
 * @since 2018-06-12
 */
@NodeType("SPLIT_EXAMPLE")
public class SplitNodeExample extends GenericNode {

    private static final long serialVersionUID = -8034217739405888768L;

    public SplitNodeExample(NodeDefine nodeDefine) {
        super(nodeDefine);
    }

    @Override
    public void execute(FlowContext flowContext) {
        // Read the dataset arriving on the first input port (order 0)
        DatasetEvent inputDatasetEvent = (DatasetEvent) getInputData(flowContext, 0);
        Dataset<Row> dataset = inputDatasetEvent.getDataset();
        // Share of rows for the first output; 0.7 is the default value
        double dataset1Ratio = nodeDefine.getConfigItemDoubleValue("datasetRatio", 0.7);
        double[] ratios = { dataset1Ratio, 1 - dataset1Ratio };
        Dataset<Row>[] randomSplit = dataset.randomSplit(ratios);
        DatasetEvent outputDatasetEvent1 = new DatasetEvent(randomSplit[0], inputDatasetEvent);
        DatasetEvent outputDatasetEvent2 = new DatasetEvent(randomSplit[1], inputDatasetEvent);
        // Mandatory: pass the results on; the first event goes to output_0, the second to output_1
        finish(flowContext, outputDatasetEvent1, outputDatasetEvent2);
    }

    /**
     * Validate the node configuration.
     */
    @Override
    public void checkConfig(FlowContext flowContext) {
        // Reject the configuration if the ratio has not been filled in
        if (ConfigValidator.isBank(nodeDefine, "datasetRatio")) {
            throw new CheckedException(CheckMessage.Required.getMessage("Ratio"));
        }
    }
}
```
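Node development guidelines:
- Extend the GenericNode class [Mandatory]
- The node class must be in a package under smartbix.datamining [Mandatory]
- Add the NodeType annotation and specify the node type in it [Mandatory]
- Implement a constructor whose only parameter is a NodeDefine [Mandatory]
- Write the node's logic in the execute method, and call finish at the end of execute [Mandatory]
- Validate the node's configuration parameters in the checkConfig method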
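The sample's checkConfig only rejects an empty ratio, although the node definition JSON tells users the value should lie in [0,1]. A stricter check could validate the range as well. The sketch below is plain Java so it runs without the Smartbi libraries; RatioConfigCheck and checkRatio are hypothetical names. In a real node the message would presumably be passed to CheckedException rather than returned, with the value read through nodeDefine.getConfigItemDoubleValue as in execute.

```java
public class RatioConfigCheck {
    /**
     * Hypothetical range check for the split ratio.
     * Returns an error message, or null when the ratio is acceptable.
     */
    static String checkRatio(Double ratio) {
        if (ratio == null) {
            return "Dataset ratio is required";
        }
        // NaN fails neither comparison below, so it must be rejected explicitly
        if (ratio.isNaN() || ratio < 0.0 || ratio > 1.0) {
            return "Dataset ratio must be in the range [0, 1], got " + ratio;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(checkRatio(0.7)); // prints "null": the value is valid
        System.out.println(checkRatio(1.5)); // prints the range error message
    }
}
```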
Sample: get the node's input event
```java
DatasetEvent inputDatasetEvent = (DatasetEvent) getInputData(flowContext, 0);
```
Sample: read a configuration item value
```java
double dataset1Ratio = nodeDefine.getConfigItemDoubleValue("datasetRatio", 0.7);
```
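The second argument of getConfigItemDoubleValue appears to be a default value (0.7 matches the value in the node definition JSON). The sketch below models that assumed behavior with a plain map so it can run on its own. ConfigItemsSketch is hypothetical; it is not the Smartbi NodeDefine class, whose exact parsing rules are not described here.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a node's config items; not the Smartbi NodeDefine class. */
public class ConfigItemsSketch {
    private final Map<String, Object> items = new HashMap<>();

    void put(String name, Object value) {
        items.put(name, value);
    }

    // Assumed semantics of getConfigItemDoubleValue(name, defaultValue):
    // use the configured value when present and non-blank, otherwise the default.
    double getDoubleValue(String name, double defaultValue) {
        Object raw = items.get(name);
        if (raw == null || raw.toString().trim().isEmpty()) {
            return defaultValue;
        }
        if (raw instanceof Number) {
            return ((Number) raw).doubleValue();
        }
        // Non-numeric values are parsed from their string form
        return Double.parseDouble(raw.toString().trim());
    }

    public static void main(String[] args) {
        ConfigItemsSketch config = new ConfigItemsSketch();
        System.out.println(config.getDoubleValue("datasetRatio", 0.7)); // 0.7 (default)
        config.put("datasetRatio", "0.8");
        System.out.println(config.getDoubleValue("datasetRatio", 0.7)); // 0.8
    }
}
```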
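- Use the following Dataset methods with caution; with large data volumes they can cause out-of-memory errors, because they copy rows into the driver's memory:
  - Dataset.collect()
  - Dataset.collectAsList()
  - Dataset.head(int n)
  - Dataset.take(int n)
  - Dataset.takeAsList(int n)
  - Dataset.toLocalIterator()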
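The risk with these methods grows with the number of rows they bring back to the driver. A minimal sketch, assuming spark-sql is on the classpath: keep aggregate work such as count() on the cluster, and when rows really must reach the driver, cap them with limit(n) and a small n. SafePreview and previewRows are hypothetical names, not part of the template project.

```java
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SafePreview {
    // Brings at most n rows to the driver: limit() truncates the dataset
    // before collectAsList() materializes it locally.
    static List<Row> previewRows(Dataset<Row> dataset, int n) {
        return dataset.limit(n).collectAsList();
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().master("local").appName("preview-sketch").getOrCreate();
        try {
            Dataset<Row> dataset = spark.range(1000).toDF();
            // count() runs on the executors; only a single number returns to the driver
            long total = dataset.count();
            List<Row> sample = previewRows(dataset, 5);
            System.out.println(total + " rows in total, previewing " + sample.size());
        } finally {
            spark.stop();
        }
    }
}
```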
Node definition JSON sample (SplitNodeExample.json)
```json
{
    "alias": "Split",
    "name": "SPLIT_EXAMPLE",
    "configs": [{
        "name": "datasetRatio",
        "lable": "Dataset ratio",
        "type": "int",
        "tip": "A number in the range [0,1]",
        "value": 0.7
    }],
    "inputs": [{"id": "input_0", "order": 0, "types": ["DATASET"]}],
    "outputs": [{"id": "output_0", "order": 0, "types": ["DATASET"]}, {"id": "output_1", "order": 1, "types": ["DATASET"]}],
    "type": "SPLIT_EXAMPLE",
    "path": "/数据预处理"
}
```
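Node definition properties
Property | Required | Type | Description |
---|---|---|---|
name | Yes | String | Node name |
alias | Yes | String | Node alias: the name displayed in the UI |
configs | Yes | Array | Configuration items of the node |
inputs | Yes | Array | Input ports of the node. Several can be configured; the node shows one dot on its top edge per configured input |
outputs | Yes | Array | Output ports of the node. Several can be configured; the node shows one dot on its bottom edge per configured output |
type | Yes | String | Node type. Note: it must match the value of the NodeType annotation in the Java class |
path | Yes | String | Node path: where the node appears in the node tree (the sample's /数据预处理 means "Data Preprocessing") |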
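Because type must match the NodeType annotation, a mismatch is easy to introduce when copying the sample. A small check with Gson (already used by the unit test) can catch it early. This sketch assumes Gson 2.8.6 or later for JsonParser.parseString, and NodeDefineJsonCheck is a hypothetical helper, not part of the Smartbi SDK. It also counts the output ports, which in the sample presumably correspond to the two events passed to finish.

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

public class NodeDefineJsonCheck {
    // True when the JSON "type" field equals the value used in the @NodeType annotation.
    static boolean typeMatches(String nodeDefineJson, String nodeTypeValue) {
        JsonObject root = JsonParser.parseString(nodeDefineJson).getAsJsonObject();
        return root.has("type") && nodeTypeValue.equals(root.get("type").getAsString());
    }

    // Number of output ports, i.e. how many dots the node shows on its bottom edge.
    static int outputCount(String nodeDefineJson) {
        JsonObject root = JsonParser.parseString(nodeDefineJson).getAsJsonObject();
        return root.has("outputs") ? root.getAsJsonArray("outputs").size() : 0;
    }

    public static void main(String[] args) {
        // Trimmed copy of SplitNodeExample.json with only the fields checked here
        String json = "{\"type\": \"SPLIT_EXAMPLE\", \"outputs\": ["
                + "{\"id\": \"output_0\", \"order\": 0, \"types\": [\"DATASET\"]},"
                + "{\"id\": \"output_1\", \"order\": 1, \"types\": [\"DATASET\"]}]}";
        System.out.println(typeMatches(json, "SPLIT_EXAMPLE")); // true
        System.out.println(outputCount(json));                  // 2
    }
}
```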
Node unit test sample (SplitNodeExampleTest)
```java
package smartbix.datamining.engine.execute.node.preprocess;

import java.io.File;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.FileUtils;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.gson.Gson;

import smartbix.datamining.engine.domain.NodeDefine;
import smartbix.datamining.engine.execute.event.DatasetEvent;
import smartbix.datamining.engine.execute.flow.SingleNodeFlowContext;
import smartbix.datamining.engine.execute.node.NodeClassFactory;
import smartbix.datamining.engine.execute.node.NodeFactory;
import smartbix.datamining.engine.util.Configuration;

public class SplitNodeExampleTest {
    SparkSession sparkSession;
    SplitNodeExample splitNodeExample;
    SingleNodeFlowContext flowContext;

    @Before
    public void before() throws Exception {
        // Initialize flowContext: local Spark session, test CSV registered as input_0
        String master = "local";
        sparkSession = SparkSession.builder().master(master).appName("SmartBi-Mining").getOrCreate();
        String url = Configuration.getFile("data" + File.separator + "Credit_Loan_Data.csv").getAbsolutePath();
        Dataset<Row> dataset = sparkSession.read().format("csv").option("sep", ",").option("inferSchema", "true")
                .option("header", "true").load(url);
        DatasetEvent datasetEvent = new DatasetEvent(dataset);
        flowContext = new SingleNodeFlowContext(sparkSession);
        flowContext.putNodeInputEvent("input_0", datasetEvent);
        // Initialize the node from its JSON definition
        NodeDefine nodeDefine = readNodeDefine();
        NodeClassFactory.init();
        splitNodeExample = (SplitNodeExample) NodeFactory.create(nodeDefine);
    }

    @After
    public void after() {
        // Release the local Spark session
        sparkSession.stop();
    }

    @Test
    public void testExecute() {
        splitNodeExample.execute(flowContext);
        DatasetEvent datasetEvent0 = (DatasetEvent) flowContext.getNodeOuputEvent("output_0");
        DatasetEvent datasetEvent1 = (DatasetEvent) flowContext.getNodeOuputEvent("output_1");
        long count0 = datasetEvent0.getDataset().count();
        long count1 = datasetEvent1.getDataset().count();
        double ratio = (double) count0 / (count0 + count1);
        // randomSplit is approximate, so compare with the configured 0.7 using a tolerance
        Assert.assertEquals(0.7, ratio, 0.05);
    }

    /**
     * Read the node definition JSON.
     */
    public NodeDefine readNodeDefine() throws Exception {
        String defineFileDirPath = "src/test/java/smartbix/datamining/engine/execute/node/preprocess/";
        File defineFile = new File(defineFileDirPath, "SplitNodeExample.json");
        // Read as UTF-8 explicitly; the definition may contain non-ASCII text such as the path
        String nodeDefineJson = FileUtils.readFileToString(defineFile, StandardCharsets.UTF_8);
        Gson gson = new Gson();
        return gson.fromJson(nodeDefineJson, NodeDefine.class);
    }
}
```
Deployment
1. Package
Build the package by running the project's pom.xml:
Right-click pom.xml and choose Run As > Maven build...

In Goals, enter clean package, then click Run to start the build.

After the build, the jar file is generated in the target directory.

Copy the node definition JSON file into the same directory as the jar, then compress the jar and the JSON file into a zip package.



NodeExample.zip is now the final package.
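The zip step can also be scripted with the JDK's java.util.zip package instead of an archiving tool. This is a minimal sketch: PackageNode is a hypothetical helper, and the jar and JSON file names in main are placeholders, so substitute the jar actually produced in target and your own node definition file. Each file is stored at the root of the zip, keeping the jar and the JSON file side by side as described above.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class PackageNode {
    // Writes each input file into the zip root, keeping only its file name.
    static void zipFiles(Path zipFile, Path... inputs) throws IOException {
        try (ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(zipFile))) {
            for (Path input : inputs) {
                zip.putNextEntry(new ZipEntry(input.getFileName().toString()));
                Files.copy(input, zip);
                zip.closeEntry();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Placeholder names (hypothetical): adjust to your build output and JSON file
        Path targetDir = Paths.get("target");
        zipFiles(targetDir.resolve("NodeExample.zip"),
                targetDir.resolve("NodeExample.jar"),
                targetDir.resolve("SplitNodeExample.json"));
    }
}
```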
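2. Deploy the package to the system
Log in to Smartbi, go to System O&M > Data Mining Configuration > Engine Settings, and upload the custom Java extension package.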


After a successful upload, the new node appears in the ETL node tree.

Drag the node onto the canvas and run it; if it runs normally, the deployment succeeded.
