Source code for ftrack_action_handler.action.advanced

# :coding: utf-8
# :copyright: Copyright (c) 2017-2021 ftrack

import json
import logging
import os
import uuid
from ftrack_action_handler.action import BaseAction

logging.basicConfig(level=logging.INFO)


# --------------------------------------------------------------
# Advanced Action Class.
# --------------------------------------------------------------


[docs]class AdvancedBaseAction(BaseAction): '''Custom Action base class `label` a descriptive string identifying your action. `variant` To group actions together, give them the same label and specify a unique variant per action. `identifier` a unique identifier for your action. `description` a verbose descriptive text for you action ''' __KNOWN_TYPES__ = ['Context', 'AssetVersion', 'FileComponent'] # TODO: maybe always append uuid to identifier? # Action filters allowed_roles = [] # Roles allowed for this action to run allowed_groups = [] # Groups allowed for this action to run ignored_types = [] # Types ignored for this action to run allowed_types = [] # Types allowed for this action to run limit_to_user = None # Limit the action to the user which spans it allow_empty_context = False # Allow to run without a selection def __repr__(self): '''Action object representation.''' return '<{0}:{1}>'.format(self.__class__.__name__, self.identifier)
[docs] def __init__(self, session, limit_to_user=None, make_unique=False): '''Expects a ftrack_api.Session instance and optional user limiter''' super(AdvancedBaseAction, self).__init__(session) if not all([self.label, self.identifier]): msg = ( 'Error initializing action {0} :' ' mandatory variables are set to : {1}'.format( self.__class__.__name__, ', '.join( [ self.label or 'No Action label Set', self.identifier or 'No Action identifier Set', ] ), ) ) self.logger.critical(msg) raise RuntimeError(msg) self.raw_identifier = self.identifier if limit_to_user: self.limit_to_user = limit_to_user # if the action is tied to one user only # we need to make the action name # unique otherwise it will # trigger all actions that are on the event hub # with that name self.identifier = '{}_{}'.format( self.identifier, str(uuid.uuid4()) ) if not limit_to_user and make_unique: self.identifier = '{}_{}'.format( self.identifier, str(uuid.uuid4()) ) self._session = session self.job_id = None prefix = os.getenv('FTRACK_ACTION_PREFIX', None) if prefix: self.label = '{} {}'.format(prefix.title(), self.label) self.identifier = '{}_{}'.format(prefix, self.identifier) self.logger.debug(self.label) self.logger.debug(self.identifier)
# -------------------------------------------------------------- # Settings stored in user metadata # --------------------------------------------------------------
[docs] def read_settings_from_user(self, event): '''read settings from the user if there are any Returns a dict like values coming from the interface ''' action_user = self.get_action_user(event) return json.loads( action_user['metadata'].get(self.raw_identifier, '{}'))
[docs] def write_settings_to_user(self, event, settings=None): '''*event* the unmodified original event *settings* dict with information to store related to the action if none is provided all values from the event[data] are taken ''' if not settings: settings = event['data']['values'] action_user = self.get_action_user(event) action_user['metadata'][self.raw_identifier] = json.dumps(settings) self.logger.info('stored {0} on user {1}'.format(settings, action_user)) action_user.session.commit()
# -------------------------------------------------------------- # Custom Action methods # -------------------------------------------------------------- def _identify_entity_(self, entity): '''Identify provided *entity*.''' entity_types = self.__KNOWN_TYPES__ entity_type = None _id = entity.get('entityId') for entity_type in entity_types: entity = self.session.get(entity_type, _id) has_type = getattr(entity, 'entity_type', None) if has_type: entity_type = has_type break if not entity_type: msg = 'Could not identify entity {0}'.format(entity) self.logger.critical(msg) raise RuntimeError(msg) return entity_type def _get_selection_(self, event): '''From a raw *event* dictionary, extract the selected entities.''' data = event['data'] selection = data.get('selection', []) return selection
[docs] def get_action_user(self, event): '''From a raw *event* dictionary, extract the source user, and return it in form of an :py:class:`ftrack.UserEntity` ''' return self.session.query( 'select id, user_security_roles, username, memberships' ' from User where username is "{0}"'.format( event['source']['user']['username'] ) ).one()
def _check_permissions_(self, ftrack_user): '''Checks that the specified *ftrack_user* has the permissions set in :py:attr:`base._base_action.BaseAction.ALLOWED_GROUPS` and :py:attr:`base._base_action.BaseAction.ALLOWED_ROLES`.''' group_valid = True role_valid = True if not self.allowed_roles and not self.allowed_groups: return True if self.allowed_groups: group_valid = False groups = self.session.query( 'select name, memberships from Group' ).all() group_users = [] for group in groups: if group['name'] not in self.allowed_groups: continue for member in group['memberships']: group_users.append(member) group_users = [x['username'] for x in group_users] if ftrack_user['username'] not in group_users: group_valid = False if self.allowed_roles: role_valid = False _roles = [] roles = [ r['security_role']['name'] for r in ftrack_user['user_security_roles'] ] for role in roles: _roles.append(role in self.allowed_roles) role_valid = any(_roles) result = group_valid and role_valid return result def _check_allowed_types_(self, selection): '''Check whether the entities in *selection* are among the :py:attr:`base._base_action.BaseAction.IGNORED_TYPES` or in :py:attr:`base._base_action.BaseAction.ALLOWED_TYPES`. ''' if not selection: if self.allow_empty_context: return True return False if self.ignored_types: for selected_item in selection: entity_type = self._identify_entity_(selected_item) if entity_type in self.ignored_types: self.logger.debug( 'Ignoring. Item of type %s is in ignored types: %s', entity_type, self.ignored_types, ) return False if self.allowed_types: for selected_item in selection: entity_type = self._identify_entity_(selected_item) if entity_type not in self.allowed_types: self.logger.debug( 'Ignoring. Type %s it is not in allowed types: %s', entity_type, self.allowed_types, ) return False return True def _check_limit_to_user_(self, action_user): '''Check whether this action should be allowed only on the current user. ''' if self.limit_to_user is not None: if action_user['username'] != self.limit_to_user: return False return True return True def _get_entity_type(self, entity): '''Return translated entity type that can be used with API.''' # Get entity type and make sure it is lower cased. Most places except # the component tab in the Sidebar will use lower case notation. entity_type = entity.get('entityType').replace('_', '').lower() for schema in self.session.schemas: alias_for = schema.get('alias_for') if ( alias_for and isinstance(alias_for, str) and alias_for.lower() == entity_type ): return schema['id'] for schema in self.session.schemas: if schema['id'].lower() == entity_type: return schema['id'] raise ValueError( 'Unable to translate entity type: {0}.'.format(entity_type) ) # -------------------------------------------------------------- # Default Action Method Overwrites # -------------------------------------------------------------- def _discover(self, event): entities = self._translate_event(self.session, event) self.logger.info(entities) discoverable = True # Check user. action_user = self.get_action_user(event) user_only = self._check_limit_to_user_(action_user) if not user_only: self.logger.debug( 'Action %s is not enabled for user %s', self.identifier, action_user['username'], ) discoverable = False # Collect and check selected entities. selection = self._get_selection_(event) is_allowed = self._check_allowed_types_(selection) if not is_allowed: self.logger.debug( 'Action %s is not allowed for the selected types.', self.identifier, ) discoverable = False # Check permissions and groups has_permissions = self._check_permissions_(action_user) if not has_permissions: self.logger.debug( 'Action %s is not enabled user %s' ' does not have permissions.', self.identifier, action_user['username'], ) discoverable = False accepts = self.discover(self.session, entities, event) if accepts and discoverable: self.logger.debug('Action: %s discovered', self.label) return { 'items': [ { 'icon': self.icon, 'label': self.label, 'variant': self.variant, 'description': self.description, 'actionIdentifier': self.identifier, } ] }
[docs] def discover(self, session, entities, event): '''Return true if we can handle the selected entities. *session* is a `ftrack_api.Session` instance *entities* is a list of tuples each containing the entity type and the entity id. If the entity is a hierarchical you will always get the entity type TypedContext, once retrieved through a get operation you will have the 'real' entity type ie. example Shot, Sequence or Asset Build. *event* the unmodified original event ''' return True
# -------------------------------------------------------------- # Job management. # --------------------------------------------------------------
[docs] def create_job(self, event, description): '''Create a new job.''' user_id = event['source']['user']['id'] job = self.session.create( 'Job', { 'user': self.session.get('User', user_id), 'status': 'running', 'data': json.dumps({'description': u'{}'.format(description)}), }, ) self.session.commit() job_id = job.get('id') self.job_id = job_id return self.job_id
[docs] def attach_component_to_job(self, job_id, component_id, description): '''Attach a component to a job.''' self.session.create( 'JobComponent', {'component_id': component_id, 'job_id': job_id} ) job = self.session.get('Job', job_id) job['data'] = json.dumps({'description': u'{}'.format(description)}) job['status'] = 'done' self.session.commit()
[docs] def mark_job_as_failed(self, job_id, error_message): '''Mark a job as failed.''' job = self.session.get('Job', job_id) job['data'] = json.dumps({'description': u'{}'.format(error_message)}) job['status'] = 'failed' self.session.commit()
[docs] def mark_job_as_done(self, job_id, description): '''Mark a job as done.''' job = self.session.get('Job', job_id) job['data'] = json.dumps({'description': u'{}'.format(description)}) job['status'] = 'done' self.session.commit()