
// File server that monitors a directory for file changes so that clients can synchronize incrementally (delta sync).
package main
import (
"log"
"net/http"
"os"
"path/filepath"
"strconv"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/labstack/echo/v4"
)
// ChangeInfo describes a single file-system change event held in a DataStore.
// The struct tags define how it is rendered when encoded as JSON.
type ChangeInfo struct {
// Path identifies what the event refers to, exactly as passed to AddChange.
Path string `json:"path"`
// Op is the fsnotify operation value of the event; note that it is emitted
// under the JSON key "type", not "op".
Op fsnotify.Op `json:"type"`
// Timestamp is the time associated with the event, as passed to AddChange.
Timestamp time.Time `json:"timestamp"`
}
// DataStore is an in-memory log of change events. Its methods take the
// embedded mutex before reading or modifying Changes.
type DataStore struct {
// Embedded so callers can use ds.Lock / ds.Unlock directly.
sync.Mutex
// Changes holds the recorded events in the order they were appended.
Changes []ChangeInfo
}
// NewDataStore returns a DataStore whose change log is empty but non-nil
// (an empty, non-nil slice JSON-encodes as [] rather than null).
func NewDataStore() *DataStore {
	store := new(DataStore)
	store.Changes = []ChangeInfo{}
	return store
}
// AddChange appends one change event, built from the given path, operation
// and timestamp, to the end of the log while holding the store's lock.
func (ds *DataStore) AddChange(path string, op fsnotify.Op, timestamp time.Time) {
	entry := ChangeInfo{
		Path:      path,
		Op:        op,
		Timestamp: timestamp,
	}

	ds.Lock()
	ds.Changes = append(ds.Changes, entry)
	ds.Unlock()
}
// DelChanges discards the leading run of recorded changes whose timestamp is
// not after threshold and keeps everything from the first newer entry onward.
//
// The scan stops at the first entry newer than threshold, so it assumes the
// log is in non-decreasing timestamp order (entries are appended by AddChange;
// verify this holds for any caller that passes timestamps other than "now").
// Calling it on an empty store, or when no entry is old enough, is a no-op.
func (ds *DataStore) DelChanges(threshold time.Time) {
	ds.Lock()
	defer ds.Unlock()

	// keep is the index of the first entry to retain. It stays 0 when the
	// store is empty or nothing is old enough, and reaches len(ds.Changes)
	// when every entry is old.
	keep := 0
	for keep < len(ds.Changes) && !ds.Changes[keep].Timestamp.After(threshold) {
		keep++
	}
	if keep == 0 {
		return
	}

	// Copy the survivors into a fresh slice so the discarded prefix can be
	// garbage collected instead of being pinned by the old backing array.
	remaining := make([]ChangeInfo, len(ds.Changes)-keep)
	copy(remaining, ds.Changes[keep:])
	ds.Changes = remaining
}
// GetChangesSince returns the recorded changes starting at the first entry
// whose timestamp is strictly after since, in the order they were recorded.
//
// The result is a copy, so callers may use it after the lock is released
// without observing later modifications of the store. It is never nil: when
// no entry is newer than since, an empty slice is returned.
func (ds *DataStore) GetChangesSince(since time.Time) []ChangeInfo {
	ds.Lock()
	defer ds.Unlock()

	// Default to "past the end" so that finding nothing newer yields an empty
	// result rather than the whole log.
	start := len(ds.Changes)
	for idx, change := range ds.Changes {
		if change.Timestamp.After(since) {
			start = idx
			break
		}
	}

	changes := make([]ChangeInfo, len(ds.Changes)-start)
	copy(changes, ds.Changes[start:])
	log.Println(changes)
	return changes
}
// fsMonitor watches dir for file-system events and records them in ds.
//
// Events that include a write or a remove operation are appended to ds with
// the event's file name and the current time. Once per hour, entries older
// than 24 hours are pruned from ds. Watcher errors are logged and otherwise
// ignored.
//
// The function blocks until the watcher's event or error channel is closed.
// If the watcher cannot be created or dir cannot be added, the process is
// terminated via log.Fatal.
func fsMonitor(ds *DataStore, dir string) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	// Cleanup interval; adjust as needed.
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	// done is closed when the event loop exits, letting fsMonitor return and
	// run its deferred cleanup instead of blocking forever.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}
				log.Println("event:", event)
				// Test each operation on its own: Op is a bitmask and,
				// depending on the fsnotify version, Has with a combined mask
				// may only match when every bit in the mask is set.
				if event.Has(fsnotify.Write) || event.Has(fsnotify.Remove) {
					// Store the file name, not event.String(), which also
					// embeds the operation and quotes the name.
					ds.AddChange(event.Name, event.Op, time.Now())
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				log.Println("error:", err)
			case <-ticker.C:
				// Drop changes older than 24 hours (adjust as needed).
				threshold := time.Now().Add(-24 * time.Hour)
				ds.DelChanges(threshold)
				log.Println("Cleaned up old changes")
			}

			// Dump the current log for debugging. Take the lock and print only
			// the slice so the mutex's own state is never read concurrently.
			ds.Lock()
			log.Println(ds.Changes)
			ds.Unlock()
		}
	}()

	// Start watching only after the event loop is running so no event is
	// left waiting for a reader.
	if err := watcher.Add(dir); err != nil {
		log.Fatal(err)
	}

	<-done
}
// main starts the directory watcher in the background and serves two HTTP
// endpoints on :1323:
//
//	GET /changes/:since  JSON list of changes recorded after :since, which
//	                     must be a Unix timestamp in seconds
//	GET /files/:path     contents of the named file under the watched directory
func main() {
	// In-memory store shared by the watcher and the HTTP handlers.
	dataStore := NewDataStore()

	// Directory to watch and serve (replace with your directory path).
	dirToWatch := "F:\\code"
	go fsMonitor(dataStore, dirToWatch)

	e := echo.New()

	e.GET("/changes/:since", func(c echo.Context) error {
		since := c.Param("since")
		log.Println(since)
		ts, err := strconv.ParseInt(since, 10, 64)
		if err != nil {
			// A malformed timestamp is a client error, not an empty result.
			return echo.NewHTTPError(http.StatusBadRequest, "since must be a Unix timestamp in seconds")
		}
		log.Println(ts)
		changes := dataStore.GetChangesSince(time.Unix(ts, 0))
		return c.JSON(http.StatusOK, changes)
	})

	e.GET("/files/:path", func(c echo.Context) error {
		// The path parameter is untrusted: refuse anything that would resolve
		// outside the served directory, answering exactly like a missing file.
		path, ok := resolveUnderDir(dirToWatch, c.Param("path"))
		if !ok {
			return c.JSON(http.StatusNotFound, nil)
		}
		if _, err := os.Stat(path); err == nil {
			return c.File(path)
		}
		return c.JSON(http.StatusNotFound, nil)
	})

	e.Logger.Fatal(e.Start(":1323"))
}

// resolveUnderDir joins sub onto base and reports whether the cleaned result
// still lies inside base. It returns the joined path and true on success, or
// an empty string and false when sub would escape base (for example through
// ".." segments) or the relation between the two cannot be computed.
func resolveUnderDir(base, sub string) (string, bool) {
	full := filepath.Join(base, sub)
	rel, err := filepath.Rel(base, full)
	if err != nil {
		return "", false
	}
	// Rel returns a cleaned path, so any escape shows up as a leading "..".
	parentPrefix := ".." + string(filepath.Separator)
	if rel == ".." || (len(rel) >= len(parentPrefix) && rel[:len(parentPrefix)] == parentPrefix) {
		return "", false
	}
	return full, true
}