keep running

  • Home

  • Tags

  • Archives

es学习系列之二:Aggregation collect mode and execution hint.md

Posted on 2020-08-16

如无特别说明,本文讨论的内容均基于 es 7.*

Term Aggregation的深度优先以及广度优先

Term aggregation 是我们常用的聚合查询,对于有 父子聚合 的场景下,理解其执行父聚合以及子聚合的时机对我们优化聚合查询有很大的帮助,collect mode就指定了它们的执行时机,共有两种模式,一种是depth first,另一种是breadth first。

depth first

一般来说,depth first适合大多数场景,因为大多数情况下需要聚合的字段不会是大量的唯一值。

  • 聚合过程
    • 1.先计算出一个桶,然后再根据这个桶计算出子聚合的结果,构建出一棵聚合树。然后不断重复前面的过程,直到所有的桶计算完毕。
    • 2.对步骤一计算出的结果进行排序,也就是对各个聚合树进行排序。
    • 3.根据过滤条件和 size 等参数修剪结果。
  • 适合场景
    • 大多数term都是重复的
    • 要求返回的aggs size比较大
    • 原因:不需要缓存父聚合的doc_id,直接聚合成一棵棵聚合树,每棵聚合树的每个节点数据结构为(value, doc_count),并且大多数的聚合树不会被修剪
  • 聚合计算下,多层聚合会让一个文档与其他文档产生关联,也就是说会形成一棵棵聚合树。深度优先就是先计算获得所有的聚合树,然后再进行后续处理。

breadth first

  • 聚合过程
    • 1.先计算出第一层聚合的结果。
    • 2.根据步骤一得出的结果进行排序。
    • 3.根据过滤条件和 size 等参数修整第一层节点。
    • 4.根据各个节点进行后续的子聚合计算。
  • 适合场景
    • 大多数term都是唯一的
    • 要求返回的aggs size 比较小
    • 原因:缓存父聚合的doc_id,聚合树第一层节点,即根节点,的数据结构为(value, doc_count, set(doc_id)),为了保证使用的缓存尽量小,则 avg(doc_count_per_bucket) * size(buckets) 尽量小
  • 对于基数大于请求的大小的字段或基数未知的字段(例如,数字字段或脚本),默认值为breadth_first
  • 当使用breadth_first模式时,属于最上层存储桶的文档集将被缓存以供后续重播,因此这样做会产生内存开销,该开销与匹配文档的数量成线性关系。
  • 使用广度优先设置时,仍可以使用order参数来引用子聚合中的数据。父级聚合知道需要先调用此子级聚合,然后再调用其他任何子级聚合。
    • 这里的主要意思是,如果父聚合的排序时候使用的是子聚合的结果,则会在执行父聚合前先执行该子聚合。

depth first vs. breadth first

  • 关键点:
    • 父子聚合的组合数的多少 + 返回buckets数量大小(分别对应计算的难度+使用率)
      • 父聚合字段的基数大,父子聚合的组合数多,需要返回的buckets数量小,适合广度优先
      • 父聚合字段的基数小,父子聚合的组合数少,需要返回的buckets数量大,适合深度优先
    • 比如:
      • 父聚合字段有10000,父子聚合的组合数约为 100000,需要返回buckets为5,则可能适合广度优先
      • 父聚合字段有10,父子聚合的组合数约为 100,需要返回buckets为100,则可能适合深度优先

Term Aggregation的Execution hint

global_ordinals

我们知道doc values存储的时候,会对应original value分配一个ordinal以减小磁盘的使用,也减小聚合过程中内存的使用。

  • 聚合过程
    • segment级别的聚合:
      • 在聚合字段的doc values数据结构中,完成 (doc_id, ordinal) -> (ordinal, set(doc_id)) 的聚合计算
    • shard级别的聚合:
      • 在shard内部多个segment之上构建 global ordinla map,其数据结构为 (segment_id, ordinal, global ordinal),当ordinal对应的初始值相同时,其对应的global ordinal也相同。
      • 根据 global ordinla map,完成 (ordinal, set(doc_id)) -> (global ordinal, set(doc_id) 的转换。
      • 根据 global ordinal 进行分桶,并根据doc count进行排序,选出前n个桶,完成 (global ordinal, set(doc_id) -> (global ordinal, doc count) 的聚合计算。
      • 根据 global ordinal map以及segment的doc values,把global ordinal替换成原始值,完成 (global ordinal, doc count) -> (segment_id, ordinal, doc count) -> (original value, doc count) 的转换。
    • index级别的聚合:
      • 在协调节点完成全局前n个结果的聚合。
  • global ordinals 的有效性
    • 因为global ordinals为shard上的所有segment提供了统一的map,所以当新的segment变为可见时(常见为refresh的时候),还需要完全重建它们。所以,global ordinals 更加适用于 历史数据。

map

相对而言map更加简单,主要的不同点在于shard级别聚合的时候不再构建global ordinal map,而是直接返回original value到shard

  • 聚合过程
    • segment级别的聚合:
      • 在聚合字段的doc values数据结构中,完成 (doc_id, ordinal) -> (ordinal, set(doc_id)) 的聚合计算
      • 把ordinal替换成初始值,完成 (ordinal, set(doc_id)) -> (original, set(doc_id)) 的转换
    • shard级别的聚合:
      • 根据 original value进行分桶,并根据doc count进行排序,选出前n个桶,完成 (original, set(doc_id) -> (original, doc count) 的聚合计算。
    • index级别的聚合:
      • 在协调节点完成全局前n个结果的聚合。

global_ordinals vs. map

  • 适合global_ordinals模式
    • 聚合字段基数不大
    • refresh 间隔比较大
    • 不再写入的索引,比如历史索引
    • 开启 eager_global_ordinals 配置,在写入索引时就构建global ordinals map。但是可能会对写入索引的速度有影响
  • 适合map模式
    • 聚合字段基数很大
    • 需要注意可能会引起更大的内存消耗量
Read more »

python2中format和%拼接字符串的异同

Posted on 2020-08-08

基础知识

相信python2的编码问题大多数开发同学都遇到过,在出现非 ascii 编码字符时,就很容易编码异常的问题。python2的字符编码分为 str 以及 unicode,具体情况这里不再敖述,只会总结字符串拼接时应该注意的问题以及可能遇到的坑点。

以下几点常识是下面进一步讨论问题的基础:

  • str转为unicode的过程,称为解码,即 decode。
  • unicode转为str,称为编码,即 encode。
  • 使用%把str和unicode拼接,会自动隐式地把str转为unicode后,再进行拼接。(如果是fomat拼接呢?这里留个悬念,答案稍后揭晓)
  • 当导入future包的unicode_literals特性时,python定义的字符都是unicode,而不是默认的str。这个也是为了让python2能够导入python3的特性,因为在python3中的str都是unicode。

% 拼接字符串

我们首先看看pyhon2中的使用%字符串拼接情况。从第一组结果来看,我们可以看到只要格式化串和字符串参数其中一个为unicode,最终结果就为unicode,这个和上面讲的第三点一致。在对str和unicode拼接的时候,会自动把str转为unicode,如第二组的中间两个结果。

但是我们需要注意编码的问题,如第2个结果,由于”中文”是非 acsii 编码,而且python解释器不知道其类型,会用ascii编码对其进行解码,相当于 u"%s" % ("中文").decode("ascii"),而ascii不认识非 0~127 编码所以就报错,当然我们可以手动指定用”utf-8”进行解码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
>>> type("%s" % ("hello"))
<type 'str'>
>>> type(u"%s" % ("hello"))
<type 'unicode'>
>>> type("%s" % (u"hello"))
<type 'unicode'>
>>> type(u"%s" % (u"hello"))
<type 'unicode'>
>>>
>>> type("%s" % ("中文"))
<type 'str'>
>>> type(u"%s" % ("中文")) # 最终结果为unicode,会隐式地通过ascii编码把"中文"解码为unicode
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)
>>> type(u"%s" % ("中文".decode("utf-8")))
<type 'unicode'>
>>> type("%s" % (u"中文"))
<type 'unicode'>
>>> type(u"%s" % (u"中文"))
<type 'unicode'>

format 拼接字符串

同样的,我们先看下面的第一组结果,是不是有点吃惊?第一组的第二结果不是unicode类型,而是str类型,这个跟%是不同的。很显然从结果上看,我们知道对于format其拼接结果类型取决于其格式化串的类型,而与参数没有任何关系。

理解的第一组数据的规律后,再看第二组就知道为什么有的情况会报异常了。第二组的第二个结果,由于最终结果为str,pyhton解释器会默认用 ascii 对 u"中文" 进行编码,而第三个结果,由于最终结果为unicode,python解释器会默认用 ascii 对应 “中文” 进行解码,而报错的理由和前面的情况一致,都是因为ascii不认识非 0~127 编码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
>>> type("{}".format("hello"))
<type 'str'>
>>> type("{}".format(u"hello"))
<type 'str'>
>>> type(u"{}".format("hello"))
<type 'unicode'>
>>> type(u"{}".format(u"hello"))
<type 'unicode'>
>>>
>>> type("{}".format("中文"))
<type 'str'>
>>> type("{}".format(u"中文")) # 最终结果为str,会隐式地通过ascii编码把u"中文"编码为ascii
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
UnicodeEncodeError: 'ascii' codec can't encode characters in position 0-1: ordinal not in range(128)
>>> type("{}".format(u"中文".encode("utf-8")))
<type 'str'>
>>> type(u"{}".format("中文")) # 最终结果为unicode,会隐式地通过ascii编码把"中文"解码为unicdoe
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
UnicodeDecodeError: 'ascii' codec can't decode byte 0xe4 in position 0: ordinal not in range(128)
>>> type(u"{}".format("中文".decode("utf-8")))
<type 'unicode'>
>>> type(u"{}".format(u"中文"))
<type 'unicode'>

坑点

而我们线上的问题,要比以上两种都要隐蔽,大致如下:

代码目录结构

1
2
3
4
5
6
7
 tree -L 2 -I "*.pyc"
.
├── test_module_002
│   ├── __init__.py
│   ├── __pycache__
│   ├── main.py
│   └── module_a.py
Read more »

flink学习系列之一: taskmanager, slot与parallelism

Posted on 2020-07-12

如无特别说明,本文讨论的内容均基于 flink 1.7.1

最近一段时间用 flink 写一些 etl 作业,做数据的收集清洗入库,也遇到一些性能问题需要进一步解决,于是计划学习部分flink底层知识。第一篇,跟以前学习spark一样,从flink的并行度说起。

flink作业的启动模式

通过 flink YARN Setup 文档我们能够了解到,flink的启动方式大致有两种,
一种是先分配jobmanager、taskmanager的资源,等待后续提交作业,另一种是在提交的时候申请资源并运行。下面将简单介绍一下这两种启动方式的区别,并着重关注其并行度的计算,最后和spark并行度的计算对对比。

部署方式一:在yarn中启动一个flink session,提交job到该session

  • 启动flink session
    • ./bin/yarn-session.sh -tm 8192 -s 32
    • 关键配置:
      • -n,指定 container 数量(即taskmanager的数量,不过已经不建议使用,对应的源码
      • -tm,分配 taskmanager 内存大小
      • -jm,分配 jobmanager 内存大小
      • -s,每个taskmanager分配slot个数(如果配置了将会覆盖yarn的 parallelism.default 配置,parallelism.default 值默认为1)
      • -Dyarn.containers.vcores,在yarn中分配的vcore个数,默认和slot个数一致,即一个slot一个vcore
      • 默认 taskmanager 的数量为1,并行度为 slot * taskmanager ,源码
    • 一旦 flink session在yarn中启动成功,将会展示有关 jobmanager 连接的详细信息,通过CTRL+C 或者 在client中输入stop关闭 flink session
  • 提交job到该session
    • ./bin/flink run ./examples/batch/WordCount.jar
    • 关键配置:
      • -c,指定入口class
      • -m,指定jobmanager地址
      • -p,指定作业的并行度
    • client能够自动识别对应的 jobmanager 地址
    • 并行度的确定:
      • 如果不指定 -p ,则作业并行度为 1 (parallelism.default 的配置值,默认为1)
      • 如果指定-p,则作业则在该session下,以 -p 指定值的并行度运行。如果作业的并行度大于session的并行度,则会报异常,作业启动失败。

部署方式二:在yarn中启动一个单独的作业

  • ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
  • flink session的配置同样适用于启动单独的作业,需要加前缀 y 或者 yarn
  • 关键配置:
    • -n ,允许加载savepoint失败时启动程序
    • -d,client非阻塞模式启动作业
    • -p,指定作业并行度
    • -ytm,分配 taskmanager 内存大小
    • -yjm,分配 jobmanager 内存大小
    • -ys,指定每个taskmanager分配slot个数
    • -yn,指定container数量,和taskmanager数量一致
  • 并行度的确定
    • 如果指定了-m yarn-cluster,并且是 -d 或者 -yd 模式,不通过 -yid 指定 applicationid,则其并行度由 -p 决定。
    • flink会启动多少个taskmanager?我们知道flink作业的实际并行度是由 taskmanager * slot 决定的,默认情况下每个taskmanager的slot数量为1,所以yarn最终为了实现并行度为 -p 的作业,需要启动p个taskmanager。num( taskmanenger ) = p / slot

spark on yarn vs. flink on yarn

spark相关的executor以及并行的计算见 Spark学习系列之一和之二

  • executor vs. taskmanager
    • spark submit 通过 –num-executors 控制executor数量
    • flink run 通过 -p 和 -ys 控制taskmanager数量

另外spark on standalone模式下,其executor数量的计算方式和flink run差不多,它也是通过总的核数和每个executor核数反算所需的executor数目,可以把 total-executor-cores 类比 -p,executor-cores 类比 -ys)

Read more »

es学习系列之一:Rollover Index VS. Index Lifecycle Management

Posted on 2020-05-17 Edited on 2020-08-08

如无特别说明,本文讨论的内容均基于 es 7.*

es的Rollover索引

es的Rollover索引通常指的是一个别名指向某个索引,并且能够在索引的某些条件下进行轮转,如索引的创建时间长短、大小、文档数量。

如创建一个名为 nginx_log-000001 的索引,并指定其alias为nginx_log_write,并且我们对nginx_log_write写入3个文档(其实也是对nginx_log-000001写)。然后对别名调用rollover接口,
由于已经达到文档数目为3的条件,则会自动生成 nginx_log-000002 的索引。这时对nginx_log_write写入会自动写入到nginx_log-000002索引中。

需要注意的是,由于对索引设置alias的时候,没有添加 "is_write_index": true 配置,则在执行rollover并创建新索引成功后,将会只指向一个索引(新索引),对nginx_log_write查询只能查到最新索引的数据,而不能查到历史数据。相反,如果配置了"is_write_index": true,在rollover后alias会同时指向多个索引,并且最新索引设置为"is_write_index": true,旧索引设置为"is_write_index": false,对alias的
写入就是对最新索引的写入,查询时是对所有索引进行查询。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 创建索引nginx_log-000001,并设置其别名为nginx_log_write
PUT /nginx_log-000001
{
"aliases": {
"nginx_log_write": {
}
}
}

# 对别名写入文档,重复执行3次
POST nginx_log_write/_doc
{
"log":"something before rollover"
}

# 对别名执行rollover
POST /nginx_log_write/_rollover
{
"conditions": {
"max_age": "1d",
"max_docs": 3,
"max_size": "5gb"
}
}

# 对新索引插入新数据
POST nginx_log_write/_doc
{
"log":"something after rollover"
}

# 分别查 nginx_log-000001、nginx_log-000002、nginx_log_write,对ginx_log_write只能查到最新索引的数据
POST nginx_log_write/_search
{
"query":{
"match_all": {
}
}
}

# 对非 "is_write_index": true 模式的索引,可用 index_name-* 查询所有数据
POST nginx_log-*/_search
{
"query":{
"match_all": {
}
}
}

另外,我们可以利用 Date Math 创建带日期的rollover索引,更加方便索引管理。

1
2
3
4
5
6
7
8
# PUT /<nginx_log-{now/d}-000001>,将创建名为 nginx_log-2020.05.17-000001 的索引
PUT /%3Cnginx_log-%7Bnow%2Fd%7D-000001%3E
{
"aliases": {
"nginx_log_write": {
}
}
}

需要注意的是 _rollover api只会对调用该接口的那个时刻有效,当然可以自己独立做一个任务周期性扫描所有别名,当别名到达一定条件后就调用其 _rollover 接口。如果需要es自身定时调用的话,可以使用自动化程度更高的 Index Lifecycle Management。

Index Lifecycle Management

与 _rollover 索引相比,索引生命周期管理会更加自动化,ILM把索引的生命周期分为4个phase,分别为Hot、Warm、Cold、Delete。每个phase可以包含多个action。

action 允许该action的phase action意义
Rollover hot 和 rollover 索引的条件一致
Read-Only warm 通过"index.blocks.write": false 把原索引会被设置只读
Allocation warm, cold 移动索引时指定的亲和性规则,包括include, exclude, require。同时还可以通过 number_of_replicas 变更副本数量,比如指定为0。
Shrink warm 合并shard,创建 shrink-${origin_index_name},前提是需要把原索引的shard移动到同一个node上,需要留意node是否有足够的容量。并且会通过"index.blocks.write": false 把原索引会被设置只读,并最终删除原索引。
Force Merge warm 合并segmemt。和shrink一样,会通过"index.blocks.write": false 把原索引会被设置只读
Freeze cold 冻结索引。适用于很少查询的旧索引,es通过冻结索引能够减少堆内存的使用
Delete delete 删除索引
Set Priority hot, warm, cold 重启时,恢复索引的优先度,值越大越优先恢复
Unfollow hot,warm,cold 应该是中间状态
Read more »

Spark学习系列之三:join的宽依赖vs.窄依赖

Posted on 2020-01-02 Edited on 2020-05-17

如无特别说明,本文源码版本为 spark 2.3.4
两个rdd join时产生新的rdd,是宽依赖,还是窄依赖?

join transformation

narrow_wide_dependency.png

以上图片是个经常用来解释宽窄依赖的经典图,来源于论文<<Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing>>。以下这段话也来自与该论文:

join: Joining two RDDs may lead to either two nar- row dependencies (if they are both hash/range partitioned with the same partitioner), two wide dependencies, or a mix (if one parent has a partitioner and one does not). In either case, the output RDD has a partitioner (either one inherited from the parents or a default hash partitioner)

或许我们会好奇,为什么同样是join操作,有时是宽依赖,有时窄依赖?我们先从两个简单的实验开始,再从源码看其实现方式。

rdd1和rdd2的partitioner不同

假设我们有rdd1和rdd2,其partitioner分别为partitioner1、partitioner2。分区器定义如下:

partitioner1:

1
2
numPartiton = 3
func = x mod numPartiton

partitioner2:

1
2
numPartiton = 5
func = (x * 3) mod numPartiton

rdd1的初始分布如下:

1
2
3
partition0: (0, "a"), (3, "e")
partition1: (1, "b")
partition2: (2, "c")

rdd2的初始分布如下:

1
2
3
4
5
partition0: (0, "e"), (0, "j")
partition1: (2, "f")
partition2: (4, "g")
partition3: (1, "h"), (6, "k")
partition4: (3, "i")

rdd3=rdd2.join(rdd1),rdd3数据分布如下:

1
2
3
4
5
partition0: (0, ("e", "a")), (0, ("j", "a"))
partition1: (2, ("f", "c"))
partition2:
partition3: (1, ("h", "b"))
partition4: (3, ("i", "e"))

rdd3和rdd1以及rdd2的parittion之间的依赖关系如下:

1
2
3
4
5
6
7
8
9
rdd1.partition0 ==> rdd3.partition0, rdd3.partition4 
rdd1.partition1 ==> rdd3.partition3
rdd1.partition2 ==> rdd3.partition2

rdd2.partition0 ==> rdd3.partition0
rdd2.partition1 ==> rdd3.partition1
rdd2.partition2 ==> rdd3.partition2
rdd2.partition3 ==> rdd3.partition3
rdd2.partition4 ==> rdd3.partition4

可以看到rdd1的parittion0 同时被rdd3的partition0和partition4依赖,父rdd的一个parittion被子rdd多个parittion依赖,所以此时rdd3对rdd1的依赖为宽依赖,而对rdd2为窄依赖。

Read more »

使用dbutils作为pymysql的连接池时,setsession偶尔失效的问题

Posted on 2019-12-23 Edited on 2020-05-17

版本情况dbutils:1.1; pymysql:0.9.3; python:2.7.13

线上情景

最近线上维护时,由于只需要更改数据库配置,所以就重启了数据库,而python应用没有重启。在重启数据库后,日志显示正常,也能成功入库。后来接到反馈表示有部分数据没有入库,紧急重启python应用,后续数据入库正常。而我则负责找出原因以及修复bug的工作。

调研原因

在排查完其他问题后,最异常的是对于有部分请求,日志显示处理成功了,但是却没入库,排查了好几天找不到原因。为此写了demo来帮助排查,为了可以自动commit,采用的是setsession=[“set autocommit=1”]方式设置每个底层的连接为自动提交。在测试demo期间,数据库重启后之后的sql就无法入库。demo代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# -*- coding: utf-8 -*-

import pymysql
import time
import traceback
from DBUtils.PooledDB import PooledDB
from pymysql import MySQLError

pymysql.install_as_MySQLdb()
con = None
pooledDB = None

def try_insert():
i = 0
while i < 60:
print "============ {0} ============".format(i)
# 除了第一次从库中拿,不用ping,直接接初始化链接
# 后面如果有cache connection,则从cache中并且进行ping,如果失败则用_create()重新初始化connection
# con 类型为 PooledDedicatedDBConnection
# con._con 类型为 SteadyDBConnection
# con._con._con 类型为 pymysql中的Connection类型
# con._con._con._sock 类型为 mysql 连接
con = pooledDB.connection()
# con._con._con.autocommit(True)
print "con._con id = {0}".format(id(con._con))
print "con._con._con id = {0}".format(id(con._con._con))
print "con._con._con._sock id = {0}".format(id(con._con._con._sock))
try:
cursor = con.cursor(pymysql.cursors.DictCursor)
if not cursor:
print "cursor is {0}".format(cursor)
select_sql = "insert into user2(name,age) values('zhang', 20)"
ret_rows = cursor.execute(select_sql)
print cursor._last_executed
print "ret_rows is {0}".format(ret_rows)

except MySQLError as e:
print "MySQLError error: {0}".format(e)
print traceback.format_exc()
except Exception as e:
print "Exception error: {0}".format(e)
print traceback.format_exc()

i = i + 1
time.sleep(1)
con.close()


if __name__ == "__main__":
db_conf = {'user':'root','passwd':'zhang','host':'127.0.0.1','port':3306,'connect_timeout':5,'db':'test_dbutils'}
# db_conf = {'user':'root','passwd':'zhang','host':'127.0.0.1','port':3306,'connect_timeout':5,'db':'test_dbutils',"autocommit":True}


pooledDB = PooledDB(
creator=pymysql, # 使用数据库连接的模块
maxconnections=4, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=0, # 初始化时,连接池中至少创建的空闲的链接,0表示不创建
maxcached=0, # 连接池中最多闲置的链接,0和None不限制
maxshared=0, # 连接池中最多共享的链接数量,0表示不共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,此值只有在creator.threadsafety > 1时设置才有效,否则创建的都是dedicated connection,即此连接是线程专用的。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制
setsession=["set autocommit=1"], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."];务必要设置autocommit,否则可能导致该session的sql未提交
ping=1, # 每次从pool中取连接时ping一次检查可用性
reset=False, # 每次将连接放回pool时,将未提交的内容回滚;False时只对事务操作进行回滚
**db_conf
)

try_insert()
Read more »

Spark学习系列之二:rdd分区数量分析

Posted on 2019-12-22 Edited on 2020-05-17

如无特别说明,本文源码版本为 spark 2.3.4

创建rdd有三种方式,一种是通过SparkContext.textFile()访问外部存储创建,一种是通过输入数据集合通过调用 SparkContext.parallelize() 方法来创建,最后一种是通过转换已有的rdd生成新的rdd。

通过parallelize创建rdd的分区数量分析

通过parallelize的方式比较简单,相信也是大部分初学者第一次接触创建rdd的方法,那么通过这个方法创建的rdd的默认分区数是多少呢?我们通过源码进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package org.apache.spark.SparkContext

class SparkContext(config: SparkConf) extends Logging {
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}

def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
}

我们先看看parallelize是如何生成rdd的。可以看到它是通过 ParallelCollectionRDD 类创建一个rdd,其内部返回的partitioner是通过ParallelCollectionRDD伴生对象的slice方法分割seq为一个二维的Seq[Seq[T]],并把这个二维的序列传递到ParallelCollectionPartition中实例化的。

接下来是关键,defaultParallelism的默认值确定了分区的数量。

Read more »

Spark学习系列之一:新手常见问题

Posted on 2019-12-16 Edited on 2020-05-17

如无特别说明,本文源码版本为 spark 2.3.4

学习spark有一段时间了,最近想动动手写个demo出来,大致的功能是从kafka读取用户点击记录,用spark streaming对这些数据进行读取并统计用户一段时间的点击记录,期望最后能落盘到redis中供需求方调用。

这个demo看似简单,但是作为一个新手,我也遇到了一些看起来比较奇怪的问题。再此总结一下我遇到的一些问题,希望能给遇到同样问题的人带来一些帮助。

问题一:spark的并行度是多少?

我相信一开始接触的初学者对此肯定有疑惑,并行度指的什么?我认为在spark中,这个并行度指的是partition的数量,无论是通过parallelize初始化rdd,还是通过join和reduceByKey等shuffle操作,都意味着需要确定这个新rdd的paritition数量。这里涉及到一个参数spark.default.parallelism,该参数大多数情况下是parallelize、join、reducdeByKey等操作的默认并行度。如果不定义这个参数,默认情况下分区数量在不同情景的情况下有所不同:

  • 对于join和reduceByKey等shuffle操作,分区数一般为多个父rdd中partition数目最大的一个。
  • 对于parallelize进行初始化操作,分区数在不同部署模式下不同:
    • local[*]:本地cpu的core数量,local[N]则为N,local则为1
    • meos:默认为8
    • other:一般为executor个数 * 每个executor的core个数
  • 当然如果定义了spark.default.parallelism参数,其默认分区数也不一定是其值,具体分析见Spark学习系列之二:rdd分区数量分析。实际api中也能通过传递numPartitions参数覆盖spark.default.parallelism,自行决定并行度。
  • 比如正在使用的mac是四核,假设向yarn申请executor个数为2,每个executor的core数量为1,那么spark.default.parallelism的值为2,这时一般情况下是不能充分利用其申请核数资源的,最好是申请核数的2~3倍。可以通过 –conf 传入参数 --conf spark.default.parallelism = 4 或者 --conf spark.default.parallelism = 6,使其默认值为申请核数的2~3倍。如果有的task执行比较快,core就空闲出来了,为了多利用core就设置task数量为2~3倍。当然最后的并行度还需要根据实际情况进行分析。

如何确定本机核数?通过local[*]模式进行parallelize初始化rdd,再输出myrdd.partitions.size即可得,也可以通过java代码Runtime.getRuntime.availableProcessors()获得

参考:
https://spark.apache.org/docs/latest/configuration.html
http://spark.apache.org/docs/latest/tuning.html

问题二:standalone模式下,executor个数和executor核数如何确定?

由于需要通过spark streaming读取kafka,如果对应topic的partition数量已知,那么应该启动对应个数的executor,因为kafka的一个parition同一时间只允许同一个groupid的consumer读取,如果topic的partition为1,申请的executor为2,那么将只有一个executor的资源得到了利用。

既然executor个数比较重要,yarn模式可以通过--num-executors确定executor个数,那standalone模式如何确定的呢?直接先说结论:

  • executor的数量 = total-executor-cores/executor-cores
  • --total-executor-cores 对应配置 spark.cores.max (default: spark.deploy.defaultCores),表示一个application最大能用的core数目;如果没有设置则默认上限为spark.deploy.defaultCores,该配置的值默认为infinite(不限)
  • --executor-cores 对应配置 spark.executor.cores,表示每个executor的core数目
  • 可以看到standalone的executor数量并不能直接指定,而是通过core的换算得到的,如果对executor数目有要求的话,可以额外关注一下。

以下是我写demo过程遇到问题,以及解决问题的大致流程。

在写demo过程中通过spark-sumbit提交任务时,忘了写master,但是通过--executor-cores指定了每个executor的core数量。等应用跑起来后,发现spark ui上,发现worker上有1个executor,每个executor4个core,这显然不符合的预期。明明通过--executor-cores指定了executor的core数量,为什么申请到的core数目不符合预期?即使spark-submit的script中没包含master,但是程序是指定了master(spark://zhangqilongdeMacBook-Air.local:7077)。我决定进行多次调整参数,验证每种情况下申请到executor数量和每个executor的core数量,总结如下:

  • master和executor-cores,只配置一个或者两个都不配,则只申请一个executor,并且executor将尽量使用worker的所有core。
  • master和executor-cores两个都配,则申请的executor数量 = workder core的总数/executor-cores,每个executor的core数量和executor-cores一致。

通过源码可以发现:

  • --executor-cores只有在–master为standalone、yarn、kubernetes模式下才会生效,如果不是这些模式,将会通过默认配置文件指定缺失的值。即如果不指定master的情况下(默认为local[*]),--executor-cores并不会生效,并且使用 SPAKR_HOME/conf/spark-defaults.conf配置文件中的值对其赋值,如果该配置文件中依然不存在,则为spark系统默认对该变量的值,即infinite(不限)。
  • --total-executor-cores可以配置standalone每个application可以用的核总数(其实通过spark-submit命令行的提示也能看出来,因为yarn模式下该值不可配所以一开始这个配置被我忽略了)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
org.apache.spark.deploy.Submit

...省略部分代码
//可以看到spark.executor.cores只在某些情况下才会被赋值
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.executor.cores"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES, confKey = "spark.cores.max")

...省略部分代码
// Load any properties specified through --conf and the default properties file
// 通过sparkProperties(已经读取了spark-defaluts.conf内动)hashMap对缺失配置进行填充。
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
}

...省略部分代码

参考:
https://spark.apache.org/docs/latest/spark-standalone.html

Read more »

golang数据库连接broken pipe异常原因分析及解决

Posted on 2019-11-10 Edited on 2020-05-17

在golang开发中,在使用mysql数据库时一般使用数据库驱动包为 go-sql-driver/mysql,该包是按照go官方包database/sql定义规范实现的。我们线上的程序偶尔会在标准错误输出 “broken pip”,为了究其原因做了些调研,并给出了解决方法。

线上场景

目前 项目A 使用的go-sql-driver/mysql版本为 3654d25ec346ee8ce71a68431025458d52a38ac0 , 项目B 使用的版本为 v1.3.0 ,其中 项目A 的版本低于v1.3.0。它们线上标准错误输出都有类似以下的日志,但是程序的业务逻辑却没有影响。

1
2
[mysql] 2019/08/01 17:12:18 packets.go:33: unexpected EOF
[mysql] 2019/08/01 17:12:18 packets.go:130: write tcp 127.0.0.1:59722->127.0.0.1:3306: write: broken pipe

通过日志输出以及堆栈可以找到 go-sql-driver/mysql/packets.go 对应的源码,可以发现第一条日志是以下第8行代码打印,第二条是第9行调用mc.Close()关闭连接时报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
// Read packet to buffer 'data'
func (mc *mysqlConn) readPacket() ([]byte, error) {
var prevData []byte
for {
// read packet header
data, err := mc.buf.readNext(4)
if err != nil {
errLog.Print(err)
mc.Close()
return nil, driver.ErrBadConn
}
// 省略部分代码
}

问题复现

通过网上搜索能够大概猜出是mysql server主动关闭的原因,我们可以通过设置mysql server主动关闭连接来复现线上场景,并且通过tcpdump观察其原因。

1.设置mysql server主动关闭连接时间

mysql server默认设置的关闭不活跃连接时间为28800秒(8小时),我们通过 set global wait_time=10 设置为10秒,便于问题重现。

2.运行tcpdump和测试demo

1.通过tcpdump可以收集tcp数据包的发送接收情况,尤其是的在mysql server关闭连接后,go程序如何和mysql server交互是我们关注的重点,tcpdump命令如下:

sudo tcpdump -s 0 -t -i lo -l port 3306 -w lo.cap

2.运行一个简单的测试demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main
import (
"database/sql"
"log"
"time"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// before you run this test program, please run the script in your mysql
// "set global wait_timeout=10;"
// 表示mysql server关闭不活跃连接的等待时间
// 参考 https://github.com/go-sql-driver/mysql/issues/657
db, err := sql.Open("mysql", "root:zhang@tcp(127.0.0.1:3306)/?charset=latin1&autocommit=1&parseTime=true&loc=Local&timeout=3s")
if err != nil {
log.Fatal(err)
}
defer db.Close()
//db.SetConnMaxLifetime(5 * time.Second)
err = db.Ping()
if err != nil {
log.Fatal(err)
}
go func() {
for {
_, err := db.Exec("select * from test_time.A")
if err != nil {
log.Fatal(err)
}
// Wait for 11 seconds. This should be enough to timeout the conn, since `wait_timeout` is 10s
time.Sleep(11 * time.Second)
}
}()
time.Sleep(1000 * time.Second)
}
Read more »

浅谈golang对mysql时间类型数据转换的问题

Posted on 2019-11-09 Edited on 2020-05-17

部门某些业务需要在海外上线,涉及到数据库时区、应用时区的转换。本文将讨论golang针对数据库时区的处理问题。

为了方便讨论,避免混淆,本文对“时间”的表达方式作出约定:时间=时区时间+时区。如时间 2019-05-21 15:48:38 CST ,则其时区时间为2019-05-21 15:48:38,时区为CST。如果没有特别说明,本文提到的“时间”都包含时区。

一、golang中mysql数据库驱动的时区配置

mysql中关于时间日期的概念数据模型有DATE、DATETIME、TIMESTAMP,golang程序根据数据链接DSN(Data Source Name)配置,数据库驱动 github.com/go-sql-driver/mysql 可以对这三种类型的值转换成go中的time.Time类型,关键配置如下:

  • parseTime
    • 默认为false,把mysql中的 DATE、DATETIME、TIMESTAMP 转为golang中的[]byte类型
    • 设置为true,将会转为golang中的 time.Time 类型
  • loc
    • 默认为UTC,表示转换DATE、DATETIME、TIMESTAMP 为 time.Time 时所使用的时区
    • 设置成Local,则与系统设置的时区一致
    • 如果想要设置成中国时区可以设置成 Asia/Shanghai ,更多的时区可以参考 /usr/share/zoneinfo/ 或者$GOROOT/lib/time/zoneinfo.zip。

在实际的使用中,我们往往会配置成 parseTime=true 和 loc=Local,这样避免了手动转换DATE、DATETIME、TIMESTAMP。

二、golang如何转换mysql的时间类型

在涉及到不同时区时,我们golang程序应该怎么处理mysql的 DATE、DATETIME、TIMESTAMP 数据类型?是否只要配置了parseTime=true&loc=xxx就不会有问题?我们来做两个小实验。

实验一:应用和数据库在同一时区

1.timestamp

a.系统时区设置为CST,mysql和golang在同一个时区的机器上。(如何设置和查看时区可以参考本文第五节内容。)

  • golang在程序中连接数据库使用的配置DSN是parseTime=true&loc=xxx,xxx分别为UTC、Asia/Shanghai、Europe/London、Local。
  • mysql终端中insert一条timestamp【时区时间】为2019-04-02 13:18:17的记录,其UNIX_TIMESTAMP(timestamp)=1554182297。

以下1~5行均为golang程序读取刚插入数据库的数据结果,第一列输出分别为链接数据库DSN配置,第二列为转换为time.Time后的输出。

1
2
3
4
parseTime=true&loc=UTC:                  2019-04-02 13:18:17 +0000 UTC
parseTime=true&loc=Asia/Shanghai: 2019-04-02 13:18:17 +0800 CST
parseTime=true&loc=Europe/London: 2019-04-02 13:18:17 +0100 BST
parseTime=true&loc=Local: 2019-04-02 13:18:17 +0800 CST

b.同样的机器,修改系统时区为BST,在mysql终端中select上一步插入的数据,timestamp【时区时间】为2019-04-02 06:18:17,UNIX_TIMESTAMP(timestamp)=1554182297。程序输出为:

1
2
3
4
parseTime=true&loc=UTC:                  2019-04-02 06:18:17 +0000 UTC
parseTime=true&loc=Asia/Shanghai: 2019-04-02 06:18:17 +0800 CST
parseTime=true&loc=Europe/London: 2019-04-02 06:18:17 +0100 BST
parseTime=true&loc=Local: 2019-04-02 06:18:17 +0100 BST

c.小结:

  • UNIX_TIMESTAMP可以把mysql的timstamp转为距离 1970-01-01 00:00:00 UTC 的秒数,这个经过转换后的值无论mysql在任何时区都不会变。
  • 即使同一条数据库记录,由于时区不同,mysql终端中直接select出的timestamp的【时区时间】也不同。也侧面说明了mysql内部实现的timstamp结构体中包含了时区信息,在输出时根据当前时区做转换,输出当前【时区时间】。
  • golang程序获取到的time.Time等于:mysql【时区时间】+ 时区,时区为loc指定的时区,与mysql时区没有关系。
Read more »
12

Long

20 posts
15 tags
GitHub E-Mail
© 2018 – 2020 Long
Powered by Hexo v3.9.0
|
Theme – NexT.Gemini v7.2.0