#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Usage: udavfs3 "host=dbhost dbname=database user=dbuser password=dbpass" [-o mount_options] # UdavFS mount_option: # blocksize= # fssize=[k|m|g|t] # fsid= # # Option 'user_allow_other' MUST be set in /etc/fuse.conf from __future__ import division, print_function, absolute_import import os import sys import llfuse import errno import stat import math from time import time import psycopg2 import psycopg2.extensions import psycopg2.extras import logging from collections import defaultdict from llfuse import FUSEError from hashlib import sha1 log = logging.getLogger() blocksize = 4096 class Operations(llfuse.Operations): def __init__(self, conn_str, fsid, bs, size): super(Operations, self).__init__() self.blocksize = bs self.fssize = size self.fsid = fsid self.db = psycopg2.connect(conn_str) self.db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) self.cursor = self.db.cursor() self.inode_open_count = defaultdict(int) try: if self.get_row('SELECT count(1) FROM fsinfo WHERE fsid=%s;', (self.fsid,))[0] == 0: self.init_fs() except: self.init_fs() def init_tables(self): '''Initialize file system tables''' self.cursor.execute(""" CREATE TABLE fsinfo ( id SERIAL PRIMARY KEY, fsid VARCHAR(40) NOT NULL, size BIGINT NOT NULL, bs INT NOT NULL, UNIQUE (fsid) ) """) self.cursor.execute(""" CREATE TABLE inodes ( id BIGSERIAL PRIMARY KEY, fsid VARCHAR(40) NOT NULL REFERENCES fsinfo (fsid) ON DELETE CASCADE, inode_id BIGSERIAL, uid INT NOT NULL, gid INT NOT NULL, mode INT NOT NULL, mtime BIGINT NOT NULL, atime BIGINT NOT NULL, ctime BIGINT NOT NULL, target BYTEA, size BIGINT NOT NULL DEFAULT 0, rdev INT NOT NULL DEFAULT 0, UNIQUE (fsid, inode_id) ) """) self.cursor.execute(""" CREATE TABLE body ( inode_id BIGINT NOT NULL, fsid VARCHAR(40) NOT NULL, block_no INT NOT NULL DEFAULT 0, data BYTEA, PRIMARY KEY ( fsid, inode_id, block_no ), UNIQUE ( fsid, inode_id, block_no), FOREIGN KEY (fsid, inode_id) REFERENCES inodes (fsid, inode_id) ON DELETE CASCADE ) """) self.cursor.execute(""" CREATE TABLE contents ( rowid BIGSERIAL PRIMARY KEY, fsid VARCHAR(40) NOT NULL, name BYTEA NOT NULL, inode_id BIGINT NOT NULL, parent_inode BIGINT NOT NULL, FOREIGN KEY( fsid, inode_id ) REFERENCES inodes( fsid, inode_id ) ON DELETE CASCADE, FOREIGN KEY( fsid, parent_inode ) REFERENCES inodes( fsid, inode_id ) ON DELETE CASCADE, UNIQUE (fsid, name, parent_inode) )""") def init_fs(self): if self.get_row('SELECT count(1) FROM information_schema.tables WHERE table_schema=\'public\' AND table_name=\'inodes\'')[0] == 0: self.init_tables() if self.get_row('SELECT count(1) FROM fsinfo WHERE fsid=%s;', (self.fsid,))[0] == 0: self.cursor.execute("INSERT INTO fsinfo (fsid, bs, size) VALUES (%s, %s,%s)", (self.fsid, self.blocksize, self.fssize)) # Insert root directory self.cursor.execute("INSERT INTO inodes (fsid,inode_id,mode,uid,gid,mtime,atime,ctime) " "VALUES (%s,%s,%s,%s,%s,%s,%s,%s)", (self.fsid, llfuse.ROOT_INODE, stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH, os.getuid(), os.getgid(), int(time() * 1e9), int(time() * 1e9), int(time() * 1e9))) self.cursor.execute("SELECT setval('inodes_inode_id_seq', %s);", (llfuse.ROOT_INODE + 1,)) self.cursor.execute("INSERT INTO contents (fsid, name, parent_inode, inode_id) VALUES (%s,%s,%s,%s)", (self.fsid, b'..', llfuse.ROOT_INODE, llfuse.ROOT_INODE)) def get_row(self, *a, **kw): self.cursor.execute(*a, **kw) row = self.cursor.fetchone() if row is None: raise NoSuchRowError() if self.cursor.fetchone() is None: pass else: raise NoUniqueValueError() return row def lookup(self, inode_p, name, ctx): if name == b'.': inode = inode_p elif name == b'..': inode = self.get_row("SELECT parent_inode FROM contents WHERE inode_id=%s AND fsid=%s", (inode_p, self.fsid))[0] else: try: inode = self.get_row("SELECT inode_id FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s", (name, inode_p, self.fsid))[0] except NoSuchRowError: raise(llfuse.FUSEError(errno.ENOENT)) return self.getattr(inode, ctx) def getattr(self, inode, ctx = None): cur = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor) cur.execute('SELECT * FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid)) row = cur.fetchone() entry = llfuse.EntryAttributes() entry.st_ino = inode entry.generation = 0 entry.entry_timeout = 300 entry.attr_timeout = 300 entry.st_mode = row['mode'] entry.st_nlink = self.get_row("SELECT COUNT(1) FROM contents WHERE inode_id=%s AND fsid=%s", (inode, self.fsid))[0] entry.st_uid = row['uid'] entry.st_gid = row['gid'] entry.st_rdev = row['rdev'] entry.st_size = row['size'] entry.st_blksize = self.blocksize entry.st_blocks = self.get_row("SELECT COUNT(1) FROM body WHERE inode_id=%s AND fsid=%s", (inode, self.fsid))[0] entry.st_atime_ns = row['atime'] entry.st_mtime_ns = row['mtime'] entry.st_ctime_ns = row['ctime'] return entry def readlink(self, inode, ctx): return bytes(self.get_row('SELECT target FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid))[0]) def opendir(self, inode, ctx): return inode def readdir(self, inode, off): if off == 0: off = -1 cursor2 = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor) cursor2.execute("SELECT * FROM contents WHERE parent_inode=%s " 'AND rowid > %s AND fsid=%s ORDER BY rowid', (inode, off, self.fsid)) for row in cursor2.fetchall(): yield (bytes(row['name']), self.getattr(row['inode_id']), row['rowid']) def unlink(self, inode_p, name, ctx): entry = self.lookup(inode_p, name, ctx) if stat.S_ISDIR(entry.st_mode): raise llfuse.FUSEError(errno.EISDIR) self._remove(inode_p, name, entry) def rmdir(self, inode_p, name, ctx): entry = self.lookup(inode_p, name, ctx) if not stat.S_ISDIR(entry.st_mode): raise llfuse.FUSEError(errno.ENOTDIR) self._remove(inode_p, name, entry) def _remove(self, inode_p, name, entry): if self.get_row("SELECT COUNT(1) FROM contents WHERE parent_inode=%s AND fsid=%s", (entry.st_ino, self.fsid))[0] > 0: raise llfuse.FUSEError(errno.ENOTEMPTY) self.cursor.execute("DELETE FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s", (name, inode_p, self.fsid)) if entry.st_nlink == 1 and entry.st_ino not in self.inode_open_count: self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s", (entry.st_ino, self.fsid)) def symlink(self, inode_p, name, target, ctx): mode = (stat.S_IFLNK | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH) return self._create(inode_p, name, mode, ctx, target=target) def rename(self, inode_p_old, name_old, inode_p_new, name_new, ctx): entry_old = self.lookup(inode_p_old, name_old, ctx) try: entry_new = self.lookup(inode_p_new, name_new, ctx) except llfuse.FUSEError as exc: if exc.errno != errno.ENOENT: raise target_exists = False else: target_exists = True if target_exists: self._replace(inode_p_old, name_old, inode_p_new, name_new, entry_old, entry_new) else: self.cursor.execute("UPDATE contents SET name=%s, parent_inode=%s WHERE name=%s " "AND parent_inode=%s AND fsid=%s", (name_new, inode_p_new, name_old, inode_p_old, self.fsid)) def _replace(self, inode_p_old, name_old, inode_p_new, name_new, entry_old, entry_new): if self.get_row("SELECT COUNT(1) FROM contents WHERE parent_inode=%s AND fsid=%s", (entry_new.st_ino, self.fsid))[0] > 0: raise llfuse.FUSEError(errno.ENOTEMPTY) self.cursor.execute("UPDATE contents SET inode_id=%s WHERE name=%s AND parent_inode=%s AND fsid=%s", (entry_old.st_ino, name_new, inode_p_new, self.fsid)) self.cursor.execute('DELETE FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s', (name_old, inode_p_old, self.fsid)) if entry_new.st_nlink == 1 and entry_new.st_ino not in self.inode_open_count: self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s", (entry_new.st_ino, self.fsid)) def link(self, inode, new_inode_p, new_name, ctx): entry_p = self.getattr(new_inode_p, ctx) if entry_p.st_nlink == 0: raise FUSEError(errno.EINVAL) self.cursor.execute("INSERT INTO contents (name, inode_id, parent_inode, fsid) VALUES(%s,%s,%s,%s)", (new_name, inode, new_inode_p, self.fsid)) return self.getattr(inode, ctx) def del_block(self, inode, num): self.cursor.execute("DELETE FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s", (inode, num, self.fsid)) def setattr(self, inode, attr, fields, fh, ctx): if fields.update_size: size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid))[0] if size is None: size = 0 blocks = int(math.ceil(size / self.blocksize)) end_len = size % self.blocksize if size < attr.st_size: dif_size = attr.st_size - size if dif_size <= (self.blocksize - end_len) and end_len > 0: pass else: addblocks = int(math.ceil(dif_size / self.blocksize)) for b in range(addblocks): self.write(inode, (blocks + b) * self.blocksize , '\0' * self.blocksize) elif size > attr.st_size: dif_size = size - attr.st_size if dif_size < end_len: self.write(inode, (blocks - 1) * self.blocksize, self.read(inode, (blocks - 1) * self.blocksize , end_len - dif_size) + '\0' * (self.blocksize - (end_len - dif_size))) elif dif_size == end_len and end_len > 0: self.del_block(inode, blocks - 1) elif dif_size > end_len: if end_len > 0: dif_size -= end_len blocks -= 1 self.del_block(inode, blocks) dif_blocks = int(math.ceil(dif_size / self.blocksize)) dif_end_len = dif_size % self.blocksize if dif_end_len > 0: dif_blocks -= 1 for b in range(dif_blocks): blocks -= 1 self.del_block(inode, blocks) if dif_end_len > 0: self.write(inode, (blocks - 1) * self.blocksize, self.read(inode, (blocks - 1) * self.blocksize , self.blocksize - dif_end_len) + '\0' * (self.blocksize - dif_end_len)) if size == attr.st_size: pass else: self.cursor.execute("UPDATE inodes SET size=%s WHERE inode_id=%s AND fsid=%s", (attr.st_size, inode, self.fsid)) if fields.update_mode: self.cursor.execute('UPDATE inodes SET mode=%s WHERE inode_id=%s AND fsid=%s', (attr.st_mode, inode, self.fsid)) if fields.update_uid: self.cursor.execute('UPDATE inodes SET uid=%s WHERE inode_id=%s AND fsid=%s', (attr.st_uid, inode, self.fsid)) if fields.update_gid: self.cursor.execute('UPDATE inodes SET gid=%s WHERE inode_id=%s AND fsid=%s', (attr.st_gid, inode, self.fsid)) if attr.st_rdev is not None: self.cursor.execute('UPDATE inodes SET rdev=%s WHERE inode_id=%s AND fsid=%s', (attr.st_rdev, inode, self.fsid)) if fields.update_atime: self.cursor.execute('UPDATE inodes SET atime=%s WHERE inode_id=%s AND fsid=%s', (attr.st_atime_ns, inode, self.fsid)) if fields.update_mtime: self.cursor.execute('UPDATE inodes SET mtime=%s WHERE inode_id=%s AND fsid=%s', (attr.st_mtime_ns, inode, self.fsid)) if attr.st_ctime_ns is not None: self.cursor.execute('UPDATE inodes SET ctime=%s WHERE inode_id=%s AND fsid=%s', (attr.st_ctime_ns, inode, self.fsid)) return self.getattr(inode, ctx) def mknod(self, inode_p, name, mode, rdev, ctx): return self._create(inode_p, name, mode, ctx, rdev=rdev) def mkdir(self, inode_p, name, mode, ctx): return self._create(inode_p, name, mode, ctx) def statfs(self, ctx): stat_ = llfuse.StatvfsData() stat_.f_bsize = self.blocksize stat_.f_frsize = self.blocksize size = self.get_row('SELECT SUM(size) FROM inodes WHERE fsid=%s', (self.fsid,))[0] stat_.f_blocks = self.fssize // stat_.f_frsize stat_.f_bfree = int(stat_.f_blocks) - (size // stat_.f_frsize) stat_.f_bavail = stat_.f_bfree inodes = self.get_row('SELECT COUNT(1) FROM inodes WHERE fsid=%s', (self.fsid,))[0] stat_.f_files = inodes stat_.f_ffree = max(inodes , 100) stat_.f_favail = stat_.f_ffree return stat_ def open(self, inode, flags, ctx): self.inode_open_count[inode] += 1 return inode def access(self, inode, mode, ctx=None): if mode != os.F_OK and not self.__access(inode, mode, ctx): return False return True def __access(self, inode, flags, ctx): attrs = self.get_row("SELECT mode, uid, gid FROM inodes WHERE inode_id = %s AND fsid=%s", (inode, self.fsid)) uid, gid = (ctx.uid, ctx.gid) o = uid == attrs[1] # access by same user id? g = gid == attrs[2] and not o # access by same group id? # Note: "and not o" added after experimenting with EXT4. w = not (o or g) # anything else m = attrs[0] return (not (flags & os.R_OK) or ((o and (m & 0o0400)) or (g and (m & 0o0040)) or (w and (m & 0o0004)))) \ and (not (flags & os.W_OK) or ((o and (m & 0o0200)) or (g and (m & 0o0020)) or (w and (m & 0o0002)))) \ and (not (flags & os.X_OK) or ((o and (m & 0o0100)) or (g and (m & 0o0010)) or (w and (m & 0o0001)))) def create(self, inode_parent, name, mode, flags, ctx): entry = self._create(inode_parent, name, mode, ctx) self.inode_open_count[entry.st_ino] += 1 return (entry.st_ino, entry) def _create(self, inode_p, name, mode, ctx, rdev=0, target=None): if self.getattr(inode_p, ctx).st_nlink == 0: raise FUSEError(errno.EINVAL) self.cursor.execute('INSERT INTO inodes (uid, gid, mode, mtime, atime, ' 'ctime, target, rdev, fsid) VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING inode_id;', (ctx.uid, ctx.gid, mode, int(time() * 1e9), int(time() * 1e9), int(time() * 1e9), target, rdev, self.fsid)) inode = self.cursor.fetchone()[0] self.cursor.execute("INSERT INTO contents(name, inode_id, parent_inode, fsid) VALUES(%s,%s,%s,%s);", (name, inode, inode_p, self.fsid)) return self.getattr(inode, ctx) def block_info(self, offset, length): info = {} info['first_block'] = int(offset / self.blocksize) info['start_idx'] = offset % self.blocksize info['last_block'] = int(math.ceil((offset + length) / self.blocksize)) - 1 end = (offset + length) % self.blocksize info['end_idx'] = end > 0 and end or self.blocksize info['blocks'] = info['last_block'] - info['first_block'] + 1 return info def read(self, inode, offset, length): size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid))[0] if offset < size: if offset + length > size: length = size - offset info = self.block_info(offset, length) self.cursor.execute('SELECT data FROM body WHERE inode_id=%s AND block_no>=%s AND block_no<=%s AND fsid=%s ORDER BY block_no ASC', (inode, info['first_block'], info['last_block'], self.fsid)) res = self.cursor.fetchall() if res is None: data = b'' else: d = list() for r in res: d.append(bytes(r[0])) d[-1] = d[-1][:info['end_idx']] d[0] = d[0][info['start_idx']:] data = b''.join(d) else: data = b'' return data def write(self, fh, offset, buf): size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s', (fh, self.fsid))[0] length = len(buf) old_info = self.block_info(0, size) write_info = self.block_info(offset, len(buf)) gidx = 0 for b in range(write_info['first_block'], write_info['last_block'] + 1): if b == write_info['first_block']: if b > old_info['last_block']: old_block = b'\0' * self.blocksize else: old_block = bytes(self.get_row('SELECT data FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s', (fh, b, self.fsid))[0]) if write_info['first_block'] == write_info['last_block']: block = old_block[:write_info['start_idx']] + buf[write_info['start_idx']:write_info['end_idx']] + old_block[write_info['end_idx']:] else: block = old_block[:write_info['start_idx']] + buf[write_info['start_idx']:self.blocksize] gidx += self.blocksize - write_info['start_idx'] elif b == write_info['last_block']: if b > old_info['last_block']: old_block = b'\0' * self.blocksize else: old_block = bytes(self.get_row('SELECT data FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s', (fh, b, self.fsid))[0]) block = buf[gidx:gidx + write_info['end_idx']] + old_block[write_info['end_idx']:] else: block = buf[gidx:gidx + self.blocksize] gidx += self.blocksize if b > old_info['last_block']: self.cursor.execute("INSERT INTO body (inode_id,block_no,data, fsid)" "VALUES (%s,%s,%s,%s)", (fh, b, psycopg2.Binary(block), self.fsid)) else: self.cursor.execute('UPDATE body SET data=%s WHERE inode_id=%s AND fsid=%s', (psycopg2.Binary(block), fh, self.fsid)) if offset + length > size: self.cursor.execute('UPDATE inodes SET size=%s WHERE inode_id=%s AND fsid=%s', (offset + length, fh, self.fsid)) return length def release(self, fh): self.inode_open_count[fh] -= 1 if self.inode_open_count[fh] == 0: del self.inode_open_count[fh] if self.getattr(fh).st_nlink == 0: self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s", (fh, self.fsid)) class NoUniqueValueError(Exception): def __str__(self): return 'Query generated more than 1 result row' class NoSuchRowError(Exception): def __str__(self): return 'Query produced 0 result rows' def usage(): raise SystemExit('''\ Usage: %s "host=dbhost dbname=database user=dbuser password=dbpass" -o mount_options UdavFS mount_option: fsid= [locksize= fssize=[k|m|g|t] ] Option 'user_allow_other' MUST be set in /etc/fuse.conf''' % sys.argv[0]) if __name__ == '__main__': if len(sys.argv) != 5: usage() if sys.argv[3] != '-o': usage() conn_str = sys.argv[1] + ' sslmode=\'require\'' options = {x.split('=')[0].strip(): len(x.split('=')) > 1 and x.split('=')[1].strip() or True for x in sys.argv[4].split(',') } db = psycopg2.connect(conn_str) db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = db.cursor() if 'fsname' not in options.keys(): print('fsname MUST be in mountoptions\n') usage() fsname = options['fsname'] del options['fsname'] fsid = sha1(bytes(fsname.strip(), 'UTF-8')).hexdigest() try: cursor.execute('SELECT bs, size FROM fsinfo WHERE fsid=%s', (fsid,)) blocksize, fssize = cursor.fetchone() if options['fssize']: del options['fssize'] if options['blocksize']: del options['blocksize'] except: if 'blocksize' not in options.keys(): print('blocksize mountoption MUST be specified\n') usage() if 'fssize' not in options.keys(): print('fssize mountoption MUST be specified\n') usage() new_bs = int(options['blocksize']) blocksize = new_bs del options['blocksize'] mod1 = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3, 't': 1024 ** 4} fssize = options['fssize'].lower() del options['fssize'] if fssize[-1].lower() in mod1.keys(): fssize = math.ceil((int(fssize[:-1]) * mod1[fssize[-1]]) / blocksize) * blocksize if fssize < 4194304: raise SystemExit('Filesystem size less than 4MB.\n') db.close() del db m_params = [ 'fsname=udavfs3', 'nonempty', 'default_permissions', 'allow_other' ] m_params.extend(options) mountpoint = os.path.abspath(sys.argv[2]) if not os.path.isdir(mountpoint): raise SystemExit('Mountpoint "%s" not directory or does not exist!' % mountpoint) # Start forking # First fork try: if os.fork() > 0: sys.exit(0) # kill off parent except OSError as e: sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror)) sys.exit(1) os.setsid() os.chdir(os.getcwd()) os.umask(0o022) # Second fork try: if os.fork() > 0: os._exit(0) except OSError as e: sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror)) os._exit(1) # End forking si = open('/dev/null', 'rb') so = open('/dev/null', 'ab+', 0) se = open('/dev/null', 'ab+', 0) os.dup2(si.fileno(), sys.stdin.fileno()) os.dup2(so.fileno(), sys.stdout.fileno()) os.dup2(se.fileno(), sys.stderr.fileno()) sys.stdout, sys.stderr = so, se operations = Operations(conn_str, fsid, blocksize, fssize) llfuse.init(operations, mountpoint, m_params) try: llfuse.main(workers=None) except: llfuse.close(unmount=False) raise llfuse.close()