Overview

之前的这篇博客ubuntu14.04单机安装配置zookeeper和kafka,介绍了zookeeperkafka的安装配置,并在命令行下验证了生产者消费者可以跑通。但是实际项目中,需要和java交互,不可能接触到命令行和后台的。本文旨在记录一下javakafka的简单交互,web中道理相同,只不过程序入口换成了action

1.新建项目配置环境

打开eclipse,依次点击Window→Preferences→Java→Build Path→User Libraries,然后在右边选择New
,添加一个自己常用的Library,我命名为kafka。选中kafka,右边选择Add External JARS,然后到之前安装好的kafka的目录,找到libs这个文件夹,如果按照上次配置好的情况,这里应该是15jar文件,见下图:

2016-07-12 17:16:06屏幕截图.png

全部选中,点击确定。这样,我们以后就可以复用了。

然后我们在eclipse中构建一个普通的java项目testKafka。右击项目,依次点击Build Path→Add Libraries→User Library,选择kafka这个library,点击Finish。这样环境就搭建好了。

2.生产者消费者程序

下面编码测试程序,即消息生产者和消息消费者。

2.1 生产者

package testKafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MsgProducer {
    private static Producer<String,String> producer;
    private final Properties props=new Properties();
    public MsgProducer(){
        //定义连接的broker list
        props.put("metadata.broker.list", "127.0.0.1:9092");
        //定义序列化类,Java中对象传输之前要序列化
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }
    public static void main(String[] args) {
        MsgProducer mProducer=new MsgProducer();
        //定义topic
        String topic="testkafka";
        
        //定义要发送给topic的消息
        String mString = "Hello kafka!";
                
        //构建消息对象
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, mString);
 
        //推送消息到broker
        producer.send(data);
        producer.close();
    }
}

这里需要注意,生产者这里,最少需要两个配置项:metadata.broker.list127.0.0.1:9092serializer.class设置为kafka.serializer.StringEncoder。打开上次配置的producer.properties文件,看到这两项配置分别为metadata.broker.list=localhost:9092serializer.class=kafka.serializer.DefaultEncoderbroker list要一致,否则会报错。
这些项,最好写在配置文件里,方便以后添加服务器时候更改。

2.2 消费者

package testKafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MsgConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MsgConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        //定义连接zookeeper信息
        props.put("zookeeper.connect", zookeeper);
        //定义Consumer所有的groupID
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        //定义订阅topic数量
        topicCount.put(topic, new Integer(1));
        //返回的是所有topic的Map
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        //取出我们要需要的topic中的消息流
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            while (consumerIte.hasNext()) {
                System.out.println(new String(consumerIte.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
            
    }

    public static void main(String[] args) {
        String topic = "testkafka";
        MsgConsumer mConsumer = new MsgConsumer("127.0.0.1:2181", "test-consumer-group", topic);
        mConsumer.testConsumer();
    }

}

这里需要注意,消费者的配置信息,应该和生产者对应。最关键的配置是两项:zookeeper.connectgroup.id。这两项打开consumer.properties就可以看到。

3. 测试

首先,要在命令行中启动zookeeperkafka

在消费者程序里面,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.

这个不是错误信息,不用理睬。
接着在生产者那里,运行一下,Console框显示如下:

log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
log4j:WARN Please initialize the log4j system properly.
Hello kafka!

这样,我们的程序就跑通了。
这里主要参考了kafka官方例子:生产者消费者