Skip to content
Merged
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 134 additions & 44 deletions src/api/handlers/job_api.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#pylint: disable=too-many-lines,too-few-public-methods,too-many-locals,too-many-statements,too-many-branches
#pylint: disable=too-many-lines,too-few-public-methods,too-many-locals,too-many-statements,too-many-branches
import os
import json
import time
Expand Down Expand Up @@ -43,34 +43,116 @@ def delete_file(path):
except Exception as error:
logger.warning("Failed to delete file: %s", error)


def get_token_by_app_role(app_role_url, role_id, secret_id):
app_role = {'role_id': role_id, 'secret_id': secret_id}
json_data = json.dumps(app_role)
for i in range(0, 10):
res = requests.post(url=app_role_url, data=json_data, verify=False)
if res.status_code == 200:
json_res = json.loads(res.content)
token = json_res['auth']['client_token']
return token
time.sleep(5)
err_msg = "Getting token from Vault error even tried 10 times, url is {}, API response is {}:{}".format(app_role_url, res.status_code, res.text)
abort(400, err_msg)


def get_value_from_vault(url, token, secret_key, verify):
for i in range(0, 10):
response = requests.get(url=url, headers={'X-Vault-Token': token}, verify=verify)
if response.status_code == 200:
json_res = json.loads(response.content)
if json_res['data'].get('data') and isinstance(json_res['data'].get('data'), dict):
value = json_res['data'].get('data').get(secret_key)
else:
value = json_res['data'].get(secret_key)
return value
time.sleep(5)
err_msg = "Getting value from Vault error even tried 10 times, url is {}, API response is {}:{}".format(url, response.status_code, response.text)
abort(400, err_msg)
class Vault():
def __init__(self, base_url, namespace, version, role_id, secret_id):
self.base_url = base_url
self.namespace = namespace
self.version = version
self.role_id = role_id
self.secret_id = secret_id
self.token = None
self.policies = None

def get_token_by_app_role(self):
self.policies = self.get_policies(self.token)
if self.token and self.policies:
return self.token
app_role = {'role_id': self.role_id, 'secret_id': self.secret_id}
app_role_url = self.base_url + '/v1/' + self.namespace + '/auth/approle/login' if self.namespace else self.base_url + '/v1/auth/approle/login'
json_data = json.dumps(app_role)
for i in range(0, 10):
res = requests.post(url=app_role_url, data=json_data, verify=False)
if res.status_code == 200:
json_res = json.loads(res.content)
self.token = json_res['auth']['client_token']
self.policies = self.get_policies(self.token)
return self.token
time.sleep(5)
err_msg = "Getting token from Vault error even tried 10 times, url is {}, API response is {}:{}".format(app_role_url, res.status_code, res.text)
abort(400, err_msg)

def get_policies(self, token):
"""Return the policies associated with the provided token."""
if not token:
return None
try:
lookup_url = self.base_url + '/v1/' + self.namespace + '/auth/token/lookup-self' if self.namespace else self.base_url + 'v1/auth/token/lookup-self'
res = requests.get(url=lookup_url, headers={"X-Vault-Token": token}, verify=False)
if res.status_code == 200:
json_res = json.loads(res.content)
policies = json_res['data']['policies']
return policies
return None
except Exception as e:
logger.debug("Token validation failed: %s", str(e))
return None

def generate_batch_token(self, service_token, ttl="1h"):
"""
Generate a batch token using AppRole credentials.
:param service_token: A service token generated when logging in with AppRole.
:param ttl: Time-to-live for the batch token.
:return: The generated batch token.
"""
if self.policies is None:
self.policies = self.get_policies(service_token)
if 'token-creator' in self.policies:
self.policies.remove('token-creator')

batch_payload = {
"type": "batch",
"policies": self.policies or ['default'],
"ttl": ttl
}
url = self.base_url + '/v1/' + self.namespace + '/auth/token/create' if self.namespace else self.base_url + '/v1/auth/token/create'
try:
for i in range(0, 10):
res = requests.post(url=url, json=batch_payload, headers={"X-Vault-Token": service_token}, verify=False)
if res.status_code == 200:
json_res = json.loads(res.content)
token = json_res['auth']['client_token']
return token
elif res.status_code == 403:
logger.info("Getting batch token from Vault forbidden: {}".format(res.text))
return None
time.sleep(5)
msg = "Getting batch token from Vault failed even tried 10 times, url is {}, API response is {}:{}".format(
url, res.status_code, res.text)
logger.info(msg)
return None
except Exception as e:
logger.info("Exception when getting batch token from Vault: {}".format(e))
return None

def _get_api_url(self, secret_path):
url = self.base_url
if not self.namespace:
self.namespace = ''
if self.version == 'v1':
url += '/v1/' + self.namespace + '/' + secret_path if self.namespace else '/v1/' + secret_path
elif self.version == 'v2':
paths = secret_path.split('/')
url += '/v1/' + self.namespace + '/' + paths[0] + '/data/' + '/'.join(paths[1:]) if self.namespace else '/v1/' + paths[0] + '/data/' + '/'.join(paths[1:])
return url

def get_value_from_vault(self, token, secret_path, secret_key, verify):
try:
url = self._get_api_url(secret_path)
for i in range(0, 10):
response = requests.get(url=url, headers={'X-Vault-Token': token}, verify=verify, timeout=30)
if response.status_code == 200:
json_res = json.loads(response.content)
if json_res['data'].get('data') and isinstance(json_res['data'].get('data'), dict):
value = json_res['data'].get('data').get(secret_key)
else:
value = json_res['data'].get(secret_key)
return value
time.sleep(5)
err_msg = "Getting value from Vault error even tried 10 times, url is {}, API response is {}:{}".format(url, response.status_code, response.text)
abort(400, err_msg)
except Exception as e:
err_msg = "Getting value from Vault exception: {}, url is {}".format(str(e), url)
abort(400, err_msg)


@api.route("/api/job/job", doc=False)
Expand Down Expand Up @@ -278,6 +360,8 @@ def get(self):
''', [data['project']['id']])

is_fork = data['job'].get('fork', False)
# Cache Vault instances per vault name+project so we create them only once
vault_cache = {}

def get_secret_type(name):
try:
Expand All @@ -300,10 +384,10 @@ def get_auth_type(res):
def get_secret(name):
secret_type = get_secret_type(name)
if secret_type == 'vault':
vault = json.loads(name)
vault_name = vault['$vault']
secret_path = vault['$vault_secret_path']
secret_key = vault['$vault_secret_key']
vault_cfg = json.loads(name)
vault_name = vault_cfg['$vault']
secret_path = vault_cfg['$vault_secret_path']
secret_key = vault_cfg['$vault_secret_key']

result = g.db.execute_one("""
SELECT url, version, token, ca, namespace, role_id, secret_id FROM vault WHERE name = %s and project_id = %s
Expand All @@ -313,30 +397,36 @@ def get_secret(name):
abort(400, "Cannot get Vault '%s' in project '%s' " % (vault_name, data['project']['id']))

url, version, token, ca, namespace, role_id, secret_id = result[0], result[1], result[2], result[3], result[4], result[5], result[6]
if not namespace:
namespace = ''
if version == 'v1':
url += '/v1/' + namespace + '/' + secret_path if namespace else '/v1/' + secret_path
elif version == 'v2':
paths = secret_path.split('/')
url += '/v1/' + namespace + '/' + paths[0] + '/data/' + '/'.join(paths[1:]) if namespace else '/v1/' + paths[0] + '/data/' + '/'.join(paths[1:])
# choose validate way
validate_res = get_auth_type(result)

# key vault per project to avoid collisions
vault_key = f"{vault_name}-{data['project']['id']}"
vault_client = vault_cache.get(vault_key)
if not vault_client:
vault_client = Vault(url, namespace, version, role_id, secret_id)
vault_cache[vault_key] = vault_client

if validate_res == 'token':
logger.info('validate way is token')
token_to_use = token
elif validate_res == 'appRole':
app_role_url = result[0] + '/v1/' + namespace + '/auth/approle/login' if namespace else result[0] + '/v1/auth/approle/login'
token = get_token_by_app_role(app_role_url, role_id, secret_id)
logger.info('validate way is appRole')
token_to_use = vault_client.get_token_by_app_role()
batch_token = vault_client.generate_batch_token(token_to_use)
if batch_token:
token_to_use = batch_token
else:
abort(400, "Validate way is '%s' ! result is '%s' " % (validate_res, result))

if not ca:
return get_value_from_vault(url, token, secret_key, False)
logger.info('Start to get value fron vault %s %s %s' % (token_to_use, secret_path, secret_key))
return vault_client.get_value_from_vault(token_to_use, secret_path, secret_key, False)
else:
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(ca)
f.flush() # ensure all data written
return get_value_from_vault(url, token, secret_key, f.name)
return vault_client.get_value_from_vault(token_to_use, secret_path, secret_key, f.name)
else:
if is_fork:
abort(400, 'Access to secret %s is not allowed from a fork' % name)
Expand Down