Commit 6671936d authored by Jörg Richter's avatar Jörg Richter

Revise WebSocketService API (#384, #341)

BREAKING CHANGE

Change 3 WebSocketService methods:

```
void messageToAll(String pluginUri, String message)
->
void sendToAll(String message)
```

```
void messageToAllButOne(HttpServletRequest request, String pluginUri, String message)
->
void sendToAllButOrigin(String message)
```

```
void messageToOne(HttpServletRequest request, String pluginUri, String message)
->
void sendToOrigin(String message);
```

Besides the 3 new method names note that both the "request" and "pluginUri" arguments are not needed anymore. This is now automatic.

1 new WebSocketService method:

```
void sendToSome(String message, Predicate<WebSocketConnection> connectionFilter)
```

You can decide to which connections to send the message by providing a predicate function (use a lambda) that decides per connection.
parent 9a58e5c0
Pipeline #10479 passed with stage
in 8 minutes and 43 seconds
......@@ -203,7 +203,8 @@ class JerseyResponseFilter implements ContainerResponseFilter {
JSONObject message = new JSONObject()
.put("type", "processDirectives")
.put("args", directives.toJSONArray());
wss.messageToAllButOne(request, "systems.dmx.webclient", message.toString());
wss.sendToAllButOrigin(message.toString());
// FIXME: don't send update directives to unauthorized parties
}
......
......@@ -96,7 +96,8 @@ class PluginManager {
synchronized PluginImpl getPlugin(String pluginUri) {
PluginImpl plugin = activatedPlugins.get(pluginUri);
if (plugin == null) {
throw new RuntimeException("Plugin \"" + pluginUri + "\" is not installed/activated");
throw new RuntimeException("Plugin \"" + pluginUri + "\" is not installed/activated. Activated plugins: " +
activatedPlugins.keySet());
}
return plugin;
}
......
package systems.dmx.core.impl;
import systems.dmx.core.service.CoreService;
import systems.dmx.core.service.websocket.WebSocketConnection;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocket.Connection;
......@@ -18,13 +19,14 @@ import java.util.logging.Logger;
* <p>
* Once the actual WebSocket connection is opened or closed the WebSocketConnection is added/removed to a pool.
*/
class WebSocketConnection implements WebSocket, WebSocket.OnTextMessage, WebSocket.OnBinaryMessage {
class WebSocketConnectionImpl implements WebSocketConnection, WebSocket, WebSocket.OnTextMessage,
WebSocket.OnBinaryMessage {
// ---------------------------------------------------------------------------------------------- Instance Variables
String pluginUri;
private String pluginUri; // TODO: drop
String clientId;
HttpSession session;
private HttpSession session;
private WebSocketConnectionPool pool;
private CoreService dmx;
......@@ -37,32 +39,39 @@ class WebSocketConnection implements WebSocket, WebSocket.OnTextMessage, WebSock
// ----------------------------------------------------------------------------------------------------- Constructor
WebSocketConnection(String pluginUri, String clientId, HttpSession session, WebSocketConnectionPool pool,
CoreService dmx) {
WebSocketConnectionImpl(String pluginUri, String clientId, HttpSession session, WebSocketConnectionPool pool,
CoreService dmx) {
this.pluginUri = pluginUri;
this.clientId = clientId;
this.session = session;
this.pool = pool;
this.dmx = dmx;
// Note: info(session) relies on "dmx"
logger.info("### Associating WebSocket connection (client ID " + clientId + ") with " + info(session));
logger.info("### Associating WebSocket connection " + clientId + " (client ID) with " + info(session));
}
// -------------------------------------------------------------------------------------------------- Public Methods
// *** WebSocketConnection ***
@Override
public String getClientId() {
return clientId;
}
// *** WebSocket ***
@Override
public void onOpen(Connection connection) {
logger.info("Opening a WebSocket connection for plugin \"" + pluginUri + "\" (client ID " + clientId + ")");
logger.info("Opening WebSocket connection " + clientId + " (client ID)");
this.connection = connection;
pool.add(this);
pool.addConnection(this);
}
@Override
public void onClose(int code, String message) {
logger.info("Closing a WebSocket connection of plugin \"" + pluginUri + "\" (client ID " + clientId + ")");
pool.remove(this);
logger.info("Closing WebSocket connection " + clientId + " (client ID)");
pool.removeConnection(this);
}
// *** WebSocket.OnTextMessage ***
......@@ -92,8 +101,8 @@ class WebSocketConnection implements WebSocket, WebSocket.OnTextMessage, WebSock
try {
connection.sendMessage(message);
} catch (Exception e) {
pool.remove(this);
logger.log(Level.SEVERE, "Sending message via " + this + " failed -- connection removed", e);
pool.removeConnection(this);
logger.log(Level.SEVERE, "Sending message via " + this + " failed -- connection removed from pool", e);
}
}
......
......@@ -12,10 +12,9 @@ class WebSocketConnectionPool {
// ---------------------------------------------------------------------------------------------- Instance Variables
/**
* 1st hash: plugin URI
* 2nd hash: client ID
* key: client ID
*/
private Map<String, Map<String, WebSocketConnection>> pool = new ConcurrentHashMap();
private Map<String, WebSocketConnectionImpl> pool = new ConcurrentHashMap();
private Logger logger = Logger.getLogger(getClass().getName());
......@@ -26,43 +25,27 @@ class WebSocketConnectionPool {
// ----------------------------------------------------------------------------------------- Package Private Methods
/**
* Returns the open WebSocket connections associated to the given plugin, or <code>null</code> if there are none.
*/
Collection<WebSocketConnection> getConnections(String pluginUri) {
Map connections = pool.get(pluginUri);
return connections != null ? connections.values() : null;
}
WebSocketConnection getConnection(String pluginUri, String clientId) {
Map<String, WebSocketConnection> connections = pool.get(pluginUri);
if (connections == null) {
logger.warning("No WebSocket connection open for plugin \"" + pluginUri + "\"");
return null;
}
WebSocketConnection connection = connections.get(clientId);
WebSocketConnectionImpl getConnection(String clientId) {
WebSocketConnectionImpl connection = pool.get(clientId);
if (connection == null) {
logger.warning("No WebSocket connection open for client ID " + clientId + " (plugin \"" + pluginUri +
"\")");
logger.warning("No open WebSocket connection for client ID " + clientId);
}
return connection;
}
void add(WebSocketConnection connection) {
String pluginUri = connection.pluginUri;
Map connections = pool.get(pluginUri);
if (connections == null) {
connections = new ConcurrentHashMap();
pool.put(pluginUri, connections);
}
connections.put(connection.clientId, connection);
Collection<WebSocketConnectionImpl> getAllConnections() {
return pool.values();
}
void addConnection(WebSocketConnectionImpl connection) {
pool.put(connection.clientId, connection);
}
void remove(WebSocketConnection connection) {
String pluginUri = connection.pluginUri;
boolean removed = getConnections(pluginUri).remove(connection);
void removeConnection(WebSocketConnectionImpl connection) {
boolean removed = pool.remove(connection.clientId) != null;
if (!removed) {
throw new RuntimeException("Removing a connection of plugin \"" + pluginUri + "\" failed");
throw new RuntimeException("Can't remove WebSocket connection " + connection.clientId +
" (client ID) from pool");
}
}
}
......@@ -3,6 +3,7 @@ package systems.dmx.core.impl;
import systems.dmx.core.osgi.CoreActivator;
import systems.dmx.core.service.Cookies;
import systems.dmx.core.service.CoreService;
import systems.dmx.core.service.websocket.WebSocketConnection;
import systems.dmx.core.service.websocket.WebSocketService;
import javax.servlet.http.HttpServletRequest;
......@@ -10,6 +11,7 @@ import javax.servlet.http.HttpServletRequest;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
......@@ -43,27 +45,26 @@ public class WebSocketServiceImpl implements WebSocketService {
// *** WebSocketService ***
@Override
public void messageToAll(String pluginUri, String message) {
broadcast(pluginUri, message, null); // exclude=null
public void sendToOrigin(String message) {
WebSocketConnectionImpl connection = getConnection();
if (connection != null) {
queueMessage(connection, message);
}
}
@Override
public void messageToAllButOne(HttpServletRequest request, String pluginUri, String message) {
if (request == null) {
throw new IllegalArgumentException("request is null");
}
broadcast(pluginUri, message, getConnection(pluginUri));
public void sendToAll(String message) {
broadcast(message, conn -> true);
}
@Override
public void messageToOne(HttpServletRequest request, String pluginUri, String message) {
if (request == null) {
throw new IllegalArgumentException("request is null");
}
WebSocketConnection connection = getConnection(pluginUri);
if (connection != null) {
queueMessage(connection, message);
}
public void sendToAllButOrigin(String message) {
broadcast(message, conn -> !conn.getClientId().equals(clientId()));
}
@Override
public void sendToSome(String message, Predicate<WebSocketConnection> connectionFilter) {
broadcast(message, connectionFilter);
}
// ---
......@@ -107,30 +108,19 @@ public class WebSocketServiceImpl implements WebSocketService {
// ------------------------------------------------------------------------------------------------- Private Methods
/**
* @return the WebSocket connection that is associated with the current request (based on its "dmx_client_id"
* cookie), or null if no such cookie exists or if called outside request scope (e.g. while system
* startup).
* @return the WebSocket connection that is associated with the current request (based on "dmx_client_id" cookie),
* or null if no such cookie exists or if called outside request scope (e.g. while system startup).
*/
private WebSocketConnection getConnection(String pluginUri) {
private WebSocketConnectionImpl getConnection() {
String clientId = clientId();
return clientId != null ? pool.getConnection(pluginUri, clientId) : null;
return clientId != null ? pool.getConnection(clientId) : null;
}
/**
* @param exclude may be null
*/
private void broadcast(String pluginUri, String message, WebSocketConnection exclude) {
Collection<WebSocketConnection> connections = pool.getConnections(pluginUri);
if (connections != null) {
for (WebSocketConnection connection : connections) {
if (connection != exclude) {
queueMessage(connection, message);
}
}
}
private void broadcast(String message, Predicate<WebSocketConnection> connectionFilter) {
pool.getAllConnections().stream().filter(connectionFilter).forEach(conn -> queueMessage(conn, message));
}
private void queueMessage(WebSocketConnection connection, String message) {
private void queueMessage(WebSocketConnectionImpl connection, String message) {
worker.queueMessage(connection, message);
}
......@@ -165,7 +155,7 @@ public class WebSocketServiceImpl implements WebSocketService {
}
}
private void queueMessage(WebSocketConnection connection, String message) {
private void queueMessage(WebSocketConnectionImpl connection, String message) {
try {
// logger.info("----- queueing message " + Thread.currentThread().getName());
messages.put(new QueuedMessage(connection, message));
......@@ -177,10 +167,10 @@ public class WebSocketServiceImpl implements WebSocketService {
private static class QueuedMessage {
private WebSocketConnection connection;
private WebSocketConnectionImpl connection;
private String message;
private QueuedMessage(WebSocketConnection connection, String message) {
private QueuedMessage(WebSocketConnectionImpl connection, String message) {
this.connection = connection;
this.message = message;
}
......
......@@ -35,7 +35,7 @@ public class WebSocketServlet extends org.eclipse.jetty.websocket.WebSocketServl
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
try {
checkProtocol(protocol);
return new WebSocketConnection(protocol, clientId(request), session(request), pool, dmx);
return new WebSocketConnectionImpl(protocol, clientId(request), session(request), pool, dmx);
} catch (Exception e) {
throw new RuntimeException("Opening a WebSocket connection " +
(protocol != null ? "for plugin \"" + protocol + "\" " : "") + "failed", e);
......
package systems.dmx.core.service.websocket;
public interface WebSocketConnection {
String getClientId();
}
package systems.dmx.core.service.websocket;
import javax.servlet.http.HttpServletRequest;
import java.util.function.Predicate;
public interface WebSocketService {
void messageToAll(String pluginUri, String message);
void sendToOrigin(String message);
// ### TODO: drop "request" parameter?
void messageToAllButOne(HttpServletRequest request, String pluginUri, String message);
void sendToAll(String message);
// ### TODO: drop "request" parameter?
void messageToOne(HttpServletRequest request, String pluginUri, String message);
void sendToAllButOrigin(String message);
void sendToSome(String message, Predicate<WebSocketConnection> connectionFilter);
// ---
......
......@@ -3,6 +3,7 @@ package systems.dmx.topicmaps;
import systems.dmx.core.Topic;
import systems.dmx.core.model.topicmaps.ViewAssoc;
import systems.dmx.core.model.topicmaps.ViewTopic;
import systems.dmx.core.service.CoreService;
import org.codehaus.jettison.json.JSONObject;
......@@ -19,21 +20,21 @@ class Messenger {
// ---------------------------------------------------------------------------------------------- Instance Variables
private MessengerContext context;
private CoreService dmx;
private Logger logger = Logger.getLogger(getClass().getName());
// ---------------------------------------------------------------------------------------------------- Constructors
Messenger(MessengerContext context) {
this.context = context;
Messenger(CoreService dmx) {
this.dmx = dmx;
}
// ----------------------------------------------------------------------------------------- Package Private Methods
void newTopicmap(Topic topicmapTopic) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "newTopicmap")
.put("args", new JSONObject()
.put("topicmapTopic", topicmapTopic.toJSON())
......@@ -47,7 +48,7 @@ class Messenger {
void addTopicToTopicmap(long topicmapId, ViewTopic topic) {
try {
// FIXME: per connection check read access
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "addTopicToTopicmap")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -62,7 +63,7 @@ class Messenger {
void addAssocToTopicmap(long topicmapId, ViewAssoc assoc) {
try {
// FIXME: per connection check read access
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "addAssocToTopicmap")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -76,7 +77,7 @@ class Messenger {
void setTopicPosition(long topicmapId, long topicId, int x, int y) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "setTopicPosition")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -94,7 +95,7 @@ class Messenger {
void setTopicVisibility(long topicmapId, long topicId, boolean visibility) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "setTopicVisibility")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -109,7 +110,7 @@ class Messenger {
void setAssocVisibility(long topicmapId, long assocId, boolean visibility) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "setAssocVisibility")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -124,9 +125,7 @@ class Messenger {
// ------------------------------------------------------------------------------------------------- Private Methods
private void messageToAllButOne(JSONObject message) {
context.getCoreService().getWebSocketService().messageToAllButOne(
context.getRequest(), pluginUri, message.toString()
);
private void sendToAllButOrigin(JSONObject message) {
dmx.getWebSocketService().sendToAllButOrigin(message.toString());
}
}
package systems.dmx.topicmaps;
import systems.dmx.core.service.CoreService;
import javax.servlet.http.HttpServletRequest;
interface MessengerContext {
CoreService getCoreService();
HttpServletRequest getRequest();
}
......@@ -18,7 +18,6 @@ import systems.dmx.core.service.Transactional;
import systems.dmx.core.util.DMXUtils;
import systems.dmx.core.util.IdList;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.POST;
......@@ -28,7 +27,6 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import java.io.InputStream;
import java.util.ArrayList;
......@@ -44,7 +42,7 @@ import java.util.logging.Logger;
@Path("/topicmaps")
@Consumes("application/json")
@Produces("application/json")
public class TopicmapsPlugin extends PluginActivator implements TopicmapsService, MessengerContext {
public class TopicmapsPlugin extends PluginActivator implements TopicmapsService {
// ---------------------------------------------------------------------------------------------- Instance Variables
......@@ -55,10 +53,7 @@ public class TopicmapsPlugin extends PluginActivator implements TopicmapsService
private List<ViewmodelCustomizer> viewmodelCustomizers = new ArrayList();
private Messenger me = new Messenger(this);
@Context
private HttpServletRequest request; // required by Messenger
private Messenger me;
private Logger logger = Logger.getLogger(getClass().getName());
......@@ -420,20 +415,15 @@ public class TopicmapsPlugin extends PluginActivator implements TopicmapsService
// ************************
// *** MessengerContext ***
// ************************
// *************
// *** Hooks ***
// *************
@Override
public CoreService getCoreService() {
return dmx;
}
@Override
public HttpServletRequest getRequest() {
return request;
public void init() {
me = new Messenger(dmx);
}
......
......@@ -34,9 +34,6 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.servlet.http.HttpServletRequest;
import java.util.Iterator;
import java.util.List;
......@@ -55,9 +52,6 @@ public class WebservicePlugin extends PluginActivator {
// ---------------------------------------------------------------------------------------------- Instance Variables
@Context
private HttpServletRequest request;
private Messenger me = new Messenger("systems.dmx.webclient");
private Logger logger = Logger.getLogger(getClass().getName());
......@@ -600,7 +594,7 @@ public class WebservicePlugin extends PluginActivator {
private void newType(Topic type, String argName, String messageType) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", messageType)
.put("args", new JSONObject()
.put(argName, type.toJSON())
......@@ -611,8 +605,8 @@ public class WebservicePlugin extends PluginActivator {
}
}
private void messageToAllButOne(JSONObject message) {
dmx.getWebSocketService().messageToAllButOne(request, pluginUri, message.toString());
private void sendToAllButOrigin(JSONObject message) {
dmx.getWebSocketService().sendToAllButOrigin(message.toString());
}
}
}
......@@ -38,9 +38,6 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.servlet.http.HttpServletRequest;
import java.util.Iterator;
import java.util.List;
......@@ -85,9 +82,6 @@ public class WorkspacesPlugin extends PluginActivator implements WorkspacesServi
@Inject
private ConfigService configService;
@Context
private HttpServletRequest request;
private Messenger me = new Messenger("systems.dmx.webclient");
private Logger logger = Logger.getLogger(getClass().getName());
......@@ -608,7 +602,7 @@ public class WorkspacesPlugin extends PluginActivator implements WorkspacesServi
private void newWorkspace(Topic workspace) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "newWorkspace")
.put("args", new JSONObject()
.put("workspace", workspace.toJSON())
......@@ -621,8 +615,8 @@ public class WorkspacesPlugin extends PluginActivator implements WorkspacesServi
// ---
private void messageToAllButOne(JSONObject message) {
dmx.getWebSocketService().messageToAllButOne(request, pluginUri, message.toString());
private void sendToAllButOrigin(JSONObject message) {
dmx.getWebSocketService().sendToAllButOrigin(message.toString());
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment