Commit a1bfb1b3 authored by Jörg Richter's avatar Jörg Richter

WebSocketService: optimize internal SendMessageWorker (#384)

parent 828f1093
Pipeline #10481 passed with stage
in 8 minutes and 29 seconds
......@@ -48,23 +48,27 @@ public class WebSocketServiceImpl implements WebSocketService {
public void sendToOrigin(String message) {
WebSocketConnectionImpl connection = getConnection();
if (connection != null) {
queueMessage(connection, message);
queueMessage(message, connection);
}
}
@Override
public void sendToAll(String message) {
broadcast(message, conn -> true);
queueMessage(message, conn -> true);
}
@Override
public void sendToAllButOrigin(String message) {
broadcast(message, conn -> !conn.getClientId().equals(clientId()));
// Note: the predicate is evaluated in another thread (SendMessageWorker). So to read out the client-id
// cookie -- which is stored thread-locally -- we call clientId() from *this* thread (instead from predicate)
// and hold the result in the predicate's closure.
String clientId = clientId();
queueMessage(message, conn -> !conn.getClientId().equals(clientId));
}
@Override
public void sendToSome(String message, Predicate<WebSocketConnection> connectionFilter) {
broadcast(message, connectionFilter);
queueMessage(message, connectionFilter);
}
// ---
......@@ -107,6 +111,15 @@ public class WebSocketServiceImpl implements WebSocketService {
// ------------------------------------------------------------------------------------------------- Private Methods
private void queueMessage(String message, WebSocketConnectionImpl connection) {
worker.queueMessage(message, connection);
}
private void queueMessage(String message, Predicate<WebSocketConnection> connectionFilter) {
worker.queueMessage(message, connectionFilter);
}
/**
* @return the WebSocket connection that is associated with the current request (based on "dmx_client_id" cookie),
* or null if no such cookie exists or if called outside request scope (e.g. while system startup).
......@@ -116,14 +129,6 @@ public class WebSocketServiceImpl implements WebSocketService {
return clientId != null ? pool.getConnection(clientId) : null;
}
private void broadcast(String message, Predicate<WebSocketConnection> connectionFilter) {
pool.getAllConnections().stream().filter(connectionFilter).forEach(conn -> queueMessage(conn, message));
}
private void queueMessage(WebSocketConnectionImpl connection, String message) {
worker.queueMessage(connection, message);
}
private String clientId() {
Cookies cookies = Cookies.get();
return cookies.has("dmx_client_id") ? cookies.get("dmx_client_id") : null;
......@@ -133,7 +138,7 @@ public class WebSocketServiceImpl implements WebSocketService {
private class SendMessageWorker extends Thread {
private BlockingQueue<QueuedMessage> messages = new LinkedBlockingQueue();
private BlockingQueue<MessageTask> messageQueue = new LinkedBlockingQueue();
private SendMessageWorker() {
setPriority(Thread.MIN_PRIORITY);
......@@ -143,36 +148,68 @@ public class WebSocketServiceImpl implements WebSocketService {
public void run() {
try {
while (true) {
QueuedMessage message = messages.take();
messageQueue.take().send();
yield();
// logger.info("----- sending message " + Thread.currentThread().getName());
message.connection.sendMessage(message.message);
}
} catch (InterruptedException e) {
logger.info("### Terminating SendMessageWorker thread");
logger.info("### Terminating SendMessageWorker");
} catch (Exception e) {
logger.log(Level.WARNING, "An exception occurred in the SendMessageWorker thread -- terminating:", e);
logger.log(Level.WARNING, "An exception occurred in the SendMessageWorker -- terminating:", e);
}
}
private void queueMessage(WebSocketConnectionImpl connection, String message) {
private void queueMessage(String message, WebSocketConnectionImpl connection) {
try {
// logger.info("----- queueing message " + Thread.currentThread().getName());
messages.put(new QueuedMessage(connection, message));
messageQueue.put(new MessageTask(message, connection));
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Queueing a message failed:", e);
}
}
private void queueMessage(String message, Predicate<WebSocketConnection> connectionFilter) {
try {
messageQueue.put(new MessageTask(message, connectionFilter));
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Queueing a message failed:", e);
}
}
}
private static class QueuedMessage {
private class MessageTask {
private WebSocketConnectionImpl connection;
private String message;
private QueuedMessage(WebSocketConnectionImpl connection, String message) {
private WebSocketConnectionImpl connection;
private Predicate<WebSocketConnection> connectionFilter;
/**
* A send-to-one task.
*/
private MessageTask(String message, WebSocketConnectionImpl connection) {
this.message = message;
this.connection = connection;
}
/**
* A send-to-many task.
*/
private MessageTask(String message, Predicate<WebSocketConnection> connectionFilter) {
this.message = message;
this.connectionFilter = connectionFilter;
}
// ---
private void send() {
if (connection != null) {
_send(connection);
} else {
pool.getAllConnections().stream().filter(connectionFilter).forEach(conn -> _send(conn));
}
}
private void _send(WebSocketConnectionImpl conn) {
conn.sendMessage(message);
}
}
}
......@@ -49,7 +49,6 @@ class Messenger {
void addTopicToTopicmap(long topicmapId, ViewTopic topic) {
try {
// FIXME: per connection check read access
sendToAuthorized(new JSONObject()
.put("type", "addTopicToTopicmap")
.put("args", new JSONObject()
......@@ -64,7 +63,6 @@ class Messenger {
void addAssocToTopicmap(long topicmapId, ViewAssoc assoc) {
try {
// FIXME: per connection check read access
sendToAuthorized(new JSONObject()
.put("type", "addAssocToTopicmap")
.put("args", new JSONObject()
......
......@@ -34,8 +34,8 @@ export default extraElementUI => {
//
let p // a promise resolved once the assets of all installed plugins are registered
if (DEV) {
console.info('[DMX] You are running the webclient in development mode.\nFrontend code is hot reloaded from ' +
'file system (instead fetched from DMX backend server).\nTo get Hot Module Replacement add your plugin to ' +
console.info('[DMX] You are running the DMX webclient in development mode.\nFrontend code is hot reloaded from ' +
'file system (instead retrieved through DMX backend server).\nTo get Hot Module Replacement add your plugin to ' +
'modules/dmx-webclient/src/main/js/plugin_manager.js')
p = Promise.resolve()
} else {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment