下载 Oracle Instant Client Basic 即包含 ojdbcx.jar 和 xstreams.jar 。
/**
* @author chenzhenyang
* @date 2021/12/5
*/
@Component
public class OracleXStreamCommandLineRunner implements CommandLineRunner {
private XStreamOut xsOut;
private String xStreamServerName = "DBZXOUT";
@Override
public void run(String... args) throws Exception {
/**
* SELECT CURRENT_SCN FROM V$DATABASE;
*
* 11 XStreamUtility.POS_VERSION_V1
* >=12 XStreamUtility.POS_VERSION_V2
*/
String url="jdbc:oracle:oci:@127.0.0.1:1521/ORCLCDB";
String user="c##dbzuser";
String password="dbz";
Class.forName("oracle.jdbc.driver.OracleDriver");//加载数据驱动
Connection connection = DriverManager.getConnection(url, user, password);// 连接数据库
try {
byte[] startPosition = XStreamUtility.convertSCNToPosition(new NUMBER("4284379", 0), XStreamUtility.POS_VERSION_V2);
xsOut = XStreamOut.attach((oracle.jdbc.OracleConnection) connection, xStreamServerName,
startPosition, 1, 1, XStreamOut.DEFAULT_MODE);
// 2. receive events while running
while (true) {
// System.out.println("Receiving LCR");
xsOut.receiveLCRCallback(new LcrEventHandler(), XStreamOut.DEFAULT_MODE);
}
} finally {
// 3. disconnect
if (this.xsOut != null) {
try {
XStreamOut xsOut2 = this.xsOut;
this.xsOut = null;
xsOut2.detach(XStreamOut.DEFAULT_MODE);
} catch (StreamsException e) {
e.printStackTrace();
}
}
}
}
public static class LcrEventHandler implements XStreamLCRCallbackHandler{
@Override
public void processLCR(LCR lcr) throws StreamsException {
if (lcr instanceof RowLCR) {
RowLCR rowLCR = (RowLCR) lcr;
System.out.println("CommandType: "+ rowLCR.getCommandType());
ColumnValue[] oldValues = rowLCR.getOldValues();
ColumnValue[] newValues = rowLCR.getNewValues();
}
else if (lcr instanceof DDLLCR) {
// dispatchSchemaChangeEvent((DDLLCR) lcr);
System.out.println("process DDLLCR");
}
}
@Override
public void processChunk(ChunkColumnValue chunkColumnValue) throws StreamsException {
System.out.println("processChunk");
}
@Override
public LCR createLCR() throws StreamsException {
System.out.println("createLCR");
return null;
}
@Override
public ChunkColumnValue createChunk() throws StreamsException {
System.out.println("createChunk");
return null;
}
}
}