Newer
Older
from __future__ import unicode_literals # support both Python2 and 3
This code implements the PyOTRS library to provide access to the OTRS API (REST)
log = logging.getLogger(__name__)
TICKET_CONNECTOR_CONFIG_DEFAULT = {
'Config': {
'SessionCreate': {'RequestMethod': 'POST',
'Route': '/Session',
'Result': 'SessionID'},
'TicketCreate': {'RequestMethod': 'POST',
'Route': '/Ticket',
'Result': 'TicketID'},
'TicketGet': {'RequestMethod': 'GET',
'Route': '/Ticket/:TicketID',
'TicketGetList': {'RequestMethod': 'GET',
'Route': '/TicketList',
'Result': 'Ticket'},
'TicketSearch': {'RequestMethod': 'GET',
'Route': '/Ticket',
'Result': 'TicketID'},
'TicketUpdate': {'RequestMethod': 'PATCH',
'Route': '/Ticket/:TicketID',
'Result': 'TicketID'},
}
}
FAQ_CONNECTOR_CONFIG_DEFAULT = {
'Name': 'GenericFAQConnectorREST',
'Config': {
'LanguageList': {'RequestMethod': 'GET',
'Route': '/LanguageList',
'Result': 'Language'},
'PublicCategoryList': {'RequestMethod': 'GET',
'Route': '/PublicCategoryList',
'Result': 'Category'},
'PublicFAQGet': {'RequestMethod': 'GET',
'Route': '/PublicFAQGet',
'Result': 'FAQItem'},
'PublicFAQSearch': {'RequestMethod': 'POST',
'Route': '/PublicFAQSearch',
'Result': 'ID'},
}
}
LINK_CONNECTOR_CONFIG_DEFAULT = {
'Name': 'GenericLinkConnectorREST',
'Config': {
'LinkAdd': {'RequestMethod': 'POST',
'Route': '/LinkAdd',
'Result': 'LinkAdd'},
'LinkDelete': {'RequestMethod': 'DELETE',
'Route': '/LinkDelete',
'Result': 'LinkDelete'},
'LinkDeleteAll': {'RequestMethod': 'DELETE',
'Route': '/LinkDeleteAll',
'Result': 'LinkDeleteAll'},
'LinkList': {'RequestMethod': 'GET',
'Route': '/LinkList',
'Result': 'LinkList'},
'PossibleLinkList': {'RequestMethod': 'GET',
'Route': '/PossibleLinkList',
'Result': 'PossibleLinkList'},
'PossibleObjectsList': {'RequestMethod': 'GET',
'Route': '/PossibleObjectsList',
'Result': 'PossibleObject'},
'PossibleTypesList': {'RequestMethod': 'GET',
'Route': '/PossibleTypesList',
'Result': 'PossibleType'},
}
class PyOTRSError(Exception):
def __init__(self, message):
super(PyOTRSError, self).__init__(message)
self.message = message
pass
class SessionCreateError(PyOTRSError):
pass
class SessionNotCreated(PyOTRSError):
pass
"""PyOTRS Article class """
fields = {}
for key, value in dct.items():
fields.update({key: dct[key]})
try:
self.aid = int(fields.get("ArticleID"))
except TypeError:
self.aid = 0
self.fields = fields
self.attachments = self._parse_attachments()
self.dynamic_fields = self._parse_dynamic_fields()
if self.aid != 0:
return "<ArticleID: {1}>".format(self.__class__.__name__, self.aid)
elif _len == 1:
return "<ArticleID: {1} (1 Attachment)>".format(self.__class__.__name__,
return "<ArticleID: {1} ({2} Attachments)>".format(self.__class__.__name__,
else:
return "<{0}>".format(self.__class__.__name__)
def to_dct(self, attachments=True, attachment_cont=True, dynamic_fields=True):
attachments (bool): if True will include, otherwise exclude:
"Attachment" (default: True)
attachment_cont (bool): if True will include, otherwise exclude:
"Attachment" > "Content" (default: True)
dynamic_fields (bool): if True will include, otherwise exclude:
if self.attachments:
dct.update({"Attachment": [x.to_dct(content=attachment_cont) for x in
self.attachments]})
if self.dynamic_fields:
dct.update({"DynamicField": [x.to_dct() for x in self.dynamic_fields]})
"""parse Attachment from Ticket and return as **list** of **Attachment** objects"""
lst = self.fields.get("Attachment")
if lst:
return [Attachment(item) for item in lst]
else:
return []
def _parse_dynamic_fields(self):
"""parse DynamicField from Ticket and return as **list** of **DynamicField** objects"""
lst = self.fields.get("DynamicField")
if lst:
return [DynamicField.from_dct(item) for item in lst]
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def attachment_get(self, a_filename):
"""attachment_get
Args:
a_filename (str): Filename of Attachment to retrieve
Returns:
**Attachment** or **None**
"""
result = [x for x in self.attachments if x.Filename == "{0}".format(a_filename)]
if result:
return result[0]
else:
return None
def dynamic_field_get(self, df_name):
"""dynamic_field_get
Args:
df_name (str): Name of DynamicField to retrieve
Returns:
**DynamicField** or **None**
"""
result = [x for x in self.dynamic_fields if x.name == "{0}".format(df_name)]
if result:
return result[0]
else:
return None
def field_get(self, f_name):
return self.fields.get(f_name)
def validate(self, validation_map=None):
"""validate data against a mapping dict - if a key is not present
then set it with a default value according to dict
validation_map (dict): A mapping for all Article fields that have to be set. During
validation every required field that is not set will be set to a default value
specified in this dict.
.. note::
There is also a blacklist (fields to be removed) but this is currently
hardcoded to *dynamic_fields* and *attachments*.
if not validation_map:
validation_map = {"Body": "API created Article Body",
"Charset": "UTF8",
"MimeType": "text/plain",
"Subject": "API created Article",
"TimeUnit": 0}
for key, value in validation_map.items():
if not self.fields.get(key, None):
self.fields.update({key: value})
"""dummy data (for testing)
return Article({"Subject": "Dümmy Subject",
"Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
"TimeUnit": 0,
"MimeType": "text/plain",
"Charset": "UTF8"})
@classmethod
"""dummy data (for testing)
return Article({"Subject": "Dümmy Subject",
"Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
"TimeUnit": 0,
"MimeType": "text/plain",
"Charset": "UTF8",
"ForceNotificationToUserID": [1, 2]})
class Attachment(object):
"""PyOTRS Attachment class """
def __init__(self, dct):
self.__dict__ = dct
def __repr__(self):
if hasattr(self, 'Filename'):
return "<{0}: {1}>".format(self.__class__.__name__, self.Filename)
else:
return "<{0}>".format(self.__class__.__name__)
"""represent Attachment object as dict
Args:
content (bool): if True will include, otherwise exclude: "Content" (default: True)
Returns:
"""
dct = self.__dict__
if content:
return dct
else:
dct.pop("Content")
return dct
@classmethod
def create_basic(cls, Content=None, ContentType=None, Filename=None):
"""create a basic Attachment object
Args:
Content (str): base64 encoded content
ContentType (str): MIME type of content (e.g. text/plain)
Filename (str): file name (e.g. file.txt)
Returns:
**Attachment**: An Attachment object.
"""
return Attachment({'Content': Content,
'ContentType': ContentType,
'Filename': Filename})
@classmethod
def create_from_file(cls, file_path):
"""save Attachment to a folder on disc
file_path (str): The full path to the file from which an Attachment should be created.
with io.open(file_path, 'rb') as f:
content = f.read()
content_type = mimetypes.guess_type(file_path)[0]
if not content_type:
content_type = "application/octet-stream"
return Attachment({'Content': base64.b64encode(content).decode('utf-8'),
'ContentType': content_type,
'Filename': os.path.basename(file_path)})
folder (str): The directory where this attachment should be saved to.
"""
if not hasattr(self, 'Content') or not hasattr(self, 'Filename'):
raise ValueError("invalid Attachment")
file_path = os.path.join(os.path.abspath(folder), self.Filename)
with open(file_path, 'wb') as f:
f.write(base64.b64decode(self.Content))
@classmethod
"""dummy data (for testing)
return Attachment.create_basic("YmFyCg==", "text/plain", "dümmy.txt")
class DynamicField(object):
search_operator (str): Search operator (defaults to: "Equals")
Valid options are:
"Equals", "Like", "GreaterThan", "GreaterThanEquals",
"SmallerThan", "SmallerThanEquals"
search_patterns (list): List of patterns (str or datetime) to search for
.. warning::
**PyOTRS only supports OTRS 5 style!**
DynamicField representation changed between OTRS 4 and OTRS 5.
SEARCH_OPERATORS = ("Equals", "Like", "GreaterThan", "GreaterThanEquals",
"SmallerThan", "SmallerThanEquals",)
def __init__(self, name, value=None, search_patterns=None, search_operator="Equals"):
if not isinstance(search_patterns, list):
self.search_patterns = [search_patterns]
else:
self.search_patterns = search_patterns
if search_operator not in DynamicField.SEARCH_OPERATORS:
raise NotImplementedError("Invalid Operator: \"{0}\"".format(search_operator))
self.search_operator = search_operator
def __repr__(self):
return "<{0}: {1}: {2}>".format(self.__class__.__name__, self.name, self.value)
@classmethod
def from_dct(cls, dct):
"""create DynamicField from dct
Args:
dct (dict):
Returns:
"""
return cls(name=dct["Name"], value=dct["Value"])
"""represent DynamicField as dict
Returns:
def to_dct_search(self):
"""represent DynamicField as dict for search operations
Returns:
**dict**: DynamicField as dict for search operations
"""
_lst = []
for item in self.search_patterns:
if isinstance(item, datetime.datetime):
item = item.strftime("%Y-%m-%d %H:%M:%S")
_lst.append(item)
return {"DynamicField_{0}".format(self.name): {self.search_operator: _lst}}
@classmethod
"""dummy1 data (for testing)
Returns:
**DynamicField**: A list of DynamicField objects.
"""
return DynamicField(name="firstname", value="Jane")
@classmethod
"""dummy2 data (for testing)
Returns:
return DynamicField.from_dct({'Name': 'lastname', 'Value': 'Doe'})
"""PyOTRS Ticket class
Args:
tid (int): OTRS Ticket ID as integer
fields (dict): OTRS Top Level fields
articles (list): List of Article objects
dynamic_fields (list): List of DynamicField objects
Dmitry Mantis
committed
self.fields = {}
self.fields.update(dct)
Dmitry Mantis
committed
self.tid = int(self.fields.get("TicketID", 0))
self.dynamic_fields = self._parse_dynamic_fields()
Dmitry Mantis
committed
if self.tid:
return "<{0}: {1}>".format(self.__class__.__name__, self.tid)
else:
return "<{0}>".format(self.__class__.__name__)
def _parse_articles(self):
"""parse Article from Ticket and return as **list** of **Article** objects"""
Dmitry Mantis
committed
lst = self.fields.get("Article", [])
return [Article(item) for item in lst]
def _parse_dynamic_fields(self):
"""parse DynamicField from Ticket and return as **list** of **DynamicField** objects"""
Dmitry Mantis
committed
lst = self.fields.get("DynamicField", [])
return [DynamicField.from_dct(item) for item in lst]
articles (bool): if True will include, otherwise exclude:
article_attachments (bool): if True will include, otherwise exclude:
"Article" > "Attachment" (default: True)
article_attachment_cont (bool): if True will include, otherwise exclude:
"Article" > "Attachment" > "Content" (default: True)
article_dynamic_fields (bool): if True will include, otherwise exclude:
dynamic_fields (bool): if True will include, otherwise exclude:
Returns:
.. note::
Does not contain Articles or DynamicFields (currently)
"""
dct = {}
Dmitry Mantis
committed
dct.update(self.fields)
dct.update({"Article": [x.to_dct(attachments=article_attachments,
for x in self.articles]})
except AttributeError:
pass
try:
if self.dynamic_fields:
dct.update({"DynamicField": [x.to_dct() for x in self.dynamic_fields]})
except AttributeError:
pass
"""article_get
Args:
Returns:
**Article** or **None**
"""
Dmitry Mantis
committed
result = [x for x in self.articles if x.field_get("ArticleID") == str(aid)]
return result[0] if result else None
def dynamic_field_get(self, df_name):
"""dynamic_field_get
Args:
df_name (str): Name of DynamicField to retrieve
Returns:
**DynamicField** or **None**
"""
Dmitry Mantis
committed
result = [x for x in self.dynamic_fields if x.name == df_name]
return result[0] if result else None
def field_get(self, f_name):
return self.fields.get(f_name)
@classmethod
def create_basic(cls,
Title=None,
QueueID=None,
Queue=None,
StateID=None,
State=None,
PriorityID=None,
Priority=None,
"""create basic ticket
Title (str): OTRS Ticket Title
QueueID (str): OTRS Ticket QueueID (e.g. "1")
Queue (str): OTRS Ticket Queue (e.g. "raw")
TypeID (str): OTRS Ticket TypeID (e.g. "1")
Type (str): OTRS Ticket Type (e.g. "Problem")
StateID (str): OTRS Ticket StateID (e.g. "1")
State (str): OTRS Ticket State (e.g. "open" or "new")
PriorityID (str): OTRS Ticket PriorityID (e.g. "1")
Priority (str): OTRS Ticket Priority (e.g. "low")
CustomerUser (str): OTRS Ticket CustomerUser
raise ArgumentMissingError("Title is required")
raise ArgumentMissingError("Either Queue or QueueID required")
raise ArgumentMissingError("Either State or StateID required")
raise ArgumentMissingError("Either Priority or PriorityID required")
if not CustomerUser:
raise ArgumentMissingError("CustomerUser is required")
if Type and TypeID:
raise ArgumentInvalidError("Either Type or TypeID - not both")
dct.update({"Queue": Queue})
dct.update({"QueueID": QueueID})
if Type:
dct.update({"Type": Type})
if TypeID:
dct.update({"TypeID": TypeID})
dct.update({"State": State})
dct.update({"StateID": StateID})
dct.update({"Priority": Priority})
dct.update({"PriorityID": PriorityID})
dct.update({"CustomerUser": CustomerUser})
for key, value in dct.items():
dct.update({key: value})
return Ticket(dct)
@classmethod
"""dummy data (for testing)
return Ticket.create_basic(Queue=u"Raw",
State=u"open",
Priority=u"3 normal",
CustomerUser="root@localhost",
Title="Bäsic Ticket")
@staticmethod
def datetime_to_pending_time_text(datetime_object=None):
Args:
Returns:
**str**: The pending time in the format required for OTRS REST interface.
"""
"Year": datetime_object.year,
"Month": datetime_object.month,
"Day": datetime_object.day,
"Hour": datetime_object.hour,
"Minute": datetime_object.minute
}
class SessionStore(object):
"""Session ID: persistently store to and retrieve from to file
Args:
session_timeout (int): OTRS Session Timeout Value (to avoid reusing outdated session id
created (int): seconds as epoch when a session_id record was created
expires (int): seconds as epoch when a session_id record expires
def __init__(self, file_path=None, session_timeout=None,
value=None, created=None, expires=None):
raise ArgumentMissingError("Argument file_path is required!")
raise ArgumentMissingError("Argument session_timeout is required!")
self.timeout = session_timeout
self.value = value
self.created = created
self.expires = expires
return "<{0}: {1}>".format(self.__class__.__name__, self.file_path)
def read(self):
"""Retrieve a stored Session ID from file
Returns:
**str** or **None**: Retrieved Session ID or None (if none could be read)
"""
if not os.path.isfile(self.file_path):
return None
if not SessionStore._validate_file_owner_and_permissions(self.file_path):
return None
with open(self.file_path, "r") as f:
content = f.read()
try:
data = json.loads(content)
self.value = data['session_id']
self.created = datetime.datetime.utcfromtimestamp(int(data['created']))
self.expires = (self.created + datetime.timedelta(minutes=self.timeout))
if self.expires > datetime.datetime.utcnow():
return self.value # still valid
except ValueError:
return None
except KeyError:
return None
except Exception as err:
raise Exception("Exception Type: {0}: {1}".format(type(err), err))
"""Write and store a Session ID to file (rw for user only)
new_value (str): if none then empty value will be writen to file
if not SessionStore._validate_file_owner_and_permissions(self.file_path):
raise IOError("File exists but is not ok (wrong owner/permissions)!")
with open(self.file_path, 'w') as f:
f.write(json.dumps({'created': str(int(time.time())),
os.chmod(self.file_path, 384) # 384 is '0600'
# TODO 2016-04-23 (RH): check this
if not SessionStore._validate_file_owner_and_permissions(self.file_path):
raise IOError("Race condition: Something happened to file during the run!")
return True
def delete(self):
"""remove session id file (e.g. when it only contains an invalid session id
Raises:
NotImplementedError
Returns:
.. todo::
(RH) implement this _remove_session_id_file
"""
raise NotImplementedError("Not yet done")
@staticmethod
def _validate_file_owner_and_permissions(full_file_path):
"""validate SessionStore file ownership and permissions
**bool**: **True** if valid and correct, otherwise **False**...
"""
if not os.path.isfile(full_file_path):
raise IOError("Does not exist or not a file: {0}".format(full_file_path))
file_lstat = os.lstat(full_file_path)
if not file_lstat.st_uid == os.getuid():
return False
if not file_lstat.st_mode & 0o777 == 384:
""" check for unix permission User R+W only (0600)
>>> oct(384)
'0600' Python 2
>>> oct(384)
'0o600' Python 3 """
return False
return True
class Client(object):
"""PyOTRS Client class - includes Session handling
Args:
baseurl (str): Base URL for OTRS System, no trailing slash e.g. http://otrs.example.com
username (str): Username
password (str): Password
session_id_file (str): Session ID path on disc, used to persistently store Session ID
session_timeout (int): Session Timeout configured in OTRS (usually 28800 seconds = 8h)
session_validation_ticket_id (int): Ticket ID of an existing ticket - used to perform
several check - e.g. validate log in (defaults to 1)
webservice_config_ticket (dict): OTRS REST Web Service Name - Ticket Connector
webservice_config_faq (dict): OTRS REST Web Service Name - FAQ Connector
webservice_config_link (dict): OTRS REST Web Service Name - Link Connector
proxies (dict): Proxy settings - refer to requests docs for
more information - default to no proxies
https_verify (bool): Should HTTPS certificates be verified (defaults to True)
ca_cert_bundle (str): file path - if specified overrides python/system default for
Root CA bundle that will be used.
auth (tuple): e.g. ("user", "pass") - see requests documentation ("auth") for details
client_auth_cert (str): file path containing both certificate and key (unencrypted) in
PEM format to use for TLS client authentication (passed to requests as "cert")
user_agent (str): optional HTTP UserAgent string
webservice_path (str): OTRS REST Web Service Path part - defaults to
"/otrs/nph-genericinterface.pl/Webservice/"
webservice_config_ticket=None,
webservice_config_faq=None,
webservice_config_link=None,
proxies=None,
https_verify=True,
user_agent=None,
webservice_path="/otrs/nph-genericinterface.pl/Webservice/"):
self.webservice_path = webservice_path
if not session_timeout:
self.session_timeout = 28800 # 8 hours is OTRS default
else:
self.session_timeout = session_timeout
self.session_id_store = SessionStore(file_path="/tmp/.pyotrs_session_id",
session_timeout=self.session_timeout)
self.session_id_store = SessionStore(file_path=session_id_file,
session_timeout=self.session_timeout)
self.session_validation_ticket_id = session_validation_ticket_id
# A dictionary for mapping OTRS WebService operations to HTTP Method, Route and
# Result string.
if not webservice_config_ticket:
webservice_config_ticket = TICKET_CONNECTOR_CONFIG_DEFAULT
if not webservice_config_faq:
webservice_config_faq = FAQ_CONNECTOR_CONFIG_DEFAULT
if not webservice_config_link:
webservice_config_link = LINK_CONNECTOR_CONFIG_DEFAULT
self.ws_ticket = webservice_config_ticket['Name']
self.ws_faq = webservice_config_faq['Name']
self.ws_link = webservice_config_link['Name']
self.routes_ticket = [x[1]["Route"] for x in webservice_config_ticket['Config'].items()]
self.routes_faq = [x[1]["Route"] for x in webservice_config_faq['Config'].items()]
self.routes_link = [x[1]["Route"] for x in webservice_config_link['Config'].items()]
webservice_config.update(webservice_config_ticket['Config'])
webservice_config.update(webservice_config_faq['Config'])
webservice_config.update(webservice_config_link['Config'])
self.ws_config = webservice_config
if not proxies:
self.proxies = {"http": "", "https": "", "no": ""}
else:
if not isinstance(proxies, dict):
raise ValueError("Proxy settings need to be provided as dict!")
self.proxies = proxies
if https_verify:
if not ca_cert_bundle:
self.https_verify = https_verify
else:
ca_certs = os.path.abspath(ca_cert_bundle)
if not os.path.isfile(ca_certs):
raise ValueError("Certificate file does not exist: {0}".format(ca_certs))
self.https_verify = ca_certs
else:
self.https_verify = False
self.client_auth_cert = client_auth_cert
# credentials
self.username = username
self.password = password
"""
GenericInterface::Operation::Session::SessionCreate
* session_restore_or_set_up_new # try to get session_id from a (json) file on disc
def session_check_is_valid(self, session_id=None):
"""check whether session_id is currently valid
session_id (str): optional If set overrides the self.session_id
ArgumentMissingError: if session_id is not set