// Package gorrent runs a BitTorrent client for a single torrent and serves one
// of its files over a local HTTP endpoint.
package gorrent

import (
	"errors"
	"fmt"
	"net/http"
	"net/url"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/anacrolix/torrent"
	"github.com/anacrolix/torrent/storage"
	"golang.org/x/net/context"
)

// NOTE(review): boltDbDir is not referenced anywhere in this file; it may be
// used elsewhere in the package — confirm before removing.
const boltDbDir = `piece_complete_db`

// TorrentStatus is a snapshot of the transfer counters maintained by
// Engine.updateStats.
type TorrentStatus struct {
	// DownloadRate is the number of KiB read between the two most recent
	// stats updates (updates run once per second).
	DownloadRate int64
	// UploadRate is the number of KiB written between the two most recent
	// stats updates.
	UploadRate int64
	// Seeds is the number of connected seeders reported by the torrent.
	Seeds int64
}

// FileStatus describes a single file of the torrent.
type FileStatus struct {
	// Name is the display path of the file inside the torrent.
	Name string
	// Url is the HTTP address built from the configured bind host and port.
	Url string
	// Progress is the completed share of the file in percent (0-100).
	Progress int64
	// Length is the file size in bytes.
	Length int64
}

// Engine owns the torrent client, the selected torrent and the HTTP server
// that streams the selected file.
//
// settings, ctx, shutdown and msgs are not initialised in this file; they are
// expected to be set by whatever constructs the Engine.
type Engine struct {
	fileIdx  int64
	settings *Settings
	torrCfg  *torrent.ClientConfig
	client   *torrent.Client
	torrent  *torrent.Torrent
	alive    bool
	srv      *http.Server
	srvWg    sync.WaitGroup
	ctx      context.Context
	shutdown context.CancelFunc
	msgs     chan string
	status   struct {
		lastBytesReadData    int64
		lastBytesWrittenData int64
		downRate             int64
		upRate               int64
		seeds                int64
	}
}

// IsAlive reports whether StartTorrent has succeeded and Stop has not yet
// finished.
func (o *Engine) IsAlive() bool {
	return o.alive
}

// StartTorrent creates the torrent client, adds the torrent file named in the
// settings and, in the background, waits for the torrent info, prioritises the
// file with index idx and starts the HTTP server that serves it.
//
// It returns an error when idx is negative, the configured proxy URL cannot be
// parsed, the client cannot be created or the torrent file cannot be added.
// Failures that happen later in the background (for example idx being past the
// last file) are reported through the message channel.
func (o *Engine) StartTorrent(idx int64) error {
	if idx < 0 {
		return fmt.Errorf(`file index out of range: %d`, idx)
	}
	o.fileIdx = idx

	o.torrCfg = torrent.NewDefaultClientConfig()
	o.torrCfg.ListenPort = o.settings.ListenPort
	o.torrCfg.DataDir = o.settings.DownloadPath
	o.torrCfg.DefaultStorage = storage.NewFileWithCompletion(o.settings.DownloadPath, storage.NewMapPieceCompletion())

	if o.settings.Proxy != "" {
		proxyURL, err := url.Parse(o.settings.Proxy)
		if err != nil {
			// Fail loudly instead of silently running without the proxy the
			// user asked for.
			return fmt.Errorf(`parse proxy url failed: %w`, err)
		}
		o.torrCfg.HTTPProxy = http.ProxyURL(proxyURL)
	}

	client, err := torrent.NewClient(o.torrCfg)
	if err != nil {
		return fmt.Errorf(`create torrent client failed: %w`, err)
	}
	o.client = client
	o.debug(`got new client for torrent file: %s`, o.settings.TorrentPath)

	t, err := o.client.AddTorrentFromFile(o.settings.TorrentPath)
	if err != nil {
		// Without a torrent there is nothing to run; release the client so
		// its listeners are not leaked.
		o.client.Close()
		o.client = nil
		return fmt.Errorf(`add torrent file failed: %w`, err)
	}
	o.torrent = t
	o.debug(`added torrent file: %s`, o.settings.TorrentPath)

	o.torrent.SetMaxEstablishedConns(o.settings.MaxConnections)
	o.debug(`set max connections: %d`, o.settings.MaxConnections)

	o.alive = true
	go o.serveSelectedFile(idx)
	return nil
}

// serveSelectedFile waits for the torrent info, then prioritises the file with
// index idx, starts the periodic stats update and the HTTP server.
func (o *Engine) serveSelectedFile(idx int64) {
	t := o.torrent

	// Do not wait forever for the info if the engine is stopped first.
	select {
	case <-t.GotInfo():
	case <-o.ctx.Done():
		return
	}

	files := t.Files()
	if idx >= int64(len(files)) {
		o.error(`file index out of range: %d`, idx)
		return
	}
	file := files[idx]

	o.prioritizeFile(t, file)
	go o.statsLoop()
	o.startHTTPServer(file)
}

// prioritizeFile disables downloading of all pieces and then enables only the
// pieces that hold file, giving the leading ~5% of them the highest priority.
func (o *Engine) prioritizeFile(t *torrent.Torrent, file *torrent.File) {
	// Disable downloading of all files. Piece ranges are half-open
	// [begin, end) in anacrolix/torrent, so the end is NumPieces itself.
	t.CancelPieces(0, t.NumPieces())

	info := t.Info()
	if info == nil || info.PieceLength <= 0 {
		return
	}
	pieceLen := info.PieceLength
	numPieces := int64(t.NumPieces())

	// Exact piece span of the file: first piece touched by its first byte and
	// one past the piece touched by its last byte.
	first := file.Offset() / pieceLen
	end := (file.Offset() + file.Length() + pieceLen - 1) / pieceLen
	if end > numPieces {
		end = numPieces
	}
	t.DownloadPieces(int(first), int(end))

	// Fetch the head of the file first so playback can start early.
	head := first + (end-first)*5/100
	for i := first; i <= head && i < end; i++ {
		t.Piece(int(i)).SetPriority(torrent.PiecePriorityNow)
	}
}

// statsLoop refreshes the transfer statistics once per second until the engine
// context is cancelled.
func (o *Engine) statsLoop() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-o.ctx.Done():
			return
		case <-ticker.C:
			o.updateStats()
			o.debug(`update stats finished. down: %d KB/s up: %d KB/s seeds: %d`, o.status.downRate, o.status.upRate, o.status.seeds)
		}
	}
}

// startHTTPServer creates the HTTP server that serves file on every path and
// runs it in a goroutine tracked by srvWg.
func (o *Engine) startHTTPServer(file *torrent.File) {
	if o.ctx.Err() != nil {
		// The engine was stopped while the torrent info was being fetched.
		return
	}

	router := http.NewServeMux()
	router.HandleFunc("/", o.GetFileHandler(file))
	srv := &http.Server{
		Addr:    fmt.Sprintf(`%s:%d`, o.settings.HttpBindHost, o.settings.HttpBindPort),
		Handler: router,
	}
	o.srv = srv

	o.srvWg.Add(1)
	go func() {
		defer o.srvWg.Done()
		o.info(`starting http server on port: %d`, o.settings.HttpBindPort)
		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			o.error(`http listener exit with error: %s`, err.Error())
			return
		}
		o.info(`http server stopped successfully`)
	}()
}

// updateStats recomputes the download/upload rates from the byte counters of
// the torrent and stores the current number of connected seeders. A rate is
// reported as 0 while the previous counter value is still 0.
func (o *Engine) updateStats() {
	stats := o.torrent.Stats()
	read := stats.BytesReadData.Int64()
	written := stats.BytesWrittenData.Int64()

	o.status.downRate = 0
	if o.status.lastBytesReadData != 0 {
		o.status.downRate = (read - o.status.lastBytesReadData) / 1024
	}
	o.status.lastBytesReadData = read

	o.status.upRate = 0
	if o.status.lastBytesWrittenData != 0 {
		o.status.upRate = (written - o.status.lastBytesWrittenData) / 1024
	}
	o.status.lastBytesWrittenData = written

	o.status.seeds = int64(stats.ConnectedSeeders)
}

// Status returns the most recently computed transfer statistics.
func (o *Engine) Status() TorrentStatus {
	return TorrentStatus{
		DownloadRate: o.status.downRate,
		UploadRate:   o.status.upRate,
		Seeds:        o.status.seeds,
	}
}

// FileStatus returns name, URL, progress and size of the file with index i.
// It returns an error when no torrent has been added or i is out of range.
func (o *Engine) FileStatus(i int) (FileStatus, error) {
	if o.torrent == nil {
		return FileStatus{}, errors.New(`torrent is not started`)
	}
	files := o.torrent.Files()
	if i < 0 || i >= len(files) {
		return FileStatus{}, fmt.Errorf(`file index out of range: %d`, i)
	}
	file := files[i]

	// Escape every path segment on its own so the separators stay intact.
	// PathEscape is used because the segments end up in the URL path, where
	// "+" does not mean a space.
	segments := strings.Split(file.DisplayPath(), `/`)
	for n, seg := range segments {
		segments[n] = url.PathEscape(seg)
	}

	length := file.Length()
	// An empty file has nothing left to download; this also avoids dividing
	// by zero.
	progress := int64(100)
	if length > 0 {
		progress = file.BytesCompleted() * 100 / length
	}
	if progress < 0 {
		progress = 0
	} else if progress > 100 {
		progress = 100
	}

	return FileStatus{
		Name:     file.DisplayPath(),
		Url:      fmt.Sprintf(`http://%s:%d/files/%s`, o.settings.HttpBindHost, o.settings.HttpBindPort, strings.Join(segments, `/`)),
		Progress: progress,
		Length:   length,
	}, nil
}

// Stop shuts the engine down asynchronously: it cancels the engine context,
// stops the HTTP server (waiting up to five seconds), drops the torrent,
// closes the client, removes downloaded data unless KeepFiles is set, closes
// the message channel and finally marks the engine as not alive.
func (o *Engine) Stop() {
	go func() {
		// Stop background work (stats ticker, pending info wait) first so it
		// does not touch the torrent after it has been dropped.
		if o.shutdown != nil {
			o.shutdown()
		}

		if o.srv != nil {
			// The timeout must not derive from o.ctx, which is already
			// cancelled at this point.
			httpCtx, cancelHttp := context.WithTimeout(context.Background(), time.Second*5)
			if err := o.srv.Shutdown(httpCtx); err != nil {
				o.error(`shutting down http server failed: %s`, err.Error())
			}
			cancelHttp()
		}
		o.srvWg.Wait()

		if o.torrent != nil {
			o.syncDebug(`dropping torrent`)
			o.torrent.Drop()
		}
		if o.client != nil {
			o.syncDebug(`closing client`)
			o.client.Close()
			<-o.client.Closed()
		}
		if !o.settings.KeepFiles {
			o.Clean()
		}

		o.syncDebug(`torrent engine stopped`)
		close(o.msgs)
		o.alive = false
	}()
}

// GetMsg returns the next queued log message without blocking. It returns
// "__NO_MSG__" when the queue is empty and "__CLOSED__" once the message
// channel has been closed.
func (o *Engine) GetMsg() string {
	select {
	case msg, ok := <-o.msgs:
		if !ok {
			return "__CLOSED__"
		}
		return msg
	default:
		return "__NO_MSG__"
	}
}

// debug queues a debug message without blocking the caller.
func (o *Engine) debug(tmpl string, args ...interface{}) {
	go o.syncDebug(tmpl, args...)
}

// info queues an info message without blocking the caller.
func (o *Engine) info(tmpl string, args ...interface{}) {
	go o.syncInfo(tmpl, args...)
}

// error queues an error message without blocking the caller.
func (o *Engine) error(tmpl string, args ...interface{}) {
	go o.syncError(tmpl, args...)
}

// syncError sends an [ERROR] message to the message channel and blocks until
// it has been accepted.
func (o *Engine) syncError(t string, a ...interface{}) {
	o.emit(`ERROR`, t, a...)
}

// syncDebug sends a [DEBUG] message to the message channel when debug output
// is enabled in the settings; otherwise it does nothing.
func (o *Engine) syncDebug(t string, a ...interface{}) {
	if !o.settings.Debug {
		return
	}
	o.emit(`DEBUG`, t, a...)
}

// syncInfo sends an [INFO] message to the message channel and blocks until it
// has been accepted.
func (o *Engine) syncInfo(t string, a ...interface{}) {
	o.emit(`INFO`, t, a...)
}

// emit formats the message and sends it, prefixed with level, to the message
// channel. A panic raised while doing so (for example sending after Stop has
// closed the channel) is recovered and printed instead of crashing.
func (o *Engine) emit(level string, t string, a ...interface{}) {
	defer func() {
		if rec := recover(); rec != nil {
			fmt.Printf("recovered from panic: %v\n", rec)
		}
	}()

	// Only run the template through Sprintf when there are arguments, so a
	// plain message containing '%' is passed through untouched.
	result := t
	if len(a) > 0 {
		result = fmt.Sprintf(t, a...)
	}
	o.msgs <- fmt.Sprintf(`[%s] %s`, level, result)
}

// GetFileHandler returns an HTTP handler that streams target as an attachment.
// Each request gets its own torrent reader with a readahead of 1% of the file
// size; http.ServeContent handles the response, including range requests.
func (o *Engine) GetFileHandler(target *torrent.File) func(http.ResponseWriter, *http.Request) {
	return func(w http.ResponseWriter, r *http.Request) {
		reader := target.NewReader()
		defer func() {
			if err := reader.Close(); err != nil {
				o.error(`close torrent file in file handler failed: %s`, err.Error())
			}
		}()
		reader.SetReadahead(target.Length() / 100)
		reader.SetResponsive()

		// Offer only the last path component as the download file name.
		displayPath := target.DisplayPath()
		segments := strings.Split(displayPath, `/`)
		fileName := segments[len(segments)-1]
		w.Header().Set("Content-Disposition", "attachment; filename=\""+fileName+"\"")

		http.ServeContent(w, r, displayPath, time.Now(), reader)
	}
}

// Clean removes the data downloaded for the torrent from the download
// directory. It does nothing when no torrent has been added or the torrent has
// no name yet.
func (o *Engine) Clean() {
	if o.torrent == nil {
		return
	}
	name := o.torrent.Name()
	if name == "" {
		// Joining an empty name would yield the download directory itself,
		// and removing that would delete unrelated data.
		return
	}
	fp := path.Join(o.settings.DownloadPath, name)
	if err := os.RemoveAll(fp); err != nil {
		o.syncError(`remove downloaded data failed: %s`, err.Error())
	}
}