176 lines
4.8 KiB
Go
176 lines
4.8 KiB
Go
|
package pkg
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"strconv"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// ISpeedySink is the interface for Speedy sinks.
|
||
|
type ISpeedySink interface {
|
||
|
SendData(context context.Context, data *LokiStreams) error
|
||
|
Shutdown()
|
||
|
}
|
||
|
|
||
|
// LokiStream is a single stream entry in the shape the Loki push API accepts:
// a label set plus a list of values.
type LokiStream struct {
	// Labels are the key-value pairs attached to the stream; they are
	// serialised under the "stream" JSON key.
	Labels map[string]string `json:"stream"`
	// Values holds the entries of the stream, each one a slice of strings.
	// Pusher.RunForever overwrites Values[0][0] with its own timestamp.
	Values [][]string `json:"values"`
	// Size is the size of this entry in bytes as supplied by the producer.
	// It is accumulated into LokiStreams.TotalSize and never serialised.
	Size int `json:"-"`
}
|
||
|
|
||
|
// LokiStreams represents a list of LokiStream that Loki push API accepts.
|
||
|
type LokiStreams struct {
|
||
|
// Streams is an array of LokiStream.
|
||
|
Streams []LokiStream `json:"streams"`
|
||
|
// Count represents the number of LokiStream's in the struct.
|
||
|
Count int `json:"-"`
|
||
|
// TotalSize is the total size if the LokiStreams from struct.
|
||
|
TotalSize int `json:"-"`
|
||
|
bufferMaxBatchSize int
|
||
|
bufferMaxByteSize int
|
||
|
}
|
||
|
|
||
|
func (ls *LokiStreams) AddData(s LokiStream) {
|
||
|
ls.Count += 1
|
||
|
ls.Streams = append(ls.Streams, s)
|
||
|
ls.TotalSize += s.Size
|
||
|
}
|
||
|
|
||
|
// IsFull returns whether the LokiStreams is full by checking against buffer_max_bytes and then buffer_max_batch_size.
|
||
|
// If buffer_max_bytes is 0 then IsFull only checks against buffer_max_batch_size.
|
||
|
func (ls *LokiStreams) IsFull() bool {
|
||
|
if ls.TotalSize >= ls.bufferMaxByteSize {
|
||
|
return true
|
||
|
}
|
||
|
if ls.Count >= ls.bufferMaxBatchSize {
|
||
|
return true
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
// NewLokiStreams creates a new instance of LokiStreams of max capacity
|
||
|
func NewLokiStreams(maxCapacity int, maxSizeBytes int) *LokiStreams {
|
||
|
return &LokiStreams{
|
||
|
Streams: make([]LokiStream, 0, maxCapacity),
|
||
|
Count: 0,
|
||
|
bufferMaxBatchSize: maxCapacity,
|
||
|
bufferMaxByteSize: maxSizeBytes,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Pusher ensures that messages are efficiently pushed into Sinks.
|
||
|
type Pusher struct {
|
||
|
// DataChannel is a LokiStream channel that is used to send data to the pusher.
|
||
|
DataChannel chan LokiStream
|
||
|
TimeProvider func() string
|
||
|
speedySink ISpeedySink
|
||
|
lastFlush time.Time
|
||
|
SecondsToFlush time.Duration
|
||
|
currentStreams *LokiStreams
|
||
|
maxBatchSize int
|
||
|
maxBatchSizeBytes int
|
||
|
shutdownChannel chan int
|
||
|
}
|
||
|
|
||
|
// UnixNanoTimeProvider returns the current time as a base-10 string holding
// the number of nanoseconds since the Unix epoch.
func UnixNanoTimeProvider() string {
	// UnixNano does not depend on the Time's location, so no UTC conversion
	// is needed before taking it.
	now := time.Now()
	return strconv.FormatInt(now.UnixNano(), 10)
}
|
||
|
|
||
|
// NewPusher creates a new Loki pusher instance.
|
||
|
func NewPusher(sink ISpeedySink, maxBatchSize int, maxBatchSizeBytes int) *Pusher {
|
||
|
if sink == nil {
|
||
|
panic("Speedy sink is nil")
|
||
|
}
|
||
|
|
||
|
return &Pusher{
|
||
|
DataChannel: make(chan LokiStream, 1000),
|
||
|
TimeProvider: UnixNanoTimeProvider,
|
||
|
SecondsToFlush: 1 * time.Minute,
|
||
|
speedySink: sink,
|
||
|
lastFlush: time.Now(),
|
||
|
maxBatchSize: maxBatchSize,
|
||
|
maxBatchSizeBytes: maxBatchSizeBytes,
|
||
|
currentStreams: NewLokiStreams(maxBatchSize, maxBatchSizeBytes),
|
||
|
shutdownChannel: make(chan int),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// RunForever runs the pusher forever, or until Shutdown is called.
|
||
|
func (lp *Pusher) RunForever() {
|
||
|
var mutex = &sync.Mutex{}
|
||
|
tick := time.Tick(lp.SecondsToFlush)
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case data := <-lp.DataChannel:
|
||
|
mutex.Lock()
|
||
|
// This is sort of bad but Loki does not support out of order messages, since have N goroutines
|
||
|
// we will have to override the timestamp here, otherwise the timestamps may clash or be out of order.
|
||
|
data.Values[0][0] = lp.TimeProvider()
|
||
|
lp.currentStreams.AddData(data)
|
||
|
if lp.currentStreams.IsFull() {
|
||
|
lp.flushCurrentBatch()
|
||
|
}
|
||
|
mutex.Unlock()
|
||
|
case <-lp.shutdownChannel:
|
||
|
// Ensure clean shutdown.
|
||
|
mutex.Lock()
|
||
|
//goland:noinspection ALL
|
||
|
defer mutex.Unlock()
|
||
|
SugaredLogger.Info("Shutting down Pusher. Draining")
|
||
|
for {
|
||
|
isDone := false
|
||
|
select {
|
||
|
case data := <-lp.DataChannel:
|
||
|
data.Values[0][0] = lp.TimeProvider()
|
||
|
lp.currentStreams.AddData(data)
|
||
|
if lp.currentStreams.IsFull() {
|
||
|
lp.flushCurrentBatch()
|
||
|
}
|
||
|
default:
|
||
|
isDone = true
|
||
|
}
|
||
|
if isDone {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
SugaredLogger.Info("Drained.")
|
||
|
lp.flushCurrentBatch()
|
||
|
lp.speedySink.Shutdown()
|
||
|
return
|
||
|
case <-tick:
|
||
|
// This branch will handle periodical flushes so that the pipeline won't remain stale.
|
||
|
mutex.Lock()
|
||
|
if time.Now().Sub(lp.lastFlush).Milliseconds() >= lp.SecondsToFlush.Milliseconds() {
|
||
|
lp.flushCurrentBatch()
|
||
|
}
|
||
|
mutex.Unlock()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// flushCurrentBatch flushes the current batch.
|
||
|
func (lp *Pusher) flushCurrentBatch() {
|
||
|
// Skip flushing, no data.
|
||
|
if lp.currentStreams.Count == 0 {
|
||
|
return
|
||
|
}
|
||
|
err := lp.speedySink.SendData(context.Background(), lp.currentStreams)
|
||
|
if err != nil {
|
||
|
SugaredLogger.Error(err)
|
||
|
}
|
||
|
lp.lastFlush = time.Now()
|
||
|
lp.currentStreams = NewLokiStreams(lp.maxBatchSize, lp.maxBatchSizeBytes)
|
||
|
}
|
||
|
|
||
|
// Shutdown shutdowns the Loki pusher.
|
||
|
func (lp *Pusher) Shutdown() {
|
||
|
lp.shutdownChannel <- 1
|
||
|
}
|