RocketMQ刷盘机制

时间:2021-5-25 作者:qvyue

概览

RocketMQ的存储读写是基于JDK NIO的内存映射机制的,消息存储时首先将消息追加到内存中。在根据不同的刷盘策略在不同的时间进行刷盘。如果是同步刷盘,消息追加到内存后,将同步调用MappedByteBuffer的force()方法,同步等待刷盘结果,进行刷盘结果返回。如果是异步刷盘,在消息追加到内存后立刻,不等待刷盘结果立刻返回存储成功结果给消息发送端。RocketMQ使用一个单独的线程按照一个设定的频率执行刷盘操作。通过在broker配置文件中配置flushDiskType来设定刷盘方式,ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)。默认为异步刷盘。本次以Commitlog文件刷盘机制为例来讲解刷盘机制。Consumequeue、IndexFile刷盘原理和Commitlog一直。索引文件的刷盘机制并不是采取定时刷盘机制,而是每更新一次索引文件就会将上一次的改动刷写到磁盘。

刷盘服务是将commitlog、consumequeue两者中的MappedFile文件中的MappedByteBuffer或者FileChannel中的内存中的数据,刷写到磁盘。还有将IndexFile中的MappedByteBuffer(this.mappedByteBuffer = this.mappedFile.getMappedByteBuffer())中内存的数据刷写到磁盘。

刷盘服务的入口

刷盘服务的入口是CommitLog类对象,FlushCommitLogService是刷盘服务对象,如果是同步刷盘它被赋值为GroupCommitService,如果是异步刷盘它被赋值为FlushRealTimeService;还有一个FlushCommitLogService的commitLogService对象,这个是将 TransientStorePoll 中的直接内存ByteBuffer,写到FileChannel映射的磁盘文件中的服务。

// 异步、同步刷盘服务初始化
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    // 同步刷盘服务为 GroupCommitService
    this.flushCommitLogService = new GroupCommitService();
} else {
    // 异步刷盘服务为 FlushRealTimeService
    this.flushCommitLogService = new FlushRealTimeService();
}

// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交到内存映射 MappedByteBuffer 中
this.commitLogService = new CommitRealTimeService();
刷盘方法调用入口

putMessage()方法,将消息写入内存的方式不同,调用的刷盘方式也不同。如果是asyncPutMessage()异步将消息写入内存,submitFlushRequest()方法是刷盘入口。如果是putMessage()同步将消息写入内存,handleDiskFlush()方法是刷盘入口。handleDiskFlush()和submitFlushRequest()都包含有同步刷盘和异步刷盘的方法。

// 异步的方式存放消息
public CompletableFuture asyncPutMessage(final MessageExtBrokerInner msg) {

    // 异步存储消息,提交刷盘请求
    CompletableFuture flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
    CompletableFuture replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
    // 根据刷盘结果副本结果,返回存放消息的结果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}
// 同步方式存放消息
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    // handle 硬盘刷新
    handleDiskFlush(result, putMessageResult, msg);
    // handle 高可用
    handleHA(result, putMessageResult, msg);
    // 返回存储消息的结果
    return putMessageResult;
}

同步刷盘

一条消息调用一次刷盘服务,等待刷盘结果返回,然后再将结果返回;才能处理下一条刷盘消息。以handleDiskFlush()方法来介绍同步刷盘和异步刷盘,这里是区分刷盘方式的分水岭。

/**
 * 一条消息进行刷盘
 * @param result 扩展到内存ByteBuffer的结果
 * @param putMessageResult 放入ByteBuffer这个过程的结果
 * @param messageExt 存放的消息
 */
public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush 同步
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // 是否等待服务器将这一条消息存储完毕再返回(等待刷盘完成),还是直接处理其他写队列requestsWrite里面的请求
        if (messageExt.isWaitStoreMsgOK()) {
            //刷盘请求
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            //放入写请求队列
            service.putRequest(request);
            // 同步等待获取刷盘结果
            CompletableFuture flushOkFuture = request.future();
            PutMessageStatus flushStatus = null;
            try {
                // 5秒超市等待刷盘结果
                flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
                        TimeUnit.MILLISECONDS);
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                //flushOK=false;
            }
            // 刷盘失败,更新存放消息结果超时
            if (flushStatus != PutMessageStatus.PUT_OK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            // 唤醒处理刷盘请求写磁盘线程,处理刷盘请求线程和提交刷盘请求之前的协调,通过CountDownLatch(1)操作,通过控制hasNotified状态来实现写队列和读队列的交换
            service.wakeup();
        }
    }
    // 异步
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

同步刷盘会创造一个刷盘请求,然后将请求放入处理写刷盘请求的requestsWrite队列,请求里面封装了CompletableFuture对象用来记录刷盘结果,利用CompletableFuturee的get方法同步等待获取结果。flushStatus = flushOkFuture.get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),TimeUnit.MILLISECONDS);flushStatus为刷盘结果,默认等待5秒超时。

GroupCommitService为一个线程,用来定时处理requestsWrite队列里面的写刷盘请求,进行刷盘;它的requestsWrite和requestsRead队列进行了读写分离,写GroupCommitRequest请求到requestsWrite队列,读GroupCommitRequest请求从requestsRead读取,读取请求今夕写盘操作。这两个队列,形成了化零为整,将一个个请求,划分为一批,处理一批的GroupCommitRequest请求,然后requestsWrite和requestsRead队列进行交换,requestsRead作为写队列,requestsWrite作为读队列,实现读写分离。从中使用CountDownLatch2来实现处理刷盘请求线程和提交刷盘请求之前的协调,通过控制hasNotified状态来实现写队列和读队列的交换。

// 同步刷盘服务
class GroupCommitService extends FlushCommitLogService {
    // 两个队列,读写请求分离
    // 刷盘服务写入请求队列
    private volatile List requestsWrite = new ArrayList();
    // 刷盘服务读取请求队列
    private volatile List requestsRead = new ArrayList();
    // 将请求同步写入requestsWrite
    public synchronized void putRequest(final GroupCommitRequest request) {
        synchronized (this.requestsWrite) {
            this.requestsWrite.add(request);
        }
        // 唤醒刷盘线程处理请求
        this.wakeup();
    }
    // 写队列和读队列交换
    private void swapRequests() {
        List tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    }

    private void doCommit() {
        // 上锁读请求队列
        synchronized (this.requestsRead) {
            if (!this.requestsRead.isEmpty()) {
                // 每一个请求进行刷盘
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    // 一个落盘请求,处理两次,第一次为false,进行刷盘,一次刷盘的数据是多个offset,并不是只有当前这个offset的值,这个offset的值进行了刷盘,这个请求的第二次刷盘,这个offset已经已经落盘了,
                    // flushWhere这个值在flush方法已经更新变大,所以flushOK=true,跳出for循环,通知flushOKFuture已经完成。
                    boolean flushOK = false;
                    for (int i = 0; i = req.getNextOffset();
                        // false 刷盘
                        if (!flushOK) {
                            //0代码立刻刷盘,不管缓存中消息有多少
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }
                    // flushOK:true,返回ok,已经刷过盘了,不用再刷盘;false:刷盘中,返回超时
                    // 唤醒等待刷盘结果的线程
                    req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
                // 更新checkpoint的刷盘commitlog的最后刷盘时间,但是只写写到了checkpoint的内存ByteBuffer,并没有刷盘
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                // 清空队列
                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                // 因为个别的消息不是同步刷盘的,所以它回到这里进行处理
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        // 线程是否停止
        while (!this.isStopped()) {
            try {
                // 设置hasNotified为false,未被通知,然后交换写对队列和读队列,重置waitPoint为(1),休息200ms,出事化为10ms,finally设置hasNotified为未被通知,交换写对队列和读队列
                this.waitForRunning(10);
                // 进行刷盘服务处理,一次处理一批请求,单个请求返回给等待刷盘服务结果的线程
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
        // 处理非正常停机,sleep10ms,交换写请求队列和读请求队列,等待数据处理
        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }

        synchronized (this) {
            this.swapRequests();
        }
        // 进行请求处理
        this.doCommit();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    protected void onWaitEnd() {
        // 写队列和读队列交换
        this.swapRequests();
    }

    @Override
    public String getServiceName() {
        return GroupCommitService.class.getSimpleName();
    }
    // 5 分钟
    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}
处理刷盘请求线程和提交刷盘请求之前的协调
# org.apache.rocketmq.common.ServiceThread
// 唤醒处理刷盘请求写磁盘线程,处理刷盘请求线程和提交刷盘请求之前的协调,通过控制hasNotified状态来实现写队列和读队列的交换
public void wakeup() {
    // hasNotified默认值是false,未被唤醒,这个操作之后唤醒了,处理刷盘请求
    if (hasNotified.compareAndSet(false, true)) {
        // waitPoint默认是1,然后其他线程处理
        waitPoint.countDown(); // notify
    }
}

/**
 * 设置hasNotified为false,未被通知,然后交换写对队列和读队列,重置waitPoint为(1),休息200ms,finally设置hasNotified为未被通知,交换写对队列和读队列
 * @param interval 200ms
 */
protected void waitForRunning(long interval) {
    // compareAndSet(except,update);如果真实值value==except,设置value值为update,返回true;如果真实值value !=except,真实值不变,返回false;
    // 如果hasNotified真实值为true,那么设置真实值为false,返回true;hasNotified真实值为false,那就返回false,真实值不变
    // 如果已经通知了,那就状态变为未通知,如果是同步刷盘任务,交换写请求队列和读请求队列
    if (hasNotified.compareAndSet(true, false)) {
        // 同步刷盘:写队列和读队列交换
        this.onWaitEnd();
        return;
    }
    // 重置countDownLatch对象,等待接受刷盘请求的线程写入请求到requestsRead,写完后,waitPoint.countDown,唤醒处理刷盘请求的线程,开始刷盘
    //entry to wait
    waitPoint.reset();

    try {
        // 等待interval毫秒
        waitPoint.await(interval, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        log.error("Interrupted", e);
    } finally {
        // 设置是否通知为false
        hasNotified.set(false);
        this.onWaitEnd();
    }
}
// 等待这个方法的步骤完成。比如:同步刷盘:写队列和读队列交换
protected void onWaitEnd() {
}

异步刷盘

异步刷盘根据是否开启TransientStorePool暂存池,来区分是否有commit操作。开启TransientStorePool会将writerBuffer中的数据commit到FileChannel中(fileChannel.write(writerBuffer)),然后再将FileChannel中的数据通过flush操作(fileChannel.force())到磁盘中;
如果为开启TransientStorePool,就不会有commit操作,直接flush(MappedByteBuffer.force())到磁盘中。

// 异步刷盘
// Asynchronous flush
if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
    //执行flush操作
    flushCommitLogService.wakeup();
} else {
    //执行commit操作,然后唤醒执行flush操作
    commitLogService.wakeup();
}
CommitRealTimeService

定时将 transientStorePool 中的直接内存 ByteBuffer,提交到FileChannel中,然后唤醒刷盘操作。

// 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交到FileChannel中
class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        // 刷盘线程是否停止
        while (!this.isStopped()) {
            // writerBuffer写数据到FileChannel时间间隔200ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
            // writerBuffer写数据到FileChannel页数大小4
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            // writerBuffer写数据到FileChannel跨度时间间隔200ms
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
            // 开始时间
            long begin = System.currentTimeMillis();
            // 触发commit机制有两种方式:1.commit时间超过了两次commit时间间隔,然后只要有数据就进行提交 2.commit数据页数大于默认设置的4页
            // 本次commit时间>上次commit时间+两次commit时间间隔,则进行commit,不用关心commit页数的大小,设置commitDataLeastPages=0
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                // result=false,表示提交了数据,多与上次提交的位置;表示此次有数据提交;result=true,表示没有新的数据被提交
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                // result = false means some data committed.表示此次有数据提交,然后进行刷盘
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    // 唤起刷盘线程,进行刷盘
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                // 暂停200ms,再运行
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        // 正常关机,循环10次,进行10次的有数据就提交的操作
        for (int i = 0; i 
FlushRealTimeService

异步刷盘服务

class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    // 刷盘次数
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            // 默认值为false,表示await方法等待,如果为true,表示使用Thread.sleep方法等待
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // 刷盘任务时间间隔,多久刷一次盘500ms
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // 一次刷写任务至少包含页数,如果待刷写数据不足,小于该参数配置的值,将忽略本次刷写任务,默认4页
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // 两次真实刷写任务最大跨度,默认10s
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            // 打印记录日志标志
            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            // 触发刷盘机制有两种方式:1.刷盘时间超过了两次刷盘时间间隔,然后只要有数据就进行提交 2.commit数据页数大于默认设置的4页
            // 本次刷盘时间>上次刷盘时间+两次刷盘时间间隔,则进行刷盘,不用关心刷盘页数的大小,设置commitDataLeastPages=0
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                // 每间隔10次记录一次刷盘日志
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                // 刷盘之前,进行线程sleep
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }
                // 打印记录日志
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
                // 刷盘开始时间
                long begin = System.currentTimeMillis();
                // 刷盘
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                // 更新checkpoint最后刷盘时间
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
        // while循环结束,正常关机,保证所有的数据刷写到磁盘
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i 

刷盘是否开启TransientStorePool的区别

这里讲一下刷盘是否开启TransientStorePool的区别。

RocketMQ刷盘机制
image.png
不开启TransientStorePool:

MappedByteBuffer是直接内存,它暂时存储了message消息,MappedFile.mapp()方法做好MappedByteBuffer对象直接内存和落盘文件的映射关系,然后flush()方法执行MappedByteBuffer.force():强制将ByteBuffer中的任何内容的改变写入到磁盘文件。

开启TransientStorePool:

MappedFile的writerBuffer为直接开辟的内存,然后MappedFile的初始化操作,做好FileChannel和磁盘文件的映射,commit()方法实质是执行fileChannel.write(writerBuffer),将writerBuffer的数据写入到FileChannel映射的磁盘文件,flush操作执行FileChannel.force():将映射文件中的数据强制刷新到磁盘。

TransientStorePool的作用

TransientStorePool 相当于在内存层面做了读写分离,写走内存磁盘,读走pagecache,同时最大程度消除了page cache的锁竞争,降低了毛刺。它还使用了锁机制,避免直接内存被交换到swap分区。
参考:https://github.com/apache/rocketmq/issues/2466

FileChannel.force VS MappedByteBuffer.force区别

This method is only guaranteed to force changes that were made to this channel’s file via the methods defined in this class. It may or may not force changes that were made by modifying the content of a{@link MappedByteBuffer mapped byte buffer} obtained by invoking the {@link #map map} method. Invoking the {@link MappedByteBuffer#force force} method of the mapped byte buffer will force changes made to the buffer’s content to be written.

RocketMQ刷盘机制
image.png

FileChannel和MappedByteBuffer都是NIO模块的类,ByteBuffer直接内存映射到磁盘文件通过FileChannel。
FileChannel.force()只会将FileChannel类中方法使FileChannel发生改变的内容强制刷新到存储设备文件中。
MappedByteBuffer.force()会将Map类中方法使ByteBuffer发生改变的内容强制刷新到存储设备文件中。

来源:http://hg.openjdk.java.net/jdk8/jdk8/jdk/file/687fd7c7986d/src/share/classes/java/nio/channels/FileChannel.java

发送消息的方式可以分为:同步、异步、oneway
消息写入ByteBuffer的处理方式分为:同步、异步
刷盘的处理方式分为:同步、异步
三个处理方式互不干扰,发送消息的为同步,写入ByteBuffer可以为异步的方式,刷盘可以为同步的方式。最后,消息发送端会以同步的方式等待写入ByteBuffer、刷盘成功的结果。

声明:本文内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:qvyue@qq.com 进行举报,并提供相关证据,工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。