uncategorized

spring+rabbitmq+protobuf

好久没有用Java写过东西了,感觉都手生了。
最近项目中需要用到rabbitmq作为消息队列,在各个模块和服务之间发送Protobuf类型的消息。
这里记录一下简单的接入过程,以及碰到的坑。

RabbitMQ Mac上的简单安装

1
2
3
4
brew install rabbitmq
#启用RabbitMQ UI management: http://localhost:15672
/usr/local/Cellar/rabbitmq/3.6.4/sbin/rabbitmq-plugins enable rabbitmq_management
brew services restart rabbitmq

首先添加spring-boot-starter-amqp的依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.4.3.RELEASE</version>
</dependency>

这里需要说明一下,1.4.3.RELEASE的版本依赖logback 1.1.8, 可能会和自己项目中的版本冲突
那么升级自己的版本,或者在这里直接exclusion掉就OK了。

Maven 依赖版本冲突的问题,可以参考之前的一篇Post

增加配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
logging.level.org.springframework.amqp=INFO
logging.level.com.patterncat=INFO
#spring.rabbitmq.dynamic 是否创建AmqpAdmin bean. 默认为: true
#spring.rabbitmq.listener.acknowledge-mode 指定Acknowledge的模式.
#spring.rabbitmq.listener.auto-startup 是否在启动时就启动mq,默认: true
#spring.rabbitmq.listener.concurrency 指定最小的消费者数量.
#spring.rabbitmq.listener.max-concurrency 指定最大的消费者数量.
#spring.rabbitmq.listener.prefetch 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
#spring.rabbitmq.listener.transaction-size 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
#spring.rabbitmq.requested-heartbeat 指定心跳超时,0为不指定.
#spring.rabbitmq.ssl.enabled 是否启用SSL,默认: false
#spring.rabbitmq.ssl.key-store 指定持有SSL certificate的key store的路径
#spring.rabbitmq.ssl.key-store-password 指定访问key store的密码.
#spring.rabbitmq.ssl.trust-store 指定持有SSL certificates的Trust store.
#spring.rabbitmq.ssl.trust-store-password 指定访问trust store的密码.

增加Rabbit配置

注意: 如果要自动创建queue、exchange等, 必须添加RabbitAdmin, 否则会报declare queues(xxxx)的异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ wiring for the dispatch flow: declares the queue, the topic exchange and the
 * binding between them, and installs {@link ProtobufMessageConverter} on both the sending
 * and the receiving side.
 */
@Configuration
@EnableAutoConfiguration
public class RabbitConfig {

    /** Name of the queue that dispatch events are consumed from. */
    private static final String DISPATCH_QUEUE = "dispatch-event";

    /** Name of the topic exchange that dispatch events are published to. */
    private static final String DISPATCH_EXCHANGE = "dispatch-exchange";

    /** Routing key that binds {@link #DISPATCH_QUEUE} to {@link #DISPATCH_EXCHANGE}. */
    private static final String DISPATCH_ROUTING_KEY = "dispatch-event";

    /**
     * Creates the admin used below to declare the exchange, queue and binding.
     *
     * @param connectionFactory connection factory the admin operates on
     * @return a {@link RabbitAdmin} bound to {@code connectionFactory}
     */
    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    /**
     * Declares the durable dispatch queue. Can be dropped if the queue is created manually.
     *
     * @param rabbitAdmin admin used to declare the queue
     * @return the declared queue
     */
    @Bean
    Queue queueDispatch(RabbitAdmin rabbitAdmin) {
        Queue dispatchQueue = new Queue(DISPATCH_QUEUE, true);
        rabbitAdmin.declareQueue(dispatchQueue);
        return dispatchQueue;
    }

    /**
     * Declares the dispatch topic exchange. Can be dropped if the exchange is created manually.
     *
     * @param rabbitAdmin admin used to declare the exchange
     * @return the declared exchange
     */
    @Bean
    TopicExchange exchange(RabbitAdmin rabbitAdmin) {
        TopicExchange dispatchExchange = new TopicExchange(DISPATCH_EXCHANGE);
        rabbitAdmin.declareExchange(dispatchExchange);
        return dispatchExchange;
    }

    /**
     * Binds the dispatch queue to the dispatch exchange and declares that binding.
     *
     * @param queueDispatch queue to bind
     * @param exchange      exchange to bind the queue to
     * @param rabbitAdmin   admin used to declare the binding
     * @return the declared binding
     */
    @Bean
    Binding bindingExchangeDispatch(Queue queueDispatch, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        Binding dispatchBinding = BindingBuilder
                .bind(queueDispatch)
                .to(exchange)
                .with(DISPATCH_ROUTING_KEY);
        rabbitAdmin.declareBinding(dispatchBinding);
        return dispatchBinding;
    }

    /**
     * Configures the message converter used when sending.
     *
     * <p>The converter is set on the injected {@link RabbitTemplate} itself, so code that
     * uses that template directly sends protobuf payloads as well.
     *
     * @param rabbitTemplate template to configure and wrap
     * @return a messaging template delegating to {@code rabbitTemplate}
     */
    @Bean
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        rabbitTemplate.setMessageConverter(new ProtobufMessageConverter());

        RabbitMessagingTemplate messagingTemplate = new RabbitMessagingTemplate();
        messagingTemplate.setRabbitTemplate(rabbitTemplate);
        return messagingTemplate;
    }

    /**
     * Configures the message converter used when receiving.
     *
     * @param connectionFactory connection factory the listener containers use
     * @return a listener container factory with the protobuf converter and AUTO acknowledge mode
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        containerFactory.setMessageConverter(new ProtobufMessageConverter());
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return containerFactory;
    }
}

自定义Protobuf Message Converter

由于我的proto中包含多个Message的定义(Event, SelectMessage…)
所以选择把Message的类型作为Header一并传递到消费者, 然后根据不同的类型, 反序列化对应的消息体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.storm.stock.models.StockProto.*;
import java.util.HashMap;
import java.util.Map;
public final class ProtobufMessageConverter implements MessageConverter {
private static Map<String, MessageLite.Builder> builderMap;
static {
builderMap = new HashMap<>();
builderMap.put(Event.class.getSimpleName(), Event.newBuilder());
builderMap.put(SelectMessage.class.getSimpleName(), SelectMessage.newBuilder());
builderMap.put(CreateMessage.class.getSimpleName(), CreateMessage.newBuilder());
builderMap.put(ClearMessage.class.getSimpleName(), ClearMessage.newBuilder());
builderMap.put(BackForthMessage.class.getSimpleName(), BackForthMessage.newBuilder());
}
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
String messageType = object.getClass().getSimpleName();
if (!builderMap.containsKey(messageType)) {
throw new MessageConversionException("not support message type:" + messageType);
}
messageProperties.setHeader("messageType", messageType);
MessageLite messageLite = (MessageLite)object;
return new Message(messageLite.toByteArray(), messageProperties);
}
public Object fromMessage(Message message) throws MessageConversionException {
String messageType = message.getMessageProperties().getHeaders().get("messageType").toString();
if (!builderMap.containsKey(messageType)) {
throw new MessageConversionException("not support message type:" + messageType);
}
try {
MessageLite.Builder builder = builderMap.get(messageType).clear();
builder = builder.mergeFrom(message.getBody());
return builder.build();
} catch (InvalidProtocolBufferException e) {
throw new MessageConversionException("deserialize message error", e);
}
}
}

发送和接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.storm.stock.models.StockProto.Event;
import static org.storm.stock.common.Constant.*;
/**
 * Publishes a {@code SELECT} {@link Event} to RabbitMQ on a fixed delay and consumes
 * events from the {@code dispatch-event} queue.
 */
@Component
public class DispatcherEvent {

    private static final Logger logger = LoggerFactory.getLogger(DispatcherEvent.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Scheduled task: builds a {@code SELECT} event and sends it to the
     * {@code dispatch-exchange} exchange with routing key {@code dispatch-event}.
     * Runs again 1000 ms after the previous run has finished.
     */
    @Scheduled(fixedDelay = 1000L)
    public void trigger() {
        Event event = Event.newBuilder().setType(Event.EventType.SELECT).build();
        logger.info("Event send: {}", event.getType().name());
        rabbitTemplate.convertAndSend("dispatch-exchange", "dispatch-event", event);
    }

    /**
     * Handles an event received from the {@code dispatch-event} queue.
     *
     * @param event the event converted from the incoming message
     */
    @RabbitListener(queues = "dispatch-event")
    public void dispatch(Event event) {
        logger.info("Event receive: {}", event.getType().name());
        switch (event.getType()) {
            // Logic for deal message
            default:
                // No per-type handling implemented yet.
                break;
        }
    }
}

spring-boot启动程序

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
 * Spring Boot entry point. Scheduling is enabled so that {@code @Scheduled} methods
 * in the application's components are run.
 */
@SpringBootApplication
@EnableScheduling
public class Application {

    /**
     * Starts the Spring application context.
     *
     * @param args command-line arguments, passed through to {@link SpringApplication}
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}