SQLAlchemy快速更新或插入对象

写脚本刷数据的时候,常常有这样的需求:如果对象存在,那么更新数据,否则,插入数据。 有可能数据源的数据比schema的字段更多,这种时候,就需要想办法把SQLAlchemy中的schema字段取出来, 只取需要的字段了。假设我们的schema叫做 User

valid_columns = set(User.__table__.columns.keys())  # 获取model中定义的columns

这样就能获取到合法的schema字段,然后我们把数据源的数据过滤:

valid_attrs = {k: v for k, v in i.items() if k in valid_columns}

如果其中有需要单独处理的字段,我们进行特殊处理:

adjust_time(valid_attrs)


def adjust_time(attrs):
    # mongo存储的是毫秒时间戳
    attrs["created_at"] = datetime.datetime.fromtimestamp(attrs["created_at"] / 1000.0)
    attrs["updated_at"] = datetime.datetime.fromtimestamp(attrs["updated_at"] / 1000.0)
    if attrs.get("deleted_at"):
        attrs["deleted_at"] = datetime.datetime.fromtimestamp(attrs["deleted_at"] / 1000.0)

之后我们就可以愉快的对SQLAlchemy对象进行更新或者插入了:

already_exist = User.get_by_id(s, i["user_id"])
if already_exist:
    logging.info("update item: %s", i)
    User.update_by_user_id(s, already_exist.user_id, valid_attrs)
else:
    logging.info("insert item: %s", i)
    s.add(User(**valid_attrs))

其中,model中的 update_by_user_id定义如下:

@classmethod
def update_by_user_id(cls, session, user_id, attr_map):
    session.query(cls).filter(cls.user_id == user_id).update(attr_map)

搞定!


更多文章
  • GIN 是如何绑定参数的
  • 你好 2022(2021 年终总结)
  • 用Go导入大型CSV到PostgreSQL
  • 使用 OpenWRT 搭建软路由
  • 使用软KVM切换器 barrier 共享键鼠
  • SQL 防注入及原理
  • 使用 gomock 测试 Go 代码
  • gevent不是黑魔法(二): gevent 实现
  • gevent不是黑魔法(一): greenlet 实现
  • 用 entgo 替代 gorm
  • 应用内使用crontab不是那么方便
  • 单测时要不要 mock 数据库?
  • Sentry 自建指南
  • 用selenium完成自动化任务
  • 用闲置的安卓手机做垃圾电话短信过滤