osel/processing.go
Nate Johnston ca0e1ca769 Initial import of osel code
This is an initial import of the osel codebase.  The osel tool is a tool that
initiates external security scans (initially through Qualys) upon reciept of
AMQP events that indicate certain sensitive events have occurred, like a
security group rule change.

The commit history had to be thrown away because it contained some non-public
data, so I would like to call out the following contributors:

This uses go 1.10 and vgo for dependency management.

Co-Authored-By: Charles Bitter <Charles_Bitter@cable.comcast.com>
Co-Authored-By: Olivier Gagnon <Olivier_Gagnon@cable.comcast.com>
Co-Authored-By: Joseph Sleiman <Joseph_Sleiman@comcast.com>

Change-Id: Ib6abe2024fd91978b783ceee4cff8bb4678d7b15
2018-03-24 15:30:57 +00:00

134 lines
3.6 KiB
Go

package main
import (
"fmt"
"log"
"net"
"time"
"github.com/streadway/amqp"
)
func processWaitingEvent(delivery amqp.Delivery, openstackActions OpenStackActioner) (Event, error) {
// executes when an event is waiting
event, err := ParseEvent(delivery.Body)
if err != nil {
return Event{}, fmt.Errorf("Failed to parse event due to error: %s", err)
}
if event.Processor == nil {
if !Debug {
return Event{}, nil
}
return Event{}, fmt.Errorf("Ignoring event type %s", event.EventData.EventType)
}
if Debug {
log.Printf("Processing event type %s\n", event.EventData.EventType)
}
err = event.Processor.FillExtraData(&event, openstackActions)
if err != nil {
return Event{}, fmt.Errorf("Error fetching extra data: %s", err)
}
return event, nil
}
func logEvents(events []Event, logger SyslogActioner, qualys QualysActioner) {
var ipAddresses []string
var qualysIPAddresses []string
if Debug {
log.Println("Timer Expired")
}
// De-dupe IP addresses and get them into a single struct
dedupIPAddresses := make(map[string]struct{})
for _, event := range events {
for _, IPs := range event.IPs {
for _, IP := range IPs {
if _, ok := dedupIPAddresses[IP]; !ok {
ipAddresses = append(ipAddresses, IP)
}
dedupIPAddresses[IP] = struct{}{}
}
}
}
// Disregard the scan if no targets have been found
if len(ipAddresses) == 0 {
if Debug {
log.Println("Nothing to scan, skipping...")
}
return
}
// Remove IPv6 addresses
if qualys.DropIPv6() {
for ipAddressIndex := range ipAddresses {
testIPAddress := ipAddresses[ipAddressIndex]
if net.ParseIP(testIPAddress).To4() != nil {
qualysIPAddresses = append(qualysIPAddresses, testIPAddress)
} else {
log.Println("Disregarded IPv6 address", testIPAddress)
}
}
}
// Execute Qualys scan
log.Println("Qualys Scan Starting")
scanID, scanError := qualys.InitiateScan(qualysIPAddresses)
log.Printf("Qualys Scan Complete: scan ID='%s'; scan_error='%v'", scanID, scanError)
// Iterate through entries and format the logs
log.Printf("Processing %d events\n", len(events))
for _, event := range events {
event.QualysScanID = scanID
if scanError != nil {
event.QualysScanError = scanError.Error()
}
event.LogLines, _ = event.Processor.FormatLogs(&event, qualysIPAddresses)
// Output the logs
log.Printf("Processing %d loglines\n", len(event.LogLines))
for lineToLog := range event.LogLines {
logger.Info(event.LogLines[lineToLog])
}
}
}
func mainLoop(batchInterval time.Duration, deliveries <-chan amqp.Delivery, amqpNotifyError chan *amqp.Error, openstackActions OpenStackActioner, logger SyslogActioner, qualys QualysActioner) {
var events []Event
ticker := time.NewTicker(batchInterval)
amqpReconnectTimer := time.NewTimer(1)
for {
select {
case e := <-deliveries:
event, err := processWaitingEvent(e, openstackActions)
if err != nil {
log.Printf("Event skipped: %s\n", err)
continue
}
events = append(events, event)
case <-ticker.C:
logEvents(events, logger, qualys)
events = nil
case err := <-amqpNotifyError:
// Reinitialize AMQP on connection error
log.Printf("AMQP connection error: %s\n", err)
amqpReconnectTimer = time.NewTimer(time.Second * 30)
case <-amqpReconnectTimer.C:
var err error
amqpBus := new(AmqpActions)
amqpBus.Options = AmqpOptions{
RabbitURI: rabbitURI,
}
deliveries, amqpNotifyError, err = amqpBus.Connect()
if err != nil {
log.Printf("AMQP retry connection error: %s\n", err)
amqpReconnectTimer = time.NewTimer(time.Second * 30)
} else {
log.Printf("AMQP reconnected\n")
}
}
}
}