Compare commits

..

12 Commits

@ -0,0 +1,7 @@
pipeline:
build:
image: golang
commands:
- go test ./...
environment:
- GOPRIVATE=git.sg.caj.me/caj

@ -2,10 +2,16 @@ FROM golang:alpine as builder
ENV GO111MODULE=on ENV GO111MODULE=on
# Create the user and group files to run unprivileged ENV USER=appuser
RUN mkdir /user && \ ENV UID=1000
echo 'nobody:x:65534:65534:nobody:/:' > /user/passwd && \ RUN adduser \
echo 'nobody:x:65534:' > /user/group --disabled-password \
--gecos "" \
--home "/nonexistent" \
--shell "/sbin/nologin" \
--no-create-home \
--uid "${UID}" \
"${USER}"
RUN apk update && apk add --no-cache git ca-certificates tzdata sqlite build-base RUN apk update && apk add --no-cache git ca-certificates tzdata sqlite build-base
RUN mkdir /build RUN mkdir /build
@ -15,11 +21,13 @@ WORKDIR /build
COPY ./ ./ COPY ./ ./
RUN CGO_ENABLED=1 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o bog . RUN CGO_ENABLED=1 GOOS=linux go build -a -installsuffix cgo -ldflags '-extldflags "-static"' -o bog .
FROM golang:alpine AS final FROM scratch AS final
LABEL author="Cajually <me@caj.me>" LABEL author="Cajually <me@caj.me>"
COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo COPY --from=builder /usr/share/zoneinfo /usr/share/zoneinfo
COPY --from=builder /user/group /user/passwd /etc/ COPY --from=builder /etc/passwd /etc/passwd
COPY --from=builder /etc/group /etc/group
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /build/bog / COPY --from=builder /build/bog /
COPY --from=builder /build/server/views /server/views COPY --from=builder /build/server/views /server/views
@ -27,7 +35,7 @@ COPY --from=builder /build/default.toml /
WORKDIR / WORKDIR /
USER nobody:nobody USER appuser:appuser
ENTRYPOINT ["/bog"] ENTRYPOINT ["/bog"]
EXPOSE 8002 EXPOSE 8002

@ -1,4 +1,7 @@
[![status-badge](https://ci.sg.caj.me/api/badges/caj/bog/status.svg)](https://ci.sg.caj.me/caj/bog)
# Bog: The Dataswamp # Bog: The Dataswamp
![Logo](https://git.sg.caj.me/caj/bog/raw/branch/master/doc/logo.jpg)
Is the warehouse too strict? Maybe you don't know the schema yet. And Is the warehouse too strict? Maybe you don't know the schema yet. And
you know your lake is already full with a bunch of stuff that should you know your lake is already full with a bunch of stuff that should
have been deleted long ago? have been deleted long ago?

@ -1 +1,7 @@
package dataswamp package dataswamp
import (
// "caj-larsson/bog/dataswamp/namespace"
)
type AdminService struct{}

@ -1,7 +0,0 @@
package dataswamp
type Logger interface {
Debug(format string, a ...interface{})
Info(format string, a ...interface{})
Warn(format string, a ...interface{})
}

@ -11,7 +11,6 @@ type Namespace struct {
LastSeen time.Time LastSeen time.Time
AllowanceDuration time.Duration AllowanceDuration time.Duration
FileQuota FileSizeQuota FileQuota FileSizeQuota
Usage FileStat
Download FileStat Download FileStat
Upload FileStat Upload FileStat
} }

@ -27,13 +27,15 @@ func basicNamespaceContract(fac func() Repository, t *testing.T) {
FileSizeQuota{1000, 0}, FileSizeQuota{1000, 0},
FileStat{1, 2}, FileStat{1, 2},
FileStat{3, 4}, FileStat{3, 4},
FileStat{5, 6},
} }
ns1, _ := r.Create(ns) ns1, err := r.Create(ns)
is.NoErr(err)
ns.Name = "n2" ns.Name = "n2"
ns2, _ := r.Create(ns) ns2, err := r.Create(ns)
is.True(ns1 != ns2) is.NoErr(err)
is.True(ns1.ID != ns2.ID)
all, err = r.All() all, err = r.All()

@ -0,0 +1,111 @@
package namespace
import (
"caj-larsson/bog/util"
"fmt"
"time"
)
type Clock interface {
Now() time.Time
}
type NamespaceService struct {
repo Repository
outboxes []func(util.Event)
logger util.Logger
clock Clock
default_ttl time.Duration
default_quota_bytes int64
}
func NewNamespaceService(repo Repository, logger util.Logger, clock Clock, default_ttl time.Duration, default_quota_bytes int64) *NamespaceService {
return &NamespaceService{
repo,
nil,
logger,
clock,
default_ttl,
default_quota_bytes,
}
}
func (s *NamespaceService) GetOrCreateNs(name string) *Namespace {
ns, err := s.repo.GetByName(name)
if err == ErrNotExists {
new_ns := Namespace{
0,
name,
s.clock.Now(),
s.default_ttl,
FileSizeQuota{s.default_quota_bytes, 0},
FileStat{0, 0},
FileStat{0, 0},
}
created_ns, err := s.repo.Create(new_ns)
if err != nil {
panic(err)
}
return created_ns
}
if err != nil {
panic(err)
}
return ns
}
func (s *NamespaceService) Wire(reg func(string, util.EventHandler), outbox func(ev util.Event)) {
reg("FileUsed", s.handleFileUsed)
s.outboxes = append(s.outboxes, outbox)
reg("FileUsed", s.handleFileUsed)
reg("FileDeleted", s.handleFileDeleted)
reg("FileRecieved", s.handleFileRecieved)
}
func (s *NamespaceService) All() []Namespace {
nss, err := s.repo.All()
if err != nil {
panic(err)
}
return nss
}
func (s *NamespaceService) handleFileUsed(payload interface{}) {
var payload_s = payload.(struct {
Name string
Size int64
})
fmt.Printf("file used %v\n", payload_s)
ns := s.GetOrCreateNs(payload_s.Name)
ns.FileQuota = ns.FileQuota.Add(payload_s.Size)
s.repo.Update(ns.ID, *ns)
}
func (s *NamespaceService) handleFileDeleted(payload interface{}) {
var payload_s = payload.(struct {
Name string
Size int64
})
fmt.Printf("file deleted %v\n", payload_s)
ns := s.GetOrCreateNs(payload_s.Name)
ns.FileQuota = ns.FileQuota.Add(-payload_s.Size)
fmt.Printf("file usage %v\n", ns.FileQuota)
s.repo.Update(ns.ID, *ns)
}
func (s *NamespaceService) handleFileRecieved(payload interface{}) {
var payload_s = payload.(struct {
Name string
Size int64
})
fmt.Printf("file recieved %v\n", payload_s)
ns := s.GetOrCreateNs(payload_s.Name)
ns.FileQuota = ns.FileQuota.Add(payload_s.Size)
fmt.Printf("file usage %v\n", ns.FileQuota)
s.repo.Update(ns.ID, *ns)
}

@ -0,0 +1,38 @@
package namespace
import (
"caj-larsson/bog/util"
"testing"
)
func TestEventTest(t *testing.T) {
eb := util.NewEventBus()
svc := NamespaceService{}
svc.Wire(eb.Register, eb.Handle)
events := []util.Event{
*util.NewEvent("FileUsed", struct {
Name string
Size int64
}{
"asd",
int64(12),
}),
*util.NewEvent("FileDeleted", struct {
Name string
Size int64
}{
"asd",
int64(12),
}),
*util.NewEvent("FileRecieved", struct {
Name string
Size int64
}{
"asd",
int64(12),
}),
}
util.AcceptsMessage(t, eb, events)
}

@ -13,28 +13,18 @@ func (f FileSizeQuota) Remaining() int64 {
return f.AllowanceKB - f.CurrentUsage return f.AllowanceKB - f.CurrentUsage
} }
func (f FileSizeQuota) Add(size int64) (*FileSizeQuota, error) { func (f FileSizeQuota) Add(size int64) FileSizeQuota {
if !f.Allows(size) { return FileSizeQuota{
return nil, ErrExceedQuota
}
n := FileSizeQuota {
f.AllowanceKB, f.AllowanceKB,
f.CurrentUsage + size, f.CurrentUsage + size,
} }
return &n, nil
} }
func (f FileSizeQuota) Remove(size int64) (*FileSizeQuota, error) { func (f FileSizeQuota) Remove(size int64) FileSizeQuota {
if size > f.CurrentUsage { return FileSizeQuota{
return nil, ErrQuotaInvalid
}
n := FileSizeQuota {
f.AllowanceKB, f.AllowanceKB,
f.CurrentUsage - size, f.CurrentUsage - size,
} }
return &n, nil
} }
type FileStat struct { type FileStat struct {

@ -15,27 +15,9 @@ func TestQuota(t *testing.T) {
func TestQuotaManipulation(t *testing.T) { func TestQuotaManipulation(t *testing.T) {
is := is.New(t) is := is.New(t)
quota := &FileSizeQuota{1000, 0} quota := FileSizeQuota{1000, 0}
quota, err := quota.Add(500) quota = quota.Add(500)
is.NoErr(err)
is.Equal(quota.CurrentUsage, int64(500)) is.Equal(quota.CurrentUsage, int64(500))
quota = quota.Remove(1000)
quota, err = quota.Add(500) is.Equal(quota.CurrentUsage, int64(-500))
is.NoErr(err)
_ , err = quota.Add(1)
is.Equal(err, ErrExceedQuota)
is.Equal(quota.CurrentUsage, int64(1000))
_ , err = quota.Remove(1001)
is.Equal(err, ErrQuotaInvalid)
is.Equal(quota.CurrentUsage, int64(1000))
quota, err = quota.Remove(1000)
is.NoErr(err)
is.Equal(quota.CurrentUsage, int64(0))
} }

@ -3,6 +3,7 @@ package dataswamp
import ( import (
"caj-larsson/bog/dataswamp/namespace" "caj-larsson/bog/dataswamp/namespace"
"caj-larsson/bog/dataswamp/swampfile" "caj-larsson/bog/dataswamp/swampfile"
"caj-larsson/bog/util"
"io" "io"
"strconv" "strconv"
"time" "time"
@ -10,56 +11,29 @@ import (
// "fmt" // "fmt"
) )
type SwampFileService struct { type DataSwampService struct {
namespace_repo namespace.Repository ns_svc namespace.NamespaceService
swamp_file_repo swampfile.Repository swamp_file_repo swampfile.Repository
default_allowance_bytes int64 logger util.Logger
default_allowance_duration time.Duration eventBus util.EventBus
logger Logger
} }
func NewSwampFileService( func NewDataSwampService(
namespace_repo namespace.Repository, ns_svc namespace.NamespaceService,
swamp_file_repo swampfile.Repository, swamp_file_repo swampfile.Repository,
da_bytes int64, logger util.Logger,
da_duration time.Duration, ) *DataSwampService {
logger Logger, s := DataSwampService{ns_svc, swamp_file_repo, logger, *util.NewEventBus()}
) SwampFileService { ns_svc.Wire(s.eventBus.Register, s.eventBus.Handle)
return SwampFileService{namespace_repo, swamp_file_repo, da_bytes, da_duration, logger} return &s
} }
func (s SwampFileService) getOrCreateNs(namespace_in string) *namespace.Namespace { func (s DataSwampService) NamespaceStats() []namespace.Namespace {
ns, err := s.namespace_repo.GetByName(namespace_in) return s.ns_svc.All()
if err == namespace.ErrNotExists {
new_ns := namespace.Namespace{
0,
namespace_in,
time.Now(),
s.default_allowance_duration,
namespace.FileSizeQuota{s.default_allowance_bytes, 0},
namespace.FileStat{0, 0},
namespace.FileStat{0, 0},
namespace.FileStat{0, 0},
}
created_ns, err := s.namespace_repo.Create(new_ns)
if err != nil {
panic(err)
}
return created_ns
}
if err != nil {
panic(err)
}
return ns
} }
func (s SwampFileService) SaveFile(ref swampfile.FileReference, src io.Reader, size int64) error { func (s DataSwampService) SaveFile(ref swampfile.FileReference, src io.Reader, size int64) error {
ns := s.getOrCreateNs(ref.UserAgent) ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
r, err := ref.Clean(true) r, err := ref.Clean(true)
@ -74,13 +48,23 @@ func (s SwampFileService) SaveFile(ref swampfile.FileReference, src io.Reader, s
f, err := s.swamp_file_repo.Create(r.Path, strconv.FormatInt(ns.ID, 10)) f, err := s.swamp_file_repo.Create(r.Path, strconv.FormatInt(ns.ID, 10))
if err != nil { if err != nil {
// TODO: convert this into a different error.
return err return err
} }
s.eventBus.Handle(*util.NewEvent("FileUsed", struct {
Name string
Size int64
}{
ns.Name,
f.Size(),
}))
// TODO: rewrite this into an interruptable loop that emits downloaded events
written, err := io.CopyN(f, src, size) written, err := io.CopyN(f, src, size)
if written < size { if written < size {
s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10)) s.swamp_file_repo.Delete(r.Path, strconv.FormatInt(ns.ID, 10)) //
return swampfile.ErrContentSizeExaggerated return swampfile.ErrContentSizeExaggerated
} }
@ -93,21 +77,24 @@ func (s SwampFileService) SaveFile(ref swampfile.FileReference, src io.Reader, s
return swampfile.ErrContentSizeExceeded return swampfile.ErrContentSizeExceeded
} }
f.Close() err = f.Close()
uq, err := ns.FileQuota.Add(size)
if err != nil { if err != nil {
return err panic(err)
} }
ns.FileQuota = *uq
ns.Usage = ns.Usage.Add(size) s.eventBus.Handle(*util.NewEvent("FileRecieved", struct {
ns.Upload = ns.Upload.Add(size) Name string
s.namespace_repo.Update(ns.ID, *ns) Size int64
}{
ns.Name,
written,
}))
return nil return nil
} }
func (s SwampFileService) OpenOutFile(ref swampfile.FileReference) (swampfile.SwampOutFile, error) { func (s DataSwampService) OpenOutFile(ref swampfile.FileReference) (swampfile.SwampOutFile, error) {
ns := s.getOrCreateNs(ref.UserAgent) ns := s.ns_svc.GetOrCreateNs(ref.UserAgent)
r, err := ref.Clean(true) r, err := ref.Clean(true)
@ -121,41 +108,31 @@ func (s SwampFileService) OpenOutFile(ref swampfile.FileReference) (swampfile.Sw
return nil, err return nil, err
} }
ns.Download = ns.Download.Add(f.Size()) s.eventBus.Handle(*util.NewEvent("FileSent", f.Size()))
s.namespace_repo.Update(ns.ID, *ns)
return f, nil return f, nil
} }
func (s SwampFileService) NamespaceStats() ([]namespace.Namespace, error) { func (s DataSwampService) CleanUpExpiredFiles() error {
return s.namespace_repo.All()
}
func (s SwampFileService) CleanUpExpiredFiles() error {
s.logger.Info("Cleaning up expired files") s.logger.Info("Cleaning up expired files")
nss, err := s.namespace_repo.All()
if err != nil {
return err
}
for _, ns := range nss { for _, ns := range s.ns_svc.All() {
expiry := time.Now().Add(-ns.AllowanceDuration) expiry := time.Now().Add(-ns.AllowanceDuration)
dfs, err := s.swamp_file_repo.DeleteOlderThan(strconv.FormatInt(ns.ID, 10), expiry) dfs, err := s.swamp_file_repo.DeleteOlderThan(strconv.FormatInt(ns.ID, 10), expiry)
for _, df := range dfs {
dq, err := ns.FileQuota.Remove(df.Size)
if err != nil {
dq.CurrentUsage = 0
}
ns.FileQuota = *dq
}
if err != nil { if err != nil {
panic(err) panic(err)
} }
s.namespace_repo.Update(ns.ID, ns) for _, df := range dfs {
s.eventBus.Handle(*util.NewEvent("FileDeleted", struct {
Name string
Size int64
}{
ns.Name,
df.Size,
}))
}
} }
return nil return nil
} }

@ -2,14 +2,16 @@ package dataswamp
import ( import (
"bytes" "bytes"
"caj-larsson/bog/dataswamp/namespace"
"caj-larsson/bog/dataswamp/swampfile" "caj-larsson/bog/dataswamp/swampfile"
m_namespace "caj-larsson/bog/infrastructure/memory/namespace"
m_swampfile "caj-larsson/bog/infrastructure/memory/swampfile"
"caj-larsson/bog/infrastructure/system_time"
"fmt"
"github.com/matryer/is" "github.com/matryer/is"
"github.com/spf13/afero" "github.com/spf13/afero"
"testing" "testing"
"time" "time"
// "caj-larsson/bog/dataswamp/namespace"
m_namespace "caj-larsson/bog/infrastructure/memory/namespace"
m_swampfile "caj-larsson/bog/infrastructure/memory/swampfile"
) )
type TestLogger struct{} type TestLogger struct{}
@ -22,15 +24,25 @@ var file_ref1 = swampfile.FileReference{"/path1", "ns1"}
var file_ref2 = swampfile.FileReference{"/path1", "ns2"} var file_ref2 = swampfile.FileReference{"/path1", "ns2"}
var file_ref3 = swampfile.FileReference{"/path2", "ns1"} var file_ref3 = swampfile.FileReference{"/path2", "ns1"}
func NewTestSwampFileService() SwampFileService { func NewTestDataSwampService() DataSwampService {
file_repo := m_swampfile.NewRepository() file_repo := m_swampfile.NewRepository()
ns_repo := m_namespace.NewRepository() ns_repo := m_namespace.NewRepository()
return NewSwampFileService(ns_repo, file_repo, 1024, time.Hour, TestLogger{})
logger := TestLogger{}
ns_svc := namespace.NewNamespaceService(
ns_repo,
logger,
system_time.Clock{},
time.Hour,
1024,
)
return *NewDataSwampService(*ns_svc, file_repo, logger)
} }
func TestFileDontExist(t *testing.T) { func TestFileDontExist(t *testing.T) {
is := is.New(t) is := is.New(t)
s := NewTestSwampFileService() s := NewTestDataSwampService()
outfile, err := s.OpenOutFile(file_ref1) outfile, err := s.OpenOutFile(file_ref1)
is.True(err == swampfile.ErrNotExists) is.True(err == swampfile.ErrNotExists)
@ -39,7 +51,7 @@ func TestFileDontExist(t *testing.T) {
func TestFileIsStored(t *testing.T) { func TestFileIsStored(t *testing.T) {
is := is.New(t) is := is.New(t)
s := NewTestSwampFileService() s := NewTestDataSwampService()
fakefile := bytes.NewBufferString("My bog data") fakefile := bytes.NewBufferString("My bog data")
@ -60,7 +72,7 @@ func TestFileIsStored(t *testing.T) {
func TestFileIsReadBack(t *testing.T) { func TestFileIsReadBack(t *testing.T) {
is := is.New(t) is := is.New(t)
s := NewTestSwampFileService() s := NewTestDataSwampService()
infile := bytes.NewBufferString("My bog data") infile := bytes.NewBufferString("My bog data")
@ -76,7 +88,7 @@ func TestFileIsReadBack(t *testing.T) {
func TestNSIsolation(t *testing.T) { func TestNSIsolation(t *testing.T) {
is := is.New(t) is := is.New(t)
s := NewTestSwampFileService() s := NewTestDataSwampService()
ns1_file := bytes.NewBufferString("My bog data ns1") ns1_file := bytes.NewBufferString("My bog data ns1")
ns2_file := bytes.NewBufferString("My bog data ns2") ns2_file := bytes.NewBufferString("My bog data ns2")
@ -94,7 +106,7 @@ func TestNSIsolation(t *testing.T) {
func TestPathStrictMode(t *testing.T) { func TestPathStrictMode(t *testing.T) {
is := is.New(t) is := is.New(t)
s := NewTestSwampFileService() s := NewTestDataSwampService()
ns_file := bytes.NewBufferString("My bog data ns1") ns_file := bytes.NewBufferString("My bog data ns1")
@ -114,7 +126,7 @@ func TestPathStrictMode(t *testing.T) {
func TestQuotaWithContenSizeLieOver(t *testing.T) { func TestQuotaWithContenSizeLieOver(t *testing.T) {
is := is.New(t) is := is.New(t)
s := NewTestSwampFileService() s := NewTestDataSwampService()
largefakefile := bytes.NewBufferString("") largefakefile := bytes.NewBufferString("")
@ -129,7 +141,7 @@ func TestQuotaWithContenSizeLieOver(t *testing.T) {
func TestQuotaWithContenSizeLieUnder(t *testing.T) { func TestQuotaWithContenSizeLieUnder(t *testing.T) {
is := is.New(t) is := is.New(t)
s := NewTestSwampFileService() s := NewTestDataSwampService()
largefakefile := bytes.NewBufferString("small") largefakefile := bytes.NewBufferString("small")
@ -144,22 +156,32 @@ func TestCleanUpExpired(t *testing.T) {
fs := afero.NewMemMapFs() fs := afero.NewMemMapFs()
file_repo := m_swampfile.Repository{fs} file_repo := m_swampfile.Repository{fs}
ns_repo := m_namespace.NewRepository() ns_repo := m_namespace.NewRepository()
s := NewSwampFileService(ns_repo, file_repo, 1024, time.Hour, TestLogger{}) logger := TestLogger{}
ns_svc := namespace.NewNamespaceService(
fakefile := bytes.NewBufferString("My bog data") ns_repo,
logger,
system_time.Clock{},
time.Hour,
1024,
)
s := NewDataSwampService(*ns_svc, file_repo, logger)
fakefile := bytes.NewBufferString("My bog")
err := s.SaveFile(file_ref1, fakefile, int64(fakefile.Len())) err := s.SaveFile(file_ref1, fakefile, int64(fakefile.Len()))
is.NoErr(err) is.NoErr(err)
fakefile = bytes.NewBufferString("My bog data") fakefile = bytes.NewBufferString("My bog")
err = s.SaveFile(file_ref3, fakefile, int64(fakefile.Len())) err = s.SaveFile(file_ref3, fakefile, int64(fakefile.Len()))
is.NoErr(err) is.NoErr(err)
err = fs.Chtimes("1/path1", time.Now(), time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)) err = fs.Chtimes("1/path1", time.Now(), time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC))
is.NoErr(err)
is.NoErr(s.CleanUpExpiredFiles()) is.NoErr(s.CleanUpExpiredFiles())
ns, err := ns_repo.GetByName("ns1") ns, err := ns_repo.GetByName("ns1")
fmt.Printf("file final usage %v\n", ns.FileQuota)
is.NoErr(err) is.NoErr(err)
fmt.Printf("file\n")
is.Equal(ns.FileQuota.CurrentUsage, int64(len("My bog data"))) is.Equal(ns.FileQuota.CurrentUsage, int64(len("My bog")))
} }

@ -43,7 +43,7 @@ func (fr *FileReference) Clean(strict bool) (*FileReference, error) {
return nil, ErrUnacceptablePath return nil, ErrUnacceptablePath
} }
n := FileReference { n := FileReference{
c, c,
fr.UserAgent, fr.UserAgent,
} }

Binary file not shown.

After

Width:  |  Height:  |  Size: 10 KiB

@ -48,6 +48,13 @@ func (f FileSystemSwampFileData) Modified() time.Time {
return stat.ModTime() return stat.ModTime()
} }
func (f FileSystemSwampFileData) Truncate() {
err := f.file.Truncate(0)
if err != nil {
panic(err)
}
}
type Repository struct { type Repository struct {
Root string Root string
} }
@ -98,6 +105,7 @@ func (f Repository) Create(filename string, namespace_ns string) (swampfile.Swam
if err != nil { if err != nil {
panic(err) panic(err)
} }
file.Truncate(0)
bfd := FileSystemSwampFileData{filename, stat_info.Size(), stat_info.ModTime(), file} bfd := FileSystemSwampFileData{filename, stat_info.Size(), stat_info.ModTime(), file}

@ -44,6 +44,13 @@ func (f SwampFile) Modified() time.Time {
return stat.ModTime() return stat.ModTime()
} }
func (f SwampFile) Truncate() {
err := f.file.Truncate(0)
if err != nil {
panic(err)
}
}
// The actual repository // The actual repository
type Repository struct { type Repository struct {
Fs afero.Fs Fs afero.Fs

@ -21,7 +21,6 @@ type Namespace struct {
AllowanceTime sql.NullInt64 AllowanceTime sql.NullInt64
QuotaKb sql.NullInt64 QuotaKb sql.NullInt64
QuotaUsageKb sql.NullInt64 QuotaUsageKb sql.NullInt64
UsageID int64
DownloadID int64 DownloadID int64
UploadID int64 UploadID int64
} }

@ -6,11 +6,10 @@ INSERT INTO
allowance_time, allowance_time,
quota_kb, quota_kb,
quota_usage_kb, quota_usage_kb,
usage_id,
download_id, download_id,
upload_id upload_id
) )
values(?, ?, ?, ?, ?, ?, ?, ?) values(?, ?, ?, ?, ?, ?, ?)
returning id; returning id;
-- name: CreateFileStats :one -- name: CreateFileStats :one
@ -26,15 +25,11 @@ SELECT
ns.allowance_time, ns.allowance_time,
ns.quota_kb, ns.quota_kb,
ns.quota_usage_kb, ns.quota_usage_kb,
u.num as u_num,
u.size_b as u_size_b,
d.num as d_num, d.num as d_num,
d.size_b as d_size_b, d.size_b as d_size_b,
ul.num as ul_num, ul.num as ul_num,
ul.size_b as ul_size_b ul.size_b as ul_size_b
FROM namespace as ns FROM namespace as ns
JOIN file_stats as u
ON ns.usage_id = u.id
JOIN file_stats as d JOIN file_stats as d
ON ns.download_id = d.id ON ns.download_id = d.id
JOIN file_stats as ul JOIN file_stats as ul
@ -48,15 +43,11 @@ SELECT
ns.allowance_time, ns.allowance_time,
ns.quota_kb, ns.quota_kb,
ns.quota_usage_kb, ns.quota_usage_kb,
u.num as u_num,
u.size_b as u_size_b,
d.num as d_num, d.num as d_num,
d.size_b as d_size_b, d.size_b as d_size_b,
ul.num as ul_num, ul.num as ul_num,
ul.size_b as ul_size_b ul.size_b as ul_size_b
FROM namespace as ns FROM namespace as ns
JOIN file_stats as u
ON ns.usage_id = u.id
JOIN file_stats as d JOIN file_stats as d
ON ns.download_id = d.id ON ns.download_id = d.id
JOIN file_stats as ul JOIN file_stats as ul
@ -78,7 +69,7 @@ UPDATE namespace SET
quota_kb = ?, quota_kb = ?,
quota_usage_kb = ? quota_usage_kb = ?
WHERE id = ? WHERE id = ?
RETURNING usage_id, download_id, upload_id; RETURNING download_id, upload_id;
-- name: DeleteNameSpace :exec -- name: DeleteNameSpace :exec
DELETE FROM namespace where id = ?; DELETE FROM namespace where id = ?;

@ -18,15 +18,11 @@ SELECT
ns.allowance_time, ns.allowance_time,
ns.quota_kb, ns.quota_kb,
ns.quota_usage_kb, ns.quota_usage_kb,
u.num as u_num,
u.size_b as u_size_b,
d.num as d_num, d.num as d_num,
d.size_b as d_size_b, d.size_b as d_size_b,
ul.num as ul_num, ul.num as ul_num,
ul.size_b as ul_size_b ul.size_b as ul_size_b
FROM namespace as ns FROM namespace as ns
JOIN file_stats as u
ON ns.usage_id = u.id
JOIN file_stats as d JOIN file_stats as d
ON ns.download_id = d.id ON ns.download_id = d.id
JOIN file_stats as ul JOIN file_stats as ul
@ -40,8 +36,6 @@ type AllNamespacesRow struct {
AllowanceTime sql.NullInt64 AllowanceTime sql.NullInt64
QuotaKb sql.NullInt64 QuotaKb sql.NullInt64
QuotaUsageKb sql.NullInt64 QuotaUsageKb sql.NullInt64
UNum int64
USizeB int64
DNum int64 DNum int64
DSizeB int64 DSizeB int64
UlNum int64 UlNum int64
@ -64,8 +58,6 @@ func (q *Queries) AllNamespaces(ctx context.Context) ([]AllNamespacesRow, error)
&i.AllowanceTime, &i.AllowanceTime,
&i.QuotaKb, &i.QuotaKb,
&i.QuotaUsageKb, &i.QuotaUsageKb,
&i.UNum,
&i.USizeB,
&i.DNum, &i.DNum,
&i.DSizeB, &i.DSizeB,
&i.UlNum, &i.UlNum,
@ -110,11 +102,10 @@ INSERT INTO
allowance_time, allowance_time,
quota_kb, quota_kb,
quota_usage_kb, quota_usage_kb,
usage_id,
download_id, download_id,
upload_id upload_id
) )
values(?, ?, ?, ?, ?, ?, ?, ?) values(?, ?, ?, ?, ?, ?, ?)
returning id returning id
` `
@ -124,7 +115,6 @@ type CreateNamespaceParams struct {
AllowanceTime sql.NullInt64 AllowanceTime sql.NullInt64
QuotaKb sql.NullInt64 QuotaKb sql.NullInt64
QuotaUsageKb sql.NullInt64 QuotaUsageKb sql.NullInt64
UsageID int64
DownloadID int64 DownloadID int64
UploadID int64 UploadID int64
} }
@ -136,7 +126,6 @@ func (q *Queries) CreateNamespace(ctx context.Context, arg CreateNamespaceParams
arg.AllowanceTime, arg.AllowanceTime,
arg.QuotaKb, arg.QuotaKb,
arg.QuotaUsageKb, arg.QuotaUsageKb,
arg.UsageID,
arg.DownloadID, arg.DownloadID,
arg.UploadID, arg.UploadID,
) )
@ -173,15 +162,11 @@ SELECT
ns.allowance_time, ns.allowance_time,
ns.quota_kb, ns.quota_kb,
ns.quota_usage_kb, ns.quota_usage_kb,
u.num as u_num,
u.size_b as u_size_b,
d.num as d_num, d.num as d_num,
d.size_b as d_size_b, d.size_b as d_size_b,
ul.num as ul_num, ul.num as ul_num,
ul.size_b as ul_size_b ul.size_b as ul_size_b
FROM namespace as ns FROM namespace as ns
JOIN file_stats as u
ON ns.usage_id = u.id
JOIN file_stats as d JOIN file_stats as d
ON ns.download_id = d.id ON ns.download_id = d.id
JOIN file_stats as ul JOIN file_stats as ul
@ -196,8 +181,6 @@ type GetNamespaceByNameRow struct {
AllowanceTime sql.NullInt64 AllowanceTime sql.NullInt64
QuotaKb sql.NullInt64 QuotaKb sql.NullInt64
QuotaUsageKb sql.NullInt64 QuotaUsageKb sql.NullInt64
UNum int64
USizeB int64
DNum int64 DNum int64
DSizeB int64 DSizeB int64
UlNum int64 UlNum int64
@ -214,8 +197,6 @@ func (q *Queries) GetNamespaceByName(ctx context.Context, name string) (GetNames
&i.AllowanceTime, &i.AllowanceTime,
&i.QuotaKb, &i.QuotaKb,
&i.QuotaUsageKb, &i.QuotaUsageKb,
&i.UNum,
&i.USizeB,
&i.DNum, &i.DNum,
&i.DSizeB, &i.DSizeB,
&i.UlNum, &i.UlNum,
@ -247,7 +228,7 @@ UPDATE namespace SET
quota_kb = ?, quota_kb = ?,
quota_usage_kb = ? quota_usage_kb = ?
WHERE id = ? WHERE id = ?
RETURNING usage_id, download_id, upload_id RETURNING download_id, upload_id
` `
type UpdateNamespaceParams struct { type UpdateNamespaceParams struct {
@ -260,7 +241,6 @@ type UpdateNamespaceParams struct {
} }
type UpdateNamespaceRow struct { type UpdateNamespaceRow struct {
UsageID int64
DownloadID int64 DownloadID int64
UploadID int64 UploadID int64
} }
@ -275,6 +255,6 @@ func (q *Queries) UpdateNamespace(ctx context.Context, arg UpdateNamespaceParams
arg.ID, arg.ID,
) )
var i UpdateNamespaceRow var i UpdateNamespaceRow
err := row.Scan(&i.UsageID, &i.DownloadID, &i.UploadID) err := row.Scan(&i.DownloadID, &i.UploadID)
return i, err return i, err
} }

@ -34,10 +34,9 @@ CREATE TABLE IF NOT EXISTS namespace(
allowance_time BIGINT, allowance_time BIGINT,
quota_kb BIGINT, quota_kb BIGINT,
quota_usage_kb BIGINT, quota_usage_kb BIGINT,
usage_id BIGINT NOT NULL REFERENCES file_stats(Id),
download_id BIGINT NOT NULL REFERENCES file_stats(Id), download_id BIGINT NOT NULL REFERENCES file_stats(Id),
upload_id BIGINT NOT NULL REFERENCES file_stats(Id) upload_id BIGINT NOT NULL REFERENCES file_stats(Id)
);` );`
_, err = r.db.Exec(query) _, err = r.db.Exec(query)
return err return err
@ -64,12 +63,6 @@ func (q *Queries) createFileStats(ctx context.Context, fstat namespace.FileStat)
func (r *Repository) Create(ns namespace.Namespace) (*namespace.Namespace, error) { func (r *Repository) Create(ns namespace.Namespace) (*namespace.Namespace, error) {
ctx := context.Background() ctx := context.Background()
q := New(r.db) q := New(r.db)
u_id, err := q.createFileStats(ctx, ns.Usage)
if err != nil {
return nil, err
}
dl_id, err := q.createFileStats(ctx, ns.Download) dl_id, err := q.createFileStats(ctx, ns.Download)
if err != nil { if err != nil {
@ -90,7 +83,6 @@ func (r *Repository) Create(ns namespace.Namespace) (*namespace.Namespace, error
AllowanceTime: sql.NullInt64{int64(ns.AllowanceDuration.Seconds()), true}, AllowanceTime: sql.NullInt64{int64(ns.AllowanceDuration.Seconds()), true},
QuotaKb: sql.NullInt64{int64(ns.FileQuota.AllowanceKB), true}, QuotaKb: sql.NullInt64{int64(ns.FileQuota.AllowanceKB), true},
QuotaUsageKb: sql.NullInt64{int64(ns.FileQuota.CurrentUsage), true}, QuotaUsageKb: sql.NullInt64{int64(ns.FileQuota.CurrentUsage), true},
UsageID: u_id,
DownloadID: dl_id, DownloadID: dl_id,
UploadID: ul_id, UploadID: ul_id,
} }
@ -122,7 +114,6 @@ func (r *Repository) All() ([]namespace.Namespace, error) {
time.UnixMicro(row.Lastseen), time.UnixMicro(row.Lastseen),
time.Duration(row.AllowanceTime.Int64 * int64(time.Second)), time.Duration(row.AllowanceTime.Int64 * int64(time.Second)),
namespace.FileSizeQuota{row.QuotaKb.Int64, row.QuotaUsageKb.Int64}, namespace.FileSizeQuota{row.QuotaKb.Int64, row.QuotaUsageKb.Int64},
namespace.FileStat{row.UNum, row.USizeB},
namespace.FileStat{row.DNum, row.DSizeB}, namespace.FileStat{row.DNum, row.DSizeB},
namespace.FileStat{row.UlNum, row.UlSizeB}, namespace.FileStat{row.UlNum, row.UlSizeB},
} }
@ -148,7 +139,6 @@ func (r *Repository) GetByName(name string) (*namespace.Namespace, error) {
time.UnixMicro(row.Lastseen), time.UnixMicro(row.Lastseen),
time.Duration(row.AllowanceTime.Int64 * int64(time.Second)), time.Duration(row.AllowanceTime.Int64 * int64(time.Second)),
namespace.FileSizeQuota{row.QuotaKb.Int64, row.QuotaUsageKb.Int64}, namespace.FileSizeQuota{row.QuotaKb.Int64, row.QuotaUsageKb.Int64},
namespace.FileStat{row.UNum, row.USizeB},
namespace.FileStat{row.DNum, row.DSizeB}, namespace.FileStat{row.DNum, row.DSizeB},
namespace.FileStat{row.UlNum, row.UlSizeB}, namespace.FileStat{row.UlNum, row.UlSizeB},
} }
@ -173,11 +163,6 @@ func (r *Repository) Update(id int64, ns namespace.Namespace) (*namespace.Namesp
ns.ID, ns.ID,
}) })
err = q.updateFileStat(ctx, ids.UsageID, ns.Usage)
if err != nil {
return nil, err
}
err = q.updateFileStat(ctx, ids.DownloadID, ns.Download) err = q.updateFileStat(ctx, ids.DownloadID, ns.Download)
if err != nil { if err != nil {
return nil, err return nil, err

@ -11,7 +11,6 @@ CREATE TABLE IF NOT EXISTS namespace(
allowance_time BIGINT, allowance_time BIGINT,
quota_kb BIGINT, quota_kb BIGINT,
quota_usage_kb BIGINT, quota_usage_kb BIGINT,
usage_id BIGINT NOT NULL REFERENCES file_stats(Id) ON DELETE CASCADE,
download_id BIGINT NOT NULL REFERENCES file_stats(Id) ON DELETE CASCADE, download_id BIGINT NOT NULL REFERENCES file_stats(Id) ON DELETE CASCADE,
upload_id BIGINT NOT NULL REFERENCES file_stats(Id) ON DELETE CASCADE upload_id BIGINT NOT NULL REFERENCES file_stats(Id) ON DELETE CASCADE
); );

@ -0,0 +1,11 @@
package system_time
import (
"time"
)
type Clock struct{}
func (c Clock) Now() time.Time {
return time.Now()
}

@ -6,6 +6,8 @@ import (
"caj-larsson/bog/dataswamp/swampfile" "caj-larsson/bog/dataswamp/swampfile"
fs_swampfile "caj-larsson/bog/infrastructure/fs/swampfile" fs_swampfile "caj-larsson/bog/infrastructure/fs/swampfile"
sql_namespace "caj-larsson/bog/infrastructure/sqlite/namespace" sql_namespace "caj-larsson/bog/infrastructure/sqlite/namespace"
"caj-larsson/bog/infrastructure/system_time"
"caj-larsson/bog/util"
"net/http" "net/http"
"strconv" "strconv"
"text/template" "text/template"
@ -20,10 +22,10 @@ type Router interface {
type Bog struct { type Bog struct {
router Router router Router
adminRouter Router adminRouter Router
file_service dataswamp.SwampFileService file_service dataswamp.DataSwampService
address string address string
adminAddress string adminAddress string
logger dataswamp.Logger logger util.Logger
} }
func buildFileDataRepository(config FileConfig) swampfile.Repository { func buildFileDataRepository(config FileConfig) swampfile.Repository {
@ -104,7 +106,7 @@ func (b *Bog) dashboardHandler(w http.ResponseWriter, r *http.Request) {
panic(err) panic(err)
} }
stats, _ := b.file_service.NamespaceStats() stats := b.file_service.NamespaceStats()
err = templ.Execute(w, stats) err = templ.Execute(w, stats)
if err != nil { if err != nil {
@ -114,7 +116,7 @@ func (b *Bog) dashboardHandler(w http.ResponseWriter, r *http.Request) {
func (b *Bog) routes() { func (b *Bog) routes() {
b.router.HandleFunc("/", b.fileHandler) b.router.HandleFunc("/", b.fileHandler)
b.adminRouter.HandleFunc("/", b.dashboardHandler) // b.adminRouter.HandleFunc("/", b.dashboardHandler)
} }
func (b *Bog) cleanNamespaces() { func (b *Bog) cleanNamespaces() {
@ -133,11 +135,17 @@ func New(config *Configuration) *Bog {
nsRepo := buildNamespaceRepository(config.Database) nsRepo := buildNamespaceRepository(config.Database)
logger := ServerLogger{Debug} logger := ServerLogger{Debug}
b.file_service = dataswamp.NewSwampFileService( ns_svc := namespace.NewNamespaceService(
nsRepo, nsRepo,
fsSwampRepo, logger,
config.Quota.ParsedSizeBytes(), system_time.Clock{},
config.Quota.ParsedDuration(), config.Quota.ParsedDuration(),
config.Quota.ParsedSizeBytes(),
)
b.file_service = *dataswamp.NewDataSwampService(
*ns_svc,
fsSwampRepo,
logger, logger,
) )
b.logger = logger b.logger = logger
@ -150,6 +158,6 @@ func New(config *Configuration) *Bog {
func (b *Bog) Run() { func (b *Bog) Run() {
b.logger.Info("Starting bog on address: %s", b.address) b.logger.Info("Starting bog on address: %s", b.address)
go b.cleanNamespaces() go b.cleanNamespaces()
go func(){ http.ListenAndServe(b.adminAddress, b.adminRouter) }() go func() { http.ListenAndServe(b.adminAddress, b.adminRouter) }()
http.ListenAndServe(b.address, b.router) http.ListenAndServe(b.address, b.router)
} }

@ -4,8 +4,10 @@ import (
"testing" "testing"
// "fmt" // "fmt"
"caj-larsson/bog/dataswamp" "caj-larsson/bog/dataswamp"
"caj-larsson/bog/infrastructure/memory/namespace" "caj-larsson/bog/dataswamp/namespace"
ns "caj-larsson/bog/infrastructure/memory/namespace"
"caj-larsson/bog/infrastructure/memory/swampfile" "caj-larsson/bog/infrastructure/memory/swampfile"
"caj-larsson/bog/infrastructure/system_time"
"github.com/matryer/is" "github.com/matryer/is"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
@ -22,17 +24,25 @@ func (t TestLogger) Warn(format string, a ...interface{}) {}
func TestApplication(t *testing.T) { func TestApplication(t *testing.T) {
is := is.New(t) is := is.New(t)
logger := TestLogger{} logger := TestLogger{}
file_service := dataswamp.NewSwampFileService( nsRepo := ns.NewRepository()
namespace.NewRepository(),
swampfile.NewRepository(), ns_svc := namespace.NewNamespaceService(
1000, nsRepo,
logger,
system_time.Clock{},
time.Hour, time.Hour,
1000,
)
file_service := dataswamp.NewDataSwampService(
*ns_svc,
swampfile.NewRepository(),
logger, logger,
) )
bog := Bog{ bog := Bog{
router: new(http.ServeMux), router: new(http.ServeMux),
file_service: file_service, file_service: *file_service,
address: "fake", address: "fake",
logger: logger, logger: logger,
} }

@ -0,0 +1,41 @@
package util
type Event struct {
eventName string
payload interface{}
}
func NewEvent(eventName string, payload interface{}) *Event {
return &Event{
eventName,
payload,
}
}
func (e *Event) EventName() string {
return e.eventName
}
func (e *Event) Payload() interface{} {
return e.payload
}
type EventHandler func(payload interface{})
type EventBus struct {
handlers map[string][]EventHandler
}
func NewEventBus() *EventBus {
return &EventBus{make(map[string][]EventHandler)}
}
func (eb *EventBus) Register(eventName string, handler EventHandler) {
eb.handlers[eventName] = append(eb.handlers[eventName], handler)
}
func (eb *EventBus) Handle(e Event) {
for _, handler := range eb.handlers[e.EventName()] {
handler(e.Payload())
}
}

@ -0,0 +1,23 @@
package util
import (
"github.com/matryer/is"
"testing"
)
func (eb *EventBus) Handled(e Event) bool {
// TODO: figure out how to verify the event signature here.
handlers, exists := eb.handlers[e.EventName()]
if !exists {
return false
}
return len(handlers) > 0
}
func AcceptsMessage(t *testing.T, eb *EventBus, es []Event) {
is := is.New(t)
for _, e := range es {
is.True(eb.Handled(e))
}
}

@ -0,0 +1,13 @@
package util
type Logger interface {
Debug(format string, a ...interface{})
Info(format string, a ...interface{})
Warn(format string, a ...interface{})
}
type TestLogger struct{}
func (t TestLogger) Debug(format string, a ...interface{}) {}
func (t TestLogger) Info(format string, a ...interface{}) {}
func (t TestLogger) Warn(format string, a ...interface{}) {}
Loading…
Cancel
Save