I. Core Concepts of Log Compaction
1. What is Log Compaction?
```mermaid
graph TB
    A[Original log] --> B[Key-value message stream]
    B --> C{Log Compaction}
    C --> D[Keep the latest value per key]
    C --> E[Drop superseded old values]
    D --> F[Compacted log]
    subgraph Message example
        G["keyA, value1, offset 0"]
        H["keyB, value2, offset 1"]
        I["keyA, value3, offset 2"]
        J["keyC, value4, offset 3"]
        K["keyA, value5, offset 4"]
    end
    subgraph After compaction
        L["keyB, value2"]
        M["keyC, value4"]
        N["keyA, value5"]
    end
```

II. How Log Compaction Is Implemented

1. Core data structures

```java
public class LogCleaner implements Runnable {
    // Cleaner state management
    private final Map<TopicPartition, LogCleanerStats> cleanerStats = new ConcurrentHashMap<>();
    private final Map<TopicPartition, CleanerCheckpoint> checkpoints = new ConcurrentHashMap<>();

    // Queue of pending compaction tasks
    private final BlockingQueue<CleanerTask> taskQueue = new LinkedBlockingQueue<>();

    // Compaction configuration
    private final long maxMessageSize;
    private final double maxIoBytesPerSecond;
    private final int numThreads;

    public class CleanerTask {
        private final TopicPartition topicPartition;
        private final long startOffset; // first offset to compact
        private final long endOffset;   // last offset to compact
        private final NavigableMap<Long, Segment> segments;

        // Run one compaction pass
        public void compact() {
            try {
                // 1. Build the offset map
                OffsetMap offsetMap = buildOffsetMap();
                // 2. Select the dirty segments
                List<Segment> dirtySegments = selectDirtySegments();
                // 3. Perform the cleaning
                List<Segment> cleanedSegments = doClean(dirtySegments, offsetMap);
                // 4. Swap the cleaned segments in for the dirty ones
                replaceSegments(dirtySegments, cleanedSegments);
                // 5. Advance the cleaner checkpoint
                updateCheckpoint();
            } catch (Exception e) {
                logger.error("Clean failed for {}", topicPartition, e);
            }
        }
    }
}
```

2. The compaction algorithm

```java
public class LogCleaner {

    /**
     * Build the OffsetMap: the latest offset observed for each key.
     */
    private OffsetMap buildOffsetMap(LogSegment headSegment, long startOffset) {
        OffsetMap offsetMap = new SkimpyOffsetMap(memory);
        // Scan from the head segment to find each key's most recent position
        for (RecordBatch batch : headSegment.batchesFrom(startOffset)) {
            for (Record record : batch) {
                if (record.hasKey()) {
                    // Hash each key to its latest offset
                    long offset = batch.baseOffset() + record.offsetDelta();
                    offsetMap.put(record.key(), offset);
                }
            }
        }
        return offsetMap;
    }

    /**
     * Perform the cleaning pass over the dirty segments.
     */
    private List<LogSegment> doClean(List<LogSegment> dirtySegments, OffsetMap offsetMap)
            throws IOException {
        List<LogSegment> cleanedSegments = new ArrayList<>();
        LogSegment currentCleaned = LogSegment.create();

        // Process dirty segments in offset order
        for (LogSegment dirtySegment : dirtySegments) {
            // Read every record in the dirty segment
            for (RecordBatch batch : dirtySegment.batches()) {
                for (Record record : batch) {
                    if (!record.hasKey()) {
                        // Records without a key are kept as-is
                        currentCleaned.append(record);
                        continue;
                    }
                    long lastOffset = offsetMap.get(record.key()); // -1 if absent
                    long currentOffset = batch.baseOffset() + record.offsetDelta();
                    if (lastOffset == -1) {
                        // First occurrence of this key: keep it
                        offsetMap.put(record.key(), currentOffset);
                        currentCleaned.append(record);
                    } else if (currentOffset >= lastOffset) {
                        // Latest version of the key: keep it
                        offsetMap.put(record.key(), currentOffset);
                        currentCleaned.append(record);
                    } else {
                        // Superseded version: skip it
                        cleanerStats.skippedRecords().increment();
                    }
                }
            }
            // Roll a new cleaned segment once the size limit is reached
            if (currentCleaned.size() >= segmentSize) {
                cleanedSegments.add(currentCleaned);
                currentCleaned = LogSegment.create();
            }
        }
        if (currentCleaned != null && currentCleaned.size() > 0) {
            cleanedSegments.add(currentCleaned);
        }
        return cleanedSegments;
    }
}
```

3. Compaction trigger conditions

```java
public class LogCleanerManager {

    // Decide whether a partition is due for cleaning
    public boolean needsCleaning(TopicPartition tp, Log log) {
        // 1. Is compaction enabled for this topic?
        if (!log.config.compact) {
            return false;
        }
        // 2. Compute the dirty ratio
        long firstDirtyOffset = log.firstDirtyOffset();
        long logEndOffset = log.logEndOffset();
        if (firstDirtyOffset >= logEndOffset) {
            return false; // nothing dirty yet
        }
        double dirtyRatio = (double) (logEndOffset - firstDirtyOffset)
                / (logEndOffset - log.logStartOffset()); // dirty span relative to the whole log
        // 3. Has the dirty ratio crossed the configured threshold?
        double thresholdRatio = log.config.minCleanableRatio;
        return dirtyRatio >= thresholdRatio;
    }

    // Pick the partitions to clean, dirtiest first
    public List<TopicPartition> selectPartitionsToClean() {
        return allLogs.entrySet().stream()
                .filter(entry -> needsCleaning(entry.getKey(), entry.getValue()))
                .sorted((a, b) -> {
                    double ratioA = calculateDirtyRatio(a.getValue());
                    double ratioB = calculateDirtyRatio(b.getValue());
                    return Double.compare(ratioB, ratioA);
                })
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```
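To make the retention rule concrete before moving on, here is a minimal, self-contained sketch in plain Java (not Kafka code; the `Rec` record and `compact` method are names made up for this demo). It shows exactly what a compaction pass leaves behind: only the highest-offset record of each key survives, and the survivors keep their log order:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionDemo {
    record Rec(String key, String value, long offset) {}

    // Keep only the latest record per key, preserving log order of the survivors
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.remove(r.key()); // drop any older version of this key
            latest.put(r.key(), r); // re-insert so iteration follows offset order
        }
        return new ArrayList<>(latest.values());
    }

    public static void main(String[] args) {
        List<Rec> log = List.of(
                new Rec("keyA", "value1", 0),
                new Rec("keyB", "value2", 1),
                new Rec("keyA", "value3", 2),
                new Rec("keyC", "value4", 3),
                new Rec("keyA", "value5", 4));
        compact(log).forEach(r ->
                System.out.printf("%s=%s (offset %d)%n", r.key(), r.value(), r.offset()));
    }
}
```

Running it prints keyB=value2, keyC=value4, keyA=value5, which matches the "After compaction" box in the diagram in section I.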
III. Implementing the Deletion Strategies

1. Time-based deletion

```java
public class LogManager {

    // Delete segments that have outlived the retention period
    public void deleteOldSegmentsByTime() {
        long now = time.milliseconds();
        for (Log log : allLogs.values()) {
            // Check the age of every segment
            for (LogSegment segment : log.segments()) {
                long segmentAge = now - segment.created();
                // Older than the retention period?
                if (segmentAge > log.config.retentionMs) {
                    // Only delete segments that are safe to remove
                    if (canDeleteSegment(segment, log)) {
                        deleteSegment(segment);
                    }
                }
            }
        }
    }

    // A more precise retention check
    private boolean shouldDeleteByTime(LogSegment segment, long retentionMs, long currentTime) {
        // Default to the file's last-modified time
        long lastModified = segment.lastModified();
        // Prefer message timestamps when the log uses append time
        if (log.config.messageTimestampType == TimestampType.LOG_APPEND_TIME) {
            lastModified = segment.maxTimestamp();
        }
        return currentTime - lastModified > retentionMs;
    }
}
```

2. Size-based deletion

```java
public class LogManager {

    // Delete segments until the log fits the size budget
    public void deleteOldSegmentsBySize() {
        for (Log log : allLogs.values()) {
            long totalSize = log.size();
            long retentionSize = log.config.retentionSize;
            if (retentionSize < 0 || totalSize <= retentionSize) {
                continue;
            }
            // How many bytes need to go?
            long bytesToDelete = totalSize - retentionSize;
            long bytesDeleted = 0;
            // Delete from the oldest segment forward
            for (LogSegment segment : log.segments()) {
                if (bytesDeleted >= bytesToDelete) {
                    break;
                }
                if (canDeleteSegment(segment, log)) {
                    long segmentSize = segment.size();
                    deleteSegment(segment);
                    bytesDeleted += segmentSize;
                }
            }
        }
    }
}
```

3. Offset-based deletion

```java
public class LogManager {

    // Retain data from a minimum start offset onward
    public void deleteSegmentsToRetainMinOffset() {
        for (Log log : allLogs.values()) {
            long minOffsetToRetain = calculateMinOffsetToRetain(log);
            // Delete every segment that lies before the retained offset
            for (LogSegment segment : log.segments()) {
                if (segment.baseOffset() < minOffsetToRetain) {
                    if (canDeleteSegment(segment, log)) {
                        deleteSegment(segment);
                    }
                } else {
                    break; // segments are offset-ordered; the rest need no check
                }
            }
        }
    }

    // Compute the minimum offset that must be retained
    private long calculateMinOffsetToRetain(Log log) {
        // Several factors feed into this:
        // 1. consumer lag
        // 2. transaction state
        // 3. leader/replica state
        long minConsumerOffset = Long.MAX_VALUE;
        // Smallest committed offset across all consumers
        for (ConsumerState consumer : log.consumers()) {
            long offset = consumer.committedOffset();
            minConsumerOffset = Math.min(minConsumerOffset, offset);
        }
        // Account for transaction control messages
        long transactionControlOffset = log.lastStableOffset();
        // The smallest offset that must survive
        return Math.max(log.logStartOffset(),
                Math.min(minConsumerOffset, transactionControlOffset));
    }
}
```
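Both retention rules above can be evaluated in a single pass. The following standalone sketch (the `Seg` record is hypothetical, and it is simplified relative to the broker, which also applies deletion delays and other safety checks) selects deletable segments oldest-first under a time limit and a size cap, while always sparing the newest, active segment:

```java
import java.util.ArrayList;
import java.util.List;

public class RetentionDemo {
    record Seg(long baseOffset, long sizeBytes, long lastModifiedMs) {}

    // Oldest-first list of segments to delete under the time and size rules
    static List<Seg> deletable(List<Seg> segs, long nowMs, long retentionMs, long retentionBytes) {
        long total = segs.stream().mapToLong(Seg::sizeBytes).sum();
        List<Seg> out = new ArrayList<>();
        long freed = 0;
        for (int i = 0; i < segs.size() - 1; i++) { // never touch the active (last) segment
            Seg s = segs.get(i);
            boolean tooOld = nowMs - s.lastModifiedMs() > retentionMs;
            boolean overBudget = total - freed > retentionBytes;
            if (!tooOld && !overBudget) break;      // segments are oldest-first, so we can stop
            out.add(s);
            freed += s.sizeBytes();
        }
        return out;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis(), day = 86_400_000L;
        List<Seg> segs = List.of(
                new Seg(0, 700, now - 9 * day),   // older than 7 days: deleted by time
                new Seg(100, 700, now - 2 * day), // fresh, but deleted to fit the 1200-byte cap
                new Seg(200, 700, now - day));    // active segment: always kept
        deletable(segs, now, 7 * day, 1200)
                .forEach(s -> System.out.println("delete segment @" + s.baseOffset()));
    }
}
```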
IV. Advanced Features and Optimizations

1. Incremental compaction and checkpoints

```java
public class LogCleaner {

    // Checkpoint management for the cleaner
    private class CleanerCheckpoint {
        private final Map<TopicPartition, Long> lastCleanOffset = new HashMap<>();

        // Persist the checkpoint
        public synchronized void saveCheckpoint(TopicPartition tp, long cleanOffset)
                throws IOException {
            lastCleanOffset.put(tp, cleanOffset);
            // Flush the map to disk
            try (FileOutputStream fos = new FileOutputStream(checkpointFile)) {
                Properties props = new Properties();
                lastCleanOffset.forEach((k, v) ->
                        props.setProperty(k.toString(), String.valueOf(v)));
                props.store(fos, "Log cleaner checkpoint");
            }
        }

        // Load the checkpoint
        public synchronized void loadCheckpoint() throws IOException {
            try (FileInputStream fis = new FileInputStream(checkpointFile)) {
                Properties props = new Properties();
                props.load(fis);
                props.forEach((k, v) -> {
                    TopicPartition tp = parseTopicPartition(k.toString());
                    lastCleanOffset.put(tp, Long.parseLong(v.toString()));
                });
            }
        }
    }

    // Incremental compaction: only re-clean what is newly dirty
    public void incrementalClean() {
        for (TopicPartition tp : partitionsToClean) {
            Log log = getLog(tp);
            // Where did the previous pass stop?
            long lastCleanOffset = checkpoints.getOrDefault(tp, 0L);
            // Compact only the new dirty range
            long dirtyStart = Math.max(lastCleanOffset, log.firstDirtyOffset());
            long dirtyEnd = log.logEndOffset();
            if (dirtyStart < dirtyEnd) {
                CleanerTask task = new CleanerTask(tp, dirtyStart, dirtyEnd);
                taskQueue.add(task);
            }
        }
    }
}
```

2. Compaction policy configuration

```properties
# Example Kafka broker configuration
############################# Log Compaction #############################

# Enable the log cleaner
log.cleaner.enable=true
# Number of cleaner threads
log.cleaner.threads=8
# Total deduplication buffer across cleaner threads (128 MB)
log.cleaner.dedupe.buffer.size=134217728
# Dirty ratio at which compaction is triggered
log.cleaner.min.cleanable.ratio=0.5

# Deletion policy
# Time-based: 7 days
log.retention.hours=168
# Size-based: 1 GB
log.retention.bytes=1073741824
# Check retention every 5 minutes
log.retention.check.interval.ms=300000

# Segment files
# 1 GB per segment
log.segment.bytes=1073741824
# Roll a segment at least every 7 days
log.segment.ms=604800000
# Delay physical deletion by 1 minute
log.segment.delete.delay.ms=60000
```

3. Compaction performance optimizations

```java
public class LogCleaner {

    // Memory optimization: process a large log in chunks
    private List<LogSegment> cleanLargeLog(Log log, List<LogSegment> dirtySegments,
                                           long maxBufferSize) {
        List<LogSegment> cleanedSegments = new ArrayList<>();
        long currentBufferUsage = 0;
        for (LogSegment segment : dirtySegments) {
            long segmentSize = segment.size();
            if (currentBufferUsage + segmentSize > maxBufferSize) {
                // Buffer is full: clean this batch and reset
                List<LogSegment> batch = new ArrayList<>();
                batch.add(segment);
                List<LogSegment> cleanedBatch = doClean(batch);
                cleanedSegments.addAll(cleanedBatch);
                currentBufferUsage = 0;
            } else {
                currentBufferUsage += segmentSize;
            }
        }
        return cleanedSegments;
    }

    // IO optimization: zero-copy transfer and batched index writes
    private void optimizeIOPerformance() throws IOException {
        // Use the sendfile system call via FileChannel
        FileChannel sourceChannel = dirtySegment.fileChannel();
        FileChannel targetChannel = cleanedSegment.fileChannel();
        long position = 0;
        long count = dirtySegment.size();
        // Zero-copy transfer between channels
        sourceChannel.transferTo(position, count, targetChannel);
        // Batch-update the index afterwards
        cleanedSegment.updateIndex(records);
    }
}
```
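The zero-copy step above relies on `FileChannel.transferTo`, a standard NIO call that lets the kernel move bytes between files without staging them in a user-space buffer (typically `sendfile` on Linux). A minimal runnable sketch, with made-up file names:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ZeroCopyDemo {
    public static void main(String[] args) throws IOException {
        Path source = Path.of("segment.dirty");   // hypothetical input file
        Path target = Path.of("segment.cleaned"); // hypothetical output file
        try (FileChannel in = FileChannel.open(source, StandardOpenOption.READ);
             FileChannel out = FileChannel.open(target,
                     StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            long position = 0;
            long size = in.size();
            // transferTo may copy fewer bytes than requested, so loop until done
            while (position < size) {
                position += in.transferTo(position, size - position, out);
            }
        }
    }
}
```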
V. Monitoring and Operations

1. Monitoring compaction status

```java
public class LogCleanerMetrics {

    // Key monitoring metrics
    private final Meter compactionRate = new Meter();
    private final Histogram compactionLatency = new Histogram();
    private final Gauge<Double> dirtyRatio = new Gauge<>();
    private final Counter deletedMessages = new Counter();
    private final Counter retainedMessages = new Counter();

    // Expose the metrics over JMX
    public void registerMBeans() throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = new ObjectName(
                "kafka.log:type=LogCleaner,name=" + topicPartition);
        mbs.registerMBean(new LogCleanerMXBean() {
            @Override
            public double getCompactionRate() {
                return compactionRate.getOneMinuteRate();
            }

            @Override
            public double getAvgCompactionLatency() {
                return compactionLatency.getMean();
            }

            @Override
            public double getDirtyRatio() {
                return dirtyRatio.getValue();
            }

            @Override
            public long getDeletedMessages() {
                return deletedMessages.getCount();
            }

            @Override
            public long getRetainedMessages() {
                return retainedMessages.getCount();
            }
        }, name);
    }
}
```

2. Operational commands and tools

```bash
# Inspect a topic's compaction status
kafka-topics.sh --describe --topic my-compacted-topic --bootstrap-server localhost:9092

# Switch a topic to the compact cleanup policy
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-compacted-topic \
  --alter --add-config cleanup.policy=compact

# Check compaction progress
kafka-run-class.sh kafka.tools.LogCleanerProgress \
  --bootstrap-server localhost:9092 \
  --topic my-compacted-topic

# Dump a segment file to inspect its records
kafka-dump-log.sh --files /tmp/kafka-logs/my-topic-0/00000000000000000000.log \
  --print-data-log

# Configure the deletion policy
kafka-configs.sh --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config retention.ms=604800000,retention.bytes=1073741824
```

3. Fault diagnosis and repair

```java
public class LogCompactionValidator {

    // Validate compaction integrity
    public void validateCompaction(TopicPartition tp) {
        Log log = getLog(tp);

        // 1. Check per-key offset monotonicity
        Map<Bytes, Long> keyOffsets = new HashMap<>();
        for (LogSegment segment : log.segments()) {
            for (RecordBatch batch : segment.batches()) {
                for (Record record : batch) {
                    if (record.hasKey()) {
                        Bytes key = Bytes.wrap(record.key());
                        Long prevOffset = keyOffsets.get(key);
                        long currentOffset = batch.baseOffset() + record.offsetDelta();
                        if (prevOffset != null && currentOffset > prevOffset) {
                            // A newer occurrence of the key: expected after compaction
                        } else if (prevOffset != null && currentOffset < prevOffset) {
                            // Compaction error: an older version of the key survived
                            logger.error("Compaction error: found older version of key {} at offset {} than previous offset {}",
                                    key, currentOffset, prevOffset);
                        }
                        keyOffsets.put(key, currentOffset);
                    }
                }
            }
        }

        // 2. Check segment continuity
        long prevEndOffset = -1;
        for (LogSegment segment : log.segments()) {
            if (prevEndOffset != -1 && segment.baseOffset() != prevEndOffset) {
                logger.error("Segment gap found: prevEndOffset={}, baseOffset={}",
                        prevEndOffset, segment.baseOffset());
            }
            prevEndOffset = segment.baseOffset() + segment.sizeInRecords();
        }
    }
}
```
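To read metrics like these back from a running broker, the standard `javax.management` client API is sufficient. A sketch, assuming the broker was started with remote JMX enabled on port 9999; the object name below is the cleaner's max-dirty-percent gauge as Kafka commonly exposes it, but verify the exact name for your broker version (for example with jconsole) before relying on it:

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class CleanerMetricsReader {
    public static void main(String[] args) throws Exception {
        // Assumes the broker JVM exposes JMX on localhost:9999
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            // Assumed cleaner metric name; check your broker's actual MBeans
            ObjectName name = new ObjectName("kafka.log:type=LogCleaner,name=max-dirty-percent");
            Object value = conn.getAttribute(name, "Value");
            System.out.println("max-dirty-percent = " + value);
        }
    }
}
```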
VI. Best Practices and Configuration Recommendations

1. Choosing a compaction strategy

```yaml
# Configuration suggestions by scenario

# Scenario 1: change data capture (CDC)
cleanup.policy: compact,delete
compression.type: snappy
retention.ms: 604800000        # 7 days
min.compaction.lag.ms: 3600000 # hold off compaction for 1 hour

# Scenario 2: session store
cleanup.policy: compact,delete
delete.retention.ms: 86400000  # keep tombstones for 24 hours
segment.ms: 3600000            # roll a segment every hour
min.cleanable.dirty.ratio: 0.1 # compact aggressively

# Scenario 3: long-term archive
cleanup.policy: compact
retention.ms: -1               # no time-based deletion
retention.bytes: 1099511627776 # 1 TB
segment.bytes: 5368709120      # large 5 GB segments

# Scenario 4: high-throughput logging
cleanup.policy: delete
retention.hours: 168
segment.bytes: 1073741824      # 1 GB
segment.ms: 604800000          # 7 days
```

2. Performance tuning configuration

```properties
# Recommended production settings

# Memory
# 256 MB deduplication buffer
log.cleaner.dedupe.buffer.size=268435456
# 512 KB IO buffer
log.cleaner.io.buffer.size=524288
# Throttle cleaner IO to 100 MB/s
log.cleaner.io.max.bytes.per.second=104857600

# Threads
# Scale with the CPU core count
log.cleaner.threads=4
num.io.threads=8
num.network.threads=3

# Segments
# 1 GB per segment
log.segment.bytes=1073741824
# Index entry every 4 KB
log.index.interval.bytes=4096
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# Deletion
# Check retention every 5 minutes
log.retention.check.interval.ms=300000
# Delay physical deletion by 1 minute
log.segment.delete.delay.ms=60000
# Back off 15 s after a failed cleaning pass
log.cleaner.backoff.ms=15000
```

3. Common problems and fixes

Problem 1: compaction is too slow
- Add cleaner threads: log.cleaner.threads=8
- Give the cleaner more memory: log.cleaner.dedupe.buffer.size=536870912
- Use smaller segments: log.segment.bytes=536870912

Problem 2: disk space is running out
- Shorten retention: retention.ms=172800000 (2 days)
- Enable message compression: compression.type=lz4
- Fall back to pure deletion: log.cleanup.policy=delete

Problem 3: compaction drives CPU usage up
- Throttle cleaner IO: log.cleaner.io.max.bytes.per.second=52428800
- Raise the trigger threshold: log.cleaner.min.cleanable.ratio=0.75
- Schedule compaction for off-peak hours

Problem 4: apparent data loss from duplicate keys
- Keep tombstones longer: delete.retention.ms=86400000
- Monitor compaction progress (for example with the LogCleanerProgress tool above)
- Validate data integrity regularly

VII. Summary

Kafka's log compaction and deletion policies together provide flexible data lifecycle management.

Core benefits:
- Space efficiency: superseded values are removed automatically, saving storage.
- Data integrity: at least the latest value of every key is guaranteed to survive.
- Performance: incremental compaction keeps IO overhead low.
- Flexibility: time-, size-, and offset-based deletion policies can be combined.

Key configuration points:
- cleanup.policy=compact enables compaction.
- min.cleanable.dirty.ratio controls when compaction triggers.
- retention.ms / retention.bytes define the deletion policy.
- delete.retention.ms sets how long delete markers (tombstones) are kept.

Best practices:
- Pick the cleanup policy that matches the business scenario.
- Monitor compaction status and disk usage.
- Validate data integrity periodically.
- Test the configuration thoroughly before going to production.

Configured correctly, log compaction and deletion policies can significantly improve a Kafka cluster's performance and storage efficiency while preserving data integrity.
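Finally, the key configuration points above can also be applied programmatically when creating a topic, via Kafka's public AdminClient API. A sketch; the topic name, partition and replica counts, and the specific values are illustrative choices:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // 3 partitions, replication factor 1 (illustrative values)
            NewTopic topic = new NewTopic("my-compacted-topic", 3, (short) 1)
                    .configs(Map.of(
                            TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT,
                            TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, "0.5",
                            TopicConfig.DELETE_RETENTION_MS_CONFIG, "86400000"));
            // Block until the broker confirms creation
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```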