Как каждому новому подключившемуся пользователю назначить отдельный поток?

Vdvz

Стоит задача каждому новому клиенту отправившему информацию на сервер (клиенты telegram) выделить свой поток обработки данных, при повторных запросах от этого клиента все данные кидать в этот поток. Когда данные будут обработаны, а новые не придут, поток завершится. Если более детально, то клиенты отправляют серверу строку, которая в потоке(для каждого клиента он свой) проверяет есть ли идентичная строка в листе на сервере(со временем в лист будут подгружаться строки). И если до того как строка появится в листе придет еще одна строка от этого же клиента, то она тоже будет ждать идентичной в листе. Поток будет существовать пока все строки не обработаются,а новые не поступят.

Подскажите как это можно реализовать? Пытался создать фабрику потоков и добавлять в map пару id-поток, но в итоге получается, что все данные обрабатываются одним потоком, хотя в карту id добавляются.

1 ответ

Vdvz

Предлагаю следующее решение. Сделать потоки без состояния, тогда можно сразу же создать пул с заданным размером. Это будет и безопаснее и удобнее при реализации. Тогда, при получении очередной порции данных из сокета, создавать задачу, и передавать ее на выполнение потокам из пула.

Сервер тогда будет выглядеть следующим образом:

private static class SocketServerExample {
    private final Selector selector;
    private final InetSocketAddress listenAddress;
    private final ExecutorService executors;

    private SocketServerExample(String address, 
                                int port, 
                                int countOfThreads) throws IOException {

        this.listenAddress = new InetSocketAddress(address, port);
        this.selector = Selector.open();
        this.executors = Executors.newFixedThreadPool(countOfThreads);
    }

    private void start() throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.socket().bind(listenAddress);
        serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);

        while (!Thread.currentThread().isInterrupted()) {
            this.selector.select();

            Iterator keys = this.selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = (SelectionKey) keys.next();

                keys.remove();

                if (!key.isValid())
                    continue;

                if (key.isAcceptable())
                    accept(key);
                else if (key.isReadable())
                    executors.execute(new RequestHandler(key));
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel channel = serverChannel.accept();
        channel.configureBlocking(false);
        channel.register(this.selector, SelectionKey.OP_READ);
    }
}

Обработчик данных будет таким:

private static class RequestHandler implements Runnable {
    private final SelectionKey selectionKey;

    private RequestHandler(SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    @Override
    public void run() {
        try {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            int numRead = channel.read(buffer);

            if (numRead == -1) {
                channel.close();
                selectionKey.cancel();
                return;
            }
            String data = new String(buffer.array());
            if(data.trim().isEmpty())
                return;

            String content = " >> " + Thread.currentThread().getName() + " >> " + data;
            System.out.println(content);
            buffer = ByteBuffer.wrap(content.getBytes());
            channel.write(buffer);

        } catch (Exception e) {
           // throw new RuntimeException(e);
        }
    }
 }

А чтобы запустить сервер, нужно написать вот так:

final String address = "localhost";
final int port = 9999;

new SocketServerExample(address, port, 10).start();

licensed under cc by-sa 3.0 with attribution.