high-level consumer
The high-level API is the simpler option: you do not need to manage offsets yourself, since it automatically reads the consumer group's last offset from ZooKeeper.
If there are more consumers than partitions, the extra consumers are wasted, because Kafka does not allow concurrent consumption of a single partition; so do not run more consumers than partitions (see the multi-threaded sketch after the example below).
If there are fewer consumers than partitions, one consumer is assigned several partitions. The point is to balance the number of consumers against the number of partitions, otherwise the partitions will be consumed unevenly.
If a consumer reads from several partitions, there is no ordering guarantee across them; Kafka only guarantees ordering within a single partition, so with multiple partitions the interleaving depends on the order in which you read.
Adding or removing consumers, brokers, or partitions triggers a rebalance, after which the partitions assigned to each consumer may change.
The high-level API blocks when there is no data to fetch.
// Consumer properties: start from the smallest offset on first run, connect
// to ZooKeeper, and auto-commit offsets every second
Properties props = new Properties();
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "dashcam");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector consumer =
		kafka.consumer.Consumer.createJavaConsumerConnector(conf);

// Request one stream for the topic; the returned map is keyed by topic
String topic = "page_visits";
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
		consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
KafkaStream<byte[], byte[]> stream = streams.get(0);

// The iterator blocks while there is no data to consume
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
    System.out.println("message: " + new String(it.next().message()));
}
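The notes above on matching consumer count to partition count are easier to see with more than one stream: request one stream per partition and run each stream on its own thread. Below is a minimal sketch, assuming the page_visits topic has 4 partitions; the class name, thread-pool size, and property values are illustrative, not part of the original example.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class MultiStreamConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "dashcam");
        props.put("auto.offset.reset", "smallest");
        props.put("auto.commit.interval.ms", "1000");
        ConsumerConnector consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(new ConsumerConfig(props));

        String topic = "page_visits";
        int numStreams = 4;   // assumption: the topic has 4 partitions
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, numStreams);
        List<KafkaStream<byte[], byte[]>> streams =
                consumer.createMessageStreams(topicCountMap).get(topic);

        // One thread per stream; a partition is only ever read by one stream,
        // so more threads/consumers than partitions would simply sit idle
        ExecutorService executor = Executors.newFixedThreadPool(numStreams);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new Runnable() {
                public void run() {
                    ConsumerIterator<byte[], byte[]> it = stream.iterator();
                    while (it.hasNext()) {
                        MessageAndMetadata<byte[], byte[]> msg = it.next();
                        System.out.println("partition " + msg.partition()
                                + ": " + new String(msg.message()));
                    }
                }
            });
        }
    }
}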
 
 
Checking consumption progress
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
		--group clog-writer --zookeeper xxxxxx:2181
Group         Topic         Pid  Offset      logSize     Lag   Owner
clog-writer   dashcam       0    52009776    52009861    85    writer-14599
clog-writer   dashcam.env   0    10381       10381       0     writer-1459
 
 
The key columns are Offset, logSize and Lag (Lag = logSize - Offset, i.e. how far the consumer is behind the end of the log).
Resetting the offset
bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK \
		earliest config/consumer.properties dashcam
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker \
		--group clog-writer --zookeeper xxxxxx:2181
Group        Topic         Pid  Offset   logSize     Lag        Owner
clog-writer  dashcam       0    0        52009861    52009861   writer-14599
clog-writer  dashcam.env   0    10381    10381       0          writer-1459
 
 
UpdateOffsetsInZK takes three arguments: earliest or latest, a consumer.properties file, and the topic name.
In the checker output you can see the offset has been reset to 0, so Lag = logSize.
low-level consumer
Using this API comes at a cost: partitions, brokers, and offsets are no longer transparent to you. You have to manage them yourself, and you also have to handle broker leader changes. (A driver sketch tying the pieces below together is given at the end of this section.)
First, find the leader for the partition by querying a list of seed brokers for topic metadata:
// Query the seed brokers for topic metadata to find the partition's leader
private PartitionMetadata findLeader(List<String> a_seedBrokers,
        int a_port, String a_topic, int a_partition) {
    PartitionMetadata returnMetaData = null;
    loop:
    for (String seed : a_seedBrokers) {
        SimpleConsumer consumer = null;
        try {
            // A throwaway SimpleConsumer used only for the metadata lookup
            consumer = new SimpleConsumer(seed, a_port, 100000,
                    64 * 1024, "leaderLookup");
            List<String> topics = Collections.singletonList(a_topic);
            TopicMetadataRequest req = new TopicMetadataRequest(topics);

            kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

            List<TopicMetadata> metaData = resp.topicsMetadata();
            for (TopicMetadata item : metaData) {
                for (PartitionMetadata part : item.partitionsMetadata()) {
                    if (part.partitionId() == a_partition) {
                        returnMetaData = part;
                        break loop;    // found the partition's metadata
                    }
                }
            }
        } catch (Exception e) {
            System.out.println("Error communicating with Broker [" + seed
                    + "] to find Leader for [" + a_topic
                    + ", " + a_partition + "] Reason: " + e);
        } finally {
            if (consumer != null) consumer.close();
        }
    }
    return returnMetaData;
}
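A hypothetical usage fragment (the broker hostnames and client id below are made up): call findLeader with your seed brokers, keep the replica list for later failover, and open a SimpleConsumer against the leader. In the official SimpleConsumer example this logic sits at the top of a run() method, and m_replicaBrokers is a List<String> member field.

List<String> seedBrokers = Arrays.asList("broker1.example.com", "broker2.example.com");
PartitionMetadata metadata = findLeader(seedBrokers, 9092, "dashcam", 0);
if (metadata == null || metadata.leader() == null) {
    throw new RuntimeException("Can't find leader for dashcam/0");
}
// Remember the replica brokers; findNewLeader() (further below) queries them on failover
for (kafka.cluster.Broker replica : metadata.replicas()) {
    m_replicaBrokers.add(replica.host());
}
// Connect a SimpleConsumer to the leader for the actual offset and fetch requests
SimpleConsumer consumer = new SimpleConsumer(metadata.leader().host(),
        9092, 100000, 64 * 1024, "dashcam-client");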
Then, having found the broker that leads the partition, you also know from the returned metadata which brokers hold replicas of it.
TopicAndPartition is simply a wrapper around the topic and partition information.
Do not assume that the starting offset is always 0: messages expire and get deleted, so ask the broker for a valid offset instead.
// Ask the partition's leader for an offset; whichTime is either
// kafka.api.OffsetRequest.EarliestTime() or LatestTime()
public long getLastOffset(SimpleConsumer consumer,
        String topic, int partition, long whichTime, String clientName) {
    // TopicAndPartition wraps the topic/partition pair
    TopicAndPartition topicAndPartition =
            new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
            new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    // Ask for at most 1 offset before whichTime
    requestInfo.put(topicAndPartition,
            new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request =
            new kafka.javaapi.OffsetRequest(requestInfo,
                    kafka.api.OffsetRequest.CurrentVersion(),
                    clientName);

    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching offset data from the Broker. Reason: "
                + response.errorCode(topic, partition));
        return 0;
    }

    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}
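Typical usage (the variable names here are illustrative): pass kafka.api.OffsetRequest.EarliestTime() to start from the oldest message still retained on the broker, or LatestTime() to read only new messages.

// Start from the oldest message that has not yet been deleted by retention
long readOffset = getLastOffset(consumer, "dashcam", 0,
        kafka.api.OffsetRequest.EarliestTime(), "dashcam-client");

// Or start from the tail of the log, skipping everything already written:
// long readOffset = getLastOffset(consumer, "dashcam", 0,
//         kafka.api.OffsetRequest.LatestTime(), "dashcam-client");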
Next, build the fetch request yourself and fetch the data:
// Fetch up to 100000 bytes from the partition, starting at readOffset
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition,
                readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // see "Error handling" below
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset :
        fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    // The broker may return batches containing offsets we have already
    // read; skip anything older than what we asked for
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: "
                + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    // Remember where to continue on the next fetch
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();

    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": "
            + new String(bytes, "UTF-8"));
    numRead++;
}

// Nothing new to read: back off for a second before fetching again
if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}
Error handling
if (fetchResponse.hasError()) {
    numErrors++;

    short code = fetchResponse.errorCode(a_topic, a_partition);
    System.out.println("Error fetching data from the Broker: "
            + leadBroker + " Reason: " + code);
    if (numErrors > 5) break;

    if (code == ErrorMapping.OffsetOutOfRangeCode()) {
        // We asked for an offset the broker no longer has (or does not have
        // yet); reset to the latest offset and retry
        readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.LatestTime(), clientName);
        continue;
    }
    // Any other error: assume the leader changed, rebuild the consumer
    // against the new leader and retry
    consumer.close();
    consumer = null;
    leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
    continue;
}
find new leader
// After a leader failure, query the known replica brokers until a new
// leader shows up; give up after 3 attempts
private String findNewLeader(String a_oldLeader, String a_topic,
        int a_partition, int a_port) throws Exception {
    for (int i = 0; i < 3; i++) {
        boolean goToSleep = false;
        PartitionMetadata metadata =
                findLeader(m_replicaBrokers,
                        a_port, a_topic, a_partition);
        if (metadata == null) {
            goToSleep = true;
        } else if (metadata.leader() == null) {
            goToSleep = true;
        } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                && i == 0) {
            // First pass: if the "new" leader is still the old one, give
            // ZooKeeper a second to recover; if it is still reported as
            // leader on a later pass, accept it (it may have genuinely
            // recovered, or the failure was not broker-related)
            goToSleep = true;
        } else {
            return metadata.leader().host();
        }
        if (goToSleep) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException ie) {
            }
        }
    }
    System.out.println("Unable to find new leader after Broker failure. " +
            "Exiting");
    throw new Exception("Unable to find new leader after Broker failure. " +
            "Exiting");
}
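Putting the pieces above together, a driver loop might look like the sketch below. It follows the shape of the official SimpleConsumer example but is abbreviated: the method signature, maxReads, and the assumption that everything lives in one class with a m_replicaBrokers member are illustrative, not a definitive implementation.

// Sketch of a run() loop tying findLeader / getLastOffset / fetch /
// error handling / findNewLeader together (see the assumptions above)
public void run(long maxReads, String a_topic, int a_partition,
        List<String> a_seedBrokers, int a_port) throws Exception {
    // 1. Find the partition's leader and remember its replicas for failover
    PartitionMetadata metadata =
            findLeader(a_seedBrokers, a_port, a_topic, a_partition);
    if (metadata == null || metadata.leader() == null) {
        System.out.println("Can't find leader for " + a_topic + "/" + a_partition);
        return;
    }
    m_replicaBrokers.clear();
    for (kafka.cluster.Broker replica : metadata.replicas()) {
        m_replicaBrokers.add(replica.host());
    }
    String leadBroker = metadata.leader().host();
    String clientName = "Client_" + a_topic + "_" + a_partition;

    SimpleConsumer consumer =
            new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
    // 2. Decide where to start reading (earliest retained offset here)
    long readOffset = getLastOffset(consumer, a_topic, a_partition,
            kafka.api.OffsetRequest.EarliestTime(), clientName);

    int numErrors = 0;
    while (maxReads > 0) {
        if (consumer == null) {
            consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
        }
        // 3. Fetch a batch starting at readOffset
        FetchRequest req = new FetchRequestBuilder()
                .clientId(clientName)
                .addFetch(a_topic, a_partition, readOffset, 100000)
                .build();
        FetchResponse fetchResponse = consumer.fetch(req);

        // 4. Error handling: reset the offset or fail over to a new leader
        if (fetchResponse.hasError()) {
            numErrors++;
            short code = fetchResponse.errorCode(a_topic, a_partition);
            if (numErrors > 5) break;
            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                readOffset = getLastOffset(consumer, a_topic, a_partition,
                        kafka.api.OffsetRequest.LatestTime(), clientName);
                continue;
            }
            consumer.close();
            consumer = null;
            leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
            continue;
        }
        numErrors = 0;

        // 5. Process the messages and advance readOffset
        long numRead = 0;
        for (MessageAndOffset messageAndOffset :
                fetchResponse.messageSet(a_topic, a_partition)) {
            if (messageAndOffset.offset() < readOffset) continue;
            readOffset = messageAndOffset.nextOffset();
            ByteBuffer payload = messageAndOffset.message().payload();
            byte[] bytes = new byte[payload.limit()];
            payload.get(bytes);
            System.out.println(messageAndOffset.offset() + ": " + new String(bytes, "UTF-8"));
            numRead++;
            maxReads--;
        }
        if (numRead == 0) {
            try { Thread.sleep(1000); } catch (InterruptedException ie) { }
        }
    }
    if (consumer != null) consumer.close();
}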