Comet на Embedded Jetty. Первый блин)

комом. Мой первый эксперимент с COMET и с Embedded Jetty. “Пощупать” COMET хотелось уже давно. Во-первых сама по себе технология достаточно интересная, а во-вторых — хотелось попробовать что-то новенькое. Jetty был выбран в качестве сервера для экспериментов как наиболее легковесный (особенно в embedded-варианте) из известных мне Java-серверов.

В качестве примера напишем простенький чат. Будем использовать технику long polling. Допустим у нас запущен Jetty-сервер с приложением-чатом, к которому подключились два клиента Саша и Маша. На диаграмме все их мучения будут выглядеть так:

comet

Теперь попробуем объяснить увиденное с небольшими вставками кода (полный исходный код можно скачать в конце статьи).

Сервер запускается и переходит в режим ожидания новых подключений

private void initializeJettyServer() {
    jettyServer = new Server();
 
    // Thread pool
    QueuedThreadPool threadPool = new QueuedThreadPool();
    threadPool.setMaxThreads(THREAD_POOL_MAX_THREADS);
    jettyServer.setThreadPool(threadPool);
 
    // NIO connector
    SelectChannelConnector connector = new SelectChannelConnector();
    connector.setPort(CONNECTOR_PORT);
    connector.setMaxIdleTime(CONNECTOR_MAX_IDLE_TIME);
    jettyServer.setConnectors(new Connector[] { connector });
 
    // Handlers
    HandlerList handlers = new HandlerList();
    ContextHandlerCollection contexts = new ContextHandlerCollection();
 
    Context rootContext = new Context(contexts, "/", Context.SESSIONS);
    rootContext.addServlet(new ServletHolder(ChatServlet.class), "/chat/*");
 
    ResourceHandler resourceHandler = new ResourceHandler();
    resourceHandler.setResourceBase("../src/client");
 
    handlers.setHandlers(new Handler[] { resourceHandler, contexts });
    jettyServer.setHandler(handlers);
}

Нужно использовать именно SelectChannelConnector, т.к. он является неблокирующим и позволяет использовать Continuation’ы (именно они позволяют создавать “долгие” соединения, рассмотрим чуть поздже). Далее мы подцепляем к нашему серверу сервлет ChatServlet, который будет обрабатывать все AJAX-запросы от клиентов и через ResourceHandler указываем директорию с клиентской частью приложения.

Саша регистрируется на сервере, получает уникальный номер (uid) и подписывается на события сервера

Саша вводит свой ник и нажимает кнопку “Register”, после чего выполняется следующий код:

_register : function(nickname) {
    var request = new AjaxRequest(
        '/chat/',
        { 'action' : 'register', 'nickname' : nickname },
 
        function(responseText) {
            chat._userUid = responseText;
            chat._subscribe();
        }
    );
 
    request.send();
}

На сервлет ChatServlet отправляется запрос с двумя параметрами: действием, которое нужно выполнить (action) и ником (nickname). При успешном выполнении запроса скрипт сохраняет выданный сервером uid и подписывается на события с сервера: регистрация новых пользователей и новые сообщения (рассмотрим далее).

При регистрации пользователя на сервере выполнится функция:

public synchronized String registerUser(String nickname) {
    ChatUser user = new ChatUser(nickname);
    users.put(user.getUid(), user);
 
    return user.getUid();
}

Создаётся новый пользователь с ником, который указал клиент. После чего этот пользователь заносится в хеш. В дальнейшем во всех запросах от пользователя будет указываться uid, выданный при регистрации. По этому uid из хеша будет извлекаться нужная запись.

После регистрации скрипт сразу подписывает Сашу на новые события с сервера путём установки с сервером “долгого” соединения (в нашем случае оно ограничено 30 секундами). На диаграмме такие “долгие” соединения представлены прямоугольниками. При успешном завершении такого соединения скрипт выводит полученный от сервера текст в окно чата.

_subscribe : function() {
    var request = new AjaxRequest(
        '/chat/',
        { 'action' : 'subscribe', 'user-uid' : this._userUid },
 
        function(responseText) {
            if ((responseText != 'null') && (responseText != '')) {
                chat._renderMessage(responseText);
            }
 
            setTimeout(function() { chat._subscribe(); }, 0);
        }
    );
 
    request.send();
}

Вот мы и добрались до Continuation’ов) Итак, что же происходит на стороне сервера при установлении “долгого” соединения? Для начала немного слов про то, что такое Continuation. Эта фишка Jetty, которая позволяет “приостановить” запрос ровно до того момента пока не произойдёт какое либо событие, либо произойдёт таймаут запроса. Типичная последовательность действий при работе с Continuation выглядит так:

  1. Клиент посылает запрос на сервер.
  2. Сервер оборачивает запрос клиента в Continuation-объект и вызывает у него метод suspend, тем самым оставляя соединее открытым.
  3. При возникновении какого-либо события, на которое подписан клиент, сервер получает Continuation-объект для этого клиента, устанавливает для него дополнительные данные, которые будут возвращены клиенту (с помощью вызова метода setObject) и вызывает у него метод resume.
  4. Исходный запрос выполняется ещё раз ровно до того момента, когда был вызван метод suspend. Но в этом случае запрос не “приостанавливается”, а выполняется как обычно. Выбор ветки (“приостановить” или продолжить нормально выполнение) делается на основании свойства Continuation-объекта isPending, которое сигнализирует о том, что запрос уже “приостановлен”.
  5. Клиент снова подписывается на события сервера (посылает запрос). Круг замкнулся.

Особенностью Continuation’ов является то, что они не держат поток пока запрос “приостановлен”. Т.е. один поток сервера может обслуживать несколько открытых соединений (никакого “thread per connection”). Очень круто! Также в документации сказано, что с выходом Servlet 3.0 Continuation’ы будут заменены на “suspendable requests”. Более подробную информацию можно прочитать в материалах в конце статьи.

А вот и сам код:

private void doSubscribeAction(HttpServletRequest request, HttpServletResponse response) throws IOException {
    Continuation continuation = ContinuationSupport.getContinuation(request, null);
 
    if (continuation.isPending()) {
        writeOkResponse(response, (String) continuation.getObject());
        continuation.setObject(null);
    } else {
        String userUid = request.getParameter("user-uid");
 
        if (Chat.getInstance().subscribeUser(userUid, continuation)) {
            continuation.suspend(30000);
        } else {
            writeErrorResponse(response, "User not found. Please, register.");
        }
    }
}

С учётом всего написанного выше код должен быть понятен)

Итак, Саша зарегистрировался в чате и ждёт, но нифига не происходит. Маша изрядно тупит. Но в итоге каким-то чудом и она смогла зарегистрироваться.

Маша регистрируется на сервере, получает уникальный номер (uid) и подписывается на события сервера

Не будем снова разбирать эту цепочку, она такая же как и у Саши. Поэтому посмотрим каким образом ему придёт сообщение о новом участнике чата.

Когда Маша (или Саша, один фиг) регистрируется, то выполняется код:

private void doRegisterAction(HttpServletRequest request, HttpServletResponse response) throws IOException {
    String nickname = request.getParameter("nickname");
    String userUid = Chat.getInstance().registerUser(nickname);
 
    Chat.getInstance().broadcastMessage(userUid, "Welcome, " + nickname + "!");
    writeOkResponse(response, userUid);
}

Наибольший интерес для нас сейчас представляет функция broadcastMessage:

public synchronized void broadcastMessage(String senderUid, String message) {
    for (ChatUser user : users.values()) {
        if (user.getUid().equals(senderUid)) continue;
 
        user.getContinuation().setObject(message);
        user.getContinuation().resume();
    }
}

Она служит для рассылки сообщения message всем участникам чата, кроме пользователя, который стал виновником события (uid равен параметру senderUid).

Саша, на момент регистрации Маши, был подписан на события сервера и с ним был связан Continuation-объект в pending-состоянии. Между Сашей и сервером было открыто соединение, незавершённый запрос. При регистрации Маши выполнилась функция broadcastMessage, которая это запрос завершила, вернув в качестве результата сообщение о регистрации Маши. Таким образом Саше будет отправлено сообщение “Welcome, Маша!”. После чего Саша снова подписывается на события с сервера. Всё просто)

Обмен сообщениями

Всё один в один с тем, что происходит при регистрации.

Произошёл таймаут соединения

Аааааааааа! Мы все умрём!!! На самом деле нет. Всё, что нужно сделать, это просто переподключиться к серверу. Т.е. всегда (как при таймауте, так и в случае ответа от сервера) при завершении “долгого” соединения мы сразу же устанавливаем новое.

Исходный код и исполняемый файл

Для сборки проекта используется Maven 2. Для этого в директории …/embedded-jetty-comet/src/server вызываем команду

mvn clean package

Для запуска тестового примера необходимо в директории …/embedded-jetty-comet/bin выполнить команду (архив надо распаковать полностью — директория src должна лежать рядом с директорией bin)

java -jar comet-server-1.0.jar

Скачать исходный код и исполняемый файл

Материалы:

4 thoughts on “Comet на Embedded Jetty. Первый блин)

  1. Ваня

    Схема классная, в чем рисовал?
    p.s. картинки и схемы принято подписывать 😉

    Reply
  2. Alexander Shchekoldin Post author

    Спасибо! Надеюсь содержание тоже не подкачало) Пересмотрел несколько прог и остановился на Inkscape.
    p.s.: думаю, когда в статье одна картинка, то путаницы не будет)

    Reply
  3. Oleg

    Привет. Очень и очень понятная и простая для понимания статья, но у меня как у новичка есть такой вопрос:
    Вы отправляете все запросы на “/chat/”. Что это? Можно по-подробнее об этой директории? Буду рад получить ответ на мой эмейл адрес 🙂 Заранее спасибо!

    Reply

Leave a Reply

Your email address will not be published. Required fields are marked *