import json from typing import Union, List, Optional from urllib.parse import urljoin from requests import Session, Response from requests.exceptions import RequestException from .patchbuilder import PatchBuilder from .simplenode import SimpleNode # TODO validation """ http://localhost:4502/crx/de/init.jsp?_dc=1549392939742 http://localhost:4502/crx/de/nodetypes.jsp?_dc=1549392939958 http://localhost:4502/crx/server/crx.default/jcr%3aroot/libs.1.json?_dc=1549392123434&node=xnode-265 http://localhost:4502/crx/de/query.jsp?_dc=1549392245191&_charset_=utf-8&type=xpath&stmt=%2Fjcr%3Aroot%2Fbin%2F%2F*%5Bjcr%3Acontains(.%2C%20%27asdf%27)%5D%20order%20by%20%40jcr%3Ascore&showResults=true http://app30-prd-asd.sbnl.vancis.nl:4502/bin/wcm/references.json?path=%2Fcontent%2Fdam%2Fbeeldbank%2F_0005_home_algemeen.png&predicate=wcmcontent&_charset_=utf-8 Download: http://app30-prd-asd.sbnl.vancis.nl:4502/crx/server/crx.default/jcr:root/content/dam/beeldbank/vrouw-direct-naar.jpg/jcr:content/renditions/original/jcr:content/jcr:data OR download.jsp?path=%2Fcontent%2Fdam%2Fbeeldbank%2Fvrouw-direct-naar.jpg%2Fjcr%3Acontent%2Frenditions%2Foriginal%2Fjcr%3Acontent%2Fjcr%3Adata&index=0 Delete asset via trash /bin/wcmcommand --data "path="%"2Fcontent"%"2Fdam"%"2Flandelijk"%"2Fjeugdbibliotheek15-18"%"2F615.swf&_charset_=utf-8&cmd=deletePage&force=true" """ CRX_SERVER_ROOT = '/crx/server/crx.default/jcr:root/' CRX_QUERY = '/crx/de/query.jsp' PACKMGR_EXEC = '/crx/packmgr/service/exec.json' PACKMGR_UPDATE = '/crx/packmgr/update.jsp' WCM_COMMAND = '/bin/wcmcommand' WCM_REFERENCES = '/bin/wcm/references.json' WCM_PAGE_REFERENCES = '/libs/wcm/core/content/reference.json' WCM_REPLICATE = '/bin/replicate.json' WORKFLOW_INSTANCES = '/etc/workflow/instances' WORKFLOW_LIST_MODELS = '/libs/cq/workflow/content/console/workflows.json' CREATE_ASSET = '.createasset.html' SECURITY_AUTHORIZABLES = "/bin/security/authorizables.json" JSON_DATA_EXTENSION = '.1.json' QUERY_TYPES = { 'XPATH': 'xpath', 'SQL': 'sql', 'SQL2': 'JCR-SQL2' } class CrxException(ValueError): pass class CrxNodeNotFound(CrxException): def __init__(self, path: str, response: Response): self.path = path self.response = response class CrxCantDeleteAsset(CrxException): def __init__(self, response_body: str, message: str): super(CrxCantDeleteAsset, self).__init__(message) self.response = response_body class Connection: def __init__(self, host: str = 'localhost', port: int = 4502, protocol: str = 'http', root: str = CRX_SERVER_ROOT, query: str = CRX_QUERY, image_references: str = WCM_REFERENCES, wcm_replicate: str = WCM_REPLICATE): self._protocol = protocol self._host = f'{protocol}://{host}:{port}' self._data_root = self._host + root self._query_path = self._host + query self._image_references = self._host + image_references self._wcm_replicate = self._host + wcm_replicate self._session = Session() self._patch_builder: Optional[PatchBuilder] = None def login_basic(self, username: str, password: str): """ Set the credentials to use for this connection. Args: username: The username to use password: The password to use """ self._session.auth = (username, password) def proxy(self, proxy: str): if proxy: self._session.proxies[self._protocol] = proxy elif self._session.proxies[self._protocol]: del self._session.proxies[self._protocol] def query(self, query: str, query_type: str = 'SQL2', raise_on_error: bool = True) -> List[str]: """ Perform an query and return the matching paths. Query may be an XPATH, SQL or SQL2 Query Args: query: The query to perform query_type: The type of the query (defaults to SQL2) Returns: The matching paths of the query """ response = self._session.get(self._query_path, params={ '_charset': 'utf-8', 'type': QUERY_TYPES.get(query_type, query_type), 'stmt': query, 'showResults': 'true' }) data = response.json() # TODO check for error if not data['success']: raise ValueError(data['errorMessage']) return list(map(lambda node: node['path'], data['results'])) def get_image_references(self, path: str): """ Find all image references for a given image resource. This uses the DAM Asset Manager > Image > File References tap's backend Args: path: The path of the image to check (no the rendition) Returns: The references of the image (see Chrome/Firefox developer tab for details) """ response = self._session.get(self._image_references, params={ 'path': path }) return response.json()['pages'] def get_page_references(self, page_path: str): """ Check other item are referenced by the current page. Args: page_path: The page to check Returns: A list of dictionaries that represent the referenced items Examples: session.get_page_references('/content/....') [{ "type": "asset", "path": "/content/dam/beeldbank/jong-koppel-leest-liggend-op-de-vloer-met-voeten-op-de-bank.jpg", "name": "jong-koppel-leest-liggend-op-de-vloer-met-voeten-op-de-bank.jpg", "published": False, "outdated": False, "status": "not available", "disabled": False, "lastPublished": 0, "lastModified": 1552398212196 }] """ response = self._session.get(self._host + WCM_PAGE_REFERENCES, params={'path': page_path}) return response.json()['assets'] def upload_asset(self, dam_directory: str, filename: str, data: bytes, content_type: str): """ Upload an asset to the DAM as if it was uploaded through the GUI Args: dam_directory: The directory to upload to (including /content/dam) filename: The file name of the asset (no path or anything) data: The content of the asset content_type: The content type of the asset Raises: When an error occurs, Request will raise an error for the incorrect status code """"" url: str = self._host + dam_directory + CREATE_ASSET files = { 'file': (filename, data, content_type), 'fileName': filename, '_charset_': 'utf-8' } resp = self._session.post(url, files=files) resp.raise_for_status() def delete_asset(self, dam_path: str, force: bool = False): """ Delete an asset to the trash. If force is False (default) don't delete it if it has remaining references Args: dam_path: The path of the asset to delete force: Whether or not to force delete it. Returns: True when the asset has been deleted Raises CrxCantDeleteAsset: When the asset can't be deleted (for example, insufficient rights or remaining references without force) """ url: str = self._host + WCM_COMMAND response = self._session.post(url, data={'path': dam_path, 'cmd': 'deletePage', 'force': json.dumps(force)}) if not response.ok: raise CrxCantDeleteAsset(response.text, response.reason) return True def get_node_raw(self, path: str): """ Get the raw JSON dictionary of a node. This is mostly an internal method. Args: path: The path of the node Returns: A dict representing the node """ url = urljoin(self._data_root, '.' + path + JSON_DATA_EXTENSION) try: response = self._session.get(url) except RequestException as exception: raise CrxException() # todo more specific exceptions if response.status_code == 404: raise CrxNodeNotFound(path, response) try: data = response.json() except ValueError: raise # todo return data def get_simple_node(self, path: str) -> SimpleNode: """ Get a Node as a `SimpleNode` object. Args: path: The path of the node Returns: The SimpleNode object for that path """ return SimpleNode(path, self.get_node_raw(path), self) def replicate(self, path: str, deactivate: bool = False): """ Replicate a page to the publish servers Args: path: The page to replicate deactivate: Deactivate instead of activate """ command = 'deactivate' if deactivate else 'activate' resp = self._session.post(self._wcm_replicate, files={'path': path, 'cmd': command}) resp.raise_for_status() def get_workflow_models(self): resp = self._session.get(self._host + WORKFLOW_LIST_MODELS) return resp.json()['workflows'] def start_workflow_path(self, model: str, path: str, comment: str = None, title: str = None): resp = self._session.post( self._host + WORKFLOW_INSTANCES, data={ '_charset_': 'utf-8', 'payloadType': 'JCR_PATH', ':status': 'browser', # ? 'payload': path, 'model': model, 'startComment': comment or '', 'workflowTitle': title or '' } ) resp.raise_for_status() def download_binary(self, path: str) -> bytes: """ Download the binary data of a node. (usually jcr:data). Usually called via `SimpleNode.download()` Args: path: The path of the node property to download Returns: The binary content of the response """ # TODO verify if it is not b64 encoded. for some reason it is in FireFox resp = self._session.get( urljoin(self._data_root, '.' + path) ) return resp.content def rename_node(self, old_path: str, new_path: str): diff = f'>{old_path} : {new_path}' resp = self._session.post(self._data_root, data={':diff': diff}) resp.raise_for_status() def start_patch_builder(self) -> PatchBuilder: self._patch_builder = PatchBuilder(self) return self._patch_builder def apply_diff(self, diff: Union[str, bytes]): files = { ':diff': ( None, diff, 'text/plain; charset=utf-8' ) } # todo check for exception resp = self._session.post(self._data_root, files=files) resp.raise_for_status() def create_package(self, name: str, group: str, version: str = '1.0') -> (bool, str): resp = self._session.post(self._host + PACKMGR_EXEC, params={'cmd': 'create'}, data={ '_charset_': 'utf-8', 'packageName': name, 'packageVersion': version, 'groupName': group }) resp.raise_for_status() data = resp.json() if not data['success']: return False, f'/etc/packages/{group}/{data["msg"]}' return True, resp.json()['path'] def update_package(self, path: str, name: str, group: str, version: str, filters: list, description: str = None): resp = self._session.post( self._host + PACKMGR_UPDATE, files={ 'path': (None, path), 'packageName': (None, name), 'groupName': (None, group), 'version': (None, version), 'filter': (None, json.dumps(filters, separators=(',', ':'))), # 'description': description or '', '_charset_': (None, 'UTF-8') } ) resp.raise_for_status() return resp.json()['path'] def get_authorizables(self, start: int = 0, user_filter: str = "", ml: int = 0, limit: int = 500, hide_groups: bool = False, hide_users: bool = False): args = { 'start': start, 'filter': user_filter, 'ml': ml, 'limit': limit, 'hideGroups': json.dumps(hide_groups), 'hideUsers': json.dumps(hide_users), } url = self._host + SECURITY_AUTHORIZABLES resp = self._session.get(url, params=args) return resp.json()['authorizables'] def add_remove_group(self, path: str, group_name: str): args = { 'memberAction': (None, 'memberOf'), 'memberEntry': (None, group_name), } url = self._host + path return self._session.post(url, files=args).ok