I built a Python REST API client wrapper for an Okta-authenticated pricing API. My code runs, but I am not getting any response back. I need to be able to get a JSON response. I believe a print statement would work, but I have hit a wall: I do not know what argument to pass to the print statement in order to see a response from "conn = http.client.HTTPSConnection("sso.lukka.tech")".
import http.client
import urllib.request, urllib.parse, urllib.error
import json
import base64
def loadJson(response):
    """Read an HTTP response body and decode it as JSON.

    Args:
        response: Object exposing ``read()`` and a ``status`` attribute,
            e.g. an ``http.client.HTTPResponse``.

    Returns:
        The decoded JSON value, or an empty dict when the body is empty.

    Raises:
        json.JSONDecodeError: If a non-empty body is not valid JSON.
    """
    body = response.read()
    # http.client hands back bytes, so an empty body is b"" and never equals
    # ""; a truthiness check covers b"", "" and None alike.
    if not body:
        print("Empty response found with status " + str(response.status))
        return {}
    return json.loads(body)
class DataPricingClient:
    """Client for the pricing REST API using an OAuth2 client-credentials token.

    Constructing an instance performs a token request against the SSO host;
    every request method then sends a GET with that token as a Bearer header
    and returns the decoded JSON body.
    """

    # Token endpoint used for the client-credentials grant.
    TOKEN_HOST = "sso.lukka.tech"
    TOKEN_PATH = "/oauth2/aus1imo2fqcx5Ik4Q0h8/v1/token"

    def __init__(self, host, clientId, clientSecret):
        """Fetch an access token and remember the API host.

        Args:
            host: Host name of the pricing API (no scheme, no path).
            clientId: OAuth2 client id.
            clientSecret: OAuth2 client secret.

        Raises:
            ApiErrorException: If the token endpoint does not answer 200.
        """
        credentials = bytes(f"{clientId}:{clientSecret}", "ISO-8859-1")
        encodedData = base64.b64encode(credentials).decode("ascii")
        headers = {
            "Authorization": "Basic " + encodedData,
            "Accept": "application/json",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        params = urllib.parse.urlencode({
            "grant_type": "client_credentials",
            "scope": "pricing",
        })

        conn = http.client.HTTPSConnection(self.TOKEN_HOST)
        try:
            conn.request("POST", self.TOKEN_PATH, params, headers)
            response = conn.getresponse()
            if response.status != 200:
                raise ApiErrorException(response.status, "Failed to get access token")
            # The body has to be consumed while the connection is still open.
            token = loadJson(response)["access_token"]
        finally:
            conn.close()

        self.host = host
        self.accessToken = token

    def default_headers(self):
        """Return the JSON headers, including the Bearer token, for API calls."""
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Authorization": "Bearer " + self.accessToken,
        }

    def _send_request(self, path):
        """GET ``path`` from the API host and return the decoded JSON body.

        Raises:
            ApiErrorException: If the status is not 2xx. http.client does not
                follow redirects, so a 3xx answer surfaces here as well.
        """
        conn = http.client.HTTPSConnection(self.host)
        try:
            conn.request("GET", path, None, self.default_headers())
            response = conn.getresponse()
            if not 200 <= response.status < 300:
                raise ApiErrorException(response.status, f"GET {path} failed")
            # Decode before closing: the response is unusable afterwards.
            return loadJson(response)
        finally:
            conn.close()

    def get_available_sources(self):
        """Return the decoded JSON listing of pricing sources."""
        path = "/v1/pricing/sources"
        return self._send_request(path)

    def get_source_details(self, sourceId):
        """Return the decoded JSON details for one pricing source."""
        path = f"/v1/pricing/sources/{sourceId}"
        return self._send_request(path)

    def get_latest_prices(self, asOf, sourceId, pairCodes):
        """Return the decoded JSON for the latest-prices route.

        NOTE(review): ``asOf`` is accepted but never used in the request.
        """
        # NOTE(review): the space in this path looks like a lost separator;
        # confirm the real route against the API documentation.
        path = f"/v1/pricing/sources/{sourceId} {pairCodes}"
        return self._send_request(path)

    def historical_prices(self, sourceId, pairCode, begTs, endTs, fill, limit, variances):
        """Return the decoded JSON for the historical-prices route."""
        # NOTE(review): the spaces in this path look like lost separators or
        # query parameters; confirm the real route against the API docs.
        path = f"/v1/pricing/sources/{sourceId} {pairCode} {begTs} {endTs} {fill} {limit} {variances}"
        return self._send_request(path)
class ApiErrorException(Exception):
    """Raised when an HTTP call answers with an unexpected status.

    Attributes are set in ``__init__``: ``status`` holds the value passed in
    and ``msg`` holds the formatted text that ``str()`` returns.
    """

    def __init__(self, status, msg):
        """Build the message ``"Error <status>: <msg>"``.

        Args:
            status: HTTP status of the failed call.
            msg: Short description of what failed.
        """
        self.status = status
        self.msg = f"Error {status}: {msg}"
        # Populate Exception.args so repr() and tracebacks carry the message.
        super().__init__(self.msg)

    def __str__(self):
        """Return the formatted error message."""
        return self.msg
if __name__ == '__main__':
    # Credentials come from a local module; the value is unpacked as keyword
    # arguments, so it presumably maps host/clientId/clientSecret - verify.
    from pricing_api_creds import lukka_pricing

    # NOTE(review): this only builds the client (which fetches the token);
    # no request method is called and nothing is printed, so the script
    # produces no output on success.
    c = DataPricingClient(**lukka_pricing)
CodePudding user response:
I have tried the API you gave in the question. It seems to be working, and I am also getting a response back.
click to see the response image
CodePudding user response:
The problem you have is caused by a redirect. The API is basically telling you that you have to go to a different page. You can follow the redirect manually (by yourself) or use a library to do that. One way to do it with http.client
is shown in this answer: https://stackoverflow.com/a/20475712/3352383
However, I would suggest that you use requests, because it follows redirects automatically and is usually easier to use. Good luck!