从未如此简单:10分钟带你逆袭Kafka!
final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("test2", 0, 1, "hello world"); producer.send(record); // 有回调函数的调用 producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { System.out.println(recordMetadata.topic()); System.out.println(recordMetadata.partition()); System.out.println(recordMetadata.offset()); } }); // 自己定义一个类 producer.send(record, new MyCallback(record)); } catch (Exception e) { result = false; } return result; } } 定义生产者发送成功的回调函数: import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata;
/** * @ClassName MyCallback * @Description TODO * @Author lingxiangxiang * @Date 3:51 PM * @Version 1.0 **/ public class MyCallback implements Callback { private Object msg;
public MyCallback(Object msg) { this.msg = msg; }
@Override public void onCompletion(RecordMetadata metadata, Exception e) { System.out.println("topic = " + metadata.topic()); System.out.println("partiton = " + metadata.partition()); System.out.println("offset = " + metadata.offset()); System.out.println(msg); } } 生产者测试类:在生产者测试类中,自己遇到一个坑,就是最后自己没有加 sleep,就是怎么检查自己的代码都没有问题,但是最后就是没法发送成功消息,最后加了一个 sleep 就可以了。 因为主函数 main 已经执行完退出,但是消息并没有发送完成,需要进行等待一下。当然,你在生产环境中可能不会遇到这样问题,呵呵! 代码如下: import static java.lang.Thread.sleep;
/** * @ClassName MyKafkaProducerTest * @Description TODO * @Author lingxiangxiang * @Date 3:46 PM * @Version 1.0 **/ public class MyKafkaProducerTest { public static void main(String[] args) throws InterruptedException { MyKafkaProducer producer = new MyKafkaProducer(); boolean result = producer.sendMsg(); System.out.println("send msg " + result); sleep(1000); } } 消费者类: import kafka.utils.ShutdownableThread; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays; import java.util.Collections; import java.util.Properties;
/** * @ClassName MyKafkaConsumer * @Description TODO * @Author lingxiangxiang * @Date 4:12 PM * @Version 1.0 **/ public class MyKafkaConsumer extends ShutdownableThread {
private KafkaConsumer<Integer, String> consumer;
public MyKafkaConsumer() { super("KafkaConsumerTest", false); Properties properties = new Properties(); (编辑:好传媒网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |