78 lines
2.3 KiB
Python
78 lines
2.3 KiB
Python
from typing import Union
|
|
|
|
import requests
|
|
from urllib.parse import urljoin
|
|
|
|
from .simplenode import SimpleNode
|
|
|
|
|
|
# TODO validation
|
|
|
|
"""
Example CRX/DE request URLs (kept for reference):
|
|
http://localhost:4502/crx/de/init.jsp?_dc=1549392939742
|
|
http://localhost:4502/crx/de/nodetypes.jsp?_dc=1549392939958
|
|
http://localhost:4502/crx/server/crx.default/jcr%3aroot/libs.1.json?_dc=1549392123434&node=xnode-265
|
|
http://localhost:4502/crx/de/query.jsp?_dc=1549392245191&_charset_=utf-8&type=xpath&stmt=%2Fjcr%3Aroot%2Fbin%2F%2F*%5Bjcr%3Acontains(.%2C%20%27asdf%27)%5D%20order%20by%20%40jcr%3Ascore&showResults=true
|
|
"""
|
|
|
|
# Default path (appended to the server base URL) below which node data is
# requested; the trailing slash matters because node URLs are resolved
# relative to it with urljoin.
CRX_SERVER_ROOT = '/crx/server/crx.default/jcr:root/'
# Default path (appended to the server base URL) of the query endpoint.
CRX_QUERY = '/crx/de/query.jsp'

# Suffix appended to a node path when requesting its JSON representation.
JSON_DATA_EXTENSION = '.1.json'
|
|
|
|
|
|
class CrxException(ValueError):
    """Error raised by this module when talking to the CRX server fails.

    Derives from ``ValueError``, so ``except ValueError`` also catches it.
    """
|
|
|
|
|
|
class Connection:
    """HTTP client for a CRX repository server.

    Builds the base URLs from the constructor arguments and sends every
    request through a single ``requests`` session, so authentication set
    with :meth:`login_basic` applies to all later calls.
    """

    def __init__(self,
                 host: str = 'localhost',
                 port: int = 4502,
                 protocol: str = 'http',
                 root: str = CRX_SERVER_ROOT,
                 query: str = CRX_QUERY):
        """Compute the base URLs and create the session; sends no request.

        :param host: host name of the server
        :param port: TCP port of the server
        :param protocol: URL scheme, e.g. ``http``
        :param root: path appended to the base URL to form ``data_root``
        :param query: path appended to the base URL to form ``query_path``
        """
        # Note: despite its name, ``self.host`` holds the full base URL
        # (scheme, host and port), not just the host name.
        self.host = f'{protocol}://{host}:{port}'
        self.data_root = self.host + root
        self.query_path = self.host + query

        # requests.session() is only a deprecated alias of Session().
        self.session = requests.Session()

    def login_basic(self, username: str, password: str) -> None:
        """Set HTTP basic-auth credentials on the session.

        :param username: user name sent with subsequent requests
        :param password: password sent with subsequent requests
        """
        self.session.auth = (username, password)

    def get_node_raw(self, path: str):
        """Fetch and decode the JSON data of the node at ``path``.

        :param path: node path below ``data_root``, e.g. ``/content/site``;
            a missing leading slash is added
        :return: the decoded JSON body of the response
        :raises CrxException: if the request fails, the server answers with
            an HTTP error status, or the body is not valid JSON
        """
        if not path.startswith('/'):
            # Without the slash the reference would become '.name.1.json'.
            path = '/' + path
        # The leading '.' keeps the reference relative, so urljoin resolves
        # it below data_root instead of replacing the URL path.
        url = urljoin(self.data_root, '.' + path + JSON_DATA_EXTENSION)

        try:
            response = self.session.get(url)
            # Reject HTTP error statuses so an error payload is never
            # handed back as node data.
            response.raise_for_status()
        except requests.exceptions.RequestException as exception:
            # todo more specific exceptions
            raise CrxException(
                f'Request for {url} failed: {exception}') from exception

        try:
            return response.json()
        except ValueError as exception:
            # CrxException is itself a ValueError, so callers that caught
            # the bare decoding error keep working.
            raise CrxException(
                f'Response from {url} is not valid JSON') from exception

    def get_simple_node(self, path: str) -> SimpleNode:
        """Fetch the node at ``path`` and wrap it in a :class:`SimpleNode`.

        :param path: node path, passed unchanged to :meth:`get_node_raw`
            and to the ``SimpleNode`` constructor
        :return: ``SimpleNode`` built from the path, the raw node data and
            this connection
        :raises CrxException: if fetching the node data fails
        """
        return SimpleNode(path, self.get_node_raw(path), self)

    def rename_node(self, old_path: str, new_path: str) -> None:
        """Post a ``>old_path : new_path`` diff to the data root.

        The diff is sent as the ``:diff`` field of a form-encoded POST.

        :param old_path: current path of the node
        :param new_path: path the node should have afterwards
        :raises requests.exceptions.HTTPError: if the server answers with
            an HTTP error status
        """
        diff = f'>{old_path} : {new_path}'
        resp = self.session.post(self.data_root, data={':diff': diff})
        resp.raise_for_status()

    def apply_diff(self, diff: Union[str, bytes]) -> None:
        """Post ``diff`` to the data root as a multipart ``:diff`` field.

        :param diff: diff text to send, as ``str`` or already-encoded bytes
        :raises requests.exceptions.HTTPError: if the server answers with
            an HTTP error status
        """
        files = {
            # (filename, content, content type); filename None makes this a
            # plain form field rather than a file upload.
            ':diff': (
                None,
                diff,
                'text/plain; charset=utf-8'
            )
        }
        resp = self.session.post(self.data_root, files=files)
        resp.raise_for_status()
|