Python实现分布式日志

Python实现分布式日志

分布式日志初探

​ 在写分布式爬虫过程中,需要打印一些关键性日志,但是程序是分布在各个机器上,这样不便于我们程序的日志的统计,以及错误代码的查看;

​ 所以我查看了关于logging的教程,显示logging的handler存在以下几种:

1
2
3
4
5
6
7
8
9
10
11
12
StreamHandler:logging.StreamHandler;日志输出到流,可以是sys.stderr,sys.stdout或者文件
FileHandler:logging.FileHandler;日志输出到文件
BaseRotatingHandler:logging.handlers.BaseRotatingHandler;基本的日志回滚方式
RotatingHandler:logging.handlers.RotatingHandler;日志回滚方式,支持日志文件最大数量和日志文件回滚
TimeRotatingHandler:logging.handlers.TimeRotatingHandler;日志回滚方式,在一定时间区域内回滚日志文件
SocketHandler:logging.handlers.SocketHandler;远程输出日志到TCP/IP sockets
DatagramHandler:logging.handlers.DatagramHandler;远程输出日志到UDP sockets
SMTPHandler:logging.handlers.SMTPHandler;远程输出日志到邮件地址
SysLogHandler:logging.handlers.SysLogHandler;日志输出到syslog
NTEventLogHandler:logging.handlers.NTEventLogHandler;远程输出日志到Windows NT/2000/XP的事件日志
MemoryHandler:logging.handlers.MemoryHandler;日志输出到内存中的指定buffer
HTTPHandler:logging.handlers.HTTPHandler;通过"GET"或者"POST"远程输出到HTTP服务器

其中我特别注意到的handler是SocketHandler, handler可以实现socket发送日志;

​ 而联想到使用es来存取日志,本来是想用es api来接收日志(这样就可以使用HTTPHandler),但是http毕竟是上层协议,发送可能会慢,所以继续找到了logstash, 作为ELK的一员,我们可以先发送到logstash再转发到es,接下来就查看logstash是否有python开源包,接下来就发现了python-logstash。

安装并运行logstash

  • 安装java

  • 进入官网下载安装包

  • 上传服务器并解压(这里使用的是6.3.2)

1
tar -zxvf logstash-6.3.2.tar.gz
  • 编写logstash的配置文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
input {                                                                           
udp {
port => 5959
codec => json
}
}

output {
elasticsearch{
hosts => ["192.168.1.17:9200", "192.168.1.18:9200", "192.168.1.19:9200"]
index => "crawler-%{+YYYY.MM.dd}"
}
stdout{
codec => rubydebug
}
}
  • 运行logstash
1
logstash -f logstash.conf
  • 截图

logstash运行成功

使用python-logstash

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import logging
import logstash
import sys

host = 'localhost'

test_logger = logging.getLogger('python-logstash-logger')
test_logger.setLevel(logging.INFO)
test_logger.addHandler(logstash.LogstashHandler(host, 5959, version=1))

extra = {
'test_string': 'python version: ' + repr(sys.version_info),
'test_boolean': True,
'test_dict': {'a': 1, 'b': 'c'},
'test_float': 1.23,
'test_integer': 123,
'test_list': [1, 2, '3'],
}
test_logger.info('python-logstash: test extra fields', extra=extra)

运行后logstash显示:

logstash接收到日志

查看es中是否接收到数据:

es显示成功

以上,我们基本的分布式日志已经发送测试成功;

深入研究logging的日志

​ 第三方库中的日志格式并不满足我们的需求,所以我们可以对其进行修改(借鉴其源码):

源码:详细代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class LogstashFormatterBase(logging.Formatter):

def __init__(self, message_type='Logstash', tags=None, fqdn=False):
pass
def get_extra_fields(self, record):
pass

def get_debug_fields(self, record):
pass

@classmethod
def format_source(cls, message_type, host, path):
pass

@classmethod
def format_timestamp(cls, time):
pass

@classmethod
def format_exception(cls, exc_info):
pass

@classmethod
def serialize(cls, message):
pass

这里我们发现其继承了Formatter;查看Logging官方文档可以得知,实现自定义format只要实现其format方法就可以了,第一次我直接继承了python-logstash的LogstashFormatterVersion1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class LogstashFormat(LogstashFormatterVersion1):

def format(self, record):
message = {
'@timestamp': self.format_timestamp(record.created),
'@message': record.getMessage(),
'log_level': record.levelname,
'log_file': record.filename,
'line_no': record.lineno,
'host': self.host,
}
message.update(self.get_extra_fields(record))

return self.serialize(message)

@classmethod
def format_timestamp(cls, time):
current_time = datetime.datetime.fromtimestamp(time)
return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),
".%03d"%(current_time.microsecond / 1000),
"Z"])

def init_logstash_logger(level, host, port):
logger = logging.getLogger('python-logstash-logger')
logger.setLevel(LEVELS.get(level))
logstash_format = LogstashFormat()
logstash_handler = LogstashHandler(host, port)
logstash_handler.setFormatter(logstash_format)
logger.addHandler(logstash_handler)
return logger

​ 修改了其对时间的处理,以及format返回的处理,这里我们实现一个初始化方法进行测试;

执行:

测试

结果:

测试结果

以上我们就可以自定义,自己的代码发送到logstash上了。

es会对上传的日志进行日期建立索引

es索引

提供发送日志到kafka源码:

要求: python3.7

库: confluent-kafka-python

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import datetime                                                                        
import json
import logging

from logging import Formatter
from logging.handlers import QueueHandler
from confluent_kafka import Producer


produce = Producer({'bootstrap.servers': '192.168.11.5:9092'})

class KafkaQueueHandler(QueueHandler):

def emit(self, record):
┆ self.queue.poll(0)
┆ self.queue.produce('test', self.format(record), callback=self.delivery_report)
┆ self.queue.flush()

def delivery_report(self, err, msg):
if err is not None:
┆ ┆ print('message:%s', err)
else:
┆ ┆ print('message deliverd %s', msg.topic())


class QueueFormat(Formatter):

def __init__(self, topic):
self.topic = topic
┆ super(QueueFormat, self).__init__()

def get_extra_fields(self, record):
┆ skip_list = (
┆ ┆ 'args', 'asctime', 'created', 'exc_info', 'exc_text', 'filename',
┆ ┆ 'funcName', 'id', 'levelname', 'levelno', 'lineno', 'module',
┆ ┆ 'msecs', 'msecs', 'message', 'msg', 'name', 'pathname', 'process',
┆ ┆ 'processName', 'relativeCreated', 'thread', 'threadName', 'extra',
┆ ┆ 'auth_token', 'password'
┆ )
┆ easy_types = (str, bool, dict, float, int, list, type(None))

┆ fields = {}
for key, value in record.__dict__.items():
┆ ┆ if key not in skip_list:
┆ ┆ ┆ if isinstance(value, easy_types):
┆ ┆ ┆ ┆ fields[key] = value
┆ ┆ ┆ else:
┆ ┆ ┆ ┆ fields[key] = repr(value)

return fields

@classmethod
def format_timestamp(cls, time):
┆ current_time = datetime.datetime.fromtimestamp(time)
return ''.join([current_time.strftime("%Y-%m-%dT%H:%M:%S"),
┆ ┆ ┆ ┆ ┆ ".%03d"%(current_time.microsecond / 1000),
┆ ┆ ┆ ┆ ┆ "Z"])
@classmethod
def format_exception(cls, exc_info):
return ''.join(traceback.format_exception(*exc_info)) if exc_info else ''

@classmethod
def serialize(cls, message):
return bytes(json.dumps(message), 'utf-8')

def format(self, record):
┆ message = {
┆ ┆ '@timestamp': self.format_timestamp(record.created),
┆ ┆ '@message': record.getMessage(),
┆ ┆ 'log_level': record.levelname,
┆ ┆ 'log_file': record.filename,
┆ ┆ 'line_no': record.lineno,
┆ ┆ 'topic': self.topic
┆ }
┆ message.update(self.get_extra_fields(record))
return self.serialize(message)

logger = logging.getLogger('kafka')
kafka_handle = KafkaQueueHandler(produce)
log_format = QueueFormat('test')
kafka_handle.setFormatter(log_format)

logger.setLevel(logging.DEBUG)
logger.addHandler(kafka_handle)
logger.info('aaaa')

向kafka发送日志后,可以使用logstash的插件从kafka中读取日志,发送到es。

文章作者: TangLyan
文章链接: https://toheart.github.io/2019/06/25/python/logging-queue/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 荦彦的博客
打赏一下