3.1 Java API

2016-03-02 20:56:37 12,243 2

kafka在( org.apache.kafka.clients)包中引入了新的java客户端。这是老的scala客户端的替代,但是为了兼容性它们会共存一段时间。这些客户端分布在独立的jar文件当中,并且依赖很少。

java版的客户端是在产品环境下经过测试的,并且比老的scala客户端更快,支持更多的特性。你可以通过在maven项目的pom.xml文件中引入如下依赖来在项目中引入这个客户端:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.9.0.1</version>
    </dependency>
    <!-- 由于新版的客户端没有引入日志框架实现的依赖,所以我们要自己引入 -->
      <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>1.1.3</version>
      </dependency>
      <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>1.1.1</version>
      </dependency>

一、Producer 相关API

1)、ProducerRecord<K, V>

表示由Producer发送的消息。其中K,V分别表示的一条消息的key与value。

key是可选的。在没有指定key发送到哪个partition的情况下,producer会计算key的hash值,从而确定这条消息发送到哪个分区。如果partition和key都没有指定,那么producer将会使用round-robin的方式发送消息到分区。

ProducerRecord的构造方法有:

 public ProducerRecord(String topic, Integer partition, K key, V value) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null");
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
 public ProducerRecord(String topic, K key, V value) {
        this(topic, null, key, value);
    }
 public ProducerRecord(String topic, V value) {
        this(topic, null, value);
    }

其实最终调用的都是第一个构造方法,topic参数用于指定消息发送到哪个Topic,必填的。partition指的是消息发送哪个分区,可以为null,如果这个参数指定了,那么key将不会起作用。value就是消息的实际内容。

2) RecordMetadata

用于表示消息发送后,服务端接收到消息的确认信息。包括这个消息发送到了那个topic下的那个partition,偏移量offset是多少等。其定义如下:

public final class RecordMetadata {

    private final long offset;//表示偏移量
    private final TopicPartition topicPartition;//维护了topic和partition的信息
    ...
    }

3)Producer<K, V>

用于表示消息的发送者。K、V的含义与ProducerRecord<K,V>相同。

该接口定义如下:

public interface Producer<K, V> extends Closeable {

    /**
     *异步发送一条信息,返回Future对象,其中包含了服务端的响应信息RecordMetadata
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> record);

    /**
     * 发送一条消息,当服务端确认后,执行指定的回调函数
     */
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
    
    /**
     * 如果producer有很多消息要发送,那么producer可能会产生消息堆积,调用这个方法会发送所有堆积的消息,在这个
     *方法执行完成之前,线程一直会阻塞
     */
    public void flush();

    /**
     * 获取一个Topic下的所有分区信息,因为分区的数量可能会改变,因此不应该缓存这个分区信息的内容
     */
    public List<PartitionInfo> partitionsFor(String topic);

    /**
     * 监控相关
     */
    public Map<MetricName, ? extends Metric> metrics();

    /**
     * 关闭,等待所有消息发送完
     */
    public void close();

    /**
     * 指定时间内要关闭完成,如果没有完成,强制关闭,不管有没有消息还没发送
     */
    public void close(long timeout, TimeUnit unit);

}

Producer有两个实现类,KafkaProducerMockProducer。其中,在实际开发中,我们用的都是KafkaProducer来发送消息,MockProducer从名字就可以看出来是用于测试。

4)PartitionInfo

用于表示一个Topic下面的分区信息,其接口定义如下:

public class PartitionInfo {

    private final String topic;//这个分区属于哪个Topic
    private final int partition;//这个分区的编号
    private final Node leader;//这个分区的leader位于哪一个节点上
    private final Node[] replicas;//这个分区配置的备份因子信息
    private final Node[] inSyncReplicas;//实际生效的本分因子信息
    ...
    }

这个类在我们发送消息的时候实际上并没有什么用。但是如果我们自己要写一个kafka的监控程序的时候,可能会需要使用到这个类来获取到相关元数据。

5)Serializer类与Deserializer

Serializer类的作用是指定消息中key或者value的序列化方式。其实现类包括:

QQ截图20160302002333.png

因为kafka最终都是将消息中的内容变成二进制数据进行存储的。而我们发送的消息可能是字符串,也可能是一个整数,因此转换成字节数组的方式也是不同的。这个接口中定义的最重要的方法就是

/**
     * @param topic topic associated with data
     * @param data typed data
     * @return serialized bytes
     */
    public byte[] serialize(String topic, T data);

实际上因为在序列化的时候,我们只关注数据本身,也只需要序列化数据,topic参数实际上是没用的。我们可以看一下StringSerializer的实现:

@Override
    public byte[] serialize(String topic, String data) {
        try {
            if (data == null)
                return null;
            else
                return data.getBytes(encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to
             unsupported encoding " + encoding);
        }
    }

我们可以看到topic这个参数并没有用到。

对于String数据,其转换成字节数据的方式是通过getBytes方法,而对于Long或者Integer类型的数据,其转换成字节数据是通过byteValue()方法。这也是这么多种Serializer存在的原因。

对于每一种Serializer都有一个Deserializer,通常我们是在Producer端指定Serializer,在Consumer端指定Deserializer,相信这个非常容易理解,唯一要注意的是,二者必须是要对应的,否则可能会出现反序列化失败的情况。


6)MsgProducer.java

在讲解了这么多概念性的内容之后,我们来实际的编写一个可以向kafka集群发送消息的Producer。

package com.tianshouzhi.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MsgProducer {
    public static void main(String[] args) {
         Properties props = new Properties();
         props.put("bootstrap.servers", "115.28.65.149:2181");
         props.put("acks", "all");
         props.put("retries", 0);
         props.put("batch.size", 16384);
         props.put("linger.ms", 1);
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
         Producer<String, String> producer = new KafkaProducer<>(props);
         for(int i = 0; i < 100; i++)
             producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), Integer.toString(i)));

         producer.close();
    }
}

这个类中唯一值得我们关注的可能就是Properties中的配置信息了,这些配置项有的是必须指定的(没有默认值),有的是有默认值可以不指定的,如果指定了就覆盖默认值。关于如何配置Producer,我们在后文会详细讲解,这里只特别介绍一个:bootstrap.servers。这个配置项的值接受kafka集群中broker的访问地址。官方的解释是:这是Kafka客户端在初始化的情况下用来连接kafka集群的,而且必须指定,不过也不需要列出集群中所有的broker地址,这个参数只是在启动的时候用一下,获取到连接之后,会自动获取到集群中所有broker的信息。个人觉得这里有点奇怪,因为既然kafka是基于zookeeper的集群,那么为什么不直接连接zookeeper地址,获取到broker地址的连接信息呢?因为这样才算是彻底的将客户端与服务端解耦。也许kafka作者是有其用意的吧,不过笔者暂时未能领会为什么要这样设置。

二、Consumer 相关API

关于Comsumer相关的API,因为有了Producer的对比,Conusmer端的API是非常容易理解的。

消息的接受者,用接口Conusmer表示,其有两个实现类KafkaConsumerMockConsumer(用于测试)。消息的本身使用ConsumerRecordsConsumerRecord表示。

1)KafkaConsumer.java

public class KafkaConsumer<K, V> implements Consumer<K, V>  {
   
    /**
     * 获取当前分配到这个consumer的分区信息。如果是通过调用assign(List<TopicPartition> partitions)方式进行订阅,
     *那么将返回相同的分区信息。
     *如果调用的是subscribe(List<String> topics,..)方式订阅消息,
     *那么将会返回当前topic自动分配给这个consumer的分区信息(可能为空)
     */
    public Set<TopicPartition> assignment(){..}

    /**
     * 返回当前consumer订阅的Topic(一个Consumer可以订阅多个Topic)
     */
    public Set<String> subscription(){..}

    /**
     *内部调用了subscribe(List<String> topics, ConsumerRebalanceListener callback)
     */
    public void subscribe(List<String> topics);
     /**
     * 传入要订阅的Topic(可以有多个),动态分配partition。
     *注意Topic的订阅是不能累加的,也就是说,如果当前已经订阅了其他的topic,那么调用此方法会替换原来订阅的topic。
     *这也意味着:assign(List<TopicPartition> partitions)和本方法是不能联合使用的。
     *如果传入的参数为空,那么相当于unsubscribe()的作用
     *作为group管理的一部分,将会持续跟踪属于同一个group的consumers,并且在以下事件触发时,引发rebalence操作:
     * 1、任何一个topic下的分区数量发生变化
     * 2、Topic创建或者删除
     * 3、consumer group中的某个consumer instance死掉
     * 4、一个新的consumer加入当前consumer group(通过join)
     */
    public void subscribe(List<String> topics, ConsumerRebalanceListener callback){..}

    /**
     * 手工指定订阅的Topic和partition信息
     */
    public void assign(List<TopicPartition> partitions){..}

    /**
    * 支持使用正则表达式的方式订阅Topic
    */
    public void subscribe(Pattern pattern, ConsumerRebalanceListener callback){..}

    /**
     * 取消订阅(assgin方式和subscribe的订阅都会被取消)
     */
    public void unsubscribe(){..}

    /**
     * 从topic中主动拉取消息,如果没有订阅任何topic就调用这个方法,会报错。
     *从poll名字既可以看出拉取的方式是基于轮询,轮询topic中的每个分区查看有没有最新消息。
     *在每一次轮询时,consumer都会使用上一次的消费的offset作为本次轮询的start offset。
     *上一次消费offset可以通过seek(TopicPartition partition, long offset)方法手工设定,或者自动设置
     */
    public ConsumerRecords<K, V> poll(long timeout){..}

    /**
     *内部调用了commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)
     */
    public void commitSync(){..}

    /**
     * 同步提交上一次poll时,当前consumer订阅的所有topic的所有partition的消费偏移量(offset)信息
     * 这个消费偏移量信息是存储在kafka集群上的。当consumer每次rebalence或者启动的时候,都会使用到这个偏移量
     * 例如:lastProcessedMessageOffset + 1.
     */
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets){..}

    /**
     * 内部调用了commitAsync(OffsetCommitCallback callback)
     */
    public void commitAsync(){..}

    /**
     *异步提交上一次poll时,当前consumer订阅的所有topic的所有partition的消费偏移量(offset)信息
     */
    public void commitAsync(OffsetCommitCallback callback){..}

    /**
     * 异步提交指定的topic的某些partition的消费偏移量(offset)信息
     */
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback){..}

    /**
     * 手工指定消费进度,在下一次poll时,将会使用指定的进度进行消费,慎用。
     */
    public void seek(TopicPartition partition, long offset){..}

    /**
     * 从指定分区的开头进行消费,
     *这个方法不会立即执行,要等到下一次poll(long)或者position(TopicPartition)方法执行时,才会调用。
     */
    public void seekToBeginning(TopicPartition... partitions){..}

    /**
     * 设置指定分区所有消息已经消费完
     *这个方法不会立即执行,要等到下一次poll(long)或者position(TopicPartition)方法执行时,才会调用。
     */
    public void seekToEnd(TopicPartition... partitions){..}

    /**
     * 获取指定分区中下一次开始消费的偏移量
     */
    public long position(TopicPartition partition){..}

    /**
     * 获取指定分区最后一次提交的消费进度(不论这个offset是否由当前进程提交)
     *如果传入的参数代表的分区没有分配给当前consumer,或者consumer没有缓存当前分区的消费进度信息,这个方法可能会阻塞进行远程调用
     */
    public OffsetAndMetadata committed(TopicPartition partition){..}

    /**
     * 获取由当前Consumer持有的metrics信息
     */
    public Map<MetricName, ? extends Metric> metrics(){..}

    /**
     * 获取一个Topic的所有分区信息,如果本地没有缓存的话,需要进行远程调用
     */
    public List<PartitionInfo> partitionsFor(String topic){..}

    /**
     * 获取当前用户有权限查看的所有的topic的所有分区信息
     */
    public Map<String, List<PartitionInfo>> listTopics(){..}

    /**
     * 暂停对某些分区消息的订阅,当再下一次poll时,不会消费这些分区的消息,直到调用 resume(TopicPartition)后,重新进行消费
     * 这个方法不会触发rebalence
     */
    public void pause(TopicPartition... partitions){..}

    /**
     * 恢复对某个partiton消息的订阅,如果这个partition之前没有被pause,这个方法不会执行任何操作
     */
    public void resume(TopicPartition... partitions){..}

    /**
     * 关闭consumer,等待一些操作完成。例如如果auto-commit是支持的,将会提交当前消费的offset
     */
    public void close(){..}

    /**
     * 唤醒操作,如果一个consumer的poll需要消耗很长时间,可以使用这个方法进行终止。
     */
    public void wakeup(){..}

}

2) ConsumerRecords与ConsumerRecord

ConsumerRecords是一个容器,Consumer.poll(long) 的返回值就是ConsumerRecords

ConsumerRecords内部包含了poll时获取到每个topic partition的多条ConsumerRecord(表示一条消息)

public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
     ...
     }

ConsumerRecord比ProducerRecord多了一个字段offset,表示当前消息在topic partition中的偏移量    

public final class ConsumerRecord<K, V> {
    private final String topic;
    private final int partition;
    private final long offset;
    private final K key;
    private final V value;
     ...
     }

 3) Consumer使用案例MsgConsumer.java

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MsgConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "115.28.65.149:9092");
        props.put("group.id", "testConsumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));
        Runtime.getRuntime().addShutdownHook(new Thread(){
            @Override
            public void run() {
                consumer.close();
            }
            
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s",
                        record.offset(), record.key(), record.value());
        }
        
    }
}

这段代码也没啥有特点的地方,就是从指定的Topic中消费消息,称之为订阅(subscribe),一个consumer可以同时订阅多个Topic。

Conusmer消费消息的时候,使用poll(long timeout)即拉取的方式

上一篇:3.0 Kafka Java客户端 下一篇:3.2 Scala Api