gorrent/engine.go

303 lines
7.3 KiB
Go

package gorrent
import (
"errors"
"fmt"
"github.com/anacrolix/torrent"
"golang.org/x/net/context"
"net/http"
"net/url"
"os"
"path"
"strings"
"sync"
"time"
)
type TorrentStatus struct {
DownloadRate int
UploadRate int
Seeds int
}
type FileStatus struct {
Name string
Url string
Progress int
Length int
}
type Engine struct {
fileIdx int
settings *Settings
torrCfg *torrent.ClientConfig
client *torrent.Client
torrent *torrent.Torrent
alive bool
srv *http.Server
srvWg sync.WaitGroup
ctx context.Context
shutdown context.CancelFunc
msgs chan string
status struct {
lastBytesReadData int
lastBytesWrittenData int
downRate int
upRate int
seeds int
}
}
func NewEngine(settings *Settings) *Engine {
e := &Engine{
settings: settings,
msgs: make(chan string, 50),
}
e.ctx, e.shutdown = context.WithCancel(context.Background())
return e
}
func (o *Engine) IsAlive() bool {
return o.alive
}
func (o *Engine) StartTorrent(idx int) error {
o.fileIdx = idx
var err error
o.torrCfg = torrent.NewDefaultClientConfig()
o.torrCfg.ListenPort = o.settings.ListenPort
o.torrCfg.DataDir = o.settings.DownloadPath
if o.settings.Proxy != "" {
if u, err := url.Parse(o.settings.Proxy); err != nil {
o.torrCfg.HTTPProxy = func(request *http.Request) (*url.URL, error) { return u, nil }
}
}
o.client, err = torrent.NewClient(o.torrCfg)
if err != nil {
return fmt.Errorf(`create torrent client failed: %s`, err.Error())
}
o.debug(`got new client for torrent file: %s`, o.settings.TorrentPath)
if o.torrent, err = o.client.AddTorrentFromFile(o.settings.TorrentPath); err != nil {
return fmt.Errorf(`add torrent file failed: %s`, err.Error())
}
o.debug(`added torrent file: %s`, o.settings.TorrentPath)
o.torrent.SetMaxEstablishedConns(o.settings.MaxConnections)
o.debug(`set max connections: %d`, o.settings.MaxConnections)
o.alive = true
go func() {
t := o.torrent
<-t.GotInfo()
file := o.torrent.Files()[idx]
firstPieceIndex := file.Offset() * int64(t.NumPieces()) / t.Length()
endPieceIndex := (file.Offset() + file.Length()) * int64(t.NumPieces()) / t.Length()
o.torrent.DownloadPieces(int(firstPieceIndex), int(endPieceIndex))
for i := firstPieceIndex; i <= endPieceIndex*5/100; i++ {
t.Piece(int(i)).SetPriority(torrent.PiecePriorityNow)
}
go func() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-o.ctx.Done():
return
case <-ticker.C:
o.updateStats()
o.debug(`update stats finished. down: %d KB/s up: %d KB/s seeds: %d`, o.status.downRate, o.status.upRate, o.status.seeds)
}
}
}()
o.srvWg.Add(1)
go func() {
defer o.srvWg.Done()
router := http.NewServeMux()
router.HandleFunc("/", o.GetFileHandler(file))
o.srv = &http.Server{Addr: fmt.Sprintf(`%s:%d`, o.settings.HttpBindHost, o.settings.HttpBindPort), Handler: router}
o.info(`starting http server on port: %d`, o.settings.HttpBindPort)
err = o.srv.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
o.error(`http listener exit with error: %s`, err.Error())
}
o.info(`http server stopped successfully`)
}()
}()
return nil
}
func (o *Engine) updateStats() {
stats := o.torrent.Stats()
if o.status.lastBytesReadData == 0 {
o.status.downRate = 0
} else {
o.status.downRate = (int(stats.BytesReadData.Int64()) - o.status.lastBytesReadData) / 1024
}
o.status.lastBytesReadData = int(stats.BytesReadData.Int64())
if o.status.lastBytesWrittenData == 0 {
o.status.upRate = 0
} else {
o.status.upRate = (int(stats.BytesWrittenData.Int64()) - o.status.lastBytesWrittenData) / 1024
}
o.status.lastBytesWrittenData = int(stats.BytesWrittenData.Int64())
o.status.seeds = stats.ConnectedSeeders
}
func (o *Engine) Status() TorrentStatus {
res := TorrentStatus{}
res.DownloadRate = o.status.downRate
res.UploadRate = o.status.upRate
res.Seeds = o.status.seeds
return res
}
func (o *Engine) FileStatus(i int) (FileStatus, error) {
fs := FileStatus{}
if i < 0 || i > len(o.torrent.Files())-1 {
return fs, fmt.Errorf(`file index out of range: %d`, i)
}
file := o.torrent.Files()[i]
pathParts := strings.Split(file.DisplayPath(), `/`)
fs.Name = file.DisplayPath()
for idx := range pathParts {
pathParts[idx] = url.QueryEscape(pathParts[idx])
}
fs.Url = fmt.Sprintf(`http://%s:%d/files/%s`, o.settings.HttpBindHost, o.settings.HttpBindPort, strings.Join(pathParts, `/`))
fs.Progress = int(float64(file.BytesCompleted()) / float64(file.Length()) * 100.0)
fs.Length = int(file.Length())
return fs, nil
}
func (o *Engine) Stop() {
go func() {
httpCtx, cancelHttp := context.WithTimeout(o.ctx, time.Second*5)
defer cancelHttp()
if err := o.srv.Shutdown(httpCtx); err != nil {
o.error(`shutting down http server failed: %s`, err.Error())
}
o.srvWg.Wait()
o.syncDebug(`dropping torrent`)
o.torrent.Drop()
o.syncDebug(`closing client`)
o.client.Close()
<-o.client.Closed()
if !o.settings.KeepFiles {
o.Clean()
}
o.syncDebug(`torrent engine stopped`)
close(o.msgs)
o.alive = false
}()
return
}
func (o *Engine) GetMsg() string {
select {
case msg, ok := <-o.msgs:
if ok {
return msg
} else {
return "__CLOSED__"
}
default:
}
return "__NO_MSG__"
}
func (o *Engine) debug(tmpl string, args ...interface{}) {
go o.syncDebug(tmpl, args...)
}
func (o *Engine) info(tmpl string, args ...interface{}) {
go o.syncInfo(tmpl, args...)
}
func (o *Engine) error(tmpl string, args ...interface{}) {
go o.syncError(tmpl, args...)
}
func (o *Engine) syncError(t string, a ...interface{}) {
defer func() {
if rec := recover(); rec != nil {
fmt.Printf("recovered from panic: %v\n", rec)
}
}()
var result string
if len(a) > 0 {
result = fmt.Sprintf(t, a...)
} else {
result = t
}
o.msgs <- fmt.Sprintf(`[ERROR] %s`, result)
}
func (o *Engine) syncDebug(t string, a ...interface{}) {
if o.settings.Debug {
defer func() {
if rec := recover(); rec != nil {
fmt.Printf("recovered from panic: %v\n", rec)
}
}()
var result string
if len(a) > 0 {
result = fmt.Sprintf(t, a...)
} else {
result = t
}
o.msgs <- fmt.Sprintf(`[DEBUG] %s`, result)
}
}
func (o *Engine) syncInfo(t string, a ...interface{}) {
defer func() {
if rec := recover(); rec != nil {
fmt.Printf("recovered from panic: %v\n", rec)
}
}()
var result string
if len(a) > 0 {
result = fmt.Sprintf(t, a...)
} else {
result = t
}
o.msgs <- fmt.Sprintf(`[INFO] %s`, result)
}
func (o *Engine) GetFileHandler(target *torrent.File) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
entry := target.NewReader()
entry.SetReadahead(target.Length() / 100)
entry.SetResponsive()
defer func() {
if err := entry.Close(); err != nil {
o.error(`close torrent file in file handler failed: %s`, err.Error())
}
}()
pathParts := strings.Split(target.DisplayPath(), `/`)
w.Header().Set("Content-Disposition", "attachment; filename=\""+pathParts[len(pathParts)-1]+"\"")
http.ServeContent(w, r, target.DisplayPath(), time.Now(), entry)
}
}
func (o *Engine) Clean() {
fp := path.Join(o.settings.DownloadPath, o.torrent.Name())
if err := os.RemoveAll(fp); err != nil {
o.syncError(`remove downloaded data failed: %s`, err.Error())
}
}