XA Транзакции между 2 JMS Brokers (ActiveMQ)

Я пытаюсь переместить jms-сообщения между двумя разными, удаленными, activeMQ-брокерами и после большого чтения

Я использую Atomicos, поскольку я пишу отдельное приложение, и я также использую весну, чтобы заставить все это работать.

У меня есть следующая настройка javaconfig bean

@Bean(name="atomikosSrcConnectionFactory")
 public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
 AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
 consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
 consumerBean.setLocalTransactionMode(false);
 return consumerBean;
 }

 @Bean(name="atomikosDstConnectionFactory")
 public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
 AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
 producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
 producerBean.setLocalTransactionMode(false);
 return producerBean;
 }

 @Bean(name="jtaTransactionManager")
 public JtaTransactionManager jtaTransactionManager() throws SystemException {
 JtaTransactionManager jtaTM = new JtaTransactionManager();
 jtaTM.setTransactionManager(userTransactionManager());
 jtaTM.setUserTransaction(userTransactionImp());
 return jtaTM;
 }

 @Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
 public UserTransactionManager userTransactionManager() {
 UserTransactionManager utm = new UserTransactionManager();
 utm.setForceShutdown(false);
 return utm;
 }

 @Bean(name="userTransactionImp")
 public UserTransactionImp userTransactionImp() throws SystemException {
 UserTransactionImp uti = new UserTransactionImp();
 uti.setTransactionTimeout(300);
 return uti;
 }

 @Bean(name="jmsContainer")
 @Lazy(value=true)
 public DefaultMessageListenerContainer jmsContainer() throws SystemException {
 DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
 dmlc.setAutoStartup(false);
 dmlc.setTransactionManager(jtaTransactionManager());
 dmlc.setSessionTransacted(true);
 dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
 dmlc.setConnectionFactory(consumerXAConnectionFactory());
 dmlc.setDestinationName("srcQueue");
 return dmlc;
 }

 @Bean(name="transactedJmsTemplate")
 public JmsTemplate transactedJmsTemplate() {

 DynamicDestinationResolver dest = new DynamicDestinationResolver();

 JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());

 jmsTmp.setDeliveryPersistent(true);
 jmsTmp.setSessionTransacted(true);
 jmsTmp.setDestinationResolver(dest);
 jmsTmp.setPubSubDomain(false);
 jmsTmp.setReceiveTimeout(20000);
 jmsTmp.setExplicitQosEnabled(true);
 jmsTmp.setSessionTransacted(true);
 jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
 jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

 return jmsTmp;
 }

2 AtomikosConnectionFactoryBean обертывают ActiveMQXAConnectionFactory (по одному для каждого брокера) во время выполнения, прежде чем я начну DMLC.

Затем я устанавливаю простой MessageListener (который назначен для dmlc перед его запуском) со следующим методом:

public void onMessage(Message message) {
 final Message rcvedMsg = message;

 try{
 MessageCreator msgCreator = new MessageCreator(){
 public Message createMessage(Session session) throws JMSException{
 Message returnMsg = null;
 if(rcvedMsg instanceof TextMessage){
 TextMessage txtMsg = session.createTextMessage();
 txtMsg.setText(((TextMessage) rcvedMsg).getText());
 returnMsg = txtMsg;
 }
 else if(rcvedMsg instanceof BytesMessage){
 BytesMessage bytesMsg = session.createBytesMessage();
 if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
 byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
 bytesMsg.writeBytes(bodyContent);
 returnMsg = bytesMsg;
 }
 }
 return returnMsg;
 }
 };

 jmsTemplate.send(msgCreator);
 }
 catch(JmsException | JMSException e){
 logger.error("Error when transfering message: '{}'. {}",message,e);
 }
}

Приложение запускается без каких-либо конкретных ошибок или предупреждений, однако, как только я помещаю сообщение в исходную очередь, я вижу, через журналы, что метод onMessage снова и снова запускается для одного и того же сообщения, как если бы транзакция продолжалась откат назад и снова перезапущен (нигде нет ошибок).

Я также заметил, что если я использую один и тот же URL-адрес источника и назначения (имея в виду одного и того же брокера, но каждый с ним собственный connectionFactory), он работает, и сообщения передаются по назначению между исходной и целевой очередью.

Мне интересно,

  1. Что я делаю неправильно в настройке? Почему моя транзакция "по-видимому" откатывается снова и снова при использовании двух разных брокеров, но работает при использовании того же (но более двух разных фабрик подключений)?
  2. Я не полностью убежден, что onMessage в настоящее время выполняет правильную транзакцию, поскольку в настоящее время я занимаюсь всеми исключениями и ничего не делаю, и я полагаю, что это приведет к транзакции dmlc до того, как jmstemplate будет отправлена сообщение, но я не уверен. Если это так, лучше ли будет SessionAwareMessageListener? Должен ли я установить @Transacted в методе onMessage?

Может ли кто-нибудь помочь осветить проблему? Весь вход приветствуется.

ОБНОВИТЬ:

Я понял, что проблема с "откатом" связана с тем, что оба AMQ, которые я использовал, были связаны друг с другом через сеть брокеров, и я случайно использовал одно и то же имя очереди для источника и адресата. Это привело к тому, что сообщение было передано приложением от одного AMQ к другому, а затем немедленно, потому что в исходном AMQ был потребитель, сообщение будет перенесено обратно в исходный AMQ, который, в свою очередь, был замечен как новое сообщение от моего приложения и переданное снова, и цикл продолжался бесконечно. Решение, размещенное ниже, помогло с другими проблемами.

1 ответ

try {
 ... Code
} catch (JmsException je) {
 logger.error("Error when transfering message: '{}'. {}",message,e);
}

Вышеприведенный код проглатывает исключение, вы должны либо не поймать исключение, либо реконструировать, чтобы управление транзакциями могло обрабатывать его соответствующим образом. В настоящее время исключений не наблюдается, совершается фиксация, которая может привести к странным результатам.

Я бы сделал что-то вроде следующего: JmsException - из Spring и, как и большинство исключений Spring, RuntimeException. Просто перетащите, также, чтобы зарегистрировать исключение stacktrace, правильно удалить второй {} в вашей записи журнала.

try {
 ... Code
} catch (JmsException je) {
 logger.error("Error when transfering message: '{}'.",message,e);
 throw je;
}

Однако это приведет к дублированию журнала, поскольку Spring также зарегистрирует ошибку.

Для JMSException сделайте что-то вроде этого, превратив его в JmsException.

try {
 ... Code
} catch (JMSException je) {
 logger.error("Error when transfering message: '{}'.",message,e);
 throw JmsUtils.convertJmsAccessException(je);
}

Чтобы получить дополнительную информацию о том, что произойдет, вы, вероятно, захотите включить ведение журнала DEBUG для пакета org.springframework.jms. Это даст вам представление о том, что происходит при отправке/получении сообщения.

Другое дело, что вы используете транзакционные сессии и ручное подтверждение сообщений, однако вы не делаете message.acknowledge() в своем коде. Весна не вызывается из-за транзакции JTA. Вместо этого попробуйте переключить его на SESSION_TRANSACTED. По крайней мере, для DMLC.

licensed under cc by-sa 3.0 with attribution.