Буферный оператор со счетчиками и временными условиями

У меня есть последовательность, которая очень частая, и я пытался сделать ее более эффективной, обрабатывая события партиями. Оператор буфера со временем и условиями подсчета был чем-то вроде моих требований, за исключением одного небольшого нюанса. Когда вы используете эту перегрузку, подписка получает уведомление после указанной задержки времени, независимо от того, есть ли в буфере какие-либо элементы. Это становится очень раздражающей причиной, когда большую часть времени моя подписка получает пустой список из оператора буфера. Учитывая, что это многопоточное приложение, в котором абонент находится в потоке пользовательского интерфейса, он оказывается не самым оптимальным подходом к обработке элементов пакетами. Мне было интересно, есть ли способ использовать доступные операторы для создания последовательности, которая срабатывает либо при наличии определенного количества элементов в буфере, либо в течение определенного времени, но тогда и только тогда, когда есть какие-либо элементы в буфер. Я знаю, что могу сделать что-то вроде этого:

sequence.Buffer(TimeSpan.FromSeconds(5), 1).Where(e=>e.Count > 0)

Но мне было интересно, есть ли другой способ сделать это, потому что я чувствую, что это не лучший способ.

2 ответа

Я не вижу причины беспокоиться об этом - у вас есть идиоматическое решение. Пустым буфером является информация, поэтому разумно, чтобы реализация framework вернула его. Любой другой метод будет эффективно делать то же самое, что и внутри.

Когда я нахожусь с использованием небольших групп стандартных операторов, я часто обертываю их более подробным методом расширения. Например:

public static class ObservableExtensions
{ public static IObservable<ilist<t>> ToNonEmptyBuffers<t>( this IObservable<t> source, TimeSpan timespan, int count, IScheduler scheduler = null) { scheduler = scheduler ?? Scheduler.Default; return source.Buffer(timespan, count, scheduler ?? Scheduler.Default) .Where(buffer => buffer.Count > 0); }
}
</t></t></ilist<t>

Разрешение:

sequence.ToNonEmptyBuffers(TimeSpan.FromSeconds(5), 1);


Ради "Rx-i-ness" я бросаю в кучу следующее.

Лично я думаю, что ответа Джеймса достаточно (вероятно, лучше во многих сценариях). Единственное различие (по сравнению с выходом) заключается в том, что буферный таймер запускается только тогда, когда появляется новый элемент. Вот почему нам не нужно отфильтровывать пустые буферы. При этом это не может быть самым эффективным решением. Это просто здесь, чтобы показать силу композиции.

var batches = source .GroupByUntil( // This means we're not really grouping, but windowing. // granted, if we needed to group our batches, this is useful! x => 0, group => Observable.Amb( // this means we get a max of 11 per batch group.Skip(10), // This means we get a max batch time of 10 seconds group.Take(1).Delay(TimeSpan.FromSeconds(10)) )) // Since GroupByUntil gives us windows, we can ToArray them. .SelectMany(x => x.ToArray());

licensed under cc by-sa 3.0 with attribution.