diff --git a/.gitignore b/.gitignore index 9823e0e..0c3829a 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ go.work .cache *.pprof dist/ +logs/* # Ignore actual config files config.yaml diff --git a/cmd/certstream-server-go/main.go b/cmd/certstream-server-go/main.go index e8917b9..1c762f5 100644 --- a/cmd/certstream-server-go/main.go +++ b/cmd/certstream-server-go/main.go @@ -7,6 +7,7 @@ import ( "github.com/d-Rickyy-b/certstream-server-go/internal/certificatetransparency" "github.com/d-Rickyy-b/certstream-server-go/internal/config" + "github.com/d-Rickyy-b/certstream-server-go/internal/disk" "github.com/d-Rickyy-b/certstream-server-go/internal/metrics" "github.com/d-Rickyy-b/certstream-server-go/internal/web" ) @@ -31,12 +32,23 @@ func main() { } webserver := web.NewWebsocketServer(conf.Webserver.ListenAddr, conf.Webserver.ListenPort, conf.Webserver.CertPath, conf.Webserver.CertKeyPath) - setupMetrics(conf, webserver) - go webserver.Start() - watcher := certificatetransparency.Watcher{} + if conf.DiskLogger.Enabled { + disk.StartLogger(conf.DiskLogger.LogDirectory, conf.DiskLogger.Type, conf.DiskLogger.Rotation) + } + + watcherLogChannels := []certificatetransparency.LogChannel{ + certificatetransparency.LOG_CHAN_WEBSOCKET, + } + if conf.DiskLogger.Enabled { + watcherLogChannels = append(watcherLogChannels, certificatetransparency.LOG_CHAN_DISK) + } + + log.Printf("Following log channels are enabled: %v\n", watcherLogChannels) + + watcher := certificatetransparency.Watcher{LogChannels: watcherLogChannels} watcher.Start() } diff --git a/config.sample.yaml b/config.sample.yaml index c6a9866..f30b6b2 100644 --- a/config.sample.yaml +++ b/config.sample.yaml @@ -8,6 +8,12 @@ webserver: cert_key_path: "" compression_enabled: false +disklogger: + enabled: false + type: DOMAINS_ONLY # LITE, FULL, DOMAINS_ONLY + log_directory: logs + rotation: DAILY # HOURLY, DAILY + prometheus: enabled: true listen_addr: "0.0.0.0" diff --git a/go.mod b/go.mod index 8f81c69..93762e5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/d-Rickyy-b/certstream-server-go -go 1.22.7 +go 1.23.1 toolchain go1.23.4 @@ -19,7 +19,7 @@ require ( github.com/valyala/histogram v1.2.0 // indirect golang.org/x/crypto v0.31.0 // indirect golang.org/x/net v0.33.0 // indirect - golang.org/x/sys v0.28.0 // indirect + golang.org/x/sys v0.29.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 // indirect google.golang.org/grpc v1.69.2 // indirect google.golang.org/protobuf v1.36.1 // indirect diff --git a/go.sum b/go.sum index 07660d9..15f993e 100644 --- a/go.sum +++ b/go.sum @@ -26,8 +26,8 @@ golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= -golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= -golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20241223144023-3abc09e42ca8 h1:TqExAhdPaB60Ux47Cn0oLV07rGnxZzIsaRhQaqS666A= diff --git a/internal/certificatetransparency/ct-watcher.go b/internal/certificatetransparency/ct-watcher.go index 6af73b3..13ffdc1 100644 --- a/internal/certificatetransparency/ct-watcher.go +++ b/internal/certificatetransparency/ct-watcher.go @@ -14,6 +14,7 @@ import ( "github.com/d-Rickyy-b/certstream-server-go/internal/certstream" "github.com/d-Rickyy-b/certstream-server-go/internal/config" + "github.com/d-Rickyy-b/certstream-server-go/internal/disk" "github.com/d-Rickyy-b/certstream-server-go/internal/web" ct "github.com/google/certificate-transparency-go" @@ -29,13 +30,21 @@ var ( userAgent = fmt.Sprintf("Certstream Server v%s (github.com/d-Rickyy-b/certstream-server-go)", config.Version) ) +type LogChannel string + +const ( + LOG_CHAN_WEBSOCKET LogChannel = "WEBSOCKET" + LOG_CHAN_DISK LogChannel = "DISK" +) + // Watcher describes a component that watches for new certificates in a CT log. type Watcher struct { - workers []*worker - wg sync.WaitGroup - context context.Context - certChan chan certstream.Entry - cancelFunc context.CancelFunc + workers []*worker + wg sync.WaitGroup + context context.Context + certChan chan certstream.Entry + cancelFunc context.CancelFunc + LogChannels []LogChannel } // NewWatcher creates a new Watcher. @@ -58,7 +67,7 @@ func (w *Watcher) Start() { w.addNewlyAvailableLogs() log.Println("Started CT watcher") - go certHandler(w.certChan) + go certHandler(w.certChan, w.LogChannels) go w.watchNewLogs() w.wg.Wait() @@ -279,9 +288,20 @@ func (w *worker) foundPrecertCallback(rawEntry *ct.RawLogEntry) { atomic.AddInt64(&processedPrecerts, 1) } -// certHandler takes the entries out of the entryChan channel and broadcasts them to all clients. +// certHandler takes the entries out of the entryChan channel sends them to the appropriate log channels. // Only a single instance of the certHandler runs per certstream server. -func certHandler(entryChan chan certstream.Entry) { +func certHandler(entryChan chan certstream.Entry, logChannels []LogChannel) { + + channels := []chan certstream.Entry{} + for _, logChan := range logChannels { + switch logChan { + case LOG_CHAN_WEBSOCKET: + channels = append(channels, web.ClientHandler.Broadcast) //send entries to web socket + case LOG_CHAN_DISK: + channels = append(channels, disk.CertStreamEntryChan) //send entries to disk logger + } + } + var processed int64 for { @@ -294,8 +314,9 @@ func certHandler(entryChan chan certstream.Entry) { web.SetExampleCert(entry) } - // Run json encoding in the background and send the result to the clients. - web.ClientHandler.Broadcast <- entry + for _, logChan := range channels { + logChan <- entry + } // Update metrics url := entry.Data.Source.NormalizedURL diff --git a/internal/config/config.go b/internal/config/config.go index aeb7388..5496858 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -8,6 +8,7 @@ import ( "regexp" "strings" + "github.com/d-Rickyy-b/certstream-server-go/internal/disk" "gopkg.in/yaml.v3" ) @@ -33,6 +34,12 @@ type Config struct { DomainsOnlyURL string `yaml:"domains_only_url"` CompressionEnabled bool `yaml:"compression_enabled"` } + DiskLogger struct { + Enabled bool `yaml:"enabled"` + Type disk.DiskLog `yaml:"type"` + LogDirectory string `yaml:"log_directory"` + Rotation string `yaml:"rotation"` + } Prometheus struct { ServerConfig `yaml:",inline"` Enabled bool `yaml:"enabled"` @@ -187,5 +194,12 @@ func validateConfig(config Config) bool { } } + if config.DiskLogger.Enabled { + if config.DiskLogger.LogDirectory == "" { + log.Fatalln("Log Directory must be specified for disk logger") + return false + } + } + return true } diff --git a/internal/disk/filerotate/filerotate.go b/internal/disk/filerotate/filerotate.go new file mode 100644 index 0000000..03ee501 --- /dev/null +++ b/internal/disk/filerotate/filerotate.go @@ -0,0 +1,128 @@ +package filerotate + +import ( + "fmt" + "log" + "os" + "path/filepath" + "sync" + "time" +) + +type RotatableFile struct { + sync.Mutex + + Directory string // file dir + Path string // file path + + creationTime time.Time + + file *os.File + rotateType RotateType +} + +type RotateType string + +const ( + ROTATE_HOURLY RotateType = "ROTATE_HOURLY" + ROTATE_DAILY RotateType = "ROTATE_DAILY" +) + +func New(directory string, rotateType RotateType) (*RotatableFile, error) { + file, filePath, ferr := newFile(directory, rotateType) + if ferr != nil { + return &RotatableFile{}, ferr + } + + rotatableFile := RotatableFile{ + creationTime: time.Now().UTC(), + Directory: directory, + Path: filePath, + file: file, + rotateType: rotateType, + } + + go rotatableFile.RotateFile() + + return &rotatableFile, nil +} + +func newFile(directory string, rotateType RotateType) (file *os.File, filePath string, Error error) { + now := time.Now().UTC() + fileName := "" + + switch rotateType { + case ROTATE_HOURLY: + fileName = now.Format(time.RFC3339) + case ROTATE_DAILY: + fallthrough + default: + fileName = now.Format("2006-01-02") + } + + filePath = filepath.Join(directory, fmt.Sprintf("%s.txt", fileName)) + + derr := os.MkdirAll(directory, 0755) + if derr != nil { + return nil, filePath, derr + } + + file, ferr := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if ferr != nil { + return nil, filePath, ferr + } + + return file, filePath, nil +} + +func (rotatableFile *RotatableFile) RotateFile() { + for { + rotateAt := rotatableFile.getNextRotation() + + log.Printf("Next Disk log rotation is in: %f Hours\n", rotateAt.Hours()) + + <-time.After(rotateAt) // wait untill duration + + newFile, _, nfErr := newFile(rotatableFile.Directory, rotatableFile.rotateType) + if nfErr != nil { + log.Panicln("Error while creation new file for rotation: ", nfErr) + } + + rotatableFile.Mutex.Lock() + + // switch to new file + oldFile := rotatableFile.file + rotatableFile.file = newFile + + rotatableFile.Mutex.Unlock() + + // sync and close old file + oldFile.Sync() + oldFile.Close() + } +} + +func (rotatableFile *RotatableFile) getNextRotation() time.Duration { + now := time.Now().UTC() + switch rotatableFile.rotateType { + case ROTATE_HOURLY: + return now.Add(time.Hour).Sub(now) + case ROTATE_DAILY: + fallthrough + default: + return time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, time.UTC).Sub(now) // time between now and midnight + } +} + +func (rotatableFile *RotatableFile) Write(b []byte) (n int, err error) { + rotatableFile.Mutex.Lock() + defer rotatableFile.Mutex.Unlock() + return rotatableFile.file.Write(b) +} + +func (rotatableFile *RotatableFile) Close() error { + rotatableFile.Mutex.Lock() + defer rotatableFile.Mutex.Unlock() + + return rotatableFile.file.Close() +} diff --git a/internal/disk/logger.go b/internal/disk/logger.go new file mode 100644 index 0000000..48ec36b --- /dev/null +++ b/internal/disk/logger.go @@ -0,0 +1,70 @@ +package disk + +import ( + "log" + + "github.com/d-Rickyy-b/certstream-server-go/internal/certstream" + "github.com/d-Rickyy-b/certstream-server-go/internal/disk/filerotate" +) + +var ( + CertStreamEntryChan chan certstream.Entry +) + +type DiskLog string + +const ( + DISK_LOG_FULL DiskLog = "FULL" + DISK_LOG_LITE DiskLog = "LOG_LITE" + DISK_LOG_DOMAINS_ONLY DiskLog = "DOMAINS_ONLY" +) + +func StartLogger(logDirectory string, logType DiskLog, rotation string) { + + if CertStreamEntryChan == nil { + CertStreamEntryChan = make(chan certstream.Entry, 10_000) + } + + go logEntries(logDirectory, logType, rotation) +} + +func logEntries(logDirectory string, logType DiskLog, rotation string) { + var logFile *filerotate.RotatableFile + var err error + + switch rotation { + case "HOURLY": + logFile, err = filerotate.New(logDirectory, filerotate.ROTATE_HOURLY) + case "DAILY": + fallthrough + default: + logFile, err = filerotate.New(logDirectory, filerotate.ROTATE_DAILY) + } + + if err != nil { + log.Panic(err) + } + + for { + entry, ok := <-CertStreamEntryChan + + if !ok { + break + } + + switch logType { + case DISK_LOG_DOMAINS_ONLY: + for _, domain := range entry.Data.LeafCert.AllDomains { + logFile.Write([]byte(domain + "\n")) + } + case DISK_LOG_LITE: + logFile.Write(entry.JSONLite()) + case DISK_LOG_FULL: + fallthrough + default: + logFile.Write(entry.JSON()) + } + } + + logFile.Close() +}