Как выбрать головку и хвост одновременно в реактивных расширениях

Я хотел бы создать следующий комбинатор

public static IObservable<u> HeadTailSelect<t, u="">
 (this IObservable<t> source, Func<t, iobservable<t="">, U> fn)
{
}
</t,></t></t,></u>

Метод выбора должен быть передан текущему событию и наблюдаемому ко всем будущим событиям, хвосту. Это должно быть гарантировано, что при подписке на хвост в любое время в будущем первое полученное событие будет самым следующим, которое было получено после головы.

Я знаю, что для этого потребуется некоторая буферизация, но я не совсем уверен, как это сделать.

У этого есть некоторые хорошие свойства. Вы можете сделать

IObservable<iobservable<unit>> windows =
source
 .HeadTailSelect((h,tail)=>Observable
 .Interval(TimeSpan.FromSeconds(1))
 .TakeUntil(tail)
 .Select(_=>Unit.Default)
 )
</iobservable<unit>

и избегайте условий гонки, в результате чего в окне требуется TakeUntil регистрироваться после ответа на первое событие, когда вы пропускаете некоторые события.

Также бонус-карма для любых идей о том, как протестировать реализацию.

Для реализации требуется следующий тестовый пример, хотя может быть недостаточно, чтобы доказать, что условия гонки избегают.

public class HeadTailSelect : ReactiveTest
{
 TestScheduler _Scheduler = new TestScheduler();
 [Fact]
 public void ShouldWork()
 {
 var o = _Scheduler.CreateColdObservable
 (OnNext(10, "A")
 , OnNext(11, "B")
 , OnNext(12, "C")
 , OnNext(13, "D")
 , OnNext(14, "E")
 , OnNext(15, "F")
 , OnCompleted<string>(700)
 );
 var data = o.HeadTailSelect((head, tail) => tail.Take(2).ToList())
 .SelectMany(p=>p)
 .Select(l=>String.Join("-", l));
 var actual = _Scheduler.Start(() =>
 data
 , created: 0
 , subscribed: 1
 , disposed: 1000
 );
 actual.Messages.Count()
 .Should()
 .Be(7);
 var messages = actual.Messages.Take(6)
 .Select(v => v.Value.Value)
 .ToList();
 messages[0].Should().Be("B-C");
 messages[1].Should().Be("C-D");
 messages[2].Should().Be("D-E");
 messages[3].Should().Be("E-F");
 messages[4].Should().Be("F");
 messages[5].Should().Be("");
 }
}
</string>
1 ответ

Вот решение кандидата, которое проходит вышеупомянутый тест. Однако я не уверен, что он удовлетворяет требованиям.

/// <summary>
/// Pass the head and tail of the observable to the
/// selector function. Note that 
/// </summary>
/// 
/// 
/// 
/// 
/// 
public static IObservable<u> HeadTailSelect<t, u="">
 (this IObservable<t> source, Func<t, iobservable<t="">, U> fn)
{
 var tail = new Subject<t>();
 return Observable.Create<u>(observer =>
 {
 return source.Subscribe(v =>
 {
 tail.OnNext(v);
 var u = fn(v, tail);
 observer.OnNext(u);
 }
 ,e=> { tail.OnCompleted();observer.OnError(e); }
 ,()=> { tail.OnCompleted();observer.OnCompleted(); });
 });
}
</u></t></t,></t></t,></u>

Обратите внимание, что u скорее всего будет чем-то вроде IObservable и должен быть подписан сразу. Если это будет сделано, я думаю, что все должно быть хорошо.

licensed under cc by-sa 3.0 with attribution.