Commit ed3687fa authored by Jörg Richter's avatar Jörg Richter

Fix: SendMessageWorker continues on error (#384)

parent f4109efa
Pipeline #10507 passed with stage
in 8 minutes and 42 seconds
......@@ -108,8 +108,8 @@ class PrivilegedAccessImpl implements PrivilegedAccess {
//
return _hasPermission(username, operation, workspaceId);
} catch (Exception e) {
throw new RuntimeException("Checking permission for object " + objectId + " failed, typeUri=\"" + typeUri +
"\", " + userInfo(username) + ", operation=" + operation, e);
throw new RuntimeException("Checking permission for object " + objectId + " failed, typeUri=" + typeUri +
", " + userInfo(username) + ", operation=" + operation, e);
}
}
......@@ -489,12 +489,12 @@ class PrivilegedAccessImpl implements PrivilegedAccess {
private boolean permissionIfNoWorkspaceIsAssigned(Operation operation, long objectId, String typeUri) {
switch (operation) {
case READ:
logger.fine("Object " + objectId + " (typeUri=\"" + typeUri +
"\") is not assigned to any workspace -- READ permission is granted");
logger.fine("Object " + objectId + " (typeUri=" + typeUri +
") is not assigned to any workspace -- READ permission is granted");
return true;
case WRITE:
logger.warning("Object " + objectId + " (typeUri=\"" + typeUri +
"\") is not assigned to any workspace -- WRITE permission is refused");
logger.warning("Object " + objectId + " (typeUri=" + typeUri +
") is not assigned to any workspace -- WRITE permission is refused");
return false;
default:
throw new RuntimeException(operation + " is an unsupported operation");
......
......@@ -7,9 +7,8 @@ import systems.dmx.core.service.accesscontrol.Operation;
import systems.dmx.core.service.websocket.WebSocketConnection;
import systems.dmx.core.service.websocket.WebSocketService;
import javax.servlet.http.HttpServletRequest;
import org.codehaus.jettison.json.JSONObject;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Predicate;
......@@ -132,9 +131,7 @@ public class WebSocketServiceImpl implements WebSocketService {
String clientId = clientId();
return conn -> {
boolean isOrigin = conn.getClientId().equals(clientId);
if (isOrigin) {
logger.info(conn.getClientId() + " " + conn.getUsername() + " (origin) -> " + false);
}
logger.info(conn.getClientId() + " " + conn.getUsername() + " (isOrigin) -> " + isOrigin);
return isOrigin;
};
}
......@@ -142,7 +139,7 @@ public class WebSocketServiceImpl implements WebSocketService {
private Predicate<WebSocketConnection> isReadAllowed(long objectId) {
return conn -> {
boolean isReadAllowed = dmx.getPrivilegedAccess().hasPermission(conn.getUsername(), Operation.READ, objectId);
logger.info(conn.getClientId() + " " + conn.getUsername() + " -> " + isReadAllowed);
logger.info(conn.getClientId() + " " + conn.getUsername() + " (isReadAllowed) -> " + isReadAllowed);
return isReadAllowed;
};
}
......@@ -175,16 +172,21 @@ public class WebSocketServiceImpl implements WebSocketService {
@Override
public void run() {
try {
while (true) {
messageQueue.take().send();
boolean stopped = false;
while (!stopped) {
MessageTask task = null;
try {
task = messageQueue.take();
task.sendMessage();
yield();
} catch (InterruptedException e) {
stopped = true;
} catch (Exception e) {
logger.log(Level.SEVERE, "An error occurred in the SendMessageWorker while processing a \"" +
task.getMessageType() + "\" task (aborting this task):", e);
}
} catch (InterruptedException e) {
logger.info("Terminating SendMessageWorker");
} catch (Exception e) {
logger.log(Level.WARNING, "An exception occurred in the SendMessageWorker -- terminating:", e);
}
logger.info("### Terminating SendMessageWorker");
}
private void queueMessage(String message, WebSocketConnectionImpl connection) {
......@@ -229,16 +231,26 @@ public class WebSocketServiceImpl implements WebSocketService {
// ---
private void send() {
private void sendMessage() {
if (connection != null) {
_send(connection);
_sendMessage(connection);
} else {
pool.getAllConnections().stream().filter(connectionFilter).forEach(conn -> _send(conn));
pool.getAllConnections().stream().filter(connectionFilter).forEach(conn -> _sendMessage(conn));
}
}
private void _send(WebSocketConnectionImpl conn) {
private void _sendMessage(WebSocketConnectionImpl conn) {
conn.sendMessage(message);
}
// ---
private String getMessageType() {
try {
return new JSONObject(message).getString("type");
} catch (Exception e) {
throw new RuntimeException("JSON parsing error", e);
}
}
}
}
......@@ -34,7 +34,11 @@ class Messenger {
void newTopicmap(Topic topicmapTopic) {
try {
// FIXME: per connection check read access
// FIXME: send message only to users who have READ permission for the topicmap topic.
// Unfortunately we can't just use sendToReadAllowed() as the permission check is performed in another thread
// (WebSocketService's SendMessageWorker), and the create-topicmap transaction is not yet committed.
// The result would be "org.neo4j.graphdb.NotFoundException: 'typeUri' property not found for NodeImpl#1234"
// (where 1234 is the ID of the just created topicmap).
sendToAllButOrigin(new JSONObject()
.put("type", "newTopicmap")
.put("args", new JSONObject()
......
......@@ -89,7 +89,7 @@ public class TopicmapsPlugin extends PluginActivator implements TopicmapsService
));
getTopicmapType(topicmapTypeUri).initTopicmapState(topicmapTopic, viewProps, dmx);
//
me.newTopicmap(topicmapTopic); // FIXME: broadcast to eligible users only
me.newTopicmap(topicmapTopic);
return topicmapTopic;
} catch (Exception e) {
throw new RuntimeException("Creating topicmap \"" + name + "\" failed, topicmapTypeUri=\"" +
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment