Newer
Older
from __future__ import unicode_literals # support both Python2 and 3
This code implements the PyOTRS library to provide access to the OTRS API (REST)
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
"""
import os
import json
import time
import datetime
import requests
from requests.packages.urllib3 import disable_warnings
# turn of platform insecurity warnings from urllib3
disable_warnings()
# TODO 2016-04-11 (RH) ca_cert, os.environ and proxies can not stay like this!
# path to certificate bundle and set to environment
ca_certs = os.path.abspath("/etc/ssl/certs/ca-certificates.crt")
os.environ["REQUESTS_CA_BUNDLE"] = ca_certs
os.environ["CURL_CA_BUNDLE"] = ca_certs
# disable any proxies
proxies = {"http": "", "https": "", "no": ""}
class PyOTRSError(Exception):
def __init__(self, message):
super(PyOTRSError, self).__init__(message)
self.message = message
class NoBaseURL(PyOTRSError):
pass
class NoWebServiceName(PyOTRSError):
pass
class NoCredentials(PyOTRSError):
pass
class ResponseJSONParseError(PyOTRSError):
pass
class SessionIDStoreNoFileError(PyOTRSError):
pass
class SessionIDStoreNoTimeoutError(PyOTRSError):
pass
class SessionError(PyOTRSError):
pass
class SessionCreateError(PyOTRSError):
pass
class SessionIDFileError(PyOTRSError):
pass
pass
class TicketDynamicFieldsParseError(PyOTRSError):
pass
class TicketSearchNothingToLookFor(PyOTRSError):
pass
class TicketSearchNumberMultipleResults(PyOTRSError):
pass
class TicketError(PyOTRSError):
pass
class OTRSAPIError(PyOTRSError):
pass
class OTRSHTTPError(PyOTRSError):
pass
class UpdateAddArticleError(PyOTRSError):
pass
"""PyOTRS Article class """
def __init__(self, dct):
for key, value in dct.items():
self.__dict__ = dct
def __repr__(self):
"""dummy data (for testing)
return Article({"Subject": "Dümmy Subject",
"Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
"TimeUnit": 0,
"MimeType": "text/plain",
"Charset": "UTF8"})
@classmethod
"""dummy data (for testing)
return Article({"Subject": "Dümmy Subject",
"Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
"TimeUnit": 0,
"MimeType": "text/plain",
"Charset": "UTF8",
"ForceNotificationToUserID": [1, 2]})
def to_dct(self):
return {"Article": self.__dict__}
def validate(self):
"""validate data against a mapping dict - if a key is not present
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
then set it with a default value according to dict
"""
validation_map = {"Body": "API created Article Body",
"Charset": "UTF8",
"MimeType": "text/plain",
"Subject": "API created Article",
"TimeUnit": 0}
dct = self.__dict__
for key, value in validation_map.items():
if not self.__dict__.get(key, None):
dct.update({key: value})
self.__dict__ = dct
class Attachment(object):
"""PyOTRS Attachment class """
def __init__(self, Content=None, ContentType=None, Filename=None):
self.Content = Content # base64 encoded
self.ContentType = ContentType
self.Filename = Filename
def __repr__(self):
return "<{0}: {1}>".format(self.__class__.__name__, self.Filename)
def to_dct(self):
"""represent AttachmentList and related Attachment objects as dict
Returns:
"""
return self.__dict__
@classmethod
"""dummy data (for testing)
return Attachment("YmFyCg==", "text/plain", "dümmy.txt")
class DynamicField(object):
.. warning::
DynamicField representation changed between OTRS 4 and OTRS 5.
**PyOTRS only supports OTRS 5 style!**
"""
def __init__(self, name=None, value=None, dct=None):
if dct:
if name or value:
raise Exception("Please provide either \"dct\" or \"name and value\"")
else:
if name and value:
self.name = name
self.value = value
else:
raise Exception("Please provide either \"dct\" or \"name and value\"")
def __repr__(self):
return "<{0}: {1} --> {2} >".format(self.__class__.__name__, self.name, self.value)
"""represent DynamicField as dict
Returns:
dict
@classmethod
"""dummy1 data (for testing)
Returns: DynamicField
"""
return DynamicField(name="firstname", value="Jane")
@classmethod
"""dummy2 data (for testing)
Returns:
DynamicField
return DynamicField(dct={'Name': 'lastname', 'Value': 'Doe'})
class Ticket(object):
"""PyOTRS Ticket class """
def __init__(self, dct):
self.attachment_list = []
self.dynamic_field_list = []
if isinstance(value, dict):
dct[key] = Ticket(value)
return "<{0}>".format(self.__class__.__name__)
@classmethod
def create_basic(cls,
Title=None,
QueueID=None,
Queue=None,
StateID=None,
State=None,
PriorityID=None,
Priority=None,
CustomerUser=None,
"""create basic ticket
Args:
Title:
QueueID:
Queue:
StateID:
State:
PriorityID:
Priority:
CustomerUser:
**kwargs:
Returns:
"""
if not Title:
raise ValueError("Title is required")
if not Queue and not QueueID:
raise ValueError("Either Queue or QueueID required")
if not State and not StateID:
raise ValueError("Either State or StateID required")
if not Priority and not PriorityID:
raise ValueError("Either Priority or PriorityID required")
if not CustomerUser:
raise ValueError("CustomerUser is required")
dct.update({"Queue": Queue})
dct.update({"QueueID": QueueID})
dct.update({"State": State})
dct.update({"StateID": StateID})
dct.update({"Priority": Priority})
dct.update({"PriorityID": PriorityID})
dct.update({"CustomerUser": CustomerUser})
for key, value in dct.items():
dct.update({key: value})
return Ticket(dct)
def to_dct(self):
"""represent Ticket objects as dict
return {"Ticket": self.__dict__}
@classmethod
"""dummy data (for testing)
return Ticket.create_basic(Queue=u"Raw",
State=u"open",
Priority=u"3 normal",
CustomerUser="root@localhost",
Title="Bäsic Ticket")
@staticmethod
def datetime_to_pending_time_text(datetime_obj=None):
"""datetime_to_pending_time_str
Args:
datetime_obj (Datetime):
Returns:
str: The pending time in the format required for OTRS REST interface
"""
"Year": datetime_obj.year,
"Month": datetime_obj.month,
"Day": datetime_obj.day,
"Hour": datetime_obj.hour,
"Minute": datetime_obj.minute
}
"""Session ID: persistently store to and retrieve from to file
Args:
file_path (unicode): Path on disc
session_timeout (int): OTRS Session Timeout Value (to avoid reusing outdated session id
session_id_value (unicode): A Session ID as str
session_id_created (int): seconds as epoch when a session_id record was created
session_id_expires (int): seconds as epoch when a session_id record expires
Raises:
SessionIDStoreNoFileError
SessionIDStoreNoTimeoutError
.. todo::
Is session_timeout correct here?! Or should this just return "expires" value and caller
needs to decide whether to use it..?!
"""
def __init__(self, file_path=None, session_timeout=None):
if not file_path:
raise SessionIDStoreNoFileError("Argument file_path is required!")
if not session_timeout:
raise SessionIDStoreNoTimeoutError("Argument session_timeout is required!")
self.session_timeout = session_timeout
self.session_id_value = None
self.session_id_created = None
self.session_id_expires = None
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
return "<{0}: {1}>".format(self.__class__.__name__, self.file_path)
def read(self):
"""Retrieve a stored Session ID from file
Returns:
Union(str, None): Retrieved Session ID or None (if none could be read)
"""
if not os.path.isfile(self.file_path):
return None
if not SessionIDStore._validate_file_owner_and_permissions(self.file_path):
return None
with open(self.file_path, "r") as f:
content = f.read()
try:
data = json.loads(content)
self.session_id_value = data['session_id']
self.session_id_created = datetime.datetime.utcfromtimestamp(int(data['created']))
self.session_id_expires = (self.session_id_created +
datetime.timedelta(minutes=self.session_timeout))
if self.session_id_expires > datetime.datetime.utcnow():
return self.session_id_value # still valid
except ValueError:
return None
except KeyError:
return None
except Exception as err:
raise Exception("Exception Type: {0}: {1}".format(type(err), err))
def write(self):
"""Write and store a Session ID to file (rw for user only)
Returns:
bool: True if successful, False otherwise.
.. todo::
(RH) Error Handling and return True/False
"""
if os.path.isfile(self.file_path):
if not SessionIDStore._validate_file_owner_and_permissions(self.file_path):
raise IOError("File exists but is not ok (wrong owner/permissions)!")
with open(self.file_path, 'w') as f:
f.write(json.dumps({'created': str(int(time.time())),
'session_id': self.session_id_value}))
os.chmod(self.file_path, 384) # 384 is '0600'
# TODO 2016-04-23 (RH): check this
if not SessionIDStore._validate_file_owner_and_permissions(self.file_path):
raise IOError("Race condition: Something happened to file during the run!")
return True
def delete(self):
"""remove session id file (e.g. when it only contains an invalid session id
Raises:
NotImplementedError
Returns:
bool: True if successful, False otherwise.
.. todo::
(RH) implement this _remove_session_id_file
"""
raise NotImplementedError("Not yet done")
@staticmethod
def _validate_file_owner_and_permissions(full_file_path):
"""validate SessionIDStore file ownership and permissions
Args:
full_file_path (unicode): full path to file on disk
Returns:
bool: True if valid and correct, otherwise False
"""
if not os.path.isfile(full_file_path):
raise IOError("Does not exist or not a file: {0}".format(full_file_path))
file_lstat = os.lstat(full_file_path)
if not file_lstat.st_uid == os.getuid():
return False
if not file_lstat.st_mode & 0o777 == 384:
""" check for unix permission User R+W only (0600)
>>> oct(384)
'0600' Python 2
>>> oct(384)
'0o600' Python 3 """
return False
return True
class Client(object):
"""PyOTRS Client class - includes Session handling
Args:
baseurl (unicode): Base URL for OTRS System, no trailing slash e.g. http://otrs.example.com
webservicename (unicode): OTRS REST Web Service Name
username (unicode): Username
password (unicode): Password
session_id_file (unicode): Session ID path on disk, used to persistently store Session ID
session_timeout (int): Session Timeout configured in OTRS (usually 28800 seconds = 8h)
https_verify (bool): Should HTTPS certificates be verified (defaults to True)
"""
def __init__(self,
baseurl,
webservicename,
username=None,
password=None,
session_id_file=None,
session_timeout=None,
https_verify=True):
if not baseurl:
raise NoBaseURL("Missing Baseurl (e.g. https://otrs.example.com)")
if not webservicename:
raise NoWebServiceName("Missing WebServiceName (e.g. GenericTicketConnectorREST)")
self.webservicename = webservicename
if not session_timeout:
self.session_timeout = 28800 # 8 hours is OTRS default
else:
self.session_timeout = session_timeout
if not session_id_file:
self.session_id_store = SessionIDStore(file_path="/tmp/.session_id.tmp",
session_timeout=self.session_timeout)
else:
self.session_id_store = SessionIDStore(file_path=session_id_file,
session_timeout=self.session_timeout)
self.https_verify = https_verify
# dummy initialization
self.operation = None
self.http_method = None
self.ticket_list = []
self.attachment_list = []
self.dynamic_field_list = []
"""
GenericInterface::Operation::Session::SessionCreate
Methods (public):
* session_create
* session_restore_or_set_up_new # try to get session_id from a (json) file on filesystem
"""
def session_check_is_valid(self, session_id=None):
"""check whether self.session_id (or session_id) is currently valid
session_id (unicode): optional If set overrides the self.session_id
SessionError if neither self.session_id nor session_id is not set
Returns:
bool: True if valid, False otherwise.
if session_id:
self.session_id = session_id
# TODO 2016-04-13 (RH): Is there a nicer way to check whether session is valid?!
url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
"{0.webservicename}/Ticket/1".format(self)
"""create new OTRS Session and store Session ID
Returns:
bool: True if successful, False otherwise.
.. todo::
2016-04-18 (RH): decide what session_create should return (bool or result content)
"""
url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
"{0.webservicename}/Session".format(self)
payload = {
"UserLogin": self.username,
"Password": self.password
}
# TODO 2016-04-18 (RH): decide what to return here.. bool or result content?
if self._session_create_json(url, payload):
"""Try to restore Session ID from file otherwise create new one
Raises:
SessionCreateError
SessionIDFileError
Returns:
bool: True if successful, False otherwise.
"""
# try to read session_id from file
self.session_id = self.session_id_store.read()
if self.session_id:
print("Using valid Session ID "
"from ({0})".format(self.session_id_store.file_path))
"""most likely invalid session_id so pass. Remove clear session_id_store.."""
# got no (valid) session_id; clean store and try to create new one
self.session_id = None
self.session_id_store.write()
if not self.session_create():
raise SessionCreateError("Failed to create a Session ID!")
# safe new created session_id to file
raise SessionIDFileError("Failed to save Session ID to file!")
else:
print("Saved new Session ID to file: {0}".format(self.session_id_store.file_path))
"""_session_create_json
url (unicode):
payload (dict):
Raises:
OTRSAPIError
Returns:
bool: True if update successful, False otherwise.
.. note::
Uses HTTP Method: POST
"""
self.operation = "SessionCreate"
self.http_method = "POST"
self.response = self._send_request(self.http_method, url, payload, self.https_verify)
self.response_json = self.response.json()
self._validate_response() # TODO
"""
GenericInterface::Operation::Ticket::TicketCreate
def ticket_create(self,
ticket=None,
article=None,
attachment_list=None,
dynamic_field_list=None,
**kwargs):
"""ticket_update_by_ticket_id_set_scout_id
Args:
ticket (Ticket):
article (Article): optional article
attachment_list (list): *Attachment* objects
dynamic_field_list (list): *DynamicField* object
**kwargs: any regular OTRS Fields (not for Dynamic Fields!)
.. todo::
2016-04-18 (RH): decide what ticket_create should return (bool or result content)
"""
url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
"{0.webservicename}/Ticket".format(self)
payload = {"SessionID": self.session_id}
if not ticket:
raise TicketError("provide Ticket!")
if not article:
raise TicketError("provide Article!")
payload.update(ticket.to_dct())
if article:
article.validate()
payload.update(article.to_dct())
if attachment_list:
if dynamic_field_list:
# TODO 2016-04-18(RH): decide what to return here.. bool or result content? Or full new?
"""_ticket_create_json
url (unicode):
payload (dict):
Raises:
OTRSAPIError
ResponseJSONParseError
Returns:
bool: True if successful, False otherwise.
"""
self.operation = "TicketCreate"
self.http_method = "POST"
self.response = self._send_request(self.http_method, url, payload, self.https_verify)
self.response_json = self.response.json()
self._validate_response()
return True
"""
GenericInterface::Operation::Ticket::TicketGet
* ticket_get_by_id
* ticket_get_by_number
"""
def ticket_get_by_id(self, ticket_id, dynamic_fields=True, all_articles=False):
"""ticket_get_by_id
Args:
ticket_id (int): Integer value of a Ticket ID
dynamic_fields (bool): will request OTRS to include all
Dynamic Fields (*default: True*)
all_articles (bool): will request OTRS to include all
Articles (+default: False*)
Returns:
dict: Ticket
.. todo::
2016-04-18 (RH): decide what ticket_get_by_id should return (bool or result content)
"""
url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
"{0.webservicename}/Ticket/{1}".format(self, ticket_id)
payload = {
"SessionID": self.session_id,
"AllArticles": int(all_articles),
"DynamicFields": int(dynamic_fields)
}
# TODO 2016-04-13 (RH): decide what to return here.. bool or ticket content
if self._ticket_get_json(url, payload):
def ticket_get_by_number(self, ticket_number, dynamic_fields=True, all_articles=False):
"""ticket_get_by_number
ticket_number (unicode): Integer value of a Ticket ID
dynamic_fields (bool): will request OTRS to include all
Dynamic Fields (*default: True*)
all_articles (bool): will request OTRS to include all
Articles (*default: False*)
Returns:
dict: Ticket
"""
if isinstance(ticket_number, int):
raise TicketError("Provide ticket_number as str/unicode. Got ticket_number as int.")
result_list = self.ticket_search(TicketNumber=ticket_number)
if len(result_list) == 0:
return None
elif len(result_list) == 1:
tid = result_list[0]
return self.ticket_get_by_id(tid,
dynamic_fields=dynamic_fields,
all_articles=all_articles)
else:
raise TicketSearchNumberMultipleResults("Found more that one result for "
"Ticket Number: {0}".format(ticket_number))
"""_ticket_get_json
url (unicode):
payload (dict):
Raises:
OTRSAPIError
ResponseJSONParseError
Returns:
bool: True if successful, False otherwise.
self.operation = "TicketGet"
self.http_method = "GET"
self.response = self._send_request(self.http_method, url, payload, self.https_verify)
self.response_json = self.response.json()
self._validate_response()
return True
"""
GenericInterface::Operation::Ticket::TicketSearch
"""
def ticket_search(self, **kwargs):
"""Wrapper for search ticket
Args:
**kwargs: Arbitrary keyword arguments (Dynamic Field).
Returns:
list: tickets that were found as list of **str**
If value of kwargs is a datetime object then this object will be
converted to the appropriate string format for REST API.
"""
url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
"{0.webservicename}/Ticket".format(self)
payload = {
"SessionID": self.session_id,
}
if kwargs is not None:
for key, value in kwargs.items():
if isinstance(value, datetime.datetime):
value = value.strftime("%Y-%m-%d %H:%M:%S")
payload.update({key: value})
if len(payload) == 1:
raise TicketSearchNothingToLookFor("Nothing specified to look for!")
if self._ticket_search_json(url, payload):
def ticket_search_full_text(self, pattern):
"""Wrapper for search ticket for full text search
Args:
pattern (unicode): Search pattern (a % will be added to from and end automatically)
Returns:
list: tickets that were found
Notes:
Following examples is a dummy doctest. In order for this to work on Python2 the
whole docstring needs to be a (Python2) unicode string (prefixed u)
>>> str('a') + str('b')
.. warning::
Full Text is not really working yet.
.. note::
Waiting for progress on OTRS Bug: http://bugs.otrs.org/show_bug.cgi?id=11981
"""
pattern_wildcard = "%{0}%".format(pattern)
return self.ticket_search(FullTextIndex="1",
Title=pattern_wildcard,
ContentSearch="OR",
Subject=pattern_wildcard,
Body=pattern_wildcard)
def _ticket_search_json(self, url, payload):
"""_ticket_search_json
url (unicode):
payload (dict):
Raises:
OTRSAPIError
ResponseJSONParseError
Returns:
bool: True if search successful, False otherwise.
self.operation = "TicketSearch"
self.http_method = "GET"
self.response = self._send_request(self.http_method, url, payload, self.https_verify)
self.response_json = self.response.json()
self._validate_response()
"""
GenericInterface::Operation::Ticket::TicketUpdate
Methods (public):
* ticket_update
* ticket_update_set_pending
"""
def ticket_update(self,
ticket_id,
article=None,
**kwargs):
"""ticket_update_by_ticket_id_set_scout_id
attachment_list (list): *Attachment* objects
dynamic_field_list (list): *DynamicField* objects
**kwargs: any regular OTRS Fields (not for Dynamic Fields!)
dict: Response from OTRS API if successful (if Article was added new Art. ID included)
.. todo::
2016-04-17 (RH): decide what ticket_update should return (bool or result content)
"""
url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
"{0.webservicename}/Ticket/{1}".format(self, ticket_id)
payload = {"SessionID": self.session_id}
if article:
article.validate()
payload.update(article.to_dct())
if not article:
raise TicketError("To create an attachment an article is needed!")
# TODO 2016-04-17 (RH): decide what to return here.. bool or result content? Or full new?