Skip to content
Snippets Groups Projects

Feature/assign ce admin

Merged Benjamí Ramos requested to merge feature/assign_ce_admin into dev
3 files
+ 35
16
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -60,7 +60,7 @@ class ResUsers(models.Model):
if user.oauth_uid:
# already sync'ed somewhere else
continue
keycloak_user = self._get_or_create_user(token, provider_id, user)
keycloak_user = self._get_or_create_kc_user(token, provider_id, user)
keycloak_key = self._LOGIN_MATCH_KEY.split(":")[0]
keycloak_login_provider = self.env.ref(
"energy_communities.keycloak_login_provider"
@@ -76,7 +76,7 @@ class ResUsers(models.Model):
logger.debug("Create keycloak users STOP")
return True
def _get_users(self, token, provider_id, **params):
def _get_kc_users(self, token, provider_id, **params):
"""Retrieve users from Keycloak.
:param token: a valida auth token from Keycloak
@@ -120,14 +120,14 @@ class ResUsers(models.Model):
except JSONDecodeError:
raise exceptions.UserError(_("Something went wrong. Please check logs."))
def _get_or_create_user(self, token, provider_id, odoo_user):
def _get_or_create_kc_user(self, token, provider_id, odoo_user):
"""Lookup for given user on Keycloak: create it if missing.
:param token: valid auth token from Keycloak
:param odoo_user: res.users record
"""
odoo_key = self._LOGIN_MATCH_KEY.split(":")[1]
keycloak_user = self._get_users(
keycloak_user = self._get_kc_users(
token, provider_id, search=odoo_user.mapped(odoo_key)[0]
)
if keycloak_user:
@@ -137,7 +137,7 @@ class ResUsers(models.Model):
return keycloak_user[0]
else:
values = self._create_user_values(odoo_user)
keycloak_user = self._create_user(token, provider_id, **values)
keycloak_user = self._create_kc_user(token, provider_id, **values)
return keycloak_user
def _create_user_values(self, odoo_user):
@@ -146,6 +146,7 @@ class ResUsers(models.Model):
"username": odoo_user.login,
"email": odoo_user.partner_id.email,
"attributes": {"lang": [odoo_user.lang]},
"enabled": True,
}
if "firstname" in odoo_user.partner_id:
@@ -175,7 +176,7 @@ class ResUsers(models.Model):
firstname, lastname = name_parts[0], " ".join(name_parts[1:])
return firstname, lastname
def _create_user(self, token, provider_id, **data):
def _create_kc_user(self, token, provider_id, **data):
"""Create a user on Keycloak w/ given data."""
logger.info("CREATE Calling %s" % provider_id.admin_user_endpoint)
headers = {
@@ -190,8 +191,146 @@ class ResUsers(models.Model):
self._validate_response(resp, no_json=True)
# yes, Keycloak sends back NOTHING on create
# so we are forced to do anothe call to get its data :(
return self._get_users(token, provider_id, search=data["username"])[0]
return self._get_kc_users(token, provider_id, search=data["username"])[0]
def get_role_codes(self):
# TODO Map all code to company and enable (We should update the API schema too)
return self.role_line_ids[0].role_id.code
def send_reset_password_mail(self):
provider_id = self.env.ref("energy_communities.keycloak_admin_provider")
provider_id.validate_admin_provider()
headers = {"Authorization": "Bearer %s" % self._get_admin_token(provider_id)}
headers["Content-Type"] = "application/json"
if provider_id.reset_password_endpoint:
endpoint = provider_id.reset_password_endpoint.format(kc_uid=self.oauth_uid)
response = requests.put(
endpoint, headers=headers, data='["UPDATE_PASSWORD", "VERIFY_EMAIL"]'
)
else:
raise exceptions.UserError(_("Reset password url is not set."))
if response.status_code != 204:
raise exceptions.UserError(
_(
"Something went wrong. Mail can not be sended. More details: {}"
).format(response.json())
)
def make_internal_user(self):
already_user = self.env["res.users.role.line"].search(
[
("user_id.id", "=", self.id),
("active", "=", True),
("role_id.code", "=", "role_internal_user"),
]
)
if not already_user:
role = self.env.ref("energy_communities.role_internal_user")
self.write(
{
"role_line_ids": [
(
0,
0,
{
"user_id": self.id,
"active": True,
"role_id": role.id,
},
)
]
}
)
def make_ce_user(self, company_id, role_name):
role = self.env["res.users.role"].search([("code", "=", role_name)])
current_role = self.env["res.users.role.line"].search(
[
("user_id", "=", self.id),
("active", "=", True),
("company_id", "=", company_id),
]
)
if current_role:
current_role.write({"role_id": role})
else:
self.write(
{
"company_ids": [(4, company_id)],
"role_line_ids": [
(
0,
0,
{
"user_id": self.id,
"active": True,
"role_id": role.id,
"company_id": company_id,
},
)
],
}
)
def make_coord_user(self, company_id, role_name):
role = self.env["res.users.role"].search([("code", "=", role_name)])
current_role = self.env["res.users.role.line"].search(
[
("user_id", "=", self.id),
("active", "=", True),
("company_id", "=", company_id),
]
)
if current_role:
current_role.write({"role_id": role})
else:
self.write(
{
"company_ids": [(4, company_id)],
"role_line_ids": [
(
0,
0,
{
"user_id": self.id,
"active": True,
"role_id": role.id,
"company_id": company_id,
},
)
],
}
)
company = self.env["res.company"].browse(company_id)
child_companies = company.get_child_companies()
for child_company in child_companies:
self.make_ce_user(child_company.id, "role_ce_manager")
def add_energy_community_role(self, company_id, role_name):
if role_name == "role_ce_member" or role_name == "role_ce_admin":
self.make_ce_user(company_id, role_name)
elif role_name == "role_coord_worker" or role_name == "role_coord_admin":
self.make_coord_user(company_id, role_name)
else:
raise exceptions.UserError(_("Role not found"))
def create_energy_community_base_user(
cls, vat, first_name, last_name, lang_code, email
):
vals = {
"login": vat,
"firstname": first_name,
"lastname": last_name,
"lang": lang_code,
"email": email,
}
user = cls.sudo().with_context(no_reset_password=True).create(vals)
user.make_internal_user()
user.create_users_on_keycloak()
user.send_reset_password_mail()
return user
Loading