2.1 Message消息对象
在RocketMQ中,生产者使用Message
对象表示一条消息,本文对Message的属性和构造方法进行详细的讲解。
1 Message属性
Message的定义如下所示:
org.apache.rocketmq.common.message.Message
public class Message implements Serializable { private String topic; private int flag; private Map<String, String> properties; private byte[] body; private String transactionId;
其中:
topic:表示消息要到的发送主题,必填
flag:选填,消息的标记,完全由应用设置,RocketMQ不做任何处理,类似于memcached中flag的作用。
properties:消息属性,主要存储一些消息的元数据信息
body:消息的内容,这是一个字节数组,序列化方式由应用决定,例如你可以将一个json转为字节数组,也可以通过protol buffer、hessian编码转为字节数组。
transactionId:事务id,仅在事务消息中使用到
Message数据结构中各个字段都可以通过get、set方式访问,例如访问topic
msg.setTopic("TopicTest”) msg.getTopic()
2 构造方法
Message提供了以下构造方法:
public Message() public Message(String topic, byte[] body) public Message(String topic, String tags, byte[] body) public Message(String topic, String tags, String keys, byte[] body) public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)
上述构造方法中的参数,会被设置到Message的成员变量中。可以看到部分参数在Message成员变量中并没有定义,这些参数都可以可选的,会被设置到properties中:
tags:表示消息的标签,消费者在消费时,可以根据标签进行过滤,需要注意的是,一个生产者,只能指定一个tag
keys:用于建立索引,之后可以通过命令工具/API/或者管理平台查询key,可以为一个消息设置多个key,用空格""进行分割
waitStoreMsgOK:表示发送消息后,是否需要等待消息同步刷新到磁盘上。如果broker配置为ASYNC_MASTER,那么只需要消息在master上刷新到磁盘即可;如果配置为SYNC_MASTER,那么还需要等待slave也刷新到磁盘。需要注意的是,waitStoreMsgOK默认为false,只有将设置为true的情况下,才会等待刷盘成功再返回。
除了通过构造方法给Message成员变量赋值,也可以通过相关set方法进行赋值,通过对应的get方法取值。除了可以设置上述构造方法中出现的参数,还可以设置:
public void setBuyerId(String buyerId) public void setDelayTimeLevel(int level) public void putUserProperty(final String name, final String value)
其中:
BuyerId:
DelayTimeLevel:设置消息的延迟级别,0表示不延迟,大于0会延迟特定时间才被消费
putUserProperty:自定义消息属性。前面提到tags、keys、waitStoreMsgOK等,都会设置到properties中。如果开发者有自定义消息属性的需求,可以通过此方法进行设置。
3 消息属性保留关键字
properties字段有一些保留的关键字,这些保留关键字定义在MessageConst类中,用户在自定义消息属性时,需要避开这些关键字。注意,rocketmq还提供了putProperty和setProperties两个方法,但是这里不建议用户使用,因为可能会与rocketmq保留的属性名冲突。
在MessageConst类中,定义了这些属性的key,例如生产者可能会使用到:
org.apache.rocketmq.common.message.MessageConst
public class MessageConst { //消息的key public static final String PROPERTY_KEYS = "KEYS"; //消息的tag public static final String PROPERTY_TAGS = "TAGS"; //是否waitStoreMsgOK public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT"; //消息延迟级别 public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY"; //BUYER_ID public static final String PROPERTY_BUYER_ID = "BUYER_ID"; ...