Files
PyCrx/crx/connection.py

377 lines
13 KiB
Python

import json
from typing import Union, List, Optional
from urllib.parse import urljoin
from requests import Session, Response
from requests.exceptions import RequestException
from .patchbuilder import PatchBuilder
from .simplenode import SimpleNode
# TODO validation
"""
http://localhost:4502/crx/de/init.jsp?_dc=1549392939742
http://localhost:4502/crx/de/nodetypes.jsp?_dc=1549392939958
http://localhost:4502/crx/server/crx.default/jcr%3aroot/libs.1.json?_dc=1549392123434&node=xnode-265
http://localhost:4502/crx/de/query.jsp?_dc=1549392245191&_charset_=utf-8&type=xpath&stmt=%2Fjcr%3Aroot%2Fbin%2F%2F*%5Bjcr%3Acontains(.%2C%20%27asdf%27)%5D%20order%20by%20%40jcr%3Ascore&showResults=true
http://app30-prd-asd.sbnl.vancis.nl:4502/bin/wcm/references.json?path=%2Fcontent%2Fdam%2Fbeeldbank%2F_0005_home_algemeen.png&predicate=wcmcontent&_charset_=utf-8
Download:
http://app30-prd-asd.sbnl.vancis.nl:4502/crx/server/crx.default/jcr:root/content/dam/beeldbank/vrouw-direct-naar.jpg/jcr:content/renditions/original/jcr:content/jcr:data
OR
download.jsp?path=%2Fcontent%2Fdam%2Fbeeldbank%2Fvrouw-direct-naar.jpg%2Fjcr%3Acontent%2Frenditions%2Foriginal%2Fjcr%3Acontent%2Fjcr%3Adata&index=0
Delete asset via trash
/bin/wcmcommand --data "path="%"2Fcontent"%"2Fdam"%"2Flandelijk"%"2Fjeugdbibliotheek15-18"%"2F615.swf&_charset_=utf-8&cmd=deletePage&force=true"
"""
CRX_SERVER_ROOT = '/crx/server/crx.default/jcr:root/'
CRX_QUERY = '/crx/de/query.jsp'
PACKMGR_EXEC = '/crx/packmgr/service/exec.json'
PACKMGR_UPDATE = '/crx/packmgr/update.jsp'
WCM_COMMAND = '/bin/wcmcommand'
WCM_REFERENCES = '/bin/wcm/references.json'
WCM_PAGE_REFERENCES = '/libs/wcm/core/content/reference.json'
WCM_REPLICATE = '/bin/replicate.json'
WORKFLOW_INSTANCES = '/etc/workflow/instances'
WORKFLOW_LIST_MODELS = '/libs/cq/workflow/content/console/workflows.json'
CREATE_ASSET = '.createasset.html'
SECURITY_AUTHORIZABLES = "/bin/security/authorizables.json"
JSON_DATA_EXTENSION = '.1.json'
QUERY_TYPES = {
'XPATH': 'xpath',
'SQL': 'sql',
'SQL2': 'JCR-SQL2'
}
class CrxException(ValueError):
pass
class CrxNodeNotFound(CrxException):
def __init__(self, path: str, response: Response):
self.path = path
self.response = response
class CrxCantDeleteAsset(CrxException):
def __init__(self, response_body: str, message: str):
super(CrxCantDeleteAsset, self).__init__(message)
self.response = response_body
class Connection:
def __init__(self,
host: str = 'localhost',
port: int = 4502,
protocol: str = 'http',
root: str = CRX_SERVER_ROOT,
query: str = CRX_QUERY,
image_references: str = WCM_REFERENCES,
wcm_replicate: str = WCM_REPLICATE):
self._protocol = protocol
self._host = f'{protocol}://{host}:{port}'
self._data_root = self._host + root
self._query_path = self._host + query
self._image_references = self._host + image_references
self._wcm_replicate = self._host + wcm_replicate
self._session = Session()
self._patch_builder: Optional[PatchBuilder] = None
def login_basic(self, username: str, password: str):
"""
Set the credentials to use for this connection.
Args:
username: The username to use
password: The password to use
"""
self._session.auth = (username, password)
def proxy(self, proxy: str):
if proxy:
self._session.proxies[self._protocol] = proxy
elif self._session.proxies[self._protocol]:
del self._session.proxies[self._protocol]
def query(self, query: str, query_type: str = 'SQL2', raise_on_error: bool = True) -> List[str]:
"""
Perform an query and return the matching paths.
Query may be an XPATH, SQL or SQL2 Query
Args:
query: The query to perform
query_type: The type of the query (defaults to SQL2)
Returns:
The matching paths of the query
"""
response = self._session.get(self._query_path, params={
'_charset': 'utf-8',
'type': QUERY_TYPES.get(query_type, query_type),
'stmt': query,
'showResults': 'true'
})
data = response.json()
# TODO check for error
if not data['success']:
raise ValueError(data['errorMessage'])
return list(map(lambda node: node['path'], data['results']))
def get_image_references(self, path: str):
"""
Find all image references for a given image resource.
This uses the DAM Asset Manager > Image > File References tap's backend
Args:
path: The path of the image to check (no the rendition)
Returns:
The references of the image (see Chrome/Firefox developer tab for details)
"""
response = self._session.get(self._image_references, params={
'path': path
})
return response.json()['pages']
def get_page_references(self, page_path: str):
"""
Check other item are referenced by the current page.
Args:
page_path: The page to check
Returns:
A list of dictionaries that represent the referenced items
Examples:
session.get_page_references('/content/....')
[{
"type": "asset",
"path": "/content/dam/beeldbank/jong-koppel-leest-liggend-op-de-vloer-met-voeten-op-de-bank.jpg",
"name": "jong-koppel-leest-liggend-op-de-vloer-met-voeten-op-de-bank.jpg",
"published": False,
"outdated": False,
"status": "not available",
"disabled": False,
"lastPublished": 0,
"lastModified": 1552398212196
}]
"""
response = self._session.get(self._host + WCM_PAGE_REFERENCES, params={'path': page_path})
return response.json()['assets']
def upload_asset(self, dam_directory: str, filename: str, data: bytes, content_type: str):
"""
Upload an asset to the DAM as if it was uploaded through the GUI
Args:
dam_directory: The directory to upload to (including /content/dam)
filename: The file name of the asset (no path or anything)
data: The content of the asset
content_type: The content type of the asset
Raises:
When an error occurs, Request will raise an error for the incorrect status code
"""""
url: str = self._host + dam_directory + CREATE_ASSET
files = {
'file': (filename, data, content_type),
'fileName': filename,
'_charset_': 'utf-8'
}
resp = self._session.post(url, files=files)
resp.raise_for_status()
def delete_asset(self, dam_path: str, force: bool = False):
"""
Delete an asset to the trash. If force is False (default) don't delete it if it has remaining references
Args:
dam_path: The path of the asset to delete
force: Whether or not to force delete it.
Returns:
True when the asset has been deleted
Raises CrxCantDeleteAsset:
When the asset can't be deleted (for example, insufficient rights or remaining references without force)
"""
url: str = self._host + WCM_COMMAND
response = self._session.post(url, data={'path': dam_path, 'cmd': 'deletePage', 'force': json.dumps(force)})
if not response.ok:
raise CrxCantDeleteAsset(response.text, response.reason)
return True
def get_node_raw(self, path: str):
"""
Get the raw JSON dictionary of a node.
This is mostly an internal method.
Args:
path: The path of the node
Returns:
A dict representing the node
"""
url = urljoin(self._data_root, '.' + path + JSON_DATA_EXTENSION)
try:
response = self._session.get(url)
except RequestException as exception:
raise CrxException() # todo more specific exceptions
if response.status_code == 404:
raise CrxNodeNotFound(path, response)
try:
data = response.json()
except ValueError:
raise # todo
return data
def get_simple_node(self, path: str) -> SimpleNode:
"""
Get a Node as a `SimpleNode` object.
Args:
path: The path of the node
Returns:
The SimpleNode object for that path
"""
return SimpleNode(path, self.get_node_raw(path), self)
def replicate(self, path: str, deactivate: bool = False):
"""
Replicate a page to the publish servers
Args:
path: The page to replicate
deactivate: Deactivate instead of activate
"""
command = 'deactivate' if deactivate else 'activate'
resp = self._session.post(self._wcm_replicate, files={'path': path, 'cmd': command})
resp.raise_for_status()
def get_workflow_models(self):
resp = self._session.get(self._host + WORKFLOW_LIST_MODELS)
return resp.json()['workflows']
def start_workflow_path(self, model: str, path: str, comment: str = None, title: str = None):
resp = self._session.post(
self._host + WORKFLOW_INSTANCES,
data={
'_charset_': 'utf-8',
'payloadType': 'JCR_PATH',
':status': 'browser', # ?
'payload': path,
'model': model,
'startComment': comment or '',
'workflowTitle': title or ''
}
)
resp.raise_for_status()
def download_binary(self, path: str) -> bytes:
"""
Download the binary data of a node. (usually jcr:data).
Usually called via `SimpleNode.download()`
Args:
path: The path of the node property to download
Returns:
The binary content of the response
"""
# TODO verify if it is not b64 encoded. for some reason it is in FireFox
resp = self._session.get(
urljoin(self._data_root, '.' + path)
)
return resp.content
def rename_node(self, old_path: str, new_path: str):
diff = f'>{old_path} : {new_path}'
resp = self._session.post(self._data_root, data={':diff': diff})
resp.raise_for_status()
def start_patch_builder(self) -> PatchBuilder:
self._patch_builder = PatchBuilder(self)
return self._patch_builder
def apply_diff(self, diff: Union[str, bytes]):
files = {
':diff': (
None,
diff,
'text/plain; charset=utf-8'
)
}
# todo check for exception
resp = self._session.post(self._data_root, files=files)
resp.raise_for_status()
def create_package(self, name: str, group: str, version: str = '1.0') -> (bool, str):
resp = self._session.post(self._host + PACKMGR_EXEC, params={'cmd': 'create'}, data={
'_charset_': 'utf-8',
'packageName': name,
'packageVersion': version,
'groupName': group
})
resp.raise_for_status()
data = resp.json()
if not data['success']:
return False, f'/etc/packages/{group}/{data["msg"]}'
return True, resp.json()['path']
def update_package(self, path: str, name: str, group: str, version: str, filters: list, description: str = None):
resp = self._session.post(
self._host + PACKMGR_UPDATE,
files={
'path': (None, path),
'packageName': (None, name),
'groupName': (None, group),
'version': (None, version),
'filter': (None, json.dumps(filters, separators=(',', ':'))),
# 'description': description or '',
'_charset_': (None, 'UTF-8')
}
)
resp.raise_for_status()
return resp.json()['path']
def get_authorizables(self, start: int = 0, user_filter: str = "", ml: int = 0, limit: int = 500, hide_groups: bool = False, hide_users: bool = False):
args = {
'start': start,
'filter': user_filter,
'ml': ml,
'limit': limit,
'hideGroups': json.dumps(hide_groups),
'hideUsers': json.dumps(hide_users),
}
url = self._host + SECURITY_AUTHORIZABLES
resp = self._session.get(url, params=args)
return resp.json()['authorizables']
def add_remove_group(self, path: str, group_name: str):
args = {
'memberAction': (None, 'memberOf'),
'memberEntry': (None, group_name),
}
url = self._host + path
return self._session.post(url, files=args).ok