# -*- coding: utf-8 -*- from __future__ import unicode_literals # support both Python2 and 3 """ lib.py PyOTRS lib This code implements the PyOTRS library to provide access to the OTRS API (REST) """ import os import json import time import datetime import requests from requests.packages.urllib3 import disable_warnings # turn of platform insecurity warnings from urllib3 disable_warnings() # TODO 2016-04-11 (RH) ca_cert, os.environ and proxies can not stay like this! # path to certificate bundle and set to environment ca_certs = os.path.abspath("/etc/ssl/certs/ca-certificates.crt") os.environ["REQUESTS_CA_BUNDLE"] = ca_certs os.environ["CURL_CA_BUNDLE"] = ca_certs # disable any proxies proxies = {"http": "", "https": "", "no": ""} class PyOTRSError(Exception): def __init__(self, message): super(PyOTRSError, self).__init__(message) self.message = message class NoBaseURL(PyOTRSError): pass class NoWebServiceName(PyOTRSError): pass class NoCredentials(PyOTRSError): pass class ResponseJSONParseError(PyOTRSError): pass class SessionIDStoreNoFileError(PyOTRSError): pass class SessionIDStoreNoTimeoutError(PyOTRSError): pass class SessionError(PyOTRSError): pass class SessionCreateError(PyOTRSError): pass class SessionIDFileError(PyOTRSError): pass class TicketDynamicFieldsNotRequestedError(PyOTRSError): pass class TicketDynamicFieldsParseError(PyOTRSError): pass class TicketSearchNothingToLookFor(PyOTRSError): pass class TicketSearchNumberMultipleResults(PyOTRSError): pass class TicketError(PyOTRSError): pass class OTRSAPIError(PyOTRSError): pass class OTRSHTTPError(PyOTRSError): pass class UpdateAddArticleError(PyOTRSError): pass class Article(object): """PyOTRS Article class """ def __init__(self, dct): for key, value in dct.items(): self.__dict__ = dct def __repr__(self): return "<{0}>".format(self.__class__.__name__) @classmethod def _dummy(cls): """dummy data (for testing) Returns: """ return Article({"Subject": "Dümmy Subject", "Body": "Hallo Bjørn,\n[kt]\n\n -- The End", "TimeUnit": 0, "MimeType": "text/plain", "Charset": "UTF8"}) @classmethod def _dummy_force_notify(cls): """dummy data (for testing) Returns: """ return Article({"Subject": "Dümmy Subject", "Body": "Hallo Bjørn,\n[kt]\n\n -- The End", "TimeUnit": 0, "MimeType": "text/plain", "Charset": "UTF8", "ForceNotificationToUserID": [1, 2]}) def to_dct(self): """ Returns: """ return {"Article": self.__dict__} def validate(self): """validate data against a mapping dict - if a key is not present then set it with a default value according to dict """ validation_map = {"Body": "API created Article Body", "Charset": "UTF8", "MimeType": "text/plain", "Subject": "API created Article", "TimeUnit": 0} dct = self.__dict__ for key, value in validation_map.items(): if not self.__dict__.get(key, None): dct.update({key: value}) self.__dict__ = dct class Attachment(object): """PyOTRS Attachment class """ def __init__(self, Content=None, ContentType=None, Filename=None): self.Content = Content # base64 encoded self.ContentType = ContentType self.Filename = Filename def __repr__(self): return "<{0}: {1}>".format(self.__class__.__name__, self.Filename) def to_dct(self): """represent AttachmentList and related Attachment objects as dict Returns: """ return self.__dict__ @classmethod def _dummy(cls): """dummy data (for testing) Returns: """ return Attachment("YmFyCg==", "text/plain", "dümmy.txt") class DynamicField(object): """PyOTRS DynamicField class .. warning:: DynamicField representation changed between OTRS 4 and OTRS 5. **PyOTRS only supports OTRS 5 style!** """ def __init__(self, name=None, value=None, dct=None): if dct: if name or value: raise Exception("Please provide either \"dct\" or \"name and value\"") self.name = dct["Name"] self.value = dct["Value"] else: if name and value: self.name = name self.value = value else: raise Exception("Please provide either \"dct\" or \"name and value\"") def __repr__(self): return "<{0}: {1} --> {2} >".format(self.__class__.__name__, self.name, self.value) def to_dct(self): """represent DynamicField as dict Returns: dict """ return {"Name": self.name, "Value": self.value} @classmethod def _dummy1(cls): """dummy1 data (for testing) Returns: DynamicField """ return DynamicField(name="firstname", value="Jane") @classmethod def _dummy2(cls): """dummy2 data (for testing) Returns: DynamicField """ return DynamicField(dct={'Name': 'lastname', 'Value': 'Doe'}) class Ticket(object): """PyOTRS Ticket class """ def __init__(self, dct): self.attachment_list = [] self.dynamic_field_list = [] for key, value in dct.items(): if isinstance(value, dict): dct[key] = Ticket(value) self.__dict__ = dct def __repr__(self): return "<{0}>".format(self.__class__.__name__) @classmethod def create_basic(cls, Title=None, QueueID=None, Queue=None, StateID=None, State=None, PriorityID=None, Priority=None, CustomerUser=None, **kwargs): """create basic ticket Args: Title: QueueID: Queue: StateID: State: PriorityID: Priority: CustomerUser: **kwargs: Returns: """ if not Title: raise ValueError("Title is required") if not Queue and not QueueID: raise ValueError("Either Queue or QueueID required") if not State and not StateID: raise ValueError("Either State or StateID required") if not Priority and not PriorityID: raise ValueError("Either Priority or PriorityID required") if not CustomerUser: raise ValueError("CustomerUser is required") dct = {u"Title": Title} if Queue: dct.update({"Queue": Queue}) else: dct.update({"QueueID": QueueID}) if State: dct.update({"State": State}) else: dct.update({"StateID": StateID}) if Priority: dct.update({"Priority": Priority}) else: dct.update({"PriorityID": PriorityID}) dct.update({"CustomerUser": CustomerUser}) for key, value in dct.items(): dct.update({key: value}) return Ticket(dct) def to_dct(self): """represent Ticket objects as dict Returns: """ return {"Ticket": self.__dict__} @classmethod def _dummy(cls): """dummy data (for testing) Returns: """ return Ticket.create_basic(Queue=u"Raw", State=u"open", Priority=u"3 normal", CustomerUser="root@localhost", Title="Bäsic Ticket") @staticmethod def datetime_to_pending_time_text(datetime_obj=None): """datetime_to_pending_time_str Args: datetime_obj (Datetime): Returns: str: The pending time in the format required for OTRS REST interface """ return { "Year": datetime_obj.year, "Month": datetime_obj.month, "Day": datetime_obj.day, "Hour": datetime_obj.hour, "Minute": datetime_obj.minute } class SessionIDStore(object): """Session ID: persistently store to and retrieve from to file Args: file_path (unicode): Path on disc session_timeout (int): OTRS Session Timeout Value (to avoid reusing outdated session id session_id_value (unicode): A Session ID as str session_id_created (int): seconds as epoch when a session_id record was created session_id_expires (int): seconds as epoch when a session_id record expires Raises: SessionIDStoreNoFileError SessionIDStoreNoTimeoutError .. todo:: Is session_timeout correct here?! Or should this just return "expires" value and caller needs to decide whether to use it..?! """ def __init__(self, file_path=None, session_timeout=None): if not file_path: raise SessionIDStoreNoFileError("Argument file_path is required!") if not session_timeout: raise SessionIDStoreNoTimeoutError("Argument session_timeout is required!") self.file_path = file_path self.session_timeout = session_timeout self.session_id_value = None self.session_id_created = None self.session_id_expires = None def __repr__(self): return "<{0}: {1}>".format(self.__class__.__name__, self.file_path) def read(self): """Retrieve a stored Session ID from file Returns: Union(str, None): Retrieved Session ID or None (if none could be read) """ if not os.path.isfile(self.file_path): return None if not SessionIDStore._validate_file_owner_and_permissions(self.file_path): return None with open(self.file_path, "r") as f: content = f.read() try: data = json.loads(content) self.session_id_value = data['session_id'] self.session_id_created = datetime.datetime.utcfromtimestamp(int(data['created'])) self.session_id_expires = (self.session_id_created + datetime.timedelta(minutes=self.session_timeout)) if self.session_id_expires > datetime.datetime.utcnow(): return self.session_id_value # still valid except ValueError: return None except KeyError: return None except Exception as err: raise Exception("Exception Type: {0}: {1}".format(type(err), err)) def write(self): """Write and store a Session ID to file (rw for user only) Returns: bool: True if successful, False otherwise. .. todo:: (RH) Error Handling and return True/False """ if os.path.isfile(self.file_path): if not SessionIDStore._validate_file_owner_and_permissions(self.file_path): raise IOError("File exists but is not ok (wrong owner/permissions)!") with open(self.file_path, 'w') as f: f.write(json.dumps({'created': str(int(time.time())), 'session_id': self.session_id_value})) os.chmod(self.file_path, 384) # 384 is '0600' # TODO 2016-04-23 (RH): check this if not SessionIDStore._validate_file_owner_and_permissions(self.file_path): raise IOError("Race condition: Something happened to file during the run!") return True def delete(self): """remove session id file (e.g. when it only contains an invalid session id Raises: NotImplementedError Returns: bool: True if successful, False otherwise. .. todo:: (RH) implement this _remove_session_id_file """ raise NotImplementedError("Not yet done") @staticmethod def _validate_file_owner_and_permissions(full_file_path): """validate SessionIDStore file ownership and permissions Args: full_file_path (unicode): full path to file on disk Returns: bool: True if valid and correct, otherwise False """ if not os.path.isfile(full_file_path): raise IOError("Does not exist or not a file: {0}".format(full_file_path)) file_lstat = os.lstat(full_file_path) if not file_lstat.st_uid == os.getuid(): return False if not file_lstat.st_mode & 0o777 == 384: """ check for unix permission User R+W only (0600) >>> oct(384) '0600' Python 2 >>> oct(384) '0o600' Python 3 """ return False return True class Client(object): """PyOTRS Client class - includes Session handling Args: baseurl (unicode): Base URL for OTRS System, no trailing slash e.g. http://otrs.example.com webservicename (unicode): OTRS REST Web Service Name username (unicode): Username password (unicode): Password session_id_file (unicode): Session ID path on disk, used to persistently store Session ID session_timeout (int): Session Timeout configured in OTRS (usually 28800 seconds = 8h) https_verify (bool): Should HTTPS certificates be verified (defaults to True) """ def __init__(self, baseurl, webservicename, username=None, password=None, session_id_file=None, session_timeout=None, https_verify=True): if not baseurl: raise NoBaseURL("Missing Baseurl (e.g. https://otrs.example.com)") self.baseurl = baseurl.rstrip("/") if not webservicename: raise NoWebServiceName("Missing WebServiceName (e.g. GenericTicketConnectorREST)") self.webservicename = webservicename if not session_timeout: self.session_timeout = 28800 # 8 hours is OTRS default else: self.session_timeout = session_timeout if not session_id_file: self.session_id_store = SessionIDStore(file_path="/tmp/.session_id.tmp", session_timeout=self.session_timeout) else: self.session_id_store = SessionIDStore(file_path=session_id_file, session_timeout=self.session_timeout) self.https_verify = https_verify # dummy initialization self.operation = None self.http_method = None self.response = None self.response_json = None self.response_type = None self.ticket_search_result = None self.ticket_list = [] self.attachment_list = [] self.dynamic_field_list = [] # credentials self.username = username self.password = password self.session_id = None """ GenericInterface::Operation::Session::SessionCreate Methods (public): * session_check_is_valid * session_create * session_restore_or_set_up_new # try to get session_id from a (json) file on filesystem """ def session_check_is_valid(self, session_id=None): """check whether self.session_id (or session_id) is currently valid Args: session_id (unicode): optional If set overrides the self.session_id Raises: SessionError if neither self.session_id nor session_id is not set Returns: bool: True if valid, False otherwise. .. note:: Calls _ticket_get_json (GET) """ if not session_id and not self.session_id: raise SessionError("No value set for session_id!") if session_id: self.session_id = session_id # TODO 2016-04-13 (RH): Is there a nicer way to check whether session is valid?! url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \ "{0.webservicename}/Ticket/1".format(self) payload = {"SessionID": self.session_id} return self._ticket_get_json(url, payload) def session_create(self): """create new OTRS Session and store Session ID Returns: bool: True if successful, False otherwise. .. note:: Uses HTTP Method: POST .. todo:: 2016-04-18 (RH): decide what session_create should return (bool or result content) """ url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \ "{0.webservicename}/Session".format(self) payload = { "UserLogin": self.username, "Password": self.password } # TODO 2016-04-18 (RH): decide what to return here.. bool or result content? if self._session_create_json(url, payload): return self.response_json else: return None def session_restore_or_set_up_new(self): """Try to restore Session ID from file otherwise create new one Raises: SessionCreateError SessionIDFileError Returns: bool: True if successful, False otherwise. """ # try to read session_id from file self.session_id = self.session_id_store.read() if self.session_id: # got one.. check whether it's still valid try: if self.session_check_is_valid(): print("Using valid Session ID " "from ({0})".format(self.session_id_store.file_path)) return True except OTRSAPIError: """most likely invalid session_id so pass. Remove clear session_id_store..""" # got no (valid) session_id; clean store and try to create new one self.session_id = None self.session_id_store.write() if not self.session_create(): raise SessionCreateError("Failed to create a Session ID!") # safe new created session_id to file if not self.session_id_store.write(): raise SessionIDFileError("Failed to save Session ID to file!") else: print("Saved new Session ID to file: {0}".format(self.session_id_store.file_path)) return True def _session_create_json(self, url, payload): """_session_create_json Args: url (unicode): payload (dict): Raises: OTRSAPIError Returns: bool: True if update successful, False otherwise. .. note:: Uses HTTP Method: POST """ self.operation = "SessionCreate" self.http_method = "POST" self.response = self._send_request(self.http_method, url, payload, self.https_verify) self.response_json = self.response.json() self._validate_response() # TODO return True """ GenericInterface::Operation::Ticket::TicketCreate Methods (public): * ticket_create """ def ticket_create(self, ticket=None, article=None, attachment_list=None, dynamic_field_list=None, **kwargs): """ticket_update_by_ticket_id_set_scout_id Args: ticket (Ticket): article (Article): optional article attachment_list (list): *Attachment* objects dynamic_field_list (list): *DynamicField* object **kwargs: any regular OTRS Fields (not for Dynamic Fields!) .. todo:: 2016-04-18 (RH): decide what ticket_create should return (bool or result content) """ url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \ "{0.webservicename}/Ticket".format(self) payload = {"SessionID": self.session_id} if not ticket: raise TicketError("provide Ticket!") if not article: raise TicketError("provide Article!") payload.update(ticket.to_dct()) if article: article.validate() payload.update(article.to_dct()) if attachment_list: payload.update({"Attachment": attachment_list}) if dynamic_field_list: payload.update({"DynamicField": dynamic_field_list}) # TODO 2016-04-18(RH): decide what to return here.. bool or result content? Or full new? if self._ticket_create_json(url, payload): return self.response_json else: return None def _ticket_create_json(self, url, payload): """_ticket_create_json Args: url (unicode): payload (dict): Raises: OTRSAPIError ResponseJSONParseError Returns: bool: True if successful, False otherwise. """ self.operation = "TicketCreate" self.http_method = "POST" self.response = self._send_request(self.http_method, url, payload, self.https_verify) self.response_json = self.response.json() self._validate_response() return True """ GenericInterface::Operation::Ticket::TicketGet Methods (public): * ticket_get_by_id * ticket_get_by_number """ def ticket_get_by_id(self, ticket_id, dynamic_fields=True, all_articles=False): """ticket_get_by_id Args: ticket_id (int): Integer value of a Ticket ID dynamic_fields (bool): will request OTRS to include all Dynamic Fields (*default: True*) all_articles (bool): will request OTRS to include all Articles (+default: False*) Returns: dict: Ticket .. todo:: 2016-04-18 (RH): decide what ticket_get_by_id should return (bool or result content) """ url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \ "{0.webservicename}/Ticket/{1}".format(self, ticket_id) payload = { "SessionID": self.session_id, "AllArticles": int(all_articles), "DynamicFields": int(dynamic_fields) } # TODO 2016-04-13 (RH): decide what to return here.. bool or ticket content if self._ticket_get_json(url, payload): return self.response_json['Ticket'][0] else: return None def ticket_get_by_number(self, ticket_number, dynamic_fields=True, all_articles=False): """ticket_get_by_number Args: ticket_number (unicode): Integer value of a Ticket ID dynamic_fields (bool): will request OTRS to include all Dynamic Fields (*default: True*) all_articles (bool): will request OTRS to include all Articles (*default: False*) Returns: dict: Ticket """ if isinstance(ticket_number, int): raise TicketError("Provide ticket_number as str/unicode. Got ticket_number as int.") result_list = self.ticket_search(TicketNumber=ticket_number) if len(result_list) == 0: return None elif len(result_list) == 1: tid = result_list[0] return self.ticket_get_by_id(tid, dynamic_fields=dynamic_fields, all_articles=all_articles) else: raise TicketSearchNumberMultipleResults("Found more that one result for " "Ticket Number: {0}".format(ticket_number)) def _ticket_get_json(self, url, payload): """_ticket_get_json Args: url (unicode): payload (dict): Raises: OTRSAPIError ResponseJSONParseError Returns: bool: True if successful, False otherwise. .. note:: Uses HTTP Method: GET """ self.operation = "TicketGet" self.http_method = "GET" self.response = self._send_request(self.http_method, url, payload, self.https_verify) self.response_json = self.response.json() self._validate_response() return True """ GenericInterface::Operation::Ticket::TicketSearch Methods (public): * ticket_search * ticket_search_full_text """ def ticket_search(self, **kwargs): """Wrapper for search ticket Args: **kwargs: Arbitrary keyword arguments (Dynamic Field). Returns: list: tickets that were found as list of **str** .. note:: If value of kwargs is a datetime object then this object will be converted to the appropriate string format for REST API. """ url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \ "{0.webservicename}/Ticket".format(self) payload = { "SessionID": self.session_id, } if kwargs is not None: for key, value in kwargs.items(): if isinstance(value, datetime.datetime): value = value.strftime("%Y-%m-%d %H:%M:%S") payload.update({key: value}) if len(payload) == 1: raise TicketSearchNothingToLookFor("Nothing specified to look for!") if self._ticket_search_json(url, payload): return self.ticket_search_result def ticket_search_full_text(self, pattern): """Wrapper for search ticket for full text search Args: pattern (unicode): Search pattern (a % will be added to from and end automatically) Returns: list: tickets that were found Notes: Following examples is a dummy doctest. In order for this to work on Python2 the whole docstring needs to be a (Python2) unicode string (prefixed u) Examples: >>> 2 + 2 4 >>> str('a') + str('b') 'ab' .. warning:: Full Text is not really working yet. .. note:: Waiting for progress on OTRS Bug: http://bugs.otrs.org/show_bug.cgi?id=11981 """ pattern_wildcard = "%{0}%".format(pattern) return self.ticket_search(FullTextIndex="1", Title=pattern_wildcard, ContentSearch="OR", Subject=pattern_wildcard, Body=pattern_wildcard) def _ticket_search_json(self, url, payload): """_ticket_search_json _search_json_ticket_data Args: url (unicode): payload (dict): Raises: OTRSAPIError ResponseJSONParseError Returns: bool: True if search successful, False otherwise. .. note:: Uses HTTP Method: GET """ self.operation = "TicketSearch" self.http_method = "GET" self.response = self._send_request(self.http_method, url, payload, self.https_verify) self.response_json = self.response.json() self._validate_response() return True """ GenericInterface::Operation::Ticket::TicketUpdate Methods (public): * ticket_update * ticket_update_set_pending """ def ticket_update(self, ticket_id, article=None, attachment_list=None, dynamic_field_list=None, **kwargs): """ticket_update_by_ticket_id_set_scout_id Args: ticket_id (int): article (Article): attachment_list (list): *Attachment* objects dynamic_field_list (list): *DynamicField* objects **kwargs: any regular OTRS Fields (not for Dynamic Fields!) Returns: dict: Response from OTRS API if successful (if Article was added new Art. ID included) .. todo:: 2016-04-17 (RH): decide what ticket_update should return (bool or result content) """ url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \ "{0.webservicename}/Ticket/{1}".format(self, ticket_id) payload = {"SessionID": self.session_id} if article: article.validate() payload.update(article.to_dct()) if attachment_list: if not article: raise TicketError("To create an attachment an article is needed!") payload.update({"Attachment": attachment_list}) if dynamic_field_list: payload.update({"DynamicField": dynamic_field_list}) # TODO 2016-04-17 (RH): decide what to return here.. bool or result content? Or full new? if self._ticket_update_json(url, payload): return self.response_json else: return None def ticket_update_set_pending(self, ticket_id, new_state="pending reminder", pending_days=1, pending_hours=0): """ticket_update_set_state_pending Args: ticket_id (int): new_state (unicode): defaults to "pending reminder" pending_days (int): defaults to 1 pending_hours (int): defaults to 0 Returns: bool: True if update successful, False otherwise. .. todo:: 2016-04-17 (RH): decide what ticket_update_set_pending should return """ url = "{0.baseurl}/otrs/nph-genericinterface.pl/Webservice/" \ "{0.webservicename}/Ticket/{1}".format(self, ticket_id) datetime_now = datetime.datetime.utcnow() pending_till = datetime_now + datetime.timedelta(days=pending_days, hours=pending_hours) pending_till_str = pending_till.strftime("%Y-%m-%d %H:%M:%S UTC") pt = Ticket.datetime_to_pending_time_text(datetime_obj=pending_till) payload = { "SessionID": self.session_id, "Ticket": {"State": new_state, "PendingTime": pt} } # TODO 2016-04-17 (RH): decide what to return here.. bool or result content? Or full new? if self._ticket_update_json(url, payload): return "Set pending till {0} with state \"{1}\"".format(pending_till_str, new_state) else: return None def _ticket_update_json(self, url, payload): """_ticket_update_json Args: url (unicode): payload (dict): Raises: OTRSAPIError Returns: bool: True if update successful, False otherwise. .. note:: Uses HTTP Method: PATCH """ self.operation = "TicketUpdate" self.http_method = "PATCH" self.response = self._send_request(self.http_method, url, payload, self.https_verify) self.response_json = self.response.json() self._validate_response() # TODO 2016-04-17 (RH): is this "extra net" needed?! if "Article" in payload.keys(): try: article_id = self.response_json.get('ArticleID', None) if not article_id: raise UpdateAddArticleError("No new Article was created?!") except Exception as err: raise UpdateAddArticleError("Unknown Exception: {0}".format(err)) return True @staticmethod def _send_request(http_method=None, url=None, payload=None, https_verify=True): """_ticket_request Args: http_method (unicode): HTTP Method to be used url (unicode): URL payload (dict): Payload to be sent https_verify (bool): (optional) whether the SSL cert will be verified. A CA_BUNDLE path can also be provided. Defaults to True. Raises: OTRSHTTPError: Returns: requests.Response: .. note:: Supported HTTP Methods: GET, HEAD, PATCH, POST, PUT """ if not (http_method and url and payload): raise ValueError("http_method, url and payload are required.") if not http_method.upper() in ["GET", "HEAD", "PATCH", "POST", "PUT"]: raise NotImplementedError("Accepted HTTP Methods: GET, HEAD, PATCH, POST, PUT") headers = {"Content-Type": "application/json"} json_payload = json.dumps(payload) # print("sending {0} to {1} as {2}".format(payload, url, http_method.upper())) try: response = requests.request(http_method.upper(), url, proxies=proxies, data=json_payload, headers=headers, verify=https_verify) # critical error: HTTP request resulted in an error! except Exception as err: # raise OTRSHTTPError("get http") raise OTRSHTTPError("Failed to access OTRS. Check Hostname, Proxy, SSL Certificate!" "Error with http communication: {0}".format(err)) return response def _validate_response(self): """_validate_response Raises: OTRSAPIError Returns: bool True """ if self.operation == "SessionCreate": self.response_type = "SessionID" elif self.operation == "TicketGet": self.response_type = "Ticket" elif self.operation == "TicketCreate" or self.operation == "TicketUpdate": self.response_type = "TicketID" elif self.operation == "TicketSearch": self.response_type = "TicketID" else: raise NotImplementedError("Unknown Operation!") # handle TicketSearch operation first. special: empty search result has no "TicketID" self.ticket_search_result = [] if self.operation == "TicketSearch": if not self.response_json: return True if self.response_json.get(self.response_type, None): self.ticket_search_result = self.response_json['TicketID'] return True # now handle other operations if self.response_json.get(self.response_type, None): self.response_error = False elif self.response_json.get("Error", None): self.response_error = True else: self.response_error = True # critical error: Unknown response from OTRS API - FAIL NOW! raise ResponseJSONParseError("Unknown key in response JSON DICT!") # report error if self.response_error: raise OTRSAPIError("Failed to access OTRS API. Check Username and Password! " "Session ID expired?!\nOTRS Error Code: {0}\nOTRS Error Mesg: {1}" "".format(self.response_json["Error"]["ErrorCode"], self.response_json["Error"]["ErrorMessage"])) return True # EOF