Зачем играть 2.5 Ответ на кусок Akka, загружаемый сразу

Я пытаюсь реализовать ответ chunk в webapp, используя PLAY 2 с Akka. Однако вместо того, чтобы загружать ответ куском блоком, весь ответ приходит как один раз. Ниже приведен код, по которому я создаю кусок в контроллере:

/**
 * 
 */ 
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
import org.pmw.tinylog.Logger;
import play.cache.CacheApi;
import play.cache.Cached;
import play.filters.csrf.AddCSRFToken;
import play.filters.csrf.CSRF;
import play.libs.Json;
import play.libs.concurrent.HttpExecutionContext;
import play.mvc.Controller;
import play.mvc.Http;
import play.mvc.Http.Cookie;
import play.mvc.Result;
import akka.NotUsed;
import akka.actor.Status;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Source;
import akka.util.ByteString;
/**
 * @author Abhinabyte
 *
 */
@Singleton
@AddCSRFToken
public class GetHandler extends Controller {
 @Inject
 private CacheApi cache;
 @Inject
 private HttpExecutionContext httpExecutionContext;
 public CompletionStage<result> index() {
return CompletableFuture.supplyAsync( () ->
 Source.<bytestring>actorRef(256, OverflowStrategy.dropNew())
 .mapMaterializedValue(sourceActor -> {
 CompletableFuture.runAsync(() -> {
 sourceActor.tell(ByteString.fromString("1"), null);
 sourceActor.tell(ByteString.fromString("2"), null);
 sourceActor.tell(ByteString.fromString("3"), null);
 try {
 Thread.sleep(3000);//intentional delay
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 sourceActor.tell(ByteString.fromString("444444444444444444444444444444444444444444444444444444444444444444444444"), null);
 sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
 });
 return sourceActor;
 })
 ).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html")); 
 }
}
</bytestring></result>

И ниже приведена конфигурация пула потоков Akka на application.conf:

akka {
 jvm-exit-on-fatal-error = on
 actor {
 default-dispatcher {
 fork-join-executor {
 parallelism-factor = 1.0
 parallelism-max = 64
 task-peeking-mode = LIFO
 }
 }
 }
}
play.server.netty {
 eventLoopThreads = 0
 maxInitialLineLength = 4096
 log.wire = false
 transport = "native"
}

Как вы можете видеть перед отправкой последнего последнего куска, я намеренно задерживаю время ответа. Таким образом, логически все данные, которые должны быть доставлены до него, должны быть доставлены до него. Однако в моем случае загружается целая куча данных. Я тестировал во всех браузерах (даже пытался CURL). Что мне здесь не хватает?

3 ответа

Блокировка в mapMaterializedValue будет делать это, потому что она работает в потоке Akka default-dispatcher, тем самым предотвращая маршрутизацию сообщений на длительность (подробнее см. этот ответ), Вы хотите отправить свой медленный, блокирующий код асинхронно, с ссылкой на актера, чтобы он отправлял сообщения. Ваш пример будет делать то, что вы ожидаете, если вы запустите его в будущем:

public CompletionStage<result> test() {
 return CompletableFuture.supplyAsync( () ->
 Source.<bytestring>actorRef(256, OverflowStrategy.dropNew())
 .mapMaterializedValue(sourceActor -> {
 CompletableFuture.runAsync(() -> {
 for (int i = 0; i < 20; i++) {
 sourceActor.tell(ByteString.fromString(String.valueOf(i) + "\n"), null);
 try {
 Thread.sleep(500);//intentional delay
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
 });
 return sourceActor;
 })
 ).thenApplyAsync( chunks -> ok().chunked(chunks).as("text/html"));
}
</bytestring></result>


Если вы проверите код Source, вы увидите, что первым параметром является bufferSize

public static <t> Source<t,actorref> actorRef(int bufferSize,
 OverflowStrategy overflowStrategy)
</t,actorref></t>

все ваши элементы, которые вы генерируете в потоке, вероятно, имеют менее 256 байт, поэтому генерируется только один фрагмент HTTP. Попробуйте добавить другие элементы, например, в пример @Mikesname.


Это может быть полезно, если вам нужен фрагментированный ответ, используя другой подход.

public Result test() {
 try {
 // Finite list
 List<string> sourceList = Arrays.asList("kiki", "foo", "bar");
 Source<string, ?=""> source = Source.from(sourceList);
 /* Following DB call, which fetch a record at a time, and send it as chunked response.
 final Iterator<string> sourceIterator = Person.fetchAll();
 Source<string, ?=""> source = Source.from(() -> sourceIterator); */
 return ok().chunked(source.via(Flow.of(String.class).map(ByteString::fromString))).as(Http.MimeTypes.TEXT);
 } catch (Exception e) {
 return badRequest(e.getMessage());
 }
}
</string,></string></string,></string>

licensed under cc by-sa 3.0 with attribution.