Как имитировать повторную доставку сообщений в сценарии сеанса JMS AUTO_ACKNOWLEDGE?

В следующем тесте я пытаюсь моделировать следующий сценарий:

  • Запускается очередь сообщений.
  • Запускается пользователь, созданный для отказа при обработке сообщений.
  • Выдается сообщение.
  • Пользователь начинает обработку сообщения.
  • Во время обработки создается исключение для имитации сбоя обработки сообщений. Неудавшийся потребитель остановлен.
  • Другой пользователь запускается с намерением получить сообщение с переадресацией.

Но мой тест терпит неудачу, и сообщение не переадресуется новому потребителю. Я буду признателен за любые намеки на это.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig", loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests { @Autowired private FailureReprocessTestScenario testScenario; @Before public void setUp() { testScenario.start(); } @After public void tearDown() throws Exception { testScenario.stop(); } @Test public void should_reprocess_task_after_processing_failure() { try { Thread.sleep(20*1000); assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{ "task-1", }))); } catch (InterruptedException e) { fail(); } } @Configurable public static class FailureReprocessTestScenario { @Autowired public BrokerService broker; @Autowired public MockTaskProducer mockTaskProducer; @Autowired public FailingWorker failingWorker; @Autowired public SucceedingWorker succeedingWorker; @Autowired public TaskScheduler scheduler; public void start() { Date now = new Date(); scheduler.schedule(new Runnable() { public void run() { failingWorker.start(); } }, now); Date after1Seconds = new Date(now.getTime() + 1*1000); scheduler.schedule(new Runnable() { public void run() { mockTaskProducer.produceTask(); } }, after1Seconds); Date after2Seconds = new Date(now.getTime() + 2*1000); scheduler.schedule(new Runnable() { public void run() { failingWorker.stop(); succeedingWorker.start(); } }, after2Seconds); } public void stop() throws Exception { succeedingWorker.stop(); broker.stop(); } } @Configuration @ImportResource(value={"classpath:applicationContext-jms.xml", "classpath:applicationContext-task.xml"}) public static class ContextConfig { @Autowired private ConnectionFactory jmsFactory; @Bean public FailureReprocessTestScenario testScenario() { return new FailureReprocessTestScenario(); } @Bean public MockTaskProducer mockTaskProducer() { return new MockTaskProducer(); } @Bean public FailingWorker failingWorker() { TaskListener listener = new TaskListener(); FailingWorker worker = new FailingWorker(listenerContainer(listener)); listener.setProcessor(worker); return worker; } @Bean public SucceedingWorker succeedingWorker() { TaskListener listener = new TaskListener(); SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener)); listener.setProcessor(worker); return worker; } private DefaultMessageListenerContainer listenerContainer(TaskListener listener) { DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer(); listenerContainer.setConnectionFactory(jmsFactory); listenerContainer.setDestinationName("tasksQueue"); listenerContainer.setMessageListener(listener); listenerContainer.setAutoStartup(false); listenerContainer.initialize(); return listenerContainer; } } public static class FailingWorker implements TaskProcessor { private Logger LOG = Logger.getLogger(FailingWorker.class.getName()); private final DefaultMessageListenerContainer listenerContainer; public FailingWorker(DefaultMessageListenerContainer listenerContainer) { this.listenerContainer = listenerContainer; } public void start() { LOG.info("FailingWorker.start()"); listenerContainer.start(); } public void stop() { LOG.info("FailingWorker.stop()"); listenerContainer.stop(); } @Override public void processTask(Object task) { LOG.info("FailingWorker.processTask(" + task + ")"); try { Thread.sleep(1*1000); throw Throwables.propagate(new Exception("Simulate task processing failure")); } catch (InterruptedException e) { LOG.log(Level.SEVERE, "Unexpected interruption exception"); } } } public static class SucceedingWorker implements TaskProcessor { private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName()); private final DefaultMessageListenerContainer listenerContainer; public final List<string> processedTasks; public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) { this.listenerContainer = listenerContainer; this.processedTasks = new ArrayList<string>(); } public void start() { LOG.info("SucceedingWorker.start()"); listenerContainer.start(); } public void stop() { LOG.info("SucceedingWorker.stop()"); listenerContainer.stop(); } @Override public void processTask(Object task) { LOG.info("SucceedingWorker.processTask(" + task + ")"); try { TextMessage taskText = (TextMessage) task; processedTasks.add(taskText.getText()); } catch (JMSException e) { LOG.log(Level.SEVERE, "Unexpected exception during task processing"); } } }
}
</string></string>

TaskListener.java

public class TaskListener implements MessageListener { private TaskProcessor processor; @Override public void onMessage(Message message) { processor.processTask(message); } public void setProcessor(TaskProcessor processor) { this.processor = processor; }
}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware { private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName()); @Autowired private JmsTemplate jmsTemplate; private Destination destination; private int taskCounter = 0; public void produceTask() { LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")"); taskCounter++; jmsTemplate.send(destination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage("task-" + taskCounter); return message; } }); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { destination = applicationContext.getBean("tasksQueue", Destination.class); }
}
1 ответ

По-видимому, источник документации, который я смотрел вчера Создание надежных JMS-приложений, вводит меня в заблуждение (или, возможно, я понял это неправильно). Особенно, что выдержка:

До тех пор, пока сообщение JMS не будет подтверждено, оно не считается успешно потребляется. Успешное потребление сообщения обычно происходит в три этапа.

<ul> <li> Клиент получает сообщение.</li> <li> Клиент обрабатывает сообщение.</li> <li> Сообщение подтверждается. Подтверждение инициируется либо поставщиком JMS, либо клиентом, в зависимости от сеанса режим подтверждения.</li> </ul>

Я предположил, что AUTO_ACKNOWLEDGE делает именно это - подтвердил сообщение после того, как метод listener возвращает результат. Но, согласно спецификации JMS, это немного отличается, а контейнеры-слушатели Spring, как и ожидалось, не пытаются изменить поведение из спецификации JMS. Это то, что должен сказать javadoc AbstractMessageListenerContainer - я подчеркнул важные предложения:

Контейнер-слушатель предлагает следующее подтверждение сообщения Параметры:

<ul> <li> "sessionAcknowledgeMode" установлен на "AUTO_ACKNOWLEDGE" (по умолчанию): <span> Автоматическое подтверждение сообщения перед исполнением слушателя; нет redelivery в случае исключения.</span></li> <li> "sessionAcknowledgeMode" установлен на "CLIENT_ACKNOWLEDGE": автоматическое подтверждение сообщения после успешного выполнения прослушивателя; нет redelivery в случае исключения.</li> <li> "sessionAcknowledgeMode" установлен на "DUPS_OK_ACKNOWLEDGE": ленивое подтверждение сообщения во время или после исполнения слушателя; потенциал redelivery в случае исключения.</li> <li> "sessionTransacted" установлен на "true": <span> подтверждение транзакции после успешного выполнения прослушивателя; гарантированная доставка в случае исключения.</span></li> </ul>

Итак, ключом к моему решению является listenerContainer.setSessionTransacted(true);

Еще одна проблема, с которой я столкнулся, заключалась в том, что поставщик JMS сохраняет повторное предоставление отказавшего сообщения тому же пользователю, который потерпел неудачу во время обработки сообщения. Я не знаю, дает ли спецификация JMS рецепт тому, что должен делать провайдер в таких ситуациях, но для меня работала listenerContainer.shutdown();, чтобы отключить неисправного потребителя и позволить провайдеру повторно отправлять сообщение и давать шанс другому потребителю.

licensed under cc by-sa 3.0 with attribution.