diff --git a/files/jenkins-log-client.init b/files/jenkins-log-client.init index a62dae6..49f9d65 100755 --- a/files/jenkins-log-client.init +++ b/files/jenkins-log-client.init @@ -15,7 +15,7 @@ PATH=/sbin:/usr/sbin:/bin:/usr/bin DESC="Jenkins Log Client" NAME=jenkins-log-client -DAEMON=/usr/local/bin/log-gearman-client +DAEMON=/usr/local/bin/log-gearman-client.py PIDFILE=/var/run/$NAME/$NAME.pid DAEMON_ARGS="-c /etc/logstash/jenkins-log-client.yaml -d /var/log/logstash/log-client-debug.log -p $PIDFILE" SCRIPTNAME=/etc/init.d/$NAME diff --git a/files/log-gearman-client.py b/files/log-gearman-client.py new file mode 100644 index 0000000..4957fe7 --- /dev/null +++ b/files/log-gearman-client.py @@ -0,0 +1,244 @@ +#!/usr/bin/python2 +# +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import daemon +import gear +import json +import logging +import os +import os.path +import re +import signal +import socket +import threading +import time +import yaml +import zmq + + +try: + import daemon.pidlockfile as pidfile_mod +except ImportError: + import daemon.pidfile as pidfile_mod + + +class EventProcessor(threading.Thread): + def __init__(self, zmq_address, gearman_client, files, source_url): + threading.Thread.__init__(self) + self.files = files + self.source_url = source_url + self.gearman_client = gearman_client + self.zmq_address = zmq_address + self._connect_zmq() + + def run(self): + while True: + try: + self._read_event() + except: + # Assume that an error reading data from zmq or deserializing + # data received from zmq indicates a zmq error and reconnect. + logging.exception("ZMQ exception.") + self._connect_zmq() + + def _connect_zmq(self): + logging.debug("Connecting to zmq endpoint.") + self.context = zmq.Context() + self.socket = self.context.socket(zmq.SUB) + event_filter = b"onFinalized" + self.socket.setsockopt(zmq.SUBSCRIBE, event_filter) + self.socket.connect(self.zmq_address) + + def _read_event(self): + string = self.socket.recv().decode('utf-8') + event = json.loads(string.split(None, 1)[1]) + logging.debug("Jenkins event received: " + json.dumps(event)) + for fileopts in self.files: + output = {} + source_url, out_event = self._parse_event(event, fileopts) + job_filter = fileopts.get('job-filter') + if (job_filter and + not re.match(job_filter, out_event['fields']['build_name'])): + continue + build_queue_filter = fileopts.get('build-queue-filter') + if (build_queue_filter and + not re.match(build_queue_filter, + out_event['fields']['build_queue'])): + continue + project_filter = fileopts.get('project-filter') + if (project_filter and + not re.match(project_filter, out_event['fields']['project'])): + continue + output['source_url'] = source_url + output['retry'] = fileopts.get('retry-get', False) + output['event'] = out_event + if 'subunit' in fileopts.get('name'): + job = gear.Job(b'push-subunit', + json.dumps(output).encode('utf8')) + else: + job = gear.Job(b'push-log', json.dumps(output).encode('utf8')) + try: + self.gearman_client.submitJob(job) + except: + logging.exception("Exception submitting job to Gearman.") + + def _get_log_dir(self, event): + parameters = event["build"].get("parameters", {}) + base = parameters.get('LOG_PATH', 'UNKNOWN') + return base + + def _parse_fields(self, event, filename): + fields = {} + fields["filename"] = filename + fields["build_name"] = event.get("name", "UNKNOWN") + fields["build_status"] = event["build"].get("status", "UNKNOWN") + fields["build_node"] = event["build"].get("node_name", "UNKNOWN") + fields["build_master"] = event["build"].get("host_name", "UNKNOWN") + parameters = event["build"].get("parameters", {}) + fields["project"] = parameters.get("ZUUL_PROJECT", "UNKNOWN") + # The voting value is "1" for voting, "0" for non-voting + fields["voting"] = parameters.get("ZUUL_VOTING", "UNKNOWN") + # TODO(clarkb) can we do better without duplicated data here? + fields["build_uuid"] = parameters.get("ZUUL_UUID", "UNKNOWN") + fields["build_short_uuid"] = fields["build_uuid"][:7] + fields["build_queue"] = parameters.get("ZUUL_PIPELINE", "UNKNOWN") + fields["build_ref"] = parameters.get("ZUUL_REF", "UNKNOWN") + fields["build_branch"] = parameters.get("ZUUL_BRANCH", "UNKNOWN") + fields["build_zuul_url"] = parameters.get("ZUUL_URL", "UNKNOWN") + if parameters.get("ZUUL_CHANGE"): + fields["build_change"] = parameters.get("ZUUL_CHANGE", "UNKNOWN") + fields["build_patchset"] = parameters.get("ZUUL_PATCHSET", + "UNKNOWN") + elif parameters.get("ZUUL_NEWREV"): + fields["build_newrev"] = parameters.get("ZUUL_NEWREV", + "UNKNOWN") + if ["build_node"] != "UNKNOWN": + node_region = '-'.join( + fields["build_node"].split('-')[-3:-1]) + fields["node_region"] = node_region or "UNKNOWN" + else: + fields["node_region"] = "UNKNOWN" + return fields + + def _parse_event(self, event, fileopts): + fields = self._parse_fields(event, fileopts['name']) + log_dir = self._get_log_dir(event) + source_url = fileopts.get('source-url', self.source_url) + '/' + \ + os.path.join(log_dir, fileopts['name']) + fields["log_url"] = source_url + out_event = {} + out_event["fields"] = fields + out_event["tags"] = [os.path.basename(fileopts['name'])] + \ + fileopts.get('tags', []) + return source_url, out_event + + +class Server(object): + def __init__(self, config, debuglog): + # Config init. + self.config = config + self.source_url = self.config['source-url'] + # Pythong logging output file. + self.debuglog = debuglog + self.processors = [] + + def setup_logging(self): + if self.debuglog: + logging.basicConfig(format='%(asctime)s %(message)s', + filename=self.debuglog, level=logging.DEBUG) + else: + # Prevent leakage into the logstash log stream. + logging.basicConfig(level=logging.CRITICAL) + logging.debug("Log pusher starting.") + + def setup_processors(self): + for publisher in self.config['zmq-publishers']: + gearclient = gear.Client() + gearclient.addServer('localhost') + gearclient.waitForServer() + log_processor = EventProcessor( + publisher, gearclient, + self.config['source-files'], self.source_url) + subunit_processor = EventProcessor( + publisher, gearclient, + self.config['subunit-files'], self.source_url) + self.processors.append(log_processor) + self.processors.append(subunit_processor) + + def wait_for_name_resolution(self, host, port): + while True: + try: + socket.getaddrinfo(host, port) + except socket.gaierror as e: + if e.errno == socket.EAI_AGAIN: + logging.debug("Temporary failure in name resolution") + time.sleep(2) + continue + else: + raise + break + + def main(self): + statsd_host = os.environ.get('STATSD_HOST') + statsd_port = int(os.environ.get('STATSD_PORT', 8125)) + statsd_prefix = os.environ.get('STATSD_PREFIX', 'logstash.geard') + if statsd_host: + self.wait_for_name_resolution(statsd_host, statsd_port) + self.gearserver = gear.Server( + statsd_host=statsd_host, + statsd_port=statsd_port, + statsd_prefix=statsd_prefix) + + self.setup_processors() + for processor in self.processors: + processor.daemon = True + processor.start() + while True: + signal.pause() + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-c", "--config", required=True, + help="Path to yaml config file.") + parser.add_argument("-d", "--debuglog", + help="Enable debug log. " + "Specifies file to write log to.") + parser.add_argument("--foreground", action='store_true', + help="Run in the foreground.") + parser.add_argument("-p", "--pidfile", + default="/var/run/jenkins-log-pusher/" + "jenkins-log-gearman-client.pid", + help="PID file to lock during daemonization.") + args = parser.parse_args() + + with open(args.config, 'r') as config_stream: + config = yaml.load(config_stream) + server = Server(config, args.debuglog) + + if args.foreground: + server.setup_logging() + server.main() + else: + pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10) + with daemon.DaemonContext(pidfile=pidfile): + server.setup_logging() + server.main() + + +if __name__ == '__main__': + main() diff --git a/files/log-gearman-worker.py b/files/log-gearman-worker.py new file mode 100644 index 0000000..74acc00 --- /dev/null +++ b/files/log-gearman-worker.py @@ -0,0 +1,446 @@ +#!/usr/bin/python2 +# +# Copyright 2013 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import argparse +import cStringIO +import daemon +import gear +import gzip +import json +import logging +import os +import Queue +import re +import select +import socket +import subprocess +import sys +import threading +import time +import urllib2 +import yaml + + +try: + import daemon.pidlockfile as pidfile_mod +except ImportError: + import daemon.pidfile as pidfile_mod + + +def semi_busy_wait(seconds): + # time.sleep() may return early. If it does sleep() again and repeat + # until at least the number of seconds specified has elapsed. + start_time = time.time() + while True: + time.sleep(seconds) + cur_time = time.time() + seconds = seconds - (cur_time - start_time) + if seconds <= 0.0: + return + + +class FilterException(Exception): + pass + + +class CRM114Filter(object): + def __init__(self, script, path, build_status): + self.p = None + self.script = script + self.path = path + self.build_status = build_status + if build_status not in ['SUCCESS', 'FAILURE']: + return + if not os.path.exists(path): + os.makedirs(path) + args = [script, path, build_status] + self.p = subprocess.Popen(args, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + stdin=subprocess.PIPE, + close_fds=True) + + def process(self, data): + if not self.p: + return + self.p.stdin.write(data['message'].encode('utf-8') + '\n') + (r, w, x) = select.select([self.p.stdout], [], + [self.p.stdin, self.p.stdout], 20) + if not r: + self.p.kill() + raise FilterException('Timeout reading from CRM114') + r = self.p.stdout.readline() + if not r: + err = self.p.stderr.read() + if err: + raise FilterException(err) + else: + raise FilterException('Early EOF from CRM114') + r = r.strip() + data['error_pr'] = float(r) + + def _catchOSError(self, method): + try: + method() + except OSError: + logging.exception("Subprocess cleanup failed.") + + def close(self): + if not self.p: + return + # CRM114 should die when its stdinput is closed. Close that + # fd along with stdout and stderr then return. + self._catchOSError(self.p.stdin.close) + self._catchOSError(self.p.stdout.close) + self._catchOSError(self.p.stderr.close) + self._catchOSError(self.p.wait) + + +class CRM114FilterFactory(object): + name = "CRM114" + + def __init__(self, script, basepath): + self.script = script + self.basepath = basepath + + def create(self, fields): + filename = re.sub('\.', '_', fields['filename']) + path = os.path.join(self.basepath, filename) + return CRM114Filter(self.script, path, fields['build_status']) + + +class LogRetriever(threading.Thread): + def __init__(self, gearman_worker, filters, logq): + threading.Thread.__init__(self) + self.gearman_worker = gearman_worker + self.filters = filters + self.logq = logq + + def run(self): + while True: + try: + self._handle_event() + except: + logging.exception("Exception retrieving log event.") + + def _handle_event(self): + job = self.gearman_worker.getJob() + try: + arguments = json.loads(job.arguments.decode('utf-8')) + source_url = arguments['source_url'] + retry = arguments['retry'] + event = arguments['event'] + logging.debug("Handling event: " + json.dumps(event)) + fields = event.get('fields') or event.get('@fields') + tags = event.get('tags') or event.get('@tags') + if fields['build_status'] != 'ABORTED': + # Handle events ignoring aborted builds. These builds are + # discarded by zuul. + log_lines = self._retrieve_log(source_url, retry) + + try: + all_filters = [] + for f in self.filters: + logging.debug("Adding filter: %s" % f.name) + all_filters.append(f.create(fields)) + filters = all_filters + + logging.debug("Pushing " + str(len(log_lines)) + + " log lines.") + base_event = {} + base_event.update(fields) + base_event["tags"] = tags + for line in log_lines: + out_event = base_event.copy() + out_event["message"] = line + new_filters = [] + for f in filters: + try: + f.process(out_event) + new_filters.append(f) + except FilterException: + logging.exception("Exception filtering event: " + "%s" % line.encode("utf-8")) + filters = new_filters + self.logq.put(out_event) + finally: + for f in all_filters: + f.close() + job.sendWorkComplete() + except Exception as e: + logging.exception("Exception handling log event.") + job.sendWorkException(str(e).encode('utf-8')) + + def _retrieve_log(self, source_url, retry): + # TODO (clarkb): This should check the content type instead of file + # extension for determining if gzip was used. + gzipped = False + raw_buf = b'' + try: + gzipped, raw_buf = self._get_log_data(source_url, retry) + except urllib2.HTTPError as e: + if e.code == 404: + logging.info("Unable to retrieve %s: HTTP error 404" % + source_url) + else: + logging.exception("Unable to get log data.") + except Exception: + # Silently drop fatal errors when retrieving logs. + # TODO (clarkb): Handle these errors. + # Perhaps simply add a log message to raw_buf? + logging.exception("Unable to get log data.") + if gzipped: + logging.debug("Decompressing gzipped source file.") + raw_strIO = cStringIO.StringIO(raw_buf) + f = gzip.GzipFile(fileobj=raw_strIO) + buf = f.read().decode('utf-8') + raw_strIO.close() + f.close() + else: + logging.debug("Decoding source file.") + buf = raw_buf.decode('utf-8') + return buf.splitlines() + + def _get_log_data(self, source_url, retry): + gzipped = False + try: + # TODO(clarkb): We really should be using requests instead + # of urllib2. urllib2 will automatically perform a POST + # instead of a GET if we provide urlencoded data to urlopen + # but we need to do a GET. The parameters are currently + # hardcoded so this should be ok for now. + logging.debug("Retrieving: " + source_url + ".gz?level=INFO") + req = urllib2.Request(source_url + ".gz?level=INFO") + req.add_header('Accept-encoding', 'gzip') + r = urllib2.urlopen(req) + except urllib2.URLError: + try: + # Fallback on GETting unzipped data. + logging.debug("Retrieving: " + source_url + "?level=INFO") + r = urllib2.urlopen(source_url + "?level=INFO") + except: + logging.exception("Unable to retrieve source file.") + raise + except: + logging.exception("Unable to retrieve source file.") + raise + if ('gzip' in r.info().get('Content-Type', '') or + 'gzip' in r.info().get('Content-Encoding', '')): + gzipped = True + + raw_buf = r.read() + # Hack to read all of Jenkins console logs as they upload + # asynchronously. After each attempt do an exponential backup before + # the next request for up to 255 seconds total, if we do not + # retrieve the entire file. Short circuit when the end of file string + # for console logs, '\n\n', is read. + if (retry and not gzipped and + raw_buf[-8:].decode('utf-8') != '\n\n'): + content_len = len(raw_buf) + backoff = 1 + while backoff < 129: + # Try for up to 255 seconds to retrieve the complete log file. + try: + logging.debug(str(backoff) + " Retrying fetch of: " + + source_url + "?level=INFO") + logging.debug("Fetching bytes=" + str(content_len) + '-') + req = urllib2.Request(source_url + "?level=INFO") + req.add_header('Range', 'bytes=' + str(content_len) + '-') + r = urllib2.urlopen(req) + raw_buf += r.read() + content_len = len(raw_buf) + except urllib2.HTTPError as e: + if e.code == 416: + logging.exception("Index out of range.") + else: + raise + finally: + if raw_buf[-8:].decode('utf-8') == '\n\n': + break + semi_busy_wait(backoff) + backoff += backoff + + return gzipped, raw_buf + + +class StdOutLogProcessor(object): + def __init__(self, logq, pretty_print=False): + self.logq = logq + self.pretty_print = pretty_print + + def handle_log_event(self): + log = self.logq.get() + if self.pretty_print: + print(json.dumps(log, sort_keys=True, + indent=4, separators=(',', ': '))) + else: + print(json.dumps(log)) + # Push each log event through to keep logstash up to date. + sys.stdout.flush() + + +class INETLogProcessor(object): + socket_type = None + + def __init__(self, logq, host, port): + self.logq = logq + self.host = host + self.port = port + self.socket = None + + def _connect_socket(self): + logging.debug("Creating socket.") + self.socket = socket.socket(socket.AF_INET, self.socket_type) + self.socket.connect((self.host, self.port)) + + def handle_log_event(self): + log = self.logq.get() + try: + if self.socket is None: + self._connect_socket() + self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) + except: + logging.exception("Exception sending INET event.") + # Logstash seems to take about a minute to start again. Wait 90 + # seconds before attempting to reconnect. If logstash is not + # available after 90 seconds we will throw another exception and + # die. + semi_busy_wait(90) + self._connect_socket() + self.socket.sendall((json.dumps(log) + '\n').encode('utf-8')) + + +class UDPLogProcessor(INETLogProcessor): + socket_type = socket.SOCK_DGRAM + + +class TCPLogProcessor(INETLogProcessor): + socket_type = socket.SOCK_STREAM + + +class Server(object): + def __init__(self, config, debuglog): + # Config init. + self.config = config + self.gearman_host = self.config['gearman-host'] + self.gearman_port = self.config['gearman-port'] + self.output_host = self.config['output-host'] + self.output_port = self.config['output-port'] + self.output_mode = self.config['output-mode'] + # Pythong logging output file. + self.debuglog = debuglog + self.retriever = None + self.logqueue = Queue.Queue(131072) + self.processor = None + self.filter_factories = [] + crmscript = self.config.get('crm114-script') + crmdata = self.config.get('crm114-data') + if crmscript and crmdata: + self.filter_factories.append( + CRM114FilterFactory(crmscript, crmdata)) + + def setup_logging(self): + if self.debuglog: + logging.basicConfig(format='%(asctime)s %(message)s', + filename=self.debuglog, level=logging.DEBUG) + else: + # Prevent leakage into the logstash log stream. + logging.basicConfig(level=logging.CRITICAL) + logging.debug("Log pusher starting.") + + def wait_for_name_resolution(self, host, port): + while True: + try: + socket.getaddrinfo(host, port) + except socket.gaierror as e: + if e.errno == socket.EAI_AGAIN: + logging.debug("Temporary failure in name resolution") + time.sleep(2) + continue + else: + raise + break + + def setup_retriever(self): + hostname = socket.gethostname() + gearman_worker = gear.Worker(hostname + b'-pusher') + self.wait_for_name_resolution(self.gearman_host, self.gearman_port) + gearman_worker.addServer(self.gearman_host, + self.gearman_port) + gearman_worker.registerFunction(b'push-log') + self.retriever = LogRetriever(gearman_worker, self.filter_factories, + self.logqueue) + + def setup_processor(self): + if self.output_mode == "tcp": + self.processor = TCPLogProcessor(self.logqueue, + self.output_host, + self.output_port) + elif self.output_mode == "udp": + self.processor = UDPLogProcessor(self.logqueue, + self.output_host, + self.output_port) + else: + # Note this processor will not work if the process is run as a + # daemon. You must use the --foreground option. + self.processor = StdOutLogProcessor(self.logqueue) + + def main(self): + self.setup_retriever() + self.setup_processor() + + self.retriever.daemon = True + self.retriever.start() + + while True: + try: + self.processor.handle_log_event() + except: + logging.exception("Exception processing log event.") + raise + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-c", "--config", required=True, + help="Path to yaml config file.") + parser.add_argument("-d", "--debuglog", + help="Enable debug log. " + "Specifies file to write log to.") + parser.add_argument("--foreground", action='store_true', + help="Run in the foreground.") + parser.add_argument("-p", "--pidfile", + default="/var/run/jenkins-log-pusher/" + "jenkins-log-gearman-worker.pid", + help="PID file to lock during daemonization.") + args = parser.parse_args() + + with open(args.config, 'r') as config_stream: + config = yaml.load(config_stream) + server = Server(config, args.debuglog) + + if args.foreground: + server.setup_logging() + server.main() + else: + pidfile = pidfile_mod.TimeoutPIDLockFile(args.pidfile, 10) + with daemon.DaemonContext(pidfile=pidfile): + server.setup_logging() + server.main() + + +if __name__ == '__main__': + main() diff --git a/manifests/client.pp b/manifests/client.pp index 2e7f1fb..999a4af 100644 --- a/manifests/client.pp +++ b/manifests/client.pp @@ -36,7 +36,7 @@ class log_processor::client ( mode => '0555', source => 'puppet:///modules/log_processor/jenkins-log-client.init', require => [ - Vcsrepo['/opt/log_processor'], + File['/usr/local/bin/log-gearman-client.py'], File['/etc/logstash/jenkins-log-client.yaml'], File['/etc/default/jenkins-log-client'], ], diff --git a/manifests/init.pp b/manifests/init.pp index f5d5c5f..eb08cf4 100644 --- a/manifests/init.pp +++ b/manifests/init.pp @@ -16,8 +16,6 @@ # == Class: log_processor # class log_processor ( - $git_source = 'git://git.openstack.org/openstack-infra/log_processor', - $git_rev = 'master', ) { package { 'python-daemon': ensure => present, @@ -35,6 +33,19 @@ class log_processor ( ensure => present, } + include ::pip + package { 'gear': + ensure => latest, + provider => 'pip', + require => Class['pip'], + } + + package { 'statsd': + ensure => latest, + provider => 'pip', + require => Class['pip'], + } + file { '/var/lib/crm114': ensure => directory, owner => 'logstash', @@ -53,23 +64,31 @@ class log_processor ( ], } - vcsrepo { '/opt/log_processor': - ensure => latest, - provider => git, - source => $git_source, - revision => $git_rev, - } - - exec { 'install-log_processor': - command => 'pip install --install-option="--install-scripts=/usr/local/bin" --install-option="--install-lib=/usr/local/lib/python2.7/dist-packages" /opt/log_processor', - path => '/usr/local/bin:/usr/bin:/bin', - refreshonly => true, - subscribe => Vcsrepo['/opt/log_processor'], - require => [ + file { '/usr/local/bin/log-gearman-client.py': + ensure => present, + owner => 'root', + group => 'root', + mode => '0755', + source => 'puppet:///modules/log_processor/log-gearman-client.py', + require => [ Package['python-daemon'], - Package['python-yaml'], Package['python-zmq'], + Package['python-yaml'], + Package['gear'], ], } + file { '/usr/local/bin/log-gearman-worker.py': + ensure => present, + owner => 'root', + group => 'root', + mode => '0755', + source => 'puppet:///modules/log_processor/log-gearman-worker.py', + require => [ + Package['python-daemon'], + Package['python-zmq'], + Package['python-yaml'], + Package['gear'], + ], + } } diff --git a/manifests/worker.pp b/manifests/worker.pp index bc65f08..cfe173d 100644 --- a/manifests/worker.pp +++ b/manifests/worker.pp @@ -36,7 +36,7 @@ define log_processor::worker ( mode => '0555', content => template('log_processor/jenkins-log-worker.init.erb'), require => [ - Vcsrepo['/opt/log_processor'], + File['/usr/local/bin/log-gearman-worker.py'], File["/etc/logstash/jenkins-log-worker${suffix}.yaml"], ], } diff --git a/templates/jenkins-log-worker.init.erb b/templates/jenkins-log-worker.init.erb index af016d8..f122623 100755 --- a/templates/jenkins-log-worker.init.erb +++ b/templates/jenkins-log-worker.init.erb @@ -15,7 +15,7 @@ PATH=/sbin:/usr/sbin:/bin:/usr/bin DESC="Jenkins Log Worker" NAME=jenkins-log-worker<%= suffix %> -DAEMON=/usr/local/bin/log-gearman-worker +DAEMON=/usr/local/bin/log-gearman-worker.py PIDFILE=/var/run/$NAME/$NAME.pid DAEMON_ARGS="-c /etc/logstash/jenkins-log-worker<%= suffix %>.yaml -d /var/log/logstash/log-worker<%= suffix %>-debug.log -p $PIDFILE" SCRIPTNAME=/etc/init.d/$NAME