Skip to content
Snippets Groups Projects

Feature/assign ce admin

Merged Benjamí Ramos requested to merge feature/assign_ce_admin into dev
1 file
+ 108
77
Compare changes
  • Side-by-side
  • Inline
@@ -61,12 +61,16 @@ class ResUsers(models.Model):
# already sync'ed somewhere else
continue
keycloak_user = self._get_or_create_kc_user(token, provider_id, user)
keycloak_key = self._LOGIN_MATCH_KEY.split(':')[0]
keycloak_login_provider = self.env.ref('energy_communities.keycloak_login_provider')
user.update({
'oauth_uid': keycloak_user[keycloak_key],
'oauth_provider_id': keycloak_login_provider.id,
})
keycloak_key = self._LOGIN_MATCH_KEY.split(":")[0]
keycloak_login_provider = self.env.ref(
"energy_communities.keycloak_login_provider"
)
user.update(
{
"oauth_uid": keycloak_user[keycloak_key],
"oauth_provider_id": keycloak_login_provider.id,
}
)
# action = self.env.ref('base.action_res_users').read()[0]
# action['domain'] = [('id', 'in', self.user_ids.ids)]
logger.debug("Create keycloak users STOP")
@@ -122,9 +126,10 @@ class ResUsers(models.Model):
:param token: valid auth token from Keycloak
:param odoo_user: res.users record
"""
odoo_key = self._LOGIN_MATCH_KEY.split(':')[1]
odoo_key = self._LOGIN_MATCH_KEY.split(":")[1]
keycloak_user = self._get_kc_users(
token, provider_id, search=odoo_user.mapped(odoo_key)[0])
token, provider_id, search=odoo_user.mapped(odoo_key)[0]
)
if keycloak_user:
if len(keycloak_user) > 1:
# TODO: warn user?
@@ -138,10 +143,10 @@ class ResUsers(models.Model):
def _create_user_values(self, odoo_user):
"""Prepare Keycloak values for given Odoo user."""
values = {
'username': odoo_user.login,
'email': odoo_user.partner_id.email,
'attributes': {'lang': [odoo_user.lang]},
'enabled': True,
"username": odoo_user.login,
"email": odoo_user.partner_id.email,
"attributes": {"lang": [odoo_user.lang]},
"enabled": True,
}
if "firstname" in odoo_user.partner_id:
@@ -186,90 +191,118 @@ class ResUsers(models.Model):
self._validate_response(resp, no_json=True)
# yes, Keycloak sends back NOTHING on create
# so we are forced to do anothe call to get its data :(
return self._get_kc_users(token, provider_id, search=data['username'])[0]
return self._get_kc_users(token, provider_id, search=data["username"])[0]
def get_role_codes(self):
# TODO Map all code to company and enable (We should update the API schema too)
return self.role_line_ids[0].role_id.code
def send_reset_password_mail(self):
provider_id = self.env.ref('energy_communities.keycloak_admin_provider')
provider_id = self.env.ref("energy_communities.keycloak_admin_provider")
provider_id.validate_admin_provider()
headers = {
'Authorization': 'Bearer %s' % self._get_admin_token(provider_id)
}
headers['Content-Type'] = "application/json"
headers = {"Authorization": "Bearer %s" % self._get_admin_token(provider_id)}
headers["Content-Type"] = "application/json"
endpoint = provider_id.reset_password_endpoint.format(
kc_uid = self.oauth_uid
endpoint = provider_id.reset_password_endpoint.format(kc_uid=self.oauth_uid)
response = requests.put(
endpoint, headers=headers, data='["UPDATE_PASSWORD", "VERIFY_EMAIL"]'
)
response = requests.put(endpoint, headers=headers, data='["UPDATE_PASSWORD", "VERIFY_EMAIL"]')
if response.status_code != 204:
raise exceptions.UserError(
_('Something went wrong. Mail can not be sended. More details: {}').format(response.json())
_(
"Something went wrong. Mail can not be sended. More details: {}"
).format(response.json())
)
def make_internal_user(self):
already_user = self.env["res.users.role.line"].search([
("user_id.id", "=", self.id),
("active", "=", True),
("role_id.code", "=", "role_internal_user")
])
already_user = self.env["res.users.role.line"].search(
[
("user_id.id", "=", self.id),
("active", "=", True),
("role_id.code", "=", "role_internal_user"),
]
)
if not already_user:
role = self.env["res.users.role"].search([(
"code", "=", "role_internal_user"
)])
self.write({'role_line_ids': [(0, 0, {
'user_id': self.id,
'active': True,
'role_id': role.id,
})]})
role = self.env["res.users.role"].search(
[("code", "=", "role_internal_user")]
)
self.write(
{
"role_line_ids": [
(
0,
0,
{
"user_id": self.id,
"active": True,
"role_id": role.id,
},
)
]
}
)
def make_ce_user(self, company_id, role_name):
role = self.env["res.users.role"].search([(
"code", "=", role_name
)])
current_role = self.env["res.users.role.line"].search([
("user_id", "=", self.id),
("active", "=", True),
("company_id", "=", company_id)
])
role = self.env["res.users.role"].search([("code", "=", role_name)])
current_role = self.env["res.users.role.line"].search(
[
("user_id", "=", self.id),
("active", "=", True),
("company_id", "=", company_id),
]
)
if current_role:
current_role.write({"role_id": role})
current_role.write({"role_id": role})
else:
self.write({
"company_ids": [(4, company_id)],
"role_line_ids": [(0, 0, {
'user_id': self.id,
'active': True,
'role_id': role.id,
'company_id': company_id,
})]
})
self.write(
{
"company_ids": [(4, company_id)],
"role_line_ids": [
(
0,
0,
{
"user_id": self.id,
"active": True,
"role_id": role.id,
"company_id": company_id,
},
)
],
}
)
def make_coord_user(self, company_id, role_name):
role = self.env["res.users.role"].search([(
"code", "=", role_name
)])
current_role = self.env["res.users.role.line"].search([
("user_id", "=", self.id),
("active", "=", True),
("company_id", "=", company_id)
])
role = self.env["res.users.role"].search([("code", "=", role_name)])
current_role = self.env["res.users.role.line"].search(
[
("user_id", "=", self.id),
("active", "=", True),
("company_id", "=", company_id),
]
)
if current_role:
current_role.write({"role_id": role})
current_role.write({"role_id": role})
else:
self.write({
"company_ids": [(4, company_id)],
"role_line_ids": [(0, 0, {
'user_id': self.id,
'active': True,
'role_id': role.id,
'company_id': company_id,
})]
})
self.write(
{
"company_ids": [(4, company_id)],
"role_line_ids": [
(
0,
0,
{
"user_id": self.id,
"active": True,
"role_id": role.id,
"company_id": company_id,
},
)
],
}
)
company = self.env["res.company"].browse(company_id)
child_companies = company.get_child_companies()
@@ -277,14 +310,12 @@ class ResUsers(models.Model):
self.make_ce_user(child_company.id, "role_ce_manager")
def add_energy_community_role(self, company_id, role_name):
if role_name == 'role_ce_member' or role_name == 'role_ce_admin':
if role_name == "role_ce_member" or role_name == "role_ce_admin":
self.make_ce_user(company_id, role_name)
elif role_name == 'role_coord_worker' or role_name == 'role_coord_admin':
elif role_name == "role_coord_worker" or role_name == "role_coord_admin":
self.make_coord_user(company_id, role_name)
else:
raise exceptions.UserError(
_('Role not found')
)
raise exceptions.UserError(_("Role not found"))
def create_energy_community_base_user(
cls, vat, first_name, last_name, lang_code, email
@@ -296,7 +327,7 @@ class ResUsers(models.Model):
"lang": lang_code,
"email": email,
}
user = cls.create(vals)
user = cls.sudo().with_context(no_reset_password=True).create(vals)
user.make_internal_user()
user.create_users_on_keycloak()
Loading