Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 124 additions & 24 deletions src/firetower/auth/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.core.exceptions import ValidationError
from django.core.validators import URLValidator, validate_email

from firetower.auth.models import ExternalProfile, ExternalProfileType
from firetower.integrations.services import SlackService

logger = logging.getLogger(__name__)
Expand All @@ -12,7 +13,7 @@

def sync_user_profile_from_slack(user: User) -> bool:
"""
Sync a user's profile (name, avatar) from Slack.
Sync a user's profile (name, avatar, Slack ID) from Slack.

Args:
user: User instance to sync
Expand All @@ -30,6 +31,7 @@ def sync_user_profile_from_slack(user: User) -> bool:
logger.info(f"No Slack profile found for {user.email}")
return False

slack_user_id = slack_profile.get("slack_user_id", "")
first_name = slack_profile.get("first_name", "")
last_name = slack_profile.get("last_name", "")
avatar_url = slack_profile.get("avatar_url", "")
Expand All @@ -46,11 +48,9 @@ def sync_user_profile_from_slack(user: User) -> bool:
user.save()
logger.info(f"Updated profile from Slack for {user.email}")

# Update avatar from Slack profile
if avatar_url and hasattr(user, "userprofile"):
profile = user.userprofile
if profile.avatar_url != avatar_url:
# Validate avatar URL (HTTPS only for security)
try:
URLValidator(schemes=["https"])(avatar_url)
profile.avatar_url = avatar_url
Expand All @@ -60,9 +60,101 @@ def sync_user_profile_from_slack(user: User) -> bool:
except ValidationError:
logger.warning(f"Invalid or insecure avatar URL: {avatar_url}")

if slack_user_id:
external_profile, created = ExternalProfile.objects.get_or_create(
user=user,
type=ExternalProfileType.SLACK,
defaults={"external_id": slack_user_id},
)
if not created and external_profile.external_id != slack_user_id:
external_profile.external_id = slack_user_id
external_profile.save()
logger.info(f"Updated Slack ID for user {user.email}")
needs_save = True
elif created:
logger.info(f"Created Slack ExternalProfile for user {user.email}")
needs_save = True

return needs_save


def get_or_create_user_from_slack_id(slack_user_id: str) -> User | None:
"""
Get or create a Django user from Slack user ID.

Args:
slack_user_id: Slack user ID (e.g., U12345678)

Returns:
User instance or None if Slack API fails

Note:
This function optimizes API calls by first checking if the user
already exists via ExternalProfile lookup. Only calls Slack API
if the user doesn't exist locally.
"""
if not slack_user_id:
logger.warning("Cannot get/create user - no Slack user ID provided")
return None

try:
external_profile = ExternalProfile.objects.get(
type=ExternalProfileType.SLACK,
external_id=slack_user_id,
)
logger.info(f"Found existing user for Slack ID: {slack_user_id}")
return external_profile.user
except ExternalProfile.DoesNotExist:
pass

slack_user_info = _slack_service.get_user_info(slack_user_id)

if not slack_user_info:
logger.warning(f"Could not fetch user info from Slack for ID: {slack_user_id}")
return None

email = slack_user_info.get("email", "")
first_name = slack_user_info.get("first_name", "")
last_name = slack_user_info.get("last_name", "")
avatar_url = slack_user_info.get("avatar_url", "")

if not email:
logger.warning(f"Slack user {slack_user_id} has no email, cannot create user")
return None

username = f"slack:{slack_user_id}"

user = User.objects.create(
username=username,
email=email,
first_name=first_name[:150],
last_name=last_name[:150],
is_active=True,
)
user.set_unusable_password()
user.save()

if avatar_url:
try:
URLValidator(schemes=["https"])(avatar_url)
user.userprofile.avatar_url = avatar_url
user.userprofile.save()
except ValidationError:
logger.warning(f"Invalid avatar URL for Slack user {slack_user_id}")

ExternalProfile.objects.create(
user=user,
type=ExternalProfileType.SLACK,
external_id=slack_user_id,
)

logger.info(
f"Created new user from Slack: {email} (Slack ID: {slack_user_id}, username: {username})"
)

return user


def get_or_create_user_from_iap(iap_user_id: str, email: str) -> User:
"""
Get or create a Django user from IAP authentication.
Expand All @@ -78,46 +170,54 @@ def get_or_create_user_from_iap(iap_user_id: str, email: str) -> User:
ValueError: If email or iap_user_id is missing

Note:
Name and avatar are fetched from Slack only on user creation.
Use the "Sync with Slack" admin action to update existing users.
If a Slack-provisioned user exists with matching email, it will be
merged by updating the username from "slack:{slack_id}" to the IAP ID.
"""
if not iap_user_id or not email:
raise ValueError("IAP user ID and email are required for user creation")

# Validate email format
try:
validate_email(email)
except ValidationError:
raise ValueError(f"Invalid email format: {email}")

# Validate IAP user ID length (Django username field max_length is 150)
if len(iap_user_id) > 150:
raise ValueError(
f"IAP user ID exceeds maximum length of 150 characters: {len(iap_user_id)}"
)

user, created = User.objects.get_or_create(
username=iap_user_id,
defaults={
"email": email,
"is_active": True,
},
)

if created:
user.set_unusable_password()
user.save()
logger.info(f"Created new user from IAP: {email} (IAP ID: {iap_user_id})")

# Fetch profile from Slack on user creation only
sync_user_profile_from_slack(user)
else:
# For existing users, only update email if changed
try:
user = User.objects.get(username=iap_user_id)
if user.email != email:
logger.info(
f"Updated email for user {iap_user_id}: {user.email} -> {email}"
)
user.email = email
user.save()
return user
except User.DoesNotExist:
pass

slack_user = User.objects.filter(email=email, username__startswith="slack:").first()

if slack_user:
old_username = slack_user.username
slack_user.username = iap_user_id
slack_user.save()
logger.info(
f"Merged Slack-provisioned user {email}: {old_username} -> {iap_user_id}"
)
return slack_user

user = User.objects.create(
username=iap_user_id,
email=email,
is_active=True,
)
user.set_unusable_password()
user.save()
logger.info(f"Created new user from IAP: {email} (IAP ID: {iap_user_id})")

sync_user_profile_from_slack(user)

return user
63 changes: 63 additions & 0 deletions src/firetower/auth/tests/test_iap_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from google.auth import exceptions as google_auth_exceptions

from firetower.auth.middleware import IAPAuthenticationMiddleware
from firetower.auth.models import ExternalProfile, ExternalProfileType
from firetower.auth.services import get_or_create_user_from_iap
from firetower.auth.validators import IAPTokenValidator

Expand Down Expand Up @@ -109,6 +110,68 @@ def test_raises_error_for_iap_id_too_long(self):
email="[email protected]",
)

def test_merges_slack_provisioned_user_on_first_iap_login(self):
slack_user = User.objects.create_user(
username="slack:U12345",
email="[email protected]",
first_name="John",
last_name="Doe",
)
ExternalProfile.objects.create(
user=slack_user,
type=ExternalProfileType.SLACK,
external_id="U12345",
)

iap_user = get_or_create_user_from_iap(
iap_user_id="accounts.google.com:67890",
email="[email protected]",
)

assert iap_user.id == slack_user.id
assert iap_user.username == "accounts.google.com:67890"
assert iap_user.email == "[email protected]"
assert iap_user.first_name == "John"
assert iap_user.last_name == "Doe"

slack_profile = ExternalProfile.objects.get(
user=iap_user, type=ExternalProfileType.SLACK
)
assert slack_profile.external_id == "U12345"

assert User.objects.count() == 1

def test_creates_new_user_if_no_slack_user_with_email(self):
User.objects.create_user(
username="slack:U12345",
email="[email protected]",
)

new_user = get_or_create_user_from_iap(
iap_user_id="accounts.google.com:67890",
email="[email protected]",
)

assert new_user.username == "accounts.google.com:67890"
assert new_user.email == "[email protected]"
assert User.objects.count() == 2

def test_merge_preserves_user_profile_and_avatar(self):
slack_user = User.objects.create_user(
username="slack:U12345",
email="[email protected]",
)
slack_user.userprofile.avatar_url = "https://example.com/avatar.jpg"
slack_user.userprofile.save()

iap_user = get_or_create_user_from_iap(
iap_user_id="accounts.google.com:67890",
email="[email protected]",
)

assert iap_user.id == slack_user.id
assert iap_user.userprofile.avatar_url == "https://example.com/avatar.jpg"


class TestIAPTokenValidator:
def test_extract_user_info(self):
Expand Down
Loading