This post shows how to ship Windows Events using NXLog from a Windows system to a remote Heka server securely, and from there to ElasticSearch where they will appear in Kibana. Specifically we’ll see how to:
- Install and setup Heka as a service.
- Install and configure NXLog on Windows so it will ship Windows Event logs to Heka.
- Configure Heka to parse Windows Event logs and insert them into ElasticSearch.
- Configure NXLog and Heka to encrypt their communications with each other.
This post moves us toward the Iterative Defense Architecture.
Assumptions
- I will be setting up Heka on Debian, so you may need to adjust some instructions.
- For shipping through to ElasticSearch, you should have ElasticSearch set up and Kibana configured for it.
Future work
I hope to use Heka on Windows in the future and get rid of NXLog entirely, but for now, NXLog is the de facto standard for shipping Windows Events to Logstash, and we are using Heka as a replacement for Logstash.
Setup Heka
We need at least Heka v0.10, in order to get TLS support. At the time of this writing that means we have to build Heka ourself. Check the binary page here and if version v0.10 has been released, then use that and skip the Compiling Heka section.
Compiling Heka
To compile, first we need the dependencies:
apt-get install build-essential git cmake mercurial dh-autoreconf libprotobuf-dev
Install Go
You must have go installed of at least version 1.3.2
. We’ll use 1.4.2
which is the latest at the time of this writing. Debian’s package for golang is old, so we have to take manual steps:
wget https://storage.googleapis.com/golang/go1.4.2.linux-amd64.tar.gz
tar -C /usr/local -xzf go1.4.2.linux-amd64.tar.gz
export PATH=$PATH:/usr/local/go/bin
Additionally add the line export PATH=$PATH:/usr/local/go/bin
to your /etc/profile
.
Run go version
and you should see go version go1.4.2 linux/amd64
Build Heka
Download and build Heka:
git clone https://github.com/mozilla-services/heka
cd heka
source build.sh
cpack
dpkg -i heka_0.10.0_amd64.deb
You should now be able to run hekad -version
and see 0.10.0
Test it:
ctest
Once this finishes you should see:
100% tests passed, 0 tests failed out of 26
Total Test time (real) = 31.71 sec
After Heka installs
After you build Heka from source or install a Heka package, you’ll end up with just a /usr/bin/hekad
file, but no config and no service to start it, so it won’t be running. So let’s create a config and then we’ll turn it into a service.
Configure Heka
Initially, we’ll just configure Heka to receive remote logs and output them to stdout. This isn’t very helpful in the long-run, but ensures we can receive the events. Later I’ll show to receive these securely, and how to ship them further.
Create the config file which Heka looks for at /etc/hekad.toml
as follows:
[HttpListenInput]
address = ":5140"
[stdout]
type = "LogOutput"
message_matcher = "TRUE"
encoder = "PayloadEncoder"
[PayloadEncoder]
append_newlines = false
This will listen for events on an HTTP server running on port 5140 and print them to stdout.
Setup supervisor for Heka
Supervisor (aka supervisord) is commonly used to run Python services, but it can run any executable. It takes care of starting the process at boot, restarting the process if it crashes, logging anything written to stdout or stderr and maintaining those logs. First install supervisor:
apt-get install supervisor
Create a service account for Heka:
useradd -s /usr/sbin/nologin -r -M hekad
# Create cache directory
mkdir /var/cache/hekad
# Create log directory
mkdir /var/log/hekad
# Set owner
chown hekad:hekad /var/cache/hekad /var/log/hekad
Now we create a config file to tell supervisor how to run Heka and tell supervisor to reload it’s config files:
cat <<EOF > /etc/supervisor/conf.d/heka.conf
[program:hekad]
user=hekad
command=/usr/bin/hekad
stdout_logfile=/var/log/hekad/heka_stdout.log
stderr_logfile=/var/log/hekad/heka_stderr.log
EOF
supervisorctl reload
Now you should see hekad running:
$ ps -ef | grep hekad
hekad 18372 17926 0 03:45 ? 00:00:00 /usr/bin/hekad
Test that the server can receive data and print it on stdout. Ideally from another machine (in order to check your firewall) run:
curl -d "hello" http://heka_receiver.yournetwork.com:5140
You should this message as 2015/06/13 18:05:20 hello
in your stderr log file at /var/log/hekad/heka_stderr.log
.
If not, check the logs in /var/log/hekad/
for errors. If no logs are there, check the logs in /var/log/supervisor/
for errors.
As you should now realize, all we’re doing is using Heka to take any HTTP POST messages and write them to stdout
.
Setup NXLog
On your Windows system:
- Download and run the .msi installer of NXLog Community Edition from http://nxlog.org/products/nxlog-community-edition/download
-
Open
C:\Program Files (x86)\nxlog\conf\nxlog.conf
in a text editor with Administrative privileges and set it to:#define ROOT C:\Program Files\nxlog define ROOT C:\Program Files (x86)\nxlog Moduledir %ROOT%\modules CacheDir %ROOT%\data Pidfile %ROOT%\data\nxlog.pid SpoolDir %ROOT%\data LogFile %ROOT%\data\nxlog.log # Set to DEBUG to troubleshoot LogLevel INFO <Extension json> Module xm_json </Extension> <Input in> Module im_msvistalog # Only send new logs ReadFromLast TRUE SavePos TRUE # Convert to json Exec to_json(); # Only send select event logs Query <QueryList> \ <Query Id="0"> \ <Select Path="Application">*</Select> \ <Select Path="System">*</Select> \ <Select Path="Security">*</Select> \ </Query> \ </QueryList> # Uncomment for Windows 2000, 2003, or XP (and comment the line "Module im_msvistalog") # Module im_mseventlog </Input> <Output out> Module om_http # TODO: Change this URL to your heka server URL http://heka_receiver.yournetwork.com:5140 </Output> <Route 1> Path in => out </Route>
Set
heka_receiver.yournetwork.com
to the name of your Heka server for log collection. -
Start NXLog. If this is your first time installing NXLog, you’ll need to start it. From an elevated command prompt run:
sc start nxlog
-
Create a test event to help ensure things are working:
eventcreate /ID 1 /L Application /T ERROR /D "Testing my events"
Look at your event logs. You can do this with the GUI with
eventvwr
, or on the command-line with:wevtutil qe Application /c:1 /f:Text /rd:True
This should show something like:
Event[0]: Log Name: Application Source: EventCreate Date: 2015-06-08T21:26:23.000 Event ID: 1 Task: N/A Level: Error Opcode: Info Keyword: Classic User: S-1-5-21-3018557584-2171446095-150255294-1002 User Name: slugger\onsight Computer: slugger Description: Testing my events
-
You should have now seen a message, formatted as json, appear in your
hekad
log file at/var/log/hekad/heka_stdout.log
. It will look like:2015/06/04 17:09:57 2015/06/04 5:09:57PM MDT {"EventTime":"2015-06-09 12:02:45","Hostname":"slugger","Keywords":36028797018963968,"EventType":"ERROR","SeverityValue":4,"Severity":"ERROR","EventID":1,"SourceName":"EventCreate","Task":0,"RecordNumber":26594,"ProcessID":0,"ThreadID":0,"Channel":"Application","Domain":"slugger","AccountName":"onsight","UserID":"onsight","AccountType":"User","Message":"Testing my events","Opcode":"Info","EventReceivedTime":"2015-06-09 12:02:45","SourceModuleName":"in","SourceModuleType":"im_msvistalog"}
Troubleshooting
If you did not see the event log in your hekad output, here are some tips:
- Ensure you can ping
heka_receiver.yournetwork.com
from your Windows host. -
Ensure NXLog is running on the Windows host:
sc query nxlog
This should have a line that says
STATE : 4 RUNNING
-
Check the debug log for NXLog which is at
C:\Program Files (x86)\nxlog\data\nxlog.conf
. You can get more debug output by changing the lineLogLevel INFO
toLogLevel DEBUG
in the fileC:\Program Files (x86)\nxlog\conf\nxlog.conf
-
Ensure NXLog is picking up the events. You can do this by adding an additional output to your
nxlog.conf
file with the lines:<Output outf> Module om_file File 'C:\\Windows\\temp\\nxlog_output.txt' </Output> <Route 1> Path in => outf </Route>
Restart NXLog, create another event, and ensure it appears in
C:\Windows\temp\nxlog_output.txt
.
Remove those configuration lines once things are working.
Get events into ElasticSearch
Printing the events to stdout
is not very helpful. We want to get our events into ElasticSearch.
In a highly scaled production environment, you should consider feeding these event messages from Heka to Kafka (for buffering) to another Heka instance, and then into ElasticSearch.
For our initial setup, we’ll just feed these straight from NXLog to Heka to ElasticSearch.
Modify your /etc/hekad.toml
to this:
[HttpListenInput]
address = ":5140"
decoder = "JsonDecoder"
[JsonDecoder]
type = "SandboxDecoder"
script_type = "lua"
filename = "lua_decoders/windows_events.lua"
[ElasticSearchOutput]
message_matcher = "TRUE"
server = "http://127.0.0.1:9200" # TODO: Change this to your ElasticSearch server
flush_interval = 5000
flush_count = 1
encoder = "ESJsonEncoder"
[ESJsonEncoder]
es_index_from_timestamp = true
type_name = "%{Type}"
I removed the output to stdout
, added a JsonDecoder
, and added an output to ship this off to ElasticSearch.
You’ll need to change the server
address from http://127.0.0.1:9200
to the address of your ElasticSearch server.
Now create the file /usr/share/heka/lua_decoders/windows_events.lua
with the following contents:
require "cjson"
require "string"
-- Borrowed from: https://github.com/Clever/heka-clever-plugins/blob/master/lua/decoders/json_decoder.lua
-- with edit for the "Severity" and removing some fields.
local strict_parsing = read_config("strict")
if strict_parsing == nil or strict_parsing == '' then
strict_parsing = true
end
function process_message()
local payload = read_message("Payload")
-- check length (cjson.encode will crash if payload is > 11500 characters)
if #payload > 11500 then
return -1
end
-- Decode json, and if it fails, look for bad prefix or postfix data
local ok, json = pcall(cjson.decode, payload)
local prefix = ''
local postfix = ''
if strict_parsing and not ok then
return -1
elseif not ok then
-- try parsing once more by stripping extra content on beg and end
local json_start = string.find(payload, "{")
local json_end = string.match(payload, '^.*()}')
if json_start == nil or json_end == nil then
return -1
end
prefix = string.sub(payload, 0, json_start-1)
postfix = string.sub(payload, json_end+1)
ok, json = pcall(cjson.decode, string.sub(payload, json_start, json_end))
end
if not ok then return -1 end
if type(json) ~= "table" then return -1 end
-- Set severity
json.Severity = json.SeverityValue
write_message("Payload", cjson.encode(json))
write_message("Fields[_prefix]", prefix)
write_message("Fields[_postfix]", postfix)
-- Remove unwanted values (still available in the payload)
json.SeverityValue = nil
json.SourceModuleName = nil
json.Task = nil
json.RecordNumber = nil
json.SourceModuleType = nil
json.Keywords = nil
json.EventReceivedTime = nil
-- Add to the message so they show up as ElasticSearch fields
for k, v in pairs(json) do
-- nested json strings are not supported, stringify them
if type(v) == "table" then
write_message("Fields[" .. k .. "]", cjson.encode(v))
else
write_message("Fields[" .. k .. "]", v)
end
end
-- Set the type
write_message("Type", "windows_event")
return 0
end
Now restart hekad
(using supervisorctl restart hekad
), send it an event, and in your ElasticSearch you should be able to find the event and see that fields have been parsed out, as shown in Figure 1.
You will notice though that the Message field is retrieved exactly as is as shown in Figure 2, so for your use cases, you may need to do extra parsing.
However, usually extra fields get added automatically by Windows to assist you as shown in Figure 3.
Encrypting the transfer of logs from Windows to Heka
Right now, our logs are being sent in the clear over the wire, and our Heka server is accepting logs from anyone. We’re going to set TLS connection with server and client certificates to ensure the logs are sent encrypted, can’t be MiTM’d, and the server only accepts logs from clients it knows about.
We will generate two self-signed certs: One for the servers, and one for clients. This is common practice, although in the future, I plan to look into having a root CA and generating individual client certificates, but I don’t know how well any of these tools deal with revoking certs, or using a client cert to identify a single client system. For now, if certs need to be revoked and replaced, it’ll have to be done enterprise wide.
Generate the certs
You can follow the same certificate generation guides as for Logstash. So you can look at this guide for more explanation.
# Create directories
mkdir -p tls/private/ # For private keys
mkdir -p tls/certs/ # Public keys that can be copied anywhere
# Config file for nxlog cert
cp /usr/lib/ssl/openssl.cnf nxlog_cert.cnf
cat <<EOF >> nxlog_cert.cnf
[mysection]
basicConstraints=critical,CA:true,pathlen:1
keyUsage=critical, keyCertSign, digitalSignature
extendedKeyUsage = clientAuth
EOF
# Create client cert
openssl req -config nxlog_cert.cnf -extensions 'mysection' -subj '/C=US/ST=CA/L=San Francisco/O=Your Company/CN=nxlog shipper/' -x509 -days 3650 -batch -nodes -newkey rsa:2048 -sha256 -keyout tls/private/nxlog.key -out tls/certs/nxlog.crt
# Create
openssl req -subj '/C=US/ST=CA/L=San Francisco/O=Your Company/CN=heka_receiver.yournetwork.com' -x509 -days 3650 -batch -nodes -newkey rsa:2048 -sha256 -keyout tls/private/heka.key -out tls/certs/heka.crt
You must change the FQDN heka_receiver.yournetwork.com
to the domain for your Heka server. If you use an IP address, you’ll need to edit your /etc/ssl/openssl.cnf
Find the [ v3_ca ]
section in the file, and add this line under it (substituting in the Logstash Server’s private IP address):
subjectAltName = IP: heka_server_private_ip
Then generate your server cert with -config /etc/ssl/openssl.cnf
instead of the -subj
parameter.
You should also change Your Company
and the other data if needed, which includes your Country (C) from US
, your State (ST) from CA
and your Locality (L) from San Francisco
.
Verify that the FQDN for your Heka server appears in the output when you run:
openssl x509 -in tls/certs/heka.crt -noout -text
You should now have two public certs in tls/certs/
that you can put anywhere, and two private keys in tls/private
that you will use to authenticate systems with one another. NXLog requires a PEM formatted certs and keys though, so let’s make those conversions:
Copy files to the correct places
- Copy to the Windows host into the directory
C:\Program Files (x86)\nxlog\cert
:tls/private/nxlog.key
tls/certs/nxlog.crt
tls/certs/heka.crt
- Copy to the Heka server into the directory
/usr/share/heka/tls
(you will need to make this as root withmkdir -p /usr/share/heka/tls
:tls/private/heka.key
tls/certs/nxlog.crt
Make sure everything in this directory is own by the
hekad
user with:chown -R hekad:hekad /usr/share/heka/tls
Configure NXLog for TLS
In your NXLog config file at change your <Output out>
to:
<Output out>
Module om_http
URL https://heka_receiver.yournetwork.com:5140/
HTTPSCertFile %ROOT%\cert\nxlog.crt
HTTPSCertKeyFile %ROOT%\cert\nxlog.key
HTTPSCAFile %ROOT%\cert\heka.crt
</Output>
Again, make sure heka_receiver.yournetwork.com
is set to the correct server.
Restart the NXLog service:
sc stop nxlog
sc start nxlog
Configure Heka for TLS
Change your HttpListenInput
section in your /etc/hekad.toml
to:
[HttpListenInput]
address = ":5140"
use_tls = true
decoder = "JsonDecoder"
[HttpListenInput.tls]
cert_file = "/usr/share/heka/tls/heka.crt"
key_file = "/usr/share/heka/tls/heka.key"
client_auth = "RequireAndVerifyClientCert"
client_cafile = "/usr/share/heka/tls/nxlog.crt"
prefer_server_ciphers = true
min_version = "TLS10"
Restart hekad
:
supervisorctl restart hekad
Conclusion
You now should have Windows Event logs pouring into your ElasticSearch instance which are sent encrypted from your Windows host to Heka.