Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
da5f8b14b1 | |||
d6d768980b | |||
7d82cd8d57 | |||
02bc633f1b | |||
2434cccf59 | |||
426d5738a0 | |||
76c2e552ab |
@ -87,6 +87,9 @@ You can optionally supply additional ordered parameters to `contented.init`:
|
|||||||
|
|
||||||
## Changelog
|
## Changelog
|
||||||
|
|
||||||
|
2025-08-20: v1.6.1
|
||||||
|
- Expanded error logging for tiered storage migrations
|
||||||
|
|
||||||
2025-08-20: v1.6.0
|
2025-08-20: v1.6.0
|
||||||
- Support hot/cold tiered storage to move files between local path and S3 bucket
|
- Support hot/cold tiered storage to move files between local path and S3 bucket
|
||||||
- Upgrade all dependencies
|
- Upgrade all dependencies
|
||||||
|
38
storage.go
38
storage.go
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
@ -152,45 +153,68 @@ func (ts *tieredStorage) migrateNow() error {
|
|||||||
// List local files
|
// List local files
|
||||||
dirents, err := os.ReadDir(ts.hot.dataDir)
|
dirents, err := os.ReadDir(ts.hot.dataDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("Reading hot storage files: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(dirents) == 0 {
|
||||||
|
return nil // Directory empty, nothing to do
|
||||||
}
|
}
|
||||||
|
|
||||||
cutOff := time.Now().Add(-TierMigrationAfter)
|
cutOff := time.Now().Add(-TierMigrationAfter)
|
||||||
|
|
||||||
|
// Shuffle files to avoid getting stuck
|
||||||
|
rand.Shuffle(len(dirents), func(i, j int) {
|
||||||
|
dirents[i], dirents[j] = dirents[j], dirents[i]
|
||||||
|
})
|
||||||
|
|
||||||
|
log.Printf("tier-migration: Scanning %d items...", len(dirents))
|
||||||
|
|
||||||
|
var countMigrated int64 = 0
|
||||||
|
|
||||||
for _, dirent := range dirents {
|
for _, dirent := range dirents {
|
||||||
fi, err := dirent.Info()
|
fi, err := dirent.Info()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err // local files can't be stat'd = important error
|
return fmt.Errorf("Reading hot storage files: %w", err) // local files can't be stat'd = important error
|
||||||
}
|
}
|
||||||
|
|
||||||
if !fi.ModTime().After(cutOff) {
|
if fi.IsDir() {
|
||||||
|
continue // probably . or ..
|
||||||
|
}
|
||||||
|
|
||||||
|
if fi.ModTime().After(cutOff) {
|
||||||
continue // not eligible
|
continue // not eligible
|
||||||
}
|
}
|
||||||
|
|
||||||
fileHash := dirent.Name()
|
fileHash := dirent.Name()
|
||||||
|
|
||||||
|
log.Printf("tier-migration: Migrating %q...", fileHash)
|
||||||
|
|
||||||
// Copy to cold storage
|
// Copy to cold storage
|
||||||
// Any concurrent reads will be serviced from the hot storage, so this
|
// Any concurrent reads will be serviced from the hot storage, so this
|
||||||
// is a safe operation
|
// is a safe operation
|
||||||
rc, err := ts.cold.ReadFile(context.Background(), fileHash)
|
rc, err := ts.hot.ReadFile(context.Background(), fileHash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err // can't cat local file
|
return fmt.Errorf("Read %q from hot storage: %w", fileHash, err) // can't cat local file
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ts.cold.SaveFile(context.Background(), fileHash, fi.Size(), rc)
|
err = ts.cold.SaveFile(context.Background(), fileHash, fi.Size(), rc)
|
||||||
_ = rc.Close()
|
_ = rc.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err // can't save local file
|
return fmt.Errorf("Write %q to cold storage: %w", fileHash, err) // can't save local file
|
||||||
}
|
}
|
||||||
|
|
||||||
// Copy was successful. Delete local file
|
// Copy was successful. Delete local file
|
||||||
err = os.Remove(filepath.Join(ts.hot.dataDir, fileHash))
|
err = os.Remove(filepath.Join(ts.hot.dataDir, fileHash))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err // can't rm local file
|
return fmt.Errorf("Remove %q from hot storage: %w", err) // can't rm local file
|
||||||
}
|
}
|
||||||
|
|
||||||
|
countMigrated++
|
||||||
}
|
}
|
||||||
|
|
||||||
// Migrated everything we can for now
|
// Migrated everything we can for now
|
||||||
|
log.Printf("tier-migration: Sleeping (migrated %d/%d items)", countMigrated, len(dirents))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user