Databricks Setup Document
- Prerequisites
- What does this notebook do upon execution?
- Methods to create a Service Principal, Warehouse and grant permissions
- Common Methods
- Method to fetch all workspaces of a deployment name
- Method to get the metastore id of a given workspace
- Method to get the name of the Metastore admin of the metastore
- Method to create Service Principal using API
- Method to list groups and fetch metastore admin group id
- Method to add a member to a group
- Method to create a warehouse
- Method to grant permissions to the service principal on a warehouse
- Method to create a catalog
- Method to create schemas
- Method to create a Service Principal Secret
- Method to execute grants to the Service Principal
- Method to enable a System Schema
- Method to enable verbose logging
- Create a Service Principal, warehouse and grant permissions
- Get workspace id of the current deployment name
- Get the metastore id of the resultant workspace id
- Get Metastore admin name of the metastore
- Fetch the id of the metastore_admins_group
- Create a Service Principal
- Add Service Principal to metastore_admins_group
- Create a warehouse
- Grant permissions to the service principal on a warehouse
- Create a catalog
- Create schemas
- Create Service Principal Secret
- Execute grants to the Service Principal
- Enable 'access' System-Schema
- Enable verbose logging
- Result
Prerequisites
- A compute cluster with the
Access mode
set asShared
is required so that the unity catalog is accessible from this notebook. Also this compute cluster should be attached to the notebook before the execution. Please refer to the cluster access mode documentation. - The Metastore Admin group should be created and the user who will execute this notebook must be part of that group along with the other admin users. Please refer to the admin privileges in Unity Catalog documentation.
- To execute the methods in this notebook, the below variables are required.
Databricks Account ID
TrustLogix Service Principal Name
Account Admin Username
andAccount Admin Password
to invoke the Databricks APIs
What does this notebook do upon execution?
- A Service Principal with the given name in the notebook arguments will be created
- The Service Principal will be added to the Metastore Admin group of the current workspace.
- A SQL warehouse with the name
TLX_WAREHOUSE
will be created if it does not exist - Service Principal will be granted
CAN USE
permissions onTLX_WAREHOUSE
- A catalog with the name
tlx_policy_db
will be created if it does not exist - Couple of schemas
tlx_entitlement_schema
andtlx_policies
will be created in the catalogtlx_policy_db
if they don't exist - The Service principal will be granted relevant permissions on the TrustLogix catalog and schemas
- The Service Principal Secret will be created
- It will enable
system.access.audit
table with the verbose option for monitoring purpose.
Import the required libraries
from getpass import getpass
from http import HTTPStatus
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
import requests
import json
Set the below values
ACCOUNT_ID = input("DATABRICKS ACCOUNT ID ")
SERVICE_PRINCIPAL_DISPLAY_NAME = input("TRUSTLOGIX SERVICE PRINCIPAL NAME ")
USERNAME = input("ACCOUNT ADMIN USERNAME ")
PASSWORD = getpass("ACCOUNT ADMIN PASSWORD ")
Methods to create a Service Principal, Warehouse and grant permissions
IMPORTANT - DO NOT EDIT
DEPLOYMENT_NAME = spark.conf.get("spark.databricks.workspaceUrl").split(".")[0]
WORKSPACE_URL = f"https://{DEPLOYMENT_NAME}.cloud.databricks.com"
ACCOUNT_URL = "https://accounts.cloud.databricks.com"
WAREHOUSE_NAME = "TLX_WAREHOUSE"
CATALOG_NAME = "tlx_policy_db"
SCHEMAS = ["tlx_entitlement_schema", "tlx_policies"]
DF_SCHEMA = "KEY string, VALUE string"
SUCCESS_MESSAGE = "SUCCESS: {}"
WAIT_TIME_IN_SECONDS = 10
NUMBER_OF_RETRIES = 3
Common Methods
def get_http_basic_auth_for_account_apis():
return requests.auth.HTTPBasicAuth(username = USERNAME, password = PASSWORD)
Method to fetch all workspaces of a deployment name
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_all_workspaces(deployment_name):
url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/workspaces"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.get(url = url, auth = basic_auth)
workspaces = []
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format("All workspaces fetched."))
workspaces = response.json()
else:
error_message = response.json()["message"]
raise Exception(f"Could not fetch the groups. {error_message}")
for w in workspaces:
if w["deployment_name"] == deployment_name:
return w["workspace_id"]
raise Exception(f"Could not find the workspace with deployment name {deployment_name}.")
Method to get the metastore id of a given workspace
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_metastore_id_of_workspace(workspace_id):
url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/workspaces/{workspace_id}/metastore"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.get(url = url, auth = basic_auth)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format("Workspace-Metastore mapping fetched."))
return response.json()["metastore_assignment"]["metastore_id"]
else:
error_message = response.json()["message"]
raise Exception(f"Could not fetch the Workspace-Metastore mapping. {error_message}")
Method to get the name of the Metastore admin of the metastore
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_metastore_admin_of_metastore(metastore_id):
url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/metastores/{metastore_id}"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.get(url = url, auth = basic_auth)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format("Metastore details fetched."))
return response.json()["metastore_info"]["owner"]
else:
error_message = response.json()["message"]
raise Exception(f"Could not fetch the Metastore admin of the metastore with id {metastore_id}. {error_message}")
Method to create Service Principal using API
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_service_principal(service_principal_display_name):
request_body = {
"displayName": service_principal_display_name,
"groups": [],
"entitlements": [
{
"value": "workspace-access"
},
{
"value": "databricks-sql-access"
}
],
"schemas": [
"urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"
],
"active": True
}
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.post(
url = f"{WORKSPACE_URL}/api/2.0/preview/scim/v2/ServicePrincipals",
auth = basic_auth,
json = request_body
)
response_dict = {}
if response.status_code == HTTPStatus.CREATED:
print(SUCCESS_MESSAGE.format(f"Service principal {service_principal_display_name} created with the following details."))
response_dict = response.json()
else:
error_message = response.json()["message"]
raise Exception(f"Could not create the service principal with name {service_principal_display_name}. {error_message}")
return response_dict
Method to list groups and fetch metastore admin group id
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_metastore_admins_group_id(metastore_admins_group_name):
url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/scim/v2/Groups"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.get(url = url, auth = basic_auth)
response_dict = {}
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format("All groups fetched."))
response_dict = response.json()
else:
error_message = response.json()["message"]
raise Exception(f"Could not fetch the list of groups. {error_message}")
groups_list = response_dict["Resources"]
for g in groups_list:
if g["displayName"] != metastore_admins_group_name:
continue
return g["id"]
raise Exception(f"Could not find the {metastore_admins_group_name} group. Please create the group and continue with the
script by reexecuting this cell and the following ones.")
Method to add a member to a group
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def add_member_to_group(group_id, member_id):
request_body = {
"schemas": [
"urn:ietf:params:scim:api:messages:2.0:PatchOp"
],
"Operations": [
{
"op": "add",
"path": "members",
"value": [
{
"value": f"{member_id}"
}
]
}
]
}
url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/scim/v2/Groups/{group_id}"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.patch(url = url, json = request_body, auth = basic_auth)
response_dict = {}
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format(f"Member with id {member_id} was added to group with id {group_id}"))
else:
error_message = response.json()["message"]
raise Exception(f"Could not add member with id {member_id} to the group with id {group_id}. {error_message}")
Method to create a warehouse
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_a_warehouse(warehouse_name):
warehouse_id = ""
# ---------- check if warehouse already exists ----------
url = f"{WORKSPACE_URL}/api/2.0/sql/warehouses"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.get(url = url, auth = basic_auth)
if response.status_code == HTTPStatus.OK:
warehouses_list = response.json()["warehouses"]
for w in warehouses_list:
if w["name"] == warehouse_name:
warehouse_id = w["id"]
print(f"Warehouse with name {warehouse_name} already exists.")
return warehouse_id
else:
error_message = response.json()["message"]
raise Exception(f"Could not list the warehouse. {error_message}")
# ---------- create the warehouse ----------
request_body = {
"name": warehouse_name,
"cluster_size": "2X-Small",
"min_num_clusters": 1,
"max_num_clusters": 1,
"auto_stop_mins": 60,
"creator_name": USERNAME,
"tags": {},
"spot_instance_policy": "COST_OPTIMIZED",
"enable_photon": True,
"channel": {},
"enable_serverless_compute": True,
"warehouse_type": "CLASSIC"
}
url = f"{WORKSPACE_URL}/api/2.0/sql/warehouses"
response = requests.post(url = url, auth = basic_auth, json = request_body)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format(f"Warehouse {warehouse_name} created successfully."))
warehouse_id = response.json()["id"]
else:
error_message = response.json()["message"]
raise Exception(f"Could not create warehouse with the name {warehouse_name}. {error_message}")
return warehouse_id
Method to grant permissions to the service principal on a warehouse
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def grant_permissions_on_a_warehouse(service_principal_application_id, warehouse_id):
request_body = {
"access_control_list": [
{
"service_principal_name": service_principal_application_id,
"permission_level": "CAN_USE"
}
]
}
url = f"{WORKSPACE_URL}/api/2.0/permissions/warehouses/{warehouse_id}"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.patch(url = url, auth = basic_auth, json = request_body)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format(f"Service principal with application id {service_principal_application_id} was granted
permissions on the warehouse with id {warehouse_id}."))
else:
error_message = response.json()["message"]
raise Exception(f"Could not grant permissions to the service principal with application id
{service_principal_application_id} on the warehouse with id {warehouse_id}. {error_message}")
Method to create a catalog
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_catalog(catalog_name):
# ---------- check whether the catalog already exists ----------
url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/catalogs/{catalog_name}"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.get(url = url, auth = basic_auth)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format(f"Catalog {catalog_name} already exists."))
return
# ---------- create the catalog ----------
request_body = {
"name": catalog_name,
"securable_type": "CATALOG",
"securable_kind": "CATALOG_STANDARD"
}
url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/catalogs"
response = requests.post(url = url, auth = basic_auth, json = request_body)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format(f"Catalog {catalog_name} was created."))
else:
error_message = response.json()["message"]
raise Exception(f"Could not create catalog {catalog_name}. {error_message}")
Method to create schemas
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_schema(schema_name, catalog_name):
# ---------- check if the schema already exists ----------
url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/schemas/{catalog_name}.{schema_name}"
params = {
"catalog_name" : catalog_name
}
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.get(url = url, auth = basic_auth)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format(f"Schema {catalog_name}.{schema_name} already exists."))
return
# ---------- create the schema ----------
url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/schemas"
request_body = {
"name": schema_name,
"catalog_name": catalog_name
}
response = requests.post(url = url, auth = basic_auth, json = request_body)
if response.status_code == HTTPStatus.OK:
print(SUCCESS_MESSAGE.format(f"Schema {schema_name} was created in the catalog {catalog_name}."))
else:
error_message = response.json()["message"]
raise Exception(f"Could not create schema {schema_name} in the catalog {catalog_name}. {error_message}")
Method to create a Service Principal Secret
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_service_principal_secret(service_principal_id):
url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/servicePrincipals/{service_principal_id}/credentials/secrets"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.post(url, auth = basic_auth)
response_dict = {}
if response.status_code == 200:
print(SUCCESS_MESSAGE.format("Service principal Secret was created."))
response_dict = response.json()
else:
error_message = response.json()["message"]
raise Exception(f"Could not create the service principal secret. {error_message}")
return response_dict
Method to execute grants to the Service Principal
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def execute_sql_statement(sql_statement):
print(f"Executing ==> {sql_statement}")
spark.sql(sql_statement)
print(SUCCESS_MESSAGE.format("Grants executed."))
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def execute_grants_to_service_principal(application_id):
tlx_policy_db_grants = f"GRANT USE CATALOG, CREATE FUNCTION, CREATE TABLE, USE SCHEMA, SELECT ON CATALOG {CATALOG_NAME} TO `
{application_id}`"
execute_sql_statement(tlx_policy_db_grants)
concat_string = f"CONCAT('GRANT USE CATALOG, USE SCHEMA, SELECT, MODIFY, APPLY TAG ON CATALOG ', catalog_name, ' TO `
{application_id}`')"
catalog_grants_df = spark.sql(f"select {concat_string} as grant_statements from `system`.information_schema.catalogs where
catalog_name <> 'system';")
catalog_grants_rows = catalog_grants_df.collect()
for row in catalog_grants_rows:
execute_sql_statement(row['grant_statements'])
print(SUCCESS_MESSAGE.format("All grants were executed successfully"))
Method to enable a System Schema
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def enable_system_schema(metastore_id, system_schema_name, application_id):
url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/metastores/{metastore_id}/systemschemas/{system_schema_name}"
basic_auth = get_http_basic_auth_for_account_apis()
response = requests.put(url, auth = basic_auth)
response_dict = {}
if response.status_code == 200:
print(SUCCESS_MESSAGE.format(f"System Schema '{system_schema_name}' is now available."))
elif response.json()["error_code"] == "SCHEMA_ALREADY_EXISTS":
print(SUCCESS_MESSAGE.format(f"System Schema '{system_schema_name}' is already enabled."))
else:
error_message = response.json()["message"]
raise Exception(f"System Schema '{system_schema_name}' was not found. {error_message}")
grant_query = f"GRANT USE CATALOG, USE SCHEMA, SELECT ON CATALOG system TO `{application_id}`"
execute_sql_statement(grant_query)
return
Method to enable verbose logging
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def enable_verbose_logging():
url = f"{WORKSPACE_URL}/api/2.0/workspace-conf"
basic_auth = get_http_basic_auth_for_account_apis()
request_body = {
"enableVerboseAuditLogs": "true"
}
response = requests.patch(url, auth = basic_auth, json = request_body)
if response.status_code == 204:
print(SUCCESS_MESSAGE.format(f"Verbose logging has been enabled."))
else:
error_message = response.json()["message"]
raise Exception(f"Could not enable verbose logging. {error_message}")
return
Create a Service Principal, warehouse and grant permissions
Get workspace id of the current deployment name
workspace_id = get_all_workspaces(DEPLOYMENT_NAME)
print(f"WORKSPACE_ID of the deployment {DEPLOYMENT_NAME} is {workspace_id}")
Get the metastore id of the resultant workspace id
metastore_id = get_metastore_id_of_workspace(workspace_id)
print(f"METASTORE_ID of the workspace with id {workspace_id} is {metastore_id}")
Get Metastore admin name of the metastore
metastore_admins_group_name = get_metastore_admin_of_metastore(metastore_id)
print(f"Metastore admin of the metastore with id {metastore_id} is {metastore_admins_group_name}")
Fetch the id of the metastore_admins_group
metastore_admins_group_id = get_metastore_admins_group_id(metastore_admins_group_name)
print(f"Metastore admins group Id : {metastore_admins_group_id}")
Create a Service Principal
# ---------- Creation of Service Principal ----------
created_service_principal = create_service_principal(SERVICE_PRINCIPAL_DISPLAY_NAME)
# ---------- Get the id and applicationId ----------
created_service_principal_id = created_service_principal["id"]
created_service_principal_application_id = created_service_principal["applicationId"]
# ---------- Display as a dataframe ----------
created_service_principal_groups = ", ".join([g["value"] for g in created_service_principal["groups"]])
created_service_principal_entitlements = ", ".join([e["value"] for e in created_service_principal["entitlements"]])
created_service_principal_df_data = [
["id", created_service_principal["id"]],
["applicationId", created_service_principal["applicationId"]],
["displayName", created_service_principal["displayName"]],
["groups", created_service_principal_groups],
["entitlements", created_service_principal_entitlements],
["active", created_service_principal["active"]]
]
created_service_principal_df = spark.createDataFrame(data = created_service_principal_df_data, schema = DF_SCHEMA)
display(created_service_principal_df)
Add Service Principal to metastore_admins_group
# ---------- Add Service Principal to metastore_admins_group ----------
add_member_to_group(metastore_admins_group_id, created_service_principal_id)
Create a warehouse
# ---------- Create a warehouse ----------
warehouse_id = create_a_warehouse(WAREHOUSE_NAME)
print(f"WAREHOUSE ID: {warehouse_id}")
Grant permissions to the service principal on a warehouse
# ---------- Grant permissions to the service principal on the created warehouse warehouse ----------
grant_permissions_on_a_warehouse(created_service_principal_application_id, warehouse_id)
Create a catalog
create_catalog(CATALOG_NAME)
Create schemas
for schema_name in SCHEMAS:
create_schema(schema_name, CATALOG_NAME)
Create Service Principal Secret
# Display as a dataframe
created_service_principal_secret_response = create_service_principal_secret(created_service_principal_id)
if (created_service_principal_secret_response != {}):
created_service_principal_secret_df_data = [
["id", created_service_principal_secret_response["id"]],
["secret", created_service_principal_secret_response["secret"]],
["secret_hash", created_service_principal_secret_response["secret_hash"]],
["create_time", created_service_principal_secret_response["create_time"]],
["update_time", created_service_principal_secret_response["update_time"]],
["status", created_service_principal_secret_response["status"]],
]
created_service_principal_secret_df = spark.createDataFrame(data = created_service_principal_secret_df_data, schema =
DF_SCHEMA)
display(created_service_principal_secret_df)
Execute grants to the Service Principal
execute_grants_to_service_principal(created_service_principal_application_id)
Enable 'access' System-Schema
enable_system_schema(metastore_id, "access", created_service_principal_application_id)
Enable verbose logging
enable_verbose_logging()
Result
# ---------- Display as a dataframe ----------
final_result_df_data = [
["Workspace Deployment Name", DEPLOYMENT_NAME],
["SQL Warehouse Identifier", warehouse_id],
["Trustlogix Service Principal Name", created_service_principal["displayName"]],
["Trustlogix Client ID", created_service_principal_application_id],
["Trustlogix Client Secret", created_service_principal_secret_response["secret"]]
]
final_result_df = spark.createDataFrame(data = final_result_df_data, schema = DF_SCHEMA)
display(final_result_df)