Python: Active Directory

Создадим директорию проекта, библиотеки и собственно сами файлы:

mkdir ad
cd ad
mkdir lib
touch test.py
touch lib/adapi.py

В test.py будем проверять работу библиотеки, а в adapi.py напишем класс. Установим библиотеки, которые будут использоваться внутри класса:

pip3 install python-ldap ast

Импортируем библиотеки:

import os
import re
import sys
import ldap
import ldap.sasl
import ast

Класс будет принимать 5 аргументов, потому создаем его следующим образом:

class ADApi:
	ldap_server = ""
	ldap_user = ""
	ldap_pass = ""
	base_dn = ""
	search_dn = ""

Теперь напишем конструктор, в котором будут определены эти переменные, чтобы их можно было использовать в методах (функциях) класса, а также добавим соединение с сервером:

def __init__(self, ldap_server, ldap_user, ldap_pass, base_dn, search_dn):
        self.ldap_server = ldap_server
        self.ldap_user = ldap_user
        self.ldap_pass = ldap_pass
        self.base_dn = base_dn
        self.search_dn = search_dn
        try:
            	self.ldap_user = self.login2un(self.ldap_user)
            	self.con = ldap.initialize(self.ldap_server)
            	self.con.set_option(ldap.OPT_REFERRALS, 0)
            	self.con.simple_bind_s(self.ldap_user, self.ldap_pass)
        except ldap.INVALID_CREDENTIALS as e:
            	raise SystemExit(f"Error: Invalid credentials")
        except ldap.LDAPError as e:
            	e = self.err2dict(e)
            	if type(e) is dict and 'desc' in e:
                	raise SystemExit(f"Error: {e['desc']}")

Теперь напишем деструктор, который будет запускаться по окончании выполнения класса, чтобы высвободить сетевое соединение:

def __del__(self):
        try:
            	self.con.unbind_s()
        except Exception:
            	pass

Методы login2un и err2dict напишем позже. Сейчас же небольшое разъяснение по поводу методов и эксепшенов. Как видно в каждом методе есть аргумент self. Его необходимо добавлять, потому что при вызове сперва создается объект из класса, а потом уже идет обращение к методу класса. В других языках обычно вместо self используется this.

Эксепшены нужны, чтобы перехватывать события. Например, если в приложении возникает ошибка, то оно отваливается с трассировкой ошибки. Если же использовать except, то можно либо выцеплять события вроде INVALID_CREDENTIALS и вместо трассировки выполнить корректное завершение приложения с сообщением об ошибке аутентификации или записать ошибку в лог или отправить сообщение по каким-то каналам связи вроде электронной почты и сделать так, чтобы приложение продолжало работать несмотря на ошибку. Обычно так и делают, когда пишут демона (daemon).

Теперь напишем метод login2un:

def login2un(self, login):
	domain = self.dn2domain(self.base_dn)
	un = f"{login}@{domain}"
	return str(un)

Этот метод нужен для удобства. Собственно в ldap_user необходимо писать username@dc.example.com, что длинно и вообще не удобно. И поскольку домен совпадает с base_dn, то этот метод преобразовывает запись в виде dc=dc,dc=example,dc=com в dc.example.com и добавляет username@. Таким образом при инициализации класса в аргумент ldap_user достаточно будет прописать username.

Непосредственно та функция, что преобразовывает dc=dc,dc=example,dc=com в домен типа dc.example.com:

def dn2domain(self, dn):
	domain = str(dn)
	domain = domain.replace("dc=", "")
	domain = domain.replace(",", ".")
	return domain

Следующий метод err2dict:

def err2dict(self, err):
	err = re.sub(r'^.*?{', '{', str(err))
	err = ast.literal_eval(err)
	return err

Эксепшн LDAPError возвращает довольно большое количество ошибок и конечно обрабатывать каждую не имеет смысла поскольку произойти может все что угодно. Однако этот эксепшн почему-то возвращает строку хотя и в виде словаря. Потому этот метод преобразовывает строку в словарь и далее уже выводим только значение ключа desc в raise SystemExit.

Следующий метод проверяет существование пользователя. Если существует, то возвращается true. И false в любом другом случае:

def is_user(self, login):
        filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
        attrs = ["*"]
        try:
            result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
        except ldap.LDAPError as e:
            e = self.err2dict(e)
            if type(e) is dict and 'desc' in e:
                raise SystemExit(f"Error: {e['desc']}")
        if result:
            for data in result:
                if type(data[1]) is dict:
                    obj = self.dekodirui_suka(data[1]['userAccountControl'])
                    if obj == "66048":
                        return True
        return False

Теперь метод, который возвращает givenName пользователя:

def get_username(self, login):
        filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
        attrs = ["*"]
        try:
            result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
        except ldap.LDAPError as e:
            e = self.err2dict(e)
            if type(e) is dict and 'desc' in e:
                raise SystemExit(f"Error: {e['desc']}")
        if result:
            for data in result:
                if type(data[1]) is dict:
                    obj = self.dekodirui_suka(data[1]['givenName'])
                    return obj
        return False

В этом методе можем наблюдать "крик души". Почему-то с сервера данные возвращаются в виде байтов…всмысле это правильно…данные перед отправкой по сети должны кодироваться в байты, но когда достигают клиента, то должны декодироваться, но тут происходит что-то странное. Данные вроде декодированы…но почему-то присутствует символ b’, что указывает на то, что эти данные это байты…вообщем BOM символ и походу это бага библиотеки python-ldap.

Потому пишем следующий метод, который убирает все лишние символы:

def dekodirui_suka(self, blyat):
        val = str(blyat)
        val = val.replace("[", "")
        val = val.replace("b'", "")
        val = val.replace("']", "")
        return val

Ну и как же без аутентификации. Этот метод проверяет подходит ли логин и пароль пользователя. Удобно, если надо сделать аутентификацию на уровне приложения, а не веб-сервера. Иногда это оправдано:

def is_authenticated(self, login, password):
    try:
        login = self.login2un(login)
        try:
            test = self.con.simple_bind_s(login, password)
        except ldap.LDAPError as e:
            e = self.err2dict(e)
            if type(e) is dict and 'desc' in e:
                raise SystemExit(f"Error: {e['desc']}")
        if test:
            return True
    except ldap.INVALID_CREDENTIALS:
        return False

Если логин можно вытащить из Active Directory, то вот пароль никак (даже хэш пароля). Потому устанавливаем еще одно соединение используя ранее созданный хэндлер con и тестируем соединение.

В целом получается, что класс работает следующим образом. Когда мы вызываем какой-то метод, то устанавливается соединение с сервером, вытаскиваются данные и соединение закрывается. Для высоконагруженных систем, где ожидается сотни тысяч запросов в секунду такой подход конечно будет чреват медленной работой, но часто ли вам попадаются такие задачи? Зато будет уверенность, что соединение каждый раз будет корректно закрываться и контроллер не будет падать магическим образом.

Теперь накидаем test.py для проверки работы библиотеки:

#!/bin/env python3

import os
import sys
from lib import adapi

ldap = adapi.ADApi("ldap://server_ip:389", "username", "password", "dc=dc,dc=example,dc=com", "cn=Users,dc=dc,dc=example,dc=com")
data = ldap.is_user("test_user")
print(f"User exist: {data}")
auth = ldap.is_authenticated("test_user", "testpass")
print(f"User authenticated: {auth}")
username = ldap.get_username("test_user")
print(f"Hello, {username}")

Заменяем значения на свои и пробуем запустить скрипт. cn=Users указывает на то, что поиск данных будет производится в группе "Пользователи".

Полный листинг adapi.py:

import os
import re
import sys
import ldap
import ldap.sasl
import ast

class ADApi:
    ldap_server = ""
    ldap_user = ""
    ldap_pass = ""
    base_dn = ""
    search_dn = ""

    def __init__(self, ldap_server, ldap_user, ldap_pass, base_dn, search_dn):
        self.ldap_server = ldap_server
        self.ldap_user = ldap_user
        self.ldap_pass = ldap_pass
        self.base_dn = base_dn
        self.search_dn = search_dn
        try:
            self.ldap_user = self.login2un(self.ldap_user)
            self.con = ldap.initialize(self.ldap_server)
            self.con.set_option(ldap.OPT_REFERRALS, 0)
            self.con.simple_bind_s(self.ldap_user, self.ldap_pass)
        except ldap.INVALID_CREDENTIALS as e:
            raise SystemExit(f"Error: Invalid credentials")
        except ldap.LDAPError as e:
            e = self.err2dict(e)
            if type(e) is dict and 'desc' in e:
                raise SystemExit(f"Error: {e['desc']}")

    def err2dict(self, err):
        err = re.sub(r'^.*?{', '{', str(err))
        err = ast.literal_eval(err)
        return err

    def dekodirui_suka(self, blyat):
        val = str(blyat)
        val = val.replace("[", "")
        val = val.replace("b'", "")
        val = val.replace("']", "")
        return val

    def is_user(self, login):
        filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
        attrs = ["*"]
        try:
            result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
        except ldap.LDAPError as e:
            e = self.err2dict(e)
            if type(e) is dict and 'desc' in e:
                raise SystemExit(f"Error: {e['desc']}")
        if result:
            for data in result:
                if type(data[1]) is dict:
                    obj = self.dekodirui_suka(data[1]['userAccountControl'])
                    if obj == "66048":
                        return True             
        return False

    def is_authenticated(self, login, password):
        try:
            login = self.login2un(login)
            try:
                test = self.con.simple_bind_s(login, password)
            except ldap.LDAPError as e:
                e = self.err2dict(e)
                if type(e) is dict and 'desc' in e:
                    raise SystemExit(f"Error: {e['desc']}")
            if test:
                return True
        except ldap.INVALID_CREDENTIALS:
            return False

    def get_username(self, login):
        filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
        attrs = ["*"]
        try:
            result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
        except ldap.LDAPError as e:
            e = self.err2dict(e)
            if type(e) is dict and 'desc' in e:
                raise SystemExit(f"Error: {e['desc']}")
        if result:
            for data in result:
                if type(data[1]) is dict:
                    obj = self.dekodirui_suka(data[1]['givenName'])
                    return obj
        return False

    def dn2domain(self, dn):
        domain = str(dn)
        domain = domain.replace("dc=", "")
        domain = domain.replace(",", ".")
        return domain

    def login2un(self, login):
        domain = self.dn2domain(self.base_dn)
        un = f"{login}@{domain}"
        return str(un)

    def __del__(self):
        try:
            self.con.unbind_s()
        except Exception:
            pass

UPDATE 02.01.20

Проблема с байтами судя по всему была из-за старой версии протокола. И вообщем-то решил развивать данную библиотеку потому актуальный код можно найти в репозитории на github.

Показать комментарии