4.3 RocketMQ作为外部数据源

2016-02-24 20:57:22 6,912 1

在前面已经分析在Trident中的Emmiter、Co

RocketMqEmmiter.java

import java.util.List;

import org.json.simple.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.trident.operation.TridentCollector;
import storm.trident.spout.IPartitionedTridentSpout.Emitter;
import storm.trident.spout.ISpoutPartition;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Values;

import com.alibaba.dubbo.common.utils.LRUCache;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.consumer.PullStatus;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.google.common.collect.Lists;
import com.pajk.wrestling.rocketmq.RocketMqConsumer;

public class RocketMQEmitter implements Emitter<List<MessageQueue>, ISpoutPartition, JSONObject> {
	private static final Logger logger=LoggerFactory.getLogger(RocketMQEmitter.class);
	//保证客户端幂等性
	private volatile LRUCache<String, MessageExt> idempotentCache=null;
	{
		idempotentCache=new LRUCache<String, MessageExt>();
		idempotentCache.setMaxCapacity(10000);
	}
	
	@Override
	public List<ISpoutPartition> getOrderedPartitions(
			List<MessageQueue> allPartitionInfo) {
		List<ISpoutPartition> partitions = null;
		if(allPartitionInfo!=null&&allPartitionInfo.size()>0){
			partitions=Lists.newArrayList();
			for (final MessageQueue messageQueue : allPartitionInfo) {
				partitions.add(new ISpoutPartition() {
					@Override
					public String getId() {
						return RocketMqConsumer.makeMessageQueueUniqueId(messageQueue);
					}
				});
			}
		}
		if(partitions==null||partitions.size()==0){
			throw new RuntimeException("partitions is null");
		}
		String partitionsStr=mkpartitionsStr(partitions);
		logger.info("all partitions,{}",partitionsStr);
		return partitions;
	}

	private String mkpartitionsStr(List<ISpoutPartition> partitions) {
		StringBuilder builder=new StringBuilder();
		for (ISpoutPartition iSpoutPartition : partitions) {
			builder.append("\n"+iSpoutPartition.getId());
		}
		return builder.toString();
	}

	/**
	 * 从指定分区中获取数据,注意partition和lastPartitionMeta两个是一一对应的
	 * lastPartitionMeta表示的是当前传入的partition的分区元数据
	 */
	@Override
	public JSONObject emitPartitionBatchNew(TransactionAttempt tx,
			TridentCollector collector, ISpoutPartition partition,
			JSONObject lastPartitionMeta) {
		
		//1、根据partitionId获取对应的消息队列
		MessageQueue mq=RocketMqConsumer.getMessageQueueByUniqueId(partition.getId());
		
		//2、获取队列的当前消费进度offset,先从lastPartitionMeta取,lastPartitionMeta=null,连接远程获取
		long beginOffset = 0;
		if(lastPartitionMeta!=null){
			Object object = lastPartitionMeta.get(RocketMqPartitionMeta.NEXT_OFFSET);
			if(object instanceof String){
				beginOffset=Long.parseLong((String)object);
			}else if(object instanceof Long){
				beginOffset=(Long) object;
			}
		}else{
			try {
				logger.info("queue:{},lastPartitionMeta is null",partition.getId());
				beginOffset=RocketMqConsumer.getConsumer().fetchConsumeOffset(mq, true);
				beginOffset=(beginOffset==-1)?0:beginOffset;
			} catch (MQClientException e) {
				logger.error("fetch queue offset error ,queue:"+mq,e);
				return lastPartitionMeta;
			}
		}
		int batchSize=0;
		//3、获取消息并处理
		PullResult pullResult;
		try {
			pullResult = RocketMqConsumer.getConsumer().pull(mq, RocketMqConsumer.getRocketMQConfig().getTopicTag(), beginOffset, RocketMqConsumer.getRocketMQConfig().getPullBatchSize());
			PullStatus pullStatus = pullResult.getPullStatus();
			switch (pullStatus) {
			case FOUND:
				logger.info("queue:{},found new msgs,pull result{}:",partition.getId(),pullResult);
				List<MessageExt> msgFoundList = pullResult.getMsgFoundList();
				if(msgFoundList!=null&&msgFoundList.size()>0){
					batchSize=msgFoundList.size();
					for (MessageExt messageExt : msgFoundList) {
						if(!idempotentCache.containsKey(messageExt.getMsgId())){
							String msgContent = new String(messageExt.getBody());
							collector.emit(new Values(tx,msgContent));
							idempotentCache.put(messageExt.getMsgId(), messageExt);
							System.out.println("emit message:"+messageExt+",message content:"+msgContent);
						}else{
							logger.warn("message {} has consumed!",messageExt);
						}
						
					}
				}
				break;
			case OFFSET_ILLEGAL://当beginOffset小于Topic队列的minOffset,会出现此问题
				logger.warn("OFFSET_ILLEGAL ,Message Queue:{},pullReuslt:{},caculate beginOffset:{}",new Object[]{mq,pullResult,beginOffset});
				break;
			case NO_NEW_MSG:
				break;
			case NO_MATCHED_MSG://当队列中存在其他Tag的消息时,出现此情况
				logger.warn("May be some msg has other tag exsits in the queue:{},pull status:{}",mq,pullStatus);
				break;
			default:
				logger.warn("UNKNOW STATUS:{},Message Queue:{}",pullStatus,mq);
				break;
			}
			//不管pullStatus的状态如何,都更新ConsumeOffset
			RocketMqConsumer.getConsumer().updateConsumeOffset(mq, pullResult.getNextBeginOffset());
		} catch (Exception e) {
			logger.error("pull message error,topic:"+RocketMqConsumer.getRocketMQConfig().getTopic()+",queue:"+mq,e);
			return lastPartitionMeta;
		}
		
		//4、更新PartitionMeta
		RocketMqPartitionMeta rocketMqPartitionMeta
			=new RocketMqPartitionMeta(partition.getId(), tx.toString(), beginOffset, pullResult.getNextBeginOffset());
		rocketMqPartitionMeta.setBatchSize(batchSize);
		return rocketMqPartitionMeta;
	}


	@Override
	public void refreshPartitions(
			List<ISpoutPartition> partitionResponsibilities) {
		logger.info("refreshPartitions");
		
	}

	@Override
	public void emitPartitionBatch(TransactionAttempt tx,
			TridentCollector collector, ISpoutPartition partition,
			JSONObject partitionMeta) {
	}

	@Override
	public void close() {
		
	}
}

RocketMQCoordinator.java

import java.util.List;

import storm.trident.spout.IPartitionedTridentSpout.Coordinator;

import com.alibaba.rocketmq.common.message.MessageQueue;
import com.pajk.wrestling.rocketmq.RocketMqConsumer;

public class RocketMQCoordinator implements Coordinator<List<MessageQueue>>{

	@Override
	public List<MessageQueue> getPartitionsForBatch() {
		return RocketMqConsumer.getMessageQueues();
	}

	@Override
	public boolean isReady(long txid) {
		return RocketMqConsumer.hasNewMesage();
	}

	@Override
	public void close() {
		
	}
	
}

RocketMqPartitionMeta.java

/**
 * Trident使用json-simple进行序列化元数据到zookeeper
 * json simple只支持8中基本数据类型+List+Map+JsonAware类型
 * @author TIANSHOUZHI336
 *
 */
@SuppressWarnings("unchecked")
public class RocketMqPartitionMeta extends JSONObject{
	
	/**
	 * 
	 */
	public static final long serialVersionUID = 1003604473740641741L;
	public static final String QUEUE_ID="queueId";
	public static final String CURRENT_OFFSET="currentOffset";
	public static final String NEXT_OFFSET="nextOffset";
	public static final String TRANSACTION_ID="transactionId";
	public static final String OCCUR_TIME="occurTime";
	public static final String BATCH_SIZE= "batchSize";
	public RocketMqPartitionMeta() {
		super();
	}
	
	public RocketMqPartitionMeta(String queueId,String transactionId,long currentOffset,long nextOffset){
		this.setCurrentOffset(currentOffset);
		this.setNextOffset(nextOffset);
		this.setQueueId(queueId);
		this.setTransactionId(transactionId);
	}

	public void setQueueId(String queueId){
		this.put(QUEUE_ID, queueId);
	}
	
	public String getQueueId(){
		return (String) this.get(QUEUE_ID);
	}
	
	public void setCurrentOffset(long currentOffset){
		this.put(CURRENT_OFFSET, currentOffset);
	}
	
	public long getCurrentOffset(){
		return (Long) this.get(CURRENT_OFFSET);
	}
	
	public void setNextOffset(long nextOffset){
		this.put(NEXT_OFFSET, nextOffset);
	}
	
	public long getNextOffset(){
		return (Long) this.get(NEXT_OFFSET);
	}
	
	public void setTransactionId(String transactionId){
		this.put(TRANSACTION_ID, transactionId);
	}
	
	public String getTransectionId(){
		return (String) this.get(TRANSACTION_ID);
	}
	
	public void setOccurTime(long occurTime){
		String datetimeStr = DateUtils.dayFormatDateTime(occurTime);
		this.put(OCCUR_TIME,datetimeStr);
	}
	
	public Date getOccurTime(){
		String time=(String) this.get(OCCUR_TIME);
		return DateUtils.parse(time, DateUtils.DEFAULT_FORMAT);
	}
	
	public void setBatchSize(int batchSize){
		this.put(BATCH_SIZE, batchSize);
	}
	public int getBatchSize(){
		return (Integer)this.get(BATCH_SIZE);
	}
	
}

RocketMqIPatitionedTridentSpout.java

import java.util.List;
import java.util.Map;

import org.json.simple.JSONObject;

import storm.trident.spout.IPartitionedTridentSpout;
import storm.trident.spout.ISpoutPartition;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;

import com.alibaba.rocketmq.common.message.MessageQueue;

@SuppressWarnings("rawtypes")
public class RocketMqIPatitionedTridentSpout implements IPartitionedTridentSpout<List<MessageQueue>, ISpoutPartition, JSONObject>{
	/**
	 * 
	 */
	private static final long serialVersionUID = 4572682097086392244L;
	
	@Override
	//这个方法会被调用多次,不适合初始化consumer
	public storm.trident.spout.IPartitionedTridentSpout.Coordinator<List<MessageQueue>> getCoordinator(
			Map conf, TopologyContext context) {
		return new RocketMQCoordinator();
	}

	@Override
	public storm.trident.spout.IPartitionedTridentSpout.Emitter<List<MessageQueue>, ISpoutPartition, JSONObject> getEmitter(
			Map conf, TopologyContext context) {
		return new RocketMQEmitter();
	}

	
	@Override
	public Map getComponentConfiguration() {
		
		return null;
	}

	@Override
	//这个方法也会被调用多次,不是初始化
	public Fields getOutputFields() {
		
		return new Fields("tId", "message");
	}
}

RocketMqConsumer.java

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.MessageQueueListener;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.google.common.collect.Sets;
import com.pajk.wrestling.util.ConfigUtils;

public abstract class RocketMqConsumer {
	
	private static final Logger logger=LoggerFactory.getLogger(RocketMqConsumer.class);
	private static DefaultMQPullConsumer pullConsumer;
	public static boolean initialized=false;
	public static RocketMQConfig rocketMQConfig;
	private static Map<String,MessageQueue> partitionIdQueueMap;
	private static List<MessageQueue> queues;
	
	static{
		initRocketMqConfig();
		initConsumer();
		initMessageQueues();
		registerMessageQueueListener();
	}
	
	private static void initRocketMqConfig() {
		rocketMQConfig = ConfigUtils.getRocketMQConfig();
	}
	
	private static void initConsumer() {
		pullConsumer=new DefaultMQPullConsumer();
		pullConsumer.setConsumerGroup(rocketMQConfig.getGroupId());
		pullConsumer.setInstanceName(rocketMQConfig.getInstanceName());
		pullConsumer.setNamesrvAddr(rocketMQConfig.getNamesrvAddr());
		pullConsumer.setRegisterTopics(Sets.newHashSet(rocketMQConfig.getTopic()));
		
		logger.info("rocketmq pullConsumer config:{}",rocketMQConfig);
		
		try {
			pullConsumer.start();
			logger.info("rocketmq pullConsumer startup success!");
		} catch (MQClientException e) {
			throw new RuntimeException("consumer start fail!",e);
		}
		
		Runtime.getRuntime().addShutdownHook(new Thread() {

			@Override
			public void run() {
				if (pullConsumer != null) {
					pullConsumer.shutdown();
					logger.info("rocketmq pullConsumer shutdown success!");
				}
			}
			
		});
	}
	
	private static void initMessageQueues() {
		try {
			//因为队列可能扩容,每次开启一个新的事务(batch)时,都重新拉取一下最新的队列
			Set<MessageQueue> messageQueues = RocketMqConsumer.getConsumer().fetchSubscribeMessageQueues(RocketMqConsumer.getRocketMQConfig().getTopic());
			queues=new ArrayList<MessageQueue>(messageQueues);
			partitionIdQueueMap=new HashMap<String,MessageQueue>();
			for (MessageQueue messageQueue : messageQueues) {
				partitionIdQueueMap.put(makeMessageQueueUniqueId(messageQueue), messageQueue);
			}
		} catch (MQClientException e) {
			logger.error(e.getMessage(), e);
			throw new RuntimeException(e);
		}
		
	}
	
	private static void registerMessageQueueListener() {
		pullConsumer.registerMessageQueueListener(rocketMQConfig.getTopic(),new MessageQueueListener() {
			
			@Override
			public void messageQueueChanged(String topic, Set<MessageQueue> mqAll,
					Set<MessageQueue> mqDivided) {
				initMessageQueues();
			}
		} );
	}
	
	public static DefaultMQPullConsumer getConsumer(){
		return pullConsumer;
	}
	
	

	
	public static String makeMessageQueueUniqueId(MessageQueue messageQueue) {
		/*String brokerName = messageQueue.getBrokerName();
		String topic = messageQueue.getTopic();
		int queueId = messageQueue.getQueueId();*/
		return messageQueue.getBrokerName()+"-queue-"+messageQueue.getQueueId();
		
	}
	
	
	public static RocketMQConfig getRocketMQConfig(){
		return rocketMQConfig;
	}
	
	public static List<MessageQueue> getMessageQueues(){
		return queues;
	}
	
	public static MessageQueue getMessageQueueByUniqueId(String uniqueId){
		return partitionIdQueueMap.get(uniqueId);
	}
	
	public static boolean hasNewMesage(){
		/*try{
			for (MessageQueue messageQueue : queues) {
				long offset = pullConsumer.fetchConsumeOffset(messageQueue, true);
				offset=(offset<0)?0:offset;
				PullResult pullResult = pullConsumer.pull(messageQueue, rocketMQConfig.getTopicTag(), offset, rocketMQConfig.getPullBatchSize());
				PullStatus pullStatus = pullResult.getPullStatus();
				switch (pullStatus) {
				case FOUND:
					return true;
				case OFFSET_ILLEGAL:
					pullConsumer.updateConsumeOffset(messageQueue, pullResult.getNextBeginOffset());
					logger.warn("OFFSET_ILLEGAL,Queue:{},PullResult:{}",messageQueue,pullResult);break;
				case NO_NEW_MSG:break;
				case NO_MATCHED_MSG:
					logger.warn("May be some msg has other tag exsits in the queue:{},pull status:{}",messageQueue,pullStatus);
					break;
				default:break;
				}
			}
		}catch(Exception e){
			logger.error("decide has new message error", e);
		}*/
		return true;
		
	}
}

RocketMQConfig.java

import java.io.Serializable;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

/**
 * @author Von Gosling
 */
public class RocketMQConfig implements Serializable {
	private static final long serialVersionUID = 4157424979688590880L;
	private String namesrvAddr;
	/**
	 * Unique mark for every JVM instance
	 */
	private String instanceName;
	/**
	 * Group by message actor
	 */
	private String groupId;
	/*
	 * Message topic
	 */
	private String topic;
	/**
	 * Message topic tag
	 */
	private String topicTag;
	/**
	 * Minimal consumer thread count
	 */
	private int consumeThreadMin = 20;
	/**
	 * Maximal consumer thread count
	 */
	private int consumeThreadMax = 64;
	/**
	 * If piled-up message exceeds this value,adjust consumer thread to max
	 * value dynamically
	 */
	private long adjustThreadPoolNumsThreshold = 100000l;
	/**
	 * Local message queue threshold, trigger flow control if exceeds this value
	 */
	private int pullThresholdForQueue = 1024;
	/**
	 * The message size from server for every pull batch
	 */
	private int pullBatchSize = 32;
	/**
	 * Pull interval from server for every pull
	 */
	private long pullInterval = 0;
	/**
	 * Fetch message size from local queue
	 */
	private int consumeMessageBatchMaxSize = 1;
	/**
	 * Consumption of local sequence, will affect performance
	 */
	private boolean ordered;
	/**
	 * The max allowed failures for one single message, skip the failure message
	 * if excesses. -1 means try again until success
	 */
	private int maxFailTimes = 5;

	public RocketMQConfig() {
	}

	public RocketMQConfig(String namesrvAddr,String consumerGroup, String topic, String topicTag) {
		super();
		this.groupId = consumerGroup;
		this.topic = topic;
		this.topicTag = topicTag;
		this.namesrvAddr=namesrvAddr;
	}

	/**
	 * @return the instanceName
	 */
	public String getInstanceName() {
		return instanceName;
	}

	/**
	 * @param instanceName
	 *            the instanceName to set
	 */
	public void setInstanceName(String instanceName) {
		this.instanceName = instanceName;
	}

	/**
	 * @return the groupId
	 */
	public String getGroupId() {
		return groupId;
	}

	/**
	 * @param groupId
	 *            the groupId to set
	 */
	public void setGroupId(String groupId) {
		this.groupId = groupId;
	}

	/**
	 * @return the topic
	 */
	public String getTopic() {
		return topic;
	}

	/**
	 * @param topic
	 *            the topic to set
	 */
	public void setTopic(String topic) {
		this.topic = topic;
	}

	/**
	 * @return the topicTag
	 */
	public String getTopicTag() {
		return topicTag;
	}

	/**
	 * @param topicTag
	 *            the topicTag to set
	 */
	public void setTopicTag(String topicTag) {
		this.topicTag = topicTag;
	}

	/**
	 * @return the consumeThreadMin
	 */
	public int getConsumeThreadMin() {
		return consumeThreadMin;
	}

	/**
	 * @param consumeThreadMin
	 *            the consumeThreadMin to set
	 */
	public void setConsumeThreadMin(int consumeThreadMin) {
		this.consumeThreadMin = consumeThreadMin;
	}

	/**
	 * @return the consumeThreadMax
	 */
	public int getConsumeThreadMax() {
		return consumeThreadMax;
	}

	/**
	 * @param consumeThreadMax
	 *            the consumeThreadMax to set
	 */
	public void setConsumeThreadMax(int consumeThreadMax) {
		this.consumeThreadMax = consumeThreadMax;
	}

	/**
	 * @return the adjustThreadPoolNumsThreshold
	 */
	public long getAdjustThreadPoolNumsThreshold() {
		return adjustThreadPoolNumsThreshold;
	}

	/**
	 * @param adjustThreadPoolNumsThreshold
	 *            the adjustThreadPoolNumsThreshold to set
	 */
	public void setAdjustThreadPoolNumsThreshold(
			long adjustThreadPoolNumsThreshold) {
		this.adjustThreadPoolNumsThreshold = adjustThreadPoolNumsThreshold;
	}

	/**
	 * @return the pullThresholdForQueue
	 */
	public int getPullThresholdForQueue() {
		return pullThresholdForQueue;
	}

	/**
	 * @param pullThresholdForQueue
	 *            the pullThresholdForQueue to set
	 */
	public void setPullThresholdForQueue(int pullThresholdForQueue) {
		this.pullThresholdForQueue = pullThresholdForQueue;
	}

	/**
	 * @return the pullBatchSize
	 */
	public int getPullBatchSize() {
		return pullBatchSize;
	}

	/**
	 * @param pullBatchSize
	 *            the pullBatchSize to set
	 */
	public void setPullBatchSize(int pullBatchSize) {
		this.pullBatchSize = pullBatchSize;
	}

	/**
	 * @return the pullInterval
	 */
	public long getPullInterval() {
		return pullInterval;
	}

	/**
	 * @param pullInterval
	 *            the pullInterval to set
	 */
	public void setPullInterval(long pullInterval) {
		this.pullInterval = pullInterval;
	}

	/**
	 * @return the consumeMessageBatchMaxSize
	 */
	public int getConsumeMessageBatchMaxSize() {
		return consumeMessageBatchMaxSize;
	}

	/**
	 * @param consumeMessageBatchMaxSize
	 *            the consumeMessageBatchMaxSize to set
	 */
	public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) {
		this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
	}

	/**
	 * @return the ordered
	 */
	public boolean isOrdered() {
		return ordered;
	}

	/**
	 * @param ordered
	 *            the ordered to set
	 */
	public void setOrdered(boolean ordered) {
		this.ordered = ordered;
	}

	/**
	 * @return the maxFailTimes
	 */
	public int getMaxFailTimes() {
		return maxFailTimes;
	}

	/**
	 * @param maxFailTimes
	 *            the maxFailTimes to set
	 */
	public void setMaxFailTimes(int maxFailTimes) {
		this.maxFailTimes = maxFailTimes;
	}

	@Override
	public String toString() {
		return ToStringBuilder.reflectionToString(this,
				ToStringStyle.SHORT_PREFIX_STYLE);
	}

	public String getNamesrvAddr() {
		return namesrvAddr;
	}

	public void setNamesrvAddr(String namesrvAddr) {
		this.namesrvAddr = namesrvAddr;
	}

}

ConfigUtils.java

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Properties;

import org.apache.commons.lang.BooleanUtils;

import backtype.storm.Config;

import com.pajk.wrestling.rocketmq.RocketMQConfig;

/**
 * Utilities for RocketMQ spout regarding its configuration and reading values
 * from the storm configuration.
 * 
 * @author Von Gosling
 */
public abstract class ConfigUtils {
	
	public static final String CONFIG_TOPIC = "rocketmq.spout.topic";
	public static final String CONFIG_CONSUMER_GROUP = "rocketmq.spout.consumer.group";
	public static final String CONFIG_TOPIC_TAG = "rocketmq.spout.topic.tag";

	public static final String CONFIG_ROCKETMQ = "rocketmq.config";

	public static final String CONFIG_PREFETCH_SIZE = "rocketmq.prefetch.size";
	
	public static final String CONFIG_NAMESRV_ADDR="public.rocketmq.domain.name";
	
	public static Properties props;
	
	static{
		 props = new Properties();
		try {
			InputStream is = ConfigUtils.class.getClassLoader().getResourceAsStream("wrestling-config.properties");
			props.load(is);
		} catch (IOException e) {
			throw new RuntimeException("load config ocurr error!", e);
		}
	}
	
	public static RocketMQConfig getRocketMQConfig() {
		String topic = props.getProperty(CONFIG_TOPIC);
		String consumerGroup = props.getProperty(ConfigUtils.CONFIG_CONSUMER_GROUP);
		String topicTag = props.getProperty(ConfigUtils.CONFIG_TOPIC_TAG);
		Integer pullBatchSize = Integer.parseInt(props.getProperty(ConfigUtils.CONFIG_PREFETCH_SIZE));
		String nameServerAddr = props.getProperty(CONFIG_NAMESRV_ADDR);
		RocketMQConfig mqConfig = new RocketMQConfig(nameServerAddr,consumerGroup, topic, topicTag);
		try {
			mqConfig.setInstanceName(consumerGroup+"_"+InetAddress.getLocalHost().getHostAddress().replaceAll("\\.", "_")+"_"+System.getProperty("worker.port"));
		} catch (UnknownHostException e) {
			throw new RuntimeException(e);
		}
		if (pullBatchSize != null && pullBatchSize > 0) {
			mqConfig.setPullBatchSize(pullBatchSize);
		}

		boolean ordered = BooleanUtils.toBooleanDefaultIfNull(
				Boolean.valueOf(props.getProperty("rocketmq.spout.ordered")), false);
		mqConfig.setOrdered(ordered);
		return mqConfig;
	}

	public static Config getTopologyConfig() {
		Config config=new Config();
		config.setNumWorkers(Integer.parseInt(props.getProperty("topology.workers")));
		config.setNumAckers(Integer.parseInt(props.getProperty("topology.acker.executors")));
		config.setMaxSpoutPending(Integer.parseInt(props.getProperty("topology.max.spout.pending")));
		config.setMessageTimeoutSecs(Integer.parseInt(props.getProperty("topology.message.timeout.secs")));
		config.put("topology.name", props.getProperty("topology.name"));
		config.setDebug(Boolean.parseBoolean(props.getProperty("topology.debug","false")));
		return config;
	}
	
	public static String get(String key){
		return props.getProperty(key);
	}
	
	public static Integer getInt(String key){
		return Integer.parseInt(props.getProperty(key));
	}
}

wrestling-config.properties

#rocketmq configuration 139.129.97.30
public.rocketmq.domain.name=mq1.test.pajkdc.com:9876;mq2.test.pajkdc.com:9876
#public.rocketmq.domain.name=139.129.97.30:9876
rocketmq.spout.consumer.group=cmtcenter_wrestling_group
rocketmq.spout.topic=CMTCENTER
rocketmq.spout.topic.tag=COMMENT_CREATED
rocketmq.spout.ordered=false
rocketmq.prefetch.size=32

#topopology configuration
topology.acker.executors=1
topology.workers=1
topology.max.spout.pending=1
topology.message.timeout.secs=30
topology.debug=true