--- /dev/null +++ b/tests/omhttp-auth-vg.sh @@ -0,0 +1,3 @@ +#!/bin/bash +export USE_VALGRIND="YES" +source ${srcdir:=.}/omhttp-auth.sh --- /dev/null +++ b/tests/omhttp-auth.sh @@ -0,0 +1,45 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=100 + +port="$(get_free_port)" +omhttp_start_server $port --userpwd="bob:bobbackwards" + +generate_conf +add_conf ' +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +module(load="../contrib/omhttp/.libs/omhttp") + +if $msg contains "msgnum:" then + action( + # Payload + name="my_http_action" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="off" + + # Auth + usehttps="off" + uid="bob" + pwd="bobbackwards" + ) +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-basic-vg.sh @@ -0,0 +1,3 @@ +#!/bin/bash +export USE_VALGRIND="YES" +source ${srcdir:=.}/omhttp-basic.sh --- /dev/null +++ b/tests/omhttp-basic.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=10000 + +port="$(get_free_port)" +omhttp_start_server $port + +generate_conf +add_conf ' +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +module(load="../contrib/omhttp/.libs/omhttp") + +if $msg contains "msgnum:" then + action( + # Payload + name="my_http_action" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="off" + + # Auth + usehttps="off" + ) +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-batch-fail-with-400.sh @@ -0,0 +1,52 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +port="$(get_free_port)" +omhttp_start_server $port --fail-with-400-after 1000 + +generate_conf +add_conf ' +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +# Wrap message as a single batch for retry +template(name="tpl_retry" type="string" string="[%msg%]") + + +ruleset(name="ruleset_omhttp") { + action( + name="action_omhttp" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="off" + + retry="on" + + # Auth + usehttps="off" + ) & stop +} + +if $msg contains "msgnum:" then + call ruleset_omhttp +' +startup +injectmsg 0 10000 +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint +omhttp_stop_server +seq_check 0 999 +exit_test --- /dev/null +++ b/tests/omhttp-batch-jsonarray-compress-vg.sh @@ -0,0 +1,3 @@ +#!/bin/bash +export USE_VALGRIND="YES" +source ${srcdir:=.}/omhttp-batch-jsonarray-compress.sh --- /dev/null +++ b/tests/omhttp-batch-jsonarray-compress.sh @@ -0,0 +1,48 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=50000 + +port="$(get_free_port)" +omhttp_start_server $port --decompress + +generate_conf +add_conf ' +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +if $msg contains "msgnum:" then + action( + # Payload + name="my_http_action" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.format="jsonarray" + batch.maxsize="1000" + compress="on" + + # Auth + usehttps="off" + ) +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint jsonarray +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-batch-jsonarray-retry-vg.sh @@ -0,0 +1,3 @@ +#!/bin/bash +export USE_VALGRIND="YES" +source ${srcdir:=.}/omhttp-batch-jsonarray-retry.sh --- /dev/null +++ b/tests/omhttp-batch-jsonarray-retry.sh @@ -0,0 +1,78 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=50000 + +port="$(get_free_port)" +omhttp_start_server $port --fail-every 100 + +generate_conf +add_conf ' +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +# Echo message as-is for retry +template(name="tpl_echo" type="string" string="%msg%") + +ruleset(name="ruleset_omhttp_retry") { + action( + name="action_omhttp" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl_echo" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.maxsize="100" + batch.format="jsonarray" + + retry="on" + retry.ruleset="ruleset_omhttp_retry" + + # Auth + usehttps="off" + ) & stop +} + +ruleset(name="ruleset_omhttp") { + action( + name="action_omhttp" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.maxsize="100" + batch.format="jsonarray" + + retry="on" + retry.ruleset="ruleset_omhttp_retry" + + # Auth + usehttps="off" + ) & stop +} + +if $msg contains "msgnum:" then + call ruleset_omhttp +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint jsonarray +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-batch-jsonarray-vg.sh @@ -0,0 +1,3 @@ +#!/bin/bash +export USE_VALGRIND="YES" +source ${srcdir:=.}/omhttp-batch-jsonarray.sh --- /dev/null +++ b/tests/omhttp-batch-jsonarray.sh @@ -0,0 +1,47 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=50000 + +port="$(get_free_port)" +omhttp_start_server $port + +generate_conf +add_conf ' +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +if $msg contains "msgnum:" then + action( + # Payload + name="my_http_action" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.format="jsonarray" + batch.maxsize="1000" + + # Auth + usehttps="off" + ) +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint jsonarray +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-batch-kafkarest-retry-vg.sh @@ -0,0 +1,3 @@ +#!/bin/bash +export USE_VALGRIND="YES" +source ${srcdir:=.}/omhttp-batch-kafkarest-retry.sh --- /dev/null +++ b/tests/omhttp-batch-kafkarest-retry.sh @@ -0,0 +1,78 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=50000 + +port="$(get_free_port)" +omhttp_start_server $port --fail-every 100 + +generate_conf +add_conf ' +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +# Echo message as-is for retry +template(name="tpl_echo" type="string" string="%msg%") + +ruleset(name="ruleset_omhttp_retry") { + action( + name="action_omhttp" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl_echo" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.maxsize="100" + batch.format="kafkarest" + + retry="on" + retry.ruleset="ruleset_omhttp_retry" + + # Auth + usehttps="off" + ) & stop +} + +ruleset(name="ruleset_omhttp") { + action( + name="action_omhttp" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.maxsize="100" + batch.format="kafkarest" + + retry="on" + retry.ruleset="ruleset_omhttp_retry" + + # Auth + usehttps="off" + ) & stop +} + +if $msg contains "msgnum:" then + call ruleset_omhttp +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint kafkarest +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-batch-kafkarest.sh @@ -0,0 +1,47 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=50000 + +port="$(get_free_port)" +omhttp_start_server $port + +generate_conf +add_conf ' +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +if $msg contains "msgnum:" then + action( + # Payload + name="my_http_action" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.format="kafkarest" + batch.maxsize="100" + + # Auth + usehttps="off" + ) +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint kafkarest +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-batch-newline.sh @@ -0,0 +1,47 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=50000 + +port="$(get_free_port)" +omhttp_start_server $port + +generate_conf +add_conf ' +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +if $msg contains "msgnum:" then + action( + # Payload + name="my_http_action" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="on" + batch.format="newline" + batch.maxsize="100" + + # Auth + usehttps="off" + ) +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint newline +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp-retry-vg.sh @@ -0,0 +1,3 @@ +#!/bin/bash +export USE_VALGRIND="YES" +source ${srcdir:=.}/omhttp-retry.sh --- /dev/null +++ b/tests/omhttp-retry.sh @@ -0,0 +1,46 @@ +#!/bin/bash +# This file is part of the rsyslog project, released under ASL 2.0 + +# Starting actual testbench +. ${srcdir:=.}/diag.sh init + +export NUMMESSAGES=10000 + +port="$(get_free_port)" +omhttp_start_server $port --fail-every 1000 + +generate_conf +add_conf ' +module(load="../contrib/omhttp/.libs/omhttp") + +main_queue(queue.dequeueBatchSize="2048") + +template(name="tpl" type="string" + string="{\"msgnum\":\"%msg:F,58:2%\"}") + +if $msg contains "msgnum:" then + action( + # Payload + action.resumeRetryCount="-1" + name="my_http_action" + type="omhttp" + errorfile="'$RSYSLOG_DYNNAME/omhttp.error.log'" + template="tpl" + + server="localhost" + serverport="'$port'" + restpath="my/endpoint" + batch="off" + + # Auth + usehttps="off" + ) +' +startup +injectmsg +shutdown_when_empty +wait_shutdown +omhttp_get_data $port my/endpoint +omhttp_stop_server +seq_check +exit_test --- /dev/null +++ b/tests/omhttp_server.py @@ -0,0 +1,130 @@ +#!/usr/bin/env python + +import argparse +import json +import os +import zlib +import base64 + +try: + from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer # Python 2 +except ImportError: + from http.server import BaseHTTPRequestHandler, HTTPServer # Python 3 + +# Keep track of data received at each path +data = {} + +metadata = {'posts': 0, 'fail_after': 0, 'fail_every': -1, 'decompress': False, 'userpwd': ''} + + +class MyHandler(BaseHTTPRequestHandler): + """ + POST'd data is kept in the data global dict. + Keys are the path, values are the raw received data. + Two post requests to :/post/endpoint means data looks like... + {"/post/endpoint": ["{\"msgnum\":\"00001\"}", "{\"msgnum\":\"00001\"}"]} + + GET requests return all data posted to that endpoint as a json list. + Note that rsyslog usually sends escaped json data, so some parsing may be needed. + A get request for :/post/endpoint responds with... + ["{\"msgnum\":\"00001\"}", "{\"msgnum\":\"00001\"}"] + """ + + def validate_auth(self): + # header format for basic authentication + # 'Authorization: Basic ' + if 'Authorization' not in self.headers: + self.send_response(401) + self.end_headers() + self.wfile.write('missing "Authorization" header') + return False + + auth_header = self.headers['Authorization'] + _, b64userpwd = auth_header.split() + userpwd = base64.b64decode(b64userpwd) + if userpwd != metadata['userpwd']: + self.send_response(401) + self.end_headers() + self.wfile.write('invalid auth: {0}'.format(userpwd)) + return False + + return True + + def do_POST(self): + metadata['posts'] += 1 + + if metadata['userpwd']: + if not self.validate_auth(): + return + + if metadata['fail_with_400_after'] != -1 and metadata['posts'] > metadata['fail_with_400_after']: + self.send_response(400) + self.end_headers() + self.wfile.write('BAD REQUEST') + return + + if metadata['posts'] > 1 and metadata['fail_every'] != -1 and metadata['posts'] % metadata['fail_every'] == 0: + self.send_response(500) + self.end_headers() + self.wfile.write('INTERNAL ERROR') + return + + content_length = int(self.headers['Content-Length']) + raw_data = self.rfile.read(content_length) + + if metadata['decompress']: + post_data = zlib.decompress(raw_data, 31) + else: + post_data = raw_data + + if self.path not in data: + data[self.path] = [] + data[self.path].append(post_data) + + res = json.dumps({'msg': 'ok'}) + + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', len(res)) + self.end_headers() + + self.wfile.write(res) + return + + def do_GET(self): + if self.path in data: + result = data[self.path] + else: + result = [] + + res = json.dumps(result) + + self.send_response(200) + self.send_header('Content-Type', 'application/json') + self.send_header('Content-Length', len(res)) + self.end_headers() + + self.wfile.write(res) + return + + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Archive and delete core app log files') + parser.add_argument('-p', '--port', action='store', type=int, default=8080, help='port') + parser.add_argument('-i', '--interface', action='store', type=str, default='localhost', help='port') + parser.add_argument('--fail-after', action='store', type=int, default=0, help='start failing after n posts') + parser.add_argument('--fail-every', action='store', type=int, default=-1, help='fail every n posts') + parser.add_argument('--fail-with-400-after', action='store', type=int, default=-1, help='fail with 400 after n posts') + parser.add_argument('--decompress', action='store_true', default=False, help='decompress posted data') + parser.add_argument('--userpwd', action='store', default='', help='only accept this user:password combination') + args = parser.parse_args() + metadata['fail_after'] = args.fail_after + metadata['fail_every'] = args.fail_every + metadata['fail_with_400_after'] = args.fail_with_400_after + metadata['decompress'] = args.decompress + metadata['userpwd'] = args.userpwd + server = HTTPServer((args.interface, args.port), MyHandler) + pid = os.getpid() + print('starting omhttp test server at {interface}:{port} with pid {pid}' + .format(interface=args.interface, port=args.port, pid=pid)) + server.serve_forever()