We have a flow where we sign JWT tokens using python GCP IAMCredentialsClient the following way:
def sign_jwt(self, payload, aud=None):
"""
@param payload:
@param aud:
@return:
"""
self.logger.info("Signing jwt")
payload.update({
"iss": self.service_account_email,
"aud": aud if aud else Identity.JWT_AUDIENCE,
"iat": int(time.time()),
})
self.logger.info("Payload", extra=payload)
if not self.signer:
self.signer = IAMCredentialsClient()
signed_jwt_resp = self.signer.sign_jwt(name=self.signer_name, payload=json.dumps(payload))
self.logger.info("Jwt signed successfully")
return signed_jwt_resp.signed_jwt
We then later use the signed JWT for some internal exchanging mechanism that we have and we "count" on GCP validation capabilities for have this exchanging mechanism work. The validation process looks like this:
def _get_certificates(self, service_account):
"""
@param service_account:
@return:
"""
return requests.get(f"https://www.googleapis.com/robot/v1/metadata/x509/{service_account}").json()
def _get_jwt_pkey(self, iss, kid):
"""
@param iss:
@param kid:
@return:
"""
if not iss.endswith(f"@{self.app.project_id}.iam.gserviceaccount.com"):
raise Exception("Invalid issuer")
if iss not in self.certificates:
self.certificates[iss] = self._get_certificates(iss)
certificate = self.certificates.get(iss).get(kid)
# now translate to pkey
# load the public key from the certificate
private_key = load_pem_x509_certificate(certificate.encode()).public_key()
# translate public key bytes to PEM
public_key = private_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
return public_key
def verify_jwt(self, jwt):
"""
@param jwt:
@return:
"""
try:
# extract metadata [NO VERIFICATION]
header, payload, signed_section, signature = google_unverified_decode(jwt)
# get appropriate public key
public_key = self._get_jwt_pkey(payload.get("iss"), header.get("kid"))
# Actual verify the token
decoded_token = pyjwt.decode(jwt, key=public_key, algorithms=["RS256"], audience=Identity.JWT_AUDIENCE)
return decoded_token
except Exception:
self.logger.exception("Failed to verify jwt")
raise
This works very good for a while but all of a sudden, the signed JWT kid does not exist in https://www.googleapis.com/robot/v1/metadata/x509/my-service@**-**.iam.gserviceaccount.com (which is fetched using the _get_certificates
method) and therefore we cannot verify the JWT token any more and it becomes "useless".
The are a few questions to ask here:
- Why did those keys in https://www.googleapis.com/robot/v1/metadata/x509/my-service@**-**.iam.gserviceaccount.com rotate?
- When does google rotate managed service account keys?
- How can we overcome this problem?
- Where can I find documentation about this?
Thanks in advance!
CodePudding user response:
Google Cloud rotates Google-managed certificate keys about every 12 hours.
If the elapsed time from signing to validating is more than 12 hours, that might be your problem.
The only public reference that I am aware of is here
That link is for Google Cloud Storage, but the key rotation detail applies. SignBlob
and SignJwt
are basically the same API service.
Solution:
You can sign the JWT yourself using your service account's private key to not be subject to key rotation. All Python JWT libraries have that ability.
Example:
import jwt
''' Load the Google Service Account Credentials from Json file '''
with open(filename, 'r') as f:
data = f.read()
json_cred = json.loads(data)
pkey = json_cred['private_key']
...
# Encode the headers and payload and sign creating a Signed JWT (JWS)
signed_jwt = jwt.encode(payload, pkey, algorithm="RS256", headers=headers)