gorrent/engine.go

304 lines
7.5 KiB
Go

package gorrent
import (
"errors"
"fmt"
"github.com/anacrolix/torrent"
"github.com/anacrolix/torrent/storage"
"golang.org/x/net/context"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"
"sync"
"time"
)
// refreshSec is the interval, in seconds, between two statistics refreshes.
// It is also the divisor used to turn byte deltas into per-second rates.
const refreshSec = 3
// TorrentStatus is the snapshot of transfer statistics returned by
// Engine.Status.
type TorrentStatus struct {
	// DownloadRate is the download speed in KiB per second.
	DownloadRate int64
	// UploadRate is the upload speed in KiB per second.
	UploadRate int64
	// Seeds is the number of connected seeders.
	Seeds int64
}
// FileStatus describes the file that the engine serves over HTTP.
type FileStatus struct {
	// Name is the display path of the file inside the torrent.
	Name string
	// Url is the HTTP address under which the file is advertised.
	Url string
	// Progress is the completed share of the file, in percent.
	Progress int64
	// Length is the size of the file in bytes.
	Length int64
}
// Engine owns one torrent client, the single torrent added to it, and the
// HTTP server that streams the selected file.
type Engine struct {
	// fileIdx is the index of the file passed to StartTorrent.
	fileIdx int64
	// settings holds the configuration read by the engine.
	settings *Settings
	// torrCfg is the client configuration built in StartTorrent.
	torrCfg *torrent.ClientConfig
	// client and torrent are created in StartTorrent and released in Stop.
	client  *torrent.Client
	torrent *torrent.Torrent
	// alive is set by StartTorrent and cleared at the end of Stop.
	alive bool
	// srv is the HTTP server streaming the file; srvWg tracks its goroutine.
	srv   *http.Server
	srvWg sync.WaitGroup
	// ctx stops the statistics loop; shutdown is called at the end of Stop.
	ctx      context.Context
	shutdown context.CancelFunc
	// msgs carries formatted log lines to GetMsg; Stop closes it.
	msgs chan string
	// status keeps the byte counters seen at the previous refresh together
	// with the rates and seeder count derived from them.
	status struct {
		lastBytesReadData, lastBytesWrittenData int64
		downRate, upRate, seeds                 int64
	}
	// fs is the status record of the served file.
	fs FileStatus
}
// IsAlive reports whether the engine has been started by StartTorrent and
// has not yet finished stopping.
func (o *Engine) IsAlive() bool {
	running := o.alive
	return running
}
// StartTorrent builds a torrent client from the engine settings, adds the
// torrent file at settings.TorrentPath and starts working on the file with
// index idx.
//
// Once the torrent metadata is available, a background goroutine restricts
// the download to the pieces of that file, starts the periodic statistics
// refresh and launches the HTTP server that streams the file.
//
// It returns an error when the client cannot be created or the torrent file
// cannot be added. Problems detected later in the background (bad file
// index, HTTP listener failure) are reported through the message channel.
func (o *Engine) StartTorrent(idx int64) error {
	o.fileIdx = idx

	o.torrCfg = torrent.NewDefaultClientConfig()
	o.torrCfg.ListenPort = o.settings.ListenPort
	o.torrCfg.DataDir = o.settings.DownloadPath
	o.torrCfg.Seed = o.settings.Seed
	o.torrCfg.AcceptPeerConnections = o.settings.AcceptPeerConnections
	o.torrCfg.DefaultStorage = storage.NewFileWithCompletion(o.settings.DownloadPath, storage.NewMapPieceCompletion())
	if o.settings.Proxy != "" {
		// Install the proxy only when its URL parsed successfully; an
		// unusable URL is reported and the client runs without a proxy.
		if u, perr := url.Parse(o.settings.Proxy); perr == nil {
			o.torrCfg.HTTPProxy = func(*http.Request) (*url.URL, error) { return u, nil }
		} else {
			o.error(`parse proxy url failed: %s`, perr.Error())
		}
	}

	var err error
	o.client, err = torrent.NewClient(o.torrCfg)
	if err != nil {
		return fmt.Errorf(`create torrent client failed: %w`, err)
	}
	o.debug(`got new client for torrent file: %s`, o.settings.TorrentPath)

	if o.torrent, err = o.client.AddTorrentFromFile(o.settings.TorrentPath); err != nil {
		// Without a torrent there is nothing to run; release the client
		// instead of continuing with a nil torrent.
		o.client.Close()
		o.client = nil
		return fmt.Errorf(`add torrent file failed: %w`, err)
	}
	o.debug(`added torrent file: %s`, o.settings.TorrentPath)
	o.torrent.SetMaxEstablishedConns(o.settings.MaxConnections)
	o.debug(`set max connections: %d`, o.settings.MaxConnections)
	o.alive = true

	go func() {
		t := o.torrent
		// Wait for the metadata, but give up if the engine context is
		// cancelled first so this goroutine cannot block forever.
		select {
		case <-t.GotInfo():
		case <-o.ctx.Done():
			return
		}

		files := t.Files()
		if idx < 0 || idx >= int64(len(files)) {
			o.error(`file index out of range: %d`, idx)
			return
		}
		o.setStaticFileStatusData(idx)
		file := files[idx]

		// Disable downloading of all files, then request exactly the pieces
		// that contain the selected file. Both ranges are half-open.
		t.CancelPieces(0, t.NumPieces())
		if pieceLen := t.Info().PieceLength; pieceLen > 0 {
			firstPieceIndex := file.Offset() / pieceLen
			endPieceIndex := (file.Offset() + file.Length() + pieceLen - 1) / pieceLen
			t.DownloadPieces(int(firstPieceIndex), int(endPieceIndex))
		}

		// Refresh the transfer statistics until the engine context is done.
		go func() {
			ticker := time.NewTicker(time.Second * refreshSec)
			defer ticker.Stop()
			for {
				select {
				case <-o.ctx.Done():
					return
				case <-ticker.C:
					o.updateStats()
					o.debug(`update stats finished. down: %d KB/s up: %d KB/s seeds: %d`, o.status.downRate, o.status.upRate, o.status.seeds)
				}
			}
		}()

		// Serve the selected file over HTTP; Stop waits on srvWg.
		o.srvWg.Add(1)
		go func() {
			defer o.srvWg.Done()
			router := http.NewServeMux()
			router.HandleFunc("/", o.GetFileHandler(file))
			o.srv = &http.Server{Addr: fmt.Sprintf(`%s:%d`, o.settings.HttpBindHost, o.settings.HttpBindPort), Handler: router}
			o.info(`starting http server on port: %d`, o.settings.HttpBindPort)
			// A local error variable keeps this goroutine from writing to
			// a variable owned by the already-returned StartTorrent call.
			if serveErr := o.srv.ListenAndServe(); serveErr != nil && !errors.Is(serveErr, http.ErrServerClosed) {
				o.error(`http listener exit with error: %s`, serveErr.Error())
				return
			}
			o.info(`http server stopped successfully`)
		}()
	}()
	return nil
}
// setStaticFileStatusData resets o.fs and fills in the values that are fixed
// for file i: its display name, the URL it is advertised under and its
// length. Progress is left at zero.
func (o *Engine) setStaticFileStatusData(i int64) {
	file := o.torrent.Files()[i]
	name := file.DisplayPath()

	// Escape each path segment on its own so the separators survive.
	// PathEscape is the path-safe encoding: unlike QueryEscape it never
	// turns a space into "+", which would be a literal plus inside a path.
	segments := strings.Split(name, `/`)
	for idx, segment := range segments {
		segments[idx] = url.PathEscape(segment)
	}

	length := file.Length()
	if length < 0 {
		length = -length
	}

	o.fs = FileStatus{
		Name:   name,
		Url:    fmt.Sprintf(`http://%s:%d/files/%s`, o.settings.HttpBindHost, o.settings.HttpBindPort, strings.Join(segments, `/`)),
		Length: length,
	}
}
// updateStats recomputes the download and upload rates (KiB per second,
// averaged over refreshSec) from the torrent's byte counters and stores the
// current number of connected seeders. A rate is reported as zero while the
// previously stored counter is still zero.
func (o *Engine) updateStats() {
	stats := o.torrent.Stats()
	readNow := stats.BytesReadData.Int64()
	writtenNow := stats.BytesWrittenData.Int64()
	st := &o.status

	var down int64
	if prev := st.lastBytesReadData; prev != 0 {
		down = (readNow - prev) / 1024 / refreshSec
	}
	st.downRate = down
	st.lastBytesReadData = readNow

	var up int64
	if prev := st.lastBytesWrittenData; prev != 0 {
		up = (writtenNow - prev) / 1024 / refreshSec
	}
	st.upRate = up
	st.lastBytesWrittenData = writtenNow

	st.seeds = int64(stats.ConnectedSeeders)
}
// Status returns the rates and seeder count stored by the most recent
// statistics refresh.
func (o *Engine) Status() TorrentStatus {
	return TorrentStatus{
		DownloadRate: o.status.downRate,
		UploadRate:   o.status.upRate,
		Seeds:        o.status.seeds,
	}
}
// FileStatus updates the Progress field of the stored file status from the
// completed bytes of file i and returns the stored status.
//
// An error is returned when no torrent has been added yet or when i is not a
// valid file index; the stored status is returned unchanged in that case.
func (o *Engine) FileStatus(i int) (FileStatus, error) {
	if o.torrent == nil {
		return o.fs, fmt.Errorf(`torrent is not started`)
	}
	files := o.torrent.Files()
	if i < 0 || i >= len(files) {
		return o.fs, fmt.Errorf(`file index out of range: %d`, i)
	}
	file := files[i]

	if length := file.Length(); length > 0 {
		o.fs.Progress = int64(float64(file.BytesCompleted()) / float64(length) * 100.0)
	} else {
		// An empty file has nothing left to download. Dividing by its zero
		// length would produce NaN, whose integer conversion is undefined.
		o.fs.Progress = 100
	}
	if o.fs.Progress < 0 {
		o.fs.Progress *= -1
	}
	return o.fs, nil
}
// Stop shuts the engine down in a background goroutine and returns
// immediately. The goroutine stops the HTTP server (waiting up to five
// seconds), drops the torrent, closes the client, removes the downloaded
// data unless settings.KeepFiles is set, cancels the engine context, closes
// the message channel and finally clears the alive flag.
//
// Components that were never created (for example when Stop is called
// before the HTTP server has started) are skipped instead of dereferenced.
func (o *Engine) Stop() {
	go func() {
		if o.srv != nil {
			httpCtx, cancelHTTP := context.WithTimeout(o.ctx, time.Second*5)
			err := o.srv.Shutdown(httpCtx)
			cancelHTTP()
			if err != nil {
				o.error(`shutting down http server failed: %s`, err.Error())
			}
		}
		o.srvWg.Wait()

		if o.torrent != nil {
			o.syncDebug(`dropping torrent`)
			o.torrent.Drop()
		}
		if o.client != nil {
			o.syncDebug(`closing client`)
			o.client.Close()
			<-o.client.Closed()
		}
		// Clean derives the data path from the torrent name, so it needs
		// the torrent to exist.
		if !o.settings.KeepFiles && o.torrent != nil {
			o.Clean()
		}

		o.syncDebug(`torrent engine stopped`)
		o.shutdown()
		close(o.msgs)
		o.alive = false
	}()
}
// GetMsg performs a non-blocking read from the message channel. It returns
// the next queued message, "__CLOSED__" when the channel has been closed, or
// "__NO_MSG__" when nothing is waiting.
func (o *Engine) GetMsg() string {
	select {
	case msg, open := <-o.msgs:
		if !open {
			return "__CLOSED__"
		}
		return msg
	default:
		return "__NO_MSG__"
	}
}
// debug queues a debug message without blocking the caller: the formatting
// and the channel send run in their own goroutine via syncDebug.
func (o *Engine) debug(tmpl string, args ...interface{}) {
	go o.syncDebug(tmpl, args...)
}
// info queues an informational message without blocking the caller: the
// formatting and the channel send run in their own goroutine via syncInfo.
func (o *Engine) info(tmpl string, args ...interface{}) {
	go o.syncInfo(tmpl, args...)
}
// error queues an error message without blocking the caller: the formatting
// and the channel send run in their own goroutine via syncError.
func (o *Engine) error(tmpl string, args ...interface{}) {
	go o.syncError(tmpl, args...)
}
// syncError formats the message, prefixes it with "[ERROR]" and sends it on
// the message channel, blocking until the send completes. The template is
// used verbatim when no arguments are given. A panic raised along the way
// (such as a send on a channel already closed by Stop) is recovered and
// printed to standard output.
func (o *Engine) syncError(t string, a ...interface{}) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		fmt.Printf("recovered from panic: %v\n", rec)
	}()

	text := t
	if len(a) != 0 {
		text = fmt.Sprintf(t, a...)
	}
	o.msgs <- fmt.Sprintf(`[ERROR] %s`, text)
}
// syncDebug does nothing unless settings.Debug is set. Otherwise it formats
// the message, prefixes it with "[DEBUG]" and sends it on the message
// channel, blocking until the send completes. The template is used verbatim
// when no arguments are given. A panic raised along the way (such as a send
// on a channel already closed by Stop) is recovered and printed to standard
// output.
func (o *Engine) syncDebug(t string, a ...interface{}) {
	if !o.settings.Debug {
		return
	}
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		fmt.Printf("recovered from panic: %v\n", rec)
	}()

	text := t
	if len(a) != 0 {
		text = fmt.Sprintf(t, a...)
	}
	o.msgs <- fmt.Sprintf(`[DEBUG] %s`, text)
}
// syncInfo formats the message, prefixes it with "[INFO]" and sends it on
// the message channel, blocking until the send completes. The template is
// used verbatim when no arguments are given. A panic raised along the way
// (such as a send on a channel already closed by Stop) is recovered and
// printed to standard output.
func (o *Engine) syncInfo(t string, a ...interface{}) {
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		fmt.Printf("recovered from panic: %v\n", rec)
	}()

	text := t
	if len(a) != 0 {
		text = fmt.Sprintf(t, a...)
	}
	o.msgs <- fmt.Sprintf(`[INFO] %s`, text)
}
// GetFileHandler returns an HTTP handler that streams target as an
// attachment. Every request opens its own torrent reader, which is closed
// when the request finishes; a failure to close is reported as an error
// message.
func (o *Engine) GetFileHandler(target *torrent.File) func(http.ResponseWriter, *http.Request) {
	// Captured once so that every response carries the same Last-Modified
	// value. A timestamp that changes on each request makes If-Range and
	// If-Modified-Since never match, which forces clients resuming or
	// seeking in the stream to restart from the beginning.
	modTime := time.Now()
	name := target.DisplayPath()
	disposition := "attachment; filename=" + strconv.Quote(name)

	return func(w http.ResponseWriter, r *http.Request) {
		entry := target.NewReader()
		defer func() {
			if err := entry.Close(); err != nil {
				o.error(`close torrent file in file handler failed: %s`, err.Error())
			}
		}()
		w.Header().Set("Content-Disposition", disposition)
		http.ServeContent(w, r, name, modTime, entry)
	}
}
// Clean removes the data stored for the torrent: the entry named after the
// torrent inside settings.DownloadPath, including everything below it. A
// removal failure is reported as an error message.
func (o *Engine) Clean() {
	target := path.Join(o.settings.DownloadPath, o.torrent.Name())
	err := os.RemoveAll(target)
	if err == nil {
		return
	}
	o.syncError(`remove downloaded data failed: %s`, err.Error())
}