# ##### BEGIN GPL LICENSE BLOCK #####
#
#   This program is free software: you can redistribute it and/or modify
#   it under the terms of the GNU General Public License as published by
#   the Free Software Foundation, either version 3 of the License, or
#   (at your option) any later version.
#
#   This program is distributed in the hope that it will be useful,
#   but WITHOUT ANY WARRANTY; without even the implied warranty of
#   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
#   GNU General Public License for more details.
#
#   You should have received a copy of the GNU General Public License
#   along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# ##### END GPL LICENSE BLOCK #####

import logging
import sys
import traceback

import bpy
from replication.constants import (FETCHED, RP_COMMON, STATE_ACTIVE,
                                   STATE_INITIAL, STATE_LOBBY, STATE_QUITTING,
                                   STATE_SRV_SYNC, STATE_SYNCING, UP)
from replication.exception import NonAuthorizedOperationError, ContextError
from replication.interface import session
from replication.porcelain import apply, add

from . import operators, utils
from .presence import (UserFrustumWidget, UserNameWidget, UserSelectionWidget,
                       generate_user_camera, get_view_matrix, refresh_3d_view,
                       refresh_sidebar_view, renderer)

this = sys.modules[__name__]

# Registered timers, keyed by timer id
this.registry = dict()


def is_annotating(context: bpy.types.Context):
    """ Check if the annotate mode is enabled

    :param context: context whose workspace tools are inspected; falls back
        to ``bpy.context`` when falsy
    :return: True when the active OBJECT-mode 3D view tool is the builtin
        annotate tool
    """
    ctx = context or bpy.context
    active_tool = ctx.workspace.tools.from_space_view3d_mode(
        'OBJECT', create=False)
    return active_tool.idname == 'builtin.annotate'


def _affects_dependencies(node) -> bool:
    """ Tell whether an owner change on `node` should affect its dependencies

    Returns False only when the node data carries an 'instance_type' equal
    to 'COLLECTION'; True otherwise (including when `node` is None or has no
    data).
    """
    data = getattr(node, 'data', None)
    if data and 'instance_type' in data:
        return data['instance_type'] != 'COLLECTION'
    return True


def _try_change_owner(uuid, new_owner, affect_dependencies, label):
    """ Change a node owner, logging (not raising) on authorization failure

    :param uuid: uuid of the node to update
    :param new_owner: owner to assign
    :param affect_dependencies: forwarded to session.change_owner
    :param label: what to print in the warning if the change is refused
    """
    try:
        session.change_owner(
            uuid,
            new_owner,
            ignore_warnings=True,
            affect_dependencies=affect_dependencies)
    except NonAuthorizedOperationError:
        logging.warning(
            f"Not authorized to change {label} owner")


class Timer(object):
    """Timer binder interface for blender

    Run a bpy.app.Timer in the background looping at the given rate
    """

    def __init__(self, timeout=10, id=None):
        self._timeout = timeout
        self.is_running = False
        # Default the registry key to the concrete class name
        self.id = id if id else self.__class__.__name__

    def register(self):
        """Register the timer into the blender timer system
        """
        if not self.is_running:
            this.registry[self.id] = self
            bpy.app.timers.register(self.main)
            self.is_running = True
            logging.debug(f"Register {self.__class__.__name__}")
        else:
            logging.debug(
                f"Timer {self.__class__.__name__} already registered")

    def main(self):
        """Timer callback: run execute() and reschedule

        :return: the timeout (seconds before the next call) while the timer
            is running, otherwise None. On any exception the timer is
            unregistered and the session is disconnected.
        """
        try:
            self.execute()
        except Exception as e:
            # Keep the traceback: the error message alone rarely tells
            # which timer step failed.
            logging.error(e, exc_info=True)
            self.unregister()
            session.disconnect(reason=f"Error during timer {self.id} execution")
        else:
            if self.is_running:
                return self._timeout

    def execute(self):
        """Main timer loop
        """
        raise NotImplementedError

    def unregister(self):
        """Unregister the timer of the blender timer system
        """
        if bpy.app.timers.is_registered(self.main):
            logging.info(f"Unregistering {self.id}")
            bpy.app.timers.unregister(self.main)

        # pop() instead of del: unregistering twice must not raise
        this.registry.pop(self.id, None)
        # Always clear the flag so the timer can be registered again even
        # if blender had already dropped the callback.
        self.is_running = False


class SessionBackupTimer(Timer):
    """Periodically save the session to `filepath`"""

    def __init__(self, timeout=10, filepath=None):
        self._filepath = filepath
        super().__init__(timeout)

    def execute(self):
        session.save(self._filepath)


class SessionListenTimer(Timer):
    """Periodically call session.listen()"""

    def execute(self):
        session.listen()


class ApplyTimer(Timer):
    """Apply every node in the FETCHED state while the session is active"""

    def execute(self):
        if not (session and session.state == STATE_ACTIVE):
            return

        for node in session.list():
            node_ref = session.repository.get_node(node)

            # A missing node would raise AttributeError below, which
            # Timer.main turns into a session disconnect: skip it instead.
            if node_ref is None or node_ref.state != FETCHED:
                continue

            try:
                apply(session.repository, node)
            except Exception:
                logging.error(f"Fail to apply {node_ref.uuid}")
                traceback.print_exc()
            else:
                if node_ref.bl_reload_parent:
                    for parent in session.repository.get_parents(node):
                        logging.debug(f"Refresh parent {parent.uuid}")
                        apply(session.repository,
                              parent.uuid,
                              force=True)


class DynamicRightSelectTimer(Timer):
    """Keep node ownership in sync with the local selection and annotation"""

    def __init__(self, timeout=.1):
        super().__init__(timeout)
        self._last_selection = []
        self._user = None
        self._annotating = False

    def execute(self):
        settings = utils.get_preferences()

        if not (session and session.state == STATE_ACTIVE):
            return

        # Find user
        if self._user is None:
            self._user = session.online_users.get(settings.username)

        if self._user:
            self._sync_annotation_rights(settings)

            current_selection = utils.get_selected_objects(
                bpy.context.scene,
                bpy.data.window_managers['WinMan'].windows[0].view_layer
            )
            if current_selection != self._last_selection:
                if not self._sync_selection_rights(settings,
                                                   current_selection):
                    # A newly selected node could not be claimed: leave
                    # _last_selection untouched so the next tick retries.
                    return

        self._refresh_selectability()

    def _sync_annotation_rights(self, settings):
        """Take/release the right on the scene annotation grease pencil"""
        ctx = bpy.context
        annotation_gp = ctx.scene.grease_pencil

        if annotation_gp and not annotation_gp.uuid:
            ctx.scene.update_tag()

        # Only continue if an annotation exists and is tracked
        if not (annotation_gp and annotation_gp.uuid):
            return

        registered_gp = session.repository.get_node(annotation_gp.uuid)
        if registered_gp is None:
            return

        if is_annotating(bpy.context):
            # try to get the right on it
            if registered_gp.owner == RP_COMMON:
                self._annotating = True
                logging.debug(
                    "Getting the right on the annotation GP")
                session.change_owner(
                    registered_gp.uuid,
                    settings.username,
                    ignore_warnings=True,
                    affect_dependencies=False)

            if registered_gp.owner == settings.username:
                gp_node = session.repository.get_node(annotation_gp.uuid)
                if gp_node.has_changed():
                    session.commit(gp_node.uuid)
                    session.push(gp_node.uuid, check_data=False)

        elif self._annotating:
            # NOTE(review): _annotating is never reset, so this release is
            # re-sent on every tick once annotating stopped - confirm intent.
            session.change_owner(
                registered_gp.uuid,
                RP_COMMON,
                ignore_warnings=True,
                affect_dependencies=False)

    def _sync_selection_rights(self, settings, current_selection):
        """Release deselected nodes and claim newly selected ones

        :return: False when a newly selected node is missing or not owned by
            RP_COMMON (the caller must then stop), True otherwise
        """
        obj_common = [
            o for o in self._last_selection if o not in current_selection]
        obj_ours = [
            o for o in current_selection if o not in self._last_selection]

        # change old selection right to common
        for obj in obj_common:
            node = session.repository.get_node(obj)

            if node and node.owner in (settings.username, RP_COMMON):
                _try_change_owner(
                    node.uuid, RP_COMMON, _affects_dependencies(node), node)

        # change new selection to our
        for obj in obj_ours:
            node = session.repository.get_node(obj)

            if not (node and node.owner == RP_COMMON):
                return False

            _try_change_owner(
                node.uuid,
                settings.username,
                _affects_dependencies(node),
                node)

        self._last_selection = current_selection

        user_metadata = {
            'selected_objects': current_selection
        }

        session.update_user_metadata(user_metadata)
        logging.debug("Update selection")

        # Fix deselection until right managment refactoring (with Roles concepts)
        if not current_selection:
            owned_keys = session.list(
                filter_owner=settings.username)
            for key in owned_keys:
                node = session.repository.get_node(key)
                # Computed per node: the previous code reused a variable
                # left over from the loops above, which could be unbound.
                _try_change_owner(
                    key, RP_COMMON, _affects_dependencies(node), key)

        return True

    def _refresh_selectability(self):
        """Update hide_select on every object carrying a uuid"""
        for obj in bpy.data.objects:
            object_uuid = getattr(obj, 'uuid', None)
            if object_uuid:
                # NOTE(review): hide_select receives the negation of
                # session.is_readonly(); confirm is_readonly's polarity.
                is_selectable = not session.is_readonly(object_uuid)
                if obj.hide_select != is_selectable:
                    obj.hide_select = is_selectable


class ClientUpdate(Timer):
    """Refresh the viewport on remote view changes and publish local metadata"""

    def __init__(self, timeout=.1):
        super().__init__(timeout)
        self.handle_quit = False
        # Last seen metadata of each remote user, keyed by username
        self.users_metadata = {}

    def execute(self):
        settings = utils.get_preferences()

        if not (session and renderer):
            return
        if session.state not in [STATE_ACTIVE, STATE_LOBBY]:
            return

        local_user = session.online_users.get(settings.username)
        if not local_user:
            return

        self._track_remote_views(settings)

        local_user_metadata = local_user.get('metadata')
        scene_current = bpy.context.scene.name
        current_view_corners = generate_user_camera()

        # Init client metadata
        if not local_user_metadata or 'color' not in local_user_metadata:
            metadata = {
                # Publish the actual view corners; the view matrix has its
                # own key.
                'view_corners': current_view_corners,
                'view_matrix': get_view_matrix(),
                'color': (settings.client_color.r,
                          settings.client_color.g,
                          settings.client_color.b,
                          1),
                'frame_current': bpy.context.scene.frame_current,
                'scene_current': scene_current
            }
            session.update_user_metadata(metadata)

        # Update client representation
        # Update client current scene
        elif scene_current != local_user_metadata.get('scene_current'):
            local_user_metadata['scene_current'] = scene_current
            session.update_user_metadata(local_user_metadata)
        elif 'view_corners' in local_user_metadata and \
                current_view_corners != local_user_metadata['view_corners']:
            local_user_metadata['view_corners'] = current_view_corners
            local_user_metadata['view_matrix'] = get_view_matrix()
            session.update_user_metadata(local_user_metadata)

    def _track_remote_views(self, settings):
        """Cache remote users metadata, redrawing on the first view change

        Stops at the first user whose 'view_matrix' differs from the cached
        one; remaining users are handled on following ticks.
        """
        for username, user_data in session.online_users.items():
            if username == settings.username:
                continue

            cached_user_data = self.users_metadata.get(username)
            new_user_data = user_data['metadata']

            view_changed = (
                cached_user_data is not None
                and 'view_matrix' in cached_user_data
                and 'view_matrix' in new_user_data
                and cached_user_data['view_matrix'] != new_user_data['view_matrix'])

            self.users_metadata[username] = new_user_data

            if view_changed:
                refresh_3d_view()
                break


class SessionStatusUpdate(Timer):
    """Periodically redraw the sidebar"""

    def __init__(self, timeout=1):
        super().__init__(timeout)

    def execute(self):
        refresh_sidebar_view()


class SessionUserSync(Timer):
    """Mirror session.online_users into the window manager user list"""

    def __init__(self, timeout=1):
        super().__init__(timeout)
        self.settings = utils.get_preferences()

    def execute(self):
        if not (session and renderer):
            return

        # sync online users
        session_users = session.online_users
        ui_users = bpy.context.window_manager.online_users

        # Remove at most one stale user per tick: the collection is being
        # iterated, so stop right after mutating it.
        for index, user in enumerate(ui_users):
            if user.username not in session_users and \
                    user.username != self.settings.username:
                renderer.remove_widget(f"{user.username}_cam")
                renderer.remove_widget(f"{user.username}_select")
                renderer.remove_widget(f"{user.username}_name")
                ui_users.remove(index)
                break

        for user in session_users:
            if user not in ui_users:
                new_key = ui_users.add()
                new_key.name = user
                new_key.username = user
                if user != self.settings.username:
                    renderer.add_widget(
                        f"{user}_cam", UserFrustumWidget(user))
                    renderer.add_widget(
                        f"{user}_select", UserSelectionWidget(user))
                    renderer.add_widget(
                        f"{user}_name", UserNameWidget(user))


class MainThreadExecutor(Timer):
    """Run queued ``(function, kwargs)`` pairs from the timer callback"""

    def __init__(self, timeout=1, execution_queue=None):
        super().__init__(timeout)
        self.execution_queue = execution_queue

    def execute(self):
        # No queue was supplied: nothing to run
        if self.execution_queue is None:
            return

        while not self.execution_queue.empty():
            function, kwargs = self.execution_queue.get()
            logging.debug(
                f"Executing {getattr(function, '__name__', repr(function))}")
            function(**kwargs)