如何使用Java实现Kafka生产者和消费者的示例-创新互联
这篇文章将为大家详细讲解有关如何使用Java实现Kafka生产者和消费者的示例,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。
成都创新互联是专业的安陆网站建设公司,安陆接单;提供成都网站设计、网站建设,网页设计,网站设计,建网站,PHP网站建设等专业做网站服务;采用PHP框架,可快速的进行安陆网站开发网页制作和功能扩展;专业做搜索引擎喜爱的网站,专业的做网站团队,希望更多企业前来合作!Kafka简介
Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。

方式一:kafka-clients
引入依赖
在pom.xml文件中,引入kafka-clients依赖:
org.apache.kafka kafka-clients 2.3.1
生产者
创建一个KafkaProducer的生产者实例:
@Configuration
public class Config {
public final static String bootstrapServers = "127.0.0.1:9092";
@Bean(destroyMethod = "close")
public KafkaProducer kafkaProducer() {
Properties props = new Properties();
//设置Kafka服务器地址
props.put("bootstrap.servers", bootstrapServers);
//设置数据key的序列化处理类
props.put("key.serializer", StringSerializer.class.getName());
//设置数据value的序列化处理类
props.put("value.serializer", StringSerializer.class.getName());
KafkaProducer producer = new KafkaProducer<>(props);
return producer;
}
} 在Controller中进行使用:
@RestController
@Slf4j
public class Controller {
@Autowired
private KafkaProducer kafkaProducer;
@RequestMapping("/kafkaClientsSend")
public String send() {
String uuid = UUID.randomUUID().toString();
RecordMetadata recordMetadata = null;
try {
//将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
recordMetadata = kafkaProducer.send(new ProducerRecord<>("one-more-topic", uuid)).get();
log.info("recordMetadata: {}", recordMetadata);
log.info("uuid: {}", uuid);
} catch (Exception e) {
log.error("send fail, uuid: {}", uuid, e);
}
return uuid;
}
} 消费者
创建一个KafkaConsumer的消费者实例:
@Configuration
public class Config {
public final static String groupId = "kafka-clients-group";
public final static String bootstrapServers = "127.0.0.1:9092";
@Bean(destroyMethod = "close")
public KafkaConsumer kafkaConsumer() {
Properties props = new Properties();
//设置Kafka服务器地址
props.put("bootstrap.servers", bootstrapServers);
//设置消费组
props.put("group.id", groupId);
//设置数据key的反序列化处理类
props.put("key.deserializer", StringDeserializer.class.getName());
//设置数据value的反序列化处理类
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
KafkaConsumer kafkaConsumer = new KafkaConsumer<>(props);
//订阅名称为“one-more-topic”的Topic的消息
kafkaConsumer.subscribe(Arrays.asList("one-more-topic"));
return kafkaConsumer;
}
} 在Controller中进行使用:
@RestController
@Slf4j
public class Controller {
@Autowired
private KafkaConsumer kafkaConsumer;
@RequestMapping("/receive")
public List receive() {
从Kafka服务器中的名称为“one-more-topic”的Topic中消费消息
ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1));
List messages = new ArrayList<>(records.count());
for (ConsumerRecord record : records.records("one-more-topic")) {
String message = record.value();
log.info("message: {}", message);
messages.add(message);
}
return messages;
}
} 方式二:spring-kafka
使用kafka-clients需要我们自己创建生产者或者消费者的bean,如果我们的项目基于SpringBoot构建,那么使用spring-kafka就方便多了。
引入依赖
在pom.xml文件中,引入spring-kafka依赖:
org.springframework.kafka spring-kafka 2.3.12.RELEASE
生产者
在application.yml文件中增加配置:
spring: kafka: #Kafka服务器地址 bootstrap-servers: 127.0.0.1:9092 producer: #设置数据value的序列化处理类 value-serializer: org.apache.kafka.common.serialization.StringSerializer
在Controller中注入KafkaTemplate就可以直接使用了,代码如下:
@RestController
@Slf4j
public class Controller {
@Autowired
private KafkaTemplate template;
@RequestMapping("/springKafkaSend")
public String send() {
String uuid = UUID.randomUUID().toString();
//将消息发送到Kafka服务器的名称为“one-more-topic”的Topic中
this.template.send("one-more-topic", uuid);
log.info("uuid: {}", uuid);
return uuid;
}
} 消费者
在application.yml文件中增加配置:
spring: kafka: #Kafka服务器地址 bootstrap-servers: 127.0.0.1:9092 consumer: #设置数据value的反序列化处理类 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
创建一个可以被Spring框架扫描到的类,并且在方法上加上@KafkaListener注解,就可以消费消息了,代码如下:
@Component
@Slf4j
public class Receiver {
@KafkaListener(topics = "one-more-topic", groupId = "spring-kafka-group")
public void listen(ConsumerRecord, ?> record) {
Optional> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
String message = (String) kafkaMessage.get();
log.info("message: {}", message);
}
}
}关于“如何使用Java实现Kafka生产者和消费者的示例”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,使各位可以学到更多知识,如果觉得文章不错,请把它分享出去让更多的人看到。
网站标题:如何使用Java实现Kafka生产者和消费者的示例-创新互联
本文网址:http://www.scyingshan.cn/article/cdhohi.html


咨询
建站咨询
