1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| import datetime import json import logging from logging import Formatter from logging.handlers import QueueHandler from confluent_kafka import Producer produce = Producer({'bootstrap.servers': '192.168.11.5:9092'}) class KafkaQueueHandler(QueueHandler): def emit(self, record): ┆ self.queue.poll(0) ┆ self.queue.produce('test', self.format(record), callback=self.delivery_report) ┆ self.queue.flush() def delivery_report(self, err, msg): ┆ if err is not None: ┆ ┆ print('message:%s', err) ┆ else: ┆ ┆ print('message deliverd %s', msg.topic()) class QueueFormat(Formatter): def __init__(self, topic): self.topic = topic ┆ super(QueueFormat, self).__init__() def get_extra_fields(self, record): ┆ skip_list = ( ┆ ┆ 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename', ┆ ┆ 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module', ┆ ┆ 'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process', ┆ ┆ 'processName', 'relativeCreated', 'thread', 'threadName', 'extra', ┆ ┆ 'auth_token', 'password' ┆ ) ┆ easy_types = (str, bool, dict, float, int, list, type(None)) ┆ fields = {} ┆ for key, value in record.__dict__.items(): ┆ ┆ if key not in skip_list: ┆ ┆ ┆ if isinstance(value, easy_types): ┆ ┆ ┆ ┆ fields[key] = value ┆ ┆ ┆ else: ┆ ┆ ┆ ┆ fields[key] = repr(value) ┆ return fields @classmethod def format_timestamp(cls, time): ┆ current_time = datetime.datetime.fromtimestamp(time) ┆ return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"), ┆ ┆ ┆ ┆ ┆ ".%03d"%(current_time.microsecond / 1000), ┆ ┆ ┆ ┆ ┆ "Z"]) @classmethod def format_exception(cls, exc_info): ┆ return ''.join(traceback.format_exception(*exc_info)) if exc_info else '' @classmethod def serialize(cls, message): ┆ return bytes(json.dumps(message), 'utf-8') def format(self, record): ┆ message = { ┆ ┆ '@timestamp': self.format_timestamp(record.created), ┆ ┆ '@message': record.getMessage(), ┆ ┆ 'log_level': record.levelname, ┆ ┆ 'log_file': record.filename, ┆ ┆ 'line_no': record.lineno, ┆ ┆ 'topic': self.topic ┆ } ┆ message.update(self.get_extra_fields(record)) ┆ return self.serialize(message) logger = logging.getLogger('kafka') kafka_handle = KafkaQueueHandler(produce) log_format = QueueFormat('test') kafka_handle.setFormatter(log_format) logger.setLevel(logging.DEBUG) logger.addHandler(kafka_handle) logger.info('aaaa')
|