Как имитировать повторную доставку сообщений в сценарии сеанса JMS AUTO_ACKNOWLEDGE?

В следующем тесте я пытаюсь моделировать следующий сценарий:

  • Запускается очередь сообщений.
  • Запускается пользователь, созданный для отказа при обработке сообщений.
  • Выдается сообщение.
  • Пользователь начинает обработку сообщения.
  • Во время обработки создается исключение для имитации сбоя обработки сообщений. Неудавшийся потребитель остановлен.
  • Другой пользователь запускается с намерением получить сообщение с переадресацией.

Но мой тест терпит неудачу, и сообщение не переадресуется новому потребителю. Я буду признателен за любые намеки на это.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
 loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest extends AbstractJUnit4SpringContextTests {
 @Autowired
 private FailureReprocessTestScenario testScenario;
 @Before
 public void setUp() {
 testScenario.start();
 }
 @After
 public void tearDown() throws Exception {
 testScenario.stop();
 }
 @Test public void 
 should_reprocess_task_after_processing_failure() {
 try {
 Thread.sleep(20*1000);
 assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
 "task-1",
 })));
 } catch (InterruptedException e) {
 fail();
 }
 }
 @Configurable
 public static class FailureReprocessTestScenario {
 @Autowired
 public BrokerService broker;
 @Autowired
 public MockTaskProducer mockTaskProducer;
 @Autowired
 public FailingWorker failingWorker;
 @Autowired
 public SucceedingWorker succeedingWorker;
 @Autowired
 public TaskScheduler scheduler;
 public void start() {
 Date now = new Date();
 scheduler.schedule(new Runnable() {
 public void run() { failingWorker.start(); }
 }, now);
 Date after1Seconds = new Date(now.getTime() + 1*1000);
 scheduler.schedule(new Runnable() {
 public void run() { mockTaskProducer.produceTask(); }
 }, after1Seconds);
 Date after2Seconds = new Date(now.getTime() + 2*1000);
 scheduler.schedule(new Runnable() {
 public void run() {
 failingWorker.stop();
 succeedingWorker.start();
 }
 }, after2Seconds);
 }
 public void stop() throws Exception {
 succeedingWorker.stop();
 broker.stop();
 }
 }
 @Configuration
 @ImportResource(value={"classpath:applicationContext-jms.xml",
 "classpath:applicationContext-task.xml"})
 public static class ContextConfig {
 @Autowired
 private ConnectionFactory jmsFactory;
 @Bean
 public FailureReprocessTestScenario testScenario() {
 return new FailureReprocessTestScenario();
 }
 @Bean
 public MockTaskProducer mockTaskProducer() {
 return new MockTaskProducer();
 }
 @Bean
 public FailingWorker failingWorker() {
 TaskListener listener = new TaskListener();
 FailingWorker worker = new FailingWorker(listenerContainer(listener));
 listener.setProcessor(worker);
 return worker;
 }
 @Bean
 public SucceedingWorker succeedingWorker() {
 TaskListener listener = new TaskListener();
 SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
 listener.setProcessor(worker);
 return worker;
 }
 private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
 DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
 listenerContainer.setConnectionFactory(jmsFactory);
 listenerContainer.setDestinationName("tasksQueue");
 listenerContainer.setMessageListener(listener);
 listenerContainer.setAutoStartup(false);
 listenerContainer.initialize();
 return listenerContainer;
 }
 }
 public static class FailingWorker implements TaskProcessor {
 private Logger LOG = Logger.getLogger(FailingWorker.class.getName());
 private final DefaultMessageListenerContainer listenerContainer;
 public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
 this.listenerContainer = listenerContainer;
 }
 public void start() {
 LOG.info("FailingWorker.start()");
 listenerContainer.start();
 }
 public void stop() {
 LOG.info("FailingWorker.stop()");
 listenerContainer.stop();
 }
 @Override
 public void processTask(Object task) {
 LOG.info("FailingWorker.processTask(" + task + ")");
 try {
 Thread.sleep(1*1000);
 throw Throwables.propagate(new Exception("Simulate task processing failure"));
 } catch (InterruptedException e) {
 LOG.log(Level.SEVERE, "Unexpected interruption exception");
 }
 }
 }
 public static class SucceedingWorker implements TaskProcessor {
 private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());
 private final DefaultMessageListenerContainer listenerContainer;
 public final List<string> processedTasks;
 public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
 this.listenerContainer = listenerContainer;
 this.processedTasks = new ArrayList<string>();
 }
 public void start() {
 LOG.info("SucceedingWorker.start()");
 listenerContainer.start();
 }
 public void stop() {
 LOG.info("SucceedingWorker.stop()");
 listenerContainer.stop();
 }
 @Override
 public void processTask(Object task) {
 LOG.info("SucceedingWorker.processTask(" + task + ")");
 try {
 TextMessage taskText = (TextMessage) task;
 processedTasks.add(taskText.getText());
 } catch (JMSException e) {
 LOG.log(Level.SEVERE, "Unexpected exception during task processing");
 }
 }
 }
}
</string></string>

TaskListener.java

public class TaskListener implements MessageListener {
 private TaskProcessor processor;
 @Override
 public void onMessage(Message message) {
 processor.processTask(message);
 }
 public void setProcessor(TaskProcessor processor) {
 this.processor = processor;
 }
}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware {
 private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());
 @Autowired
 private JmsTemplate jmsTemplate;
 private Destination destination;
 private int taskCounter = 0;
 public void produceTask() {
 LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");
 taskCounter++;
 jmsTemplate.send(destination, new MessageCreator() {
 @Override
 public Message createMessage(Session session) throws JMSException {
 TextMessage message = session.createTextMessage("task-" + taskCounter);
 return message;
 }
 });
 }
 @Override
 public void setApplicationContext(ApplicationContext applicationContext)
 throws BeansException {
 destination = applicationContext.getBean("tasksQueue", Destination.class);
 }
}
1 ответ

По-видимому, источник документации, который я смотрел вчера Создание надежных JMS-приложений, вводит меня в заблуждение (или, возможно, я понял это неправильно). Особенно, что выдержка:

До тех пор, пока сообщение JMS не будет подтверждено, оно не считается успешно потребляется. Успешное потребление сообщения обычно происходит в три этапа.

  • Клиент получает сообщение.
  • Клиент обрабатывает сообщение.
  • Сообщение подтверждается. Подтверждение инициируется либо поставщиком JMS, либо клиентом, в зависимости от сеанса режим подтверждения.

Я предположил, что AUTO_ACKNOWLEDGE делает именно это - подтвердил сообщение после того, как метод listener возвращает результат. Но, согласно спецификации JMS, это немного отличается, а контейнеры-слушатели Spring, как и ожидалось, не пытаются изменить поведение из спецификации JMS. Это то, что должен сказать javadoc AbstractMessageListenerContainer - я подчеркнул важные предложения:

Контейнер-слушатель предлагает следующее подтверждение сообщения Параметры:

  • "sessionAcknowledgeMode" установлен на "AUTO_ACKNOWLEDGE" (по умолчанию): Автоматическое подтверждение сообщения перед исполнением слушателя; нет redelivery в случае исключения.
  • "sessionAcknowledgeMode" установлен на "CLIENT_ACKNOWLEDGE": автоматическое подтверждение сообщения после успешного выполнения прослушивателя; нет redelivery в случае исключения.
  • "sessionAcknowledgeMode" установлен на "DUPS_OK_ACKNOWLEDGE": ленивое подтверждение сообщения во время или после исполнения слушателя; потенциал redelivery в случае исключения.
  • "sessionTransacted" установлен на "true": подтверждение транзакции после успешного выполнения прослушивателя; гарантированная доставка в случае исключения.

Итак, ключом к моему решению является listenerContainer.setSessionTransacted(true);

Еще одна проблема, с которой я столкнулся, заключалась в том, что поставщик JMS сохраняет повторное предоставление отказавшего сообщения тому же пользователю, который потерпел неудачу во время обработки сообщения. Я не знаю, дает ли спецификация JMS рецепт тому, что должен делать провайдер в таких ситуациях, но для меня работала listenerContainer.shutdown();, чтобы отключить неисправного потребителя и позволить провайдеру повторно отправлять сообщение и давать шанс другому потребителю.

licensed under cc by-sa 3.0 with attribution.