Compare commits

...

13 Commits

121
main.py
View File

@ -1,5 +1,7 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json
from dolibarrpy import Dolibarrpy from dolibarrpy import Dolibarrpy
from flask import Flask, abort, request from flask import Flask, abort, request
from ldap3 import ALL, Connection, ObjectDef, Reader, Server, WritableEntry, Writer from ldap3 import ALL, Connection, ObjectDef, Reader, Server, WritableEntry, Writer
@ -22,10 +24,12 @@ def main():
def manage_users_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy): def manage_users_extra_fields(ldap_conn: Connection, dolibarr_client: Dolibarrpy):
dolibarr_users = dolibarr_client.find_all_users() dolibarr_users = dolibarr_client.find_all_users()
for dolibarr_user in dolibarr_users: for dolibarr_user in dolibarr_users:
manage_user_extra_fields(ldap_conn, dolibarr_user) manage_user_extra_fields(ldap_conn, dolibarr_user, dolibarr_client)
def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict): def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict, dolibarr_client: Dolibarrpy, /,
manage_user_attrs: bool = True, manage_group_attrs: bool = True,
oldgroupid: int | None = None, newgroupid: int | None = None, new_group: dict | None = None):
login = dolibarr_user['login'] login = dolibarr_user['login']
obj_inetorgperson = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'], ldap_conn) obj_inetorgperson = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'], ldap_conn)
obj_user = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'] + config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES, ldap_conn) obj_user = ObjectDef(['top', 'inetOrgPerson', 'posixAccount'] + config.LDAP_GROUPS_EXTRA_OBJECT_CLASSES, ldap_conn)
@ -55,7 +59,10 @@ def manage_user_extra_fields(ldap_conn: Connection, dolibarr_user: dict):
users_reader.search() users_reader.search()
users_writer = Writer.from_cursor(users_reader, object_def=obj_user) users_writer = Writer.from_cursor(users_reader, object_def=obj_user)
ldap_user = users_writer[0] ldap_user = users_writer[0]
if manage_user_attrs:
append_extra_fields_to_ldap_user(ldap_user, dolibarr_user) append_extra_fields_to_ldap_user(ldap_user, dolibarr_user)
if manage_group_attrs:
append_extra_group_fields_to_ldap_user(ldap_user, dolibarr_user, dolibarr_client, oldgroupid=oldgroupid, newgroupid=newgroupid, new_group=new_group)
users_writer.commit() users_writer.commit()
@ -65,14 +72,77 @@ def append_extra_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: di
ldap_user.objectClass += extra_object_class ldap_user.objectClass += extra_object_class
for extra_field in config.LDAP_USERS_EXTRA_FIELDS: for extra_field in config.LDAP_USERS_EXTRA_FIELDS:
dolibarr_attr, ldap_attr = extra_field.split(':') dolibarr_attrs, ldap_attr = extra_field.split('::')
dolibarr_attrs = dolibarr_attrs.split('+')
values = []
if dolibarr_attrs[0].startswith('GROUP__'):
continue
for dolibarr_attr in dolibarr_attrs:
if dolibarr_attr.endswith('[]'): if dolibarr_attr.endswith('[]'):
dolibarr_attr = dolibarr_attr[:-2] dolibarr_attr = dolibarr_attr[:-2]
value = dolibarr_user['array_options'][f'options_{dolibarr_attr}'] value = dolibarr_user.get(dolibarr_attr, None) or dolibarr_user['array_options'].get(f'options_{dolibarr_attr}', None)
value = value.split() if value else [] value = value.split() if value else []
setattr(ldap_user, ldap_attr, value) values += value
else: else:
value = dolibarr_user['array_options'][f'options_{dolibarr_attr}'] or "" if dolibarr_attr.startswith("'") and dolibarr_attr.endswith("'"):
value = dolibarr_attr[1:-1]
else:
value = dolibarr_user.get(dolibarr_attr, None) or dolibarr_user['array_options'].get(f'options_{dolibarr_attr}', None)
if value:
values.append(value)
if ldap_attr.endswith('[]'):
ldap_attr = ldap_attr[:-2]
value = values
else:
value = "".join(map(str, values))
setattr(ldap_user, ldap_attr, value)
def append_extra_group_fields_to_ldap_user(ldap_user: WritableEntry, dolibarr_user: dict, dolibarr_client: Dolibarrpy, /,
oldgroupid: int | None = None, newgroupid: int | None = None, new_group: dict | None = None):
if not any(dolibarr_attr.startswith('GROUP')
for extra_field in config.LDAP_USERS_EXTRA_FIELDS
for dolibarr_attr in extra_field.split('::')[0].split('|')):
return
user_id = dolibarr_user['id']
dolibarr_groups: list[dict] = dolibarr_client.get_user_groups_uid(user_id)
if not isinstance(dolibarr_groups, list):
dolibarr_groups = []
if oldgroupid:
dolibarr_groups = [group for group in dolibarr_groups if group['id'] != oldgroupid]
if newgroupid:
dolibarr_groups.append(dolibarr_client.call_get_api('users/groups', newgroupid))
if new_group:
dolibarr_groups = [group for group in dolibarr_groups if group['id'] != new_group['id']] + [new_group]
if not dolibarr_groups:
return
for extra_field in config.LDAP_USERS_EXTRA_FIELDS:
dolibarr_attrs, ldap_attr = extra_field.split('::')
dolibarr_attrs = dolibarr_attrs.split('+')
values = []
if not dolibarr_attrs[0].startswith('GROUP__'):
continue
for dolibarr_attr in dolibarr_attrs:
dolibarr_attr = dolibarr_attr[7:]
for dolibarr_group in dolibarr_groups:
if dolibarr_attr.endswith('[]'):
attr = dolibarr_attr[:-2]
value = dolibarr_group.get(attr, None) or dolibarr_group['array_options'].get(f'options_{attr}', None)
value = value.split() if value else []
values += value
else:
value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None)
if value:
values.append(value)
break # Don't concatenate the value for multiple groups
if ldap_attr.endswith('[]'):
ldap_attr = ldap_attr[:-2]
value = values
else:
value = "".join(map(str, values))
setattr(ldap_user, ldap_attr, value) setattr(ldap_user, ldap_attr, value)
@ -112,14 +182,27 @@ def append_extra_fields_to_ldap_group(ldap_group: WritableEntry, dolibarr_group:
ldap_group.objectClass += extra_object_class ldap_group.objectClass += extra_object_class
for extra_field in config.LDAP_GROUPS_EXTRA_FIELDS: for extra_field in config.LDAP_GROUPS_EXTRA_FIELDS:
dolibarr_attr, ldap_attr = extra_field.split(':') dolibarr_attrs, ldap_attr = extra_field.split('::')
dolibarr_attrs = dolibarr_attrs.split('+')
values = []
for dolibarr_attr in dolibarr_attrs:
if dolibarr_attr.endswith('[]'): if dolibarr_attr.endswith('[]'):
dolibarr_attr = dolibarr_attr[:-2] dolibarr_attr = dolibarr_attr[:-2]
value = dolibarr_group['array_options'][f'options_{dolibarr_attr}'] value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None)
value = value.split() if value else [] value = value.split() if value else []
setattr(ldap_group, ldap_attr, value) values += value
else: else:
value = dolibarr_group['array_options'][f'options_{dolibarr_attr}'] or "" if dolibarr_attr.startswith("'") and dolibarr_attr.endswith("'"):
value = dolibarr_attr[1:-1]
else:
value = dolibarr_group.get(dolibarr_attr, None) or dolibarr_group['array_options'].get(f'options_{dolibarr_attr}', None)
if value:
values.append(value)
if ldap_attr.endswith('[]'):
ldap_attr = ldap_attr[:-2]
value = values
else:
value = "".join(map(str, values))
setattr(ldap_group, ldap_attr, value) setattr(ldap_group, ldap_attr, value)
@ -132,13 +215,27 @@ def webhook_receiver():
abort(400) abort(400)
triggercode = data['triggercode'] triggercode = data['triggercode']
obj = data['object'] obj = data['object']
if config.DOLIBARR_API_DEBUG:
print("Received webhook trigger of type", triggercode, "with content:")
print(json.dumps(obj, indent=4))
dolibarr_client = Dolibarrpy(url=config.DOLIBARR_API_BASE, token=config.DOLIBARR_API_TOKEN, timeout=16, debug=config.DOLIBARR_API_DEBUG)
ldap_server = Server(config.LDAP_HOST, config.LDAP_PORT, get_info=ALL) ldap_server = Server(config.LDAP_HOST, config.LDAP_PORT, get_info=ALL)
if triggercode.startswith('USER_'): if triggercode.startswith('USER_'):
oldgid, newgid = None, None
if 'context' in obj and obj['context']:
audit = obj['context']['audit']
if audit == "UserSetInGroup":
newgid = obj['context']['newgroupid']
elif audit == "UserRemovedFromGroup":
oldgid = obj['context']['oldgroupid']
with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn: with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
manage_user_extra_fields(ldap_conn, obj) manage_user_extra_fields(ldap_conn, obj, dolibarr_client, oldgroupid=oldgid, newgroupid=newgid)
elif triggercode.startswith('GROUP_'): elif triggercode.startswith('USERGROUP_'):
with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn: with Connection(ldap_server, config.LDAP_BIND_USER, config.LDAP_BIND_PASSWORD) as ldap_conn:
manage_group_extra_fields(ldap_conn, obj) manage_group_extra_fields(ldap_conn, obj)
group_members = obj['members'] or dict()
for group_member in group_members.values():
manage_user_extra_fields(ldap_conn, group_member, dolibarr_client, manage_user_attrs=False, new_group=obj)
else: else:
abort(400) abort(400)
return "", 204 return "", 204