Commit c5130b2c authored by Jörg Richter's avatar Jörg Richter

Merge branch 'websocket-servlet'

parents a15b9099 ed3687fa
Pipeline #10508 passed with stages
in 8 minutes and 40 seconds
......@@ -52,7 +52,7 @@ class JerseyResponseFilter implements ContainerResponseFilter {
// ---------------------------------------------------------------------------------------------- Instance Variables
private EventManager em;
private WebSocketService ws;
private WebSocketService wss;
@Context
private HttpServletRequest request;
......@@ -61,9 +61,9 @@ class JerseyResponseFilter implements ContainerResponseFilter {
// ---------------------------------------------------------------------------------------------------- Constructors
JerseyResponseFilter(EventManager em, WebSocketService ws) {
JerseyResponseFilter(EventManager em, WebSocketService wss) {
this.em = em;
this.ws = ws;
this.wss = wss;
}
// -------------------------------------------------------------------------------------------------- Public Methods
......@@ -203,7 +203,8 @@ class JerseyResponseFilter implements ContainerResponseFilter {
JSONObject message = new JSONObject()
.put("type", "processDirectives")
.put("args", directives.toJSONArray());
ws.messageToAllButOne(request, "systems.dmx.webclient", message.toString());
wss.sendToAllButOrigin(message.toString());
// FIXME: don't send update directives to unauthorized parties
}
......
......@@ -96,7 +96,8 @@ class PluginManager {
synchronized PluginImpl getPlugin(String pluginUri) {
PluginImpl plugin = activatedPlugins.get(pluginUri);
if (plugin == null) {
throw new RuntimeException("Plugin \"" + pluginUri + "\" is not installed/activated");
throw new RuntimeException("Plugin \"" + pluginUri + "\" is not installed/activated. Activated plugins: " +
activatedPlugins.keySet());
}
return plugin;
}
......
......@@ -108,8 +108,8 @@ class PrivilegedAccessImpl implements PrivilegedAccess {
//
return _hasPermission(username, operation, workspaceId);
} catch (Exception e) {
throw new RuntimeException("Checking permission for object " + objectId + " failed, typeUri=\"" + typeUri +
"\", " + userInfo(username) + ", operation=" + operation, e);
throw new RuntimeException("Checking permission for object " + objectId + " failed, typeUri=" + typeUri +
", " + userInfo(username) + ", operation=" + operation, e);
}
}
......@@ -489,12 +489,12 @@ class PrivilegedAccessImpl implements PrivilegedAccess {
private boolean permissionIfNoWorkspaceIsAssigned(Operation operation, long objectId, String typeUri) {
switch (operation) {
case READ:
logger.fine("Object " + objectId + " (typeUri=\"" + typeUri +
"\") is not assigned to any workspace -- READ permission is granted");
logger.fine("Object " + objectId + " (typeUri=" + typeUri +
") is not assigned to any workspace -- READ permission is granted");
return true;
case WRITE:
logger.warning("Object " + objectId + " (typeUri=\"" + typeUri +
"\") is not assigned to any workspace -- WRITE permission is refused");
logger.warning("Object " + objectId + " (typeUri=" + typeUri +
") is not assigned to any workspace -- WRITE permission is refused");
return false;
default:
throw new RuntimeException(operation + " is an unsupported operation");
......
......@@ -60,7 +60,7 @@ class WebPublishingService {
// ---------------------------------------------------------------------------------------------------- Constructors
WebPublishingService(AccessLayer al, WebSocketService ws) {
WebPublishingService(AccessLayer al, WebSocketService wss) {
try {
logger.info("Setting up the WebPublishingService");
this.al = al;
......@@ -71,7 +71,7 @@ class WebPublishingService {
// setup container filters
Map<String, Object> properties = jerseyApplication.getProperties();
properties.put(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS, new JerseyRequestFilter(al.em));
properties.put(ResourceConfig.PROPERTY_CONTAINER_RESPONSE_FILTERS, new JerseyResponseFilter(al.em, ws));
properties.put(ResourceConfig.PROPERTY_CONTAINER_RESPONSE_FILTERS, new JerseyResponseFilter(al.em, wss));
properties.put(ResourceConfig.PROPERTY_RESOURCE_FILTER_FACTORIES, new TransactionFactory(al));
//
// deploy Jersey application in container
......
package systems.dmx.core.impl;
import systems.dmx.core.service.CoreService;
import systems.dmx.core.service.websocket.WebSocketConnection;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocket.Connection;
......@@ -18,13 +19,14 @@ import java.util.logging.Logger;
* <p>
* Once the actual WebSocket connection is opened or closed the WebSocketConnection is added/removed to a pool.
*/
class WebSocketConnection implements WebSocket, WebSocket.OnTextMessage, WebSocket.OnBinaryMessage {
class WebSocketConnectionImpl implements WebSocketConnection, WebSocket, WebSocket.OnTextMessage,
WebSocket.OnBinaryMessage {
// ---------------------------------------------------------------------------------------------- Instance Variables
String pluginUri;
private String pluginUri; // TODO: drop
String clientId;
HttpSession session;
private HttpSession session;
private WebSocketConnectionPool pool;
private CoreService dmx;
......@@ -37,31 +39,47 @@ class WebSocketConnection implements WebSocket, WebSocket.OnTextMessage, WebSock
// ----------------------------------------------------------------------------------------------------- Constructor
WebSocketConnection(String pluginUri, String clientId, HttpSession session, WebSocketConnectionPool pool,
CoreService dmx) {
logger.info("### Associating WebSocket connection (client ID " + clientId + ") with " + info(session));
/**
* @param session not null
*/
WebSocketConnectionImpl(String pluginUri, String clientId, HttpSession session, WebSocketConnectionPool pool,
CoreService dmx) {
this.pluginUri = pluginUri;
this.clientId = clientId;
this.session = session;
this.pool = pool;
this.dmx = dmx;
// Note: info(session) relies on "dmx"
logger.info("### Associating WebSocket connection " + clientId + " (client ID) with " + info(session));
}
// -------------------------------------------------------------------------------------------------- Public Methods
// *** WebSocketConnection ***
@Override
public String getClientId() {
return clientId;
}
@Override
public String getUsername() {
return username(session);
}
// *** WebSocket ***
@Override
public void onOpen(Connection connection) {
logger.info("Opening a WebSocket connection for plugin \"" + pluginUri + "\" (client ID " + clientId + ")");
logger.info("Opening WebSocket connection " + clientId + " (client ID)");
this.connection = connection;
pool.add(this);
pool.addConnection(this);
}
@Override
public void onClose(int code, String message) {
logger.info("Closing a WebSocket connection of plugin \"" + pluginUri + "\" (client ID " + clientId + ")");
pool.remove(this);
logger.info("Closing WebSocket connection " + clientId + " (client ID)");
pool.removeConnection(this);
}
// *** WebSocket.OnTextMessage ***
......@@ -91,11 +109,15 @@ class WebSocketConnection implements WebSocket, WebSocket.OnTextMessage, WebSock
try {
connection.sendMessage(message);
} catch (Exception e) {
pool.remove(this);
logger.log(Level.SEVERE, "Sending message via " + this + " failed -- connection removed", e);
pool.removeConnection(this);
logger.log(Level.SEVERE, "Sending message via " + this + " failed -- connection removed from pool", e);
}
}
void close() {
connection.close();
}
// ------------------------------------------------------------------------------------------------- Private Methods
// === Logging ===
......
......@@ -12,10 +12,9 @@ class WebSocketConnectionPool {
// ---------------------------------------------------------------------------------------------- Instance Variables
/**
* 1st hash: plugin URI
* 2nd hash: client ID
* key: client ID
*/
private Map<String, Map<String, WebSocketConnection>> pool = new ConcurrentHashMap();
private Map<String, WebSocketConnectionImpl> pool = new ConcurrentHashMap();
private Logger logger = Logger.getLogger(getClass().getName());
......@@ -26,43 +25,32 @@ class WebSocketConnectionPool {
// ----------------------------------------------------------------------------------------- Package Private Methods
/**
* Returns the open WebSocket connections associated to the given plugin, or <code>null</code> if there are none.
*/
Collection<WebSocketConnection> getConnections(String pluginUri) {
Map connections = pool.get(pluginUri);
return connections != null ? connections.values() : null;
}
WebSocketConnection getConnection(String pluginUri, String clientId) {
Map<String, WebSocketConnection> connections = pool.get(pluginUri);
if (connections == null) {
logger.warning("No WebSocket connection open for plugin \"" + pluginUri + "\"");
return null;
}
WebSocketConnection connection = connections.get(clientId);
WebSocketConnectionImpl getConnection(String clientId) {
WebSocketConnectionImpl connection = pool.get(clientId);
if (connection == null) {
logger.warning("No WebSocket connection open for client ID " + clientId + " (plugin \"" + pluginUri +
"\")");
logger.warning("No WebSocket connection open for client ID " + clientId);
}
return connection;
}
void add(WebSocketConnection connection) {
String pluginUri = connection.pluginUri;
Map connections = pool.get(pluginUri);
if (connections == null) {
connections = new ConcurrentHashMap();
pool.put(pluginUri, connections);
}
connections.put(connection.clientId, connection);
Collection<WebSocketConnectionImpl> getAllConnections() {
return pool.values();
}
void addConnection(WebSocketConnectionImpl connection) {
pool.put(connection.clientId, connection);
}
void remove(WebSocketConnection connection) {
String pluginUri = connection.pluginUri;
boolean removed = getConnections(pluginUri).remove(connection);
void removeConnection(WebSocketConnectionImpl connection) {
boolean removed = pool.remove(connection.clientId) != null;
if (!removed) {
throw new RuntimeException("Removing a connection of plugin \"" + pluginUri + "\" failed");
throw new RuntimeException("Can't remove WebSocket connection " + connection.clientId +
" (client ID) from pool");
}
}
void close() {
logger.info("Closing " + pool.size() + " WebSocket connections");
getAllConnections().forEach(WebSocketConnectionImpl::close);
}
}
package systems.dmx.core.impl;
import systems.dmx.core.service.CoreService;
import systems.dmx.core.util.JavaUtils;
import org.eclipse.jetty.websocket.WebSocket;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;
import java.util.logging.Logger;
public class WebSocketServlet extends org.eclipse.jetty.websocket.WebSocketServlet {
// ---------------------------------------------------------------------------------------------- Instance Variables
private WebSocketConnectionPool pool;
private CoreService dmx;
private Logger logger = Logger.getLogger(getClass().getName());
// ----------------------------------------------------------------------------------------------------- Constructor
// ### TODO: inject event manager only
WebSocketServlet(WebSocketConnectionPool pool, CoreService dmx) {
this.pool = pool;
this.dmx = dmx;
}
// -------------------------------------------------------------------------------------------------- Public Methods
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
try {
checkProtocol(protocol);
return new WebSocketConnectionImpl(protocol, clientId(request), session(request), pool, dmx);
} catch (Exception e) {
throw new RuntimeException("Opening a WebSocket connection " +
(protocol != null ? "for plugin \"" + protocol + "\" " : "") + "failed", e);
}
}
// ------------------------------------------------------------------------------------------------- Private Methods
private void checkProtocol(String pluginUri) {
if (pluginUri == null) {
throw new RuntimeException("A plugin URI is missing in the WebSocket handshake -- Add your " +
"plugin's URI as the 2nd argument to the JavaScript WebSocket constructor");
}
dmx.getPlugin(pluginUri); // check plugin URI, throws if invalid
}
private String clientId(HttpServletRequest request) {
String clientId = JavaUtils.cookieValue(request, "dmx_client_id");
if (clientId == null) {
throw new RuntimeException("Missing \"dmx_client_id\" cookie in upgrade request");
}
return clientId;
}
private HttpSession session(HttpServletRequest request) {
// logger.info("request=" + JavaUtils.requestDump(request));
HttpSession session = request.getSession(false);
if (session == null) {
throw new RuntimeException("No (valid) session associated with upgrade request");
}
return session;
}
}
package systems.dmx.core.service.websocket;
public interface WebSocketConnection {
String getClientId();
/**
* @return the username associated with this WebSocket connection, or null if no one is associated (= not logged in).
*/
String getUsername();
}
package systems.dmx.core.service.websocket;
import javax.servlet.http.HttpServletRequest;
import java.util.function.Predicate;
public interface WebSocketService {
void messageToAll(String pluginUri, String message);
void sendToOrigin(String message);
// ### TODO: drop "request" parameter?
void messageToAllButOne(HttpServletRequest request, String pluginUri, String message);
void sendToAll(String message);
// ### TODO: drop "request" parameter?
void messageToOne(HttpServletRequest request, String pluginUri, String message);
void sendToAllButOrigin(String message);
void sendToReadAllowed(String message, long objectId);
void sendToSome(String message, Predicate<WebSocketConnection> connectionFilter);
// ---
......
......@@ -49,7 +49,7 @@ export default ({store}) => {
const topicModel = new dm5.Topic(topicType.newTopicModel(value)).fillChildren()
// console.log('createTopic', topicModel)
dm5.restClient.createTopic(topicModel).then(topic => {
console.log('Created', topic)
// console.log('Created', topic)
revealTopic(topic)
store.dispatch('_processDirectives', topic.directives)
})
......
......@@ -3,6 +3,7 @@ package systems.dmx.topicmaps;
import systems.dmx.core.Topic;
import systems.dmx.core.model.topicmaps.ViewAssoc;
import systems.dmx.core.model.topicmaps.ViewTopic;
import systems.dmx.core.service.websocket.WebSocketService;
import org.codehaus.jettison.json.JSONObject;
......@@ -19,21 +20,26 @@ class Messenger {
// ---------------------------------------------------------------------------------------------- Instance Variables
private MessengerContext context;
private WebSocketService wss;
private Logger logger = Logger.getLogger(getClass().getName());
// ---------------------------------------------------------------------------------------------------- Constructors
Messenger(MessengerContext context) {
this.context = context;
Messenger(WebSocketService wss) {
this.wss = wss;
}
// ----------------------------------------------------------------------------------------- Package Private Methods
void newTopicmap(Topic topicmapTopic) {
try {
messageToAllButOne(new JSONObject()
// FIXME: send message only to users who have READ permission for the topicmap topic.
// Unfortunately we can't just use sendToReadAllowed() as the permission check is performed in another thread
// (WebSocketService's SendMessageWorker), and the create-topicmap transaction is not yet committed.
// The result would be "org.neo4j.graphdb.NotFoundException: 'typeUri' property not found for NodeImpl#1234"
// (where 1234 is the ID of the just created topicmap).
sendToAllButOrigin(new JSONObject()
.put("type", "newTopicmap")
.put("args", new JSONObject()
.put("topicmapTopic", topicmapTopic.toJSON())
......@@ -46,13 +52,12 @@ class Messenger {
void addTopicToTopicmap(long topicmapId, ViewTopic topic) {
try {
// FIXME: per connection check read access
messageToAllButOne(new JSONObject()
sendToReadAllowed(new JSONObject()
.put("type", "addTopicToTopicmap")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
.put("viewTopic", topic.toJSON())
)
), topic.getId()
);
} catch (Exception e) {
logger.log(Level.WARNING, "Error while sending a \"addTopicToTopicmap\" message:", e);
......@@ -61,13 +66,12 @@ class Messenger {
void addAssocToTopicmap(long topicmapId, ViewAssoc assoc) {
try {
// FIXME: per connection check read access
messageToAllButOne(new JSONObject()
sendToReadAllowed(new JSONObject()
.put("type", "addAssocToTopicmap")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
.put("viewAssoc", assoc.toJSON())
)
), assoc.getId()
);
} catch (Exception e) {
logger.log(Level.WARNING, "Error while sending a \"addAssocToTopicmap\" message:", e);
......@@ -76,7 +80,7 @@ class Messenger {
void setTopicPosition(long topicmapId, long topicId, int x, int y) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "setTopicPosition")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -94,7 +98,7 @@ class Messenger {
void setTopicVisibility(long topicmapId, long topicId, boolean visibility) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "setTopicVisibility")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -109,7 +113,7 @@ class Messenger {
void setAssocVisibility(long topicmapId, long assocId, boolean visibility) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", "setAssocVisibility")
.put("args", new JSONObject()
.put("topicmapId", topicmapId)
......@@ -124,9 +128,11 @@ class Messenger {
// ------------------------------------------------------------------------------------------------- Private Methods
private void messageToAllButOne(JSONObject message) {
context.getCoreService().getWebSocketService().messageToAllButOne(
context.getRequest(), pluginUri, message.toString()
);
private void sendToAllButOrigin(JSONObject message) {
wss.sendToAllButOrigin(message.toString());
}
private void sendToReadAllowed(JSONObject message, long objectId) {
wss.sendToReadAllowed(message.toString(), objectId);
}
}
package systems.dmx.topicmaps;
import systems.dmx.core.service.CoreService;
import javax.servlet.http.HttpServletRequest;
interface MessengerContext {
CoreService getCoreService();
HttpServletRequest getRequest();
}
......@@ -13,12 +13,10 @@ import systems.dmx.core.model.topicmaps.ViewAssoc;
import systems.dmx.core.model.topicmaps.ViewTopic;
import systems.dmx.core.model.topicmaps.ViewProps;
import systems.dmx.core.osgi.PluginActivator;
import systems.dmx.core.service.CoreService;
import systems.dmx.core.service.Transactional;
import systems.dmx.core.util.DMXUtils;
import systems.dmx.core.util.IdList;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.POST;
......@@ -28,7 +26,6 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import java.io.InputStream;
import java.util.ArrayList;
......@@ -44,7 +41,7 @@ import java.util.logging.Logger;
@Path("/topicmaps")
@Consumes("application/json")
@Produces("application/json")
public class TopicmapsPlugin extends PluginActivator implements TopicmapsService, MessengerContext {
public class TopicmapsPlugin extends PluginActivator implements TopicmapsService {
// ---------------------------------------------------------------------------------------------- Instance Variables
......@@ -55,10 +52,7 @@ public class TopicmapsPlugin extends PluginActivator implements TopicmapsService
private List<ViewmodelCustomizer> viewmodelCustomizers = new ArrayList();
private Messenger me = new Messenger(this);
@Context
private HttpServletRequest request; // required by Messenger
private Messenger me;
private Logger logger = Logger.getLogger(getClass().getName());
......@@ -95,7 +89,7 @@ public class TopicmapsPlugin extends PluginActivator implements TopicmapsService
));
getTopicmapType(topicmapTypeUri).initTopicmapState(topicmapTopic, viewProps, dmx);
//
me.newTopicmap(topicmapTopic); // FIXME: broadcast to eligible users only
me.newTopicmap(topicmapTopic);
return topicmapTopic;
} catch (Exception e) {
throw new RuntimeException("Creating topicmap \"" + name + "\" failed, topicmapTypeUri=\"" +
......@@ -420,20 +414,15 @@ public class TopicmapsPlugin extends PluginActivator implements TopicmapsService
// ************************
// *** MessengerContext ***
// ************************
// *************
// *** Hooks ***
// *************
@Override
public CoreService getCoreService() {
return dmx;
}
@Override
public HttpServletRequest getRequest() {
return request;
public void init() {
me = new Messenger(dmx.getWebSocketService());
}
......
......@@ -277,9 +277,9 @@ const actions = {
player1: {roleTypeUri: 'dmx.core.default', ...playerId1},
player2: {roleTypeUri: 'dmx.core.default', ...playerId2}
}
console.log('createAssoc', assocModel)
// console.log('createAssoc', assocModel)
dm5.restClient.createAssoc(assocModel).then(assoc => {
console.log('Created', assoc)
// console.log('Created', assoc)
dispatch('revealAssoc', {assoc})
dispatch('_processDirectives', assoc.directives)
})
......
......@@ -3,27 +3,27 @@ import dm5 from 'dm5'
const actions = {
createTopicType ({dispatch}, {name, pos}) {
console.log('Creating topic type', name)
// console.log('Creating topic type', name)
dm5.restClient.createTopicType(defaultTopicType(name)).then(topicType => {
console.log('Created', topicType)
// console.log('Created', topicType)
dispatch('putTopicType', topicType)
dispatch('revealTopic', {topic: topicType, pos})
})
},
createAssocType ({dispatch}, {name, pos}) {
console.log('Creating assoc type', name)
// console.log('Creating assoc type', name)
dm5.restClient.createAssocType(defaultAssocType(name)).then(assocType => {
console.log('Created', assocType)
// console.log('Created', assocType)
dispatch('putAssocType', assocType)
dispatch('revealTopic', {topic: assocType, pos})
})
},
createRoleType ({dispatch}, {name, pos}) {
console.log('Creating role type', name)
// console.log('Creating role type', name)
dm5.restClient.createRoleType(defaultRoleType(name)).then(roleType => {
console.log('Created', roleType)
// console.log('Created', roleType)
dispatch('putRoleType', roleType)
dispatch('revealTopic', {topic: roleType, pos})
})
......
......@@ -34,8 +34,8 @@ export default extraElementUI => {
//
let p // a promise resolved once the assets of all installed plugins are registered
if (DEV) {
console.info('[DMX] You are running the webclient in development mode.\nFrontend code is hot reloaded from ' +
'file system (instead fetched from DMX backend server).\nTo get Hot Module Replacement add your plugin to ' +
console.info('[DMX] You are running the DMX webclient in development mode.\nFrontend code is hot reloaded from ' +
'file system (instead retrieved through DMX backend server).\nTo get Hot Module Replacement add your plugin to ' +
'modules/dmx-webclient/src/main/js/plugin_manager.js')
p = Promise.resolve()
} else {
......
......@@ -34,9 +34,6 @@ import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.servlet.http.HttpServletRequest;
import java.util.Iterator;
import java.util.List;
......@@ -55,9 +52,6 @@ public class WebservicePlugin extends PluginActivator {
// ---------------------------------------------------------------------------------------------- Instance Variables
@Context
private HttpServletRequest request;
private Messenger me = new Messenger("systems.dmx.webclient");
private Logger logger = Logger.getLogger(getClass().getName());
......@@ -600,7 +594,7 @@ public class WebservicePlugin extends PluginActivator {
private void newType(Topic type, String argName, String messageType) {
try {
messageToAllButOne(new JSONObject()
sendToAllButOrigin(new JSONObject()
.put("type", messageType)
.put("args", new JSONObject()
.put(argName, type.toJSON())
......@@ -611,8 +605,8 @@ public class WebservicePlugin extends PluginActivator {
}
}