• rocketmq-mysql的EventProcessor

    小编:啊南 37阅读 2020.11.20

    本文主要研究一下rocketmq-mysql的EventProcessor

    EventProcessor

    rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

    public class EventProcessor {
        // Class-wide logger.
        private static final Logger LOGGER = LoggerFactory.getLogger(EventProcessor.class);
     
        // Owner of this processor: supplies the Config and receives committed transactions.
        private Replicator replicator;
        // Configuration obtained from replicator.getConfig() in the constructor.
        private Config config;
     
        // Connection pool built by initDataSource(); handed to BinlogPositionManager and Schema.
        private DataSource dataSource;
     
        // Supplies the binlog filename/position the client starts reading from.
        private BinlogPositionManager binlogPositionManager;
     
        // Bounded hand-off buffer (capacity 100): given to the EventListener in start()
        // and drained by doProcess().
        private BlockingQueue<Event> queue = new LinkedBlockingQueue<>(100);
     
        // Client connected to MySQL in start(); also queried for the current binlog filename.
        private BinaryLogClient binaryLogClient;
     
        // Listener registered on binaryLogClient; constructed around the queue above.
        private EventListener eventListener;
     
        // Table metadata loaded from dataSource; looked up by database and table name.
        private Schema schema;
     
        // Binlog table id -> Table, filled in as TABLE_MAP events arrive.
        private Map<Long, Table> tableMap = new HashMap<>();
     
        // Transaction currently being accumulated; created lazily in addRow() and
        // replaced with a fresh instance after each commit.
        private Transaction transaction;
     
        public EventProcessor(Replicator replicator) {
     
            this.replicator = replicator;
            this.config = replicator.getConfig();
        }
     
        public void start() throws Exception {
     
            initDataSource();
     
            binlogPositionManager = new BinlogPositionManager(config, dataSource);
            binlogPositionManager.initBeginPosition();
     
            schema = new Schema(dataSource);
            schema.load();
     
            eventListener = new EventListener(queue);
            binaryLogClient = new BinaryLogClient(config.mysqlAddr,
                config.mysqlPort,
                config.mysqlUsername,
                config.mysqlPassword);
            binaryLogClient.setBlocking(true);
            binaryLogClient.setServerId(1001);
     
            EventDeserializer eventDeserializer = new EventDeserializer();
            eventDeserializer.setCompatibilityMode(EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
                EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY);
            binaryLogClient.setEventDeserializer(eventDeserializer);
            binaryLogClient.registerEventListener(eventListener);
            binaryLogClient.setBinlogFilename(binlogPositionManager.getBinlogFilename());
            binaryLogClient.setBinlogPosition(binlogPositionManager.getPosition());
     
            binaryLogClient.connect(3000);
     
            LOGGER.info("Started.");
     
            doProcess();
        }
     
        //......
     
    }
    EventProcessor提供了start方法,该方法首先执行initDataSource;之后创建BinlogPositionManager并执行binlogPositionManager.initBeginPosition();接着创建Schema并执行schema.load();然后创建EventListener及BinaryLogClient,为其设置EventDeserializer、binlogFilename及binlogPosition后执行binaryLogClient.connect(3000);最后执行doProcess方法
    initDataSource
    rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

    public class EventProcessor {
     
        //......
     
        private void initDataSource() throws Exception {
            Map<String, String> map = new HashMap<>();
            map.put("driverClassName", "com.mysql.jdbc.Driver");
            map.put("url", "jdbc:mysql://" + config.mysqlAddr + ":" + config.mysqlPort + " useSSL=true&verifyServerCertificate=false");
            map.put("username", config.mysqlUsername);
            map.put("password", config.mysqlPassword);
            map.put("initialSize", "2");
            map.put("maxActive", "2");
            map.put("maxWait", "60000");
            map.put("timeBetweenEvictionRunsMillis", "60000");
            map.put("minEvictableIdleTimeMillis", "300000");
            map.put("validationQuery", "SELECT 1 FROM DUAL");
            map.put("testWhileIdle", "true");
     
            dataSource = DruidDataSourceFactory.createDataSource(map);
        }
     
        //......
     
    }
    initDataSource主要是通过DruidDataSourceFactory来创建dataSource
    doProcess
    rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

    public class EventProcessor {
     
        //......
     
        private void doProcess() {
     
            while (true) {
     
                try {
                    Event event = queue.poll(1000, TimeUnit.MILLISECONDS);
                    if (event == null) {
                        checkConnection();
                        continue;
                    }
     
                    switch (event.getHeader().getEventType()) {
                        case TABLE_MAP:
                            processTableMapEvent(event);
                            break;
     
                        case WRITE_ROWS:
                        case EXT_WRITE_ROWS:
                            processWriteEvent(event);
                            break;
     
                        case UPDATE_ROWS:
                        case EXT_UPDATE_ROWS:
                            processUpdateEvent(event);
                            break;
     
                        case DELETE_ROWS:
                        case EXT_DELETE_ROWS:
                            processDeleteEvent(event);
                            break;
     
                        case QUERY:
                            processQueryEvent(event);
                            break;
     
                        case XID:
                            processXidEvent(event);
                            break;
     
                    }
                } catch (Exception e) {
                    LOGGER.error("Binlog process error.", e);
                }
     
            }
        }
                
        //......
     
    }
    doProcess方法会执行queue.poll(1000, TimeUnit.MILLISECONDS)拉取event,如果event为null,则会执行checkConnection;之后根据event.getHeader().getEventType()来做不同处理;主要有processTableMapEvent、processWriteEvent、processUpdateEvent、processDeleteEvent、processQueryEvent、processXidEvent这几种
    processEvent
    rocketmq-externals/rocketmq-mysql/src/main/java/org/apache/rocketmq/mysql/binlog/EventProcessor.java

    public class EventProcessor {
     
        //......
     
        private void processTableMapEvent(Event event) {
            TableMapEventData data = event.getData();
            String dbName = data.getDatabase();
            String tableName = data.getTable();
            Long tableId = data.getTableId();
     
            Table table = schema.getTable(dbName, tableName);
     
            tableMap.put(tableId, table);
        }
     
        private void processWriteEvent(Event event) {
            WriteRowsEventData data = event.getData();
            Long tableId = data.getTableId();
            List<Serializable[]> list = data.getRows();
     
            for (Serializable[] row : list) {
                addRow("WRITE", tableId, row);
            }
        }
     
        private void processUpdateEvent(Event event) {
            UpdateRowsEventData data = event.getData();
            Long tableId = data.getTableId();
            List<Map.Entry<Serializable[], Serializable[]>> list = data.getRows();
     
            for (Map.Entry<Serializable[], Serializable[]> entry : list) {
                addRow("UPDATE", tableId, entry.getValue());
            }
        }
     
        private void processDeleteEvent(Event event) {
            DeleteRowsEventData data = event.getData();
            Long tableId = data.getTableId();
            List<Serializable[]> list = data.getRows();
     
            for (Serializable[] row : list) {
                addRow("DELETE", tableId, row);
            }
     
        }
     
        private void processQueryEvent(Event event) {
            QueryEventData data = event.getData();
            String sql = data.getSql();
     
            if (createTablePattern.matcher(sql).find()) {
                schema.reset();
            }
        }
     
        private void processXidEvent(Event event) {
            EventHeaderV4 header = event.getHeader();
            XidEventData data = event.getData();
     
            String binlogFilename = binaryLogClient.getBinlogFilename();
            Long position = header.getNextPosition();
            Long xid = data.getXid();
     
            BinlogPosition binlogPosition = new BinlogPosition(binlogFilename, position);
            transaction.setNextBinlogPosition(binlogPosition);
            transaction.setXid(xid);
     
            replicator.commit(transaction, true);
     
            transaction = new Transaction(config);
        }
     
        private void addRow(String type, Long tableId, Serializable[] row) {
     
            if (transaction == null) {
                transaction = new Transaction(config);
            }
     
            Table t = tableMap.get(tableId);
            if (t != null) {
     
                while (true) {
                    if (transaction.addRow(type, t, row)) {
                        break;
     
                    } else {
                        transaction.setNextBinlogPosition(replicator.getNextBinlogPosition());
                        replicator.commit(transaction, false);
                        transaction = new Transaction(config);
                    }
                }
     
            }
        }
     
        //......
     
    }
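    processTableMapEvent会通过schema.getTable(dbName, tableName)获取Table并以tableId为key放入tableMap;processWriteEvent、processUpdateEvent、processDeleteEvent都会执行addRow方法,它会执行transaction.addRow(type, t, row),如果返回false则会执行transaction.setNextBinlogPosition以及replicator.commit,并创建新的transaction后重试;processQueryEvent在sql匹配createTablePattern时会执行schema.reset();processXidEvent会执行binaryLogClient.getBinlogFilename(),更新transaction的xid及binlogPosition,然后执行replicator.commit(transaction, true),并重置transaction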
    小结
    EventProcessor提供了start方法,该方法首先执行initDataSource;之后创建BinlogPositionManager并执行binlogPositionManager.initBeginPosition();接着创建Schema并执行schema.load();然后创建EventListener及BinaryLogClient并执行binaryLogClient.connect(3000);最后执行doProcess方法,doProcess会循环从queue拉取event,并根据event类型分发给对应的processXxxEvent方法处理

    关联标签: