udavfs3/udavfs3

608 lines
25 KiB
Plaintext
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
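# Usage: udavfs3 "host=dbhost dbname=database user=dbuser password=dbpass" <mountpoint> -o mount_options
# UdavFS mount_options:
# blocksize=<bytes> (required when the file system is first created)
# fssize=<size>[k|m|g|t] (required when the file system is first created)
# fsname=<uniq_name> (always required; its SHA-1 digest is the fsid stored in the database)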
#
# Option 'user_allow_other' MUST be set in /etc/fuse.conf
from __future__ import division, print_function, absolute_import
import os
import sys
import llfuse
import errno
import stat
import math
from time import time
import psycopg2
import psycopg2.extensions
import psycopg2.extras
import logging
from collections import defaultdict
from llfuse import FUSEError
from hashlib import sha1
# Root logger (getLogger() without a name returns the root logger).
log = logging.getLogger()
# Block size in bytes (4 KiB).
blocksize = 4096
class Operations(llfuse.Operations):
    '''llfuse request handlers for a file system stored in PostgreSQL.

    Four tables are used: ``fsinfo`` (one row per file system), ``inodes``
    (attributes), ``contents`` (directory entries, i.e. name -> inode links)
    and ``body`` (file data cut into ``blocksize``-byte rows).  Every query
    is scoped by ``fsid`` so several file systems can share one database.

    Storage invariants maintained by this class:

    * a stored block is always exactly ``blocksize`` bytes, zero padded;
    * bytes located past the end of a file are zero;
    * a block that was never written has no row and reads back as zeros.
    '''

    def __init__(self, conn_str, fsid, bs, size):
        '''Connect to the database and create the file system if needed.

        conn_str -- libpq connection string handed to psycopg2.connect()
        fsid     -- identifier of the file system inside the database
        bs       -- block size in bytes
        size     -- total file system size in bytes (used by statfs only)
        '''
        super(Operations, self).__init__()
        self.blocksize = bs
        self.fssize = size
        self.fsid = fsid
        self.db = psycopg2.connect(conn_str)
        # Autocommit: every statement is its own transaction, so a failed
        # statement never leaves the connection in an aborted state.
        self.db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        self.cursor = self.db.cursor()
        # inode -> number of open handles; an inode is only listed while open.
        self.inode_open_count = defaultdict(int)
        try:
            registered = self.get_row('SELECT count(1) FROM fsinfo WHERE fsid=%s;',
                                      (self.fsid,))[0]
        except psycopg2.Error:
            # Typically "relation fsinfo does not exist" on an empty database.
            registered = 0
        if registered == 0:
            self.init_fs()

    @staticmethod
    def _now_ns():
        '''Return the current time as integer nanoseconds since the epoch.'''
        return int(time() * 1e9)

    def init_tables(self):
        '''Initialize file system tables'''
        self.cursor.execute("""
            CREATE TABLE fsinfo (
                id SERIAL PRIMARY KEY,
                fsid VARCHAR(40) NOT NULL,
                size BIGINT NOT NULL,
                bs INT NOT NULL,
                UNIQUE (fsid)
            )
        """)
        self.cursor.execute("""
            CREATE TABLE inodes (
                id BIGSERIAL PRIMARY KEY,
                fsid VARCHAR(40) NOT NULL REFERENCES fsinfo (fsid) ON DELETE CASCADE,
                inode_id BIGSERIAL,
                uid INT NOT NULL,
                gid INT NOT NULL,
                mode INT NOT NULL,
                mtime BIGINT NOT NULL,
                atime BIGINT NOT NULL,
                ctime BIGINT NOT NULL,
                target BYTEA,
                size BIGINT NOT NULL DEFAULT 0,
                rdev INT NOT NULL DEFAULT 0,
                UNIQUE (fsid, inode_id)
            )
        """)
        # The primary key already enforces uniqueness of the block address;
        # a second UNIQUE constraint would only add a redundant index.
        self.cursor.execute("""
            CREATE TABLE body (
                inode_id BIGINT NOT NULL,
                fsid VARCHAR(40) NOT NULL,
                block_no INT NOT NULL DEFAULT 0,
                data BYTEA,
                PRIMARY KEY ( fsid, inode_id, block_no ),
                FOREIGN KEY (fsid, inode_id) REFERENCES inodes (fsid, inode_id) ON DELETE CASCADE
            )
        """)
        self.cursor.execute("""
            CREATE TABLE contents (
                rowid BIGSERIAL PRIMARY KEY,
                fsid VARCHAR(40) NOT NULL,
                name BYTEA NOT NULL,
                inode_id BIGINT NOT NULL,
                parent_inode BIGINT NOT NULL,
                FOREIGN KEY( fsid, inode_id ) REFERENCES inodes( fsid, inode_id ) ON DELETE CASCADE,
                FOREIGN KEY( fsid, parent_inode ) REFERENCES inodes( fsid, inode_id ) ON DELETE CASCADE,
                UNIQUE (fsid, name, parent_inode)
            )""")

    def init_fs(self):
        '''Create the tables if missing and register this file system.

        Registration inserts the ``fsinfo`` row, the root directory inode and
        a ``..`` entry that links the root to itself (this is what gives the
        root a non-zero link count).  Nothing is done when ``fsid`` is
        already registered.
        '''
        have_tables = self.get_row(
            "SELECT count(1) FROM information_schema.tables "
            "WHERE table_schema='public' AND table_name='inodes'")[0]
        if have_tables == 0:
            self.init_tables()
        if self.get_row('SELECT count(1) FROM fsinfo WHERE fsid=%s;', (self.fsid,))[0] != 0:
            return
        self.cursor.execute("INSERT INTO fsinfo (fsid, bs, size) VALUES (%s, %s,%s)",
                            (self.fsid, self.blocksize, self.fssize))
        # Insert root directory
        root_mode = (stat.S_IFDIR | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
                     | stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
        now = self._now_ns()
        self.cursor.execute("INSERT INTO inodes (fsid,inode_id,mode,uid,gid,mtime,atime,ctime) "
                            "VALUES (%s,%s,%s,%s,%s,%s,%s,%s)",
                            (self.fsid, llfuse.ROOT_INODE, root_mode, os.getuid(),
                             os.getgid(), now, now, now))
        # The inode sequence is shared by every file system in the database.
        # Only ever move it forward: rewinding it while another file system
        # already owns higher inode numbers would make later inserts collide.
        self.cursor.execute("SELECT setval('inodes_inode_id_seq', GREATEST(%s, "
                            "(SELECT COALESCE(MAX(inode_id), 0) FROM inodes)));",
                            (llfuse.ROOT_INODE + 1,))
        self.cursor.execute("INSERT INTO contents (fsid, name, parent_inode, inode_id) VALUES (%s,%s,%s,%s)",
                            (self.fsid, b'..', llfuse.ROOT_INODE, llfuse.ROOT_INODE))

    def get_row(self, *a, **kw):
        '''Execute a query on the shared cursor and return its only row.

        Arguments are passed unchanged to ``cursor.execute()``.

        Raises NoSuchRowError when the query yields no row and
        NoUniqueValueError when it yields more than one.
        '''
        self.cursor.execute(*a, **kw)
        row = self.cursor.fetchone()
        if row is None:
            raise NoSuchRowError()
        if self.cursor.fetchone() is not None:
            raise NoUniqueValueError()
        return row

    def lookup(self, inode_p, name, ctx):
        '''Resolve ``name`` inside directory ``inode_p``.

        Returns the EntryAttributes of the target; raises FUSEError(ENOENT)
        when there is no such entry.
        '''
        if name == b'.':
            inode = inode_p
        else:
            if name == b'..':
                # LIMIT 1: an inode with several links has several parents;
                # any of them is an acceptable answer and must not be an error.
                query = ("SELECT parent_inode FROM contents WHERE inode_id=%s AND fsid=%s "
                         "ORDER BY rowid LIMIT 1")
                params = (inode_p, self.fsid)
            else:
                query = "SELECT inode_id FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s"
                params = (name, inode_p, self.fsid)
            try:
                inode = self.get_row(query, params)[0]
            except NoSuchRowError:
                raise llfuse.FUSEError(errno.ENOENT)
        return self.getattr(inode, ctx)

    def getattr(self, inode, ctx=None):
        '''Build the EntryAttributes of ``inode``.

        ``st_nlink`` is the number of directory entries pointing at the inode
        and ``st_blocks`` the number of stored data blocks.

        Raises FUSEError(ENOENT) when the inode does not exist.
        '''
        cur = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            cur.execute('SELECT * FROM inodes WHERE inode_id=%s AND fsid=%s', (inode, self.fsid))
            row = cur.fetchone()
        finally:
            cur.close()
        if row is None:
            raise llfuse.FUSEError(errno.ENOENT)
        entry = llfuse.EntryAttributes()
        entry.st_ino = inode
        entry.generation = 0
        entry.entry_timeout = 300
        entry.attr_timeout = 300
        entry.st_mode = row['mode']
        entry.st_nlink = self.get_row("SELECT COUNT(1) FROM contents WHERE inode_id=%s AND fsid=%s",
                                      (inode, self.fsid))[0]
        entry.st_uid = row['uid']
        entry.st_gid = row['gid']
        entry.st_rdev = row['rdev']
        entry.st_size = row['size']
        entry.st_blksize = self.blocksize
        entry.st_blocks = self.get_row("SELECT COUNT(1) FROM body WHERE inode_id=%s AND fsid=%s",
                                       (inode, self.fsid))[0]
        entry.st_atime_ns = row['atime']
        entry.st_mtime_ns = row['mtime']
        entry.st_ctime_ns = row['ctime']
        return entry

    def readlink(self, inode, ctx):
        '''Return the target of symbolic link ``inode`` as bytes.'''
        target = self.get_row('SELECT target FROM inodes WHERE inode_id=%s AND fsid=%s',
                              (inode, self.fsid))[0]
        return bytes(target)

    def opendir(self, inode, ctx):
        '''Directory handles are simply the inode number.'''
        return inode

    def readdir(self, inode, off):
        '''Yield ``(name, attributes, next_offset)`` for directory ``inode``.

        The ``rowid`` of each entry serves as the resume offset; only entries
        with a ``rowid`` greater than ``off`` are produced.
        '''
        if off == 0:
            off = -1
        listing = self.db.cursor(cursor_factory=psycopg2.extras.DictCursor)
        try:
            listing.execute("SELECT * FROM contents WHERE parent_inode=%s "
                            'AND rowid > %s AND fsid=%s ORDER BY rowid', (inode, off, self.fsid))
            rows = listing.fetchall()
        finally:
            listing.close()
        for row in rows:
            yield (bytes(row['name']), self.getattr(row['inode_id']), row['rowid'])

    def unlink(self, inode_p, name, ctx):
        '''Remove a non-directory entry; raises FUSEError(EISDIR) for directories.'''
        entry = self.lookup(inode_p, name, ctx)
        if stat.S_ISDIR(entry.st_mode):
            raise llfuse.FUSEError(errno.EISDIR)
        self._remove(inode_p, name, entry)

    def rmdir(self, inode_p, name, ctx):
        '''Remove a directory entry; raises FUSEError(ENOTDIR) for anything else.'''
        entry = self.lookup(inode_p, name, ctx)
        if not stat.S_ISDIR(entry.st_mode):
            raise llfuse.FUSEError(errno.ENOTDIR)
        self._remove(inode_p, name, entry)

    def _remove(self, inode_p, name, entry):
        '''Delete entry ``name`` of ``inode_p`` and, when possible, its inode.

        Raises FUSEError(ENOTEMPTY) when the target still has children.  The
        inode row is only deleted when this was its last link and no handle
        is open; otherwise release() deletes it later.
        '''
        children = self.get_row("SELECT COUNT(1) FROM contents WHERE parent_inode=%s AND fsid=%s",
                                (entry.st_ino, self.fsid))[0]
        if children > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        self.cursor.execute("DELETE FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s",
                            (name, inode_p, self.fsid))
        if entry.st_nlink == 1 and entry.st_ino not in self.inode_open_count:
            self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s",
                                (entry.st_ino, self.fsid))

    def symlink(self, inode_p, name, target, ctx):
        '''Create symbolic link ``name`` -> ``target`` with mode 0777.'''
        mode = (stat.S_IFLNK | stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP |
                stat.S_IROTH | stat.S_IWOTH | stat.S_IXOTH)
        return self._create(inode_p, name, mode, ctx, target=target)

    def rename(self, inode_p_old, name_old, inode_p_new, name_new, ctx):
        '''Move an entry, replacing an existing target if there is one.'''
        entry_old = self.lookup(inode_p_old, name_old, ctx)
        try:
            entry_new = self.lookup(inode_p_new, name_new, ctx)
        except llfuse.FUSEError as exc:
            if exc.errno != errno.ENOENT:
                raise
            entry_new = None
        if entry_new is not None:
            self._replace(inode_p_old, name_old, inode_p_new, name_new,
                          entry_old, entry_new)
            return
        self.cursor.execute("UPDATE contents SET name=%s, parent_inode=%s WHERE name=%s "
                            "AND parent_inode=%s AND fsid=%s",
                            (name_new, inode_p_new, name_old, inode_p_old, self.fsid))

    def _replace(self, inode_p_old, name_old, inode_p_new, name_new,
                 entry_old, entry_new):
        '''Rename onto an existing entry.

        The target entry is re-pointed at the source inode, the source entry
        is removed, and the displaced inode is deleted when this was its last
        link and it is not open.  Raises FUSEError(ENOTEMPTY) when the target
        still has children.
        '''
        children = self.get_row("SELECT COUNT(1) FROM contents WHERE parent_inode=%s AND fsid=%s",
                                (entry_new.st_ino, self.fsid))[0]
        if children > 0:
            raise llfuse.FUSEError(errno.ENOTEMPTY)
        self.cursor.execute("UPDATE contents SET inode_id=%s WHERE name=%s AND parent_inode=%s AND fsid=%s",
                            (entry_old.st_ino, name_new, inode_p_new, self.fsid))
        self.cursor.execute('DELETE FROM contents WHERE name=%s AND parent_inode=%s AND fsid=%s',
                            (name_old, inode_p_old, self.fsid))
        if entry_new.st_nlink == 1 and entry_new.st_ino not in self.inode_open_count:
            self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s",
                                (entry_new.st_ino, self.fsid))

    def link(self, inode, new_inode_p, new_name, ctx):
        '''Add hard link ``new_name`` in ``new_inode_p`` pointing at ``inode``.

        Raises FUSEError(EINVAL) when the parent directory is itself unlinked
        and FUSEError(EEXIST) when the name is already taken.
        '''
        if self.getattr(new_inode_p, ctx).st_nlink == 0:
            raise FUSEError(errno.EINVAL)
        try:
            self.cursor.execute("INSERT INTO contents (name, inode_id, parent_inode, fsid) VALUES(%s,%s,%s,%s)",
                                (new_name, inode, new_inode_p, self.fsid))
        except psycopg2.IntegrityError:
            # UNIQUE (fsid, name, parent_inode) rejected a duplicate name.
            raise FUSEError(errno.EEXIST)
        return self.getattr(inode, ctx)

    def del_block(self, inode, num):
        '''Delete data block ``num`` of ``inode`` (no-op if it is not stored).'''
        self.cursor.execute("DELETE FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s",
                            (inode, num, self.fsid))

    def _load_block(self, inode, block_no):
        '''Return block ``block_no`` as exactly ``blocksize`` bytes.

        A missing (never written) block or a NULL payload yields zeros.
        '''
        self.cursor.execute('SELECT data FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s',
                            (inode, block_no, self.fsid))
        row = self.cursor.fetchone()
        data = b'' if row is None or row[0] is None else bytes(row[0])
        return data[:self.blocksize].ljust(self.blocksize, b'\0')

    def _store_block(self, inode, block_no, data):
        '''Write ``data`` as block ``block_no``, creating the row if needed.'''
        payload = psycopg2.Binary(data)
        self.cursor.execute('UPDATE body SET data=%s WHERE inode_id=%s AND block_no=%s AND fsid=%s',
                            (payload, inode, block_no, self.fsid))
        if self.cursor.rowcount == 0:
            self.cursor.execute("INSERT INTO body (inode_id,block_no,data, fsid)"
                                "VALUES (%s,%s,%s,%s)", (inode, block_no, payload, self.fsid))

    def _zero_block_tail(self, inode, size):
        '''Zero everything after byte ``size`` in the block that contains it.

        Keeps the "bytes past the end of file are zero" invariant so that a
        later extension of the file never exposes stale data.  Nothing is
        done when ``size`` is block aligned or the block is not stored.
        '''
        keep = size % self.blocksize
        if keep == 0:
            return
        block_no = size // self.blocksize
        self.cursor.execute('SELECT data FROM body WHERE inode_id=%s AND block_no=%s AND fsid=%s',
                            (inode, block_no, self.fsid))
        row = self.cursor.fetchone()
        if row is None or row[0] is None:
            return
        trimmed = bytes(row[0])[:keep].ljust(self.blocksize, b'\0')
        self.cursor.execute('UPDATE body SET data=%s WHERE inode_id=%s AND block_no=%s AND fsid=%s',
                            (psycopg2.Binary(trimmed), inode, block_no, self.fsid))

    def _resize(self, inode, new_size):
        '''Truncate or extend ``inode`` to ``new_size`` bytes.

        Shrinking drops every block that lies completely past the new end.
        Growing stores nothing: the new range has no rows and therefore reads
        back as zeros.
        '''
        old_size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s',
                                (inode, self.fsid))[0] or 0
        if new_size == old_size:
            return
        if new_size < old_size:
            first_dead_block = -(-new_size // self.blocksize)  # ceiling division
            self.cursor.execute("DELETE FROM body WHERE inode_id=%s AND fsid=%s AND block_no>=%s",
                                (inode, self.fsid, first_dead_block))
        self._zero_block_tail(inode, min(old_size, new_size))
        self.cursor.execute("UPDATE inodes SET size=%s WHERE inode_id=%s AND fsid=%s",
                            (new_size, inode, self.fsid))

    def setattr(self, inode, attr, fields, fh, ctx):
        '''Apply the attribute changes flagged in ``fields`` and return the result.

        Only the values of ``attr`` whose ``fields.update_*`` flag is set are
        used.  With this handler signature ``attr`` carries plain integers
        rather than ``None`` for untouched fields, so ``st_rdev`` and
        ``st_ctime_ns`` must not be copied blindly: doing so would reset the
        device number and change time on every call.  ``ctime`` is set to the
        current time whenever something changes, unless the binding offers an
        explicit ``update_ctime`` flag.
        '''
        if fields.update_size:
            self._resize(inode, attr.st_size)
        changes = []
        if fields.update_mode:
            changes.append(('mode', attr.st_mode))
        if fields.update_uid:
            changes.append(('uid', attr.st_uid))
        if fields.update_gid:
            changes.append(('gid', attr.st_gid))
        if fields.update_atime:
            changes.append(('atime', attr.st_atime_ns))
        if fields.update_mtime:
            changes.append(('mtime', attr.st_mtime_ns))
        # NB: this is the builtin getattr(), not the handler of the same name.
        if getattr(fields, 'update_ctime', False):
            changes.append(('ctime', attr.st_ctime_ns))
        elif changes or fields.update_size:
            changes.append(('ctime', self._now_ns()))
        if changes:
            # Column names come from the fixed list above, never from input.
            assignments = ', '.join(column + '=%s' for column, _ in changes)
            values = [value for _, value in changes]
            self.cursor.execute('UPDATE inodes SET ' + assignments + ' WHERE inode_id=%s AND fsid=%s',
                                values + [inode, self.fsid])
        return self.getattr(inode, ctx)

    def mknod(self, inode_p, name, mode, rdev, ctx):
        '''Create a (possibly special) file node.'''
        return self._create(inode_p, name, mode, ctx, rdev=rdev)

    def mkdir(self, inode_p, name, mode, ctx):
        '''Create a directory.'''
        return self._create(inode_p, name, mode, ctx)

    def statfs(self, ctx):
        '''Report capacity figures derived from ``fssize`` and the inode table.'''
        stat_ = llfuse.StatvfsData()
        stat_.f_bsize = self.blocksize
        stat_.f_frsize = self.blocksize
        # SUM() is NULL without rows and a Decimal otherwise: normalise to int.
        used = self.get_row('SELECT SUM(size) FROM inodes WHERE fsid=%s', (self.fsid,))[0]
        used_blocks = int(used or 0) // self.blocksize
        total_blocks = int(self.fssize) // self.blocksize
        stat_.f_blocks = total_blocks
        # Never report a negative amount of free space on an over-full volume.
        stat_.f_bfree = max(total_blocks - used_blocks, 0)
        stat_.f_bavail = stat_.f_bfree
        inodes = self.get_row('SELECT COUNT(1) FROM inodes WHERE fsid=%s', (self.fsid,))[0]
        stat_.f_files = inodes
        stat_.f_ffree = max(inodes, 100)
        stat_.f_favail = stat_.f_ffree
        return stat_

    def open(self, inode, flags, ctx):
        '''Count the new handle and use the inode number as file handle.'''
        self.inode_open_count[inode] += 1
        return inode

    def access(self, inode, mode, ctx=None):
        '''Return True when the caller in ``ctx`` may access ``inode`` with ``mode``.

        A pure existence test (``os.F_OK``) is always granted.
        '''
        if mode == os.F_OK:
            return True
        return bool(self.__access(inode, mode, ctx))

    def __access(self, inode, flags, ctx):
        '''Check ``flags`` (R_OK/W_OK/X_OK) against one permission class.

        Exactly one class applies: owner if the uid matches, otherwise group
        if the gid matches, otherwise other.
        '''
        mode, owner, group = self.get_row(
            "SELECT mode, uid, gid FROM inodes WHERE inode_id = %s AND fsid=%s",
            (inode, self.fsid))
        if ctx.uid == owner:
            shift = 6
        elif ctx.gid == group:
            shift = 3
        else:
            shift = 0
        granted = (mode >> shift) & 0o7
        for flag, bit in ((os.R_OK, 0o4), (os.W_OK, 0o2), (os.X_OK, 0o1)):
            if flags & flag and not granted & bit:
                return False
        return True

    def create(self, inode_parent, name, mode, flags, ctx):
        '''Create and open a regular file; returns ``(handle, attributes)``.'''
        entry = self._create(inode_parent, name, mode, ctx)
        self.inode_open_count[entry.st_ino] += 1
        return (entry.st_ino, entry)

    def _create(self, inode_p, name, mode, ctx, rdev=0, target=None):
        '''Insert a new inode and link it as ``name`` under ``inode_p``.

        Raises FUSEError(EINVAL) when the parent directory is unlinked and
        FUSEError(EEXIST) when the name is already taken.
        '''
        if self.getattr(inode_p, ctx).st_nlink == 0:
            raise FUSEError(errno.EINVAL)
        now = self._now_ns()
        self.cursor.execute('INSERT INTO inodes (uid, gid, mode, mtime, atime, '
                            'ctime, target, rdev, fsid) VALUES(%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING inode_id;',
                            (ctx.uid, ctx.gid, mode, now, now, now, target, rdev, self.fsid))
        inode = self.cursor.fetchone()[0]
        try:
            self.cursor.execute("INSERT INTO contents(name, inode_id, parent_inode, fsid) VALUES(%s,%s,%s,%s);",
                                (name, inode, inode_p, self.fsid))
        except psycopg2.IntegrityError:
            # Duplicate name: statements autocommit, so undo the inode by hand.
            self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s",
                                (inode, self.fsid))
            raise FUSEError(errno.EEXIST)
        return self.getattr(inode, ctx)

    def block_info(self, offset, length):
        '''Describe which blocks the byte range ``[offset, offset+length)`` touches.

        Returns a dict with ``first_block``/``last_block`` (block numbers),
        ``start_idx`` (offset inside the first block), ``end_idx`` (end offset
        inside the last block, ``blocksize`` when the range ends on a block
        boundary) and ``blocks`` (number of blocks touched).  Integer
        arithmetic is used throughout so very large offsets stay exact.
        '''
        bs = self.blocksize
        stop = offset + length
        first_block = offset // bs
        last_block = -(-stop // bs) - 1  # ceil(stop / bs) - 1
        tail = stop % bs
        return {
            'first_block': first_block,
            'start_idx': offset % bs,
            'last_block': last_block,
            'end_idx': tail if tail > 0 else bs,
            'blocks': last_block - first_block + 1,
        }

    def read(self, inode, offset, length):
        '''Return up to ``length`` bytes of ``inode`` starting at ``offset``.

        The range is clipped to the file size; blocks without a stored row
        contribute zeros.
        '''
        size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s',
                            (inode, self.fsid))[0]
        if length <= 0 or offset >= size:
            return b''
        length = min(length, size - offset)
        info = self.block_info(offset, length)
        self.cursor.execute('SELECT block_no, data FROM body WHERE inode_id=%s AND block_no>=%s '
                            'AND block_no<=%s AND fsid=%s ORDER BY block_no ASC',
                            (inode, info['first_block'], info['last_block'], self.fsid))
        stored = {}
        for block_no, payload in self.cursor.fetchall():
            stored[block_no] = b'' if payload is None else bytes(payload)
        bs = self.blocksize
        pieces = [stored.get(block_no, b'')[:bs].ljust(bs, b'\0')
                  for block_no in range(info['first_block'], info['last_block'] + 1)]
        start = info['start_idx']
        return b''.join(pieces)[start:start + length]

    def write(self, fh, offset, buf):
        '''Store ``buf`` at ``offset`` of inode ``fh``; returns the byte count.

        Each touched block is addressed individually: partially covered
        blocks are merged with their previous content, fully covered ones are
        replaced.  The file size grows when the write ends past it.
        '''
        length = len(buf)
        if length == 0:
            return 0
        size = self.get_row('SELECT size FROM inodes WHERE inode_id=%s AND fsid=%s',
                            (fh, self.fsid))[0]
        bs = self.blocksize
        stop = offset + length
        consumed = 0  # bytes of buf already placed into blocks
        for block_no in range(offset // bs, (stop - 1) // bs + 1):
            base = block_no * bs
            lo = max(offset - base, 0)  # first byte of this block to overwrite
            hi = min(stop - base, bs)   # one past the last byte to overwrite
            piece = bytes(buf[consumed:consumed + (hi - lo)])
            consumed += hi - lo
            if lo == 0 and hi == bs:
                block = piece
            else:
                previous = self._load_block(fh, block_no)
                block = previous[:lo] + piece + previous[hi:]
            self._store_block(fh, block_no, block)
        if stop > size:
            self.cursor.execute('UPDATE inodes SET size=%s WHERE inode_id=%s AND fsid=%s',
                                (stop, fh, self.fsid))
        return length

    def release(self, fh):
        '''Drop one handle; delete the inode once it is closed and unlinked.'''
        self.inode_open_count[fh] -= 1
        if self.inode_open_count[fh] != 0:
            return
        del self.inode_open_count[fh]
        links = self.get_row("SELECT COUNT(1) FROM contents WHERE inode_id=%s AND fsid=%s",
                             (fh, self.fsid))[0]
        if links == 0:
            self.cursor.execute("DELETE FROM inodes WHERE inode_id=%s AND fsid=%s",
                                (fh, self.fsid))
class NoUniqueValueError(Exception):
    '''Signals that a query expected to yield one row yielded several.'''

    _MESSAGE = 'Query generated more than 1 result row'

    def __str__(self):
        # The text is fixed; constructor arguments are not shown.
        return self._MESSAGE
class NoSuchRowError(Exception):
    '''Signals that a query expected to yield one row yielded none.'''

    _MESSAGE = 'Query produced 0 result rows'

    def __str__(self):
        # The text is fixed; constructor arguments are not shown.
        return self._MESSAGE
def usage():
    '''Abort with the command-line synopsis.

    Raises SystemExit carrying the help text, so the interpreter prints it
    to stderr and exits with a non-zero status.  The text names the options
    the option parser actually reads: ``fsname`` (required), ``blocksize``
    and ``fssize``.
    '''
    raise SystemExit('''\
Usage: %s "host=dbhost dbname=database user=dbuser password=dbpass" <mountpoint> -o mount_options
UdavFS mount_option:
fsname=<uniq_string>
[blocksize=<bytes>
fssize=<size>[k|m|g|t] ]
Option 'user_allow_other' MUST be set in /etc/fuse.conf''' % sys.argv[0])
# Smallest file system size accepted at creation time (4 MiB).
MIN_FS_SIZE = 4194304


def parse_mount_options(spec):
    '''Parse a ``-o`` argument such as ``"fsname=data,ro"``.

    Returns a dict mapping each option name to its value string, or to
    ``True`` for flags given without a (non-empty) value.  Only the first
    ``=`` separates name and value, so values may themselves contain ``=``.
    '''
    options = {}
    for item in spec.split(','):
        key, sep, value = item.partition('=')
        key = key.strip()
        if not key:
            continue
        value = value.strip()
        options[key] = value if sep and value else True
    return options


def parse_fs_size(text, bs):
    '''Convert ``"<n>[k|m|g|t]"`` to bytes, rounded up to a multiple of ``bs``.

    A value without suffix is taken as a byte count.  Raises ValueError when
    the numeric part is not an integer.
    '''
    units = {'k': 1024, 'm': 1024 ** 2, 'g': 1024 ** 3, 't': 1024 ** 4}
    text = text.strip().lower()
    if text and text[-1] in units:
        size = int(text[:-1]) * units[text[-1]]
    else:
        size = int(text)
    return -(-size // bs) * bs  # integer ceiling to whole blocks


def format_mount_options(options):
    '''Turn parsed options back into ``key`` / ``key=value`` strings for FUSE.'''
    return [key if value is True else '%s=%s' % (key, value)
            for key, value in options.items()]


def daemonize():
    '''Detach from the terminal with the classic double fork.

    The original process and the intermediate child exit; only the
    grandchild returns, with stdin, stdout and stderr tied to /dev/null.
    '''
    # First fork
    try:
        if os.fork() > 0:
            sys.exit(0)  # kill off parent
    except OSError as e:
        sys.stderr.write("fork #1 failed: (%d) %s\n" % (e.errno, e.strerror))
        sys.exit(1)
    os.setsid()
    os.umask(0o022)
    # Second fork
    try:
        if os.fork() > 0:
            os._exit(0)
    except OSError as e:
        sys.stderr.write("fork #2 failed: (%d) %s\n" % (e.errno, e.strerror))
        os._exit(1)
    # Redirect at descriptor level only.  sys.stdout/sys.stderr stay the
    # usual text streams, so a later print() or traceback still works.
    devnull = os.open(os.devnull, os.O_RDWR)
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(devnull, stream.fileno())
    if devnull > 2:
        os.close(devnull)


def main(argv):
    '''Validate the command line, then mount and serve the file system.

    ``argv`` has the layout of ``sys.argv``:
    ``[prog, conn_str, mountpoint, '-o', options]``.
    '''
    if len(argv) != 5 or argv[3] != '-o':
        usage()
    conn_str = argv[1] + ' sslmode=\'require\''
    options = parse_mount_options(argv[4])
    fsname = options.pop('fsname', None)
    if not isinstance(fsname, str):
        print('fsname MUST be in mountoptions\n')
        usage()
    fsid = sha1(fsname.encode('UTF-8')).hexdigest()

    # Look up the geometry of an already existing file system.  The
    # connection is closed before forking; Operations opens its own.
    db = psycopg2.connect(conn_str)
    try:
        db.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = db.cursor()
        try:
            cursor.execute('SELECT bs, size FROM fsinfo WHERE fsid=%s', (fsid,))
            known = cursor.fetchone()
        except psycopg2.Error:
            known = None  # tables not created yet
    finally:
        db.close()

    if known is not None:
        # Stored geometry wins; the options are optional here and ignored.
        blocksize, fssize = known
        options.pop('blocksize', None)
        options.pop('fssize', None)
    else:
        if not isinstance(options.get('blocksize'), str):
            print('blocksize mountoption MUST be specified\n')
            usage()
        if not isinstance(options.get('fssize'), str):
            print('fssize mountoption MUST be specified\n')
            usage()
        try:
            blocksize = int(options.pop('blocksize'))
            if blocksize <= 0:
                raise ValueError(blocksize)
            fssize = parse_fs_size(options.pop('fssize'), blocksize)
        except ValueError:
            raise SystemExit('blocksize and fssize must be positive integers.\n')
        if fssize < MIN_FS_SIZE:
            raise SystemExit('Filesystem size less than 4MB.\n')

    m_params = ['fsname=udavfs3', 'nonempty', 'default_permissions', 'allow_other']
    # Forward the remaining options with their values ("uid=1000", "ro").
    m_params.extend(format_mount_options(options))
    mountpoint = os.path.abspath(argv[2])
    if not os.path.isdir(mountpoint):
        raise SystemExit('Mountpoint "%s" not directory or does not exist!' % mountpoint)

    daemonize()
    operations = Operations(conn_str, fsid, blocksize, fssize)
    llfuse.init(operations, mountpoint, m_params)
    try:
        llfuse.main(workers=None)
    except BaseException:
        # Leave the mountpoint alone when the main loop died abnormally.
        llfuse.close(unmount=False)
        raise
    llfuse.close()


if __name__ == '__main__':
    main(sys.argv)