first commit after rename

master
nanu2 2014-09-14 15:24:19 +02:00
commit b549ccb20b
12 changed files with 526 additions and 0 deletions

56
.gitignore vendored 100644
View File

@ -0,0 +1,56 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
bin/
build/
develop-eggs/
dist/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Rope
.ropeproject
# Django stuff:
*.log
*.pot
# Sphinx documentation
docs/_build/
#Pycharm
.idea/

13
LICENSE.txt 100644
View File

@ -0,0 +1,13 @@
Copyright (c) 2014, Mike Helderman <mike.helderman@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

11
MANIFEST 100644
View File

@ -0,0 +1,11 @@
# file GENERATED by distutils, do NOT edit
LICENSE.txt
MANIFEST.in
README.md
setup.py
ldapper/__init__.py
ldapper/api.py
ldapper/connection.py
ldapper/ldapperobject.py
ldapper/test/__init__.py
ldapper/test/test_ldapper.py

3
MANIFEST.in 100644
View File

@ -0,0 +1,3 @@
include LICENSE.txt
include MANIFEST.in
include README.md

82
README.md 100644
View File

@ -0,0 +1,82 @@
Ldappr
=======
Ldappr is a wrapper around python-ldap, meant for quick and easy handling of
common administrative tasks concerning your LDAP compliant repository. It is particularly useful in small, one-time scripts to get things done, or interactively within an (i)Python shell.
Connect
-------
```python
import ldappr
# authenticated bind
ldap = ldappr.connect_to('127.0.0.1', 'uid=admin,ou=system', 'secret')
```
Retrieve objects
----------------
When you have a connection, you can search on it. First, specify the seach base.
```python
ldap.search_base = 'ou=users,ou=system'
```
Then, get one or more objects to manipulate.
```python
# retrieve a single object
user = ldap.get('cn=jdoe')
# retrieve a list of objects
users = ldap.search('objectClass=inetOrgPerson')
```
Do stuff
--------
Once you got an object, you can easily manipulate it. All changes will
immediately reflect in your LDAP repository.
```python
# pretty print the retrieved user
print(user)
# get an attribute value
sn = user.attrs['sn']
# set an attribute value (existing value will be removed)
user.set_value('givenName', 'Jack')
# add a value to a multi-valued attribute
user.add_value('mobile', '0123456789')
user.add_value('mobile', '9876543210')
# remove a value from a multi-valued attribute
user.remove_value('mobile', '9876543210')
```
Other examples
--------------
```python
# anonymous bind
ldap = ldappr.connect_to('127.0.0.1')
# authenticated bind with more options
ldap = ldappr.connect_to('127.0.0.1', 'uid=admin,ou=system', 'secret',
protocol='ldaps', port='10636', verify=False,
search_base='ou=users,ou=system')
# delete all objects with employeeType manager
for dn in ldap.get_dn('employeeType=manager'):
ldap.delete(dn)
# set an attribute value for a known dn
ldap.set_value('cn=jdoe,ou=users,ou=system', 'givenName', 'Jack')
# make an LDIF export for all users
with open('export.ldif', 'a') as file:
for user in ldap.search('objectClass=inetOrgPerson'):
file.write(user.to_ldif())
```

View File

@ -0,0 +1,3 @@
from api import *
from connection import *
from ldapprobject import *

8
ldappr/api.py 100644
View File

@ -0,0 +1,8 @@
from connection import Connection, AuthConnection
def connect_to(server, *args, **kwargs):
if args or 'bind_dn' and 'password' in kwargs:
#if args:
return AuthConnection(server, *args, **kwargs)
return Connection(server, **kwargs)

View File

@ -0,0 +1,133 @@
import ldap
from ldapprobject import LdapprObject
class Connection(object):
def __init__(self, server, protocol='ldap', port='', verify=True,
search_base=''):
self.search_base = search_base
if port == '':
port = 389 if protocol == 'ldap' else 636
self.ldap_url = '%s://%s:%s' % (protocol, server, str(port))
try:
ldap.set_option(ldap.OPT_REFERRALS, 0)
if not verify:
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_NEVER)
self.conn = ldap.initialize(self.ldap_url)
except ldap.LDAPError:
raise
def search(self, search_filter):
# TODO: use the escape_filter_chars() and filter_format() functions
"""Get list of objects that match the search_filter
:param search_filter: filter to find the objects
:return: list of LdapperObjects
"""
result = self.conn.search_s(self.search_base, ldap.SCOPE_SUBTREE,
search_filter)
return [LdapprObject(item, self.conn) for item in result]
def get(self, search_filter):
"""Get first object found (we use sizelimit=1 for speed)
:param search_filter: filter to find the object
:return: LdapprObject or None
"""
result = self.conn.search_ext_s(self.search_base, ldap.SCOPE_SUBTREE,
search_filter, sizelimit=1)
return LdapprObject(result[0], self.conn) if result else None
def get_by_dn(self, dn):
"""Get LdapprObject for known dn
:param dn: dn of the object we're looking for
:return: LdapprObject
"""
result = self.conn.search_s(dn, ldap.SCOPE_BASE)
return LdapprObject(result[0], self.conn)
def get_dn(self, search_filter):
"""Get list of dn's that match the filter
:param search_filter: filter to find the dn's
:return: list of dn's
"""
result = self.conn.search_s(self.search_base, ldap.SCOPE_SUBTREE,
search_filter)
return [dn for (dn, item) in result]
def get_values(self, dn, attr):
"""Get list of values of given attribute for dn
:param dn: dn of the object we're looking for
:param attr: attribute name (case insensitive)
:return: list of values
"""
result = self.conn.search_s(dn, ldap.SCOPE_BASE)
result_object = LdapprObject(result[0], self.conn)
return result_object.attrs[attr]
def get_value(self, dn, attr):
"""Get (first) attr value as string
:param dn: dn of the object we're looking for
:param attr: attribute name (case insensitive)
:return: value as string
"""
result = self.get_values(dn, attr)
return result[0]
def close(self):
self.conn.unbind_s()
class AuthConnection(Connection):
def __init__(self, server, bind_dn, password, **kwargs):
super(AuthConnection, self).__init__(server, **kwargs)
try:
self.conn.simple_bind_s(bind_dn, password)
except ldap.LDAPError:
raise
def add(self, dn, modlist):
"""Adds an entry to the LDAP store
:param dn: dn of the new entry
:param modlist: list of attributes made up of two-value tuples, where
the first item of each tuple is the attribute name, and the
second value is a list of attribute values.
"""
self.conn.add_s(dn, modlist)
def modify(self, dn, modlist):
self.conn.modify_s(dn, modlist)
def set_value(self, dn, attr, value):
self.conn.modify_s(dn, [(ldap.MOD_REPLACE, attr, value)])
def add_value(self, dn, attr, value):
self.conn.modify_s(dn, [(ldap.MOD_ADD, attr, value)])
def delete_value(self, dn, attr, value):
self.conn.modify_s(dn, [(ldap.MOD_DELETE, attr, value)])
def verify_password(self, dn, password):
# TODO: verify password
pass
def change_password(self, dn, old_pw, new_pw):
# TODO: change password
pass
def set_password(self, dn, password):
# TODO: set password (is this even possible?)
pass
def get_schema(self):
# TODO: get schema as ldif or sch
pass
def delete(self, dn):
self.conn.delete_s(dn)

View File

@ -0,0 +1,68 @@
import ldap
import ldif
from ldap.cidict import cidict
from StringIO import StringIO
class LdapprObject(object):
"""\
The LdapprObject is used to handle search results from the Connection
class. It's a representation of a single object in the LDAP Directory.
"""
def __init__(self, result, conn):
"""The class is initialized with a tuple: (dn, {attributes}), and the
existing connection.
"""
(self.dn, self.attributes) = result
self.attrs = cidict(self.attributes)
self.conn = conn
def __str__(self):
"""Pretty prints all attributes with values."""
col_width = max(len(key) for key in self.attrs.keys())
pretty_string = '{attr:{width}} : {value}\n'.format(
attr='dn', width=col_width, value=self.dn)
for key, value in self.attrs.iteritems():
for single_value in value:
if len(value) > 100: # a dirty hack to 'detect' binary attrs
single_value = 'binary'
pretty_string += '{attr:{width}} : {value}\n'.format(
attr=self._case(key), width=col_width, value=single_value)
key = ''
return pretty_string
def _case(self, attr):
"""Transforms an attribute to correct case (e.g. gIvEnNaMe becomes
givenName). If attr is unknown nothing is transformed.
:param attr: may be incorrectly cased
:return: attr in proper case
"""
try:
index = [x.lower() for x in self.attrs.keys()].index(attr.lower())
return self.attrs.keys()[index]
except:
return attr
def to_ldif(self):
"""Makes LDIF of ldappr object."""
out = StringIO()
ldif_out = ldif.LDIFWriter(out)
ldif_out.unparse(self.dn, self.attributes)
return out.getvalue()
def set_value(self, attr, value):
attr = self._case(attr)
self.conn.modify_s(self.dn, [(ldap.MOD_REPLACE, attr, value)])
self.attrs[attr] = [value]
def add_value(self, attr, value):
attr = self._case(attr)
self.conn.modify_s(self.dn, [(ldap.MOD_ADD, attr, value)])
self.attrs[attr].append(value)
def remove_value(self, attr, value):
attr = self._case(attr)
self.conn.modify_s(self.dn, [(ldap.MOD_DELETE, attr, value)])
if value in self.attrs[attr]:
self.attrs[attr].remove(value)

View File

View File

@ -0,0 +1,132 @@
import unittest
import ldap
from ldappr import connect_to
class TestLdappr(unittest.TestCase):
def setUp(self):
"""SetUp method is called for every testcase. We're stretching the
boundaries here, because we create an account, thus making the setUp
method a test in itself.
Current parameters are consistent with a default LDAP server install
with Apache Directory Studio on local machine.
"""
self.server = '127.0.0.1'
self.bind_dn = 'uid=admin,ou=system'
self.password = 'secret'
self.search_base = 'ou=users,ou=system'
self.ldap_port = 10389
self.ldaps_port = 10636
self.ldap = connect_to(self.server, self.bind_dn, self.password,
protocol='ldaps', port=self.ldaps_port,
verify=False, search_base=self.search_base)
self.new_dn = 'cn=jdoe,' + self.search_base
self.modlist = [
('objectClass', ['top', 'inetOrgPerson']),
('givenName', ['John']),
('sn', ['Doe']),
]
self.ldap.add(self.new_dn, self.modlist)
def tearDown(self):
"""TearDown method is called after every testcase. The account from
the setUp method is now deleted. This is another implicit testcase:
if the delete won't succeed we get an error with the next setUp call.
"""
self.ldap.delete(self.new_dn)
self.ldap.close()
def test_anonymous_bind(self):
ldap = connect_to(self.server, port=self.ldap_port)
ldap.close()
def test_server_down(self):
with self.assertRaises(ldap.SERVER_DOWN):
connect_to('wrong_server', self.bind_dn, self.password,
port=self.ldap_port)
def test_invalid_credentials(self):
with self.assertRaises(ldap.INVALID_CREDENTIALS):
connect_to(self.server, self.bind_dn, 'wrong_password',
port=self.ldap_port)
def test_get_attributes_from_ldapper_object(self):
user = self.ldap.get('cn=jdoe')
self.assertEqual(user.attrs['givenname'], ['John'])
self.assertEqual(user.attrs['gIvEnNaMe'], ['John'])
def test_search_instead_of_get(self):
self.assertNotEqual(self.ldap.search('cn=jdoe'), [])
def test_search_and_find_nothing(self):
self.assertEqual(self.ldap.search('cn=non-existent'), [])
def test_pretty_print(self):
user = self.ldap.get('cn=jdoe')
user_string = """\
dn : cn=jdoe,""" + self.search_base + """
objectClass : organizationalPerson
: person
: inetOrgPerson
: top
givenName : John
cn : jdoe
sn : Doe
"""
self.assertEqual(user.__str__(), user_string)
def test_print_ldif(self):
user = self.ldap.get('cn=jdoe')
user_ldif = """\
dn: cn=jdoe,""" + self.search_base + """
cn: jdoe
givenName: John
objectClass: organizationalPerson
objectClass: person
objectClass: inetOrgPerson
objectClass: top
sn: Doe
"""
self.assertEqual(user.to_ldif(), user_ldif)
def test_get_attribute_values_from_dn(self):
value = self.ldap.get_value(self.new_dn, 'sn')
self.assertEqual(value, 'Doe')
value = self.ldap.get_values(self.new_dn, 'sn')
self.assertEqual(value, ['Doe'])
def test_get_dnlist(self):
dnlist = self.ldap.get_dn('cn=jdoe')
self.assertEqual(dnlist, ['cn=jdoe,' + self.search_base])
def test_set_value(self):
user = self.ldap.get_by_dn(self.new_dn)
user.set_value('givenName', 'Jack')
self.assertEqual(user.attrs['givenName'], ['Jack'])
user = self.ldap.get_by_dn(self.new_dn)
self.assertEqual(user.attrs['givenName'], ['Jack'])
self.ldap.set_value(self.new_dn, 'givenName', 'John')
user = self.ldap.get_by_dn(self.new_dn)
self.assertEqual(user.attrs['givenName'], ['John'])
def test_add_value(self):
user = self.ldap.get_by_dn(self.new_dn)
user.set_value('mobile', '0123456789')
user.add_value('mobile', '9876543210')
self.assertEqual(user.attrs['mobile'], ['0123456789', '9876543210'])
user = self.ldap.get_by_dn(self.new_dn)
self.assertEqual(user.attrs['mobile'], ['0123456789', '9876543210'])
def test_remove_value(self):
user = self.ldap.get_by_dn(self.new_dn)
user.set_value('mobile', '0123456789')
user.add_value('mobile', '9876543210')
user.remove_value('mobile', '0123456789')
self.assertEqual(user.attrs['mobile'], ['9876543210'])
user = self.ldap.get_by_dn(self.new_dn)
self.assertEqual(user.attrs['mobile'], ['9876543210'])
if __name__ == '__main__':
unittest.main()

17
setup.py 100644
View File

@ -0,0 +1,17 @@
from distutils.core import setup
with open('README.md') as fh:
long_description = fh.read()
setup(
name='ldappr',
version='0.1',
packages=['ldappr', 'ldappr.test'],
install_requires=['python-ldap'],
url='https://github.com/nanu2/ldappr',
license='ICS',
author='Mike Helderman',
author_email='mike.helderman@gmail.com',
description='Wrapper around python-ldap.',
long_description=long_description
)