Databricks Setup Document

Prerequisites

  1. A compute cluster with the Access mode set to Shared is required so that Unity Catalog is accessible from this notebook. This compute cluster must be attached to the notebook before execution. Please refer to the cluster access mode documentation.
  2. The Metastore Admin group should be created and the user who will execute this notebook must be part of that group along with the other admin users. Please refer to the admin privileges in Unity Catalog documentation.
  3. To execute the methods in this notebook, the below variables are required.
    • Databricks Account ID
    • TrustLogix Service Principal Name
    • Account Admin Username and Account Admin Password to invoke the Databricks APIs

What does this notebook do upon execution?

  1. A Service Principal with the given name in the notebook arguments will be created
  2. The Service Principal will be added to the Metastore Admin group of the current workspace.
  3. A SQL warehouse with the name TLX_WAREHOUSE will be created if it does not exist
  4. Service Principal will be granted CAN USE permissions on TLX_WAREHOUSE
  5. A catalog with the name tlx_policy_db will be created if it does not exist
  6. Two schemas, tlx_entitlement_schema and tlx_policies, will be created in the catalog tlx_policy_db if they don't exist
  7. The Service principal will be granted relevant permissions on the TrustLogix catalog and schemas
  8. The Service Principal Secret will be created
  9. The system.access.audit table will be enabled with the verbose option for monitoring purposes.

Import the required libraries

from getpass import getpass
from http import HTTPStatus
from tenacity import retry, stop_after_attempt, wait_fixed, retry_if_exception_type
import requests
import json

Set the below values

# Surrounding whitespace (easy to pick up when pasting) is stripped from the
# values that are interpolated into request URLs and payloads. The password is
# taken exactly as typed.
ACCOUNT_ID = input("DATABRICKS ACCOUNT ID ").strip()
SERVICE_PRINCIPAL_DISPLAY_NAME = input("TRUSTLOGIX SERVICE PRINCIPAL NAME ").strip()
USERNAME = input("ACCOUNT ADMIN USERNAME ").strip()
PASSWORD = getpass("ACCOUNT ADMIN PASSWORD ")

Methods to create a Service Principal, Warehouse and grant permissions

IMPORTANT - DO NOT EDIT

# The deployment name is the first dot-separated label of the workspace URL.
DEPLOYMENT_NAME = spark.conf.get("spark.databricks.workspaceUrl").split(".")[0]
WORKSPACE_URL = f"https://{DEPLOYMENT_NAME}.cloud.databricks.com"
ACCOUNT_URL = "https://accounts.cloud.databricks.com"
WAREHOUSE_NAME = "TLX_WAREHOUSE"
CATALOG_NAME = "tlx_policy_db"
# Two separate schema names; without the comma Python concatenates the adjacent
# string literals into a single bogus name.
SCHEMAS = ["tlx_entitlement_schema", "tlx_policies"]
# Schema of the two-column KEY/VALUE dataframes used to display results.
DF_SCHEMA = "KEY string, VALUE string"
SUCCESS_MESSAGE = "SUCCESS: {}"
# Retry policy shared by every @retry-decorated method below.
WAIT_TIME_IN_SECONDS = 10
NUMBER_OF_RETRIES = 3

Common Methods

def get_http_basic_auth_for_account_apis():
  """Build the HTTP Basic auth object passed to the REST calls in this notebook.

  Returns:
    A `requests.auth.HTTPBasicAuth` built from the module-level USERNAME and
    PASSWORD values.
  """
  credentials = requests.auth.HTTPBasicAuth(USERNAME, PASSWORD)
  return credentials

Method to fetch all workspaces of the account and return the id of the workspace with a given deployment name

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_all_workspaces(deployment_name):
  """Return the id of the workspace whose deployment name matches.

  Lists the workspaces of the account ACCOUNT_ID and scans the result for an
  entry whose `deployment_name` equals the argument.

  Args:
    deployment_name: Deployment name of the workspace to look up.

  Returns:
    The `workspace_id` of the matching workspace.

  Raises:
    Exception: If the workspaces cannot be listed, or none of them has the
      given deployment name. The call is retried by the decorator before the
      last exception is re-raised.
  """
  url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/workspaces"
  basic_auth = get_http_basic_auth_for_account_apis()
  response = requests.get(url = url, auth = basic_auth)

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not fetch the workspaces. {error_message}")

  print(SUCCESS_MESSAGE.format("All workspaces fetched."))
  for w in response.json():
    if w["deployment_name"] == deployment_name:
      return w["workspace_id"]

  raise Exception(f"Could not find the workspace with deployment name {deployment_name}.")

Method to get the metastore id of a given workspace

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_metastore_id_of_workspace(workspace_id):
  """Return the id of the metastore assigned to a workspace.

  Args:
    workspace_id: Id of the workspace whose metastore assignment is read.

  Returns:
    The `metastore_id` found under `metastore_assignment` in the response.

  Raises:
    Exception: If the response status is not 200 OK.
  """
  endpoint = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/workspaces/{workspace_id}/metastore"
  response = requests.get(url = endpoint, auth = get_http_basic_auth_for_account_apis())

  # Fail first so the success path reads straight through.
  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not fetch the Workspace-Metastore mapping. {error_message}")

  print(SUCCESS_MESSAGE.format("Workspace-Metastore mapping fetched."))
  assignment = response.json()["metastore_assignment"]
  return assignment["metastore_id"]

Method to get the name of the Metastore admin of the metastore

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_metastore_admin_of_metastore(metastore_id):
  """Return the owner of a metastore.

  Args:
    metastore_id: Id of the metastore to inspect.

  Returns:
    The `owner` value found under `metastore_info` in the response. The
    notebook uses it as the name of the metastore admins group.

  Raises:
    Exception: If the response status is not 200 OK.
  """
  url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/metastores/{metastore_id}"
  basic_auth = get_http_basic_auth_for_account_apis()
  response = requests.get(url = url, auth = basic_auth)

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    # Separator added so the id does not run straight into the API message.
    raise Exception(f"Could not fetch the Metastore admin of the metastore with id {metastore_id}. {error_message}")

  print(SUCCESS_MESSAGE.format("Metastore details fetched."))
  return response.json()["metastore_info"]["owner"]

Method to create Service Principal using API

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_service_principal(service_principal_display_name):
  """Create a service principal in the current workspace through the SCIM API.

  The principal is created active, with no groups, and with the
  `workspace-access` and `databricks-sql-access` entitlements.

  Args:
    service_principal_display_name: Display name for the new service principal.

  Returns:
    The parsed JSON body of the creation response.

  Raises:
    Exception: If the response status is not 201 CREATED.
  """
  request_body = {
    "displayName": service_principal_display_name,
    "groups": [],
    "entitlements": [
      {
        "value": "workspace-access"
      },
      {
        "value": "databricks-sql-access"
      }
    ],
    "schemas": [
      "urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"
    ],
    "active": True
  }

  basic_auth = get_http_basic_auth_for_account_apis()
  response = requests.post(
    url = f"{WORKSPACE_URL}/api/2.0/preview/scim/v2/ServicePrincipals",
    auth = basic_auth,
    json = request_body
  )

  if response.status_code != HTTPStatus.CREATED:
    error_message = response.json()["message"]
    raise Exception(f"Could not create the service principal with name {service_principal_display_name}. {error_message}")

  print(SUCCESS_MESSAGE.format(f"Service principal {service_principal_display_name} created with the following details."))
  return response.json()

Method to list groups and fetch metastore admin group id

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def get_metastore_admins_group_id(metastore_admins_group_name):
  """Return the id of the account-level group with the given display name.

  Args:
    metastore_admins_group_name: Display name of the group to look up.

  Returns:
    The `id` of the first listed group whose `displayName` matches.

  Raises:
    Exception: If the groups cannot be listed, or no listed group has the
      given display name.
  """
  url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/scim/v2/Groups"
  basic_auth = get_http_basic_auth_for_account_apis()
  response = requests.get(url = url, auth = basic_auth)

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not fetch the list of groups. {error_message}")

  print(SUCCESS_MESSAGE.format("All groups fetched."))

  # Default to an empty list so a response without `Resources` ends in the
  # explanatory "could not find" error below instead of a KeyError.
  groups_list = response.json().get("Resources", [])
  for g in groups_list:
    if g["displayName"] == metastore_admins_group_name:
      return g["id"]

  raise Exception(f"Could not find the {metastore_admins_group_name} group. Please create the group and continue with the script by reexecuting this cell and the following ones.")
 

Method to add a member to a group

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def add_member_to_group(group_id, member_id):
  """Add a member to an account-level group with a SCIM PatchOp request.

  Args:
    group_id: Id of the group to modify.
    member_id: Id of the member to add; it is sent as a string.

  Raises:
    Exception: If the response status is not 200 OK.
  """
  request_body = {
    "schemas": [
      "urn:ietf:params:scim:api:messages:2.0:PatchOp"
    ],
    "Operations": [
      {
        "op": "add",
        "path": "members",
        "value": [
          {
            "value": f"{member_id}"
          }
        ]
      }
    ]
  }

  url = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/scim/v2/Groups/{group_id}"
  basic_auth = get_http_basic_auth_for_account_apis()
  response = requests.patch(url = url, json = request_body, auth = basic_auth)

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not add member with id {member_id} to the group with id {group_id}. {error_message}")

  print(SUCCESS_MESSAGE.format(f"Member with id {member_id} was added to group with id {group_id}"))

Method to create a warehouse

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_a_warehouse(warehouse_name):
  """Return the id of the SQL warehouse with the given name, creating it if needed.

  The existing warehouses of the workspace are listed first. If one already
  has the requested name its id is returned and nothing is created.

  Args:
    warehouse_name: Name of the SQL warehouse.

  Returns:
    The id of the existing or newly created warehouse.

  Raises:
    Exception: If the warehouses cannot be listed or the warehouse cannot be
      created.
  """
  url = f"{WORKSPACE_URL}/api/2.0/sql/warehouses"
  basic_auth = get_http_basic_auth_for_account_apis()

  # ---------- check if warehouse already exists ----------
  response = requests.get(url = url, auth = basic_auth)
  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not list the warehouse. {error_message}")

  # Default to an empty list so a response without the `warehouses` key falls
  # through to creation instead of raising KeyError.
  for w in response.json().get("warehouses", []):
    if w["name"] == warehouse_name:
      print(f"Warehouse with name {warehouse_name} already exists.")
      return w["id"]

  # ---------- create the warehouse ----------
  # NOTE(review): serverless compute is requested together with the CLASSIC
  # warehouse type; confirm against the SQL Warehouses API documentation that
  # this combination is accepted.
  request_body = {
    "name": warehouse_name,
    "cluster_size": "2X-Small",
    "min_num_clusters": 1,
    "max_num_clusters": 1,
    "auto_stop_mins": 60,
    "creator_name": USERNAME,
    "tags": {},
    "spot_instance_policy": "COST_OPTIMIZED",
    "enable_photon": True,
    "channel": {},
    "enable_serverless_compute": True,
    "warehouse_type": "CLASSIC"
  }

  response = requests.post(url = url, auth = basic_auth, json = request_body)
  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not create warehouse with the name {warehouse_name}. {error_message}")

  print(SUCCESS_MESSAGE.format(f"Warehouse {warehouse_name} created successfully."))
  return response.json()["id"]

Method to grant permissions to the service principal on a warehouse

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def grant_permissions_on_a_warehouse(service_principal_application_id, warehouse_id):
  """Grant the CAN_USE permission level on a SQL warehouse to a service principal.

  Args:
    service_principal_application_id: Application id of the service principal;
      sent as `service_principal_name` in the access control list.
    warehouse_id: Id of the warehouse the permission is granted on.

  Raises:
    Exception: If the response status is not 200 OK.
  """
  request_body = {
    "access_control_list": [
      {
        "service_principal_name": service_principal_application_id,
        "permission_level": "CAN_USE"
      }
    ]
  }

  url = f"{WORKSPACE_URL}/api/2.0/permissions/warehouses/{warehouse_id}"
  basic_auth = get_http_basic_auth_for_account_apis()
  response = requests.patch(url = url, auth = basic_auth, json = request_body)

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not grant permissions to the service principal with application id {service_principal_application_id} on the warehouse with id {warehouse_id}. {error_message}")

  print(SUCCESS_MESSAGE.format(f"Service principal with application id {service_principal_application_id} was granted permissions on the warehouse with id {warehouse_id}."))

Method to create a catalog

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_catalog(catalog_name):
  """Create a Unity Catalog catalog unless a lookup by name already succeeds.

  Args:
    catalog_name: Name of the catalog.

  Raises:
    Exception: If the creation request does not return 200 OK.
  """
  basic_auth = get_http_basic_auth_for_account_apis()

  # ---------- check whether the catalog already exists ----------
  # Any non-200 answer to the lookup is treated as "does not exist yet" and
  # falls through to the creation request.
  url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/catalogs/{catalog_name}"
  response = requests.get(url = url, auth = basic_auth)
  if response.status_code == HTTPStatus.OK:
    print(SUCCESS_MESSAGE.format(f"Catalog {catalog_name} already exists."))
    return

  # ---------- create the catalog ----------
  request_body = {
    "name": catalog_name,
    "securable_type": "CATALOG",
    "securable_kind": "CATALOG_STANDARD"
  }

  url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/catalogs"
  response = requests.post(url = url, auth = basic_auth, json = request_body)

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not create catalog {catalog_name}. {error_message}")

  print(SUCCESS_MESSAGE.format(f"Catalog {catalog_name} was created."))

Method to create schemas

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_schema(schema_name, catalog_name):
  """Create a schema in a catalog unless a lookup by full name already succeeds.

  Args:
    schema_name: Name of the schema.
    catalog_name: Name of the catalog that contains the schema.

  Raises:
    Exception: If the creation request does not return 200 OK.
  """
  basic_auth = get_http_basic_auth_for_account_apis()

  # ---------- check if the schema already exists ----------
  # Any non-200 answer to the lookup is treated as "does not exist yet" and
  # falls through to the creation request.
  url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/schemas/{catalog_name}.{schema_name}"
  response = requests.get(url = url, auth = basic_auth)
  if response.status_code == HTTPStatus.OK:
    print(SUCCESS_MESSAGE.format(f"Schema {catalog_name}.{schema_name} already exists."))
    return

  # ---------- create the schema ----------
  url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/schemas"
  request_body = {
    "name": schema_name,
    "catalog_name": catalog_name
  }

  response = requests.post(url = url, auth = basic_auth, json = request_body)

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not create schema {schema_name} in the catalog {catalog_name}. {error_message}")

  print(SUCCESS_MESSAGE.format(f"Schema {schema_name} was created in the catalog {catalog_name}."))

Method to create a Service Principal Secret

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def create_service_principal_secret(service_principal_id):
  """Create a secret for a service principal through the Account API.

  Args:
    service_principal_id: Id of the service principal the secret is created for.

  Returns:
    The parsed JSON body of the response.

  Raises:
    Exception: If the response status is not 200 OK.
  """
  endpoint = f"{ACCOUNT_URL}/api/2.0/accounts/{ACCOUNT_ID}/servicePrincipals/{service_principal_id}/credentials/secrets"
  response = requests.post(endpoint, auth = get_http_basic_auth_for_account_apis())

  if response.status_code != HTTPStatus.OK:
    error_message = response.json()["message"]
    raise Exception(f"Could not create the service principal secret. {error_message}")

  print(SUCCESS_MESSAGE.format("Service principal Secret was created."))
  return response.json()

Method to execute grants to the Service Principal

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def execute_sql_statement(sql_statement):
  """Log a SQL statement, run it with `spark.sql`, and report success.

  Args:
    sql_statement: SQL text to execute.
  """
  announcement = f"Executing ==> {sql_statement}"
  print(announcement)
  spark.sql(sql_statement)
  confirmation = SUCCESS_MESSAGE.format("Grants executed.")
  print(confirmation)
 
@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def execute_grants_to_service_principal(application_id):
  """Grant the service principal its privileges on the catalogs.

  First grants privileges on the CATALOG_NAME catalog. Then one GRANT statement
  is generated per row of `system.information_schema.catalogs` (the `system`
  catalog excluded) and each is executed in turn.

  Args:
    application_id: Application id of the service principal receiving the grants.
  """
  tlx_policy_db_grants = f"GRANT USE CATALOG, CREATE FUNCTION, CREATE TABLE, USE SCHEMA, SELECT ON CATALOG {CATALOG_NAME} TO `{application_id}`"
  execute_sql_statement(tlx_policy_db_grants)

  # The GRANT text is assembled inside the query, one row per catalog.
  concat_string = f"CONCAT('GRANT USE CATALOG, USE SCHEMA, SELECT, MODIFY, APPLY TAG ON CATALOG ', catalog_name, ' TO `{application_id}`')"
  catalog_grants_df = spark.sql(f"select {concat_string} as grant_statements from `system`.information_schema.catalogs where catalog_name <> 'system';")
  catalog_grants_rows = catalog_grants_df.collect()

  for row in catalog_grants_rows:
    execute_sql_statement(row['grant_statements'])

  print(SUCCESS_MESSAGE.format("All grants were executed successfully"))

Method to enable a System Schema

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def enable_system_schema(metastore_id, system_schema_name, application_id):
  """Enable a system schema on a metastore and grant read access on `system`.

  A response carrying the error code SCHEMA_ALREADY_EXISTS is treated as
  success. After the schema is enabled (or found already enabled) the service
  principal is granted USE CATALOG, USE SCHEMA and SELECT on the `system`
  catalog.

  Args:
    metastore_id: Id of the metastore on which the schema is enabled.
    system_schema_name: Name of the system schema to enable.
    application_id: Application id of the service principal receiving the grant.

  Raises:
    Exception: If the request fails with any error other than
      SCHEMA_ALREADY_EXISTS.
  """
  url = f"{WORKSPACE_URL}/api/2.1/unity-catalog/metastores/{metastore_id}/systemschemas/{system_schema_name}"
  basic_auth = get_http_basic_auth_for_account_apis()
  response = requests.put(url, auth = basic_auth)

  if response.status_code == HTTPStatus.OK:
    print(SUCCESS_MESSAGE.format(f"System Schema '{system_schema_name}' is now available."))
  else:
    # Use .get() so an error body without `error_code` surfaces as the
    # descriptive exception below rather than as a KeyError.
    error_body = response.json()
    if error_body.get("error_code") == "SCHEMA_ALREADY_EXISTS":
      print(SUCCESS_MESSAGE.format(f"System Schema '{system_schema_name}' is already enabled."))
    else:
      error_message = error_body.get("message")
      raise Exception(f"System Schema '{system_schema_name}' was not found. {error_message}")

  grant_query = f"GRANT USE CATALOG, USE SCHEMA, SELECT ON CATALOG system TO `{application_id}`"
  execute_sql_statement(grant_query)

Method to enable verbose logging

@retry(stop = stop_after_attempt(NUMBER_OF_RETRIES), wait = wait_fixed(WAIT_TIME_IN_SECONDS), reraise = True)
def enable_verbose_logging():
  """Turn on verbose audit logs through the workspace configuration API.

  Raises:
    Exception: If the response status is not 204 NO CONTENT.
  """
  url = f"{WORKSPACE_URL}/api/2.0/workspace-conf"
  basic_auth = get_http_basic_auth_for_account_apis()
  # Must be a key/value mapping: without the colon the two adjacent literals
  # form a one-element set, which cannot be serialised as JSON.
  request_body = {
    "enableVerboseAuditLogs": "true"
  }
  response = requests.patch(url, auth = basic_auth, json = request_body)

  if response.status_code != HTTPStatus.NO_CONTENT:
    error_message = response.json()["message"]
    raise Exception(f"Could not enable verbose logging. {error_message}")

  print(SUCCESS_MESSAGE.format("Verbose logging has been enabled."))

Create a Service Principal, warehouse and grant permissions

Get workspace id of the current deployment name

# Look up the workspace id that corresponds to this notebook's deployment name.
workspace_id = get_all_workspaces(DEPLOYMENT_NAME)
print(f"WORKSPACE_ID of the deployment {DEPLOYMENT_NAME} is {workspace_id}")

Get the metastore id of the resultant workspace id

# Look up the metastore assigned to the workspace found above.
metastore_id = get_metastore_id_of_workspace(workspace_id)
print(f"METASTORE_ID of the workspace with id {workspace_id} is {metastore_id}")

Get Metastore admin name of the metastore

# The metastore owner is used below as the display name of the metastore admins group.
metastore_admins_group_name = get_metastore_admin_of_metastore(metastore_id)
print(f"Metastore admin of the metastore with id {metastore_id} is {metastore_admins_group_name}")

Fetch the id of the metastore_admins_group

# Resolve the group's display name to its id; this raises if the group does not exist.
metastore_admins_group_id = get_metastore_admins_group_id(metastore_admins_group_name)
print(f"Metastore admins group Id : {metastore_admins_group_id}")

Create a Service Principal

# ---------- Creation of Service Principal ----------
created_service_principal = create_service_principal(SERVICE_PRINCIPAL_DISPLAY_NAME)

# ---------- Get the id and applicationId ----------
created_service_principal_id = created_service_principal["id"]
created_service_principal_application_id = created_service_principal["applicationId"]

# ---------- Display as a dataframe ----------
# `groups` and `entitlements` are read with an empty default: if the response
# leaves out an empty list, a KeyError here would abort the cell after the
# service principal has already been created.
created_service_principal_groups = ", ".join(g["value"] for g in created_service_principal.get("groups", []))
created_service_principal_entitlements = ", ".join(e["value"] for e in created_service_principal.get("entitlements", []))
created_service_principal_df_data = [
  ["id", created_service_principal_id],
  ["applicationId", created_service_principal_application_id],
  ["displayName", created_service_principal["displayName"]],
  ["groups", created_service_principal_groups],
  ["entitlements", created_service_principal_entitlements],
  ["active", created_service_principal["active"]]
]

created_service_principal_df = spark.createDataFrame(data = created_service_principal_df_data, schema = DF_SCHEMA)
display(created_service_principal_df)

Add Service Principal to metastore_admins_group

# ---------- Add Service Principal to metastore_admins_group ----------
# Uses the SCIM `id` of the service principal, not its applicationId.
add_member_to_group(metastore_admins_group_id, created_service_principal_id)

Create a warehouse

# ---------- Create a warehouse ----------
# Returns the id of the existing warehouse when one named WAREHOUSE_NAME is already present.
warehouse_id = create_a_warehouse(WAREHOUSE_NAME)
print(f"WAREHOUSE ID: {warehouse_id}")

Grant permissions to the service principal on a warehouse

# ---------- Grant permissions to the service principal on the created warehouse ----------
grant_permissions_on_a_warehouse(created_service_principal_application_id, warehouse_id)

Create a catalog

# ---------- Create the TrustLogix catalog if it does not exist ----------
create_catalog(CATALOG_NAME)

Create schemas

# ---------- Create each schema listed in SCHEMAS inside the TrustLogix catalog ----------
for schema_name in SCHEMAS:
  create_schema(schema_name, CATALOG_NAME)

Create Service Principal Secret

# ---------- Create the secret ----------
created_service_principal_secret_response = create_service_principal_secret(created_service_principal_id)

# ---------- Display as a dataframe ----------
if created_service_principal_secret_response:
  created_service_principal_secret_df_data = [
    [field, created_service_principal_secret_response[field]]
    for field in ("id", "secret", "secret_hash", "create_time", "update_time", "status")
  ]
  created_service_principal_secret_df = spark.createDataFrame(
    schema = DF_SCHEMA,
    data = created_service_principal_secret_df_data
  )
  display(created_service_principal_secret_df)

Execute grants to the Service Principal

# ---------- Grant catalog privileges to the service principal ----------
execute_grants_to_service_principal(created_service_principal_application_id)

Enable 'access' System-Schema

# ---------- Enable the 'access' system schema and grant read access on the system catalog ----------
enable_system_schema(metastore_id, "access", created_service_principal_application_id)

Enable verbose logging

# ---------- Turn on verbose audit logs for the workspace ----------
enable_verbose_logging()

Result

# ---------- Collect the values produced by the steps above ----------
final_result_df_data = [
  [label, value]
  for label, value in (
    ("Workspace Deployment Name", DEPLOYMENT_NAME),
    ("SQL Warehouse Identifier", warehouse_id),
    ("Trustlogix Service Principal Name", created_service_principal["displayName"]),
    ("Trustlogix Client ID", created_service_principal_application_id),
    ("Trustlogix Client Secret", created_service_principal_secret_response["secret"]),
  )
]

# ---------- Display as a dataframe ----------
final_result_df = spark.createDataFrame(schema = DF_SCHEMA, data = final_result_df_data)
display(final_result_df)