2.1 Message消息对象

2019-12-26 23:27:14 13,224 8

在RocketMQ中,生产者使用Message对象表示一条消息,本文对Message的属性和构造方法进行详细的讲解。

1 Message属性

Message的定义如下所示:

org.apache.rocketmq.common.message.Message

public class Message implements Serializable {
    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

其中:

    topic:表示消息要到的发送主题,必填

    flag:选填,消息的标记,完全由应用设置,RocketMQ不做任何处理,类似于memcached中flag的作用。

    properties:消息属性,主要存储一些消息的元数据信息

    body:消息的内容,这是一个字节数组,序列化方式由应用决定,例如你可以将一个json转为字节数组,也可以通过protol buffer、hessian编码转为字节数组。

    transactionId:事务id,仅在事务消息中使用到 

Message数据结构中各个字段都可以通过get、set方式访问,例如访问topic

msg.setTopic("TopicTest”)
msg.getTopic()


2 构造方法

Message提供了以下构造方法:

public Message()
public Message(String topic, byte[] body)
public Message(String topic, String tags, byte[] body)
public Message(String topic, String tags, String keys, byte[] body)
public Message(String topic, String tags, String keys, int flag, byte[] body, boolean waitStoreMsgOK)

上述构造方法中的参数,会被设置到Message的成员变量中。可以看到部分参数在Message成员变量中并没有定义,这些参数都可以可选的,会被设置到properties中:

tags:表示消息的标签,消费者在消费时,可以根据标签进行过滤,需要注意的是,一个生产者,只能指定一个tag

keys:用于建立索引,之后可以通过命令工具/API/或者管理平台查询key,可以为一个消息设置多个key,用空格""进行分割

waitStoreMsgOK:表示发送消息后,是否需要等待消息同步刷新到磁盘上。如果broker配置为ASYNC_MASTER,那么只需要消息在master上刷新到磁盘即可;如果配置为SYNC_MASTER,那么还需要等待slave也刷新到磁盘。需要注意的是,waitStoreMsgOK默认为false,只有将设置为true的情况下,才会等待刷盘成功再返回。

除了通过构造方法给Message成员变量赋值,也可以通过相关set方法进行赋值,通过对应的get方法取值。除了可以设置上述构造方法中出现的参数,还可以设置:

public void setBuyerId(String buyerId)
public void setDelayTimeLevel(int level)
public void putUserProperty(final String name, final String value)

其中:

BuyerId:

DelayTimeLevel:设置消息的延迟级别,0表示不延迟,大于0会延迟特定时间才被消费

putUserProperty:自定义消息属性。前面提到tags、keys、waitStoreMsgOK等,都会设置到properties中。如果开发者有自定义消息属性的需求,可以通过此方法进行设置。 


3 消息属性保留关键字

properties字段有一些保留的关键字,这些保留关键字定义在MessageConst类中,用户在自定义消息属性时,需要避开这些关键字。注意,rocketmq还提供了putProperty和setProperties两个方法,但是这里不建议用户使用,因为可能会与rocketmq保留的属性名冲突。 

在MessageConst类中,定义了这些属性的key,例如生产者可能会使用到:

org.apache.rocketmq.common.message.MessageConst

public class MessageConst {
    //消息的key
    public static final String PROPERTY_KEYS = "KEYS";
    //消息的tag
    public static final String PROPERTY_TAGS = "TAGS";
    //是否waitStoreMsgOK
    public static final String PROPERTY_WAIT_STORE_MSG_OK = "WAIT";
    //消息延迟级别
    public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";
    //BUYER_ID
    public static final String PROPERTY_BUYER_ID = "BUYER_ID";
    ...

        

上一篇:1.4 集群搭建 下一篇:2.5 延迟消息