Commit 9443a34d authored by Jörg Richter's avatar Jörg Richter

Replace WebSocket server by servlet; only 1 port in use! (#384)

parent 45dcc0fa
Pipeline #10477 passed with stage
in 8 minutes and 4 seconds
......@@ -52,7 +52,7 @@ class JerseyResponseFilter implements ContainerResponseFilter {
// ---------------------------------------------------------------------------------------------- Instance Variables
private EventManager em;
private WebSocketService ws;
private WebSocketService wss;
@Context
private HttpServletRequest request;
......@@ -61,9 +61,9 @@ class JerseyResponseFilter implements ContainerResponseFilter {
// ---------------------------------------------------------------------------------------------------- Constructors
JerseyResponseFilter(EventManager em, WebSocketService ws) {
JerseyResponseFilter(EventManager em, WebSocketService wss) {
this.em = em;
this.ws = ws;
this.wss = wss;
}
// -------------------------------------------------------------------------------------------------- Public Methods
......@@ -203,7 +203,7 @@ class JerseyResponseFilter implements ContainerResponseFilter {
JSONObject message = new JSONObject()
.put("type", "processDirectives")
.put("args", directives.toJSONArray());
ws.messageToAllButOne(request, "systems.dmx.webclient", message.toString());
wss.messageToAllButOne(request, "systems.dmx.webclient", message.toString());
}
......
......@@ -60,7 +60,7 @@ class WebPublishingService {
// ---------------------------------------------------------------------------------------------------- Constructors
WebPublishingService(AccessLayer al, WebSocketService ws) {
WebPublishingService(AccessLayer al, WebSocketService wss) {
try {
logger.info("Setting up the WebPublishingService");
this.al = al;
......@@ -71,7 +71,7 @@ class WebPublishingService {
// setup container filters
Map<String, Object> properties = jerseyApplication.getProperties();
properties.put(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS, new JerseyRequestFilter(al.em));
properties.put(ResourceConfig.PROPERTY_CONTAINER_RESPONSE_FILTERS, new JerseyResponseFilter(al.em, ws));
properties.put(ResourceConfig.PROPERTY_CONTAINER_RESPONSE_FILTERS, new JerseyResponseFilter(al.em, wss));
properties.put(ResourceConfig.PROPERTY_RESOURCE_FILTER_FACTORIES, new TransactionFactory(al));
//
// deploy Jersey application in container
......
......@@ -39,12 +39,13 @@ class WebSocketConnection implements WebSocket, WebSocket.OnTextMessage, WebSock
WebSocketConnection(String pluginUri, String clientId, HttpSession session, WebSocketConnectionPool pool,
CoreService dmx) {
logger.info("### Associating WebSocket connection (client ID " + clientId + ") with " + info(session));
this.pluginUri = pluginUri;
this.clientId = clientId;
this.session = session;
this.pool = pool;
this.dmx = dmx;
// Note: info(session) relies on "dmx"
logger.info("### Associating WebSocket connection (client ID " + clientId + ") with " + info(session));
}
// -------------------------------------------------------------------------------------------------- Public Methods
......
package systems.dmx.core.impl;
import systems.dmx.core.osgi.CoreActivator;
import systems.dmx.core.service.Cookies;
import systems.dmx.core.service.CoreService;
import systems.dmx.core.service.websocket.WebSocketService;
import systems.dmx.core.util.JavaUtils;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketHandler;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
......@@ -26,14 +19,12 @@ public class WebSocketServiceImpl implements WebSocketService {
// ------------------------------------------------------------------------------------------------------- Constants
private static final int WEBSOCKETS_PORT = Integer.getInteger("dmx.websockets.port", 8081);
private static final String WEBSOCKETS_URL = System.getProperty("dmx.websockets.url", "ws://localhost:8081");
// Note: the default values are required in case no config file is in effect. This applies when DM is started
// via feature:install from Karaf. The default values must match the values defined in project POM.
// Note: the default value is required in case no config file is in effect. This applies when DM is started
// via feature:install from Karaf. The default value must match the value defined in project POM.
// ---------------------------------------------------------------------------------------------- Instance Variables
private WebSocketsServer server; // instantiated in start()
private WebSocketConnectionPool pool; // instantiated in start()
private SendMessageWorker worker; // instantiated in start()
private CoreService dmx;
......@@ -86,16 +77,14 @@ public class WebSocketServiceImpl implements WebSocketService {
public void start() {
try {
logger.info("##### Starting Jetty WebSocket server");
server = new WebSocketsServer(WEBSOCKETS_PORT);
logger.info("##### Starting WebSocket service");
pool = new WebSocketConnectionPool();
CoreActivator.getHttpService().registerServlet("/websocket", new WebSocketServlet(pool, dmx), null, null);
worker = new SendMessageWorker();
worker.start();
server.start();
// ### server.join();
logger.info("Jetty WebSocket server started successfully");
logger.info("WebSocket service started successfully");
} catch (Exception e) {
logger.log(Level.SEVERE, "Starting Jetty WebSocket server failed", e);
logger.log(Level.SEVERE, "Starting WebSocket service failed", e);
}
}
......@@ -103,15 +92,15 @@ public class WebSocketServiceImpl implements WebSocketService {
void stop() {
try {
if (server != null) {
logger.info("### Stopping Jetty WebSocket server");
if (worker != null) {
logger.info("### Stopping WebSocket service (httpService=" + CoreActivator.getHttpService() + ")");
// CoreActivator.getHttpService().unregister("/websocket"); // HTTP service already gone
worker.interrupt();
server.stop();
} else {
logger.info("Stopping Jetty WebSocket server SKIPPED -- not yet started");
logger.info("Stopping WebSocket service SKIPPED -- it was not successfully started");
}
} catch (Exception e) {
logger.log(Level.SEVERE, "Stopping Jetty WebSocket server failed", e);
logger.log(Level.SEVERE, "Stopping WebSocket service failed", e);
}
}
......@@ -150,59 +139,8 @@ public class WebSocketServiceImpl implements WebSocketService {
return cookies.has("dmx_client_id") ? cookies.get("dmx_client_id") : null;
}
// ------------------------------------------------------------------------------------------------- Private Classes
private class WebSocketsServer extends Server {
private WebSocketsServer(int port) {
// add connector
Connector connector = new SelectChannelConnector();
connector.setPort(port);
addConnector(connector);
// set handler
setHandler(new WebSocketHandler() {
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
try {
checkProtocol(protocol);
return new WebSocketConnection(protocol, clientId(request), session(request), pool, dmx);
} catch (Exception e) {
throw new RuntimeException("Opening a WebSocket connection " +
(protocol != null ? "for plugin \"" + protocol + "\" " : "") + "failed", e);
}
}
});
}
private void checkProtocol(String pluginUri) {
if (pluginUri == null) {
throw new RuntimeException("A plugin URI is missing in the WebSocket handshake -- Add your " +
"plugin's URI as the 2nd argument to the JavaScript WebSocket constructor");
}
dmx.getPlugin(pluginUri); // check plugin URI, throws if invalid
}
private String clientId(HttpServletRequest request) {
String clientId = JavaUtils.cookieValue(request, "dmx_client_id");
if (clientId == null) {
throw new RuntimeException("Missing \"dmx_client_id\" cookie in websocket request");
}
return clientId;
}
private HttpSession session(HttpServletRequest request) {
// logger.info("request=" + JavaUtils.requestDump(request));
HttpSession session = request.getSession(false);
if (session == null) {
// FIXME
// throw new RuntimeException("No (valid) session associated with websocket request");
}
return session;
}
}
private class SendMessageWorker extends Thread {
private BlockingQueue<QueuedMessage> messages = new LinkedBlockingQueue();
......
package systems.dmx.core.impl;
import systems.dmx.core.service.CoreService;
import systems.dmx.core.util.JavaUtils;
import org.eclipse.jetty.websocket.WebSocket;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.util.logging.Logger;
public class WebSocketServlet extends org.eclipse.jetty.websocket.WebSocketServlet {
// ---------------------------------------------------------------------------------------------- Instance Variables
private WebSocketConnectionPool pool;
private CoreService dmx;
private Logger logger = Logger.getLogger(getClass().getName());
// ----------------------------------------------------------------------------------------------------- Constructor
// ### TODO: inject event manager only
WebSocketServlet(WebSocketConnectionPool pool, CoreService dmx) {
this.pool = pool;
this.dmx = dmx;
}
// -------------------------------------------------------------------------------------------------- Public Methods
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
try {
checkProtocol(protocol);
return new WebSocketConnection(protocol, clientId(request), session(request), pool, dmx);
} catch (Exception e) {
throw new RuntimeException("Opening a WebSocket connection " +
(protocol != null ? "for plugin \"" + protocol + "\" " : "") + "failed", e);
}
}
// ------------------------------------------------------------------------------------------------- Private Methods
private void checkProtocol(String pluginUri) {
if (pluginUri == null) {
throw new RuntimeException("A plugin URI is missing in the WebSocket handshake -- Add your " +
"plugin's URI as the 2nd argument to the JavaScript WebSocket constructor");
}
dmx.getPlugin(pluginUri); // check plugin URI, throws if invalid
}
private String clientId(HttpServletRequest request) {
String clientId = JavaUtils.cookieValue(request, "dmx_client_id");
if (clientId == null) {
throw new RuntimeException("Missing \"dmx_client_id\" cookie in upgrade request");
}
return clientId;
}
private HttpSession session(HttpServletRequest request) {
logger.info("request=" + JavaUtils.requestDump(request));
HttpSession session = request.getSession(false);
if (session == null) {
// FIXME
// throw new RuntimeException("No (valid) session associated with upgrade request");
}
return session;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment