diff --git a/udavfs3 b/udavfs3 new file mode 100755 index 0000000..e8c515f --- /dev/null +++ b/udavfs3 @@ -0,0 +1,601 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# Usage: udavfs3 "host=dbhost dbname=database user=dbuser password=dbpass" [-o mount_options] +# UdavFS mount_option: +# blocksize= +# fssize=[k|m|g|t] +# +# Option 'user_allow_other' MUST be set in /etc/fuse.conf + +from __future__ import division, print_function, absolute_import + +import os +import sys +import llfuse +import errno +import stat +import math +from time import time +import psycopg2 +import psycopg2.extensions +import psycopg2.extras +import logging +from collections import defaultdict +from llfuse import FUSEError +from hashlib import sha1 + +log = logging.getLogger() +blocksize = 4096 + +class Operations(llfuse.Operations): + def __init__(self, conn_str, fsid, bs, size): + super(Operations, self).__init__() + self.blocksize = bs + self.fssize = size + self.fsid = fsid + self.db = psycopg2.connect(conn_str) + self.db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + self.cursor = self.db.cursor() + self.inode_open_count = defaultdict(int) + try: + if self.get_row('SELECT count(1) FROM fsinfo WHERE fsid=%s;', (self.fsid,))[0] == 0: + self.init_fs() + except: + self.init_fs() + + def init_tables(self): + '''Initialize file system tables''' + + self.cursor.execute(""" + CREATE TABLE fsinfo ( + id SERIAL PRIMARY KEY, + fsid VARCHAR(40) NOT NULL, + size BIGINT NOT NULL, + bs INT NOT NULL, + UNIQUE (fsid) + ) + """) + + self.cursor.execute(""" + CREATE TABLE inodes ( + id BIGSERIAL PRIMARY KEY, + fsid VARCHAR(40) NOT NULL REFERENCES fsinfo (fsid) ON DELETE CASCADE, + inode_id BIGSERIAL, + uid INT NOT NULL, + gid INT NOT NULL, + mode INT NOT NULL, + mtime FLOAT NOT NULL, + atime FLOAT NOT NULL, + ctime FLOAT NOT NULL, + target BYTEA, + size BIGINT NOT NULL DEFAULT 0, + rdev INT NOT NULL DEFAULT 0, + UNIQUE (fsid, inode_id) + ) + """) + + self.cursor.execute(""" + CREATE TABLE body ( + inode_id BIGINT NOT NULL, + fsid VARCHAR(40) NOT NULL, + block_no INT NOT NULL DEFAULT 0, + data BYTEA, + PRIMARY KEY ( fsid, inode_id, block_no ), + UNIQUE ( fsid, inode_id, block_no), + FOREIGN KEY (fsid, inode_id) REFERENCES inodes (fsid, inode_id) ON DELETE CASCADE + ) + """) + + self.cursor.execute(""" + CREATE TABLE contents ( + rowid BIGSERIAL PRIMARY KEY, + fsid VARCHAR(40) NOT NULL, + name BYTEA NOT NULL, + inode_id BIGINT NOT NULL, + parent_inode BIGINT NOT NULL, + FOREIGN KEY( fsid, inode_id ) REFERENCES inodes( fsid, inode_id ) ON DELETE CASCADE, + FOREIGN KEY( fsid, parent_inode ) REFERENCES inodes( fsid, inode_id ) ON DELETE CASCADE, + UNIQUE (fsid, name, parent_inode) + )""") + + def init_fs(self): + if self.get_row('SELECT count(1) FROM information_schema.tables WHERE table_schema=\'public\' AND table_name=\'inodes\'')[0] == 0: + self.init_tables() + if self.get_row('SELECT count(1) FROM fsinfo WHERE fsid=%s;', (self.fsid,))[0] == 0: + self.cursor.execute("INSERT INTO fsinfo (fsid, bs, size) VALUES (%s, %s,%s)", + (self.fsid, self.blocksize, self.fssize)) + # Insert root directory + self.cursor.execute("INSERT INTO inodes (fsid,inode_id,mode,uid,gid,mtime,atime,ctime) " + "VALUES (%s,%s,%s,%s,%s,%s,%s,%s)", + (self.fsid, llfuse.ROOT_INODE, stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR + | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH + | stat.S_IXOTH, os.getuid(), os.getgid(), time(), + time(), time())) + self.cursor.execute("SELECT setval('inodes_inode_id_seq', %s);", (llfuse.ROOT_INODE + 1,)) + self.cursor.execute("INSERT INTO contents (fsid, name, parent_inode, inode_id) VALUES (%s,%s,%s,%s)", + (self.fsid, b'..', llfuse.ROOT_INODE, llfuse.ROOT_INODE)) + + def get_row(self, *a, **kw): + self.cursor.execute(*a, **kw) + row = self.cursor.fetchone() + if row is None: + raise NoSuchRowError() + if self.cursor.fetchone() is None: + pass + else: + raise NoUniqueValueError() + + return row + + def lookup(self, inode_p, name): + if name == b'.': + inode = inode_p + elif name == b'..': + inode = self.get_row("SELECT parent_inode FROM contents WHERE inode_id=%s AND fsid=%s", + (inode_p, self.fsid))[0] + else: + try: + inode = self.get_row("SELECT inode_id FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s", + (name, inode_p, self.fsid))[0] + except NoSuchRowError: + raise(llfuse.FUSEError(errno.ENOENT)) + + return self.getattr(inode) + + def getattr(self, inode): + cur = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor) + cur.execute('SELECT * FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid)) + row = cur.fetchone() + entry = llfuse.EntryAttributes() + entry.st_ino = inode + entry.generation = 0 + entry.entry_timeout = 300 + entry.attr_timeout = 300 + entry.st_mode = row['mode'] + entry.st_nlink = self.get_row("SELECT COUNT(1) FROM contents WHERE inode_id=%s AND fsid=%s", + (inode, self.fsid))[0] + entry.st_uid = row['uid'] + entry.st_gid = row['gid'] + entry.st_rdev = row['rdev'] + entry.st_size = row['size'] + + entry.st_blksize = self.blocksize + entry.st_blocks = self.get_row("SELECT COUNT(1) FROM body WHERE inode_id=%s AND fsid=%s", + (inode, self.fsid))[0] + entry.st_atime = row['atime'] + entry.st_mtime = row['mtime'] + entry.st_ctime = row['ctime'] + + return entry + + def readlink(self, inode): + return bytes(self.get_row('SELECT target FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid))[0]) + + def opendir(self, inode): + return inode + + def readdir(self, inode, off): + if off == 0: + off = -1 + + cursor2 = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor) + cursor2.execute("SELECT * FROM contents WHERE parent_inode=%s " + 'AND rowid > %s AND fsid=%s ORDER BY rowid', (inode, off, self.fsid)) + + for row in cursor2.fetchall(): + yield (bytes(row['name']), self.getattr(row['inode_id']), row['rowid']) + + def unlink(self, inode_p, name): + entry = self.lookup(inode_p, name) + + if stat.S_ISDIR(entry.st_mode): + raise llfuse.FUSEError(errno.EISDIR) + + self._remove(inode_p, name, entry) + + def rmdir(self, inode_p, name): + entry = self.lookup(inode_p, name) + + if not stat.S_ISDIR(entry.st_mode): + raise llfuse.FUSEError(errno.ENOTDIR) + + self._remove(inode_p, name, entry) + + def _remove(self, inode_p, name, entry): + if self.get_row("SELECT COUNT(1) FROM contents WHERE parent_inode=%s AND fsid=%s", + (entry.st_ino, self.fsid))[0] > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + + self.cursor.execute("DELETE FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s", + (name, inode_p, self.fsid)) + + if entry.st_nlink == 1 and entry.st_ino not in self.inode_open_count: + self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s", (entry.st_ino, self.fsid)) + + + def symlink(self, inode_p, name, target, ctx): + mode = (stat.S_IFLNK | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | + stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | + stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) + return self._create(inode_p, name, mode, ctx, target=target) + + def rename(self, inode_p_old, name_old, inode_p_new, name_new): + entry_old = self.lookup(inode_p_old, name_old) + + try: + entry_new = self.lookup(inode_p_new, name_new) + except llfuse.FUSEError as exc: + if exc.errno != errno.ENOENT: + raise + target_exists = False + else: + target_exists = True + + if target_exists: + self._replace(inode_p_old, name_old, inode_p_new, name_new, + entry_old, entry_new) + else: + self.cursor.execute("UPDATE contents SET name=%s, parent_inode=%s WHERE name=%s " + "AND parent_inode=%s AND fsid=%s", (name_new, inode_p_new, + name_old, inode_p_old, self.fsid)) + + + def _replace(self, inode_p_old, name_old, inode_p_new, name_new, + entry_old, entry_new): + if self.get_row("SELECT COUNT(1) FROM contents WHERE parent_inode=%s AND fsid=%s", + (entry_new.st_ino, self.fsid))[0] > 0: + raise llfuse.FUSEError(errno.ENOTEMPTY) + + self.cursor.execute("UPDATE contents SET inode_id=%s WHERE name=%s AND parent_inode=%s AND fsid=%s", + (entry_old.st_ino, name_new, inode_p_new, self.fsid)) + self.cursor.execute('DELETE FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s', + (name_old, inode_p_old, self.fsid)) + + if entry_new.st_nlink == 1 and entry_new.st_ino not in self.inode_open_count: + self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s", (entry_new.st_ino, self.fsid)) + + + def link(self, inode, new_inode_p, new_name): + entry_p = self.getattr(new_inode_p) + if entry_p.st_nlink == 0: + raise FUSEError(errno.EINVAL) + + self.cursor.execute("INSERT INTO contents (name, inode_id, parent_inode, fsid) VALUES(%s,%s,%s,%s)", + (new_name, inode, new_inode_p, self.fsid)) + + return self.getattr(inode) + def del_block(self, inode, num): + self.cursor.execute("DELETE FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s", (inode, num, self.fsid)) + + def setattr(self, inode, attr): + + if attr.st_size is not None: + size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid))[0] + if size is None: + size = 0 + blocks = int(math.ceil(size / self.blocksize)) + end_len = size % self.blocksize + if size < attr.st_size: + dif_size = attr.st_size - size + if dif_size <= (self.blocksize - end_len) and end_len > 0: + pass + else: + addblocks = int(math.ceil(dif_size / self.blocksize)) + for b in range(addblocks): + self.write(inode, (blocks + b) * self.blocksize , '\0' * self.blocksize) + elif size > attr.st_size: + dif_size = size - attr.st_size + if dif_size < end_len: + self.write(inode, (blocks - 1) * self.blocksize, + self.read(inode, (blocks - 1) * self.blocksize , + end_len - dif_size) + '\0' * (self.blocksize - (end_len - dif_size))) + elif dif_size == end_len and end_len > 0: + self.del_block(inode, blocks - 1) + elif dif_size > end_len: + if end_len > 0: + dif_size -= end_len + blocks -= 1 + self.del_block(inode, blocks) + dif_blocks = int(math.ceil(dif_size / self.blocksize)) + dif_end_len = dif_size % self.blocksize + if dif_end_len > 0: + dif_blocks -= 1 + for b in range(dif_blocks): + blocks -= 1 + self.del_block(inode, blocks) + if dif_end_len > 0: + self.write(inode, (blocks - 1) * self.blocksize, + self.read(inode, (blocks - 1) * self.blocksize , + self.blocksize - dif_end_len) + '\0' * (self.blocksize - dif_end_len)) + if size == attr.st_size: + pass + else: + self.cursor.execute("UPDATE inodes SET size=%s WHERE inode_id=%s AND fsid=%s", + (attr.st_size, inode, self.fsid)) + if attr.st_mode is not None: + self.cursor.execute('UPDATE inodes SET mode=%s WHERE inode_id=%s AND fsid=%s', + (attr.st_mode, inode, self.fsid)) + + if attr.st_uid is not None: + self.cursor.execute('UPDATE inodes SET uid=%s WHERE inode_id=%s AND fsid=%s', + (attr.st_uid, inode, self.fsid)) + + if attr.st_gid is not None: + self.cursor.execute('UPDATE inodes SET gid=%s WHERE inode_id=%s AND fsid=%s', + (attr.st_gid, inode, self.fsid)) + + if attr.st_rdev is not None: + self.cursor.execute('UPDATE inodes SET rdev=%s WHERE inode_id=%s AND fsid=%s', + (attr.st_rdev, inode, self.fsid)) + + if attr.st_atime is not None: + self.cursor.execute('UPDATE inodes SET atime=%s WHERE inode_id=%s AND fsid=%s', + (attr.st_atime, inode, self.fsid)) + + if attr.st_mtime is not None: + self.cursor.execute('UPDATE inodes SET mtime=%s WHERE inode_id=%s AND fsid=%s', + (attr.st_mtime, inode, self.fsid)) + + if attr.st_ctime is not None: + self.cursor.execute('UPDATE inodes SET ctime=%s WHERE inode_id=%s AND fsid=%s', + (attr.st_ctime, inode, self.fsid)) + + return self.getattr(inode) + + def mknod(self, inode_p, name, mode, rdev, ctx): + return self._create(inode_p, name, mode, ctx, rdev=rdev) + + def mkdir(self, inode_p, name, mode, ctx): + return self._create(inode_p, name, mode, ctx) + + def statfs(self): + stat_ = llfuse.StatvfsData() + + stat_.f_bsize = self.blocksize + stat_.f_frsize = self.blocksize + + size = self.get_row('SELECT SUM(size) FROM inodes WHERE fsid=%s', (self.fsid,))[0] + stat_.f_blocks = self.fssize // stat_.f_frsize + stat_.f_bfree = int(stat_.f_blocks) - (size // stat_.f_frsize) + stat_.f_bavail = stat_.f_bfree + + inodes = self.get_row('SELECT COUNT(1) FROM inodes WHERE fsid=%s', (self.fsid,))[0] + stat_.f_files = inodes + stat_.f_ffree = max(inodes , 100) + stat_.f_favail = stat_.f_ffree + + return stat_ + + def open(self, inode, flags): + self.inode_open_count[inode] += 1 + return inode + + def access(self, inode, mode, ctx): + if mode != os.F_OK and not self.__access(inode, mode, ctx): + return False + return True + + def __access(self, inode, flags, ctx): + attrs = self.get_row("SELECT mode, uid, gid FROM inodes WHERE inode_id = %s AND fsid=%s", + (inode, self.fsid)) + uid, gid = (ctx.uid, ctx.gid) + o = uid == attrs[1] # access by same user id? + g = gid == attrs[2] and not o # access by same group id? + # Note: "and not o" added after experimenting with EXT4. + w = not (o or g) # anything else + m = attrs[0] + return (not (flags & os.R_OK) or ((o and (m & 0o0400)) or (g and (m & 0o0040)) or (w and (m & 0o0004)))) \ + and (not (flags & os.W_OK) or ((o and (m & 0o0200)) or (g and (m & 0o0020)) or (w and (m & 0o0002)))) \ + and (not (flags & os.X_OK) or ((o and (m & 0o0100)) or (g and (m & 0o0010)) or (w and (m & 0o0001)))) + + def create(self, inode_parent, name, mode, flags, ctx): + entry = self._create(inode_parent, name, mode, ctx) + self.inode_open_count[entry.st_ino] += 1 + return (entry.st_ino, entry) + + def _create(self, inode_p, name, mode, ctx, rdev=0, target=None): + if self.getattr(inode_p).st_nlink == 0: + raise FUSEError(errno.EINVAL) + + self.cursor.execute('INSERT INTO inodes (uid, gid, mode, mtime, atime, ' + 'ctime, target, rdev, fsid) VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING inode_id;', + (ctx.uid, ctx.gid, mode, time(), time(), time(), target, rdev, self.fsid)) + + inode = self.cursor.fetchone()[0] + self.cursor.execute("INSERT INTO contents(name, inode_id, parent_inode, fsid) VALUES(%s,%s,%s,%s);", + (name, inode, inode_p, self.fsid)) + + return self.getattr(inode) + + def block_info(self, offset, length): + info = {} + info['first_block'] = int(offset / self.blocksize) + info['start_idx'] = offset % self.blocksize + info['last_block'] = int(math.ceil((offset + length) / self.blocksize)) - 1 + end = (offset + length) % self.blocksize + info['end_idx'] = end > 0 and end or self.blocksize + info['blocks'] = info['last_block'] - info['first_block'] + 1 + return info + + def read(self, inode, offset, length): + size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid))[0] + if offset < size: + if offset + length > size: + length = size - offset + info = self.block_info(offset, length) + self.cursor.execute('SELECT data FROM body WHERE inode_id=%s AND block_no>=%s AND block_no<=%s AND fsid=%s ORDER BY block_no ASC', + (inode, info['first_block'], info['last_block'], self.fsid)) + res = self.cursor.fetchall() + if res is None: + data = b'' + else: + d = list() + for r in res: + d.append(bytes(r[0])) + d[-1] = d[-1][:info['end_idx']] + d[0] = d[0][info['start_idx']:] + data = b''.join(d) + else: + data = b'' + return data + + def write(self, fh, offset, buf): + size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s', (fh, self.fsid))[0] + length = len(buf) + old_info = self.block_info(0, size) + write_info = self.block_info(offset, len(buf)) + gidx = 0 + for b in range(write_info['first_block'], write_info['last_block'] + 1): + if b == write_info['first_block']: + if b > old_info['last_block']: + old_block = b'\0' * self.blocksize + else: + old_block = bytes(self.get_row('SELECT data FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s', + (fh, b, self.fsid))[0]) + if write_info['first_block'] == write_info['last_block']: + block = old_block[:write_info['start_idx']] + buf[write_info['start_idx']:write_info['end_idx']] + old_block[write_info['end_idx']:] + else: + block = old_block[:write_info['start_idx']] + buf[write_info['start_idx']:self.blocksize] + gidx += self.blocksize - write_info['start_idx'] + elif b == write_info['last_block']: + if b > old_info['last_block']: + old_block = b'\0' * self.blocksize + else: + old_block = bytes(self.get_row('SELECT data FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s', + (fh, b, self.fsid))[0]) + block = buf[gidx:gidx + write_info['end_idx']] + old_block[write_info['end_idx']:] + else: + block = buf[gidx:gidx + self.blocksize] + gidx += self.blocksize + if b > old_info['last_block']: + self.cursor.execute("INSERT INTO body (inode_id,block_no,data, fsid)" + "VALUES (%s,%s,%s,%s)", (fh, b, psycopg2.Binary(block), self.fsid)) + else: + self.cursor.execute('UPDATE body SET data=%s WHERE inode_id=%s AND fsid=%s', + (psycopg2.Binary(block), fh, self.fsid)) + + if offset + length > size: + self.cursor.execute('UPDATE inodes SET size=%s WHERE inode_id=%s AND fsid=%s', + (offset + length, fh, self.fsid)) + + return length + + def release(self, fh): + self.inode_open_count[fh] -= 1 + + if self.inode_open_count[fh] == 0: + del self.inode_open_count[fh] + if self.getattr(fh).st_nlink == 0: + self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s", (fh, self.fsid)) + + +class NoUniqueValueError(Exception): + def __str__(self): + return 'Query generated more than 1 result row' + + +class NoSuchRowError(Exception): + def __str__(self): + return 'Query produced 0 result rows' + +def usage(): + raise SystemExit('Usage: %s "host=dbhost dbname=database user=dbuser password=dbpass" [-o mount_options]\nUdavFS mount_option:\n\tblocksize=\n\tfssize=[k|m|g|t]\n\nOption \'user_allow_other\' MUST be set in /etc/fuse.conf' % sys.argv[0]) + +if __name__ == '__main__': + + if len(sys.argv) < 3 or len(sys.argv) > 3 and len(sys.argv) < 5 or len(sys.argv) > 5: + usage() + if sys.argv[3] != '-o' and len(sys.argv) != 5: + usage() + conn_str = sys.argv[1] + options = {x.split('=')[0].strip(): len(x.split('=')) > 1 and x.split('=')[1].strip() or True for x in sys.argv[4].split(',') } + if 'fsname' not in options.keys(): + print('fsname MUST be in mountoptions\n') + usage() + fsname = options['fsname'] + del options['fsname'] + fsid = sha1(bytes(fsname.strip(), 'UTF-8')).hexdigest() + + db = psycopg2.connect(conn_str) + db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) + cursor = db.cursor() + + try: + cursor.execute('SELECT bs, size FROM fsinfo WHERE fsid=%s', (fsid,)) + blocksize, fssize = cursor.fetchone() + except: + if 'create' not in options.keys(): + print('Filesystem "%s" does not exist\n' % (fsname,)) + usage() + del options['create'] + if 'blocksize' not in options.keys(): + print('blocksize mountoption MUST be specified\n') + usage() + if 'fssize' not in options.keys(): + print('fssize mountoption MUST be specified\n') + usage() + new_bs = int(options['blocksize']) + blocksize = new_bs + del options['blocksize'] + mod1 = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3, 't': 1024 ** 4} + fssize = options['fssize'].lower() + del options['fssize'] + if fssize[-1] in mod1.keys(): + fssize = math.ceil((int(fssize[:-1]) * mod1[fssize[-1]]) / blocksize) * blocksize + if fssize < 4194304: + raise SystemExit('Filesystem size less than 4MB.\n') + + db.close() + del db + + m_params = [ 'fsname=udavfs', 'nonempty', 'default_permissions', 'allow_other' ] + m_params.extend(options) + mountpoint = os.path.abspath(sys.argv[2]) + + if not os.path.isdir(mountpoint): + raise SystemExit('Mountpoint "%s" not directory or does not exist!' % mountpoint) + + +# Start forking + # First fork + try: + if os.fork() > 0: + sys.exit(0) # kill off parent + except OSError as e: + sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror)) + sys.exit(1) + os.setsid() + os.chdir(os.getcwd()) + os.umask(0o022) + + # Second fork + try: + if os.fork() > 0: + os._exit(0) + except OSError as e: + sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror)) + os._exit(1) +# End forking + + si = open('/dev/null', 'rb') + so = open('/dev/null', 'ab+', 0) + se = open('/dev/null', 'ab+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + sys.stdout, sys.stderr = so, se + + + operations = Operations(conn_str, fsid, blocksize, fssize) + llfuse.init(operations, mountpoint, + m_params) + + + try: + llfuse.main(single=False) + except: + llfuse.close(unmount=False) + raise + + llfuse.close()