Axione-FTTH-Test/axione_api/api.py

68 lines
2.4 KiB
Python

import base64
import http.client
import sys
from datetime import (datetime, timezone)
def ptoRequest(ptoRef):
    """Build the SOAP envelope asking Axione for the address structure of a PTO.

    Args:
        ptoRef: PTO reference placed in the ``referencePTO`` attribute of the
            request. It is XML-escaped before being inserted.

    Returns:
        The SOAP envelope as a string. The ``horodatageRequete`` attribute is
        set to the current UTC time in ISO 8601 format.
    """
    from xml.sax.saxutils import escape

    ts = datetime.now(timezone.utc).isoformat()
    # The reference lands inside a double-quoted XML attribute: escape the
    # markup characters (and the double quote) so an unusual value can neither
    # break the envelope nor inject extra attributes/elements.
    safe_ref = escape(str(ptoRef), {'"': "&quot;"})
    return f'''
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ent="http://structureadresseftth.axione.fr/model/entreprise" xmlns:com="http://structureadresseftth.axione.fr/model/commun">
<soapenv:Header/>
<soapenv:Body>
<ent:obtentionStructureAdresseDemandeSoap>
<ent:entete versionWS="3.0" horodatageRequete="{ts}">
<com:operateurCommercial nom="AQUILENET" identifiant=""/>
</ent:entete>
<ent:referenceAdresse referenceHexacle="" identifiantImmeuble="" referencePTO="{safe_ref}" referenceBAN="">
</ent:referenceAdresse>
</ent:obtentionStructureAdresseDemandeSoap>
</soapenv:Body>
</soapenv:Envelope>
'''
def query_axione_pto(cfg, ptoRef):
    """Query the Axione eligibility web service for a PTO reference.

    Args:
        cfg: Configuration object. This function reads its ``username``,
            ``password``, ``debug`` and ``source_addr`` attributes.
        ptoRef: PTO reference forwarded to ``ptoRequest`` to build the body.

    Returns:
        The response body as a string. When ``cfg.debug`` is truthy no
        request is sent and the contents of ``./fixtures/dummy-data-1.xml``
        are returned instead.

    Any exception raised while querying the service is reported on stderr
    (together with the request body and headers) and the process exits with
    status 1.
    """
    body = ptoRequest(ptoRef)
    # Note: the Authorization header carries the bare base64 of
    # username:password, with no scheme prefix. Don't ask why.
    passwd = base64.b64encode(f"{cfg.username}:{cfg.password}".encode("utf8")).decode("utf8")
    headers = {
        'User-Agent': 'aquilenet-elig-test/0.1',
        'Accept': '*/*',
        'Accept-Encoding': 'identity',
        'Connection': 'Keep-Alive',
        'Authorization': passwd
    }

    if cfg.debug:
        print("===================")
        print("Injecting dummy response for request: ")
        print("HEADERS: ")
        print(headers)
        print("BODY: ")
        print(body)
        print("===================")
        with open("./fixtures/dummy-data-1.xml","r") as f:
            return f.read()

    # Bound before the try so the finally clause never hits an unbound name
    # if the connection object itself cannot be created.
    conn = None
    try:
        conn = http.client.HTTPSConnection("ws-eligftth-val.axione.fr", 443, source_address=(cfg.source_addr,0), timeout=120)
        # http.client encodes a str body as ISO-8859-1; send UTF-8 bytes
        # explicitly so non-Latin-1 characters cannot break the request.
        conn.request("POST", "/v3/fai", body.encode("utf-8"), headers=headers)
        response = conn.getresponse()
        raw = response.read()
        # Return text (like the debug path does), honouring the charset the
        # server declares and falling back to UTF-8.
        charset = response.headers.get_content_charset() or "utf-8"
        return raw.decode(charset)
    except Exception as e:
        print("Error while querying Axione: ", file=sys.stderr)
        print(str(e), file=sys.stderr)
        print("Query Body: ", file=sys.stderr)
        print(body, file=sys.stderr)
        print("Query Headers: ", file=sys.stderr)
        print(str(headers), file=sys.stderr)
        sys.exit(1)
    finally:
        if conn is not None:
            conn.close()
def parse_response(resp_str):
    """Placeholder for response parsing.

    Not implemented yet: ``resp_str`` is ignored and ``None`` is returned.
    """
    return None