Blue Brain BioExplorer
RocketsPlugin.cpp
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-2018, EPFL/Blue Brain Project
3  *
4  * The Blue Brain BioExplorer is a tool for scientists to extract and analyse
5  * scientific data from visualization
6  *
7  * This file is part of Blue Brain BioExplorer <https://github.com/BlueBrain/BioExplorer>
8  *
9  * This library is free software; you can redistribute it and/or modify it under
10  * the terms of the GNU Lesser General Public License version 3.0 as published
11  * by the Free Software Foundation.
12  *
13  * This library is distributed in the hope that it will be useful, but WITHOUT
14  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
15  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
16  * details.
17  *
18  * You should have received a copy of the GNU Lesser General Public License
19  * along with this library; if not, write to the Free Software Foundation, Inc.,
20  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
21  */
22 
23 // needs to be before RocketsPlugin.h to make template instantiation for
24 // _handleRPC work
25 #include "jsonPropertyMap.h"
26 
27 #include "RocketsPlugin.h"
28 
29 #include <Defines.h>
30 #include <Version.h>
31 
36 
40 
41 #ifdef USE_NETWORKING
42 #include <uvw.hpp>
43 #endif
44 
45 #include <rockets/jsonrpc/helpers.h>
46 #include <rockets/jsonrpc/server.h>
47 #include <rockets/server.h>
48 
49 #include "BinaryRequests.h"
50 #include "ImageGenerator.h"
51 #include "Throttle.h"
52 
53 #include <atomic>
54 #include <dirent.h>
55 #include <fstream>
56 #include <limits.h>
57 #include <unistd.h>
58 
59 #include <sys/stat.h>
60 
61 #ifdef BRAYNS_USE_FFMPEG
62 #include "encoder.h"
63 #endif
64 
65 namespace
66 {
67 constexpr int64_t INTERACTIVE_THROTTLE = 1;
68 constexpr int64_t DEFAULT_THROTTLE = 50;
69 constexpr int64_t SLOW_THROTTLE = 750;
70 
71 const int MODEL_NOT_FOUND = -12345;
72 const int INSTANCE_NOT_FOUND = -12346;
73 const int TASK_RESULT_TO_JSON_ERROR = -12347;
74 const int SCHEMA_RPC_ENDPOINT_NOT_FOUND = -12348;
75 const int PARAMETER_FROM_JSON_ERROR = -12349;
76 const int VIDEOSTREAMING_NOT_SUPPORTED = -12350;
77 const int VIDEOSTREAMING_NOT_ENABLED = -12351;
78 
79 // REST PUT & GET, JSONRPC set-* notification, JSONRPC get-* request
80 const std::string ENDPOINT_ANIMATION_PARAMS = "animation-parameters";
81 const std::string ENDPOINT_APP_PARAMS = "application-parameters";
82 const std::string ENDPOINT_CAMERA = "camera";
83 const std::string ENDPOINT_CAMERA_PARAMS = "camera-params";
84 const std::string ENDPOINT_RENDERER = "renderer";
85 const std::string ENDPOINT_RENDERER_PARAMS = "renderer-params";
86 const std::string ENDPOINT_SCENE = "scene";
87 const std::string ENDPOINT_VOLUME_PARAMS = "volume-parameters";
88 const std::string ENDPOINT_RENDER_PARAMS = "rendering-parameters";
89 const std::string ENDPOINT_GEOMETRY_PARAMS = "geometry-parameters";
90 const std::string ENDPOINT_FIELD_PARAMS = "field-parameters";
91 
92 // REST GET, JSONRPC get-* request
93 const std::string ENDPOINT_STATISTICS = "statistics";
94 const std::string ENDPOINT_VERSION = "version";
95 
96 // JSONRPC async requests
97 const std::string METHOD_ADD_MODEL = "add-model";
98 const std::string METHOD_SNAPSHOT = "snapshot";
99 // METHOD_REQUEST_MODEL_UPLOAD from BinaryRequests.h
100 
101 // JSONRPC synchronous requests
102 const std::string METHOD_ADD_CLIP_PLANE = "add-clip-plane";
103 const std::string METHOD_GET_CLIP_PLANES = "get-clip-planes";
104 const std::string METHOD_GET_ENVIRONMENT_MAP = "get-environment-map";
105 const std::string METHOD_GET_INSTANCES = "get-instances";
106 const std::string METHOD_GET_LOADERS = "get-loaders";
107 const std::string METHOD_GET_MODEL_PROPERTIES = "get-model-properties";
108 const std::string METHOD_GET_MODEL_TRANSFER_FUNCTION = "get-model-transfer-function";
109 const std::string METHOD_GET_VIDEOSTREAM = "get-videostream";
110 const std::string METHOD_IMAGE_JPEG = "image-jpeg";
111 const std::string METHOD_SET_STREAMING_METHOD = "image-streaming-mode";
112 const std::string METHOD_TRIGGER_JPEG_STREAM = "trigger-jpeg-stream";
113 const std::string METHOD_INSPECT = "inspect";
114 const std::string METHOD_MODEL_PROPERTIES_SCHEMA = "model-properties-schema";
115 const std::string METHOD_REMOVE_CLIP_PLANES = "remove-clip-planes";
116 const std::string METHOD_REMOVE_MODEL = "remove-model";
117 const std::string METHOD_SCHEMA = "schema";
118 const std::string METHOD_SET_ENVIRONMENT_MAP = "set-environment-map";
119 const std::string METHOD_SET_MODEL_PROPERTIES = "set-model-properties";
120 const std::string METHOD_SET_MODEL_TRANSFER_FUNCTION = "set-model-transfer-function";
121 const std::string METHOD_SET_VIDEOSTREAM = "set-videostream";
122 const std::string METHOD_UPDATE_CLIP_PLANE = "update-clip-plane";
123 const std::string METHOD_UPDATE_INSTANCE = "update-instance";
124 const std::string METHOD_UPDATE_MODEL = "update-model";
125 const std::string METHOD_GET_LIGHTS = "get-lights";
126 const std::string METHOD_ADD_LIGHT_SPHERE = "add-light-sphere";
127 const std::string METHOD_ADD_LIGHT_DIRECTIONAL = "add-light-directional";
128 const std::string METHOD_ADD_LIGHT_QUAD = "add-light-quad";
129 const std::string METHOD_ADD_LIGHT_SPOT = "add-light-spot";
130 const std::string METHOD_ADD_LIGHT_AMBIENT = "add-light-ambient";
131 const std::string METHOD_REMOVE_LIGHTS = "remove-lights";
132 const std::string METHOD_CLEAR_LIGHTS = "clear-lights";
133 
134 const std::string METHOD_FS_EXISTS = "fs-exists";
135 const std::string METHOD_FS_GET_CONTENT = "fs-get-content";
136 const std::string METHOD_FS_GET_ROOT = "fs-get-root";
137 const std::string METHOD_FS_LIST_DIR = "fs-list-dir";
138 
139 // JSONRPC notifications
140 const std::string METHOD_CHUNK = "chunk";
141 const std::string METHOD_QUIT = "quit";
142 const std::string METHOD_EXIT_LATER = "exit-later";
143 const std::string METHOD_RESET_CAMERA = "reset-camera";
144 const std::string METHOD_SET_CAMERA = "set-camera";
145 
146 const std::string LOADERS_SCHEMA = "loaders-schema";
147 
148 const std::string JSON_TYPE = "application/json";
149 
150 using Response = rockets::jsonrpc::Response;
151 
152 std::string hyphenatedToCamelCase(const std::string& hyphenated)
153 {
154  std::string camel = hyphenated;
155 
156  for (size_t x = 0; x < camel.length(); x++)
157  {
158  if (camel[x] == '-')
159  {
160  std::string tempString = camel.substr(x + 1, 1);
161 
162  transform(tempString.begin(), tempString.end(), tempString.begin(), toupper);
163 
164  camel.erase(x, 2);
165  camel.insert(x, tempString);
166  }
167  }
168  camel[0] = toupper(camel[0]);
169  return camel;
170 }
171 
172 std::string getNotificationEndpointName(const std::string& endpoint)
173 {
174  return "set-" + endpoint;
175 }
176 
177 std::string getRequestEndpointName(const std::string& endpoint)
178 {
179  return "get-" + endpoint;
180 }
181 
182 const Response::Error VIDEOSTREAM_NOT_ENABLED_ERROR{"Core was not started with videostream support enabled",
183  VIDEOSTREAMING_NOT_ENABLED};
184 const Response::Error VIDEOSTREAM_NOT_SUPPORTED_ERROR{"Core was not build with videostream support",
185  VIDEOSTREAMING_NOT_SUPPORTED};
186 } // namespace
187 
188 namespace core
189 {
190 template <class T, class PRE>
191 bool preUpdate(const std::string& json, PRE preUpdateFunc,
192  typename std::enable_if<!std::is_abstract<T>::value>::type* = 0)
193 {
194  if (std::function<bool(const T&)>(preUpdateFunc))
195  {
196  T temp;
197  if (!staticjson::from_json_string(json.c_str(), &temp, nullptr))
198  return false;
199  if (!preUpdateFunc(temp))
200  return false;
201  }
202  return true;
203 }
204 
205 template <class T, class PRE>
206 bool preUpdate(const std::string&, PRE, typename std::enable_if<std::is_abstract<T>::value>::type* = 0)
207 {
208  return true;
209 }
210 
211 template <class T, class PRE, class POST>
212 inline bool from_json(
213  T& obj, const std::string& json, PRE preUpdateFunc = [] {}, POST postUpdateFunc = [] {})
214 {
215  staticjson::ParseStatus status;
216 
217  if (!preUpdate<T>(json, preUpdateFunc))
218  return false;
219 
220  const auto success = staticjson::from_json_string(json.c_str(), &obj, &status);
221  if (success)
222  {
223  obj.markModified();
224  if (std::function<void(T&)>(postUpdateFunc))
225  postUpdateFunc(obj);
226  }
227  else
228  CORE_ERROR(status.description());
229  return success;
230 }
231 
233 {
234 public:
236  : _engine(api->getEngine())
237  , _parametersManager(api->getParametersManager())
238  {
240 #ifdef USE_NETWORKING
241  if (uvw::Loop::getDefault()->alive())
242  {
243  _processDelayedNotifies = uvw::Loop::getDefault()->resource<uvw::AsyncHandle>();
244  _processDelayedNotifies->on<uvw::AsyncEvent>([&](const auto&, auto&) { this->processDelayedNotifies(); });
245  }
246 #endif
247  }
248 
250  {
251  // cancel all pending tasks
252  decltype(_tasks) tasksToCancel;
253  {
254  std::lock_guard<std::mutex> lock(_tasksMutex);
255  tasksToCancel = _tasks;
256  }
257 
258  for (const auto& entry : tasksToCancel)
259  {
260  entry.first->cancel();
261  entry.second->wait();
262  }
263 
264 #ifdef USE_NETWORKING
265  if (_processDelayedNotifies)
266  _processDelayedNotifies->close();
267 #endif
268 
269  if (_rocketsServer)
270  _rocketsServer->setSocketListener(nullptr);
271 
272  for (auto i : _objects)
273  i->clearModifiedCallback();
274  }
275 
276  void preRender()
277  {
281  return;
282 
283  try
284  {
286  _rocketsServer->process(0);
287  }
288  catch (const std::exception& exc)
289  {
290  CORE_ERROR("Error while handling HTTP/websocket messages: " << exc.what());
291  }
292  }
293 
294  void postRender()
295  {
296  if (!_rocketsServer || _rocketsServer->getConnectionCount() == 0)
297  return;
298 
300  {
303  else
305  }
306 #ifdef BRAYNS_USE_FFMPEG
307  else
308  _broadcastVideo();
309 #endif
310  }
311 
313  const std::function<void(PropertyMap)>& action)
314  {
315  _jsonrpcServer->connect(desc.methodName,
316  [action, this](const auto& request)
317  {
318  ScopedCurrentClient scope(this->_currentClientID, request.clientID);
319  action(jsonToPropertyMap(request.message));
320  });
321 
323  }
324 
325  void registerNotification(const RpcDescription& desc, const std::function<void()>& action)
326  {
327  _jsonrpcServer->connect(desc.methodName,
328  [action, this](const auto& request)
329  {
330  ScopedCurrentClient scope(this->_currentClientID, request.clientID);
331  action();
332  });
333 
335  }
336 
337  void registerRequest(const RpcParameterDescription& desc, const PropertyMap& input, const PropertyMap& output,
338  const std::function<PropertyMap(PropertyMap)>& action)
339  {
341  desc.methodName,
342  [name = desc.methodName, action](const auto& request)
343  {
344  try
345  {
346  return Response{to_json(action(jsonToPropertyMap(request.message)))};
347  }
348  catch (...)
349  {
350  return Response{Response::Error{"from_json for " + name + " failed", PARAMETER_FROM_JSON_ERROR}};
351  }
352  });
353 
355  }
356 
357  void registerRequest(const RpcDescription& desc, const PropertyMap& output,
358  const std::function<PropertyMap()>& action)
359  {
360  _jsonrpcServer->bind(desc.methodName,
361  [action, this](const auto& request)
362  {
363  ScopedCurrentClient scope(this->_currentClientID, request.clientID);
364  return Response{to_json(action())};
365  });
366 
367  _handleSchema(desc.methodName, buildJsonRpcSchemaRequestPropertyMap(desc, output));
368  }
369 
370  void _registerRequest(const std::string& name, const RetParamFunc& action)
371  {
372  _jsonrpcServer->bind(name,
373  [action, this](const auto& request)
374  {
375  ScopedCurrentClient scope(this->_currentClientID, request.clientID);
376  return Response{action(request.message)};
377  });
378  }
379 
380  void _registerRequest(const std::string& name, const RetFunc& action)
381  {
382  _jsonrpcServer->bind(name,
383  [action, this](const auto& request)
384  {
385  ScopedCurrentClient scope(this->_currentClientID, request.clientID);
386  return Response{action()};
387  });
388  }
389 
390  void _registerNotification(const std::string& name, const ParamFunc& action)
391  {
392  _jsonrpcServer->connect(name,
393  [action, this](const auto& request)
394  {
395  ScopedCurrentClient scope(this->_currentClientID, request.clientID);
396  action(request.message);
397  });
398  }
399 
400  void _registerNotification(const std::string& name, const VoidFunc& action)
401  {
402  _jsonrpcServer->connect(name,
403  [action, this](const auto& request)
404  {
405  ScopedCurrentClient scope(this->_currentClientID, request.clientID);
406  action();
407  });
408  }
409 
411  {
412  // call pending notifies from delayed throttle threads here as notify() and process() are not threadsafe within
413  // Rockets.
414  std::vector<std::function<void()>> delayedNotifies;
415  {
416  std::lock_guard<std::mutex> lock(_delayedNotifiesMutex);
417  delayedNotifies = std::move(_delayedNotifies);
418  }
419 
420  for (const auto& func : delayedNotifies)
421  func();
422  }
423 
425  {
426  try
427  {
428  const auto& appParams = _parametersManager.getApplicationParameters();
429 #ifdef USE_NETWORKING
430  if (uvw::Loop::getDefault()->alive())
431  {
432  _rocketsServer =
433  std::make_unique<rockets::Server>(uv_default_loop(), appParams.getHttpServerURI(), "rockets");
434  _manualProcessing = false;
435  }
436  else
437 #endif
438  _rocketsServer = std::make_unique<rockets::Server>(appParams.getHttpServerURI(), "rockets", 0);
439 
440  CORE_INFO("Rockets server running on " << _rocketsServer->getURI());
441 
442  _jsonrpcServer = std::make_unique<JsonRpcServer>(*_rocketsServer);
443 
444  _parametersManager.getApplicationParameters().setHttpServerURI(_rocketsServer->getURI());
445  }
446  catch (const std::runtime_error& e)
447  {
448  CORE_ERROR("Rockets server could not be initialized: '" << e.what() << "'");
449  return;
450  }
451 
452  _setupWebsocket();
453  _timer.start();
454  }
455 
457  {
458  _rocketsServer->handleClose(
459  [this](const uintptr_t clientID)
460  {
461  _binaryRequests.removeRequest(clientID);
462  CORE_DEBUG("Closing WebSocket with client " << clientID);
463  return std::vector<rockets::ws::Response>{};
464  });
465 
466  _rocketsServer->handleBinary(
467  std::bind(&BinaryRequests::processMessage, std::ref(_binaryRequests), std::placeholders::_1));
468  }
469 
470  void _delayedNotify(const std::function<void()>& notify)
471  {
472  {
473  std::lock_guard<std::mutex> lock(_delayedNotifiesMutex);
474  _delayedNotifies.push_back(notify);
475  }
476 
477 #ifdef USE_NETWORKING
478  if (_processDelayedNotifies)
479  {
480  // dispatch delayed notify from throttle thread to main thread (where the default loop runs) as notify()
481  // and process() are not threadsafe within Rockets.
482  _processDelayedNotifies->send();
483  }
484 #endif
485  }
486 
487  void _rebroadcast(const std::string& endpoint, const std::string& message, const std::set<uintptr_t>& filter)
488  {
489  _delayedNotify(
490  [&, message, filter]
491  {
492  if (_rocketsServer->getConnectionCount() > 1)
493  {
494  try
495  {
496  const auto& msg = rockets::jsonrpc::makeNotification(endpoint, message);
497  _rocketsServer->broadcastText(msg, filter);
498  }
499  catch (const std::exception& e)
500  {
501  CORE_ERROR("Error rebroadcasting notification: " << e.what());
502  }
503  }
504  });
505  }
506 
507  // Utilty to change current client while we are handling a message to skip notification to the current client and
508  // to trigger a delayed notify to avoid a deadlock which happens when sending a message from within a message
509  // handler.
511  {
512  ScopedCurrentClient(uintptr_t& currentClientID, const uintptr_t newID)
513  : _currentClientID(currentClientID)
514  {
515  _currentClientID = newID;
516  CORE_DEBUG("New WebSocket connection established with client " << _currentClientID);
517  }
518 
519  ~ScopedCurrentClient() { _currentClientID = NO_CURRENT_CLIENT; }
520 
521  private:
522  uintptr_t& _currentClientID;
523  };
524 
525  void _bindEndpoint(const std::string& method, rockets::jsonrpc::ResponseCallback action)
526  {
527  _jsonrpcServer->bind(method,
528  [&, action](rockets::jsonrpc::Request request)
529  {
530  ScopedCurrentClient scope(_currentClientID, request.clientID);
531  return action(request);
532  });
533  }
534 
535  template <typename Params, typename RetVal>
536  void _bindEndpoint(const std::string& method, std::function<RetVal(Params)> action)
537  {
538  _jsonrpcServer->bind(method,
539  [&, action](rockets::jsonrpc::Request request)
540  {
541  ScopedCurrentClient scope(_currentClientID, request.clientID);
542 
543  Params params;
544  if (!::from_json(params, request.message))
545  return Response::invalidParams();
546  try
547  {
548  const auto& ret = action(std::move(params));
549  return Response{to_json(ret)};
550  }
551  catch (const rockets::jsonrpc::response_error& e)
552  {
553  return Response{Response::Error{e.what(), e.code}};
554  }
555  });
556  }
557 
558  template <typename RetVal>
559  void _bindEndpoint(const std::string& method, std::function<RetVal()> action)
560  {
561  _jsonrpcServer->bind(method,
562  [&, action](rockets::jsonrpc::Request request)
563  {
564  ScopedCurrentClient scope(_currentClientID, request.clientID);
565 
566  try
567  {
568  const auto& ret = action();
569  return Response{to_json(ret)};
570  }
571  catch (const rockets::jsonrpc::response_error& e)
572  {
573  return Response{Response::Error{e.what(), e.code}};
574  }
575  });
576  }
577 
578  template <typename Params>
579  void _bindEndpoint(const std::string& method, std::function<void(Params)> action)
580  {
581  _jsonrpcServer->bind(method,
582  [&, action](rockets::jsonrpc::Request request)
583  {
584  ScopedCurrentClient scope(_currentClientID, request.clientID);
585 
586  Params params;
587  if (!::from_json(params, request.message))
588  return Response::invalidParams();
589  action(std::move(params));
590  return Response{"\"OK\""};
591  });
592  }
593 
594  void _bindEndpoint(const std::string& method, rockets::jsonrpc::VoidCallback action)
595  {
596  _jsonrpcServer->connect(method,
597  [&, action](rockets::jsonrpc::Request request)
598  {
599  ScopedCurrentClient scope(_currentClientID, request.clientID);
600  action();
601  return Response{"\"OK\""};
602  });
603  }
604 
605  void _bindModelEndpoint(const std::string& method, const std::string& key,
606  std::function<bool(std::string, ModelDescriptorPtr)> action)
607  {
608  _jsonrpcServer->bind(method,
609  [&, key, action](rockets::jsonrpc::Request request)
610  {
611  ScopedCurrentClient scope(_currentClientID, request.clientID);
612 
613  using namespace rapidjson;
614  Document document;
615  document.Parse(request.message.c_str());
616 
617  if (!document.HasMember("id") || !document.HasMember(key.c_str()))
618  return Response::invalidParams();
619 
620  const auto modelID = document["id"].GetInt();
621  auto model = _engine.getScene().getModel(modelID);
622  if (!model)
623  {
624  return Response{Response::Error{"Model not found", MODEL_NOT_FOUND}};
625  }
626 
627  // get the actual property object from the model message (e.g. transfer function) and
628  // pass it as a JSON string to the provided action to consume it.
629  Document propertyDoc;
630  propertyDoc.SetObject() = document[key.c_str()].GetObject();
631  rapidjson::StringBuffer buffer;
632  rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
633  propertyDoc.Accept(writer);
634 
635  if (!action(buffer.GetString(), model))
636  return Response::invalidParams();
637 
638  _engine.triggerRender();
639  _rebroadcast(method, request.message, {request.clientID});
640  return Response{to_json(true)};
641  });
642  }
643 
644  template <class T>
645  void _handleGET(const std::string& endpoint, T& obj, const int64_t throttleTime = DEFAULT_THROTTLE)
646  {
647  using namespace rockets::http;
648 
649  _rocketsServer->handle(Method::GET, endpoint,
650  [&obj](const Request&)
651  { return make_ready_response(Code::OK, to_json(obj), JSON_TYPE); });
652 
653  _handleObjectSchema(endpoint, obj);
654 
655  const std::string rpcEndpoint = getRequestEndpointName(endpoint);
656 
657  _jsonrpcServer->bind(rpcEndpoint, std::function<const T&()>([&obj]() -> const T& { return obj; }));
658  _handleSchema(rpcEndpoint,
659  buildJsonRpcSchemaRequestReturnOnly({rpcEndpoint, "Get the current state of " + endpoint}, obj));
660 
661  // Create new throttle for that endpoint
662  _throttle[endpoint];
663 
664  obj.onModified(
665  [&, endpoint = getNotificationEndpointName(endpoint), throttleTime](const auto& base)
666  {
667  auto& throttle = _throttle[endpoint];
668 
669  // throttle itself is not thread-safe, but we can get called from different threads (c.f. async model
670  // load)
671  std::lock_guard<std::mutex> lock(throttle.first);
672 
673  const auto& castedObj = static_cast<const T&>(base);
674  const auto notify =
675  [&rocketsServer = _rocketsServer, clientID = _currentClientID, endpoint, json = to_json(castedObj)]
676  {
677  if (rocketsServer->getConnectionCount() == 0)
678  return;
679  try
680  {
681  const auto& msg = rockets::jsonrpc::makeNotification(endpoint, json);
682  CORE_DEBUG("Broadcasting to " << clientID << " [" << endpoint << "]");
683 #if 0 // Not quite sure why the UI does not receive messages when the clientID is set :-/
684  if (clientID == NO_CURRENT_CLIENT)
685  rocketsServer->broadcastText(msg);
686  else
687  rocketsServer->broadcastText(msg, {clientID});
688 
689 #else
690  if (endpoint != METHOD_SET_CAMERA) // Dirty hack?!? Probably... This avoid message looping
691  // between UI and Backend when the camera is moved (UI
692  // informs backend, that sends back update to UI, etc)
693  rocketsServer->broadcastText(msg);
694  rocketsServer->broadcastText(msg, {clientID});
695 #endif
696  }
697  catch (const std::exception& e)
698  {
699  CORE_ERROR("Error broadcasting notification: " << e.what());
700  }
701  };
702  const auto delayedNotify = [&, notify] { this->_delayedNotify(notify); };
703 
704  // non-throttled, direct notify can happen directly if we are not in the middle handling an incoming
705  // message; delayed notify must be dispatched to the main thread
706  if (_currentClientID == NO_CURRENT_CLIENT)
707  throttle.second(notify, delayedNotify, throttleTime);
708  else
709  throttle.second(delayedNotify, delayedNotify, throttleTime);
710  });
711 
712  _objects.push_back(&obj);
713  }
714 
715  template <class T>
716  void _handlePUT(const std::string& endpoint, T& obj)
717  {
718  _handlePUT(endpoint, obj, std::function<bool(const T&)>(), std::function<void(T&)>());
719  }
720 
721  template <class T, class PRE, class POST>
722  void _handlePUT(const std::string& endpoint, T& obj, PRE preUpdateFunc, POST postUpdateFunc)
723  {
724  using namespace rockets::http;
725  _rocketsServer->handle(Method::PUT, endpoint,
726  [&obj, preUpdateFunc, postUpdateFunc](const Request& req)
727  {
728  return make_ready_response(from_json(obj, req.body, preUpdateFunc, postUpdateFunc)
729  ? Code::OK
730  : Code::BAD_REQUEST);
731  });
732 
733  _handleObjectSchema(endpoint, obj);
734 
735  const std::string rpcEndpoint = getNotificationEndpointName(endpoint);
736 
737  _bindEndpoint(rpcEndpoint,
738  [&, rpcEndpoint, preUpdateFunc, postUpdateFunc](rockets::jsonrpc::Request request)
739  {
740  if (from_json(obj, request.message, preUpdateFunc, postUpdateFunc))
741  {
742  _engine.triggerRender();
743  return rockets::jsonrpc::Response{to_json(true)};
744  }
745  return rockets::jsonrpc::Response::invalidParams();
746  });
747  const RpcParameterDescription desc{rpcEndpoint, "Set the new state of " + endpoint, "param", endpoint};
748  _handleSchema(rpcEndpoint, buildJsonRpcSchemaRequest<T, bool>(desc, obj));
749  }
750 
751  template <class T>
752  void _handle(const std::string& endpoint, T& obj, const int64_t throttleTime = DEFAULT_THROTTLE)
753  {
754  _handleGET(endpoint, obj, throttleTime);
755  _handlePUT(endpoint, obj);
756  }
757 
758  template <class P, class R>
759  void _handleRPC(const RpcParameterDescription& desc, std::function<R(P)> action)
760  {
761  _bindEndpoint<P, R>(desc.methodName, action);
762  _handleSchema(desc.methodName, buildJsonRpcSchemaRequest<P, R>(desc));
763  }
764 
765  template <class R>
766  void _handleRPC(const RpcDescription& desc, std::function<R()> action)
767  {
768  _bindEndpoint<R>(desc.methodName, action);
769  _handleSchema(desc.methodName, buildJsonRpcSchemaRequestReturnOnly<R>(desc));
770  }
771 
772  template <class P>
773  void _handleRPC(const RpcParameterDescription& desc, std::function<void(P)> action)
774  {
775  _bindEndpoint<P>(desc.methodName, action);
776  _handleSchema(desc.methodName, buildJsonRpcSchemaNotify<P>(desc));
777  }
778 
779  void _handleRPC(const RpcDescription& desc, std::function<void()> action)
780  {
781  _bindEndpoint(desc.methodName, action);
782  _handleSchema(desc.methodName, buildJsonRpcSchemaNotify(desc));
783  }
784 
785  template <class P, class R>
787  std::function<rockets::jsonrpc::CancelRequestCallback(
788  P, uintptr_t, rockets::jsonrpc::AsyncResponse, rockets::jsonrpc::ProgressUpdateCallback)>
789  action)
790  {
791  _jsonrpcServer->bindAsync<P>(desc.methodName, action);
792  _handleSchema(desc.methodName, buildJsonRpcSchemaRequest<P, R>(desc));
793  }
794 
795  template <class P, class R>
797  std::function<std::shared_ptr<Task<R>>(P, uintptr_t)> createTask)
798  {
799  // define the action that is executed on every incoming request from the
800  // client:
801  // - create the task that shall be executed
802  // - wire the result of task to the response callback from rockets
803  // - setup progress reporting during the task execution using libuv
804  // - wire the cancel request from rockets to the task
805 
806  auto action = [&, createTask](P params, auto clientID, auto respond, auto progressCb)
807  {
808  // transform task error to rockets error response
809  auto errorCallback = [&, respond](const TaskRuntimeError& error)
810  {
811  const Response response(Response::Error{error.what(), error.code, error.data});
812  this->_delayedNotify([respond, response] { respond(response); });
813  };
814 
815  try
816  {
817  // transform task result to rockets response
818  auto readyCallback = [&, respond](const R& result)
819  {
820  try
821  {
822  this->_delayedNotify([respond, result] { respond({to_json(result)}); });
823  }
824  catch (const std::runtime_error& e)
825  {
826  const Response response(Response::Error{e.what(), TASK_RESULT_TO_JSON_ERROR});
827  this->_delayedNotify([respond, response] { respond(response); });
828  }
829  };
830 
831  // create the task that shall be executed for this request
832  auto task = createTask(std::move(params), clientID);
833 
834  std::function<void()> finishProgress = [task] { task->progress.update("Done", 1.f); };
835 
836 #ifdef USE_NETWORKING
837  // setup periodic progress reporting if we have libuv running
838  if (uvw::Loop::getDefault()->alive())
839  {
840  auto progressUpdate = uvw::Loop::getDefault()->resource<uvw::TimerHandle>();
841 
842  auto sendProgress = [progressCb, &progress = task->progress] { progress.consume(progressCb); };
843  progressUpdate->on<uvw::TimerEvent>([sendProgress](const auto&, auto&) { sendProgress(); });
844 
845  finishProgress = [task, progressUpdate, sendProgress]
846  {
847  task->progress.update("Done", 1.f);
848  sendProgress();
849  progressUpdate->stop();
850  progressUpdate->close();
851  };
852 
853  using ms = std::chrono::milliseconds;
854  progressUpdate->start(ms(0), ms(SLOW_THROTTLE));
855  }
856 #endif
857 
858  // setup the continuation task that handles the result or error
859  // of the task to handle the responses to rockets accordingly.
860  auto responseTask = std::make_shared<async::task<void>>(task->get().then(
861  [&, readyCallback, errorCallback, task, finishProgress](typename Task<R>::Type result)
862  {
863  finishProgress();
864 
865  try
866  {
867  readyCallback(result.get());
868  }
869  catch (const TaskRuntimeError& e)
870  {
871  errorCallback(e);
872  }
873  catch (const std::exception& e)
874  {
875  errorCallback({e.what()});
876  }
877  catch (const async::task_canceled&)
878  {
879  task->finishCancel();
880  }
881 
882 #ifdef USE_NETWORKING
883  if (_processDelayedNotifies)
884  _processDelayedNotifies->send();
885 #endif
886 
887  std::lock_guard<std::mutex> lock(_tasksMutex);
888  _tasks.erase(task);
889  _binaryRequests.removeTask(task);
890  }));
891 
892  std::lock_guard<std::mutex> lock(_tasksMutex);
893  _tasks.emplace(task, responseTask);
894 
895  // forward the cancel request from rockets to the task
896  auto cancel = [task, responseTask](auto done) { task->cancel(done); };
897 
898  task->schedule();
899 
900  return rockets::jsonrpc::CancelRequestCallback(cancel);
901  }
902  // respond errors during the setup of the task
903  catch (const TaskRuntimeError& e)
904  {
905  errorCallback(e);
906  }
907  catch (const std::exception& e)
908  {
909  errorCallback({e.what()});
910  }
911  return rockets::jsonrpc::CancelRequestCallback();
912  };
913  _handleAsyncRPC<P, R>(desc, action);
914  }
915 
916  template <class T>
917  void _handleObjectSchema(const std::string& endpoint)
918  {
919  _handleSchema(endpoint, buildJsonSchema<T>(hyphenatedToCamelCase(endpoint)));
920  }
921 
922  template <class T>
923  void _handleObjectSchema(const std::string& endpoint, T& obj)
924  {
925  _handleSchema(endpoint, buildJsonSchema(obj, hyphenatedToCamelCase(endpoint)));
926  }
927 
928  void _handleSchema(const std::string& endpoint, const std::string& schema)
929  {
930  using namespace rockets::http;
931  _rocketsServer->handle(Method::GET, endpoint + "/schema",
932  [schema](const Request&) { return make_ready_response(Code::OK, schema, JSON_TYPE); });
933 
934  _schemas[endpoint] = schema;
935  }
936 
938  {
939  _handleAnimationParams();
940  _handleCamera();
941  _handleImageJPEG();
942  _handleTriggerImageStream();
943  _handleSetImageStreamingMode();
944  _handleRenderer();
945  _handleVersion();
946 
947  _handle(ENDPOINT_APP_PARAMS, _parametersManager.getApplicationParameters());
948  _handle(ENDPOINT_VOLUME_PARAMS, _parametersManager.getVolumeParameters());
949  _handle(ENDPOINT_FIELD_PARAMS, _parametersManager.getFieldParameters());
950  _handle(ENDPOINT_RENDER_PARAMS, _parametersManager.getRenderingParameters());
951  _handle(ENDPOINT_GEOMETRY_PARAMS, _parametersManager.getGeometryParameters());
952 
953  // following endpoints need a valid engine
954  const bool disableBroadcasting = std::getenv("ROCKETS_DISABLE_SCENE_BROADCASTING") != nullptr;
955  if (disableBroadcasting)
956  {
957  CORE_WARN("Scene broadcasting has been disabled");
958  }
959  else
960  _handle(ENDPOINT_SCENE, _engine.getScene());
961 
962  _handleGET(ENDPOINT_STATISTICS, _engine.getStatistics(), SLOW_THROTTLE);
963 
964  _handleSchemaRPC();
965 
966  _handleInspect();
967  _handleQuit();
968  _handleExitLater();
969  _handleResetCamera();
970  _handleSnapshot();
971 
972  _handleRequestModelUpload();
973  _handleChunk();
974 
975  _handleSetEnvironmentMap();
976  _handleGetEnvironmentMap();
977 
978  _handleSetVideostream();
979  _handleGetVideostream();
980 
981  _handleAddModel();
982  _handleRemoveModel();
983  _handleUpdateModel();
984  _handleSetModelProperties();
985  _handleGetModelProperties();
986  _handleModelPropertiesSchema();
987 
988  _handleSetModelTransferFunction();
989  _handleGetModelTransferFunction();
990 
991  _handleAddClipPlane();
992  _handleGetClipPlanes();
993  _handleUpdateClipPlane();
994  _handleRemoveClipPlanes();
995 
996  _handleGetInstances();
997  _handleUpdateInstance();
998 
999  _handleGetLoaders();
1000  _handleLoadersSchema();
1001  _handlePropertyObject(_engine.getCamera(), ENDPOINT_CAMERA_PARAMS, "camera");
1002  _handlePropertyObject(_engine.getRenderer(), ENDPOINT_RENDERER_PARAMS, "renderer");
1003  _handleGetLights();
1004  _handleAddLight();
1005  _handleRemoveLights();
1006  _handleClearLights();
1007 
1008  _handleFsExists();
1009  _handleFsGetContent();
1010  _handleFsGetRoot();
1011  _handleFsGetListDir();
1012 
1013  _endpointsRegistered = true;
1014  }
1015 
1017  {
1018  _jsonrpcServer->bind(METHOD_IMAGE_JPEG,
1019  std::function<ImageGenerator::ImageBase64()>(
1020  [&]
1021  {
1022  return _imageGenerator.createImage(
1023  _engine.getFrameBuffer(), "jpg",
1024  _parametersManager.getApplicationParameters().getJpegCompression());
1025  }));
1026  _handleSchema(METHOD_IMAGE_JPEG, buildJsonRpcSchemaRequestReturnOnly<ImageGenerator::ImageBase64>(
1027  {METHOD_IMAGE_JPEG, "Get the current state of " + METHOD_IMAGE_JPEG}));
1028  }
1029 
1031  {
1032  auto& frameBuffer = _engine.getFrameBuffer();
1033  if (frameBuffer.getFrameBufferFormat() == FrameBufferFormat::none || !frameBuffer.isModified())
1034  {
1035  return;
1036  }
1037 
1038  const auto& params = _parametersManager.getApplicationParameters();
1039  const auto fps = params.getImageStreamFPS();
1040  if (fps == 0)
1041  return;
1042 
1043  const auto elapsed = _timer.elapsed() + _leftover;
1044  const auto duration = 1.0 / fps;
1045  if (elapsed < duration)
1046  return;
1047 
1048  _leftover = elapsed - duration;
1049  for (; _leftover > duration;)
1050  _leftover -= duration;
1051  _timer.start();
1052 
1053  const auto image = _imageGenerator.createJPEG(frameBuffer, params.getJpegCompression());
1054  if (image.size > 0)
1055  _rocketsServer->broadcastBinary((const char*)image.data.get(), image.size);
1056  }
1057 
1059  {
1060  if (!_controlledStreamingFlag.load())
1061  {
1062  return;
1063  }
1064 
1065  auto& frameBuffer = _engine.getFrameBuffer();
1066  if (frameBuffer.getFrameBufferFormat() == FrameBufferFormat::none || !frameBuffer.isModified())
1067  {
1068  return;
1069  }
1070 
1071  _controlledStreamingFlag = false;
1072  const auto& params = _parametersManager.getApplicationParameters();
1073 
1074  const auto image = _imageGenerator.createJPEG(frameBuffer, params.getJpegCompression());
1075  if (image.size > 0)
1076  _rocketsServer->broadcastBinary((const char*)image.data.get(), image.size);
1077  }
1078 
1079 #ifdef BRAYNS_USE_FFMPEG
1080  void _broadcastVideo()
1081  {
1082  if (!_videoParams.enabled)
1083  {
1084  _encoder.reset();
1085  if (_videoUpdatedResponse)
1086  _videoUpdatedResponse();
1087  _videoUpdatedResponse = nullptr;
1088  return;
1089  }
1090 
1091  const auto& params = _parametersManager.getApplicationParameters();
1092  const auto fps = params.getImageStreamFPS();
1093  if (fps == 0)
1094  return;
1095 
1096  if (_encoder && _encoder->kbps != _videoParams.kbps)
1097  _encoder.reset();
1098 
1099  auto& frameBuffer = _engine.getFrameBuffer();
1100  if (!_encoder)
1101  {
1102  int width = frameBuffer.getFrameSize().x;
1103  if (width % 2 != 0)
1104  width += 1;
1105  int height = frameBuffer.getFrameSize().y;
1106  if (height % 2 != 0)
1107  height += 1;
1108 
1109  _encoder = std::make_unique<Encoder>(width, height, fps, _videoParams.kbps,
1110  [&rs = _rocketsServer](auto a, auto b) { rs->broadcastBinary(a, b); });
1111  }
1112 
1113  if (_videoUpdatedResponse)
1114  _videoUpdatedResponse();
1115  _videoUpdatedResponse = nullptr;
1116 
1117  if (frameBuffer.getFrameBufferFormat() == FrameBufferFormat::none || !frameBuffer.isModified())
1118  {
1119  return;
1120  }
1121 
1122  _encoder->encode(frameBuffer);
1123  }
1124 #endif
1125 
1127  {
1128  static core::Version version;
1129  using namespace rockets::http;
1130  _rocketsServer->handleGET(ENDPOINT_VERSION, version);
1131  _rocketsServer->handle(Method::GET, ENDPOINT_VERSION + "/schema",
1132  [&](const Request&)
1133  { return make_ready_response(Code::OK, version.getSchema(), JSON_TYPE); });
1134 
1135  _jsonrpcServer->bind(
1136  getRequestEndpointName(ENDPOINT_VERSION), (std::function<core::Version()>)[] { return core::Version(); });
1137 
1138  _handleSchema(ENDPOINT_VERSION, version.getSchema());
1139  }
1140 
1142  {
1143  auto& animParams = _parametersManager.getAnimationParameters();
1144  auto preUpdate = [](const AnimationParameters& obj) { return obj.getDelta() != 0; };
1145 
1146  _handleGET(ENDPOINT_ANIMATION_PARAMS, animParams, INTERACTIVE_THROTTLE);
1147  _handlePUT(ENDPOINT_ANIMATION_PARAMS, animParams, preUpdate, std::function<void(AnimationParameters&)>());
1148  }
1149 
1151  {
1152  auto& camera = _engine.getCamera();
1153  auto preUpdate = [types = camera.getTypes()](const Camera& obj)
1154  {
1155  if (obj.getCurrentType().empty())
1156  return true;
1157  return std::find(types.begin(), types.end(), obj.getCurrentType()) != types.end();
1158  };
1159  _handleGET(ENDPOINT_CAMERA, camera);
1160  _handlePUT(ENDPOINT_CAMERA, camera, preUpdate, std::function<void(Camera&)>());
1161  }
1162 
1164  {
1165  auto& renderer = _engine.getRenderer();
1166  auto preUpdate = [types = renderer.getTypes()](const Renderer& obj)
1167  {
1168  if (obj.getCurrentType().empty())
1169  return true;
1170  return std::find(types.begin(), types.end(), obj.getCurrentType()) != types.end();
1171  };
1172  _handleGET(ENDPOINT_RENDERER, renderer);
1173  _handlePUT(ENDPOINT_RENDERER, renderer, preUpdate, std::function<void(Renderer&)>());
1174  }
1175 
1177  {
1178  const RpcParameterDescription desc{METHOD_SCHEMA, "Get the schema of the given endpoint", Execution::sync,
1179  "endpoint", "name of the endpoint to get its schema"};
1180 
1181  _jsonrpcServer->bind(
1182  METHOD_SCHEMA,
1183  [&schemas = _schemas](const auto& request)
1184  {
1185  SchemaParam param;
1186  if (::from_json(param, request.message))
1187  {
1188  if (schemas.count(param.endpoint) == 0)
1189  return Response{Response::Error{"Endpoint not found", SCHEMA_RPC_ENDPOINT_NOT_FOUND}};
1190 
1191  auto schema = schemas[param.endpoint];
1192  return Response{std::move(schema)};
1193  }
1194  return Response::invalidParams();
1195  });
1196 
1197  _handleSchema(METHOD_SCHEMA, buildJsonRpcSchemaRequest<SchemaParam, std::string>(desc));
1198  }
1199 
1201  {
1202  using Position = std::array<double, 2>;
1203  const RpcParameterDescription desc{METHOD_INSPECT, "Inspect the scene at x-y position", Execution::sync,
1204  "position", "x-y position in normalized coordinates"};
1205  _handleRPC<Position, Renderer::PickResult>(
1206  desc,
1207  [&engine = _engine](const auto& position) {
1208  return engine.getRenderer().pick({float(position[0]), float(position[1])});
1209  });
1210  }
1211 
1213  {
1214  _handleRPC({METHOD_QUIT, "Quit the application"},
1215  [&engine = _engine]
1216  {
1217  engine.setKeepRunning(false);
1218  engine.triggerRender();
1219  });
1220  }
1221 
1223  {
1224  _handleRPC<ExitLaterSchedule>({METHOD_EXIT_LATER, "Schedules Core to shutdown after a given amount of minutes",
1225  "minutes", "Number of minutes after which Core will shut down"},
1226  [&](const ExitLaterSchedule& schedule)
1227  {
1228  std::lock_guard<std::mutex> lock(_scheduleMutex);
1229  if (schedule.minutes > 0)
1230  {
1231  if (_scheduledShutdownActive)
1232  {
1233  _cancelScheduledShutdown = true;
1234  _monitor.notify_all();
1235  _shutDownWorker->join();
1236  _shutDownWorker.reset();
1237  _cancelScheduledShutdown = false;
1238  _scheduledShutdownActive = false;
1239  }
1240 
1241  _scheduledShutdownActive = true;
1242  const uint32_t mins = schedule.minutes;
1243  _shutDownWorker = std::unique_ptr<std::thread>(new std::thread(
1244  [&, mins]()
1245  {
1246  std::chrono::milliseconds timeToWait(mins * 60000);
1247  std::unique_lock<std::mutex> threadLock(_waitLock);
1248  _monitor.wait_for(threadLock, timeToWait);
1249  if (!_cancelScheduledShutdown)
1250  {
1251  _engine.setKeepRunning(false);
1252  _engine.triggerRender();
1253  }
1254  }));
1255  }
1256  });
1257  }
1258 
1260  {
1261  _handleRPC({METHOD_RESET_CAMERA, "Resets the camera to its initial values"},
1262  [&engine = _engine]
1263  {
1264  engine.getCamera().reset();
1265  engine.triggerRender();
1266  });
1267  }
1268 
1270  {
1271  const RpcParameterDescription desc{METHOD_SNAPSHOT, "Make a snapshot of the current view", Execution::async,
1272  "settings", "Snapshot settings for quality and size"};
1273  auto func = [&engine = _engine, &imageGenerator = _imageGenerator](auto&& params, const auto)
1274  {
1275  using SnapshotTask = DeferredTask<ImageGenerator::ImageBase64>;
1276  return std::make_shared<SnapshotTask>(SnapshotFunctor{engine, std::move(params), imageGenerator});
1277  };
1278  _handleTask<SnapshotParams, ImageGenerator::ImageBase64>(desc, func);
1279  }
1280 
1282  {
1283  _handleRPC({METHOD_TRIGGER_JPEG_STREAM, "Triggers the engine to stream a frame to the clients"},
1284  [&] { _triggerControlledStreaming(); });
1285  }
1286 
1288  {
1289  _handleRPC<ImageStreamingMethod>({METHOD_SET_STREAMING_METHOD,
1290  "Set the image streaming method between automatic or "
1291  "controlled",
1292  "type", "Streaming type, either \"stream\" or \"quanta\""},
1293  [&](const ImageStreamingMethod& method)
1294  {
1295  if (method.type == "quanta")
1296  {
1297  _useControlledStream = true;
1298  _controlledStreamingFlag = false;
1299  }
1300  else
1301  _useControlledStream = false;
1302  });
1303  }
1304 
1306  {
1308  "Request upload of blob to trigger adding of model after blob has "
1309  "been received; returns model descriptor on success",
1310  Execution::async, "param", "size, type, name, transformation, etc."};
1311 
1312  _handleTask<BinaryParam, ModelDescriptorPtr>(desc, std::bind(&BinaryRequests::createTask,
1313  std::ref(_binaryRequests), std::placeholders::_1,
1314  std::placeholders::_2, std::ref(_engine)));
1315  }
1316 
1318  {
1319  const RpcParameterDescription desc{METHOD_CHUNK, "Indicate sending of a binary chunk after this message",
1320  "chunk", "object with an ID of the chunk"};
1321 
1322  _handleRPC<Chunk>(desc, [&req = _binaryRequests](const auto& chunk) { req.setNextChunkID(chunk.id); });
1323  }
1324 
1326  {
1327  const RpcParameterDescription desc{METHOD_SET_MODEL_TRANSFER_FUNCTION,
1328  "Set the transfer function of the given model", "param",
1329  "model ID and the new transfer function"};
1330 
1331  _bindModelEndpoint(METHOD_SET_MODEL_TRANSFER_FUNCTION, "transfer_function",
1332  [&](const std::string& json, ModelDescriptorPtr model)
1333  {
1334  auto& tf = model->getModel().getTransferFunction();
1335  if (!::from_json(tf, json))
1336  return false;
1337 
1338  tf.markModified();
1339  return true;
1340  });
1341 
1342  _handleSchema(METHOD_SET_MODEL_TRANSFER_FUNCTION, buildJsonRpcSchemaRequest<ModelTransferFunction, bool>(desc));
1343  }
1344 
1346  {
1347  const RpcParameterDescription desc{METHOD_GET_MODEL_TRANSFER_FUNCTION,
1348  "Get the transfer function of the given model", "id", "the model ID"};
1349 
1350  _jsonrpcServer->bind<ObjectID, TransferFunction>(desc.methodName,
1351  [&engine = _engine](const ObjectID& id)
1352  {
1353  auto model = engine.getScene().getModel(id.id);
1354  if (!model)
1355  throw rockets::jsonrpc::response_error(
1356  "Model not found", MODEL_NOT_FOUND);
1357  return model->getModel().getTransferFunction();
1358  });
1359 
1360  _handleSchema(METHOD_GET_MODEL_TRANSFER_FUNCTION, buildJsonRpcSchemaRequest<ObjectID, TransferFunction>(desc));
1361  }
1362 
1364  {
1365  const RpcParameterDescription desc{METHOD_ADD_CLIP_PLANE, "Add a clip plane; returns the clip plane descriptor",
1366  "plane", "An array of 4 floats"};
1367 
1368  _handleRPC<Plane, ClipPlanePtr>(desc,
1369  [&](const Plane& plane)
1370  {
1371  auto& scene = _engine.getScene();
1372  auto clipPlane = scene.getClipPlane(scene.addClipPlane(plane));
1373  _rebroadcast(METHOD_UPDATE_CLIP_PLANE, to_json(clipPlane),
1374  {_currentClientID});
1375  return clipPlane;
1376  });
1377  }
1378 
1380  {
1381  _handleRPC<ClipPlanes>({METHOD_GET_CLIP_PLANES, "Get all clip planes"},
1382  [&engine = _engine]()
1383  {
1384  auto& scene = engine.getScene();
1385  return scene.getClipPlanes();
1386  });
1387  }
1388 
1390  {
1391  const RpcParameterDescription desc{METHOD_UPDATE_CLIP_PLANE, "Update a clip plane with the given coefficients",
1392  "clip_plane", "Plane id and equation"};
1393  _handleRPC<ClipPlane, bool>(desc,
1394  [&](const ClipPlane& newPlane)
1395  {
1396  auto& scene = _engine.getScene();
1397  if (auto plane = scene.getClipPlane(newPlane.getID()))
1398  {
1399  plane->setPlane(newPlane.getPlane());
1400  _rebroadcast(METHOD_UPDATE_CLIP_PLANE, to_json(newPlane),
1401  {_currentClientID});
1402  _engine.triggerRender();
1403  return true;
1404  }
1405  return false;
1406  });
1407  }
1408 
1410  {
1411  const RpcParameterDescription desc{METHOD_REMOVE_CLIP_PLANES,
1412  "Remove clip planes from the scene given their gids", "ids",
1413  "Array of clip planes IDs"};
1414  _handleRPC<size_ts, bool>(desc,
1415  [&](const size_ts& ids)
1416  {
1417  for (const auto id : ids)
1418  _engine.getScene().removeClipPlane(id);
1419  _rebroadcast(METHOD_REMOVE_CLIP_PLANES, to_json(ids), {_currentClientID});
1420  _engine.triggerRender();
1421  return true;
1422  });
1423  }
1424 
1426  {
1427  const RpcDescription desc{METHOD_GET_LIGHTS, "get all lights"};
1428  _bindEndpoint(METHOD_GET_LIGHTS,
1429  [&engine = _engine](const rockets::jsonrpc::Request& /*request*/)
1430  {
1431  const auto& lights = engine.getScene().getLightManager().getLights();
1432 
1433  std::vector<std::string> jsonStrings;
1434 
1435  for (const auto& kv : lights)
1436  {
1437  RPCLight rpcLight;
1438  rpcLight.id = kv.first;
1439  auto baseLight = kv.second;
1440 
1441  switch (baseLight->_type)
1442  {
1443  case LightType::DIRECTIONAL:
1444  {
1445  rpcLight.type = "directional";
1446  const auto light = static_cast<DirectionalLight*>(baseLight.get());
1447  rpcLight.properties.setProperty({"direction", toArray<3, double>(light->_direction)});
1448  rpcLight.properties.setProperty({"angularDiameter", light->_angularDiameter});
1449  break;
1450  }
1451  case LightType::SPHERE:
1452  {
1453  rpcLight.type = "sphere";
1454  const auto light = static_cast<SphereLight*>(baseLight.get());
1455  rpcLight.properties.setProperty({"position", toArray<3, double>(light->_position)});
1456  rpcLight.properties.setProperty({"radius", light->_radius});
1457  break;
1458  }
1459  case LightType::QUAD:
1460  {
1461  rpcLight.type = "quad";
1462  const auto light = static_cast<QuadLight*>(baseLight.get());
1463  rpcLight.properties.setProperty({"position", toArray<3, double>(light->_position)});
1464  rpcLight.properties.setProperty({"edge1", toArray<3, double>(light->_edge1)});
1465  rpcLight.properties.setProperty({"edge2", toArray<3, double>(light->_edge2)});
1466  break;
1467  }
1468  case LightType::SPOTLIGHT:
1469  {
1470  rpcLight.type = "spotlight";
1471  const auto light = static_cast<SpotLight*>(baseLight.get());
1472  rpcLight.properties.setProperty({"position", toArray<3, double>(light->_position)});
1473  rpcLight.properties.setProperty({"direction", toArray<3, double>(light->_direction)});
1474  rpcLight.properties.setProperty({"openingAngle", light->_openingAngle});
1475  rpcLight.properties.setProperty({"penumbraAngle", light->_penumbraAngle});
1476  rpcLight.properties.setProperty({"radius", light->_radius});
1477  break;
1478  }
1479  case LightType::AMBIENT:
1480  {
1481  rpcLight.type = "ambient";
1482  break;
1483  }
1484  }
1485 
1486  rpcLight.properties.setProperty({"color", toArray<3, double>(baseLight->_color)});
1487  rpcLight.properties.setProperty({"intensity", baseLight->_intensity});
1488  rpcLight.properties.setProperty({"isVisible", baseLight->_isVisible});
1489 
1490  jsonStrings.emplace_back(to_json(rpcLight));
1491  }
1492  return Response{"[" + string_utils::join(jsonStrings, ",") + "]"};
1493  });
1494 
1495  _handleSchema(METHOD_GET_LIGHTS, buildJsonRpcSchemaRequestReturnOnly<std::vector<RPCLight>>(desc));
1496  }
1497 
1499  {
1500  _handleRPC<SpotLight, int>({METHOD_ADD_LIGHT_SPOT, "Add a spotlight, returns id", "light",
1501  "The light and its properties"},
1502  [&engine = _engine](const SpotLight& l)
1503  {
1504  LightManager& lightManager = engine.getScene().getLightManager();
1505  auto light = std::make_shared<SpotLight>(l);
1506  light->_type = LightType::SPOTLIGHT;
1507 
1508  const auto id = lightManager.addLight(light);
1509  engine.triggerRender();
1510  return id;
1511  });
1512 
1513  _handleRPC<DirectionalLight, int>({METHOD_ADD_LIGHT_DIRECTIONAL, "Add a directional light", "light",
1514  "The light and its properties"},
1515  [&engine = _engine](const DirectionalLight& l)
1516  {
1517  LightManager& lightManager = engine.getScene().getLightManager();
1518  auto light = std::make_shared<DirectionalLight>(l);
1519  light->_type = LightType::DIRECTIONAL;
1520 
1521  const auto id = lightManager.addLight(light);
1522  engine.triggerRender();
1523  return id;
1524  });
1525 
1526  _handleRPC<QuadLight, int>({METHOD_ADD_LIGHT_QUAD, "Add a quad light", "light", "The light and its properties"},
1527  [&engine = _engine](const QuadLight& l)
1528  {
1529  LightManager& lightManager = engine.getScene().getLightManager();
1530  auto light = std::make_shared<QuadLight>(l);
1531  light->_type = LightType::QUAD;
1532 
1533  const auto id = lightManager.addLight(light);
1534  engine.triggerRender();
1535  return id;
1536  });
1537 
1538  _handleRPC<SphereLight, int>({METHOD_ADD_LIGHT_SPHERE, "Add a sphere light", "light",
1539  "The light and its properties"},
1540  [&engine = _engine](const SphereLight& l)
1541  {
1542  LightManager& lightManager = engine.getScene().getLightManager();
1543  auto light = std::make_shared<SphereLight>(l);
1544  light->_type = LightType::SPHERE;
1545 
1546  const auto id = lightManager.addLight(light);
1547  engine.triggerRender();
1548  return id;
1549  });
1550 
1551  _handleRPC<AmbientLight, int>({METHOD_ADD_LIGHT_AMBIENT, "Add an ambient light", "light",
1552  "The light and its properties"},
1553  [&engine = _engine](const AmbientLight& l)
1554  {
1555  LightManager& lightManager = engine.getScene().getLightManager();
1556  auto light = std::make_shared<AmbientLight>(l);
1557  light->_type = LightType::AMBIENT;
1558 
1559  const auto id = lightManager.addLight(light);
1560  engine.triggerRender();
1561  return id;
1562  });
1563  }
1564 
1566  {
1567  const RpcParameterDescription desc{METHOD_REMOVE_LIGHTS, "Remove light given their IDs", "ids",
1568  "Array of light IDs"};
1569  _handleRPC<size_ts, bool>(desc,
1570  [&engine = _engine](const size_ts& ids)
1571  {
1572  auto& lightManager = engine.getScene().getLightManager();
1573  for (const auto id : ids)
1574  lightManager.removeLight(id);
1575  engine.triggerRender();
1576  return true;
1577  });
1578  }
1579 
1581  {
1582  const RpcDescription desc{METHOD_CLEAR_LIGHTS, "Remove all lights in the scene"};
1583  _handleRPC(desc,
1584  [&engine = _engine]()
1585  {
1586  auto& lightManager = engine.getScene().getLightManager();
1587  lightManager.clearLights();
1588  engine.triggerRender();
1589  });
1590  }
1591 
1592  FileStats _getFileStats(const std::string& path)
1593  {
1594  FileStats ft;
1595  ft.type = "none";
1596  ft.error = 0;
1597  ft.sizeBytes = 0;
1598 
1599  // Make sure the requested file is within the sandbox path
1600  // TODO: Unhardcode sandbox path and add as braynsService input param
1601  const std::string& sandboxPath = _engine.getParametersManager().getApplicationParameters().getSandboxPath();
1602  size_t pos = path.find(sandboxPath);
1603  if (pos != 0)
1604  {
1605  ft.error = 1;
1606  ft.message = "Path falls outside the sandbox: " + sandboxPath;
1607  return ft;
1608  }
1609 
1610  if (path.find("../") == 0 || path.find("/../") != std::string::npos || path.rfind("/..") == 4)
1611  {
1612  ft.error = 3;
1613  ft.message = "Illegal path detected";
1614  return ft;
1615  }
1616 
1617  struct stat s;
1618  int err = stat(path.c_str(), &s);
1619  if (err == 0)
1620  {
1621  // Path is a regular file
1622  if (s.st_mode & S_IFREG)
1623  {
1624  // Test permissions with fopen
1625  FILE* permissionTest = fopen(path.c_str(), "r");
1626  int openError = errno;
1627  if (permissionTest != nullptr)
1628  fclose(permissionTest);
1629  if (openError == EACCES)
1630  {
1631  ft.error = 2;
1632  ft.message = "Permission for " + path + " denied";
1633  return ft;
1634  }
1635 
1636  // If we reached this point, file is accessible
1637  ft.type = "file";
1638  ft.sizeBytes = s.st_size;
1639  }
1640  // Path is a folder
1641  else if (s.st_mode & S_IFDIR)
1642  {
1643  // Test permissions with access
1644  if (access(path.c_str(), X_OK) < 0)
1645  {
1646  if (errno == EACCES)
1647  {
1648  ft.error = 2;
1649  ft.message = "Permission for " + path + " denied";
1650  return ft;
1651  }
1652  else
1653  {
1654  ft.error = 3;
1655  ft.message = "Unknown error";
1656  return ft;
1657  }
1658  }
1659 
1660  // If we reached this point, folder is accessible
1661  ft.type = "directory";
1662  }
1663  else
1664  {
1665  ft.message = "Unknown type of element";
1666  }
1667  }
1668  else if (err > 0)
1669  {
1670  if (err == EACCES)
1671  {
1672  ft.error = 2;
1673  ft.message = "Permission for " + path + " denied";
1674  }
1675  else
1676  {
1677  ft.error = 3;
1678  ft.message = "Unknown error";
1679  }
1680  }
1681 
1682  return ft;
1683  }
1684 
1686  {
1687  const RpcParameterDescription desc{METHOD_FS_EXISTS,
1688  "Return the type of filer (file or folder) if a given path exists, "
1689  "or none if it does not exists",
1690  "path", "Absolute path to the filer to check"};
1691  _handleRPC<InputPath, FileType>(desc,
1692  [&](const auto& inputPath)
1693  {
1694  this->_rebroadcast(METHOD_FS_EXISTS, to_json(inputPath),
1695  {_currentClientID});
1696  FileStats fs = this->_getFileStats(inputPath.path);
1697  FileType ft;
1698  ft.type = fs.type;
1699  ft.error = fs.error;
1700  ft.message = fs.message;
1701  return ft;
1702  });
1703  }
1704 
1706  {
1707  const RpcParameterDescription desc{METHOD_FS_GET_CONTENT,
1708  "Return the content of a file if possible, or an error otherwise", "path",
1709  "Absolute path to the file"};
1710  _handleRPC<InputPath, FileContent>(desc,
1711  [&](const auto& inputPath)
1712  {
1713  this->_rebroadcast(METHOD_FS_GET_CONTENT, to_json(inputPath),
1714  {_currentClientID});
1715  // Stat the requested file
1716  FileStats ft = this->_getFileStats(inputPath.path);
1717  FileContent fc;
1718 
1719  fc.error = ft.error;
1720  fc.message = ft.message;
1721 
1722  // Continue only if its accessible and its a file
1723  if (fc.error == 0 && ft.type == "file")
1724  {
1725  std::ifstream file(inputPath.path);
1726  if (file)
1727  {
1728  // Read file
1729  file.seekg(0, file.end);
1730  long len = file.tellg();
1731  file.seekg(0, file.beg);
1732  std::vector<char> buffer(static_cast<unsigned long>(len));
1733  file.read(&buffer[0], len);
1734  file.close();
1735  fc.content = std::string(buffer.begin(), buffer.end());
1736  }
1737  else
1738  {
1739  fc.error = 3;
1740  fc.message = "An unknown error occurred when reading the file " +
1741  inputPath.path;
1742  }
1743  }
1744 
1745  return fc;
1746  });
1747  }
1748 
1750  {
1751  const RpcDescription desc{METHOD_FS_GET_ROOT,
1752  "Return the root path of the current "
1753  "execution environment (sandbox)"};
1754  _bindEndpoint(METHOD_FS_GET_ROOT,
1755  [&engine = _engine](const rockets::jsonrpc::Request&)
1756  {
1757  FileRoot fr;
1758  fr.root = engine.getParametersManager().getApplicationParameters().getSandboxPath();
1759  return Response{to_json(fr)};
1760  });
1761 
1762  _handleSchema(METHOD_GET_LIGHTS, buildJsonRpcSchemaRequestReturnOnly<FileRoot>(desc));
1763  }
1764 
1766  {
1767  const RpcParameterDescription desc{METHOD_FS_LIST_DIR,
1768  "Return the content of a file if possible, or an error otherwise", "path",
1769  "Absolute path to the file"};
1770  _handleRPC<InputPath, DirectoryFileList>(
1771  desc,
1772  [&](const auto& inputPath)
1773  {
1774  this->_rebroadcast(METHOD_FS_LIST_DIR, to_json(inputPath), {_currentClientID});
1775 
1776  // Stat the requested path
1777  FileStats ft = this->_getFileStats(inputPath.path);
1778 
1779  DirectoryFileList dfl;
1780  dfl.error = ft.error;
1781  dfl.message = ft.message;
1782 
1783  // Continue only if its a directory and is accessible
1784  const bool isDirectory = ft.type == "directory";
1785  if (ft.error == 0 && isDirectory)
1786  {
1787  DIR* dir;
1788  struct dirent* ent;
1789  if ((dir = opendir(inputPath.path.c_str())) != nullptr)
1790  {
1791  // Iterate over each entry of the directory
1792  std::string slash = inputPath.path[inputPath.path.size() - 1] == '/' ? "" : "/";
1793  while ((ent = readdir(dir)) != nullptr)
1794  {
1795  const std::string fileName(ent->d_name);
1796 
1797  // Discard dots
1798  if (fileName == "." || fileName == "..")
1799  continue;
1800 
1801  // Get file status to known if accessible, type, and
1802  // size
1803  const std::string filePath = inputPath.path + slash + fileName;
1804  FileStats fileStats = this->_getFileStats(filePath);
1805 
1806  if (fileStats.error == 0)
1807  {
1808  if (fileStats.type == "directory")
1809  {
1810  dfl.dirs.push_back(fileName);
1811  }
1812  else if (fileStats.type == "file")
1813  {
1814  // Calc size in octets
1815  size_t totalBits =
1816  static_cast<size_t>(fileStats.sizeBytes) * static_cast<size_t>(CHAR_BIT);
1817  size_t totalOctets = totalBits / 8;
1818  dfl.files.names.push_back(fileName);
1819  dfl.files.sizes.push_back(totalOctets);
1820  }
1821  }
1822  }
1823  closedir(dir);
1824  }
1825  else
1826  {
1827  dfl.error = 3;
1828  dfl.message = "Unknown error when reading contents of directory " + inputPath.path;
1829  }
1830  }
1831  else if (!isDirectory && ft.error == 0)
1832  {
1833  dfl.error = 4;
1834  dfl.message = "The path " + inputPath.path + " is not a directory";
1835  }
1836 
1837  return dfl;
1838  });
1839  }
1840 
1842  {
1843  const RpcParameterDescription desc{METHOD_ADD_MODEL,
1844  "Add model from remote path; returns model descriptor on success",
1845  Execution::async, "model_param",
1846  "Model parameters including name, path, transformation, etc."};
1847 
1848  auto func = [&](const ModelParams& modelParams, const auto)
1849  { return std::make_shared<AddModelTask>(modelParams, _engine); };
1850  _handleTask<ModelParams, ModelDescriptorPtr>(desc, func);
1851  }
1852 
1854  {
1855  const RpcParameterDescription desc{METHOD_REMOVE_MODEL,
1856  "Remove the model(s) with the given ID(s) from the scene", "ids",
1857  "Array of model IDs"};
1858  _handleRPC<size_ts, bool>(desc,
1859  [&engine = _engine](const size_ts& ids)
1860  {
1861  for (const auto id : ids)
1862  engine.getScene().removeModel(id);
1863  engine.triggerRender();
1864  return true;
1865  });
1866  }
1867 
1869  {
1870  _bindEndpoint(METHOD_UPDATE_MODEL,
1871  [&engine = _engine](const rockets::jsonrpc::Request& request)
1872  {
1873  ModelDescriptor newDesc;
1874  if (!::from_json(newDesc, request.message))
1875  return Response::invalidParams();
1876 
1877  auto& scene = engine.getScene();
1878  if (auto model = scene.getModel(newDesc.getModelID()))
1879  {
1880  ::from_json(*model, request.message);
1881  scene.markModified();
1882  engine.triggerRender();
1883  return Response{to_json(true)};
1884  }
1885  return Response{to_json(false)};
1886  });
1887  const RpcParameterDescription desc{METHOD_UPDATE_MODEL, "Update the model with the given values", "model",
1888  "Model descriptor"};
1889  _handleSchema(METHOD_UPDATE_MODEL, buildJsonRpcSchemaRequest<ModelDescriptor, bool>(desc));
1890  }
1891 
1893  {
1894  const RpcParameterDescription desc{METHOD_GET_MODEL_PROPERTIES, "Get the properties of the given model", "id",
1895  "the model ID"};
1896 
1897  _jsonrpcServer->bind<ObjectID, PropertyMap>(desc.methodName,
1898  [&engine = _engine](const ObjectID& id)
1899  {
1900  auto model = engine.getScene().getModel(id.id);
1901  if (!model)
1902  throw rockets::jsonrpc::response_error("Model not found",
1903  MODEL_NOT_FOUND);
1904  return model->getProperties();
1905  });
1906 
1907  _handleSchema(METHOD_GET_MODEL_PROPERTIES, buildJsonRpcSchemaRequest<ObjectID, PropertyMap>(desc));
1908  }
1909 
1911  {
1912  const RpcParameterDescription desc{METHOD_SET_MODEL_PROPERTIES, "Set the properties of the given model",
1913  "param", "model ID and its properties"};
1914 
1915  _bindModelEndpoint(METHOD_SET_MODEL_PROPERTIES, "properties",
1916  [&](const std::string& json, ModelDescriptorPtr model)
1917  {
1918  auto props = model->getProperties();
1919  props.merge(jsonToPropertyMap(json));
1920  model->setProperties(props);
1921  return true;
1922  });
1923 
1924  _handleSchema(METHOD_SET_MODEL_PROPERTIES, buildJsonRpcSchemaRequest<ModelProperties, bool>(desc));
1925  }
1926 
1928  {
1929  const RpcParameterDescription desc{METHOD_MODEL_PROPERTIES_SCHEMA, "Get the property schema of the model", "id",
1930  "ID of the model get its properties schema"};
1931 
1932  _jsonrpcServer->bind(METHOD_MODEL_PROPERTIES_SCHEMA,
1933  [&engine = _engine](const auto& request)
1934  {
1935  ObjectID modelID;
1936  if (::from_json(modelID, request.message))
1937  {
1938  auto model = engine.getScene().getModel(modelID.id);
1939  if (!model)
1940  return Response{Response::Error{"Model not found", MODEL_NOT_FOUND}};
1941 
1942  return Response{buildJsonSchema(model->getProperties(), "ModelProperties")};
1943  }
1944  return Response::invalidParams();
1945  });
1946 
1947  _handleSchema(METHOD_MODEL_PROPERTIES_SCHEMA, buildJsonRpcSchemaRequest<ObjectID, std::string>(desc));
1948  }
1949 
1951  {
1952  const RpcParameterDescription desc{METHOD_GET_INSTANCES, "Get instances", "id, range",
1953  "Model id and result range"};
1954  _handleRPC<GetInstances, ModelInstances>(
1955  desc,
1956  [&engine = _engine](const GetInstances& param) -> ModelInstances
1957  {
1958  auto id = param.modelID;
1959  auto& scene = engine.getScene();
1960  auto model = scene.getModel(id);
1961  if (!model)
1962  throw rockets::jsonrpc::response_error("Model not found", MODEL_NOT_FOUND);
1963 
1964  const auto& instances = model->getInstances();
1965  const Vector2ui range{std::min(param.resultRange.x, unsigned(instances.size())),
1966  std::min(param.resultRange.y, unsigned(instances.size()))};
1967  return {instances.begin() + range.x, instances.begin() + range.y};
1968  });
1969  }
1970 
1972  {
1973  _handleRPC<std::vector<LoaderInfo>>({METHOD_GET_LOADERS, "Get all loaders"},
1974  [&]
1975  {
1976  auto& scene = _engine.getScene();
1977  return scene.getLoaderRegistry().getLoaderInfos();
1978  });
1979  }
1980 
1982  {
1983  _bindEndpoint(METHOD_UPDATE_INSTANCE,
1984  [&](const rockets::jsonrpc::Request& request)
1985  {
1986  ModelInstance newDesc;
1987  if (!::from_json(newDesc, request.message))
1988  return Response::invalidParams();
1989 
1990  auto& scene = _engine.getScene();
1991  auto model = scene.getModel(newDesc.getModelID());
1992  if (!model)
1993  throw rockets::jsonrpc::response_error("Model not found", MODEL_NOT_FOUND);
1994 
1995  auto instance = model->getInstance(newDesc.getInstanceID());
1996  if (!instance)
1997  throw rockets::jsonrpc::response_error("Instance not found", INSTANCE_NOT_FOUND);
1998 
1999  ::from_json(*instance, request.message);
2000  model->getModel().markInstancesDirty();
2001  scene.markModified(false);
2002 
2003  _engine.triggerRender();
2004  _rebroadcast(METHOD_UPDATE_INSTANCE, request.message, {request.clientID});
2005 
2006  return Response{to_json(true)};
2007  });
2008  const RpcParameterDescription desc{METHOD_UPDATE_INSTANCE, "Update the instance with the given values",
2009  "model_instance", "Model instance"};
2010  _handleSchema(METHOD_UPDATE_INSTANCE, buildJsonRpcSchemaRequest<ModelInstance, bool>(desc));
2011  }
2012 
2013  void _handlePropertyObject(PropertyObject& object, const std::string& endpoint, const std::string objectName)
2014  {
2015  const auto requestEndpoint = getRequestEndpointName(endpoint);
2016  const auto notifyEndpoint = getNotificationEndpointName(endpoint);
2017 
2018  _jsonrpcServer->bind<PropertyMap>(requestEndpoint, [&object = object] { return object.getPropertyMap(); });
2019 
2020  _bindEndpoint(notifyEndpoint,
2021  [&, notifyEndpoint](const auto& request)
2022  {
2023  object.updateProperties(jsonToPropertyMap(request.message));
2024  _engine.triggerRender();
2025  this->_rebroadcast(notifyEndpoint, request.message, {request.clientID});
2026  return Response{to_json(true)};
2027  });
2028 
2029  std::vector<std::pair<std::string, PropertyMap>> props;
2030  for (const auto& type : object.getTypes())
2031  props.push_back(std::make_pair(type, object.getPropertyMap(type)));
2032 
2033  // get-<object>-params RPC schema
2034  _handleSchema(requestEndpoint, buildJsonRpcSchemaRequestPropertyMaps(
2035  {requestEndpoint, "Get the params of the current " + objectName}, props));
2036 
2037  // set-<object>-params RPC schema
2038  const RpcParameterDescription desc{notifyEndpoint, "Set the params on the current " + objectName, "params",
2039  "new " + objectName + " params"};
2040  _handleSchema(notifyEndpoint, buildJsonRpcSchemaNotifyPropertyMaps(desc, props));
2041 
2042  // <object>-params object schema
2043  _handleSchema(endpoint, buildJsonSchema(props, hyphenatedToCamelCase(endpoint)));
2044  }
2045 
2047  {
2048  const RpcDescription desc{LOADERS_SCHEMA, "Get the schema for all loaders"};
2049 
2050  _bindEndpoint(LOADERS_SCHEMA,
2051  [&](const rockets::jsonrpc::Request& /*request*/)
2052  {
2053  const auto& loaderInfos = _engine.getScene().getLoaderRegistry().getLoaderInfos();
2054 
2055  std::vector<std::pair<std::string, PropertyMap>> props;
2056  for (const LoaderInfo& li : loaderInfos)
2057  props.emplace_back(li.name, li.properties);
2058 
2059  return Response{buildJsonSchema(props, "loaders")};
2060  });
2061 
2062  _handleSchema(LOADERS_SCHEMA, buildJsonRpcSchemaRequestReturnOnly<std::vector<PropertyMap>>(desc));
2063  }
2064 
2066  {
2067  const RpcParameterDescription desc{METHOD_SET_ENVIRONMENT_MAP, "Set a environment map in the scene", "filename",
2068  "environment map texture file"};
2069 
2070  _handleRPC<EnvironmentMapParam, bool>(desc,
2071  [&](const auto& envMap)
2072  {
2073  this->_rebroadcast(METHOD_SET_ENVIRONMENT_MAP, to_json(envMap),
2074  {_currentClientID});
2075  if (_engine.getScene().setEnvironmentMap(envMap.filename))
2076  {
2077  _engine.triggerRender();
2078  return true;
2079  }
2080  return false;
2081  });
2082  }
2083 
2085  {
2086  const RpcDescription desc{METHOD_GET_ENVIRONMENT_MAP, "Get the environment map from the scene"};
2087 
2088  _handleRPC<EnvironmentMapParam>(desc,
2089  [&]() -> EnvironmentMapParam
2090  { return {_engine.getScene().getEnvironmentMap()}; });
2091  }
2092 
2094  {
2095  const RpcParameterDescription desc{METHOD_SET_VIDEOSTREAM, "Set the video streaming parameters", "params",
2096  "videostream parameters"};
2097 
2098  auto action = [&](const VideoStreamParam& params, auto clientID, auto respond, auto)
2099  {
2100  if (!_parametersManager.getApplicationParameters().useVideoStreaming())
2101  {
2102  respond(Response{Response::Error(VIDEOSTREAM_NOT_ENABLED_ERROR)});
2103  return rockets::jsonrpc::CancelRequestCallback();
2104  }
2105 
2106 #ifdef BRAYNS_USE_FFMPEG
2107  this->_rebroadcast(METHOD_SET_VIDEOSTREAM, to_json(params), {clientID});
2108 
2109  const bool changed = params != _videoParams;
2110  if (!changed)
2111  {
2112  respond(Response{to_json(false)});
2113  return rockets::jsonrpc::CancelRequestCallback();
2114  }
2115 
2116  _engine.triggerRender();
2117  _videoParams = params;
2118  _videoUpdatedResponse = [respond] { respond(Response{to_json(true)}); };
2119  return rockets::jsonrpc::CancelRequestCallback();
2120 #else
2121  respond(Response{Response::Error(VIDEOSTREAM_NOT_SUPPORTED_ERROR)});
2122  return rockets::jsonrpc::CancelRequestCallback();
2123 #endif
2124  };
2125  _handleAsyncRPC<VideoStreamParam, bool>(desc, action);
2126  }
2127 
2129  {
2130  const RpcDescription desc{METHOD_GET_VIDEOSTREAM, "Get the videostream parameters"};
2131 
2132  _handleRPC<VideoStreamParam>(desc,
2133  [&]() -> VideoStreamParam
2134  {
2135 #ifdef BRAYNS_USE_FFMPEG
2136  if (!_parametersManager.getApplicationParameters().useVideoStreaming())
2137  throw rockets::jsonrpc::response_error(VIDEOSTREAM_NOT_ENABLED_ERROR);
2138  return _videoParams;
2139 #else
2140  throw rockets::jsonrpc::response_error(VIDEOSTREAM_NOT_SUPPORTED_ERROR);
2141 #endif
2142  });
2143  }
2144 
2146  {
2147  _controlledStreamingFlag = true;
2148  _engine.triggerRender();
2149  }
2150 
2152 
2153  std::unordered_map<std::string, std::pair<std::mutex, Throttle>> _throttle;
2154  std::vector<std::function<void()>> _delayedNotifies;
2156  static constexpr uintptr_t NO_CURRENT_CLIENT{0};
2157  uintptr_t _currentClientID{NO_CURRENT_CLIENT};
2158 
2159 #ifdef USE_NETWORKING
2160  std::shared_ptr<uvw::AsyncHandle> _processDelayedNotifies;
2161 #endif
2162 
2163  std::unordered_map<std::string, std::string> _schemas;
2164 
2166 
2167  std::unique_ptr<rockets::Server> _rocketsServer;
2168  using JsonRpcServer = rockets::jsonrpc::Server<rockets::Server>;
2169  std::unique_ptr<JsonRpcServer> _jsonrpcServer;
2170 
2171  bool _manualProcessing{true};
2172 
2174 
2176  float _leftover{0.f};
2177 
2178  std::map<TaskPtr, std::shared_ptr<async::task<void>>> _tasks;
2179  std::mutex _tasksMutex;
2180 
2182  // need to delay those as we are initialized first, but other plugins might
2183  // alter the list of renderers for instance
2184  bool _endpointsRegistered{false};
2185 
2186  // Wether to use controlled stream (true = client request frames, false =
2187  // continous stream of frames)
2188  bool _useControlledStream{false};
2189  // Flag used to control the frame send when _useControlledStream = true
2190  std::atomic<bool> _controlledStreamingFlag{false};
2191 
2192  // Wether a scheduled shutdown is running at the momment
2193  bool _scheduledShutdownActive{false};
2194  // Flag to cancel current scheduled shutdown
2195  bool _cancelScheduledShutdown{false};
2196  // Worker in charge of shutdown
2197  std::unique_ptr<std::thread> _shutDownWorker;
2198  // Lock for safe schedule
2199  std::mutex _scheduleMutex;
2200  // Schedule mechanism
2201  std::mutex _waitLock;
2202  std::condition_variable _monitor;
2203 
2204  std::vector<BaseObject*> _objects;
2205 
2206 #ifdef BRAYNS_USE_FFMPEG
2207  std::unique_ptr<Encoder> _encoder;
2208  VideoStreamParam _videoParams;
2209  std::function<void()> _videoUpdatedResponse;
2210 #endif
2211 };
2212 
2213 RocketsPlugin::~RocketsPlugin()
2214 {
2215  _api->setActionInterface(ActionInterfacePtr());
2216 }
2217 
2219 {
2220  _impl = std::make_shared<Impl>(_api);
2221  _api->setActionInterface(_impl);
2222 }
2223 
2224 void RocketsPlugin::preRender()
2225 {
2226  _impl->preRender();
2227 }
2228 void RocketsPlugin::postRender()
2229 {
2230  _impl->postRender();
2231 }
2232 } // namespace core
std::function< std::string(std::string)> RetParamFunc
std::function< void(std::string)> ParamFunc
std::function< void()> VoidFunc
std::function< std::string()> RetFunc
The Camera class is an abstract interface for a camera in a 3D graphics application....
Definition: Camera.h:41
size_t getID() const
Returns id of this clip plane object.
Definition: ClipPlane.h:60
Provides an abstract implementation of a ray-tracing engine.
Definition: Engine.h:59
Manages light sources in a scene.
Definition: LightManager.h:41
PLATFORM_API size_t addLight(LightPtr light)
addLight Attaches a light source to the scene.
The ModelDescriptor struct defines the metadata attached to a model.Model descriptor are exposed via ...
Definition: Model.h:285
A class representing an instance of a 3D model.
Definition: Model.h:80
PLATFORM_API size_t getInstanceID() const
Get the value of _instanceID.
Definition: Model.h:161
PLATFORM_API size_t getModelID() const
Get the value of _modelID.
Definition: Model.h:149
The ModelParams class represents the parameters needed for initializing a model instance.
Definition: Model.h:178
PLATFORM_API ApplicationParameters & getApplicationParameters()
void setProperty(const Property &newProperty)
Definition: PropertyMap.h:307
Renderer class inherits from PropertyObject class The Renderer class has methods to render a FrameBuf...
Definition: Renderer.h:42
std::unordered_map< std::string, std::string > _schemas
std::unique_ptr< rockets::Server > _rocketsServer
void _bindModelEndpoint(const std::string &method, const std::string &key, std::function< bool(std::string, ModelDescriptorPtr)> action)
std::condition_variable _monitor
void _handleRPC(const RpcDescription &desc, std::function< void()> action)
std::unique_ptr< std::thread > _shutDownWorker
void _delayedNotify(const std::function< void()> &notify)
void _bindEndpoint(const std::string &method, rockets::jsonrpc::VoidCallback action)
ParametersManager & _parametersManager
void _handleTask(const RpcParameterDescription &desc, std::function< std::shared_ptr< Task< R >>(P, uintptr_t)> createTask)
void _handleRPC(const RpcParameterDescription &desc, std::function< void(P)> action)
void _handlePUT(const std::string &endpoint, T &obj, PRE preUpdateFunc, POST postUpdateFunc)
void _bindEndpoint(const std::string &method, rockets::jsonrpc::ResponseCallback action)
void _bindEndpoint(const std::string &method, std::function< RetVal(Params)> action)
void _handleObjectSchema(const std::string &endpoint, T &obj)
void _bindEndpoint(const std::string &method, std::function< void(Params)> action)
void registerNotification(const RpcDescription &desc, const std::function< void()> &action)
FileStats _getFileStats(const std::string &path)
void _registerRequest(const std::string &name, const RetParamFunc &action)
void _bindEndpoint(const std::string &method, std::function< RetVal()> action)
std::vector< std::function< void()> > _delayedNotifies
void _handleSchema(const std::string &endpoint, const std::string &schema)
void _handleObjectSchema(const std::string &endpoint)
void registerNotification(const RpcParameterDescription &desc, const PropertyMap &input, const std::function< void(PropertyMap)> &action)
std::vector< BaseObject * > _objects
void _registerNotification(const std::string &name, const ParamFunc &action)
void _handlePUT(const std::string &endpoint, T &obj)
void _handlePropertyObject(PropertyObject &object, const std::string &endpoint, const std::string objectName)
void _rebroadcast(const std::string &endpoint, const std::string &message, const std::set< uintptr_t > &filter)
void registerRequest(const RpcParameterDescription &desc, const PropertyMap &input, const PropertyMap &output, const std::function< PropertyMap(PropertyMap)> &action)
rockets::jsonrpc::Server< rockets::Server > JsonRpcServer
std::unordered_map< std::string, std::pair< std::mutex, Throttle > > _throttle
void _handleGET(const std::string &endpoint, T &obj, const int64_t throttleTime=DEFAULT_THROTTLE)
std::map< TaskPtr, std::shared_ptr< async::task< void > > > _tasks
void _handleRPC(const RpcDescription &desc, std::function< R()> action)
std::unique_ptr< JsonRpcServer > _jsonrpcServer
void _handle(const std::string &endpoint, T &obj, const int64_t throttleTime=DEFAULT_THROTTLE)
void _handleRPC(const RpcParameterDescription &desc, std::function< R(P)> action)
void _registerNotification(const std::string &name, const VoidFunc &action)
void _handleAsyncRPC(const RpcParameterDescription &desc, std::function< rockets::jsonrpc::CancelRequestCallback(P, uintptr_t, rockets::jsonrpc::AsyncResponse, rockets::jsonrpc::ProgressUpdateCallback)> action)
void registerRequest(const RpcDescription &desc, const PropertyMap &output, const std::function< PropertyMap()> &action)
void _registerRequest(const std::string &name, const RetFunc &action)
async::task< T > Type
Definition: Task.h:98
auto & get()
Definition: Task.h:123
std::string to_json(const core::PropertyMap &obj)
core::PropertyMap jsonToPropertyMap(const std::string &json)
std::string join(const std::vector< std::string > &strings, const std::string &joinWith)
Definition: StringUtils.cpp:93
const std::string METHOD_REQUEST_MODEL_UPLOAD
std::string buildJsonRpcSchemaRequestPropertyMaps(const RpcDescription &desc, const std::vector< std::pair< std::string, PropertyMap >> &objs)
std::string buildJsonRpcSchemaNotifyPropertyMaps(const RpcParameterDescription &desc, const std::vector< std::pair< std::string, PropertyMap >> &objs)
bool preUpdate(const std::string &json, PRE preUpdateFunc, typename std::enable_if<!std::is_abstract< T >::value >::type *=0)
std::string buildJsonRpcSchemaNotify(const RpcParameterDescription &desc, P &obj)
Definition: jsonUtils.h:167
std::shared_ptr< ModelDescriptor > ModelDescriptorPtr
Definition: Types.h:112
bool from_json(T &obj, const std::string &json, PRE preUpdateFunc=[] {}, POST postUpdateFunc=[] {})
std::string buildJsonRpcSchemaRequestReturnOnly(const RpcDescription &desc, R &retVal)
Definition: jsonUtils.h:135
std::string buildJsonSchema(std::vector< std::pair< std::string, PropertyMap >> &objs, const std::string &title)
std::string buildJsonRpcSchemaRequestPropertyMap(const RpcDescription &desc, const PropertyMap &obj)
std::string buildJsonRpcSchemaNotifyPropertyMap(const RpcParameterDescription &desc, const PropertyMap &properties)
std::shared_ptr< ActionInterface > ActionInterfacePtr
Definition: Types.h:82
glm::vec< 2, uint32_t > Vector2ui
Definition: MathTypes.h:133
std::vector< ModelInstance > ModelInstances
Definition: Types.h:114
std::array< double, 4 > Plane
Definition: Types.h:308
bool preUpdate(const std::string &, PRE, typename std::enable_if< std::is_abstract< T >::value >::type *=0)
void init(core::Box< U > *, ObjectHandler *)
_LINK_LIBRARIES PUBLIC Core uv if(PLATFORM_NETWORKING_ENABLED) list(APPEND $
Definition: CMakeLists.txt:35
_LINK_LIBRARIES PRIVATE CorePluginRockets endif() add_executable($
Definition: CMakeLists.txt:37
@ plane
Definition: CommonTypes.h:39
@ vector
Definition: CommonTypes.h:68
#define CORE_DEBUG(__msg)
Definition: Logs.h:38
#define CORE_WARN(__msg)
Definition: Logs.h:32
#define CORE_INFO(__msg)
Definition: Logs.h:33
#define CORE_ERROR(__msg)
Definition: Logs.h:31
std::vector< size_t > size_ts
Definition: Types.h:56
std::string message
PropertyMap properties
ScopedCurrentClient(uintptr_t &currentClientID, const uintptr_t newID)
std::string methodName
Definition: Types.h:367