cx_Oracle订阅,SessionPool使用

2017-07-06

版权声明:本文为作者原创文章,可以随意转载,但必须在明确位置表明出处!!!

本文主要记录一下cx_Oracle中的订阅功能,上一遍文章已经介绍了如何在python中搭建cx_Oracle和所需要的依赖。

连接oracle数据库

要操作数据库,首先就是要连接数据库,cx_Oracle提供了两个连接接口,一个是Connection,一个是connect两个连接都返回一个连接对象,两个连接接口定义如下:

cx_Oracle.Connection([user, password, dsn, mode, handle, pool, threaded, events, cclass, purity, newpassword,encoding, nencoding, edition, appcontext, tag, matchanytag ])

cx_Oracle.connect([user, password, dsn, mode, handle, pool, threaded, events, cclass, purity, newpassword,encoding, nencoding, edition, appcontext, tag, matchanytag ])

  • dsn: 数据源名称,可以从makedsn函数返回,如果只传一个参数,那么字符格式为user/password@dsn
  • mode: 如果这个参数被指定了,那么这个参数一定是SYSDBA, SYSASM 或 SYSOPER,否则它默认使用正常连接模式
  • handle: 如果handle参数被指定,那么它的类型必须是OCISvcCtx*,并且它仅仅被使用在已经建立连接的python应用程序中,
  • pool: 该参数期望是一个session pool对象,使用了此参数,需要调用pool.acquire()才从连接池中获取连接对象
  • threaded: 该参数期望是一个布尔表达式,表示Oracle是否应该包含对互斥体的连接访问。 在单线程应用程序中执行此操作会带来性能降低10-15%,这就是该参数默认设置为False。
  • events: 该参数期望是一个布尔表达式,它表示是否需要初始化Oracle事件模式
  • cclass: 该参数期望是一个字符串,是被定义为驻留在数据库中的连接池
  • purity: 该参数期望是ATTR_PURITY_NEW, ATTR_PURITY_SELF, 或ATTR_PURITY_DEFAULT中的一个
  • newpassword: 该参数期望是一个字符串,如果该参数被指定,那么在连接过程中是用设置的登录密码
  • encoding: 该参数期望是一个字符串,如果该参数被指定且被设置,那么该编码被设置为数据库的常规字符串
  • nencoding: 该参数期望是一个字符串,如果指定,并将国家编码设置为用于国家字符集数据库字符串。
  • edition: 该参数期望是一个字符串,如果被指定且被设置,那么它被用于session版本,它只在客户端和服务端版本至少是Oracle Database 11.2中有效
  • appcontext: 该参数期望是一个3元组的列表,如果被指定了,那么它将设置应用上下文给数据库连接。应用程序上下文通过使用sys_context()PL / SQL方法在数据库中可用,并且可以在登录触发器以及任何其他PL / SQL过程中使用。 列表中的每个条目都将包含三个字符串:命名空间,名称和值。
  • tag: 该参数期望是一个字符串,如果被指定它将限制可从中返回的会话会话池,除非matchanytag参数设置为True。在这种情况下与指定标签的会话将优于其他人,但是如果没有这样的会话可用,则可以返回具有不同标签的会话代替。 在任何情况下,如果没有指定标签的会话可用,将始终返回未标记的会话。 会话被释放回池时被标记。

操作数据库

操作数据库一定会用到游标,游标的接口定义如下
cx_Oracle.Cursor(connection)
构造一个游标,返回一个游标对象,参数connection是Connection,connect接口返回的连接对象

执行数据库操作

执行数据库操作有三个接口函数,分别是execute,executemany,executemanyprepared,三个接口定义如下
Cursor.execute(statement[, parameters ], **keywordParameters)
参数可以是一个字段,序列,关键字,如果参数是一个字典,它将和一个name(名称)绑定,如果是一个序列,它将从左到右按位置绑定,不建议使用位置绑定,推荐使用参数绑定。
Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @author yj xie
# @E-mail xie_youjiang@163.com
# @time 2017/7/7 0006 9:08
# @project Python
# @file connection
import cx_Oracle
import pdb
# 构建数据源
dsn = cx_Oracle.makedsn('loaclhost', '1521', 'tomp').replace('SID', 'SERVICE_NAME')
# 获取数据库连接
conn = cx_Oracle.connect('username', 'password', dsn)
# 获取游标
cursor = conn.cursor()
# 使用序列参数执行数据库操作
cursor.execute('insert into test(name, age) values(:1, :2)', ('gavin', '20'))
# 使用字典参数执行数据库操作
cursor.execute('insert into test(name, age) values(:name, :age)', {'name':'kevin', 'age':'30'})
# cursor.execute('insert into test(name, age) values(:name, :age)', {'age':'30', 'name':'kevin'})
# 使用关键字参数执行数据库操作
args = {'name':'kuke', 'age':'40'}
cursor.execute('insert into test(name, age) values(:name, :age)', **args)
conn.commit()
cursor.close()
conn.close()

使用字典参数绑定的好处是你的参数位置可以随意变化,并且不会影响你的操作结果


Cursor.executemany(statement, parameters, batcherrors=False, arraydmlrowcounts=False)
从字面意思上就知道这个接口可以一次执行多个数据操作,该参数需要一个列表(list)参数。

  • batcherrors:如果为true,则batcherrors参数可启用Oracle中的批量错误支持,并确保即使在一个或多个参数序列中发生异常,调用也将成功。 然后可以使用getbatcherrors()检索错误。
  • arraydmlrowcounts:如果为true,则arraydmlrowcounts参数可以在执行Oracle方法之后使用Oracle DML检索行计数
    然后可以使用getarraydmlrowcounts()检索行计数。

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @author youjiang xie
# @E-mail xie_youjiang@163.com
# @time 2017/6/6 0006 9:08
# @project Python
# @file connection
import cx_Oracle
import pdb
# 构建数据源
dsn = cx_Oracle.makedsn('localhost', '1521', 'tomp').replace('SID', 'SERVICE_NAME')
# 获取数据库连接
conn = cx_Oracle.connect('username', 'password', dsn)
# 获取游标
cursor = conn.cursor()
# 使用序列参数执行数据库操作
params = [('gavin', '20'), ('kevin', '20'), ('kuke', '20')]
cursor.executemany('insert into test(name, age) values(:1, :2)', params)
# 使用字典参数执行数据库操作
params = [{'name':'kevin', 'age':'30'}, {'name':'gavin', 'age':'30'}, {'name':'kuke', 'age':'30'}]
cursor.executemany('insert into test(name, age) values(:name, :age)', params)
# 使用预处理执行数据库操作
cursor.prepare('insert into test(name, age) values(:name, :age)')
params = [{'name':'kevin', 'age':'30'}, {'name':'gavin', 'age':'31'}, {'name':'kuke', 'age':'32'}]
cursor.executemany(None, params)
conn.commit()
cursor.close()
conn.close()


Cursor.executemanyprepared(numIters)
该接口是执行指定次数已经准备好并绑定好参数的语句
Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @author youjiang xie
# @E-mail xie_youjiang@163.com
# @time 2017/6/6 0006 9:08
# @project Python
# @file connection
import cx_Oracle
import pdb
# 构建数据源
dsn = cx_Oracle.makedsn('localhost', '1521', 'tomp').replace('SID', 'SERVICE_NAME')
# 获取数据库连接
conn = cx_Oracle.connect('username', 'password', dsn)
# 获取游标
cursor = conn.cursor()
# 使用预处理执行数据库操作
cursor.prepare("""insert into test(name, age) values('gavin', '30')""")
# insert语句将被执行3次,表中的结果会有3条记录,这个函数还有没有其它的使用方式暂时还没有摸索出来,如果你知道请留言给我。
cursor.executemanyprepared(3)
conn.commit()
cursor.close()
conn.close()

查询数据

查询数据返回结果集主要有4个接口定义如下
Cursor.fetchall()
获取所有的查询结果,它返回的结果集只一个元组列表
Cursor.fetchmany([numRows=cursor.arraysize ])
如果该接口不传参数则它的效果和fetchall一样,如果传了参数则表示要查询指定条数记录,如果表中的结果小于指定查询条数,则返回表中所有数据
Cursor.fetchone()
该接口是一条一条的获取记录知道获取完所有记录返回None
Cursor.fetchraw
该接口返回实际获取的行数,它将下一组查询结果提取到游标定义的内部缓冲区中
Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @author youjiang xie
# @E-mail xie_youjiang@163.com
# @time 2017/6/6 0006 9:08
# @project Python
# @file connection
import cx_Oracle
import pdb
# 构建数据源
dsn = cx_Oracle.makedsn('192.168.102.37', '1521', 'tomp').replace('SID', 'SERVICE_NAME')
# 获取数据库连接
conn = cx_Oracle.connect('doron_tomp', 'doron1234~', dsn)
# 获取游标
cursor = conn.cursor()
cursor.execute('select * from test')
# 返回所有的查询结果
result = cursor.fetchall()
print result
# 返回指定数量的查询结果
result = cursor.fetchmany(10)
print result
# 一行一行获取所有查询结果
while True:
result = cursor.fetchone()
print result
if result is None:
break
# 返回实际获取的行数,它将下一组查询结果提取到游标定义的内部缓冲区中
result = cursor.fetchraw(10)
print result
cursor.close()
conn.close()

cx_Oracle订阅功能的使用

通过上面的讲解,对cx_Oracle操作数据库的基本操作是没有问题了,下面详细介绍一下cx_Oracle中的订阅功能,要使用订阅功能毫无疑问的是需要用到subscribe接口,该接口定义如下

Connection.subscribe(namespace=cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE, protocol=cx_Oracle.SUBSCR_PROTO_OCI, callback=None, timeout=0, operations=
OPCODE_ALLOPS, port=0, qos=0)

该接口返回一个订阅对象,namespace,protocol目前只能是cx_Oracle.SUBSCR_NAMESPACE_DBCHANGE,cx_Oracle.SUBSCR_PROTO_OCI值

  • callback:callback参数期望是一个只带一个参数的回调函数,该参数是一个message object对象,该消息对象的熟悉我会在下面做详细介绍
  • timeout: 指定订阅的超时时间,默认为0,表示永不超时
  • operator: 该参数指定需要订阅哪些消息,消息可以是(insert, update, delete, alter)
  • port: 该端口指定来自数据库服务器的回调通知的侦听端口。 如果未指定,则数据库将选择未使用的端口
  • qos:qos参数指定服务质量选项。它可以是cx_Oracle.SUBSCR_QOS_RELIABLE,
    cx_Oracle.SUBSCR_QOS_DEREG_NFY,cx_Oracle.SUBSCR_QOS_ROWIDS,
    cx_Oracle.SUBSCR_QOS_QUERY,cx_Oracle.SUBSCR_QOS_BEST_EFFORT的组合。

消息对象(Message Object)

这个消息对象就是callback回调函数的参数,如果订阅的数据库表发生订阅变化那么它会产生一个消息对象给回调函数,该消息对象有5个属性。

  • Message.dbname: 只读属性,返回数据库名称
  • Message.queries: 这个只读属性返回一个消息查询对象列表,它提供有关查询结果集的信息已更改此通知。如果qos参数在创建订阅时不包含标志SUBSCR_QOS_QUERY,则此属性将为None
  • Message.subscription: 此只读属性返回生成此通知的订阅对象
  • Message.tables: 只读属性返回一个消息表对象列表,它为此通知提供有关更改表的信息。如果qos参数包含标志SUBSCR_QOS_QUERY,则此属性将为None
  • Message.type: 只读属性,返回消息类型

表消息对象(Table Objects)

该对象包含三个属性

  • MessageTable.name: 只读属性,返回已更改的表的名称。
  • MessageTable.operation: 只读属性,返回在更改的表上发生的操作。
  • MessageTable.rows: 只读属性返回一个消息行对象的列表,它们提供有关在表上更改的行的信息。仅当Connection.subscribe()方法的qos参数时包含标志SUBSCR_QOS_ROWIDS,才会填充该值

行消息对象(Rows Objects)

该对象包含两个属性

  • MessageRow.operator: 只读属性,返回在更改的行上发生的操作
  • MessageRow.rowid: 只读属性,返回已更改的行的rowid

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/python
# -*- coding:utf-8 -*-
# @author youjiang xie
# @E-mail xie_youjiang@163.com
# @time 2017/7/6 0006 13:47
# @project Python
# @file test
import cx_Oracle
def OperationToString(operation):
operations = []
if operation & cx_Oracle.OPCODE_INSERT:
operations.append("insert")
if operation & cx_Oracle.OPCODE_DELETE:
operations.append("delete")
if operation & cx_Oracle.OPCODE_UPDATE:
operations.append("update")
if operation & cx_Oracle.OPCODE_ALTER:
operations.append("alter")
if operation & cx_Oracle.OPCODE_DROP:
operations.append("drop")
if operation & cx_Oracle.OPCODE_ALLOPS:
operations.append("all operations")
return ", ".join(operations)
def OnChanges(message):
print "Message received"
print " Database Name:", message.dbname
print " Tables:"
for table in message.tables:
print " Name:", table.name,
print " Operations:",
print OperationToString(table.operation)
if table.rows is None \
or table.operation & cx_Oracle.OPCODE_ALLROWS:
print " Rows: all rows"
else:
print " Rows:"
for row in table.rows:
print " Rowid:", row.rowid
print " Operation:",
print OperationToString(row.operation)
connection = cx_Oracle.Connection("username/password@192.168.88.59/sid", events=True)
sql = 'select * from TRAFFIC_VEHICLE_PASS'
subscriptionInsertUpdate = \
connection.subscribe(callback = OnChanges,
operations = cx_Oracle.OPCODE_INSERT | \
cx_Oracle.OPCODE_UPDATE, rowids=True)
subscriptionInsertUpdate.registerquery(sql)
raw_input("Hit enter to terminate...\n")

结果:

cx_Oracle的订阅功能不能返回改变行的结果集,只能得到rowid,不过可以通过rowid从数据库里查询到该条记录。不知道有没有其它的办法,如果你知道请给我留言

会话池(SessionPool)

会话池允许非常快速地连接到数据库,并且是主要的使用在快速连续多次进行相同连接的服务器,特别是对于web服务器,SessionPool接口定义如下
cx_Oracle.SessionPool(user, password, database, min, max, increment[, connectiontype=
cx_Oracle.Connection, threaded=False, getmode=cx_Oracle.SPOOL_ATTRVAL_NOWAIT, homogeneous=True, externalauth=False, encoding=None, nencoding=None, edition=None ])

  • min: 最小会话数
  • max: 最大会话数
  • increment: 增长步长,当最小会话数都已经被占用了的时候,这个时候就需要会话池中新增会话连接,这个时候就会以increment设置的数量增长连接数

Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#!/usr/bin/python
# -*- coding:utf-8 -*-
# @author youjiang xie
# @E-mail xie_youjiang@163.com
# @time 2017/7/6 0006 13:47
# @project Python
# @file test
import cx_Oracle
import threading
poolConn = cx_Oracle.SessionPool("doron_tomp", "doron1234~", "192.168.102.37/tomp", 2, 5, 1,
threaded = True)
def query_data1():
conn = poolConn.acquire()
cursor = conn.cursor()
print("query_data1(): beginning execute...")
cursor.execute('select * from test')
print("query_data1(): done execute...")
while True:
rows = cursor.fetchmany()
if not rows:
break
print rows
print("TheLongQuery(): all done!")
def query_data2():
conn = poolConn.acquire()
cursor = conn.cursor()
print("query_data2(): beginning execute...")
cursor.execute('select * from test')
print("query_data2(): done execute...")
while True:
rows = cursor.fetchmany()
if not rows:
break
print rows
print("TheLongQuery(): all done!")
thread1 = threading.Thread(None, query_data1)
thread1.start()
thread2 = threading.Thread(None, query_data2)
thread2.start()
thread1.join()
thread2.join()
print("All done!")


推荐我的微信公众号:爱做饭的老谢

上一篇:kafka,redis,zookeeper,cx_Oracle在python中的应用
下一篇:聚沙成塔-爬虫系列(一)(基础环境搭建)