public static BinlogEntry serializeToBean(byte[] input) {
BinlogEntry binlogEntry = null;
Entry entry = deserializeFromProtoBuf(input);//从 protobuf 反序列化
if(entry != null) {
binlogEntry = serializeToBean(entry);
}
return binlogEntry;
}
public static Entry deserializeFromProtoBuf(byte[] input) {
Entry entry = null;
try {
entry = Entry.parseFrom(input);
//com.alibaba.otter.canal.protocol.CanalEntry#Entry 类的方法,由 protobuf 生成
} catch (InvalidProtocolBufferException var3) {
logger.error("Exception:" + var3);
}
return entry;
}
//将 Entry 解析为一个 bean 类
public static BinlogEntry serializeToBean(Entry entry) {
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception var8) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), var8);
}
BinlogEntry binlogEntry = new BinlogEntry();
String[] logFileNames = entry.getHeader().getLogfileName().split("\\.");
String logFileNo = "000000";
if(logFileNames.length > 1) {
logFileNo = logFileNames[1];
}
binlogEntry.setBinlogFileName(logFileNo);
binlogEntry.setBinlogOffset(entry.getHeader().getLogfileOffset());
binlogEntry.setExecuteTime(entry.getHeader().getExecuteTime());
binlogEntry.setTableName(entry.getHeader().getTableName());
binlogEntry.setEventType(entry.getHeader().getEventType().toString());
Iterator primaryKeysList = rowChange.getRowDatasList().iterator();
while(primaryKeysList.hasNext()) {
RowData rowData = (RowData)primaryKeysList.next();
BinlogRow row = new BinlogRow(binlogEntry.getEventType());
row.setBeforeColumns(getColumnInfo(rowData.getBeforeColumnsList()));
row.setAfterColumns(getColumnInfo(rowData.getAfterColumnsList()));
binlogEntry.addRowData(row);
}
if(binlogEntry.getRowDatas().size() >= 1) {
BinlogRow primaryKeysList1 = (BinlogRow)binlogEntry.getRowDatas().get(0);
binlogEntry.setPrimaryKeys(getPrimaryKeys(primaryKeysList1));
} else {
ArrayList primaryKeysList2 = new ArrayList();
binlogEntry.setPrimaryKeys(primaryKeysList2);
}
return binlogEntry;
}
public class BinlogEntry implements Serializable {
private String binlogFileName;
private long binlogOffset;
private long executeTime;
private String tableName;
private String eventType;
private List<String> primaryKeys;
private List<BinlogRow> rowDatas = new ArrayList();
}
public class BinlogRow implements Serializable {
public static final String EVENT_TYPE_INSERT = "INSERT";
public static final String EVENT_TYPE_UPDATE = "UPDATE";
public static final String EVENT_TYPE_DELETE = "DELETE";
private String eventType;
private Map<String, BinlogColumn> beforeColumns;
private Map<String, BinlogColumn> afterColumns;
}
public class BinlogColumn implements Serializable {
private int index;
private String mysqlType;
private String name;
private boolean isKey;
private boolean updated;
private boolean isNull;
private String value;
}