Python: Active Directory
Создадим директорию проекта, библиотеки и собственно сами файлы:
mkdir ad
cd ad
mkdir lib
touch test.py
touch lib/adapi.py
В test.py будем проверять работу библиотеки, а в adapi.py напишем класс. Установим библиотеки, которые будут использоваться внутри класса:
pip3 install python-ldap ast
Импортируем библиотеки:
import os
import re
import sys
import ldap
import ldap.sasl
import ast
Класс будет принимать 5 аргументов, потому создаем его следующим образом:
class ADApi:
ldap_server = ""
ldap_user = ""
ldap_pass = ""
base_dn = ""
search_dn = ""
Теперь напишем конструктор, в котором будут определены эти переменные, чтобы их можно было использовать в методах (функциях) класса, а также добавим соединение с сервером:
def __init__(self, ldap_server, ldap_user, ldap_pass, base_dn, search_dn):
self.ldap_server = ldap_server
self.ldap_user = ldap_user
self.ldap_pass = ldap_pass
self.base_dn = base_dn
self.search_dn = search_dn
try:
self.ldap_user = self.login2un(self.ldap_user)
self.con = ldap.initialize(self.ldap_server)
self.con.set_option(ldap.OPT_REFERRALS, 0)
self.con.simple_bind_s(self.ldap_user, self.ldap_pass)
except ldap.INVALID_CREDENTIALS as e:
raise SystemExit(f"Error: Invalid credentials")
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
Теперь напишем деструктор, который будет запускаться по окончании выполнения класса, чтобы высвободить сетевое соединение:
def __del__(self):
try:
self.con.unbind_s()
except Exception:
pass
Методы login2un и err2dict напишем позже. Сейчас же небольшое разъяснение по поводу методов и эксепшенов. Как видно в каждом методе есть аргумент self. Его необходимо добавлять, потому что при вызове сперва создается объект из класса, а потом уже идет обращение к методу класса. В других языках обычно вместо self используется this.
Эксепшены нужны, чтобы перехватывать события. Например, если в приложении возникает ошибка, то оно отваливается с трассировкой ошибки. Если же использовать except, то можно либо выцеплять события вроде INVALID_CREDENTIALS и вместо трассировки выполнить корректное завершение приложения с сообщением об ошибке аутентификации или записать ошибку в лог или отправить сообщение по каким-то каналам связи вроде электронной почты и сделать так, чтобы приложение продолжало работать несмотря на ошибку. Обычно так и делают, когда пишут демона (daemon).
Теперь напишем метод login2un:
def login2un(self, login):
domain = self.dn2domain(self.base_dn)
un = f"{login}@{domain}"
return str(un)
Этот метод нужен для удобства. Собственно в ldap_user необходимо писать username@dc.example.com, что длинно и вообще не удобно. И поскольку домен совпадает с base_dn, то этот метод преобразовывает запись в виде dc=dc,dc=example,dc=com в dc.example.com и добавляет username@. Таким образом при инициализации класса в аргумент ldap_user достаточно будет прописать username.
Непосредственно та функция, что преобразовывает dc=dc,dc=example,dc=com в домен типа dc.example.com:
def dn2domain(self, dn):
domain = str(dn)
domain = domain.replace("dc=", "")
domain = domain.replace(",", ".")
return domain
Следующий метод err2dict:
def err2dict(self, err):
err = re.sub(r'^.*?{', '{', str(err))
err = ast.literal_eval(err)
return err
Эксепшн LDAPError возвращает довольно большое количество ошибок и конечно обрабатывать каждую не имеет смысла поскольку произойти может все что угодно. Однако этот эксепшн почему-то возвращает строку хотя и в виде словаря. Потому этот метод преобразовывает строку в словарь и далее уже выводим только значение ключа desc в raise SystemExit.
Следующий метод проверяет существование пользователя. Если существует, то возвращается true. И false в любом другом случае:
def is_user(self, login):
filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
attrs = ["*"]
try:
result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
if result:
for data in result:
if type(data[1]) is dict:
obj = self.dekodirui_suka(data[1]['userAccountControl'])
if obj == "66048":
return True
return False
Теперь метод, который возвращает givenName пользователя:
def get_username(self, login):
filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
attrs = ["*"]
try:
result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
if result:
for data in result:
if type(data[1]) is dict:
obj = self.dekodirui_suka(data[1]['givenName'])
return obj
return False
В этом методе можем наблюдать "крик души". Почему-то с сервера данные возвращаются в виде байтов…всмысле это правильно…данные перед отправкой по сети должны кодироваться в байты, но когда достигают клиента, то должны декодироваться, но тут происходит что-то странное. Данные вроде декодированы…но почему-то присутствует символ b’, что указывает на то, что эти данные это байты…вообщем BOM символ и походу это бага библиотеки python-ldap.
Потому пишем следующий метод, который убирает все лишние символы:
def dekodirui_suka(self, blyat):
val = str(blyat)
val = val.replace("[", "")
val = val.replace("b'", "")
val = val.replace("']", "")
return val
Ну и как же без аутентификации. Этот метод проверяет подходит ли логин и пароль пользователя. Удобно, если надо сделать аутентификацию на уровне приложения, а не веб-сервера. Иногда это оправдано:
def is_authenticated(self, login, password):
try:
login = self.login2un(login)
try:
test = self.con.simple_bind_s(login, password)
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
if test:
return True
except ldap.INVALID_CREDENTIALS:
return False
Если логин можно вытащить из Active Directory, то вот пароль никак (даже хэш пароля). Потому устанавливаем еще одно соединение используя ранее созданный хэндлер con и тестируем соединение.
В целом получается, что класс работает следующим образом. Когда мы вызываем какой-то метод, то устанавливается соединение с сервером, вытаскиваются данные и соединение закрывается. Для высоконагруженных систем, где ожидается сотни тысяч запросов в секунду такой подход конечно будет чреват медленной работой, но часто ли вам попадаются такие задачи? Зато будет уверенность, что соединение каждый раз будет корректно закрываться и контроллер не будет падать магическим образом.
Теперь накидаем test.py для проверки работы библиотеки:
#!/bin/env python3
import os
import sys
from lib import adapi
ldap = adapi.ADApi("ldap://server_ip:389", "username", "password", "dc=dc,dc=example,dc=com", "cn=Users,dc=dc,dc=example,dc=com")
data = ldap.is_user("test_user")
print(f"User exist: {data}")
auth = ldap.is_authenticated("test_user", "testpass")
print(f"User authenticated: {auth}")
username = ldap.get_username("test_user")
print(f"Hello, {username}")
Заменяем значения на свои и пробуем запустить скрипт. cn=Users указывает на то, что поиск данных будет производится в группе "Пользователи".
Полный листинг adapi.py:
import os
import re
import sys
import ldap
import ldap.sasl
import ast
class ADApi:
ldap_server = ""
ldap_user = ""
ldap_pass = ""
base_dn = ""
search_dn = ""
def __init__(self, ldap_server, ldap_user, ldap_pass, base_dn, search_dn):
self.ldap_server = ldap_server
self.ldap_user = ldap_user
self.ldap_pass = ldap_pass
self.base_dn = base_dn
self.search_dn = search_dn
try:
self.ldap_user = self.login2un(self.ldap_user)
self.con = ldap.initialize(self.ldap_server)
self.con.set_option(ldap.OPT_REFERRALS, 0)
self.con.simple_bind_s(self.ldap_user, self.ldap_pass)
except ldap.INVALID_CREDENTIALS as e:
raise SystemExit(f"Error: Invalid credentials")
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
def err2dict(self, err):
err = re.sub(r'^.*?{', '{', str(err))
err = ast.literal_eval(err)
return err
def dekodirui_suka(self, blyat):
val = str(blyat)
val = val.replace("[", "")
val = val.replace("b'", "")
val = val.replace("']", "")
return val
def is_user(self, login):
filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
attrs = ["*"]
try:
result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
if result:
for data in result:
if type(data[1]) is dict:
obj = self.dekodirui_suka(data[1]['userAccountControl'])
if obj == "66048":
return True
return False
def is_authenticated(self, login, password):
try:
login = self.login2un(login)
try:
test = self.con.simple_bind_s(login, password)
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
if test:
return True
except ldap.INVALID_CREDENTIALS:
return False
def get_username(self, login):
filter = "(&(objectClass=user)(sAMAccountName="+login+"))"
attrs = ["*"]
try:
result = self.con.search_s(self.base_dn, ldap.SCOPE_SUBTREE, filter, attrs)
except ldap.LDAPError as e:
e = self.err2dict(e)
if type(e) is dict and 'desc' in e:
raise SystemExit(f"Error: {e['desc']}")
if result:
for data in result:
if type(data[1]) is dict:
obj = self.dekodirui_suka(data[1]['givenName'])
return obj
return False
def dn2domain(self, dn):
domain = str(dn)
domain = domain.replace("dc=", "")
domain = domain.replace(",", ".")
return domain
def login2un(self, login):
domain = self.dn2domain(self.base_dn)
un = f"{login}@{domain}"
return str(un)
def __del__(self):
try:
self.con.unbind_s()
except Exception:
pass
UPDATE 02.01.20
Проблема с байтами судя по всему была из-за старой версии протокола. И вообщем-то решил развивать данную библиотеку потому актуальный код можно найти в репозитории на github.