文件监听文件

Wednesday, Jul 10, 2024 | 2 minute read | Updated at Wednesday, Jul 10, 2024

@
文件监听文件

一个简单的文件服务器：使用 fsnotify 监控指定目录中的文件变化，并通过 HTTP 接口提供变更记录和文件下载，方便客户端实现增量同步。

package main

import (
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strconv"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/labstack/echo/v4"
)

// ChangeInfo represents information about a single file change event as it
// is stored in a DataStore and serialized to JSON for HTTP clients.
type ChangeInfo struct {
	// Path is the path string supplied by the caller of AddChange.
	Path      string      `json:"path"`
	// Op is the fsnotify operation mask of the event; note that it is
	// serialized under the JSON key "type", not "op".
	Op        fsnotify.Op `json:"type"`
	// Timestamp is the time supplied by the caller of AddChange; the store's
	// scan loops compare against it with time.Time.After.
	Timestamp time.Time   `json:"timestamp"`
}

// DataStore stores the change events in memory.
//
// The embedded sync.Mutex guards Changes; every method of DataStore locks it
// for the duration of the call. Because the mutex is embedded by value, a
// DataStore must be shared by pointer and never copied.
type DataStore struct {
	sync.Mutex
	// Changes holds the recorded events in the order they were appended.
	// DelChanges and GetChangesSince stop scanning at the first entry newer
	// than their cut-off, so they rely on timestamps being non-decreasing.
	Changes []ChangeInfo
}

// NewDataStore allocates a DataStore whose Changes slice is empty but
// non-nil, ready to receive events through AddChange.
func NewDataStore() *DataStore {
	store := new(DataStore)
	store.Changes = []ChangeInfo{}
	return store
}

// AddChange appends one change record, built from path, op and timestamp, to
// the end of the store while holding the store's lock.
func (ds *DataStore) AddChange(path string, op fsnotify.Op, timestamp time.Time) {
	entry := ChangeInfo{
		Path:      path,
		Op:        op,
		Timestamp: timestamp,
	}

	ds.Lock()
	ds.Changes = append(ds.Changes, entry)
	ds.Unlock()
}

// DelChanges discards every leading change whose timestamp is at or before
// threshold and keeps the rest.
//
// The scan stops at the first entry newer than threshold, so it relies on
// Changes being ordered by non-decreasing timestamp (which holds when entries
// are appended with the current time). Calling it on an empty store, or when
// no entry is old enough, leaves the store unchanged.
func (ds *DataStore) DelChanges(threshold time.Time) {
	ds.Lock()
	defer ds.Unlock()

	// Count the expired prefix instead of remembering the last expired index:
	// an index cannot distinguish "nothing expired" from "only entry 0
	// expired", and slicing past the end of an empty slice panics.
	expired := 0
	for _, item := range ds.Changes {
		if item.Timestamp.After(threshold) {
			break
		}
		expired++
	}

	ds.Changes = ds.Changes[expired:]
}

// GetChangesSince returns the recorded changes whose timestamp is strictly
// after since, in the order they were recorded.
//
// The result is a copy, so callers may keep or modify it without holding the
// store's lock. It is always non-nil, which makes an empty result encode as
// the JSON array [] rather than null. Like DelChanges, the scan relies on
// Changes being ordered by non-decreasing timestamp.
func (ds *DataStore) GetChangesSince(since time.Time) []ChangeInfo {
	ds.Lock()
	defer ds.Unlock()

	// Start from "no entry matches" so that a since value newer than every
	// recorded change yields an empty result instead of the whole history.
	pos := len(ds.Changes)
	for idx, val := range ds.Changes {
		if val.Timestamp.After(since) {
			pos = idx
			break
		}
	}

	changes := make([]ChangeInfo, len(ds.Changes)-pos)
	copy(changes, ds.Changes[pos:])

	log.Println(changes)
	return changes
}

// fsMonitor watches dir for file-system events and records every write and
// remove event in ds. Once an hour it also discards recorded changes that are
// more than 24 hours old.
//
// The function blocks until the watcher's event or error channel is closed,
// so callers normally run it in its own goroutine. It terminates the process
// via log.Fatal if the watcher cannot be created or dir cannot be watched.
//
// NOTE(review): fsnotify watches are presumably non-recursive, so changes
// inside subdirectories of dir would not be reported — confirm against the
// fsnotify documentation.
func fsMonitor(ds *DataStore, dir string) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatal(err)
	}
	defer watcher.Close()

	ticker := time.NewTicker(1 * time.Hour) // Adjust the cleanup interval as needed
	defer ticker.Stop()

	// done is closed when the event loop exits, which lets this function
	// return and run its deferred cleanup instead of blocking forever.
	done := make(chan struct{})

	go func() {
		defer close(done)
		for {
			select {
			case event, ok := <-watcher.Events:
				if !ok {
					return
				}
				log.Println("event:", event)
				// Test each operation on its own: depending on the fsnotify
				// version, Has with a combined mask may only match events
				// that carry every bit of the mask at once.
				if event.Has(fsnotify.Write) || event.Has(fsnotify.Remove) {
					// Store the bare file name; event.String() also embeds
					// the operation and is not usable as a path.
					ds.AddChange(event.Name, event.Op, time.Now())
				}
			case err, ok := <-watcher.Errors:
				if !ok {
					return
				}
				log.Println("error:", err)
			case <-ticker.C:
				// Cleanup changes older than 24 hours (adjust as needed)
				threshold := time.Now().Add(-24 * time.Hour)
				ds.DelChanges(threshold)
				log.Println("Cleaned up old changes")
			}
			log.Println(ds)
		}
	}()

	// Add a path.
	err = watcher.Add(dir)
	if err != nil {
		log.Fatal(err)
	}

	// Block until the event loop stops.
	<-done
}

// main starts the directory monitor in the background and serves two HTTP
// endpoints on port 1323:
//
//	GET /changes/:since  JSON list of changes recorded after the Unix
//	                     timestamp (in seconds) given as :since
//	GET /files/:path     contents of the named file inside the watched
//	                     directory
func main() {
	// Initialize data store for changes
	dataStore := NewDataStore()

	// Define the directory to watch (replace with your directory path)
	dirToWatch := "F:\\code"

	go fsMonitor(dataStore, dirToWatch)

	e := echo.New()
	e.GET("/changes/:since", func(c echo.Context) error {
		since := c.Param("since")
		log.Println(since)
		ts, err := strconv.ParseInt(since, 10, 64)
		if err != nil {
			// Report malformed input as a client error instead of answering
			// 200 with a null body, which is indistinguishable from success.
			return echo.NewHTTPError(http.StatusBadRequest, "since must be a Unix timestamp in seconds")
		}
		log.Println(ts)

		changes := dataStore.GetChangesSince(time.Unix(ts, 0))
		return c.JSON(http.StatusOK, changes)
	})

	// NOTE(review): an echo ":path" parameter presumably matches a single URL
	// segment only, so files in subdirectories would need a "*" route —
	// confirm against the echo routing documentation.
	e.GET("/files/:path", func(c echo.Context) error {
		subpath := c.Param("path")
		path := filepath.Join(dirToWatch, subpath)

		// The parameter is untrusted: refuse any path that resolves outside
		// the watched directory (for example via encoded "../" sequences).
		rel, err := filepath.Rel(dirToWatch, path)
		escapes := err != nil || rel == ".." ||
			(len(rel) >= 3 && rel[:2] == ".." && os.IsPathSeparator(rel[2]))
		if escapes {
			return c.JSON(http.StatusNotFound, nil)
		}

		if _, err := os.Stat(path); err == nil {
			return c.File(path)
		}
		return c.JSON(http.StatusNotFound, nil)
	})

	e.Logger.Fatal(e.Start(":1323"))
}

© 2016 - 2025 Caisong's Blog

🌱 Powered by Hugo with theme Dream.

About Me

大龄程序员,喜欢折腾各种环境部署、软件应用。

博客记录日常。