mirror of
https://gitee.com/BDWare/cp.git
synced 2026-02-15 18:09:28 +00:00
initial commit
This commit is contained in:
337
src/main/data-mask/maskingJobs/MaskingJob.java
Normal file
337
src/main/data-mask/maskingJobs/MaskingJob.java
Normal file
@@ -0,0 +1,337 @@
|
||||
package maskingJobs;
|
||||
|
||||
import com.alibaba.datax.common.element.*;
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.core.transport.record.DefaultRecord;
|
||||
import com.alibaba.datax.core.transport.transformer.TransformerErrorCode;
|
||||
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
|
||||
|
||||
import com.alibaba.datax.core.util.TransformerUtil;
|
||||
import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
|
||||
import com.alibaba.datax.core.util.container.CoreConstant;
|
||||
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
||||
import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.JsonPrimitive;
|
||||
import org.apache.commons.lang3.Validate;
|
||||
import org.bdware.sc.util.JsonUtil;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.Types;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
public class MaskingJob {
|
||||
|
||||
DataBaseType dataBaseType = DataBaseType.MySql;
|
||||
private String username;
|
||||
private String password;
|
||||
private String jdbcUrl;
|
||||
private String table;
|
||||
private Configuration maskConf;
|
||||
private Configuration readerPara;
|
||||
private List<Record> buffer;
|
||||
private List<TransformerExecution> transformerExecs;
|
||||
|
||||
|
||||
public void init(String confContent) {
|
||||
maskConf = Configuration.from(confContent);
|
||||
System.out.println("maskConf"+maskConf.toString());
|
||||
System.out.println(("maskCOnfjob"+maskConf.get("job").toString()));
|
||||
readerPara = maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER);
|
||||
System.out.println(readerPara);
|
||||
username = readerPara.getString(Key.USERNAME);
|
||||
password = readerPara.getString(Key.PASSWORD);
|
||||
jdbcUrl = readerPara.getString(Key.JDBC_URL);
|
||||
table = readerPara.getString(Key.TABLE);
|
||||
buffer = new ArrayList<>();
|
||||
System.out.println("maskConf11"+maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
|
||||
transformerExecs = TransformerUtil.buildTransformerInfo(maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
|
||||
}
|
||||
|
||||
public String buildQuerySql() {
|
||||
String column = "*";
|
||||
//String column = readerPara.getString(Key.COLUMN);
|
||||
String table = readerPara.getString(Key.TABLE);
|
||||
String where = readerPara.getString(Key.WHERE, null);
|
||||
//String querySql = SingleTableSplitUtil.buildQuerySql(column, table, where) + " limit 100";
|
||||
String querySql = SingleTableSplitUtil.buildQuerySql(column, table, where) + " limit 100";
|
||||
|
||||
return querySql;
|
||||
}
|
||||
|
||||
public JsonPrimitive getMaskedData(String confContent) {
|
||||
init(confContent);
|
||||
return startRead();
|
||||
//return new JsonPrimitive(getResult());
|
||||
}
|
||||
|
||||
public JsonPrimitive startRead() {
|
||||
String querySql = buildQuerySql();
|
||||
System.out.println("startRead"+dataBaseType+jdbcUrl+username+password);
|
||||
Connection conn = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
|
||||
System.out.println(dataBaseType+jdbcUrl+username+password);
|
||||
int columnNumber = 0;
|
||||
String res="";
|
||||
ArrayList<String>columnName=new ArrayList<>();
|
||||
try {
|
||||
ResultSet rs = DBUtil.query(conn, querySql);
|
||||
ResultSetMetaData metaData = rs.getMetaData();
|
||||
columnNumber = metaData.getColumnCount();
|
||||
for(int i=1;i<=metaData.getColumnCount();i++){
|
||||
//获取列表 index 从1开始、列名、列类型、列的数据长度
|
||||
//System.out.println("aaa"+metaData.getColumnName(i)+"\t"+metaData.getColumnTypeName(i)+"\t"+metaData.getColumnDisplaySize(i));
|
||||
columnName.add(metaData.getColumnName(i));
|
||||
|
||||
}
|
||||
|
||||
while (rs.next()) {
|
||||
transportOneRecord(rs, metaData, columnNumber);
|
||||
}
|
||||
|
||||
|
||||
} catch (Exception e) {
|
||||
throw RdbmsException.asQueryException(dataBaseType, e, querySql, table, username);
|
||||
} finally {
|
||||
DBUtil.closeDBResources(null, conn);
|
||||
}
|
||||
////for(int i=0;i<columnNumber;i++){
|
||||
//columnName.add(metaData.getColumnName(i));
|
||||
//}//
|
||||
res=getResult(columnName);
|
||||
return new JsonPrimitive(res);
|
||||
}
|
||||
|
||||
private Record transportOneRecord(ResultSet rs, ResultSetMetaData metaData,
|
||||
int columnNumber) {
|
||||
Record record = buildRecord(rs, metaData, columnNumber);
|
||||
sendToWriter(record);
|
||||
return record;
|
||||
}
|
||||
|
||||
private void sendToWriter(Record record) {
|
||||
Validate.notNull(record, "record不能为空.");
|
||||
|
||||
record = doTransformer(record);
|
||||
|
||||
if (record == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.buffer.add(record);
|
||||
}
|
||||
|
||||
private Record doTransformer(Record record) {
|
||||
System.out.println("Record"+record);
|
||||
if (transformerExecs == null || transformerExecs.size() == 0) {
|
||||
return record;
|
||||
}
|
||||
|
||||
ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper
|
||||
.newCurrentThreadClassLoaderSwapper();
|
||||
|
||||
Record result = record;
|
||||
|
||||
String errorMsg = null;
|
||||
boolean failed = false;
|
||||
for (TransformerExecution transformerInfoExec : transformerExecs) {
|
||||
System.out.println("transformerExecs"+transformerInfoExec.getTransformerName());
|
||||
if (transformerInfoExec.getClassLoader() != null) {
|
||||
classLoaderSwapper.setCurrentThreadClassLoader(transformerInfoExec.getClassLoader());
|
||||
}
|
||||
|
||||
/**
|
||||
* 延迟检查transformer参数的有效性,直接抛出异常,不作为脏数据
|
||||
* 不需要在插件中检查参数的有效性。但参数的个数等和插件相关的参数,在插件内部检查
|
||||
*/
|
||||
if (!transformerInfoExec.isChecked()) {
|
||||
|
||||
if (transformerInfoExec.getColumnIndex() != null && transformerInfoExec.getColumnIndex() >= record.getColumnNumber()) {
|
||||
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
|
||||
String.format("columnIndex[%s] out of bound[%s]. name=%s",
|
||||
transformerInfoExec.getColumnIndex(), record.getColumnNumber(),
|
||||
transformerInfoExec.getTransformerName()));
|
||||
}
|
||||
transformerInfoExec.setIsChecked(true);
|
||||
}
|
||||
|
||||
try {
|
||||
result = transformerInfoExec.getTransformer().evaluate(result, transformerInfoExec.gettContext(), transformerInfoExec.getFinalParas());
|
||||
} catch (Exception e) {
|
||||
errorMsg = String.format("transformer(%s) has Exception(%s)", transformerInfoExec.getTransformerName(),
|
||||
e.getMessage());
|
||||
failed = true;
|
||||
//LOG.error(errorMsg, e);
|
||||
// transformerInfoExec.addFailedRecords(1);
|
||||
//脏数据不再进行后续transformer处理,按脏数据处理,并过滤该record。
|
||||
break;
|
||||
|
||||
} finally {
|
||||
if (transformerInfoExec.getClassLoader() != null) {
|
||||
classLoaderSwapper.restoreCurrentThreadClassLoader();
|
||||
}
|
||||
}
|
||||
|
||||
if (result == null) {
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
if (failed) {
|
||||
return null;
|
||||
} else {
|
||||
System.out.println("result"+result);
|
||||
return result;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected Record buildRecord(ResultSet rs, ResultSetMetaData metaData,
|
||||
int columnNumber) {
|
||||
final byte[] EMPTY_CHAR_ARRAY = new byte[0];
|
||||
Record record = new DefaultRecord();
|
||||
try {
|
||||
for (int i = 1; i <= columnNumber; i++) {
|
||||
switch (metaData.getColumnType(i)) {
|
||||
|
||||
case Types.CHAR:
|
||||
case Types.NCHAR:
|
||||
case Types.VARCHAR:
|
||||
case Types.LONGVARCHAR:
|
||||
case Types.NVARCHAR:
|
||||
case Types.LONGNVARCHAR:
|
||||
case Types.CLOB:
|
||||
case Types.NCLOB:
|
||||
record.addColumn(new StringColumn(rs.getString(i)));
|
||||
break;
|
||||
|
||||
case Types.SMALLINT:
|
||||
case Types.TINYINT:
|
||||
case Types.INTEGER:
|
||||
case Types.BIGINT:
|
||||
record.addColumn(new LongColumn(rs.getString(i)));
|
||||
break;
|
||||
|
||||
case Types.NUMERIC:
|
||||
case Types.DECIMAL:
|
||||
record.addColumn(new DoubleColumn(rs.getString(i)));
|
||||
break;
|
||||
|
||||
case Types.FLOAT:
|
||||
case Types.REAL:
|
||||
case Types.DOUBLE:
|
||||
record.addColumn(new DoubleColumn(rs.getString(i)));
|
||||
break;
|
||||
|
||||
case Types.TIME:
|
||||
record.addColumn(new DateColumn(rs.getTime(i)));
|
||||
break;
|
||||
|
||||
// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
|
||||
case Types.DATE:
|
||||
if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
|
||||
record.addColumn(new LongColumn(rs.getInt(i)));
|
||||
} else {
|
||||
record.addColumn(new DateColumn(rs.getDate(i)));
|
||||
}
|
||||
break;
|
||||
|
||||
case Types.TIMESTAMP:
|
||||
record.addColumn(new DateColumn(rs.getTimestamp(i)));
|
||||
break;
|
||||
|
||||
case Types.BINARY:
|
||||
case Types.VARBINARY:
|
||||
case Types.BLOB:
|
||||
case Types.LONGVARBINARY:
|
||||
record.addColumn(new BytesColumn(rs.getBytes(i)));
|
||||
break;
|
||||
|
||||
// warn: bit(1) -> Types.BIT 可使用BoolColumn
|
||||
// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn
|
||||
case Types.BOOLEAN:
|
||||
case Types.BIT:
|
||||
record.addColumn(new BoolColumn(rs.getBoolean(i)));
|
||||
break;
|
||||
|
||||
case Types.NULL:
|
||||
String stringData = null;
|
||||
if (rs.getObject(i) != null) {
|
||||
stringData = rs.getObject(i).toString();
|
||||
}
|
||||
record.addColumn(new StringColumn(stringData));
|
||||
break;
|
||||
|
||||
default:
|
||||
throw DataXException
|
||||
.asDataXException(
|
||||
DBUtilErrorCode.UNSUPPORTED_TYPE,
|
||||
String.format(
|
||||
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
|
||||
metaData.getColumnName(i),
|
||||
metaData.getColumnType(i),
|
||||
metaData.getColumnClassName(i)));
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (e instanceof DataXException) {
|
||||
throw (DataXException) e;
|
||||
}
|
||||
}
|
||||
return record;
|
||||
}
|
||||
|
||||
// private String recordToString(Record record) {
|
||||
// final String NEWLINE_FLAG = "\n";
|
||||
// String fieldDelimiter = "\t";
|
||||
//
|
||||
// int recordLength = record.getColumnNumber();
|
||||
// if (0 == recordLength) {
|
||||
// return NEWLINE_FLAG;
|
||||
// }
|
||||
//
|
||||
// Column column;
|
||||
// StringBuilder sb = new StringBuilder();
|
||||
// for (int i = 0; i < recordLength; i++) {
|
||||
// column = record.getColumn(i);
|
||||
// sb.append(column.asString()).append(fieldDelimiter);
|
||||
// }
|
||||
// sb.setLength(sb.length() - 1);
|
||||
// sb.append(NEWLINE_FLAG);
|
||||
//
|
||||
// return sb.toString();
|
||||
// }
|
||||
|
||||
public String getResult(ArrayList<String> columnName) {
|
||||
List<Object> dataList = new ArrayList<>();
|
||||
int size = buffer.size();
|
||||
//System.out.println("CCULUMN"+readerPara.getString(Key.COLUMN).toString());
|
||||
//String[] colmnNames = readerPara.getString(Key.COLUMN).replace(" ", "").split(",");
|
||||
int colmnSize= columnName.size();
|
||||
for (int i = 0; i < colmnSize; ++i) {
|
||||
Map<Object, Object> rowData = new HashMap<>();
|
||||
for (int j = 0; j < size; ++j) {
|
||||
rowData.put(columnName.get(i), buffer.get(j).getColumn(i).asString());
|
||||
}
|
||||
dataList.add(rowData);
|
||||
}
|
||||
|
||||
|
||||
return JsonUtil.toJson(dataList);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user