HIGHQA/EPICOR Middleware

GCM

Overview

Developed a middleware solution that integrates Epicor, an enterprise resource planning (ERP) software, with HIGHQA, a quality control software. This integration bridges the gap between two essential systems, ensuring seamless communication and automating quality control tasks based on ERP data.


Key Features

  1. Timed Synchronization: Using a timed function, the system routinely processes and updates jobs between both applications.
  2. Authentication and Authorization: Implemented token-based authentication to ensure secure access to HIGHQA.
  3. Error Handling: Robust error handling is in place to manage HTTP errors, connection issues, timeouts, and other request-related errors.
  4. Dynamic Data Processing: The middleware extracts job data from Epicor, checks for its existence in HIGHQA, and accordingly creates or updates the job.
  5. SMTP Integration: Integrated an SMTP server to send alerts and notifications related to the data processing. For instance, if a master job is missing in HIGHQA, an email alert is triggered.
  6. Closing Jobs: The system can detect completed jobs in Epicor and update their status in HIGHQA to reflect closure.
  7. Swagger API Management: Used Swagger definitions for API payloads and endpoint management, ensuring standardization and ease of updates.
highqa_epicor_job_update.py
#from utils import ErrorHandling, HIGHQAPost
import requests
import json
from swagger import swagger  # Assuming that swagger is a separate module with necessary definitions
import os
from dotenv import load_dotenv
import requests
import json
import threading
from config import HIGHQA_USERNAME, HIGHQA_PASSWORD, EPICOR_USERNAME, EPICOR_PASSWORD,EPICOR_PROCESS_JOB_API,EPICOR_CLOSE_JOB_API
from smtp import SMTP
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


class HighQA:
    """Synchronise Epicor jobs into HighQA over their HTTP APIs.

    The instance keeps the current bearer-token header, the Epicor job being
    processed (``Job`` / ``CloseJob``), the JSON body of the most recent
    HighQA response (``HighQaPackage``) and the last payload sent to each
    HighQA endpoint.
    """

    def __init__(self):
        self.headers = {}
        self.Job = {}
        self.CloseJob = {}
        # Name of the "<part>_Master" job; set by extract_job_data().
        self.JobMaster = None
        # Parsed JSON body / raw response of the latest HIGHQAPost() call.
        self.HighQaPackage = None
        self.HighQaPackage2 = None
        self.JobMasterPackage = None
        self.job_set_payload = {}
        self.job_list_payload = {}
        self.lot_set_payload = {}
        self.lot_list_payload = {}
        self.aql_sample_payload = {}
        # Start the token refresh last, once every attribute exists.
        self.TokenUpdateTimer()

    @staticmethod
    def _fresh_payload(template):
        """Return an independent deep copy of a swagger payload template.

        ``dict.copy()`` is shallow: writing to ``copy["InputJob"][...]`` would
        also modify the shared template inside the ``swagger`` module and leak
        values from one job into the next.
        """
        import copy  # local import keeps this block self-contained

        return copy.deepcopy(template)

    def ErrorHandling(self, Response):
        """Check an HTTP response and describe the outcome as a string.

        Args:
            Response: the response object returned by ``requests``.

        Returns:
            ``"Success"`` when ``Response.raise_for_status()`` does not raise,
            otherwise a message describing the error (which is also printed).
        """
        try:
            # Actually inspect the status; merely naming the object can
            # never raise, so errors used to be reported as "Success".
            Response.raise_for_status()
            return "Success"
        except requests.exceptions.HTTPError as errh:
            print("An Http Error occurred:" + repr(errh))
            return "An Http Error occurred:" + repr(errh)
        except requests.exceptions.ConnectionError as errc:
            print("An Error Connecting to the API occurred:" + repr(errc))
            return "An Error Connecting to the API occurred:" + repr(errc)
        except requests.exceptions.Timeout as errt:
            print("A Timeout Error occurred:" + repr(errt))
            return "A Timeout Error occurred:" + repr(errt)
        except requests.exceptions.RequestException as err:
            print("An Unknown Error occurred" + repr(err))
            return "An Unknown Error occurred" + repr(err)

    def TokenUpdateTimer(self):
        """Fetch a HighQA bearer token and re-arm itself every 12 hours.

        On a 200 response ``self.headers`` is replaced with the new
        ``Authorization`` header; otherwise the previous header is kept and
        the error body is printed.
        """
        # Re-arm first so one failed refresh does not end the refresh cycle.
        threading.Timer(43200.0, self.TokenUpdateTimer).start()
        print ("\n*******Setting Token*******\n")
        # NOTE(review): this only references the attribute and never calls it.
        # If ClearErrors is meant to run here it needs "()" - confirm its
        # signature in the smtp module before changing.
        SMTP.ClearErrors
        responseHighQA = requests.post(swagger.TokenAuth, verify=False, json=(swagger.TokenAuthPayload))

        print (responseHighQA)
        if responseHighQA.status_code == 200:
            print("\n*******Token Success Response from HighQA*******\n")
            AuthToken = responseHighQA.json()
            # The token itself is deliberately not printed: it is a credential.
            self.headers = {"Authorization": "Bearer " + AuthToken["AccessToken"]}
        else:
            print("\n*******Token ERROR Response from HighQA*******\n")
            # Use the raw text: an error body is not guaranteed to be JSON.
            print(responseHighQA.text)

    def HIGHQAPost(self, url, payload, files):
        """POST ``payload`` as JSON to a HighQA endpoint.

        Args:
            url: endpoint URL.
            payload: JSON-serialisable request body.
            files: forwarded to ``requests.post(files=...)``; every caller in
                this class passes ``''``.

        Returns:
            The parsed JSON body, which is also stored in
            ``self.HighQaPackage`` (the raw response is kept in
            ``self.HighQaPackage2``).
        """
        print("\nCurrent URL " + url + "\n")
        print(files)
        print("Payload \n\n")
        print(json.dumps(payload))
        responseHighQA = requests.post(
            url=url, verify=False, files=files, json=(payload), headers=self.headers)
        print("\n*******HighQA Post Response*******\n")
        print(responseHighQA)
        print("\n\n\n")
        self.ErrorHandling(responseHighQA)
        self.HighQaPackage2 = responseHighQA
        self.HighQaPackage = responseHighQA.json()
        print(self.HighQaPackage)
        return self.HighQaPackage

    def JobSetFunc(self, GUID):
        """Create the job, its lot and its AQL samples in HighQA.

        Reads the Epicor data in ``self.Job`` and the job-master lookup result
        in ``self.JobMasterPackage``.

        Args:
            GUID: PartGUID of the part that already exists in HighQA.
        """
        print("\n*******Setting Up Job*******\n")
        aql_table_guid = self.JobMasterPackage.get('Jobs')[0]['AQLTableGUID']

        # Deep copy so the nested "InputJob" dict of the template stays clean.
        self.job_set_payload = self._fresh_payload(swagger.JobSetPayload)
        self.job_set_payload["InputJob"].update({
            "PartGUID": str(GUID),
            "Number": self.Job["Number"],
            "Title": self.Job["PartDescription"],
            "Revision": self.Job["PartRev"],
            "Quantity": self.Job["ProdQty"],
            "Status": 1,
            "ActivationDate": self.Job["StartDate"],
            "ERPID": "",
            "BarcodeID": "",
            "AQLMode": 1,
            "AQLTableGUID": aql_table_guid,
            "DeliveryDate": self.Job["DueDate"],
        })
        print(self.job_set_payload)
        print(json.dumps(self.job_set_payload))

        self.HIGHQAPost(swagger.JobSet, (self.job_set_payload), '')

        # GUID of the job that HighQA just created.
        JobGUID = self.HighQaPackage['OutputJob']['GUID']

        self.lot_list_payload = self._fresh_payload(swagger.LotListPayload)
        self.lot_list_payload["JobGUID"] = JobGUID
        print(self.lot_list_payload)
        self.HIGHQAPost(swagger.LotList, self.lot_list_payload, '')

        # The first lot returned for the new job is the one updated below.
        LotGUID = self.HighQaPackage.get('Lots')[0]['GUID']
        self.lot_set_payload = self._fresh_payload(swagger.LotSetPayload)
        self.lot_set_payload["InputLot"].update({
            "GUID": LotGUID,
            "JobGUID": JobGUID,
            "Number": self.Job["Number"],
            "Status": 1,
            "StartDate": self.Job["StartDate"],
            "DueDate": self.Job["DueDate"],
            "Size": self.Job["ProdQty"],
            "ERPID": "",
            "BarcodeID": 0,
            "SamplesPerHour": 0,
            "HoursPerShift": 0,
            "QualityStage": 1,
            "QualityLevel": 0,
            "AQLTableGUID": aql_table_guid,
            "InspectionLevel": 2,
        })
        print(self.lot_set_payload)
        self.HIGHQAPost(swagger.LotSet, self.lot_set_payload, '')

        self.aql_sample_payload = self._fresh_payload(swagger.AQLSamplePayload)
        self.aql_sample_payload.update({
            "LotGUID": LotGUID,
            "StartFrom": 1,
            "Prefix": "#",
            "InspectLastSample": "false",
            "DimsToInspect": 0,
            "FirstFullSamples": 0,
            "LastFullSamples": 0,
        })
        self.HIGHQAPost(swagger.AQLSamples, self.aql_sample_payload, '')

        print(self.aql_sample_payload)

    def process_jobs(self):
        """Fetch open jobs from Epicor and create the missing ones in HighQA."""
        print("Grabbing Data From Epicor")
        responseEpicor = requests.get(
            str(EPICOR_PROCESS_JOB_API), verify=False,
            auth=(str(EPICOR_USERNAME), str(EPICOR_PASSWORD)))

        if responseEpicor.status_code != 200:
            # Report every failure status, and do it before touching the body,
            # which may not be JSON on an error response.
            self.handle_epicor_error(responseEpicor)
            print('Restart')
            return

        packages_Epicor = responseEpicor.json()
        print(json.dumps(packages_Epicor, indent=2))
        print('Success!\n')
        for value in packages_Epicor['value']:
            self.extract_job_data(value)
            if self.Job['PSFile']:
                self.handle_job_with_file()
            else:
                self.handle_job_without_file()
        print('Restart')

    def handle_epicor_error(self, response=None):
        """Report a failed Epicor request.

        Args:
            response: the failed response, if available; only its
                ``status_code`` and ``text`` attributes are read.
        """
        status = getattr(response, "status_code", "unknown")
        print("\n*******Epicor request failed (status %s)*******\n" % status)
        body = getattr(response, "text", None)
        if body:
            print(body)

    def extract_job_data(self, value):
        """Copy the fields of one Epicor job record into ``self.Job``.

        Also sets ``self.JobMaster`` to ``"<Part_PartNum>_Master"``.

        Args:
            value: one entry of the Epicor response's ``value`` list.
        """
        self.Job["PartNumber"] = value['Part_RootPartNum_c']
        self.JobMaster = value['Part_PartNum'] + "_Master"
        print(self.JobMaster)
        self.Job["PartDescription"] = value['Part_PartDescription']
        self.Job["PartRev"] = value['JobHead_RevisionNum']
        self.Job["Number"] = value['JobHead_JobNum']
        # A fixed fractional-seconds / UTC-07:00 suffix is appended to both dates.
        self.Job["StartDate"] = value['JobHead_StartDate'] + '.0000000-07:00'
        self.Job["DueDate"] = value['JobHead_DueDate'] + '.0000000-07:00'
        self.Job["ProdQty"] = value['JobHead_ProdQty']
        self.Job['PSFile'] = value['XFileRef_XFileName']

    def handle_job_with_file(self):
        """Look the current job up in HighQA and dispatch on whether it exists."""
        print("\n*******Files are Added in Epicor*******\n")
        self.job_list_payload = self._fresh_payload(swagger.JobListPayload)
        self.job_list_payload["Number"] = self.Job["Number"]
        self.HIGHQAPost(swagger.JobList, self.job_list_payload, '')
        if self.HighQaPackage.get('Jobs'):
            self.handle_existing_job_in_highqa()
        else:
            self.handle_new_job_in_highqa()

    def handle_job_without_file(self):
        """Skip a job whose Epicor record has no file reference.

        Nothing is sent to HighQA for such a job; the skip is only logged.
        """
        print("\n*******No Files Added in Epicor for job %s - skipping*******\n"
              % self.Job.get("Number"))

    def handle_existing_job_in_highqa(self):
        """Compare the Epicor revision with the job already in HighQA."""
        print("\n*******\n" + self.Job["Number"] + " Is already in HighQA \n")
        if self.Job["PartRev"] == self.HighQaPackage.get('Jobs')[0]['Revision']:
            print("\n******\n" + self.Job["Number"] + " No Rev Update")
        else:
            self.update_job_revision()

    def update_job_revision(self):
        """Log that the revision differs (the update itself is not implemented)."""
        print("\n******\n" + self.Job["Number"] + " Rev Update!")
        # Add logic to update job revision if needed

    def handle_new_job_in_highqa(self):
        """Look up the part's job master and create the job from it if found."""
        print("\n*******Job is Not in HighQA*******\n")
        print("\n*******Check to see if part Master is in HIGHQA*******\n")
        self.job_list_payload = self._fresh_payload(swagger.JobListPayload)
        self.job_list_payload["Number"] = self.JobMaster
        self.HIGHQAPost(swagger.JobList, self.job_list_payload, '')
        print (self.HighQaPackage)
        # Keep the master lookup: JobSetFunc reads its AQLTableGUID later,
        # after HighQaPackage has been overwritten by further posts.
        self.JobMasterPackage = self.HighQaPackage
        if self.HighQaPackage.get('Jobs'):
            self.create_job_from_master_guid()
        else:
            self.notify_missing_job_master()

    def create_job_from_master_guid(self):
        """Create the job in HighQA using the job master's PartGUID."""
        print("\n*******Job Master is in HIGHQA*******\n")
        print("\n*******Creating Job from GUID*******\n")
        self.JobSetFunc(self.HighQaPackage.get('Jobs')[0]['PartGUID'])

    def notify_missing_job_master(self):
        """E-mail an alert that the part has no job master in HighQA."""
        print("\n*******Job Master is NOT in HIGHQA*******\n")
        SMTP.message = """\
            Subject: Hi there

            This Part needs to be added to HIGHQA. No Job Master"""

        Subject = "Part Cannot be Added to HighQA"

        SMTP.EmailAlert(self.Job["Number"],self.Job["PartNumber"],SMTP.alias_email,SMTP.Quality_email,SMTP.message,Subject)

    def close_jobs(self):
        """Fetch the jobs to close from Epicor and mark them closed in HighQA."""
        responseEpicor = requests.get(
            str(EPICOR_CLOSE_JOB_API), verify=False,
            auth=(str(EPICOR_USERNAME), str(EPICOR_PASSWORD)))

        if responseEpicor.status_code != 200:
            # Check the status before parsing: an error body may not be JSON.
            self.handle_epicor_error(responseEpicor)
            return

        packages_Epicor = responseEpicor.json()
        print(json.dumps(packages_Epicor, indent=2))
        print('Success!\n')
        for value in packages_Epicor['value']:
            self.extract_close_job_data(value)
            self.job_list_payload = self._fresh_payload(swagger.JobListPayload)
            self.job_list_payload["Number"] = self.CloseJob["Number"]
            self.HIGHQAPost(swagger.JobList, self.job_list_payload, '')

            if self.HighQaPackage.get('Jobs'):
                self.update_job_to_close_status(value)
            else:
                self.handle_close_job_not_found()

    def extract_close_job_data(self, value):
        """Copy the job and part number of one Epicor record into ``self.CloseJob``."""
        self.CloseJob["Number"] = value['JobHead_JobNum']
        self.CloseJob["PartNumber"] = value['Part_PartNum']

    def update_job_to_close_status(self, value):
        """Re-post the HighQA job found by the last lookup with ``Status`` 3.

        Args:
            value: the Epicor record; its ``JobHead_DueDate`` becomes the
                delivery date.
        """
        job = self.HighQaPackage.get('Jobs')[0]
        # Start from a fresh template: on an instance that never ran
        # JobSetFunc, job_set_payload is still {} and has no "InputJob" key.
        self.job_set_payload = self._fresh_payload(swagger.JobSetPayload)
        self.job_set_payload["InputJob"].update({
            "GUID": job['GUID'],
            "PartGUID": job['PartGUID'],
            "Number": job['Number'],
            "Title": job['Title'],
            "Revision": job['Revision'],
            "Quantity": job['Quantity'],
            "Status": 3,
            "ActivationDate": job['ActivationDate'],
            "ERPID": "",
            "BarcodeID": "",
            "AQLMode": job['AQLMode'],
            "AQLTableGUID": job['AQLTableGUID'],
            "DeliveryDate": value['JobHead_DueDate'] + '.0000000-07:00'
        })

        print(self.job_set_payload)
        self.HIGHQAPost(swagger.JobSet, self.job_set_payload, '')

    def handle_close_job_not_found(self):
        """E-mail an alert that the job to close does not exist in HighQA."""
        print('Not Found.')
        SMTP.message = """\
            Job Does Not Exist In Highqa"""

        Subject = "Could not add %s to HighQA" % (self.CloseJob["Number"])
        SMTP.EmailAlert(self.CloseJob["Number"], self.CloseJob["PartNumber"], SMTP.alias_email, SMTP.Quality_email, SMTP.message, Subject)
            


Main.py
import threading
#from config import HIGHQAUSERNAME, HIGHQAPASSWORD
from highqa_epicor_job_update import HighQA
from smtp import SMTP
#from utils import TokenUpdateTimer

def main():
    """Run the Epicor -> HighQA synchronisation now and then every 30 minutes."""
    # Build the clients once. HighQA.__init__ starts a repeating token-refresh
    # timer, so creating new instances on every cycle would pile up timer
    # threads and token requests without bound.
    epicor_process = HighQA()
    epicor_close = HighQA()

    def run_jobs():
        """Process open jobs, close finished ones, then schedule the next run."""
        try:
            epicor_process.process_jobs()
            epicor_close.close_jobs()
        finally:
            # Re-arm even when a cycle raises; otherwise a single failure
            # would silently end the synchronisation loop.
            threading.Timer(1800, run_jobs).start()  # 1800 seconds = 30 minutes

    # Start the run_jobs function
    run_jobs()


if __name__ == "__main__":
    main()

More Projects