Использование коллекции $UnmodifiableCollection с Apache Flink

При использовании Apache Flink со следующим кодом:

DataStream<list<string>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<string, list<string="">>() {
 @Override
 public void mapWindow(Iterable<string> iterable, Collector<list<string>> collector) throws Exception {
 List<string> top5 = Ordering.natural().greatestOf(iterable, 5);
 collector.collect(top5);
 }
}).flatten();
</string></list<string></string></string,></list<string>

Я получил это исключение

Caused by: java.lang.UnsupportedOperationException
 at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)
 at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
 at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
 at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
 at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211)
 at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110)
 at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41)
 at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
 at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125)
 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
 at java.lang.Thread.run(Thread.java:745)

Как я могу использовать UnmodifiableCollection с Flink?

1 ответ

Проблема заключается в том, что по умолчанию CollectionSerializer Kryo не может десериализовать коллекцию снова, потому что ее невозможно модифицировать (вызов .add() завершается с ошибкой).

Чтобы устранить проблему, мы можем использовать UnmodifiableCollectionsSerializer из проекта kryo-serializers. Flink транзитивно зависит от проекта, поэтому нет необходимости добавлять его в качестве зависимости.

Далее, мы должны зарегистрировать сериализатор с экземплярами Flink Kryo.

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
Class<!--?--> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection");
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class);

Обычно нам не нужно вызывать Class.forName() для регистрации сериализатора, но в этом случае java.util.Collections$UnmodifiableCollection является видимым пакетом, поэтому мы не можем напрямую обращаться к классу.

licensed under cc by-sa 3.0 with attribution.