4.3 RocketMQ作为外部数据源
2016-02-24 20:57:22
6,912
1
在前面已经分析在Trident中的Emitter、Co
RocketMQEmitter.java
import java.util.List; import org.json.simple.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import storm.trident.operation.TridentCollector; import storm.trident.spout.IPartitionedTridentSpout.Emitter; import storm.trident.spout.ISpoutPartition; import storm.trident.topology.TransactionAttempt; import backtype.storm.tuple.Values; import com.alibaba.dubbo.common.utils.LRUCache; import com.alibaba.rocketmq.client.consumer.PullResult; import com.alibaba.rocketmq.client.consumer.PullStatus; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageExt; import com.alibaba.rocketmq.common.message.MessageQueue; import com.google.common.collect.Lists; import com.pajk.wrestling.rocketmq.RocketMqConsumer; public class RocketMQEmitter implements Emitter<List<MessageQueue>, ISpoutPartition, JSONObject> { private static final Logger logger=LoggerFactory.getLogger(RocketMQEmitter.class); //保证客户端幂等性 private volatile LRUCache<String, MessageExt> idempotentCache=null; { idempotentCache=new LRUCache<String, MessageExt>(); idempotentCache.setMaxCapacity(10000); } @Override public List<ISpoutPartition> getOrderedPartitions( List<MessageQueue> allPartitionInfo) { List<ISpoutPartition> partitions = null; if(allPartitionInfo!=null&&allPartitionInfo.size()>0){ partitions=Lists.newArrayList(); for (final MessageQueue messageQueue : allPartitionInfo) { partitions.add(new ISpoutPartition() { @Override public String getId() { return RocketMqConsumer.makeMessageQueueUniqueId(messageQueue); } }); } } if(partitions==null||partitions.size()==0){ throw new RuntimeException("partitions is null"); } String partitionsStr=mkpartitionsStr(partitions); logger.info("all partitions,{}",partitionsStr); return partitions; } private String mkpartitionsStr(List<ISpoutPartition> partitions) { StringBuilder builder=new StringBuilder(); for (ISpoutPartition iSpoutPartition : partitions) { builder.append("\n"+iSpoutPartition.getId()); 
} return builder.toString(); } /** * 从指定分区中获取数据,注意partition和lastPartitionMeta两个是一一对应的 * lastPartitionMeta表示的是当前传入的partition的分区元数据 */ @Override public JSONObject emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, ISpoutPartition partition, JSONObject lastPartitionMeta) { //1、根据partitionId获取对应的消息队列 MessageQueue mq=RocketMqConsumer.getMessageQueueByUniqueId(partition.getId()); //2、获取队列的当前消费进度offset,先从lastPartitionMeta取,lastPartitionMeta=null,连接远程获取 long beginOffset = 0; if(lastPartitionMeta!=null){ Object object = lastPartitionMeta.get(RocketMqPartitionMeta.NEXT_OFFSET); if(object instanceof String){ beginOffset=Long.parseLong((String)object); }else if(object instanceof Long){ beginOffset=(Long) object; } }else{ try { logger.info("queue:{},lastPartitionMeta is null",partition.getId()); beginOffset=RocketMqConsumer.getConsumer().fetchConsumeOffset(mq, true); beginOffset=(beginOffset==-1)?0:beginOffset; } catch (MQClientException e) { logger.error("fetch queue offset error ,queue:"+mq,e); return lastPartitionMeta; } } int batchSize=0; //3、获取消息并处理 PullResult pullResult; try { pullResult = RocketMqConsumer.getConsumer().pull(mq, RocketMqConsumer.getRocketMQConfig().getTopicTag(), beginOffset, RocketMqConsumer.getRocketMQConfig().getPullBatchSize()); PullStatus pullStatus = pullResult.getPullStatus(); switch (pullStatus) { case FOUND: logger.info("queue:{},found new msgs,pull result{}:",partition.getId(),pullResult); List<MessageExt> msgFoundList = pullResult.getMsgFoundList(); if(msgFoundList!=null&&msgFoundList.size()>0){ batchSize=msgFoundList.size(); for (MessageExt messageExt : msgFoundList) { if(!idempotentCache.containsKey(messageExt.getMsgId())){ String msgContent = new String(messageExt.getBody()); collector.emit(new Values(tx,msgContent)); idempotentCache.put(messageExt.getMsgId(), messageExt); System.out.println("emit message:"+messageExt+",message content:"+msgContent); }else{ logger.warn("message {} has consumed!",messageExt); } } } 
break; case OFFSET_ILLEGAL://当beginOffset小于Topic队列的minOffset,会出现此问题 logger.warn("OFFSET_ILLEGAL ,Message Queue:{},pullReuslt:{},caculate beginOffset:{}",new Object[]{mq,pullResult,beginOffset}); break; case NO_NEW_MSG: break; case NO_MATCHED_MSG://当队列中存在其他Tag的消息时,出现此情况 logger.warn("May be some msg has other tag exsits in the queue:{},pull status:{}",mq,pullStatus); break; default: logger.warn("UNKNOW STATUS:{},Message Queue:{}",pullStatus,mq); break; } //不管pullStatus的状态如何,都更新ConsumeOffset RocketMqConsumer.getConsumer().updateConsumeOffset(mq, pullResult.getNextBeginOffset()); } catch (Exception e) { logger.error("pull message error,topic:"+RocketMqConsumer.getRocketMQConfig().getTopic()+",queue:"+mq,e); return lastPartitionMeta; } //4、更新PartitionMeta RocketMqPartitionMeta rocketMqPartitionMeta =new RocketMqPartitionMeta(partition.getId(), tx.toString(), beginOffset, pullResult.getNextBeginOffset()); rocketMqPartitionMeta.setBatchSize(batchSize); return rocketMqPartitionMeta; } @Override public void refreshPartitions( List<ISpoutPartition> partitionResponsibilities) { logger.info("refreshPartitions"); } @Override public void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, ISpoutPartition partition, JSONObject partitionMeta) { } @Override public void close() { } }
RocketMQCoordinator.java
import java.util.List;

import storm.trident.spout.IPartitionedTridentSpout.Coordinator;

import com.alibaba.rocketmq.common.message.MessageQueue;
import com.pajk.wrestling.rocketmq.RocketMqConsumer;

/**
 * Trident coordinator that delegates every decision to {@link RocketMqConsumer}:
 * the partition info of a batch is the consumer's current list of message
 * queues, and readiness is whatever the consumer reports.
 */
public class RocketMQCoordinator implements Coordinator<List<MessageQueue>> {

    /**
     * @return the message queues currently known to {@link RocketMqConsumer}
     */
    @Override
    public List<MessageQueue> getPartitionsForBatch() {
        final List<MessageQueue> currentQueues = RocketMqConsumer.getMessageQueues();
        return currentQueues;
    }

    /**
     * @param txid id of the transaction about to start (not inspected here)
     * @return the result of {@link RocketMqConsumer#hasNewMesage()}
     */
    @Override
    public boolean isReady(long txid) {
        final boolean ready = RocketMqConsumer.hasNewMesage();
        return ready;
    }

    /** No resources are held by the coordinator, so there is nothing to close. */
    @Override
    public void close() {
        // intentionally empty
    }
}
RocketMqPartitionMeta.java
/**
 * Partition metadata exchanged with Trident.
 * <p>
 * Trident uses json-simple to serialize metadata to ZooKeeper. json-simple only
 * supports the 8 primitive types plus List, Map and JSONAware types, so every
 * value stored here is kept as a plain map entry of one of those types.
 *
 * @author TIANSHOUZHI336
 */
@SuppressWarnings("unchecked")
public class RocketMqPartitionMeta extends JSONObject {

    public static final long serialVersionUID = 1003604473740641741L;

    public static final String QUEUE_ID = "queueId";
    public static final String CURRENT_OFFSET = "currentOffset";
    public static final String NEXT_OFFSET = "nextOffset";
    public static final String TRANSACTION_ID = "transactionId";
    public static final String OCCUR_TIME = "occurTime";
    public static final String BATCH_SIZE = "batchSize";

    /** Creates an empty metadata object. */
    public RocketMqPartitionMeta() {
        super();
    }

    /**
     * Creates metadata describing one emitted batch.
     *
     * @param queueId       unique id of the message queue (partition)
     * @param transactionId textual id of the transaction attempt
     * @param currentOffset offset the batch started from
     * @param nextOffset    offset the next batch should start from
     */
    public RocketMqPartitionMeta(String queueId, String transactionId, long currentOffset, long nextOffset) {
        this.setCurrentOffset(currentOffset);
        this.setNextOffset(nextOffset);
        this.setQueueId(queueId);
        this.setTransactionId(transactionId);
    }

    public void setQueueId(String queueId) {
        this.put(QUEUE_ID, queueId);
    }

    public String getQueueId() {
        return (String) this.get(QUEUE_ID);
    }

    public void setCurrentOffset(long currentOffset) {
        this.put(CURRENT_OFFSET, currentOffset);
    }

    /**
     * @return the stored current offset; read through {@link Number} so that any
     *         numeric boxed type held in the map is accepted
     */
    public long getCurrentOffset() {
        return ((Number) this.get(CURRENT_OFFSET)).longValue();
    }

    public void setNextOffset(long nextOffset) {
        this.put(NEXT_OFFSET, nextOffset);
    }

    /**
     * @return the stored next offset; read through {@link Number} so that any
     *         numeric boxed type held in the map is accepted
     */
    public long getNextOffset() {
        return ((Number) this.get(NEXT_OFFSET)).longValue();
    }

    public void setTransactionId(String transactionId) {
        this.put(TRANSACTION_ID, transactionId);
    }

    /** @return the stored transaction id */
    public String getTransactionId() {
        return (String) this.get(TRANSACTION_ID);
    }

    /**
     * Misspelled accessor kept for existing callers.
     *
     * @return the stored transaction id
     * @deprecated use {@link #getTransactionId()}
     */
    @Deprecated
    public String getTransectionId() {
        return getTransactionId();
    }

    /** Stores the occurrence time as a formatted date-time string. */
    public void setOccurTime(long occurTime) {
        String datetimeStr = DateUtils.dayFormatDateTime(occurTime);
        this.put(OCCUR_TIME, datetimeStr);
    }

    /** @return the occurrence time parsed back with {@code DateUtils.DEFAULT_FORMAT} */
    public Date getOccurTime() {
        String time = (String) this.get(OCCUR_TIME);
        return DateUtils.parse(time, DateUtils.DEFAULT_FORMAT);
    }

    public void setBatchSize(int batchSize) {
        this.put(BATCH_SIZE, batchSize);
    }

    /**
     * @return the stored batch size; read through {@link Number} because a value
     *         that was parsed from JSON is not necessarily an {@code Integer}
     */
    public int getBatchSize() {
        return ((Number) this.get(BATCH_SIZE)).intValue();
    }
}
RocketMqIPatitionedTridentSpout.java
import java.util.List; import java.util.Map; import org.json.simple.JSONObject; import storm.trident.spout.IPartitionedTridentSpout; import storm.trident.spout.ISpoutPartition; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import com.alibaba.rocketmq.common.message.MessageQueue; @SuppressWarnings("rawtypes") public class RocketMqIPatitionedTridentSpout implements IPartitionedTridentSpout<List<MessageQueue>, ISpoutPartition, JSONObject>{ /** * */ private static final long serialVersionUID = 4572682097086392244L; @Override //这个方法会被调用多次,不适合初始化consumer public storm.trident.spout.IPartitionedTridentSpout.Coordinator<List<MessageQueue>> getCoordinator( Map conf, TopologyContext context) { return new RocketMQCoordinator(); } @Override public storm.trident.spout.IPartitionedTridentSpout.Emitter<List<MessageQueue>, ISpoutPartition, JSONObject> getEmitter( Map conf, TopologyContext context) { return new RocketMQEmitter(); } @Override public Map getComponentConfiguration() { return null; } @Override //这个方法也会被调用多次,不是初始化 public Fields getOutputFields() { return new Fields("tId", "message"); } }
RocketMqConsumer.java
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer; import com.alibaba.rocketmq.client.consumer.MessageQueueListener; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.message.MessageQueue; import com.google.common.collect.Sets; import com.pajk.wrestling.util.ConfigUtils; public abstract class RocketMqConsumer { private static final Logger logger=LoggerFactory.getLogger(RocketMqConsumer.class); private static DefaultMQPullConsumer pullConsumer; public static boolean initialized=false; public static RocketMQConfig rocketMQConfig; private static Map<String,MessageQueue> partitionIdQueueMap; private static List<MessageQueue> queues; static{ initRocketMqConfig(); initConsumer(); initMessageQueues(); registerMessageQueueListener(); } private static void initRocketMqConfig() { rocketMQConfig = ConfigUtils.getRocketMQConfig(); } private static void initConsumer() { pullConsumer=new DefaultMQPullConsumer(); pullConsumer.setConsumerGroup(rocketMQConfig.getGroupId()); pullConsumer.setInstanceName(rocketMQConfig.getInstanceName()); pullConsumer.setNamesrvAddr(rocketMQConfig.getNamesrvAddr()); pullConsumer.setRegisterTopics(Sets.newHashSet(rocketMQConfig.getTopic())); logger.info("rocketmq pullConsumer config:{}",rocketMQConfig); try { pullConsumer.start(); logger.info("rocketmq pullConsumer startup success!"); } catch (MQClientException e) { throw new RuntimeException("consumer start fail!",e); } Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { if (pullConsumer != null) { pullConsumer.shutdown(); logger.info("rocketmq pullConsumer shutdown success!"); } } }); } private static void initMessageQueues() { try { //因为队列可能扩容,每次开启一个新的事务(batch)时,都重新拉取一下最新的队列 Set<MessageQueue> messageQueues = 
RocketMqConsumer.getConsumer().fetchSubscribeMessageQueues(RocketMqConsumer.getRocketMQConfig().getTopic()); queues=new ArrayList<MessageQueue>(messageQueues); partitionIdQueueMap=new HashMap<String,MessageQueue>(); for (MessageQueue messageQueue : messageQueues) { partitionIdQueueMap.put(makeMessageQueueUniqueId(messageQueue), messageQueue); } } catch (MQClientException e) { logger.error(e.getMessage(), e); throw new RuntimeException(e); } } private static void registerMessageQueueListener() { pullConsumer.registerMessageQueueListener(rocketMQConfig.getTopic(),new MessageQueueListener() { @Override public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue> mqDivided) { initMessageQueues(); } } ); } public static DefaultMQPullConsumer getConsumer(){ return pullConsumer; } public static String makeMessageQueueUniqueId(MessageQueue messageQueue) { /*String brokerName = messageQueue.getBrokerName(); String topic = messageQueue.getTopic(); int queueId = messageQueue.getQueueId();*/ return messageQueue.getBrokerName()+"-queue-"+messageQueue.getQueueId(); } public static RocketMQConfig getRocketMQConfig(){ return rocketMQConfig; } public static List<MessageQueue> getMessageQueues(){ return queues; } public static MessageQueue getMessageQueueByUniqueId(String uniqueId){ return partitionIdQueueMap.get(uniqueId); } public static boolean hasNewMesage(){ /*try{ for (MessageQueue messageQueue : queues) { long offset = pullConsumer.fetchConsumeOffset(messageQueue, true); offset=(offset<0)?0:offset; PullResult pullResult = pullConsumer.pull(messageQueue, rocketMQConfig.getTopicTag(), offset, rocketMQConfig.getPullBatchSize()); PullStatus pullStatus = pullResult.getPullStatus(); switch (pullStatus) { case FOUND: return true; case OFFSET_ILLEGAL: pullConsumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset()); logger.warn("OFFSET_ILLEGAL,Queue:{},PullResult:{}",messageQueue,pullResult);break; case NO_NEW_MSG:break; case NO_MATCHED_MSG: 
logger.warn("May be some msg has other tag exsits in the queue:{},pull status:{}",messageQueue,pullStatus); break; default:break; } } }catch(Exception e){ logger.error("decide has new message error", e); }*/ return true; } }
RocketMQConfig.java
import java.io.Serializable;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

/**
 * Plain serializable settings holder for the RocketMQ consumer.
 *
 * @author Von Gosling
 */
public class RocketMQConfig implements Serializable {

    private static final long serialVersionUID = 4157424979688590880L;

    private String namesrvAddr;

    /** Unique mark for every JVM instance. */
    private String instanceName;

    /** Group by message actor. */
    private String groupId;

    /** Message topic. */
    private String topic;

    /** Message topic tag. */
    private String topicTag;

    /** Minimal consumer thread count. */
    private int consumeThreadMin = 20;

    /** Maximal consumer thread count. */
    private int consumeThreadMax = 64;

    /** If piled-up messages exceed this value, adjust consumer threads to the max value dynamically. */
    private long adjustThreadPoolNumsThreshold = 100000L;

    /** Local message queue threshold; triggers flow control if exceeded. */
    private int pullThresholdForQueue = 1024;

    /** The message size from the server for every pull batch. */
    private int pullBatchSize = 32;

    /** Pull interval from the server for every pull. */
    private long pullInterval = 0;

    /** Fetch message size from the local queue. */
    private int consumeMessageBatchMaxSize = 1;

    /** Consumption in local sequence; will affect performance. */
    private boolean ordered;

    /**
     * The max allowed failures for one single message; the failing message is
     * skipped if exceeded. -1 means try again until success.
     */
    private int maxFailTimes = 5;

    /** Creates a configuration carrying only the field defaults. */
    public RocketMQConfig() {
    }

    /**
     * Creates a configuration for one topic/tag of a consumer group.
     *
     * @param namesrvAddr   name server address
     * @param consumerGroup consumer group, stored as the group id
     * @param topic         message topic
     * @param topicTag      message topic tag
     */
    public RocketMQConfig(String namesrvAddr, String consumerGroup, String topic, String topicTag) {
        this.namesrvAddr = namesrvAddr;
        this.groupId = consumerGroup;
        this.topic = topic;
        this.topicTag = topicTag;
    }

    /** @return the name server address */
    public String getNamesrvAddr() {
        return this.namesrvAddr;
    }

    /** @param namesrvAddr the name server address to set */
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    /** @return the instanceName */
    public String getInstanceName() {
        return this.instanceName;
    }

    /** @param instanceName the instanceName to set */
    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    /** @return the groupId */
    public String getGroupId() {
        return this.groupId;
    }

    /** @param groupId the groupId to set */
    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    /** @return the topic */
    public String getTopic() {
        return this.topic;
    }

    /** @param topic the topic to set */
    public void setTopic(String topic) {
        this.topic = topic;
    }

    /** @return the topicTag */
    public String getTopicTag() {
        return this.topicTag;
    }

    /** @param topicTag the topicTag to set */
    public void setTopicTag(String topicTag) {
        this.topicTag = topicTag;
    }

    /** @return the consumeThreadMin */
    public int getConsumeThreadMin() {
        return this.consumeThreadMin;
    }

    /** @param consumeThreadMin the consumeThreadMin to set */
    public void setConsumeThreadMin(int consumeThreadMin) {
        this.consumeThreadMin = consumeThreadMin;
    }

    /** @return the consumeThreadMax */
    public int getConsumeThreadMax() {
        return this.consumeThreadMax;
    }

    /** @param consumeThreadMax the consumeThreadMax to set */
    public void setConsumeThreadMax(int consumeThreadMax) {
        this.consumeThreadMax = consumeThreadMax;
    }

    /** @return the adjustThreadPoolNumsThreshold */
    public long getAdjustThreadPoolNumsThreshold() {
        return this.adjustThreadPoolNumsThreshold;
    }

    /** @param adjustThreadPoolNumsThreshold the adjustThreadPoolNumsThreshold to set */
    public void setAdjustThreadPoolNumsThreshold(long adjustThreadPoolNumsThreshold) {
        this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
    }

    /** @return the pullThresholdForQueue */
    public int getPullThresholdForQueue() {
        return this.pullThresholdForQueue;
    }

    /** @param pullThresholdForQueue the pullThresholdForQueue to set */
    public void setPullThresholdForQueue(int pullThresholdForQueue) {
        this.pullThresholdForQueue = pullThresholdForQueue;
    }

    /** @return the pullBatchSize */
    public int getPullBatchSize() {
        return this.pullBatchSize;
    }

    /** @param pullBatchSize the pullBatchSize to set */
    public void setPullBatchSize(int pullBatchSize) {
        this.pullBatchSize = pullBatchSize;
    }

    /** @return the pullInterval */
    public long getPullInterval() {
        return this.pullInterval;
    }

    /** @param pullInterval the pullInterval to set */
    public void setPullInterval(long pullInterval) {
        this.pullInterval = pullInterval;
    }

    /** @return the consumeMessageBatchMaxSize */
    public int getConsumeMessageBatchMaxSize() {
        return this.consumeMessageBatchMaxSize;
    }

    /** @param consumeMessageBatchMaxSize the consumeMessageBatchMaxSize to set */
    public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
        this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
    }

    /** @return the ordered */
    public boolean isOrdered() {
        return this.ordered;
    }

    /** @param ordered the ordered to set */
    public void setOrdered(boolean ordered) {
        this.ordered = ordered;
    }

    /** @return the maxFailTimes */
    public int getMaxFailTimes() {
        return this.maxFailTimes;
    }

    /** @param maxFailTimes the maxFailTimes to set */
    public void setMaxFailTimes(int maxFailTimes) {
        this.maxFailTimes = maxFailTimes;
    }

    /** Renders all fields reflectively in the short-prefix style. */
    @Override
    public String toString() {
        final ToStringStyle style = ToStringStyle.SHORT_PREFIX_STYLE;
        return ToStringBuilder.reflectionToString(this, style);
    }
}
ConfigUtils.java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

import org.apache.commons.lang.BooleanUtils;

import backtype.storm.Config;

import com.pajk.wrestling.rocketmq.RocketMQConfig;

/**
 * Utilities for RocketMQ spout regarding its configuration and reading values
 * from the storm configuration.
 * <p>
 * All values come from the classpath resource {@code wrestling-config.properties},
 * which is loaded once when this class is initialized.
 *
 * @author Von Gosling
 */
public abstract class ConfigUtils {

    public static final String CONFIG_TOPIC = "rocketmq.spout.topic";
    public static final String CONFIG_CONSUMER_GROUP = "rocketmq.spout.consumer.group";
    public static final String CONFIG_TOPIC_TAG = "rocketmq.spout.topic.tag";
    public static final String CONFIG_ROCKETMQ = "rocketmq.config";
    public static final String CONFIG_PREFETCH_SIZE = "rocketmq.prefetch.size";
    public static final String CONFIG_NAMESRV_ADDR = "public.rocketmq.domain.name";

    public static Properties props;

    static {
        props = new Properties();
        InputStream is = ConfigUtils.class.getClassLoader()
                .getResourceAsStream("wrestling-config.properties");
        if (is == null) {
            // Fail with a clear message instead of the NullPointerException that
            // Properties.load(null) would raise.
            throw new RuntimeException(
                    "load config ocurr error! resource not found: wrestling-config.properties");
        }
        try {
            props.load(is);
        } catch (IOException e) {
            throw new RuntimeException("load config ocurr error!", e);
        } finally {
            try {
                is.close();
            } catch (IOException ignored) {
                // The properties are already loaded (or loading already failed);
                // a failure to close the stream adds nothing actionable.
            }
        }
    }

    /**
     * Builds the RocketMQ configuration from the loaded properties.
     * <p>
     * The instance name is {@code <group>_<host address with '_' for '.'>_<worker.port>}.
     * The pull batch size is optional: when {@code rocketmq.prefetch.size} is absent,
     * blank or not positive, the default of {@link RocketMQConfig} is kept.
     *
     * @return a freshly populated configuration
     * @throws NumberFormatException if {@code rocketmq.prefetch.size} is present but not an integer
     * @throws RuntimeException      if the local host address cannot be resolved
     */
    public static RocketMQConfig getRocketMQConfig() {
        String topic = props.getProperty(CONFIG_TOPIC);
        String consumerGroup = props.getProperty(ConfigUtils.CONFIG_CONSUMER_GROUP);
        String topicTag = props.getProperty(ConfigUtils.CONFIG_TOPIC_TAG);
        String nameServerAddr = props.getProperty(CONFIG_NAMESRV_ADDR);

        // Parse the optional batch size only when it is actually configured.
        Integer pullBatchSize = null;
        String prefetchSize = props.getProperty(ConfigUtils.CONFIG_PREFETCH_SIZE);
        if (prefetchSize != null && prefetchSize.trim().length() > 0) {
            pullBatchSize = Integer.valueOf(prefetchSize.trim());
        }

        RocketMQConfig mqConfig = new RocketMQConfig(nameServerAddr, consumerGroup, topic, topicTag);
        try {
            mqConfig.setInstanceName(consumerGroup + "_"
                    + InetAddress.getLocalHost().getHostAddress().replaceAll("\\.", "_")
                    + "_" + System.getProperty("worker.port"));
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }

        if (pullBatchSize != null && pullBatchSize > 0) {
            mqConfig.setPullBatchSize(pullBatchSize);
        }

        boolean ordered = BooleanUtils.toBooleanDefaultIfNull(
                Boolean.valueOf(props.getProperty("rocketmq.spout.ordered")), false);
        mqConfig.setOrdered(ordered);
        return mqConfig;
    }

    /**
     * Builds the Storm topology configuration from the {@code topology.*} properties.
     *
     * @return the populated Storm {@link Config}
     * @throws NumberFormatException if one of the numeric topology properties is missing or malformed
     */
    public static Config getTopologyConfig() {
        Config config = new Config();
        config.setNumWorkers(Integer.parseInt(props.getProperty("topology.workers")));
        config.setNumAckers(Integer.parseInt(props.getProperty("topology.acker.executors")));
        config.setMaxSpoutPending(Integer.parseInt(props.getProperty("topology.max.spout.pending")));
        config.setMessageTimeoutSecs(Integer.parseInt(props.getProperty("topology.message.timeout.secs")));
        config.put("topology.name", props.getProperty("topology.name"));
        config.setDebug(Boolean.parseBoolean(props.getProperty("topology.debug", "false")));
        return config;
    }

    /**
     * @param key property name
     * @return the property value, or {@code null} if it is not defined
     */
    public static String get(String key) {
        return props.getProperty(key);
    }

    /**
     * @param key property name
     * @return the property value parsed as an integer
     * @throws NumberFormatException if the property is missing or not an integer
     */
    public static Integer getInt(String key) {
        return Integer.parseInt(props.getProperty(key));
    }
}
wrestling-config.properties
#rocketmq configuration 139.129.97.30
public.rocketmq.domain.name=mq1.test.pajkdc.com:9876;mq2.test.pajkdc.com:9876
#public.rocketmq.domain.name=139.129.97.30:9876
rocketmq.spout.consumer.group=cmtcenter_wrestling_group
rocketmq.spout.topic=CMTCENTER
rocketmq.spout.topic.tag=COMMENT_CREATED
rocketmq.spout.ordered=false
rocketmq.prefetch.size=32
#topology configuration
topology.acker.executors=1
topology.workers=1
topology.max.spout.pending=1
topology.message.timeout.secs=30
topology.debug=true
上一篇:4.0 Trident快速入门