阿里云
阿里云多端小程序中小企业获客首选
发表主题 回复主题
  • 1379阅读
  • 0回复

[分享]使用函数计算对表格存储中数据做简单清洗

级别: 论坛编辑
发帖
6124
云币
12022
G Q}Rxu]  
函数计算(Function Compute) 是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传。函数计算准备计算资源,并以弹性伸缩的方式运行用户代码,而用户只需根据实际代码运行所消耗的资源进行付费。 Up*6K=Tny  
Table Store Stream是用于获取Table Store表中增量数据的一个数据通道,通过创建Table Store触发器,能够实现Table Store Stream和函数计算的动对接,让计算函数中自定义的程序逻辑自动处理Table Store表中发生的数据修改 B-V   
表格存储高并发的写入性能以及低廉的存储成本非常适合物联网、日志、监控数据的存储,我们可以将数据写入到表格存储中,同时在函数计算中对新增的数据做简单的清洗、转换、聚合计算等操作,并将清洗之后的数据写回到表格存储的结果表中,并对原始明细数据及结果数据提供实时访问。 L#fSP  
下面,我们使用函数计算对表格存储中的数据做简单的清洗,并写入到结果表中。 H>wXQ5?W;  
E//*bmww  
数据定义 *_"lXcG.  
caZEZk#r;  
- XB[2h  
我们假设写入的为日志数据,包括三个基础字段:[tr=rgb(246, 248, 250)][td]level
字段名称类型含义
id整型日志id
整型日志的等级,越大表明等级越高
message字符串日志的内容
k[\JT[Mp  
我们需要将 level>1 的日志写入到另外一张数据表中,用作专门的查询。 r@ba1*y0  
aZ}z/.b]  
实现过程: 1pT/`x  
Vu,:rPqI  
X [;n149o  
aZCxyoh+  
创建实例及数据表 ._<gc;G  
Ca0t}`<S  
`tm(3pJ  
表格存储的控制台创建表格存储实例(__本次以 华东2 distribute-test 为例__),并创建源表(__source_data__)及结果表(__result__),主键为均 __id (整型)__,由于表格存储是 schemafree 结构,无需预先定义其他属性列字段。 =6B I[_0  
^ ?T,>ZI  
Hr/J6kyB)  
开启数据源表的Stream功能 [yVcH3GcjI  
Tx/KL%X  
Pcr;+'q  
触发器功能需要先开启数据表的Stream功能,才能在函数计算中处理写入表格存储中的增量数据。 kv`5"pa7M  
W |+&K0M  
Stream记录过期时长 为通过 StreamAPI 能够读取到的增量数据的最长时间。 to)Pl}9QkK  
由于触发器只能绑定现有的函数,故先到函数计算的控制台上在同region创建服务及函数。 ,62BZyT,T,  
C12y_E8Un  
创建函数计算服务 )==Qo/N:  
hYh~[Kr^@^  
E /V`NqC  
函数计算的控制台上创建服务及处理函数,我们继续使用华东2节点。 o Hrx$>W]  
1.在华东2节点创建服务。 "H}ae7@  
j{P3o<l&`  
2.创建函数依次选择:空白函数——不创建触发器。 i(kr#XsU  
  • 函数名称为:etl_test,选择 python2.7 环境,在线编辑代码
  • 函数入口为:etl_test.handler
  • 代码稍后编辑,点击下一步。
LdyE*u_  
3.进行服务授权 ,tuZ_"?M  
由于函数计算需要将运行中的日志写入到日志服务中,同时,需要对表格存储的表进行读写,故需要对函数计算进行授权,为方便起见,我们先添加 AliyunOTSFullAccess 与 __AliyunLogFullAccess __权限,实际生产中,建议根据权限最小原则来添加权限。 M:w]g`LKl  
4.点击授权完成,并创建函数。 m\ /V0V\  
5.修改函数代码。 rE\.[mFI  
创建好函数之后,点击对应的函数—代码执行,编辑代码并保存,其中,INSTANCE_NAME(表格存储的实例名称)、REGION(使用的区域)需要根据情况进行修改: vo2TP:  
使用示例代码如下:
引用
#!/usr/bin/env python# -*- coding: utf-8 -*-import cborimport jsonimport tablestore as otsINSTANCE_NAME = 'distribute-test'REGION = 'cn-shanghai'ENDPOINT = 'http://%s.%s.ots-internal.aliyuncs.com'%(INSTANCE_NAME, REGION)RESULT_TABLENAME = 'result'def _utf8(input):return str(bytearray(input, "utf-8"))def get_attrbute_value(record, column):attrs = record[u'Columns']for x in attrs:if x[u'ColumnName'] == column:return x['Value']def get_pk_value(record, column):attrs = record[u'PrimaryKey']for x in attrs:if x['ColumnName'] == column:return x['Value']#由于已经授权了AliyunOTSFullAccess权限,此处获取的credentials具有访问表格存储的权限def get_ots_client(context):creds = context.credentialsclient = ots.OTSClient(ENDPOINT, creds.accessKeyId, creds.accessKeySecret, INSTANCE_NAME, sts_token = creds.securityToken)return clientdef save_to_ots(client, record):id = int(get_pk_value(record, 'id'))level = int(get_attrbute_value(record, 'level'))msg = get_attrbute_value(record, 'message')pk = [(_utf8('id'), id),]attr = [(_utf8('level'), level), (_utf8('message'), _utf8(msg)),]row = ots.Row(pk, attr)client.put_row(RESULT_TABLENAME, row)def handler(event, context):records = cbor.loads(event)#records = json.loads(event)client = get_ots_client(context)for record in records['Records']:level = int(get_attrbute_value(record, 'level'))if level > 1:save_to_ots(client, record)else:print "Level <= 1, ignore."
<(Ktf0'__  
对表格存储 Stream 数据的格式详情请参考Stream 数据处理 *1_A$14 l  
.vXe}%  
绑定触发器 eXMl3Lxf  
9wv 7 HD|  
2graLJ?9Z  
1.回到表格存储的实例管理页面,点击表 source_data 后的 使用触发器 按钮,进入触发器绑定界面,点击使用已有函数计算, 选择刚创建的服务及函数,勾选 表格存储发送事件通知的权限, 进行确定。 U6oab9C?k  
2.绑定成功之后,能够看到如下的信息 #:/-8Z(0  
SL>0_  
1PT0<C-  
运行验证 D~%h3HM  
_1P8rc"Dx  
o<%s\n  
1.向 source_data 表中写入数据。
引用
在 __source_data__ 的数据管理页面,点击插入数据,如图依次填入id、level及message信息。
)UG<KcdI  
-44''w?z  
2.在 result 表中查询清洗后的数据
引用
点击 result 表的数据管理页面,会查询到刚写入到 source_data 中的数据。当然,向 soure_data 写入level <=1的数据将不会同步到 result 表中
Le}-F{~`^  
发表主题 回复主题
« 返回列表上一主题下一主题

限100 字节
如果您提交过一次失败了,可以用”恢复数据”来恢复帖子内容
 
验证问题: 阿里云官网域名是什么? 正确答案:www.aliyun.com
上一个 下一个
      ×
      全新阿里云开发者社区, 去探索开发者的新世界吧!
      一站式的体验,更多的精彩!
      通过下面领域大门,一起探索新的技术世界吧~ (点击图标进入)