Как сделать сокет UDP заменять старые сообщения (еще не recv() 'd), когда новые прибыли?

Во-первых, немного контекста, чтобы объяснить, почему я нахожусь на пути "выборки UDP": Я бы хотел, чтобы образцы данных производились с высокой скоростью в течение неизвестного периода времени. Данные, которые я хочу попробовать, находятся на другой машине, чем тот, который потребляет данные. У меня есть выделенное соединение Ethernet между ними, поэтому пропускная способность не является проблемой. Проблема в том, что машина, потребляющая данные, намного медленнее, чем тот, который ее производит. Дополнительное ограничение состоит в том, что, хотя я уверен, что я не получаю все образцы (они просто образцы), обязательно, чтобы я получил last.

Мое первое решение состояло в том, чтобы заставить производителя данных отправить дейтаграмму UDP для каждого произведенного образца и позволить потребителю данных попробовать получить образцы, которые он мог бы, и позволить остальным отбрасываться слоем сокета, когда сокет UDP заполнен. Проблема с этим решением заключается в том, что, когда новые датаграммы UDP поступают и сокет заполняется, это дейтаграммы new, которые отбрасываются, а не старые. Поэтому я не гарантирую, что буду последним!

Мой вопрос: есть ли способ сделать UDP-сокет заменять старые датаграммы при появлении новых?

В настоящее время приемник является машиной Linux, но в будущем это может измениться в пользу другой unix-подобной ОС (возможны окна, поскольку они реализуют сокеты BSD, но менее вероятно) Идеальное решение будет использовать широко распространенные механизмы (например, setsockopt() s).

PS: Я думал о других решениях, но они более сложны (предполагают сильную модификацию отправителя), поэтому я бы хотел, чтобы у меня был определенный статус на возможность того, что я прошу!:)

Обновления: - Я знаю, что ОС на принимающей машине может обрабатывать сетевую нагрузку + сборку трафика, генерируемого отправителем. Просто его поведение по умолчанию заключается в отбрасывании новых датаграмм, когда буфер сокета заполнен. И из-за времени обработки в процессе получения, я знаю, что он станет полным, что я делаю (потеря половины памяти в буфере сокета не является опцией:)). - Я действительно хотел бы избежать того, чтобы вспомогательный процесс выполнял то, что ОС могла сделать при пакетной диспетчеризации, а ресурс отходов просто копировал сообщения в SHM. - Проблема, которую я вижу при изменении отправителя, заключается в том, что код, к которому у меня есть доступ, является просто функцией PleaseSendThisData(), он не знает, что это может быть последний раз, когда он вызван до долгого времени, поэтому я не см. любые выполнимые трюки с этой целью... но я открыт для предложений!:)

Если действительно нет способа изменить поведение приема UDP в соке BSD, тогда хорошо... просто скажите мне, я готов принять эту ужасную правду и начну работать над решением "вспомогательного процесса", когда я вернитесь к нему:)

5 ответов

Это будет сложно получить полностью прямо на стороне слушателя, так как он действительно может пропустить последний пакет в чипе сетевого интерфейса, что позволит вашей программе когда-либо иметь возможность увидеть ее.

Код UDP операционной системы был бы лучшим местом для решения этой проблемы, поскольку он будет получать новые пакеты, даже если он решит их отменить, поскольку он уже слишком много в очереди. Тогда это могло бы принять решение о сбросе старого или опустить новый, но я не знаю, как сказать, что это то, что вы хотели бы сделать.

Вы можете попытаться разобраться с этим в ресивере, имея одну программу или поток, которые всегда пытаются прочитать в новейшем пакете, а другой, который всегда пытается получить этот самый новый пакет. Как это сделать будет отличаться в зависимости от того, были ли вы сделаны как две отдельные программы или как два потока.

В качестве потоков вам понадобится мьютекс (семафор или что-то в этом роде) для защиты указателя (или ссылки) до структуры, используемой для хранения 1 полезной нагрузки UDP и любого другого, что вы хотели там (размер, IP-адрес отправителя, порт отправителя, метка времени и т.д.).

В потоке, который действительно читает пакеты из сокета, будет храниться пакетная информация в структуре, приобретается мьютекс, защищающий этот указатель, заменяется текущим указателем на указатель на только что созданную структуру, выдается мьютекс, сигнализирует процессор который он должен что-то делать, а затем очистить структуру, в которой он только что получил указатель и использовать его для хранения следующего пакета, который входит.

Поток, который фактически обрабатывает полезную нагрузку пакета, должен ждать по сигналу от другого потока и/или периодически (500 мс или около того, вероятно, является хорошей отправной точкой для этого, но вы решили) и перехватить мьютекс, поменяйте его указатель на структуру полезной нагрузки UDP с той, которая есть, отпустите мьютекс, а затем, если структура имеет какие-либо пакетные данные, она должна обработать ее, а затем ждать следующего сигнала. Если у него не было никаких данных, он должен просто идти вперед и ждать следующего сигнала.

Поток процессора, вероятно, должен работать с меньшим приоритетом, чем приемник UDP, чтобы слушатель с меньшей вероятностью когда-либо пропускал пакет. При обработке последнего пакета (тот, который вам действительно нужен) процессор не будет прерван, потому что нет новых пакетов для прослушивателя.

Вы можете расширить это, используя очередь, а не только один указатель, как место замены для двух потоков. Единственный указатель - это всего лишь очередь длиной 1 и очень легко обрабатывается.

Вы также можете расширить это, пытаясь обнаружить поток прослушивателя, если ожидают несколько пакетов, и только на самом деле помещают последнюю из них в очередь для потока процессора. Как вы это сделаете, будет отличаться платформа, но если вы используете * nix, тогда это должно возвращать 0 для сокетов без ожидания:

while (keep_doing_this()) {
 ssize_t len = read(udp_socket_fd, my_udp_packet->buf, my_udp_packet->buf_len); 
 // this could have been recv or recvfrom
 if (len < 0) {
 error();
 }
 int sz;
 int rc = ioctl(udp_socket_fd, FIONREAD, &sz);
 if (rc < 0) {
 error();
 }
 if (!sz) {
 // There aren't any more packets ready, so queue up the one we got
 my_udp_packet->current_len = len;
 my_udp_packet = swap_udp_packet(my_ucp_packet);
 /* swap_udp_packet is code you would have to write to implement what I talked
 about above. */
 tgkill(this_group, procesor_thread_tid, SIGUSR1);
 } else if (sz > my_udp_packet->buf_len) {
 /* You could resize the buffer for the packet payload here if it is too small.*/
 }
}

udp_packet должен быть выделен для каждого потока, а также для указателя подкачки. Если вы используете очередь для обмена, то для каждой позиции в очереди должно быть достаточно udp_packets - поскольку указатель - это всего лишь очередь длиной 1, ему нужно только 1.

Если вы используете POSIX-систему, тогда не используйте сигнал реального времени для сигнализации, потому что они стоят в очереди. Использование регулярного сигнала позволит вам относиться к сигналу во многих случаях так же, как к сигналу только один раз, пока сигнал не будет обработан, в то время как сигналы в реальном времени будут стоять в очереди. Пробуждение периодически для проверки очереди также позволяет вам обрабатывать возможность последнего сигнала, поступающего сразу после того, как вы проверили, есть ли у вас какие-либо новые пакеты, но прежде чем вы вызываете pause для ожидания сигнала.


Просто установите сокет на неблокирующийся и зациклируйте на recv(), пока он не вернет < 0 с errno == EAGAIN. Затем обработайте последний полученный пакет, промойте и повторите.


Я согласен с "caf". Установите сокет в неблокирующий режим.

Всякий раз, когда вы получаете что-то в сокете, читайте в цикле, пока ничего больше не останется. Затем обработайте последнюю прочитанную дейтаграмму.

Только одно примечание: вы должны установить большой системный буфер приема для сокета

int nRcvBufSize = 5*1024*1024; // or whatever you think is ok
setsockopt(sock, SOL_SOCKET, SO_RCVBUF, (char*) &nRcvBufSize, sizeof(nRcvBufSize));


Я почти уверен, что это неразрешимая проблема, тесно связанная с Two Army Problem.

Я могу подумать о грязном решении: установить соединение с боковой связью TCP "control", которое несет последний пакет, который также является индикацией "конечной передачи". В противном случае вам нужно использовать одно из более общих прагматических средств, отмеченных в Инженерные подходы.


Другая идея состоит в том, чтобы иметь выделенный процесс чтения, который ничего не делает, кроме циклов в сокете и считывает входящие пакеты в циклический буфер в общей памяти (вам придется беспокоиться о правильном порядке записи). Что-то вроде kfifo. Здесь тоже неблокируется. Новые данные перекрывают старые данные. Тогда у других процессов всегда был бы доступ к последнему блоку в начале очереди, а все предыдущие фрагменты еще не были перезаписаны.

Может быть слишком сложно для простого одностороннего считывателя, просто вариант.

licensed under cc by-sa 3.0 with attribution.