2021-09-29 17:55:13 +08:00
|
|
|
|
package maskingJobs;
|
|
|
|
|
|
|
|
|
|
|
|
import com.alibaba.datax.common.element.*;
|
|
|
|
|
|
import com.alibaba.datax.common.exception.DataXException;
|
|
|
|
|
|
import com.alibaba.datax.common.util.Configuration;
|
|
|
|
|
|
import com.alibaba.datax.core.transport.record.DefaultRecord;
|
|
|
|
|
|
import com.alibaba.datax.core.transport.transformer.TransformerErrorCode;
|
|
|
|
|
|
import com.alibaba.datax.core.transport.transformer.TransformerExecution;
|
|
|
|
|
|
|
|
|
|
|
|
import com.alibaba.datax.core.util.TransformerUtil;
|
|
|
|
|
|
import com.alibaba.datax.core.util.container.ClassLoaderSwapper;
|
|
|
|
|
|
import com.alibaba.datax.core.util.container.CoreConstant;
|
|
|
|
|
|
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
|
|
|
|
|
import com.alibaba.datax.plugin.rdbms.reader.util.SingleTableSplitUtil;
|
|
|
|
|
|
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
|
|
|
|
|
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
|
|
|
|
|
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|
|
|
|
|
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
|
|
|
|
|
import com.google.gson.JsonPrimitive;
|
|
|
|
|
|
import org.apache.commons.lang3.Validate;
|
|
|
|
|
|
import org.bdware.sc.util.JsonUtil;
|
|
|
|
|
|
|
|
|
|
|
|
import java.sql.Connection;
|
|
|
|
|
|
import java.sql.ResultSet;
|
|
|
|
|
|
import java.sql.ResultSetMetaData;
|
|
|
|
|
|
import java.sql.Types;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
|
|
|
import java.util.HashMap;
|
|
|
|
|
|
import java.util.List;
|
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public class MaskingJob {
|
|
|
|
|
|
|
|
|
|
|
|
DataBaseType dataBaseType = DataBaseType.MySql;
|
|
|
|
|
|
private String username;
|
|
|
|
|
|
private String password;
|
|
|
|
|
|
private String jdbcUrl;
|
|
|
|
|
|
private String table;
|
|
|
|
|
|
private Configuration maskConf;
|
|
|
|
|
|
private Configuration readerPara;
|
|
|
|
|
|
private List<Record> buffer;
|
|
|
|
|
|
private List<TransformerExecution> transformerExecs;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public void init(String confContent) {
|
|
|
|
|
|
maskConf = Configuration.from(confContent);
|
2023-06-15 11:17:31 +08:00
|
|
|
|
System.out.println("maskConf" + maskConf.toString());
|
|
|
|
|
|
System.out.println(("maskCOnfjob" + maskConf.get("job").toString()));
|
2021-09-29 17:55:13 +08:00
|
|
|
|
readerPara = maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER);
|
|
|
|
|
|
System.out.println(readerPara);
|
|
|
|
|
|
username = readerPara.getString(Key.USERNAME);
|
|
|
|
|
|
password = readerPara.getString(Key.PASSWORD);
|
|
|
|
|
|
jdbcUrl = readerPara.getString(Key.JDBC_URL);
|
|
|
|
|
|
table = readerPara.getString(Key.TABLE);
|
|
|
|
|
|
buffer = new ArrayList<>();
|
2023-06-15 11:17:31 +08:00
|
|
|
|
System.out.println(
|
|
|
|
|
|
"maskConf11" + maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
|
|
|
|
|
|
transformerExecs = TransformerUtil.buildTransformerInfo(
|
|
|
|
|
|
maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
|
2021-09-29 17:55:13 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public String buildQuerySql() {
|
|
|
|
|
|
String column = "*";
|
2023-06-15 11:17:31 +08:00
|
|
|
|
// String column = readerPara.getString(Key.COLUMN);
|
2021-09-29 17:55:13 +08:00
|
|
|
|
String table = readerPara.getString(Key.TABLE);
|
|
|
|
|
|
String where = readerPara.getString(Key.WHERE, null);
|
2023-06-15 11:17:31 +08:00
|
|
|
|
// String querySql = SingleTableSplitUtil.buildQuerySql(column, table, where) + " limit
|
|
|
|
|
|
// 100";
|
2021-09-29 17:55:13 +08:00
|
|
|
|
String querySql = SingleTableSplitUtil.buildQuerySql(column, table, where) + " limit 100";
|
|
|
|
|
|
|
|
|
|
|
|
return querySql;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public JsonPrimitive getMaskedData(String confContent) {
|
|
|
|
|
|
init(confContent);
|
|
|
|
|
|
return startRead();
|
2023-06-15 11:17:31 +08:00
|
|
|
|
// return new JsonPrimitive(getResult());
|
2021-09-29 17:55:13 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
public JsonPrimitive startRead() {
|
|
|
|
|
|
String querySql = buildQuerySql();
|
2023-06-15 11:17:31 +08:00
|
|
|
|
System.out.println("startRead" + dataBaseType + jdbcUrl + username + password);
|
2021-09-29 17:55:13 +08:00
|
|
|
|
Connection conn = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
|
2023-06-15 11:17:31 +08:00
|
|
|
|
System.out.println(dataBaseType + jdbcUrl + username + password);
|
2021-09-29 17:55:13 +08:00
|
|
|
|
int columnNumber = 0;
|
2023-06-15 11:17:31 +08:00
|
|
|
|
String res = "";
|
|
|
|
|
|
ArrayList<String> columnName = new ArrayList<>();
|
2021-09-29 17:55:13 +08:00
|
|
|
|
try {
|
|
|
|
|
|
ResultSet rs = DBUtil.query(conn, querySql);
|
|
|
|
|
|
ResultSetMetaData metaData = rs.getMetaData();
|
|
|
|
|
|
columnNumber = metaData.getColumnCount();
|
2023-06-15 11:17:31 +08:00
|
|
|
|
for (int i = 1; i <= metaData.getColumnCount(); i++) {
|
|
|
|
|
|
// 获取列表 index 从1开始、列名、列类型、列的数据长度
|
|
|
|
|
|
// System.out.println("aaa"+metaData.getColumnName(i)+"\t"+metaData.getColumnTypeName(i)+"\t"+metaData.getColumnDisplaySize(i));
|
2021-09-29 17:55:13 +08:00
|
|
|
|
columnName.add(metaData.getColumnName(i));
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
while (rs.next()) {
|
|
|
|
|
|
transportOneRecord(rs, metaData, columnNumber);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
throw RdbmsException.asQueryException(dataBaseType, e, querySql, table, username);
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
DBUtil.closeDBResources(null, conn);
|
|
|
|
|
|
}
|
2023-06-15 11:17:31 +08:00
|
|
|
|
//// for(int i=0;i<columnNumber;i++){
|
|
|
|
|
|
// columnName.add(metaData.getColumnName(i));
|
|
|
|
|
|
// }//
|
|
|
|
|
|
res = getResult(columnName);
|
2021-09-29 17:55:13 +08:00
|
|
|
|
return new JsonPrimitive(res);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-06-15 11:17:31 +08:00
|
|
|
|
private Record transportOneRecord(ResultSet rs, ResultSetMetaData metaData, int columnNumber) {
|
2021-09-29 17:55:13 +08:00
|
|
|
|
Record record = buildRecord(rs, metaData, columnNumber);
|
|
|
|
|
|
sendToWriter(record);
|
|
|
|
|
|
return record;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private void sendToWriter(Record record) {
|
|
|
|
|
|
Validate.notNull(record, "record不能为空.");
|
|
|
|
|
|
|
|
|
|
|
|
record = doTransformer(record);
|
|
|
|
|
|
|
|
|
|
|
|
if (record == null) {
|
|
|
|
|
|
return;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
this.buffer.add(record);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
private Record doTransformer(Record record) {
|
2023-06-15 11:17:31 +08:00
|
|
|
|
System.out.println("Record" + record);
|
2021-09-29 17:55:13 +08:00
|
|
|
|
if (transformerExecs == null || transformerExecs.size() == 0) {
|
|
|
|
|
|
return record;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-06-15 11:17:31 +08:00
|
|
|
|
ClassLoaderSwapper classLoaderSwapper =
|
|
|
|
|
|
ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
|
2021-09-29 17:55:13 +08:00
|
|
|
|
|
|
|
|
|
|
Record result = record;
|
|
|
|
|
|
|
|
|
|
|
|
String errorMsg = null;
|
|
|
|
|
|
boolean failed = false;
|
|
|
|
|
|
for (TransformerExecution transformerInfoExec : transformerExecs) {
|
2023-06-15 11:17:31 +08:00
|
|
|
|
System.out.println("transformerExecs" + transformerInfoExec.getTransformerName());
|
2021-09-29 17:55:13 +08:00
|
|
|
|
if (transformerInfoExec.getClassLoader() != null) {
|
2023-06-15 11:17:31 +08:00
|
|
|
|
classLoaderSwapper
|
|
|
|
|
|
.setCurrentThreadClassLoader(transformerInfoExec.getClassLoader());
|
2021-09-29 17:55:13 +08:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
2023-06-15 11:17:31 +08:00
|
|
|
|
* 延迟检查transformer参数的有效性,直接抛出异常,不作为脏数据 不需要在插件中检查参数的有效性。但参数的个数等和插件相关的参数,在插件内部检查
|
2021-09-29 17:55:13 +08:00
|
|
|
|
*/
|
|
|
|
|
|
if (!transformerInfoExec.isChecked()) {
|
|
|
|
|
|
|
2023-06-15 11:17:31 +08:00
|
|
|
|
if (transformerInfoExec.getColumnIndex() != null
|
|
|
|
|
|
&& transformerInfoExec.getColumnIndex() >= record.getColumnNumber()) {
|
|
|
|
|
|
throw DataXException.asDataXException(
|
|
|
|
|
|
TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
|
2021-09-29 17:55:13 +08:00
|
|
|
|
String.format("columnIndex[%s] out of bound[%s]. name=%s",
|
|
|
|
|
|
transformerInfoExec.getColumnIndex(), record.getColumnNumber(),
|
|
|
|
|
|
transformerInfoExec.getTransformerName()));
|
|
|
|
|
|
}
|
|
|
|
|
|
transformerInfoExec.setIsChecked(true);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
try {
|
2023-06-15 11:17:31 +08:00
|
|
|
|
result = transformerInfoExec.getTransformer().evaluate(result,
|
|
|
|
|
|
transformerInfoExec.gettContext(), transformerInfoExec.getFinalParas());
|
2021-09-29 17:55:13 +08:00
|
|
|
|
} catch (Exception e) {
|
2023-06-15 11:17:31 +08:00
|
|
|
|
errorMsg = String.format("transformer(%s) has Exception(%s)",
|
|
|
|
|
|
transformerInfoExec.getTransformerName(), e.getMessage());
|
2021-09-29 17:55:13 +08:00
|
|
|
|
failed = true;
|
2023-06-15 11:17:31 +08:00
|
|
|
|
// LOG.error(errorMsg, e);
|
2021-09-29 17:55:13 +08:00
|
|
|
|
// transformerInfoExec.addFailedRecords(1);
|
2023-06-15 11:17:31 +08:00
|
|
|
|
// 脏数据不再进行后续transformer处理,按脏数据处理,并过滤该record。
|
2021-09-29 17:55:13 +08:00
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
} finally {
|
|
|
|
|
|
if (transformerInfoExec.getClassLoader() != null) {
|
|
|
|
|
|
classLoaderSwapper.restoreCurrentThreadClassLoader();
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (result == null) {
|
|
|
|
|
|
break;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (failed) {
|
|
|
|
|
|
return null;
|
|
|
|
|
|
} else {
|
2023-06-15 11:17:31 +08:00
|
|
|
|
System.out.println("result" + result);
|
2021-09-29 17:55:13 +08:00
|
|
|
|
return result;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-06-15 11:17:31 +08:00
|
|
|
|
protected Record buildRecord(ResultSet rs, ResultSetMetaData metaData, int columnNumber) {
|
2021-09-29 17:55:13 +08:00
|
|
|
|
final byte[] EMPTY_CHAR_ARRAY = new byte[0];
|
|
|
|
|
|
Record record = new DefaultRecord();
|
|
|
|
|
|
try {
|
|
|
|
|
|
for (int i = 1; i <= columnNumber; i++) {
|
|
|
|
|
|
switch (metaData.getColumnType(i)) {
|
|
|
|
|
|
|
|
|
|
|
|
case Types.CHAR:
|
|
|
|
|
|
case Types.NCHAR:
|
|
|
|
|
|
case Types.VARCHAR:
|
|
|
|
|
|
case Types.LONGVARCHAR:
|
|
|
|
|
|
case Types.NVARCHAR:
|
|
|
|
|
|
case Types.LONGNVARCHAR:
|
|
|
|
|
|
case Types.CLOB:
|
|
|
|
|
|
case Types.NCLOB:
|
|
|
|
|
|
record.addColumn(new StringColumn(rs.getString(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case Types.SMALLINT:
|
|
|
|
|
|
case Types.TINYINT:
|
|
|
|
|
|
case Types.INTEGER:
|
|
|
|
|
|
case Types.BIGINT:
|
|
|
|
|
|
record.addColumn(new LongColumn(rs.getString(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case Types.NUMERIC:
|
|
|
|
|
|
case Types.DECIMAL:
|
|
|
|
|
|
record.addColumn(new DoubleColumn(rs.getString(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case Types.FLOAT:
|
|
|
|
|
|
case Types.REAL:
|
|
|
|
|
|
case Types.DOUBLE:
|
|
|
|
|
|
record.addColumn(new DoubleColumn(rs.getString(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case Types.TIME:
|
|
|
|
|
|
record.addColumn(new DateColumn(rs.getTime(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
// for mysql bug, see http://bugs.mysql.com/bug.php?id=35115
|
|
|
|
|
|
case Types.DATE:
|
|
|
|
|
|
if (metaData.getColumnTypeName(i).equalsIgnoreCase("year")) {
|
|
|
|
|
|
record.addColumn(new LongColumn(rs.getInt(i)));
|
|
|
|
|
|
} else {
|
|
|
|
|
|
record.addColumn(new DateColumn(rs.getDate(i)));
|
|
|
|
|
|
}
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case Types.TIMESTAMP:
|
|
|
|
|
|
record.addColumn(new DateColumn(rs.getTimestamp(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case Types.BINARY:
|
|
|
|
|
|
case Types.VARBINARY:
|
|
|
|
|
|
case Types.BLOB:
|
|
|
|
|
|
case Types.LONGVARBINARY:
|
|
|
|
|
|
record.addColumn(new BytesColumn(rs.getBytes(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
// warn: bit(1) -> Types.BIT 可使用BoolColumn
|
|
|
|
|
|
// warn: bit(>1) -> Types.VARBINARY 可使用BytesColumn
|
|
|
|
|
|
case Types.BOOLEAN:
|
|
|
|
|
|
case Types.BIT:
|
|
|
|
|
|
record.addColumn(new BoolColumn(rs.getBoolean(i)));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
case Types.NULL:
|
|
|
|
|
|
String stringData = null;
|
|
|
|
|
|
if (rs.getObject(i) != null) {
|
|
|
|
|
|
stringData = rs.getObject(i).toString();
|
|
|
|
|
|
}
|
|
|
|
|
|
record.addColumn(new StringColumn(stringData));
|
|
|
|
|
|
break;
|
|
|
|
|
|
|
|
|
|
|
|
default:
|
2023-06-15 11:17:31 +08:00
|
|
|
|
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE,
|
|
|
|
|
|
String.format(
|
|
|
|
|
|
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
|
|
|
|
|
|
metaData.getColumnName(i), metaData.getColumnType(i),
|
|
|
|
|
|
metaData.getColumnClassName(i)));
|
2021-09-29 17:55:13 +08:00
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
|
if (e instanceof DataXException) {
|
|
|
|
|
|
throw (DataXException) e;
|
|
|
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
return record;
|
|
|
|
|
|
}
|
|
|
|
|
|
|
2023-06-15 11:17:31 +08:00
|
|
|
|
// private String recordToString(Record record) {
|
|
|
|
|
|
// final String NEWLINE_FLAG = "\n";
|
|
|
|
|
|
// String fieldDelimiter = "\t";
|
|
|
|
|
|
//
|
|
|
|
|
|
// int recordLength = record.getColumnNumber();
|
|
|
|
|
|
// if (0 == recordLength) {
|
|
|
|
|
|
// return NEWLINE_FLAG;
|
|
|
|
|
|
// }
|
|
|
|
|
|
//
|
|
|
|
|
|
// Column column;
|
|
|
|
|
|
// StringBuilder sb = new StringBuilder();
|
|
|
|
|
|
// for (int i = 0; i < recordLength; i++) {
|
|
|
|
|
|
// column = record.getColumn(i);
|
|
|
|
|
|
// sb.append(column.asString()).append(fieldDelimiter);
|
|
|
|
|
|
// }
|
|
|
|
|
|
// sb.setLength(sb.length() - 1);
|
|
|
|
|
|
// sb.append(NEWLINE_FLAG);
|
|
|
|
|
|
//
|
|
|
|
|
|
// return sb.toString();
|
|
|
|
|
|
// }
|
2021-09-29 17:55:13 +08:00
|
|
|
|
|
|
|
|
|
|
public String getResult(ArrayList<String> columnName) {
|
|
|
|
|
|
List<Object> dataList = new ArrayList<>();
|
|
|
|
|
|
int size = buffer.size();
|
2023-06-15 11:17:31 +08:00
|
|
|
|
// System.out.println("CCULUMN"+readerPara.getString(Key.COLUMN).toString());
|
|
|
|
|
|
// String[] colmnNames = readerPara.getString(Key.COLUMN).replace(" ", "").split(",");
|
|
|
|
|
|
int colmnSize = columnName.size();
|
2021-09-29 17:55:13 +08:00
|
|
|
|
for (int i = 0; i < colmnSize; ++i) {
|
|
|
|
|
|
Map<Object, Object> rowData = new HashMap<>();
|
|
|
|
|
|
for (int j = 0; j < size; ++j) {
|
|
|
|
|
|
rowData.put(columnName.get(i), buffer.get(j).getColumn(i).asString());
|
|
|
|
|
|
}
|
|
|
|
|
|
dataList.add(rowData);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return JsonUtil.toJson(dataList);
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|