Newer
Older
This code implements the PyOTRS library to provide access to the OTRS API (REST)
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
"""
import os
import json
import time
import datetime
import logging
import requests
from requests.packages.urllib3 import disable_warnings
# Turn off the platform insecurity warnings emitted by urllib3.
disable_warnings()
logger = logging.getLogger(__name__)
# TODO 2016-04-11 (RH) ca_cert, os.environ and proxies can not stay like this!
# Path to the certificate bundle; exported below through the environment
# variables REQUESTS_CA_BUNDLE and CURL_CA_BUNDLE (process-wide side effect).
ca_certs = os.path.abspath("/etc/ssl/certs/ca-certificates.crt")
os.environ["REQUESTS_CA_BUNDLE"] = ca_certs
os.environ["CURL_CA_BUNDLE"] = ca_certs
# disable any proxies
proxies = {"http": "", "https": "", "no": ""}
class PyOTRSError(Exception):
    """Base class of every exception defined by PyOTRS.

    Args:
        message (str): description of the problem; passed on to ``Exception``
            and additionally kept on ``self.message``.
    """

    def __init__(self, message):
        super(PyOTRSError, self).__init__(message)
        self.message = message


class NoBaseURL(PyOTRSError):
    """Raised by ``Client`` when it is constructed without a base URL."""


class NoWebServiceName(PyOTRSError):
    """Raised by ``Client`` when it is constructed without a web service name."""


class NoCredentials(PyOTRSError):
    """Error type for missing credentials."""


class ResponseJSONParseError(PyOTRSError):
    """Error type for a response whose JSON could not be processed."""


class SessionError(PyOTRSError):
    """Error type for Session ID problems (e.g. no Session ID set)."""


class SessionCreateError(PyOTRSError):
    """Raised when no new Session ID could be created."""


class SessionIDFileError(PyOTRSError):
    """Raised when the Session ID could not be saved to its file."""


class TicketDynamicFieldsParseError(PyOTRSError):
    """Error type for Dynamic Field data that could not be parsed."""


class TicketSearchNothingToLookFor(PyOTRSError):
    """Raised by ``ticket_search`` when no search criteria were given."""


class TicketError(PyOTRSError):
    """Raised for invalid or missing ticket related arguments."""


class OTRSAPIError(PyOTRSError):
    """Error type for failures reported by the OTRS API."""


class OTRSHTTPError(PyOTRSError):
    """Error type for failures on the HTTP level."""


class UpdateAddArticleError(PyOTRSError):
    """Error type for a failed attempt to add an Article during an update."""
class Article(object):
    """PyOTRS Article class

    An Article is a thin wrapper around a dict of OTRS article fields; every
    key of the dict becomes an attribute of the instance.

    Args:
        dct (dict): article fields, e.g. ``{"Subject": "...", "Body": "..."}``
    """
    # NOTE(review): the ``class`` line and the ``def`` lines of both dummy
    # classmethods were missing from the reviewed text and have been
    # reconstructed; confirm the method names against the upstream file.

    def __init__(self, dct):
        for key, value in dct.items():
            try:
                logger.debug("Parse Article: {0} --> {1}".format(key, value))
            except UnicodeEncodeError:
                logger.warning("Parse Article: Unicode error for Value of Key {0}".format(key))
        # store the fields (as a copy, so that validate() can not modify the
        # dict that was handed in by the caller)
        self.__dict__ = dict(dct)

    @classmethod
    def dummy(cls):
        """ dummy data (for testing)

        Returns:
            Article
        """
        return cls({"Subject": "Dümmy Subject",
                    "Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
                    "TimeUnit": 0,
                    "MimeType": "text/plain",
                    "Charset": "UTF8"})

    @classmethod
    def dummy_force_notify(cls):
        """ dummy data including ForceNotificationToUserID (for testing)

        Returns:
            Article
        """
        return cls({"Subject": "Dümmy Subject",
                    "Body": "Hallo Bjørn,\n[kt]\n\n -- The End",
                    "TimeUnit": 0,
                    "MimeType": "text/plain",
                    "Charset": "UTF8",
                    "ForceNotificationToUserID": [1, 2]})

    def to_dct(self):
        """ represent Article object as dict

        Returns:
            dict: ``{"Article": <fields>}``
        """
        return {"Article": self.__dict__}

    def validate(self):
        """ validate data against a mapping dict - if a key is not present
        (or its value is empty) then set it with a default value according
        to dict
        """
        validation_map = {"Body": "API created Article Body",
                          "Charset": "UTF8",
                          "MimeType": "text/plain",
                          "Subject": "API created Article",
                          "TimeUnit": 0}
        for key, default in validation_map.items():
            if not self.__dict__.get(key):
                self.__dict__[key] = default
class Attachment(object):
    """PyOTRS Attachment class

    Args:
        Content (str): content of the attachment, base64 encoded
        ContentType (str): content type of the attachment (e.g. "text/plain")
        Filename (str): file name of the attachment
    """

    def __init__(self, Content=None, ContentType=None, Filename=None):
        self.Content = Content  # base64 encoded
        self.ContentType = ContentType
        self.Filename = Filename

    def __repr__(self):
        return "<{0}: {1}>".format(self.__class__.__name__, self.Filename)

    @classmethod
    def dummy(cls):
        """ dummy data (for testing)

        Returns:
            Attachment
        """
        # build via cls so that subclasses get an instance of their own type
        return cls("YmFyCg==", "text/plain", "dümmy.txt")
class AttachmentList(object):
    """PyOTRS AttachmentList class - a collection of Attachment objects

    Args:
        attachments (Attachment or list): optional, a single Attachment or a
            list of Attachment objects to start with. Anything else results
            in an empty list.
    """

    def __init__(self, attachments=None):
        if isinstance(attachments, Attachment):
            self.attachment_list = [attachments]
        elif isinstance(attachments, list):
            # copy, so that later add() calls do not modify the caller's list
            self.attachment_list = list(attachments)
        else:
            self.attachment_list = []

    def __iter__(self):
        return iter(self.attachment_list)

    def __repr__(self):
        return "<{0}: {1} item(s)>".format(self.__class__.__name__, len(self.attachment_list))

    def add(self, attachment):
        """ add Attachment to AttachmentList

        Args:
            attachment (Attachment): the object to append
        """
        self.attachment_list.append(attachment)

    def to_dct(self):
        """ represent AttachmentList and related Attachment objects as dict

        Returns:
            dict: ``{"Attachment": [<attributes of each Attachment>, ...]}``
        """
        return {"Attachment": [attachment.__dict__ for attachment in self]}

    @classmethod
    def dummy(cls):
        """ dummy data (for testing)

        Returns:
            AttachmentList
        """
        return cls(Attachment.dummy())
class DynamicField(object):
    """PyOTRS DynamicField class

    A DynamicField is created either from ``name`` and ``value`` or from a
    dict with the keys "Name" and "Value" - never from both.

    Args:
        name (str): name of the Dynamic Field
        value: value of the Dynamic Field
        dct (dict): ``{"Name": ..., "Value": ...}``

    Raises:
        ValueError: if both kinds of arguments are mixed or if neither
            ``dct`` nor ``name`` and ``value`` is given
    """
    # NOTE(review): the ``def dummy1`` line was missing from the reviewed
    # text; the name was chosen to pair with the existing dummy2 - confirm.

    def __init__(self, name=None, value=None, dct=None):
        if dct:
            if name or value:
                raise ValueError("Please provide either \"dct\" or \"name and value\"")
            # take over the values of the dict - same keys that to_dct() emits
            self.name = dct["Name"]
            self.value = dct["Value"]
        elif name and value:
            self.name = name
            self.value = value
        else:
            raise ValueError("Please provide either \"dct\" or \"name and value\"")

    def __repr__(self):
        return "<{0}: {1} --> {2} >".format(self.__class__.__name__, self.name, self.value)

    def to_dct(self):
        """ represent DynamicField as dict

        Returns: dict
        """
        return {"Name": self.name, "Value": self.value}

    @classmethod
    def dummy1(cls):
        """ dummy1 data (for testing)

        Returns: DynamicField
        """
        return cls(name="firstname", value="Jane")

    @classmethod
    def dummy2(cls):
        """ dummy2 data (for testing)

        Returns: DynamicField
        """
        return cls(dct={'Name': 'lastname', 'Value': 'Doe'})
class DynamicFieldList(object):
    """PyOTRS DynamicFieldList class - a collection of DynamicField objects

    Args:
        dynamic_field_dicts (DynamicField or list): optional, a single
            DynamicField or a list of DynamicField objects to start with.
            Anything else results in an empty list.
    """
    # NOTE(review): the ``def add`` line was missing from the reviewed text;
    # it was reconstructed from its docstring and body.

    def __init__(self, dynamic_field_dicts=None):
        if isinstance(dynamic_field_dicts, DynamicField):
            self.dynamic_field_list = [dynamic_field_dicts]
        elif isinstance(dynamic_field_dicts, list):
            # copy, so that later add() calls do not modify the caller's list
            self.dynamic_field_list = list(dynamic_field_dicts)
        else:
            self.dynamic_field_list = []

    def __iter__(self):
        return iter(self.dynamic_field_list)

    def __repr__(self):
        return "<{0}: {1} item(s)>".format(self.__class__.__name__, len(self.dynamic_field_list))

    def add(self, dynamic_field):
        """ add DynamicField object to DynamicFieldList

        Args:
            dynamic_field (DynamicField): the object to append
        """
        self.dynamic_field_list.append(dynamic_field)

    def to_dct(self):
        """ represent DynamicFieldList and related DynamicField objects as dict

        Returns:
            dict: ``{"DynamicField": [<to_dct() of each DynamicField>, ...]}``
        """
        return {"DynamicField": [dynamic_field.to_dct() for dynamic_field in self]}

    @classmethod
    def dummy(cls):
        """ dummy data (for testing)

        Returns:
            DynamicFieldList
        """
        return cls([DynamicField(name="firstname", value="Jane"),
                    DynamicField(dct={'Name': 'lastname', 'Value': 'Doe'})])
class Ticket(object):
    """PyOTRS Ticket class

    A Ticket is a thin wrapper around a dict of OTRS ticket fields; every key
    of the dict becomes an attribute of the instance and nested dicts are
    wrapped into Ticket objects themselves.

    Args:
        dct (dict): ticket fields
    """
    # NOTE(review): several lines of this class were missing from the reviewed
    # text (storing the fields in __init__, the ``def __repr__`` line, the end
    # of the create_basic signature and its return, the opening of the dict in
    # datetime_to_pending_time_str). They were reconstructed from the
    # surrounding code - confirm against the upstream file.

    def __init__(self, dct):
        fields = {}
        for key, value in dct.items():
            try:
                logger.debug("Parse Ticket: {0} --> {1}".format(key, value))
            except UnicodeEncodeError:
                logger.warning("Parse Ticket: Unicode error for Value of Key {0}".format(key))
            fields[key] = Ticket(value) if isinstance(value, dict) else value
        self.__dict__ = fields

    def __repr__(self):
        return "<{0}>".format(self.__class__.__name__)

    @classmethod
    def create_basic(cls,
                     Title=None,
                     QueueID=None,
                     Queue=None,
                     StateID=None,
                     State=None,
                     PriorityID=None,
                     Priority=None,
                     CustomerUser=None,
                     **kwargs):
        """ create basic ticket

        For Queue, State and Priority either the name or the ID has to be
        given; if both are given the name is used.

        Args:
            Title: title of the ticket (required)
            QueueID:
            Queue:
            StateID:
            State:
            PriorityID:
            Priority:
            CustomerUser: customer user of the ticket (required)
            **kwargs: any further field; added to the ticket as is

        Raises:
            ValueError: if a required value is missing

        Returns:
            Ticket
        """
        if not Title:
            raise ValueError("Title is required")
        if not Queue and not QueueID:
            raise ValueError("Either Queue or QueueID required")
        if not State and not StateID:
            raise ValueError("Either State or StateID required")
        if not Priority and not PriorityID:
            raise ValueError("Either Priority or PriorityID required")
        if not CustomerUser:
            raise ValueError("CustomerUser is required")

        dct = {u"Title": Title}
        if Queue:
            dct[u"Queue"] = Queue
        else:
            dct[u"QueueID"] = QueueID
        if State:
            dct[u"State"] = State
        else:
            dct[u"StateID"] = StateID
        if Priority:
            dct[u"Priority"] = Priority
        else:
            dct[u"PriorityID"] = PriorityID
        dct[u"CustomerUser"] = CustomerUser

        for key, value in dct.items():
            logger.debug("Create Basic Ticket: {0} --> {1}".format(key, value))
        dct.update(kwargs)
        return cls(dct)

    def to_dct(self):
        """ represent Ticket objects as dict

        Returns:
            dict: ``{"Ticket": <fields>}``
        """
        return {"Ticket": self.__dict__}

    @classmethod
    def dummy(cls):
        """ dummy data (for testing)

        Returns:
            Ticket
        """
        return cls.create_basic(Queue=u"Raw",
                                State=u"open",
                                Priority=u"3 normal",
                                CustomerUser="root@localhost",
                                Title="Bäsic Ticket")

    @staticmethod
    def datetime_to_pending_time_str(datetime_obj=None):
        """ datetime_to_pending_time_str

        Args:
            datetime_obj (Datetime): has to be given; the default of None is
                not usable

        Returns:
            dict: the pending time split into the keys Year, Month, Day,
            Hour and Minute (despite the name of the method this is not
            a str)
        """
        return {
            "Year": datetime_obj.year,
            "Month": datetime_obj.month,
            "Day": datetime_obj.day,
            "Hour": datetime_obj.hour,
            "Minute": datetime_obj.minute
        }
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
class Client(object):
"""PyOTRS Client class - includes Session handling
Args:
baseurl (str): Base URL for OTRS System, no trailing slash e.g. http://otrs.example.com
webservicename (str): OTRS REST Web Service Name
username (str): Username
password (str): Password
session_id_file (str): Session ID full path on disk, used to persistently store Session ID
session_timeout (int): Session Timeout configured in OTRS (usually 28800 seconds = 8h)
https_verify (bool): Should HTTPS certificates be verified (defaults to True)
"""
def __init__(self,
             baseurl,
             webservicename,
             username=None,
             password=None,
             session_id_file=None,
             session_timeout=None,
             https_verify=True):
    """ set up the Client - does not contact OTRS yet

    Args:
        baseurl (str): Base URL for OTRS System, no trailing slash
        webservicename (str): OTRS REST Web Service Name
        username (str): Username
        password (str): Password
        session_id_file (str): file to store the Session ID in
            (default: "/tmp/.session_id.tmp")
        session_timeout (int): Session Timeout in seconds (default: 28800)
        https_verify (bool): Should HTTPS certificates be verified

    Raises:
        NoBaseURL: if baseurl is empty
        NoWebServiceName: if webservicename is empty
    """
    if not baseurl:
        raise NoBaseURL("Missing Baseurl (e.g. https://otrs.example.com)")
    self.baseurl = baseurl

    if not webservicename:
        raise NoWebServiceName("Missing WebServiceName (e.g. GenericTicketConnectorREST)")
    self.webservicename = webservicename

    # the credentials are read again by session_create()
    self.username = username
    self.password = password

    self.session_id_file = session_id_file or "/tmp/.session_id.tmp"
    self.session_timeout = session_timeout or 28800  # 8 hours is OTRS default
    self.https_verify = https_verify

    # dummy initialization
    self.session_id = None
    self.operation = None
    self.http_method = None
    self.response = None
    self.response_json = None
"""
GenericInterface::Operation::Session::SessionCreate
Methods (public):
* session_check_is_valid
* session_create
* session_restore_or_set_up_new  # try to get session_id from a (json) file on filesystem
"""
def session_check_is_valid(self, session_id=None):
    """ check whether self.session_id (or session_id) is currently valid

    The check is done by requesting the Ticket with the ID 1 using the
    Session ID.

    Args:
        session_id (str): optional If set overrides the self.session_id

    Raises:
        SessionError: if neither self.session_id nor session_id is set

    Returns:
        bool: True if valid, False otherwise.
    """
    if session_id:
        self.session_id = session_id

    if not getattr(self, "session_id", None):
        raise SessionError("No value set for session_id!")

    # TODO 2016-04-13 (RH): Is there a nicer way to check whether session is valid?!
    url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
          "{0.webservicename}/Ticket/1".format(self)

    # NOTE(review): everything below the url was missing from the reviewed
    # text and has been reconstructed from ticket_get_by_id - confirm.
    payload = {"SessionID": self.session_id}
    return self._ticket_get_json(url, payload)
def session_create(self):
    """ create new OTRS Session and store Session ID

    Sends self.username and self.password to the Session endpoint of the
    web service.

    Returns:
        bool: True if successful, False otherwise.

    .. todo::
        2016-04-18 (RH): decide what session_create should return (bool or result content)
    """
    url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
          "{0.webservicename}/Session".format(self)

    payload = {
        "UserLogin": self.username,
        "Password": self.password
    }

    # TODO 2016-04-18 (RH): decide what to return here.. bool or result content?
    if self._session_create_json(url, payload):
        # NOTE(review): the body of this branch was missing from the reviewed
        # text; presumably the response carries the new ID in the key
        # "SessionID" - confirm against the OTRS SessionCreate operation.
        self.session_id = self.response_json["SessionID"]
        return True

    return False
def session_restore_or_set_up_new(self):
    """ Try to restore Session ID from file otherwise create new one

    Raises:
        SessionCreateError: if no new Session ID could be created
        SessionIDFileError: if the new Session ID could not be saved

    Returns:
        bool: True if successful, False otherwise.
    """
    # try to read session_id from file
    if self._read_session_id_from_file():
        # got one.. check whether it's still valid
        try:
            # NOTE(review): the call of the validity check was missing from
            # the reviewed text, so every Session ID read from file was used
            # unchecked; reconstructed from the surrounding comments.
            if self.session_check_is_valid():
                logger.debug("Using valid Session ID from ({0})".format(self.session_id_file))
                return True
        except OTRSAPIError:
            pass  # most likely invalid session_id - go on and create a new one

    # got no (valid) session_id from file.. try to create new one
    if not self.session_create():
        logger.error("Failed to create a Session ID!")
        raise SessionCreateError("Failed to create a Session ID!")

    # save the newly created session_id to file
    if not self._write_session_id_to_file():
        logger.error("Failed to save Session ID to file: {0}".format(self.session_id_file))
        raise SessionIDFileError("Failed to save Session ID to file!")

    logger.debug("Saved new Session ID to file: {0}".format(self.session_id_file))
    return True
def _session_create_json(self, url, payload):
    """ _session_create_json

    Sends the request and keeps the response in self.response and the
    decoded JSON in self.response_json.

    Args:
        url (str):
        payload (dict):

    Raises:
        OTRSAPIError

    Returns:
        bool: True if update successful, False otherwise.

    .. note::
        Uses HTTP Method: POST
    """
    self.operation = "SessionCreate"
    self.http_method = "POST"
    self.response = self._send_request(self.http_method, url, payload, self.https_verify)
    self.response_json = self.response.json()
    self._validate_response()  # TODO

    # same result as the other _*_json methods; session_create() tests it
    return True
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
def _read_session_id_from_file(self):
    """ Retrieve a stored Session ID from file

    The file is expected to hold a JSON object with the keys "created"
    (epoch seconds) and "session_id" as written by
    _write_session_id_to_file. The Session ID is only used if it is not
    older than self.session_timeout seconds.

    Returns:
        bool: True if successful, False otherwise.
    """
    if not os.path.isfile(self.session_id_file):
        return False

    with open(self.session_id_file, "r") as f:
        content = f.read()

    try:
        data = json.loads(content)
        created = int(data['created'])
        session_id = data['session_id']
    except (ValueError, KeyError, TypeError) as err:
        # unusable file content is not fatal - a new Session can be created
        logger.error("JSON Parse Exception: {}".format(err))
        return False

    # session_timeout is given in seconds (not minutes)
    if created + self.session_timeout > time.time():
        self.session_id = session_id  # still valid
        return True

    return False
def _write_session_id_to_file(self):
    """ Write and store a Session ID to file (rw for user only)

    The file holds a JSON object with the keys "created" (current time in
    epoch seconds, as str) and "session_id".

    Returns:
        bool: True if successful, False otherwise.

    .. todo::
        (RH) Error Handling and return True/False
    """
    # O_TRUNC: without it a shorter content would leave the tail of an
    # already existing file in place and so produce invalid JSON
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    with os.fdopen(os.open(self.session_id_file, flags, 0o600), 'w') as f:
        f.write(json.dumps({'created': str(int(time.time())),
                            'session_id': self.session_id}))
    return True
def _remove_session_id_file(self):
    """ remove session id file (e.g. when it only contains an invalid session id)

    A file that does not exist is not an error - there is nothing left to
    remove.

    Returns:
        bool: True if successful, False otherwise.
    """
    if not os.path.isfile(self.session_id_file):
        return True

    try:
        os.remove(self.session_id_file)
    except OSError as err:
        logger.error("Failed to remove Session ID file {0}: {1}".format(self.session_id_file,
                                                                        err))
        return False

    return True
"""
GenericInterface::Operation::Ticket::TicketCreate
public methods:
* ticket_create
"""
def ticket_create(self,
                  ticket=None,
                  article=None,
                  attachment_list=None,
                  dynamic_field_list=None,
                  **kwargs):
    """ create a new Ticket (including its first Article)

    Args:
        ticket (Ticket): the ticket to create (required)
        article (Article): the first article (required); fields that are
            not set get a default value by article.validate()
        attachment_list (AttachmentList): optional
        dynamic_field_list (DynamicFieldList): optional
        **kwargs: any regular OTRS Fields (not for Dynamic Fields!);
            accepted but not used by the code below

    Raises:
        TicketError: if ticket or article is missing

    Returns:
        the decoded JSON response if successful, False otherwise.

    .. todo::
        2016-04-18 (RH): decide what ticket_create should return (bool or result content)
    """
    url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
          "{0.webservicename}/Ticket".format(self)
    payload = {"SessionID": self.session_id}

    if not ticket:
        raise TicketError("provide Ticket!")

    if not article:
        raise TicketError("provide Article!")

    payload.update(ticket.to_dct())

    article.validate()
    payload.update(article.to_dct())

    if attachment_list:
        payload.update(attachment_list.to_dct())

    if dynamic_field_list:
        payload.update(dynamic_field_list.to_dct())

    # TODO 2016-04-18(RH): decide what to return here.. bool or result content? Or full new?
    # NOTE(review): sending the request was missing from the reviewed text;
    # reconstructed - confirm the return value against the upstream file.
    if self._ticket_create_json(url, payload):
        return self.response_json

    return False
def _ticket_create_json(self, url, payload):
    """ _ticket_create_json

    Sends the request and keeps the response in self.response and the
    decoded JSON in self.response_json.

    Args:
        url (str):
        payload (dict):

    Raises:
        OTRSAPIError
        ResponseJSONParseError

    Returns:
        bool: True if successful, False otherwise.

    .. note::
        Uses HTTP Method: POST
    """
    self.operation = "TicketCreate"
    self.http_method = "POST"
    self.response = self._send_request(self.http_method, url, payload, self.https_verify)
    self.response_json = self.response.json()
    self._validate_response()

    return True
"""
GenericInterface::Operation::Ticket::TicketGet
public methods:
* ticket_get_by_id
* ticket_get_by_number
"""
def ticket_get_by_id(self, ticket_id, dynamic_fields=True, all_articles=False):
    """ ticket_get_by_id

    Args:
        ticket_id (int): Integer value of a Ticket ID
        dynamic_fields (bool): will request OTRS to include all
            Dynamic Fields (*default: True*)
        all_articles (bool): will request OTRS to include all
            Articles (*default: False*)

    Returns:
        dict: Ticket (None if the request was not successful)

    .. todo::
        2016-04-18 (RH): decide what ticket_get_by_id should return (bool or result content)
    """
    url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
          "{0.webservicename}/Ticket/{1}".format(self, ticket_id)

    payload = {
        "SessionID": self.session_id,
        "AllArticles": int(all_articles),  # OTRS gets the flags as 0 or 1
        "DynamicFields": int(dynamic_fields)
    }

    # TODO 2016-04-13 (RH): decide what to return here.. bool or ticket content
    if self._ticket_get_json(url, payload):
        # NOTE(review): the body of this branch was missing from the reviewed
        # text; presumably the response holds a list below the key "Ticket"
        # - confirm against the OTRS TicketGet operation.
        return self.response_json["Ticket"][0]

    return None
def ticket_get_by_number(self, ticket_number, dynamic_fields=True, all_articles=False):
    """ ticket_get_by_number

    Searches for the Ticket Number and - if exactly one ticket is found -
    gets that ticket by its ID.

    Args:
        ticket_number (str): Ticket Number as str
        dynamic_fields (bool): will request OTRS to include all
            Dynamic Fields (*default: True*)
        all_articles (bool): will request OTRS to include all
            Articles (*default: False*)

    Raises:
        TicketError: if ticket_number is not a str

    Returns:
        dict: Ticket (None if no or more than one ticket was found)
    """
    if not isinstance(ticket_number, str):
        raise TicketError("Please provide ticket_number as str!")

    result_list = self.ticket_search(TicketNumber=ticket_number)

    if not result_list:
        logger.debug("No Tickets found with that Ticket Number :-/")
        return None

    if len(result_list) > 1:
        logger.debug("Found more that one result with that Ticket Number?!")
        return None

    return self.ticket_get_by_id(result_list[0],
                                 dynamic_fields=dynamic_fields,
                                 all_articles=all_articles)
def _ticket_get_json(self, url, payload):
    """ _ticket_get_json

    Sends the request and keeps the response in self.response and the
    decoded JSON in self.response_json.

    Args:
        url (str):
        payload (dict):

    Raises:
        OTRSAPIError
        ResponseJSONParseError

    Returns:
        bool: True if successful, False otherwise.

    .. note::
        Uses HTTP Method: GET
    """
    # NOTE(review): the ``def`` line was missing from the reviewed text; it
    # was reconstructed from the docstring and from the callers.
    self.operation = "TicketGet"
    self.http_method = "GET"
    self.response = self._send_request(self.http_method, url, payload, self.https_verify)
    self.response_json = self.response.json()
    self._validate_response()

    return True
"""
GenericInterface::Operation::Ticket::TicketSearch
public methods:
* ticket_search
"""
def ticket_search(self, **kwargs):
    """Wrapper for search ticket

    Args:
        **kwargs: Arbitrary keyword arguments (Dynamic Field).

    Raises:
        TicketSearchNothingToLookFor: if no keyword argument is given

    Returns:
        list: tickets that were found as list of **str**

    .. note::
        If value of kwargs is a datetime object then this object will be
        converted to the appropriate string format for REST API.
    """
    url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \
          "{0.webservicename}/Ticket".format(self)

    payload = {
        "SessionID": self.session_id,
    }

    for key, value in kwargs.items():
        if isinstance(value, datetime.datetime):
            value = value.strftime("%Y-%m-%d %H:%M:%S")
        payload[key] = value

    # only the SessionID in there - so no search criteria were given
    if len(payload) == 1:
        raise TicketSearchNothingToLookFor("Nothing specified to look for!")

    if self._ticket_search_json(url, payload):
        # NOTE(review): the body of this branch was missing from the reviewed
        # text; presumably the response lists the IDs below the key
        # "TicketID" and has no such key if nothing was found - confirm
        # against the OTRS TicketSearch operation.
        return self.response_json.get("TicketID", [])

    return []
def ticket_search_full_text(self, pattern):
    """Wrapper for search ticket for full text search

    Searches for the pattern in Title, Subject and Body (combined by OR).

    Args:
        pattern (str): Search pattern (a % will be added to front and end automatically)

    Returns:
        list: tickets that were found

    .. warning::
        Full Text is not really working yet.

    .. note::
        Waiting for progress on OTRS Bug: http://bugs.otrs.org/show_bug.cgi?id=11981
    """
    pattern_wildcard = "%{0}%".format(pattern)

    return self.ticket_search(FullTextIndex="1",
                              ContentSearch="OR",
                              Title=pattern_wildcard,
                              Subject=pattern_wildcard,
                              Body=pattern_wildcard)
def _ticket_search_json(self, url, payload):
    """ _ticket_search_json

    Sends the request and keeps the response in self.response and the
    decoded JSON in self.response_json.

    Args:
        url (str):
        payload (dict):

    Raises:
        OTRSAPIError
        ResponseJSONParseError

    Returns:
        bool: True if search successful, False otherwise.

    .. note::
        Uses HTTP Method: GET
    """
    self.operation = "TicketSearch"
    self.http_method = "GET"
    self.response = self._send_request(self.http_method, url, payload, self.https_verify)
    self.response_json = self.response.json()
    self._validate_response()

    return True
# GenericInterface :: Operation::Ticket :: TicketUpdate
# public methods
# * ticket_update
def ticket_update(self,
ticket_id,
article=None,
**kwargs):
""" ticket_update_by_ticket_id_set_scout_id
Args: