ca0e1ca769
This is an initial import of the osel codebase. The osel tool is a tool that initiates external security scans (initially through Qualys) upon receipt of AMQP events that indicate certain sensitive events have occurred, like a security group rule change. The commit history had to be thrown away because it contained some non-public data, so I would like to call out the following contributors: This uses Go 1.10 and vgo for dependency management. Co-Authored-By: Charles Bitter <Charles_Bitter@cable.comcast.com> Co-Authored-By: Olivier Gagnon <Olivier_Gagnon@cable.comcast.com> Co-Authored-By: Joseph Sleiman <Joseph_Sleiman@comcast.com> Change-Id: Ib6abe2024fd91978b783ceee4cff8bb4678d7b15
134 lines
3.6 KiB
Go
134 lines
3.6 KiB
Go
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/streadway/amqp"
|
|
)
|
|
|
|
func processWaitingEvent(delivery amqp.Delivery, openstackActions OpenStackActioner) (Event, error) {
|
|
// executes when an event is waiting
|
|
event, err := ParseEvent(delivery.Body)
|
|
if err != nil {
|
|
return Event{}, fmt.Errorf("Failed to parse event due to error: %s", err)
|
|
}
|
|
if event.Processor == nil {
|
|
if !Debug {
|
|
return Event{}, nil
|
|
}
|
|
return Event{}, fmt.Errorf("Ignoring event type %s", event.EventData.EventType)
|
|
}
|
|
if Debug {
|
|
log.Printf("Processing event type %s\n", event.EventData.EventType)
|
|
}
|
|
err = event.Processor.FillExtraData(&event, openstackActions)
|
|
if err != nil {
|
|
return Event{}, fmt.Errorf("Error fetching extra data: %s", err)
|
|
}
|
|
return event, nil
|
|
}
|
|
|
|
func logEvents(events []Event, logger SyslogActioner, qualys QualysActioner) {
|
|
var ipAddresses []string
|
|
var qualysIPAddresses []string
|
|
|
|
if Debug {
|
|
log.Println("Timer Expired")
|
|
}
|
|
|
|
// De-dupe IP addresses and get them into a single struct
|
|
dedupIPAddresses := make(map[string]struct{})
|
|
for _, event := range events {
|
|
for _, IPs := range event.IPs {
|
|
for _, IP := range IPs {
|
|
if _, ok := dedupIPAddresses[IP]; !ok {
|
|
ipAddresses = append(ipAddresses, IP)
|
|
}
|
|
dedupIPAddresses[IP] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Disregard the scan if no targets have been found
|
|
if len(ipAddresses) == 0 {
|
|
if Debug {
|
|
log.Println("Nothing to scan, skipping...")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Remove IPv6 addresses
|
|
if qualys.DropIPv6() {
|
|
for ipAddressIndex := range ipAddresses {
|
|
testIPAddress := ipAddresses[ipAddressIndex]
|
|
if net.ParseIP(testIPAddress).To4() != nil {
|
|
qualysIPAddresses = append(qualysIPAddresses, testIPAddress)
|
|
} else {
|
|
log.Println("Disregarded IPv6 address", testIPAddress)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Execute Qualys scan
|
|
|
|
log.Println("Qualys Scan Starting")
|
|
scanID, scanError := qualys.InitiateScan(qualysIPAddresses)
|
|
log.Printf("Qualys Scan Complete: scan ID='%s'; scan_error='%v'", scanID, scanError)
|
|
|
|
// Iterate through entries and format the logs
|
|
log.Printf("Processing %d events\n", len(events))
|
|
for _, event := range events {
|
|
event.QualysScanID = scanID
|
|
if scanError != nil {
|
|
event.QualysScanError = scanError.Error()
|
|
}
|
|
event.LogLines, _ = event.Processor.FormatLogs(&event, qualysIPAddresses)
|
|
|
|
// Output the logs
|
|
log.Printf("Processing %d loglines\n", len(event.LogLines))
|
|
for lineToLog := range event.LogLines {
|
|
logger.Info(event.LogLines[lineToLog])
|
|
}
|
|
}
|
|
}
|
|
|
|
func mainLoop(batchInterval time.Duration, deliveries <-chan amqp.Delivery, amqpNotifyError chan *amqp.Error, openstackActions OpenStackActioner, logger SyslogActioner, qualys QualysActioner) {
|
|
var events []Event
|
|
ticker := time.NewTicker(batchInterval)
|
|
amqpReconnectTimer := time.NewTimer(1)
|
|
for {
|
|
select {
|
|
case e := <-deliveries:
|
|
event, err := processWaitingEvent(e, openstackActions)
|
|
if err != nil {
|
|
log.Printf("Event skipped: %s\n", err)
|
|
continue
|
|
}
|
|
events = append(events, event)
|
|
case <-ticker.C:
|
|
logEvents(events, logger, qualys)
|
|
events = nil
|
|
case err := <-amqpNotifyError:
|
|
// Reinitialize AMQP on connection error
|
|
log.Printf("AMQP connection error: %s\n", err)
|
|
amqpReconnectTimer = time.NewTimer(time.Second * 30)
|
|
case <-amqpReconnectTimer.C:
|
|
var err error
|
|
amqpBus := new(AmqpActions)
|
|
amqpBus.Options = AmqpOptions{
|
|
RabbitURI: rabbitURI,
|
|
}
|
|
deliveries, amqpNotifyError, err = amqpBus.Connect()
|
|
if err != nil {
|
|
log.Printf("AMQP retry connection error: %s\n", err)
|
|
amqpReconnectTimer = time.NewTimer(time.Second * 30)
|
|
} else {
|
|
log.Printf("AMQP reconnected\n")
|
|
}
|
|
}
|
|
}
|
|
}
|