build: config spotless plugin and reformat code

This commit is contained in:
Frank.R.Wu
2023-06-15 11:17:31 +08:00
parent 587310c899
commit 0e5f92e51e
135 changed files with 2847 additions and 2989 deletions

View File

@@ -16,8 +16,6 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonPrimitive;
import org.apache.commons.lang3.Validate;
import org.bdware.sc.util.JsonUtil;
@@ -47,8 +45,8 @@ public class MaskingJob {
public void init(String confContent) {
maskConf = Configuration.from(confContent);
System.out.println("maskConf"+maskConf.toString());
System.out.println(("maskCOnfjob"+maskConf.get("job").toString()));
System.out.println("maskConf" + maskConf.toString());
System.out.println(("maskCOnfjob" + maskConf.get("job").toString()));
readerPara = maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT_READER_PARAMETER);
System.out.println(readerPara);
username = readerPara.getString(Key.USERNAME);
@@ -56,16 +54,19 @@ public class MaskingJob {
jdbcUrl = readerPara.getString(Key.JDBC_URL);
table = readerPara.getString(Key.TABLE);
buffer = new ArrayList<>();
System.out.println("maskConf11"+maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
transformerExecs = TransformerUtil.buildTransformerInfo(maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
System.out.println(
"maskConf11" + maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
transformerExecs = TransformerUtil.buildTransformerInfo(
maskConf.getConfiguration(CoreConstant.DATAX_JOB_CONTENT + "[0]"));
}
public String buildQuerySql() {
String column = "*";
//String column = readerPara.getString(Key.COLUMN);
// String column = readerPara.getString(Key.COLUMN);
String table = readerPara.getString(Key.TABLE);
String where = readerPara.getString(Key.WHERE, null);
//String querySql = SingleTableSplitUtil.buildQuerySql(column, table, where) + " limit 100";
// String querySql = SingleTableSplitUtil.buildQuerySql(column, table, where) + " limit
// 100";
String querySql = SingleTableSplitUtil.buildQuerySql(column, table, where) + " limit 100";
return querySql;
@@ -74,24 +75,24 @@ public class MaskingJob {
public JsonPrimitive getMaskedData(String confContent) {
init(confContent);
return startRead();
//return new JsonPrimitive(getResult());
// return new JsonPrimitive(getResult());
}
public JsonPrimitive startRead() {
String querySql = buildQuerySql();
System.out.println("startRead"+dataBaseType+jdbcUrl+username+password);
System.out.println("startRead" + dataBaseType + jdbcUrl + username + password);
Connection conn = DBUtil.getConnection(dataBaseType, jdbcUrl, username, password);
System.out.println(dataBaseType+jdbcUrl+username+password);
System.out.println(dataBaseType + jdbcUrl + username + password);
int columnNumber = 0;
String res="";
ArrayList<String>columnName=new ArrayList<>();
String res = "";
ArrayList<String> columnName = new ArrayList<>();
try {
ResultSet rs = DBUtil.query(conn, querySql);
ResultSetMetaData metaData = rs.getMetaData();
columnNumber = metaData.getColumnCount();
for(int i=1;i<=metaData.getColumnCount();i++){
//获取列表 index 从1开始、列名、列类型、列的数据长度
//System.out.println("aaa"+metaData.getColumnName(i)+"\t"+metaData.getColumnTypeName(i)+"\t"+metaData.getColumnDisplaySize(i));
for (int i = 1; i <= metaData.getColumnCount(); i++) {
// 获取列表 index 从1开始、列名、列类型、列的数据长度
// System.out.println("aaa"+metaData.getColumnName(i)+"\t"+metaData.getColumnTypeName(i)+"\t"+metaData.getColumnDisplaySize(i));
columnName.add(metaData.getColumnName(i));
}
@@ -106,15 +107,14 @@ public class MaskingJob {
} finally {
DBUtil.closeDBResources(null, conn);
}
////for(int i=0;i<columnNumber;i++){
//columnName.add(metaData.getColumnName(i));
//}//
res=getResult(columnName);
//// for(int i=0;i<columnNumber;i++){
// columnName.add(metaData.getColumnName(i));
// }//
res = getResult(columnName);
return new JsonPrimitive(res);
}
private Record transportOneRecord(ResultSet rs, ResultSetMetaData metaData,
int columnNumber) {
private Record transportOneRecord(ResultSet rs, ResultSetMetaData metaData, int columnNumber) {
Record record = buildRecord(rs, metaData, columnNumber);
sendToWriter(record);
return record;
@@ -133,32 +133,34 @@ public class MaskingJob {
}
private Record doTransformer(Record record) {
System.out.println("Record"+record);
System.out.println("Record" + record);
if (transformerExecs == null || transformerExecs.size() == 0) {
return record;
}
ClassLoaderSwapper classLoaderSwapper = ClassLoaderSwapper
.newCurrentThreadClassLoaderSwapper();
ClassLoaderSwapper classLoaderSwapper =
ClassLoaderSwapper.newCurrentThreadClassLoaderSwapper();
Record result = record;
String errorMsg = null;
boolean failed = false;
for (TransformerExecution transformerInfoExec : transformerExecs) {
System.out.println("transformerExecs"+transformerInfoExec.getTransformerName());
System.out.println("transformerExecs" + transformerInfoExec.getTransformerName());
if (transformerInfoExec.getClassLoader() != null) {
classLoaderSwapper.setCurrentThreadClassLoader(transformerInfoExec.getClassLoader());
classLoaderSwapper
.setCurrentThreadClassLoader(transformerInfoExec.getClassLoader());
}
/**
* 延迟检查transformer参数的有效性直接抛出异常不作为脏数据
* 不需要在插件中检查参数的有效性。但参数的个数等和插件相关的参数,在插件内部检查
* 延迟检查transformer参数的有效性直接抛出异常不作为脏数据 不需要在插件中检查参数的有效性。但参数的个数等和插件相关的参数,在插件内部检查
*/
if (!transformerInfoExec.isChecked()) {
if (transformerInfoExec.getColumnIndex() != null && transformerInfoExec.getColumnIndex() >= record.getColumnNumber()) {
throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
if (transformerInfoExec.getColumnIndex() != null
&& transformerInfoExec.getColumnIndex() >= record.getColumnNumber()) {
throw DataXException.asDataXException(
TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
String.format("columnIndex[%s] out of bound[%s]. name=%s",
transformerInfoExec.getColumnIndex(), record.getColumnNumber(),
transformerInfoExec.getTransformerName()));
@@ -167,14 +169,15 @@ public class MaskingJob {
}
try {
result = transformerInfoExec.getTransformer().evaluate(result, transformerInfoExec.gettContext(), transformerInfoExec.getFinalParas());
result = transformerInfoExec.getTransformer().evaluate(result,
transformerInfoExec.gettContext(), transformerInfoExec.getFinalParas());
} catch (Exception e) {
errorMsg = String.format("transformer(%s) has Exception(%s)", transformerInfoExec.getTransformerName(),
e.getMessage());
errorMsg = String.format("transformer(%s) has Exception(%s)",
transformerInfoExec.getTransformerName(), e.getMessage());
failed = true;
//LOG.error(errorMsg, e);
// LOG.error(errorMsg, e);
// transformerInfoExec.addFailedRecords(1);
//脏数据不再进行后续transformer处理按脏数据处理并过滤该record。
// 脏数据不再进行后续transformer处理按脏数据处理并过滤该record。
break;
} finally {
@@ -193,14 +196,13 @@ public class MaskingJob {
if (failed) {
return null;
} else {
System.out.println("result"+result);
System.out.println("result" + result);
return result;
}
}
protected Record buildRecord(ResultSet rs, ResultSetMetaData metaData,
int columnNumber) {
protected Record buildRecord(ResultSet rs, ResultSetMetaData metaData, int columnNumber) {
final byte[] EMPTY_CHAR_ARRAY = new byte[0];
Record record = new DefaultRecord();
try {
@@ -276,14 +278,11 @@ public class MaskingJob {
break;
default:
throw DataXException
.asDataXException(
DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
metaData.getColumnName(i),
metaData.getColumnType(i),
metaData.getColumnClassName(i)));
throw DataXException.asDataXException(DBUtilErrorCode.UNSUPPORTED_TYPE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库读取这种字段类型. 字段名:[%s], 字段名称:[%s], 字段Java类型:[%s]. 请尝试使用数据库函数将其转换datax支持的类型 或者不同步该字段 .",
metaData.getColumnName(i), metaData.getColumnType(i),
metaData.getColumnClassName(i)));
}
}
} catch (Exception e) {
@@ -294,33 +293,33 @@ public class MaskingJob {
return record;
}
// private String recordToString(Record record) {
// final String NEWLINE_FLAG = "\n";
// String fieldDelimiter = "\t";
//
// int recordLength = record.getColumnNumber();
// if (0 == recordLength) {
// return NEWLINE_FLAG;
// }
//
// Column column;
// StringBuilder sb = new StringBuilder();
// for (int i = 0; i < recordLength; i++) {
// column = record.getColumn(i);
// sb.append(column.asString()).append(fieldDelimiter);
// }
// sb.setLength(sb.length() - 1);
// sb.append(NEWLINE_FLAG);
//
// return sb.toString();
// }
// private String recordToString(Record record) {
// final String NEWLINE_FLAG = "\n";
// String fieldDelimiter = "\t";
//
// int recordLength = record.getColumnNumber();
// if (0 == recordLength) {
// return NEWLINE_FLAG;
// }
//
// Column column;
// StringBuilder sb = new StringBuilder();
// for (int i = 0; i < recordLength; i++) {
// column = record.getColumn(i);
// sb.append(column.asString()).append(fieldDelimiter);
// }
// sb.setLength(sb.length() - 1);
// sb.append(NEWLINE_FLAG);
//
// return sb.toString();
// }
public String getResult(ArrayList<String> columnName) {
List<Object> dataList = new ArrayList<>();
int size = buffer.size();
//System.out.println("CCULUMN"+readerPara.getString(Key.COLUMN).toString());
//String[] colmnNames = readerPara.getString(Key.COLUMN).replace(" ", "").split(",");
int colmnSize= columnName.size();
// System.out.println("CCULUMN"+readerPara.getString(Key.COLUMN).toString());
// String[] colmnNames = readerPara.getString(Key.COLUMN).replace(" ", "").split(",");
int colmnSize = columnName.size();
for (int i = 0; i < colmnSize; ++i) {
Map<Object, Object> rowData = new HashMap<>();
for (int j = 0; j < size; ++j) {