1.为什么会有文件过期删除机制
由于RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制
2.RocketMQ删除过期文件的思路
RocketMQ顺序写CommitLog文件、ComsumeQueue文件,所有的写操作都会落到最后一个文件上,因此在当前写文件之前的文件将不会有数据插入,也就不会有任何变动,因此可通过时间来做判断,比如超过72小时未更新的文件将会被删除
PS:RocketMQ删除过期文件时不会关注该文件的内容是否全部被消费
3.文件过期删除机制实现
3.1 触发删除的操作
image.png
由上图可知,触发文件清除操作的是一个定时任务,而且只有定时任务
// Resource reclaim interval
private int cleanResourceInterval = 10000;
文件过期删除定时任务的周期由该删除决定,默认每10s执行一次
3.2 删除源码分析
MesssageSDefaultStore#CleanCommitLogService#deleteExpiredFiles
private void deleteExpiredFiles() {
//省略
}
我们一点一点来分析其中的代码
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
上面三个属性需要重点讲解下
- fileReservedTime:文件过期时间,也就是从文件最后一次的更新时间到现在为止,如果超过该时间,则是过期文件可被删除
- deletePhysicFilesInterval:删除物理文件的时间间隔,在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔deletePhysicFilesInterval这个时间再删除另外一个文件,我猜测可能是由于删除文件是一个非常耗费IO的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件
- destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳destroyMapedFileIntervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用 小于等于 0为止,即可删除该文件,实现代码如下:
ReferenceResource#shutdown
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
MesssageSDefaultStore#CleanCommitLogService#deleteExpiredFiles
boolean timeup = this.isTimeToDelete();
boolean spacefull = this.isSpaceToDelete();
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
//执行删除逻辑
}
-
image.png
UtilAll#isItTimeToDo
public static boolean isItTimeToDo(final String when) {
String[] whiles = when.split(";");
if (whiles.length > 0) {
Calendar now = Calendar.getInstance();
for (String w : whiles) {
int nowHour = Integer.parseInt(w);
if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {
return true;
}
}
}
return false;
}
-
spacefull:磁盘空间是否充足,磁盘不足返回true,执行过期文件删除策略,我们进去看看this.isSpaceToDelete()这个方法
MesssageSDefaultStore#CleanCommitLogService#isSpaceToDelete
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0; //是否立即清除 cleanImmediately = false; { String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog(); double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic); if (physicRatio > diskSpaceWarningLevelRatio) { /**#1*/ boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull(); if (diskok) { DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full"); } cleanImmediately = true; } else if (physicRatio > diskSpaceCleanForciblyRatio) {/**#2*/ cleanImmediately = true; } else {/**#3*/ boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK(); if (!diskok) { DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok"); } } if (physicRatio < 0 || physicRatio > ratio) {/**#4*/ DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio); return true; } }
1:物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入
2:物理使用率大于diskSpaceCleanForciblyRatio(默认85%,可设置),则过进行过期物理文件的删除
3:恢复磁盘可写 配合 #1使用
4:物理磁盘使用率小于diskMaxUsedSpaceRatio 表示磁盘使用正常
-
manualDelete:预留,手工触发,目前rocketmq暂未封装
MappedFileQueue#deleteExpiredFileByTime
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
image.png
从第一个文件开始遍历,判断该文件的最大存活时间(该文件的最后一次更新时间+文件的存活时间)小于 当前系统时间 或者 需要强制删除文件(上面有讲,哪些情况下强制删除),则执行MappedFile#destroy 方法清除MappedFile占用的相关资源,如果执行成功则将该文件加入待删除文件列表中统一从磁盘中删除。DELETE_FILES_BATCH_MAX 决定每次能够删除的文件个数上限、deleteFilesInterval连续清除两个文件的时间间隔由该参数决定(deletePhysicFilesInterval)
4.总结-整体流程
- 开启定时任务每10s扫描是否有文件需要删除
- 有三种情况会进入删除文件操作:到了deleteWhere指定的时间点(默认是凌晨4点)、磁盘不足、手动触发
- 对于磁盘不足的情况,当磁盘使用率大于磁盘空间警戒线水位(默认是90%),会阻止消息写入,当超过85%时会强制删除文件(需要设置允许强制删除参数,否者不生效),其他两种情况都只能删除过期的文件(文件最后更新时间+文件最大的存活时间 < 当前时间)
- 当被删除的文件存在引用时,会有一个文件删除缓存时间,在这段时间内,该文件不会被删除,主要是留给引用该文件程序一些时间,当超过了文件删除缓存时间后,每次都会将该文件的引用减少1000,直到减少小于等于0后才释放该文件引用的相关资源,然后将该文件放入一个“文件删除集合”中
- 一次连续删除文件中间会存在一定的间隔,不会连续释放文件相关的资源
- 一次连续删除的文件和不大于10
- 将“文件删除集合”中的文件从