feat: renamed Crx to crx. Added proxy support. Minor code improvements.
crx/__init__.py (new file, 7 lines)
@@ -0,0 +1,7 @@
from .connection import Connection, CrxException, CrxNodeNotFound, CrxCantDeleteAsset
from .simplenode import SimpleNode
from .util import get_simple_con

__version__ = (1, 0, 0, 0)

__all__ = ["Connection", "CrxException", "CrxNodeNotFound", "SimpleNode", "get_simple_con", "CrxCantDeleteAsset"]
crx/connection.py (new file, 376 lines)
@@ -0,0 +1,376 @@
import json
from typing import Union, List, Optional
from urllib.parse import urljoin

from requests import Session, Response
from requests.exceptions import RequestException

from .patchbuilder import PatchBuilder
from .simplenode import SimpleNode

# TODO validation

"""
http://localhost:4502/crx/de/init.jsp?_dc=1549392939742
http://localhost:4502/crx/de/nodetypes.jsp?_dc=1549392939958
http://localhost:4502/crx/server/crx.default/jcr%3aroot/libs.1.json?_dc=1549392123434&node=xnode-265
http://localhost:4502/crx/de/query.jsp?_dc=1549392245191&_charset_=utf-8&type=xpath&stmt=%2Fjcr%3Aroot%2Fbin%2F%2F*%5Bjcr%3Acontains(.%2C%20%27asdf%27)%5D%20order%20by%20%40jcr%3Ascore&showResults=true
http://app30-prd-asd.sbnl.vancis.nl:4502/bin/wcm/references.json?path=%2Fcontent%2Fdam%2Fbeeldbank%2F_0005_home_algemeen.png&predicate=wcmcontent&_charset_=utf-8

Download:
http://app30-prd-asd.sbnl.vancis.nl:4502/crx/server/crx.default/jcr:root/content/dam/beeldbank/vrouw-direct-naar.jpg/jcr:content/renditions/original/jcr:content/jcr:data
OR
download.jsp?path=%2Fcontent%2Fdam%2Fbeeldbank%2Fvrouw-direct-naar.jpg%2Fjcr%3Acontent%2Frenditions%2Foriginal%2Fjcr%3Acontent%2Fjcr%3Adata&index=0

Delete asset via trash
/bin/wcmcommand --data "path="%"2Fcontent"%"2Fdam"%"2Flandelijk"%"2Fjeugdbibliotheek15-18"%"2F615.swf&_charset_=utf-8&cmd=deletePage&force=true"
"""

CRX_SERVER_ROOT = '/crx/server/crx.default/jcr:root/'
CRX_QUERY = '/crx/de/query.jsp'

PACKMGR_EXEC = '/crx/packmgr/service/exec.json'
PACKMGR_UPDATE = '/crx/packmgr/update.jsp'

WCM_COMMAND = '/bin/wcmcommand'
WCM_REFERENCES = '/bin/wcm/references.json'
WCM_PAGE_REFERENCES = '/libs/wcm/core/content/reference.json'
WCM_REPLICATE = '/bin/replicate.json'

WORKFLOW_INSTANCES = '/etc/workflow/instances'
WORKFLOW_LIST_MODELS = '/libs/cq/workflow/content/console/workflows.json'

CREATE_ASSET = '.createasset.html'

SECURITY_AUTHORIZABLES = "/bin/security/authorizables.json"

JSON_DATA_EXTENSION = '.1.json'

QUERY_TYPES = {
    'XPATH': 'xpath',
    'SQL': 'sql',
    'SQL2': 'JCR-SQL2'
}


class CrxException(ValueError):
    pass


class CrxNodeNotFound(CrxException):
    def __init__(self, path: str, response: Response):
        self.path = path
        self.response = response


class CrxCantDeleteAsset(CrxException):
    def __init__(self, response_body: str, message: str):
        super(CrxCantDeleteAsset, self).__init__(message)
        self.response = response_body


class Connection:
    def __init__(self,
                 host: str = 'localhost',
                 port: int = 4502,
                 protocol: str = 'http',
                 root: str = CRX_SERVER_ROOT,
                 query: str = CRX_QUERY,
                 image_references: str = WCM_REFERENCES,
                 wcm_replicate: str = WCM_REPLICATE):
        self._protocol = protocol
        self._host = f'{protocol}://{host}:{port}'
        self._data_root = self._host + root
        self._query_path = self._host + query
        self._image_references = self._host + image_references
        self._wcm_replicate = self._host + wcm_replicate

        self._session = Session()

        self._patch_builder: Optional[PatchBuilder] = None

    def login_basic(self, username: str, password: str):
        """
        Set the credentials to use for this connection.

        Args:
            username: The username to use
            password: The password to use
        """
        self._session.auth = (username, password)

    def proxy(self, proxy: str):
        """
        Route requests through the given proxy URL; pass a falsy value to remove a previously configured proxy.
        """
        if proxy:
            self._session.proxies[self._protocol] = proxy
        elif self._protocol in self._session.proxies:
            del self._session.proxies[self._protocol]

    def query(self, query: str, query_type: str = 'SQL2', raise_on_error: bool = True) -> List[str]:
        """
        Perform a query and return the matching paths.
        The query may be an XPATH, SQL or SQL2 query.

        Args:
            query: The query to perform
            query_type: The type of the query (defaults to SQL2)
            raise_on_error: Raise a ValueError when the server reports an error instead of returning an empty list

        Returns:
            The matching paths of the query
        """
        response = self._session.get(self._query_path, params={
            '_charset_': 'utf-8',
            'type': QUERY_TYPES.get(query_type, query_type),
            'stmt': query,
            'showResults': 'true'
        })
        data = response.json()

        if not data['success']:
            if raise_on_error:
                raise ValueError(data['errorMessage'])
            return []

        return [node['path'] for node in data['results']]

    def get_image_references(self, path: str):
        """
        Find all image references for a given image resource.
        This uses the backend of the DAM Asset Manager > Image > File References tab.

        Args:
            path: The path of the image to check (not the rendition)

        Returns:
            The references of the image (see the Chrome/Firefox developer tools for details)
        """
        response = self._session.get(self._image_references, params={
            'path': path
        })
        return response.json()['pages']

    def get_page_references(self, page_path: str):
        """
        Check which other items are referenced by the given page.

        Args:
            page_path: The page to check

        Returns:
            A list of dictionaries that represent the referenced items

        Examples:
            session.get_page_references('/content/....')
            [{
                "type": "asset",
                "path": "/content/dam/beeldbank/jong-koppel-leest-liggend-op-de-vloer-met-voeten-op-de-bank.jpg",
                "name": "jong-koppel-leest-liggend-op-de-vloer-met-voeten-op-de-bank.jpg",
                "published": False,
                "outdated": False,
                "status": "not available",
                "disabled": False,
                "lastPublished": 0,
                "lastModified": 1552398212196
            }]
        """
        response = self._session.get(self._host + WCM_PAGE_REFERENCES, params={'path': page_path})
        return response.json()['assets']

    def upload_asset(self, dam_directory: str, filename: str, data: bytes, content_type: str):
        """
        Upload an asset to the DAM as if it was uploaded through the GUI.

        Args:
            dam_directory: The directory to upload to (including /content/dam)
            filename: The file name of the asset (no path or anything)
            data: The content of the asset
            content_type: The content type of the asset

        Raises:
            requests.HTTPError: When the server responds with an error status code
        """
        url: str = self._host + dam_directory + CREATE_ASSET
        files = {
            'file': (filename, data, content_type),
            'fileName': filename,
            '_charset_': 'utf-8'
        }
        resp = self._session.post(url, files=files)
        resp.raise_for_status()

    def delete_asset(self, dam_path: str, force: bool = False):
        """
        Move an asset to the trash. If force is False (default), the asset is not deleted while it still has references.

        Args:
            dam_path: The path of the asset to delete
            force: Whether or not to force the deletion

        Returns:
            True when the asset has been deleted

        Raises:
            CrxCantDeleteAsset: When the asset can't be deleted (for example, insufficient rights or remaining references without force)
        """
        url: str = self._host + WCM_COMMAND
        response = self._session.post(url, data={'path': dam_path, 'cmd': 'deletePage', 'force': json.dumps(force)})
        if not response.ok:
            raise CrxCantDeleteAsset(response.text, response.reason)
        return True

    def get_node_raw(self, path: str):
        """
        Get the raw JSON dictionary of a node.
        This is mostly an internal method.

        Args:
            path: The path of the node

        Returns:
            A dict representing the node
        """
        url = urljoin(self._data_root, '.' + path + JSON_DATA_EXTENSION)
        try:
            response = self._session.get(url)
        except RequestException as exception:
            raise CrxException(str(exception)) from exception  # todo more specific exceptions

        if response.status_code == 404:
            raise CrxNodeNotFound(path, response)

        try:
            data = response.json()
        except ValueError:
            raise  # todo

        return data

    def get_simple_node(self, path: str) -> SimpleNode:
        """
        Get a Node as a `SimpleNode` object.

        Args:
            path: The path of the node

        Returns:
            The SimpleNode object for that path
        """
        return SimpleNode(path, self.get_node_raw(path), self)

    def replicate(self, path: str, deactivate: bool = False):
        """
        Replicate a page to the publish servers

        Args:
            path: The page to replicate
            deactivate: Deactivate instead of activate
        """
        command = 'deactivate' if deactivate else 'activate'
        resp = self._session.post(self._wcm_replicate, files={'path': path, 'cmd': command})
        resp.raise_for_status()

    def get_workflow_models(self):
        resp = self._session.get(self._host + WORKFLOW_LIST_MODELS)
        return resp.json()['workflows']

    def start_workflow_path(self, model: str, path: str, comment: str = None, title: str = None):
        resp = self._session.post(
            self._host + WORKFLOW_INSTANCES,
            data={
                '_charset_': 'utf-8',
                'payloadType': 'JCR_PATH',
                ':status': 'browser',  # ?
                'payload': path,
                'model': model,
                'startComment': comment or '',
                'workflowTitle': title or ''
            }
        )
        resp.raise_for_status()

    def download_binary(self, path: str) -> bytes:
        """
        Download the binary data of a node. (usually jcr:data).
        Usually called via `SimpleNode.download()`

        Args:
            path: The path of the node property to download

        Returns:
            The binary content of the response
        """
        # TODO verify if it is not b64 encoded. for some reason it is in FireFox
        resp = self._session.get(
            urljoin(self._data_root, '.' + path)
        )
        return resp.content

    def rename_node(self, old_path: str, new_path: str):
        diff = f'>{old_path} : {new_path}'
        resp = self._session.post(self._data_root, data={':diff': diff})
        resp.raise_for_status()

    def start_patch_builder(self) -> PatchBuilder:
        self._patch_builder = PatchBuilder(self)
        return self._patch_builder

    def apply_diff(self, diff: Union[str, bytes]):
        files = {
            ':diff': (
                None,
                diff,
                'text/plain; charset=utf-8'
            )
        }
        # todo check for exception
        resp = self._session.post(self._data_root, files=files)
        resp.raise_for_status()

    def create_package(self, name: str, group: str, version: str = '1.0') -> (bool, str):
        resp = self._session.post(self._host + PACKMGR_EXEC, params={'cmd': 'create'}, data={
            '_charset_': 'utf-8',
            'packageName': name,
            'packageVersion': version,
            'groupName': group
        })
        resp.raise_for_status()
        data = resp.json()
        if not data['success']:
            return False, f'/etc/packages/{group}/{data["msg"]}'
        return True, resp.json()['path']

    def update_package(self, path: str, name: str, group: str, version: str, filters: list, description: str = None):
        resp = self._session.post(
            self._host + PACKMGR_UPDATE,
            files={
                'path': (None, path),
                'packageName': (None, name),
                'groupName': (None, group),
                'version': (None, version),
                'filter': (None, json.dumps(filters, separators=(',', ':'))),
                # 'description': description or '',
                '_charset_': (None, 'UTF-8')
            }
        )
        resp.raise_for_status()
        return resp.json()['path']

    def get_authorizables(self, start: int = 0, user_filter: str = "", ml: int = 0, limit: int = 500, hide_groups: bool = False, hide_users: bool = False):
        args = {
            'start': start,
            'filter': user_filter,
            'ml': ml,
            'limit': limit,
            'hideGroups': json.dumps(hide_groups),
            'hideUsers': json.dumps(hide_users),
        }
        url = self._host + SECURITY_AUTHORIZABLES
        resp = self._session.get(url, params=args)

        return resp.json()['authorizables']

    def add_remove_group(self, path: str, group_name: str):
        args = {
            'memberAction': (None, 'memberOf'),
            'memberEntry': (None, group_name),
        }
        url = self._host + path
        return self._session.post(url, files=args).ok
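A minimal usage sketch of the Connection API above, including the proxy support added in this commit; the host, credentials, proxy URL and the JCR-SQL2 statement are placeholder values and assume a reachable CRX/AEM author instance.

from crx import Connection

con = Connection(host='localhost', port=4502)        # defaults shown explicitly
con.login_basic('admin', 'admin')                    # HTTP basic auth on the underlying requests Session
con.proxy('http://proxy.example:8080')               # route http traffic through a proxy; pass a falsy value to remove it again

# JCR-SQL2 query returning the matching node paths
paths = con.query("SELECT * FROM [dam:Asset] AS a WHERE ISDESCENDANTNODE(a, '/content/dam')")
for path in paths:
    print(path)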
crx/keepassplugin.py (new file, 28 lines)
@@ -0,0 +1,28 @@
import sys
import gzip
import base64
from lxml import etree

if sys.platform == 'win32':
    iswin = True
    import win32crypt
else:
    iswin = False
    win32crypt = None


ADDITIONAL_ENTROPY = b'\xf8\x03\xfaQ\x87\x18I]'
PREFIX = 'data:application/vnd.KeePass.Entries-E;base64,'


def parse_keepass_entry(data: str):
    if not data.startswith(PREFIX):
        raise ValueError("Not a valid entry")

    data = base64.b64decode(data.split(',')[1])

    if iswin:
        # CryptUnprotectData returns a (description, data) tuple
        data = win32crypt.CryptUnprotectData(data, ADDITIONAL_ENTROPY)[1]

    data = gzip.decompress(data)
    return data.decode()  # XML String
crx/node.py (new file, 95 lines)
@@ -0,0 +1,95 @@
from copy import copy, deepcopy
import json
from datetime import datetime


def parse_iso_date(date: str) -> datetime:
    return datetime.strptime(date, '%Y-%m-%dT%H:%M:%S.000%z')


def fmt_iso_date(date: datetime) -> str:
    return date.strftime('%Y-%m-%dT%H:%M:%S.000%z')


PROPERTY_DEFAULT = {
    int: 'LONG',
    str: 'STRING',
    float: 'DOUBLE',
}

"""
type: [external, simple, complex]
content-type[complex]:
"""

PROPERTY_TYPES = {
    'BINARY': {
        'type': 'complex',
        'content-type': 'jcr-value/binary'
    },
    'BOOLEAN': {
        'type': 'simple',
        'serialize': json.dumps
    },
    'DATE': {
        'type': 'complex',
        'content-type': 'jcr-value/date',
        'serialize': fmt_iso_date,
        'deserialize': parse_iso_date
    },
    'DECIMAL': {
        'type': 'complex',
        'content-type': 'jcr-value/decimal',
        'serialize': str,
    },
    'DOUBLE': {
        'type': 'simple',
        'serialize': str,
    },
    'LONG': {
        'type': 'simple',
        'serialize': str,
    },
    'NAME': {},
    'PATH': {},
    'REFERENCE': {},
    'STRING': {
        'type': 'simple',
        'serialize': json.dumps,
        'deserialize': json.loads
    },
    'UNDEFINED': {},
    'URI': {},
    'WEAKREFERENCE': {},
}


class Property:
    pass


class Node:
    def __init__(self, path: str, primary_type: str, is_new: bool = True, data: dict = None):
        # TODO validate path and type
        self.path = path
        self.data = data or {}
        self.data['jcr:primaryType'] = primary_type
        self.original_path = copy(path)
        self.original_data = deepcopy(data)
        self.is_new = is_new

    def _build_diff(self) -> str:
        if self.is_new:
            return self._build_diff_new()
        else:
            return self._build_diff_change()

    def _build_diff_new(self) -> str:
        data = []
        ptype_dict = {'jcr:primaryType': self.data['jcr:primaryType']}
        data.append(f'+{self.path} : {json.dumps(ptype_dict)}')
        for key, value in self.data.items():
            if key == 'jcr:primaryType':
                continue  # primaryType is handled differently
            data.append(f'^{self.path}/{key} : {json.dumps(value)}')
        return '\n'.join(data) + '\n'
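For illustration, a small sketch of the ':diff' lines that Node._build_diff_new() produces for a new node (assuming the .items() fix above); the path and the extra property are made-up values.

from crx.node import Node

node = Node('/content/example', 'nt:unstructured', is_new=True, data={'title': 'Hello'})
print(node._build_diff())
# Expected output: one '+' line for the new node, then '^' lines for its other properties:
# +/content/example : {"jcr:primaryType": "nt:unstructured"}
# ^/content/example/title : "Hello"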
crx/patchbuilder.py (new file, 79 lines)
@@ -0,0 +1,79 @@
from typing import Dict, List, Union, TYPE_CHECKING
from logging import Logger
import json

if TYPE_CHECKING:
    from .connection import Connection
    from .simplenode import SimpleNode

LOGGER = Logger(__name__)


def _build_patch_set(change: dict) -> str:
    line = ['^', change['path'], ' : ']
    if change['type'] in ('Long', 'Float', 'Boolean', 'String'):
        line.append(json.dumps(change['value']))
    else:
        raise ValueError(f'Type {change["type"]!r} is currently not supported!')

    return ''.join(line)


def _build_patch_delete(change: dict) -> str:
    return f'-{change["path"]} : '


_patch_builders = {
    'set': _build_patch_set,
    'delete': _build_patch_delete,
}


class PatchBuilder:
    changes: List[Dict]
    dry_run: bool

    def __init__(self, connection: 'Connection'):
        self.connection = connection
        self.changes = []
        self.saved = False
        self.dry_run = False

    def set_value(self, path: str, value: Union[str, int, float], value_type: str):
        self.changes.append({
            'action': 'set',
            'path': path,
            'value': value,
            'type': value_type
        })

    def delete_node(self, path: str):
        self.changes.append({
            'action': 'delete',
            'path': path
        })

    def save(self):
        patch = []
        for entry in self.changes:
            patch.append(_patch_builders[entry['action']](entry))
        patch = list(filter(None, patch))
        if len(patch) == 0:
            LOGGER.warning("No patch to submit")
            return

        if self.dry_run:
            print('\n'.join(patch))
        else:
            self.connection.apply_diff('\n'.join(patch))
        self.saved = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self.saved:
            LOGGER.warning("Patch not saved")

        if self.connection:
            self.connection._patch_builder = None
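A hedged sketch of driving PatchBuilder through Connection.start_patch_builder(); the paths and values are placeholders, and dry_run is enabled so the generated ':diff' is only printed instead of being posted to the server.

from crx import Connection

con = Connection()
con.login_basic('admin', 'admin')

with con.start_patch_builder() as patch:
    patch.dry_run = True      # print the generated ':diff' lines instead of POSTing them
    patch.set_value('/content/example/jcr:title', 'New title', 'String')
    patch.delete_node('/content/example/obsolete')
    patch.save()              # builds '^.../jcr:title : "New title"' and '-.../obsolete : ' and prints them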
crx/simplenode.py (new file, 69 lines)
@@ -0,0 +1,69 @@
from typing import TYPE_CHECKING
from logging import Logger

if TYPE_CHECKING:
    from .connection import Connection


LOGGER = Logger(__name__)


class SimpleNode:
    _normal_attrs = ['path', '_data', '_connection']

    def __init__(self, path: str, data: dict, connection: 'Connection'):
        self.path = path
        self._data = data
        self._connection = connection

    def download(self, key: str = 'jcr:data') -> bytes:
        if ':' + key not in self._data:
            LOGGER.warning(f"Key :{key} is not present, binary probably not available")
        size = self._data.get(':' + key)
        if not isinstance(size, int):
            LOGGER.warning("Size is not present")
        # TODO value denotes file size, warn/deny large files?
        return self._connection.download_binary(self.path + '/' + key)

    def __getitem__(self, item):
        return getattr(self, item)

    def __setitem__(self, key, value):
        return self.__setattr__(key, value)

    def __setattr__(self, key, value):
        if key in self._normal_attrs:
            return super(SimpleNode, self).__setattr__(key, value)
        if (':' + key) in self._data:
            value_type = self._data[':' + key]
            if isinstance(value_type, int):
                value_type = 'Binary'
        else:
            if isinstance(value, bool):  # check bool before int, since bool is a subclass of int
                value_type = 'Boolean'
            elif isinstance(value, str):
                value_type = 'String'
            elif isinstance(value, int):
                value_type = 'Long'
            else:
                raise ValueError(f"Unknown value type {type(value)!r}")

        self._connection._patch_builder.set_value(self.path + '/' + key, value, value_type)

    def __getattr__(self, item: str):
        try:
            return super(SimpleNode, self).__getattr__(item)
        except AttributeError:
            pass

        try:
            value = self._data[item]
        except KeyError:
            raise AttributeError(item)

        if isinstance(value, dict):
            return self._connection.get_simple_node(self.path + '/' + item)
        return value

    def __dir__(self):
        return super(SimpleNode, self).__dir__() + list(self._data.keys())
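A sketch of how SimpleNode is typically used from a Connection; the node path and property names are placeholders, and the attribute assignment assumes an active patch builder on the connection, since __setattr__ delegates to it.

from crx import get_simple_con

con = get_simple_con()                                   # admin/admin against localhost:4502

node = con.get_simple_node('/content/dam/example.jpg/jcr:content')
print(node['jcr:lastModified'])                          # plain properties come straight from the node's JSON data
renditions = node.renditions                             # child nodes (nested dicts) are returned as SimpleNode objects

with con.start_patch_builder() as patch:
    node['dam:status'] = 'approved'                      # queued on the active patch builder as a String property
    patch.save()                                         # posts the collected changes as a ':diff'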
crx/util.py (new file, 7 lines)
@@ -0,0 +1,7 @@
from .connection import Connection


def get_simple_con():
    con = Connection()
    con.login_basic('admin', 'admin')
    return con