2026/3/20 8:58:38
网站建设
项目流程
做装修网站价格,哈尔滨市网站建设公司,国外素材网pinterest,互联网 社区教育网站建设论文Spring Boot 整合 Kafka 进行日志处理是一个常见的任务#xff0c;可以帮助你更好地管理和分析应用程序的日志。以下是一个基本的步骤指南#xff0c;帮助你完成这个整合。
本文介绍了如何在SpringBoot应用中整合Kafka#xff0c;利用Logback收集日志并发送到Kafka。详细步…Spring Boot 整合 Kafka 进行日志处理是一个常见的任务可以帮助你更好地管理和分析应用程序的日志。以下是一个基本的步骤指南帮助你完成这个整合。本文介绍了如何在SpringBoot应用中整合Kafka利用Logback收集日志并发送到Kafka。详细步骤包括添加Kafka相关依赖配置logback.xml自定义KafkaAppender以及提供测试。首先安装kafka.。之前已经安装过了我们这里不再讲。可以参考前面讲的类容https://blog.csdn.net/lchmyhua88/article/details/155640953?spm1001.2014.3001.5502接着我们按照下面的步骤开始1.pom依赖2.logback.xml配置3.配置 kafka消费者4.监听消费消息测试代码1. 添加依赖!-- Spring Boot Starter Kafka -- dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId /dependency !-- Logback Kafka Appender -- dependency groupIdcom.github.danielwegener/groupId artifactIdlogback-kafka-appender/artifactId version0.2.0-RC2/version /dependency !-- Kafka Clients -- dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version3.0.0/version /dependency2.配置 Logbackconfiguration !-- 控制台输出appender -- appender nameCONSOLE classch.qos.logback.core.ConsoleAppender encoder pattern%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n/pattern /encoder /appender !-- Kafka appender -- appender nameKAFKA classcom.github.danielwegener.logback.kafka.KafkaAppender encoder classch.qos.logback.classic.encoder.PatternLayoutEncoder pattern%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n/pattern /encoder !-- Kafka topic名称 -- topiclog-topic/topic !-- Kafka bootstrap servers -- keyingStrategy classcom.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy / deliveryStrategy classcom.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy / !-- Kafka producer配置 -- producerConfigbootstrap.servers192.168.33.10:9092/producerConfig producerConfigacks0/producerConfig producerConfiglinger.ms1000/producerConfig producerConfigmax.block.ms0/producerConfig !-- 过滤器设置可选 -- filter classch.qos.logback.classic.filter.ThresholdFilter levelINFO/level /filter /appender !-- 根logger配置 -- root levelINFO appender-ref refCONSOLE/ appender-ref refKAFKA/ /root /configuration3.配置 kafka消费者为了方便测试我们把生产者消费者都配置上spring.kafka.bootstrap-servers192.168.33.10:9092 #spring.kafka.consumer.group-idmy-group spring.kafka.consumer.auto-offset-resetearliest spring.kafka.consumer.enable-auto-committrue spring.kafka.consumer.key-deserializerorg.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializerorg.springframework.kafka.support.serializer.JsonDeserializer spring.kafka.consumer.properties.spring.json.trusted.packages* spring.kafka.producer.key-serializerorg.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializerorg.springframework.kafka.support.serializer.JsonDeserializer4.注入kafka bean ,方便后面调试生产者。Configuration public class KafkaProducerConfig { Value(${spring.kafka.bootstrap-servers}) private String bootstrapServers; Bean public ProducerFactoryString, Object producerFactory() { MapString, Object configProps new HashMap(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory(configProps); } Bean public KafkaTemplateString, Object kafkaTemplate() { return new KafkaTemplate(producerFactory()); } }5.监听消费消息Component public class LogConsumer { KafkaListener(topics log-topic,groupId my-group) public void listen(String message) { System.out.println(收到日志消息: message); } }6.添加日志打印RestController RequestMapping(/kafka) public class LogController { Autowired private KafkaProducerServiceImpl producerService; private static final Logger logger (Logger) LoggerFactory.getLogger(LogController.class); GetMapping(/log) public String log() { //producerService.sendLogMessage(This is a test log message); logger.info(这是一条测试日志消息); logger.warn(这是一条警告日志消息); logger.error(这是一条错误日志消息); return Log message sent; } }下面我们开始测试通过调用接口控制台可以看到我写3条日志kafka消费3条消息这样我们就通过logback把日志收集到kafka里面。方便数据分析。当然你也可以写入es进行分析画像。我们可以通过kafka manger控制台来监控消息。下载kafka-manager下载地址 https://github.com/yahoo/kafka-manager解压后配置application.conf kafka地址保存后指定8888端口启动bin/kafka-manager -Dconfig.fileconf/application.conf -Dhttp.port8888启动后打开控制面板可以看到kafka集群节点信息和分区消费偏移等信息http://192.168.33.10:8888/详情可以去官网看看介绍https://github.com/yahoo/CMAK