Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go.work
.cache
*.pprof
dist/
logs/*

# Ignore actual config files
config.yaml
Expand Down
18 changes: 15 additions & 3 deletions cmd/certstream-server-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/d-Rickyy-b/certstream-server-go/internal/certificatetransparency"
"github.com/d-Rickyy-b/certstream-server-go/internal/config"
"github.com/d-Rickyy-b/certstream-server-go/internal/disk"
"github.com/d-Rickyy-b/certstream-server-go/internal/metrics"
"github.com/d-Rickyy-b/certstream-server-go/internal/web"
)
Expand All @@ -31,12 +32,23 @@ func main() {
}

webserver := web.NewWebsocketServer(conf.Webserver.ListenAddr, conf.Webserver.ListenPort, conf.Webserver.CertPath, conf.Webserver.CertKeyPath)

setupMetrics(conf, webserver)

go webserver.Start()

watcher := certificatetransparency.Watcher{}
if conf.DiskLogger.Enabled {
disk.StartLogger(conf.DiskLogger.LogDirectory, conf.DiskLogger.Type, conf.DiskLogger.Rotation)
}

watcherLogChannels := []certificatetransparency.LogChannel{
certificatetransparency.LOG_CHAN_WEBSOCKET,
}
if conf.DiskLogger.Enabled {
watcherLogChannels = append(watcherLogChannels, certificatetransparency.LOG_CHAN_DISK)
}

log.Printf("Following log channels are enabled: %v\n", watcherLogChannels)

watcher := certificatetransparency.Watcher{LogChannels: watcherLogChannels}
watcher.Start()
}

Expand Down
6 changes: 6 additions & 0 deletions config.sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ webserver:
cert_key_path: ""
compression_enabled: false

disklogger:
enabled: false
type: DOMAINS_ONLY # LITE, FULL, DOMAINS_ONLY
log_directory: logs
rotation: DAILY # HOURLY, DAILY

prometheus:
enabled: true
listen_addr: "0.0.0.0"
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/d-Rickyy-b/certstream-server-go

go 1.22.7
go 1.23.1

toolchain go1.23.4

Expand All @@ -19,7 +19,7 @@ require (
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/net v0.33.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/sys v0.29.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect
google.golang.org/grpc v1.69.2 // indirect
google.golang.org/protobuf v1.36.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A=
Expand Down
41 changes: 31 additions & 10 deletions internal/certificatetransparency/ct-watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/d-Rickyy-b/certstream-server-go/internal/certstream"
"github.com/d-Rickyy-b/certstream-server-go/internal/config"
"github.com/d-Rickyy-b/certstream-server-go/internal/disk"
"github.com/d-Rickyy-b/certstream-server-go/internal/web"

ct "github.com/google/certificate-transparency-go"
Expand All @@ -29,13 +30,21 @@ var (
userAgent = fmt.Sprintf("Certstream Server v%s (github.com/d-Rickyy-b/certstream-server-go)", config.Version)
)

type LogChannel string

const (
LOG_CHAN_WEBSOCKET LogChannel = "WEBSOCKET"
LOG_CHAN_DISK LogChannel = "DISK"
)

// Watcher describes a component that watches for new certificates in a CT log.
type Watcher struct {
workers []*worker
wg sync.WaitGroup
context context.Context
certChan chan certstream.Entry
cancelFunc context.CancelFunc
workers []*worker
wg sync.WaitGroup
context context.Context
certChan chan certstream.Entry
cancelFunc context.CancelFunc
LogChannels []LogChannel
}

// NewWatcher creates a new Watcher.
Expand All @@ -58,7 +67,7 @@ func (w *Watcher) Start() {
w.addNewlyAvailableLogs()

log.Println("Started CT watcher")
go certHandler(w.certChan)
go certHandler(w.certChan, w.LogChannels)
go w.watchNewLogs()

w.wg.Wait()
Expand Down Expand Up @@ -279,9 +288,20 @@ func (w *worker) foundPrecertCallback(rawEntry *ct.RawLogEntry) {
atomic.AddInt64(&processedPrecerts, 1)
}

// certHandler takes the entries out of the entryChan channel and broadcasts them to all clients.
// certHandler takes the entries out of the entryChan channel sends them to the appropriate log channels.
// Only a single instance of the certHandler runs per certstream server.
func certHandler(entryChan chan certstream.Entry) {
func certHandler(entryChan chan certstream.Entry, logChannels []LogChannel) {

channels := []chan certstream.Entry{}
for _, logChan := range logChannels {
switch logChan {
case LOG_CHAN_WEBSOCKET:
channels = append(channels, web.ClientHandler.Broadcast) //send entries to web socket
case LOG_CHAN_DISK:
channels = append(channels, disk.CertStreamEntryChan) //send entries to disk logger
}
}

var processed int64

for {
Expand All @@ -294,8 +314,9 @@ func certHandler(entryChan chan certstream.Entry) {
web.SetExampleCert(entry)
}

// Run json encoding in the background and send the result to the clients.
web.ClientHandler.Broadcast <- entry
for _, logChan := range channels {
logChan <- entry
}

// Update metrics
url := entry.Data.Source.NormalizedURL
Expand Down
14 changes: 14 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
"strings"

"github.com/d-Rickyy-b/certstream-server-go/internal/disk"
"gopkg.in/yaml.v3"
)

Expand All @@ -33,6 +34,12 @@ type Config struct {
DomainsOnlyURL string `yaml:"domains_only_url"`
CompressionEnabled bool `yaml:"compression_enabled"`
}
DiskLogger struct {
Enabled bool `yaml:"enabled"`
Type disk.DiskLog `yaml:"type"`
LogDirectory string `yaml:"log_directory"`
Rotation string `yaml:"rotation"`
}
Prometheus struct {
ServerConfig `yaml:",inline"`
Enabled bool `yaml:"enabled"`
Expand Down Expand Up @@ -187,5 +194,12 @@ func validateConfig(config Config) bool {
}
}

if config.DiskLogger.Enabled {
if config.DiskLogger.LogDirectory == "" {
log.Fatalln("Log Directory must be specified for disk logger")
return false
}
}

return true
}
128 changes: 128 additions & 0 deletions internal/disk/filerotate/filerotate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package filerotate

import (
"fmt"
"log"
"os"
"path/filepath"
"sync"
"time"
)

type RotatableFile struct {
sync.Mutex

Directory string // file dir
Path string // file path

creationTime time.Time

file *os.File
rotateType RotateType
}

type RotateType string

const (
ROTATE_HOURLY RotateType = "ROTATE_HOURLY"
ROTATE_DAILY RotateType = "ROTATE_DAILY"
)

func New(directory string, rotateType RotateType) (*RotatableFile, error) {
file, filePath, ferr := newFile(directory, rotateType)
if ferr != nil {
return &RotatableFile{}, ferr
}

rotatableFile := RotatableFile{
creationTime: time.Now().UTC(),
Directory: directory,
Path: filePath,
file: file,
rotateType: rotateType,
}

go rotatableFile.RotateFile()

return &rotatableFile, nil
}

func newFile(directory string, rotateType RotateType) (file *os.File, filePath string, Error error) {
now := time.Now().UTC()
fileName := ""

switch rotateType {
case ROTATE_HOURLY:
fileName = now.Format(time.RFC3339)
case ROTATE_DAILY:
fallthrough
default:
fileName = now.Format("2006-01-02")
}

filePath = filepath.Join(directory, fmt.Sprintf("%s.txt", fileName))

derr := os.MkdirAll(directory, 0755)
if derr != nil {
return nil, filePath, derr
}

file, ferr := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if ferr != nil {
return nil, filePath, ferr
}

return file, filePath, nil
}

func (rotatableFile *RotatableFile) RotateFile() {
for {
rotateAt := rotatableFile.getNextRotation()

log.Printf("Next Disk log rotation is in: %f Hours\n", rotateAt.Hours())

<-time.After(rotateAt) // wait untill duration

newFile, _, nfErr := newFile(rotatableFile.Directory, rotatableFile.rotateType)
if nfErr != nil {
log.Panicln("Error while creation new file for rotation: ", nfErr)
}

rotatableFile.Mutex.Lock()

// switch to new file
oldFile := rotatableFile.file
rotatableFile.file = newFile

rotatableFile.Mutex.Unlock()

// sync and close old file
oldFile.Sync()
oldFile.Close()
}
}

func (rotatableFile *RotatableFile) getNextRotation() time.Duration {
now := time.Now().UTC()
switch rotatableFile.rotateType {
case ROTATE_HOURLY:
return now.Add(time.Hour).Sub(now)
case ROTATE_DAILY:
fallthrough
default:
return time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC).Sub(now) // time between now and midnight
}
}

func (rotatableFile *RotatableFile) Write(b []byte) (n int, err error) {
rotatableFile.Mutex.Lock()
defer rotatableFile.Mutex.Unlock()
return rotatableFile.file.Write(b)
}

func (rotatableFile *RotatableFile) Close() error {
rotatableFile.Mutex.Lock()
defer rotatableFile.Mutex.Unlock()

return rotatableFile.file.Close()
}
70 changes: 70 additions & 0 deletions internal/disk/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package disk

import (
"log"

"github.com/d-Rickyy-b/certstream-server-go/internal/certstream"
"github.com/d-Rickyy-b/certstream-server-go/internal/disk/filerotate"
)

var (
CertStreamEntryChan chan certstream.Entry
)

type DiskLog string

const (
DISK_LOG_FULL DiskLog = "FULL"
DISK_LOG_LITE DiskLog = "LOG_LITE"
DISK_LOG_DOMAINS_ONLY DiskLog = "DOMAINS_ONLY"
)

func StartLogger(logDirectory string, logType DiskLog, rotation string) {

if CertStreamEntryChan == nil {
CertStreamEntryChan = make(chan certstream.Entry, 10_000)
}

go logEntries(logDirectory, logType, rotation)
}

func logEntries(logDirectory string, logType DiskLog, rotation string) {
var logFile *filerotate.RotatableFile
var err error

switch rotation {
case "HOURLY":
logFile, err = filerotate.New(logDirectory, filerotate.ROTATE_HOURLY)
case "DAILY":
fallthrough
default:
logFile, err = filerotate.New(logDirectory, filerotate.ROTATE_DAILY)
}

if err != nil {
log.Panic(err)
}

for {
entry, ok := <-CertStreamEntryChan

if !ok {
break
}

switch logType {
case DISK_LOG_DOMAINS_ONLY:
for _, domain := range entry.Data.LeafCert.AllDomains {
logFile.Write([]byte(domain + "\n"))
}
case DISK_LOG_LITE:
logFile.Write(entry.JSONLite())
case DISK_LOG_FULL:
fallthrough
default:
logFile.Write(entry.JSON())
}
}

logFile.Close()
}