How do I create a SQLAlchemy UserDefinedType that lets me insert into the Geography data type on SQL Server?

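I am using Python 3.6 and pandas to_sql to write to a SQL Server table that will have a column of the geography data type. I am using sqlalchemy.create_engine with DRIVER={ODBC Driver 13 for SQL Server} to create the database connection to SQL Express. My polygons are stored in a GeoPandas dataframe.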

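SQLAlchemy 1.3.10 does not directly support the Geography or Geometry data types, and GeoAlchemy2 does not support MS SQL Server. I have been experimenting with SQLAlchemy's UserDefinedType to see whether I can get it to produce the following: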

GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText('POLYGON((-110.17315242968752 52.66767554218751,-110.18536282187502 52.66770066015627,-110.19718901640618 52.66771763203127,-110.197593865625 52.667718411718795,-110.19747227656248 52.67594785000003,-110.1732282007812 52.67592660234379,-110.17315242968752 52.66767554218751))',4269).MakeValid().STUnion(GEOMETRY::STGeomFromText('POLYGON((-110.17315242968752 52.66767554218751,-110.18536282187502 52.66770066015627,-110.19718901640618 52.66771763203127,-110.197593865625 52.667718411718795,-110.19747227656248 52.67594785000003,-110.1732282007812 52.67592660234379,-110.17315242968752 52.66767554218751))',4269).STStartPoint()).STAsText(),4269)

So far I have:

class Geography(UserDefinedType):

    def get_col_spec(self):
        return "GEOGRAPHY"

    def bind_processor(self, dialect):
        def process(value):
            if value is None:
                return None
            return 'GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText({0},4269).MakeValid().STUnion(GEOMETRY::STGeomFromText({0},4269).STStartPoint()).STAsText(),4269)'.format("'" + value + "'")
        return process

I am stuck with this result (note the quotes around the whole thing):

"GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText('POLYGON ((-110.1731524296875 52.66767554218751, -110.185362821875 52.6677006601563, -110.19718901 ... (382 characters truncated) ...  -110.1974722765625 52.67594785, -110.1732282007812 52.67592660234379, -110.1731524296875 52.66767554218751))',4269).STStartPoint()).STAsText(),4269)"

The SQL that I know will work is:

DECLARE @g NVARCHAR(MAX)
SELECT @g = 'POLYGON((-110.17315242968752 52.66767554218751,-110.18536282187502 52.66770066015627,-110.19718901640618 52.66771763203127,-110.197593865625 52.667718411718795,-110.19747227656248 52.67594785000003,-110.1732282007812 52.67592660234379,-110.17315242968752 52.66767554218751))'
INSERT INTO dbo.[Spatial_Table] ([geometry]) 
VALUES (GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText(@g,4269).MakeValid().STUnion(GEOMETRY::STGeomFromText(@g,4269).STStartPoint()).STAsText(),4269))
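
The question does not say why the WKT is round-tripped through GEOMETRY before it is converted to GEOGRAPHY. One common reason for this kind of round trip is ring orientation: SQL Server's geography type treats the order of a ring's vertices as significant and expects exterior rings to run counter-clockwise. As a rough, planar check (an assumption-laden sketch that treats longitude/latitude as x/y; signed_area is a hypothetical helper name), the shoelace formula shows which way the sample polygon runs:

```python
from typing import List, Tuple

Ring = List[Tuple[float, float]]


def signed_area(ring: Ring) -> float:
    """Shoelace formula on planar (x, y) = (lon, lat).

    A positive result means the ring runs counter-clockwise,
    a negative result means it runs clockwise.
    """
    total = 0.0
    for (x1, y1), (x2, y2) in zip(ring, ring[1:]):
        total += x1 * y2 - x2 * y1
    return total / 2.0


# Exterior ring of the sample polygon (first vertex repeated at the end).
ring = [
    (-110.17315242968752, 52.66767554218751),
    (-110.18536282187502, 52.66770066015627),
    (-110.19718901640618, 52.66771763203127),
    (-110.197593865625, 52.667718411718795),
    (-110.19747227656248, 52.67594785000003),
    (-110.1732282007812, 52.67592660234379),
    (-110.17315242968752, 52.66767554218751),
]

orientation = "counter-clockwise" if signed_area(ring) > 0 else "clockwise"
print(orientation)  # clockwise
```
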

Using pandas.DataFrame.to_sql results in this error:

DataError: (pyodbc.DataError) ('22018', '[22018] [Microsoft][ODBC Driver 13 for SQL Server][SQL Server]Operand type clash: ntext is incompatible with geography (206) (SQLExecDirectW); [22018] [Microsoft][ODBC Driver 13 for SQL Server][SQL Server]Statement(s) could not be prepared. (8180)')
[SQL: INSERT INTO dbo.[Spatial_Table] (geometry) VALUES (?)]
[parameters: (("GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText('POLYGON ((-114.4742908039062 51.94055257031255, -114.4623163671875 51.94054674921875, -114.450628 ... (3922 characters truncated) ... .4978060703125 51.9479693648438, -114.4743004117187 51.94797611093753, -114.4742908039062 51.94055257031255))',4269).STStartPoint()).STAsText(),4269)",), ("GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText('POLYGON ((-112.1157004828125 49.53477394218754, -112.1156657703125 49.52339953203125, -112.124386 ... (704 characters truncated) ... 1212683476562 49.53115973984376, -112.1212804195312 49.53477826953127, -112.1157004828125 49.53477394218754))',4269).STStartPoint()).STAsText(),4269)",))]
(Background on this error at: http://sqlalche.me/e/9h9h)

I assume the error is caused by the surrounding quotes, which make the value a string rather than a function call passed to SQL.
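
This behaviour can be reproduced without SQL Server. The following is a minimal sketch that uses Python's built-in sqlite3 module instead of pyodbc, so the database and the upper() function are stand-ins chosen only for illustration. It shows that a bound parameter is always delivered as data: a function call written inside the parameter is never evaluated, while the same call written into the SQL text is.

```python
import sqlite3

# In-memory SQLite database, used here only because it ships with Python.
conn = sqlite3.connect(":memory:")

# A string that *looks* like a function call, passed as a bound parameter.
as_parameter = conn.execute("SELECT ?", ("upper('abc')",)).fetchone()[0]

# The same function call written into the SQL text, with only the data bound.
as_sql_text = conn.execute("SELECT upper(?)", ("abc",)).fetchone()[0]

print(as_parameter)  # upper('abc')
print(as_sql_text)   # ABC
conn.close()
```
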

I have tried wrapping the string returned from bind_processor in sqlalchemy.sql.expression.text, but got this error:

ProgrammingError: (pyodbc.ProgrammingError) ('Invalid parameter type.  param-index=0 param-type=TextClause', 'HY105')
[SQL: INSERT INTO [Spatial_Table] (geometry) VALUES (?)]
[parameters: (<sqlalchemy.sql.elements.TextClause object at 0x000002566D5A40F0>,)]
(Background on this error at: http://sqlalche.me/e/f405)

I have looked at sqlalchemy.sql.expression.func; however, I cannot work out how to use it with the complex GEOGRAPHY::STGeomFromText method.

My assumption may be incorrect, because this works in SQL:

INSERT INTO dbo.[Spatial_Table] ([geometry]) 
VALUES ('POLYGON((-110.17315242968752 52.66767554218751,-110.18536282187502 52.66770066015627,-110.19718901640618 52.66771763203127,-110.197593865625 52.667718411718795,-110.19747227656248 52.67594785000003,-110.1732282007812 52.67592660234379,-110.17315242968752 52.66767554218751))')

And simply using:

class Geography(sqlalchemy.types.UserDefinedType):

    def get_col_spec(self):
        return "GEOGRAPHY"

results in the same operand type clash error, even though the parameter looks correct:

DataError: (pyodbc.DataError) ('22018', '[22018] [Microsoft][ODBC Driver 13 for SQL Server][SQL Server]Operand type clash: ntext is incompatible with geography (206) (SQLExecDirectW); [22018] [Microsoft][ODBC Driver 13 for SQL Server][SQL Server]Statement(s) could not be prepared. (8180)')
[SQL: INSERT INTO [Spatial_Table] (geometry) VALUES (?)]
[parameters: ('POLYGON ((-114.4742908039062 51.94055257031255, -114.4623163671875 51.94054674921875, -114.4506284421875 51.94053819687502, -114.4506230351562 51.933 ... (1739 characters truncated) ... -114.49781949375 51.93693783750001, -114.4978060703125 51.9479693648438, -114.4743004117187 51.94797611093753, -114.4742908039062 51.94055257031255))',)]
(Background on this error at: http://sqlalche.me/e/9h9h)

Here is a complete example:

import geopandas
import pyodbc
import urllib.parse
import sqlalchemy

params = 'DRIVER={ODBC Driver 13 for SQL Server};' \
         'SERVER=ServerName;' \
         'PORT=1433;' \
         'DATABASE=DatabaseName;' \
         'trusted_connection=yes;'

params = urllib.parse.quote_plus(params)
db = sqlalchemy.create_engine('mssql+pyodbc:///?odbc_connect=%s' % params)


class Geography(sqlalchemy.types.UserDefinedType):

    def get_col_spec(self):
        return "GEOGRAPHY"

    def bind_processor(self, dialect):
        def process(value):
            if value is None:
                return None
            return 'GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText({0},4269).MakeValid().STUnion(GEOMETRY::STGeomFromText({0},4269).STStartPoint()).STAsText(),4269)'.format("'" + value + "'")
        return process


gdf = geopandas.GeoDataFrame({'geometry':['POLYGON ((-114.4742908039062 51.94055257031255, -114.4623163671875 51.94054674921875, -114.4506284421875 51.94053819687502, -114.4506230351562 51.93329010000002, -114.4506172351562 51.92605222890626, -114.4270012117187 51.92605734296876, -114.42699041875 51.91881815312502, -114.4269793164062 51.91139827421875, -114.4150635234375 51.91142824140627, -114.4150633671875 51.90417951171878, -114.41506316875 51.89693553984375, -114.403441165625 51.89697914921879, -114.3914711632812 51.89696492578128, -114.3914339117187 51.88973628046875, -114.3913953804687 51.88226671015627, -114.3677831210937 51.88229111953126, -114.3677989414062 51.87505186875001, -114.3678146921875 51.86781865390628, -114.3678270578125 51.86058223671876, -114.3678396773437 51.8531650226563, -114.3561627734375 51.85316232265626, -114.34419459375 51.85315451953124, -114.34420228125 51.8459314015625, -114.3442099304687 51.83869999843751, -114.3561678164062 51.83869954843755, -114.3561729164063 51.83145872656252, -114.3561489476562 51.82404763359375, -114.3678250734375 51.82404504843754, -114.367827496875 51.82784562734378, -114.3678343414063 51.83869510312502, -114.3797993828125 51.83869063437504, -114.3797996945312 51.84592788906252, -114.3798041078125 51.85316721640629, -114.3914338414062 51.85316111093755, -114.3914139953125 51.86780720156253, -114.4033583140625 51.86778718046878, -114.4150253679687 51.86777429218751, -114.415020278125 51.8822367257813, -114.432806471875 51.88222315703126, -114.4505780710937 51.882230434375, -114.450589053125 51.88963689218753, -114.450594271875 51.89689434218752, -114.4622547703125 51.89689832265628, -114.4622642132812 51.90414635312504, -114.4622736234375 51.91139415781254, -114.4742338164062 51.91140009453125, -114.4742472351562 51.91883031875, -114.4742673734375 51.92607473984378, -114.4858996257812 51.92606931250003, -114.4978195835938 51.92606766953128, -114.49781949375 51.93693783750001, -114.4978060703125 51.9479693648438, -114.4743004117187 51.94797611093753, -114.4742908039062 51.94055257031255))']})

gdf.to_sql('Spatial_Table',
           if_exists='replace',
           index=False,
           dtype={'geometry': Geography},
           con=db)

I would like to see direct support for MS SQL Server's geography and geometry types in SQLAlchemy or GeoAlchemy.

2 Answers

Instead of bind_processor(), which processes the value in Python, use bind_expression() for SQL-side processing:

import sqlalchemy
from sqlalchemy import text


class Geography(sqlalchemy.types.UserDefinedType):

    def get_col_spec(self):
        return "GEOGRAPHY"

    def bind_expression(self, bindvalue):
        # Note that this does *not* format the value to the expression text, but
        # the bind value key.
        return text(f'GEOGRAPHY::STGeomFromText(GEOMETRY::STGeomFromText(:{bindvalue.key},4269).MakeValid().STUnion(GEOMETRY::STGeomFromText(:{bindvalue.key},4269).STStartPoint()).STAsText(),4269)').bindparams(bindvalue)
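
In DB-API terms, the effect of bind_expression() is that the function calls live in the SQL text and only the WKT string travels as a parameter. The following is a hand-written sketch of that split, assuming pyodbc-style "?" parameter markers. build_insert and its arguments are hypothetical names for illustration, not part of SQLAlchemy, and the table and column names are assumed to be trusted identifiers.

```python
from typing import Tuple

# SQL expression from the question, with the WKT replaced by positional
# placeholders ("?" is the parameter marker used by pyodbc).
GEOGRAPHY_EXPR = (
    "GEOGRAPHY::STGeomFromText("
    "GEOMETRY::STGeomFromText(?,{srid}).MakeValid()"
    ".STUnion(GEOMETRY::STGeomFromText(?,{srid}).STStartPoint())"
    ".STAsText(),{srid})"
)


def build_insert(
    table: str, column: str, wkt: str, srid: int = 4269
) -> Tuple[str, Tuple[str, str]]:
    """Return (sql, params) suitable for a DB-API cursor.execute() call.

    Hypothetical helper: table and column must be trusted identifiers,
    because they are formatted directly into the SQL text.
    """
    expr = GEOGRAPHY_EXPR.format(srid=int(srid))
    sql = f"INSERT INTO [{table}] ([{column}]) VALUES ({expr})"
    # The WKT appears twice in the expression, so it is bound twice.
    return sql, (wkt, wkt)


wkt = "POLYGON((0 0,1 0,1 1,0 1,0 0))"
sql, params = build_insert("Spatial_Table", "geometry", wkt)
print(sql)
print(params)
```

With a live connection, the pair would be passed as cursor.execute(sql, params); the WKT never appears in the statement text itself.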
answered 2019-11-04T12:05:22.357

I just ran into the same problem and came up with a solution. This may help:

from sqlalchemy import func
from sqlalchemy.sql.expression import text
from sqlalchemy.sql.schema import Column
from sqlalchemy.types import UserDefinedType, TypeEngine

from geojson_funcs import wkt2geojson, geojson2wkt


class Geography(UserDefinedType):
    cache_ok = True

    def __init__(self, srid: int = 4326):
        self.srid = srid

    def get_col_spec(self):
        return "GEOGRAPHY"

    def bind_expression(self, bindvalue):
        # not able to use func here since "::" in function name
        exp = f"geography::STGeomFromText(:{bindvalue.key}, {self.srid})"
        f = text(exp)
        return f.bindparams(bindvalue)

    def column_expression(self, col: Column):
        # not able to use func here since function needs to be applied as method to the column itself
        # col.func() not func(col)
        exp = f"{col.key}.STAsText()"
        f1 = text(exp)
        f2 = func.IIF(col == None, None, f1, type_=self)
        return f2

    def bind_processor(self, dialect):
        def process(value):
            return geojson2wkt(value) if value else value

        return process

    def result_processor(self, dialect, coltype):
        def process(value):
            return wkt2geojson(value) if value else value

        return process

    class comparator_factory(TypeEngine.Comparator):
        def __add__(self, other):
            return self.op("goofy")(other)

        def intersects_bounds(self, lon_min, lon_max, lat_min, lat_max):
            p = f'POLYGON (({lon_min} {lat_min}, {lon_max} {lat_min}, {lon_max} {lat_max}, {lon_min} {lat_max}, {lon_min} {lat_min}))'
            exp = f"geography::STGeomFromText('{p}', {self.type.srid}).STIntersects({self.expr.name}) = 1"
            f = text(exp)
            return f

The only problems I still have are the ones noted in the code comments:

  • bind_expression: not able to use func here since "::" in function name
  • column_expression: not able to use func here since function needs to be applied as method to the column itself -> col.func() not func(col)

If someone knows how to solve these two problems, I'm happy to update my sample here :)

Further, I'm using these functions to return GeoJSON instead of WKT:

from typing import Union

import geojson
import shapely.wkt
from shapely.geometry import shape, GeometryCollection, Point, MultiPoint, \
    LineString, MultiLineString, Polygon, MultiPolygon

GeoJsonType: type = Union[geojson.Feature, geojson.FeatureCollection]
WktType: type = Union[GeometryCollection, Point, MultiPoint, LineString, MultiLineString, Polygon, MultiPolygon]


def wkt2geojson(wkt: str) -> GeoJsonType:
    wkt = shapely.wkt.loads(wkt)
    wkt = validate_wkt(wkt)
    if isinstance(wkt, GeometryCollection):
        features = [geojson.Feature(geometry=o, properties={}) for o in wkt.geoms]
        feature_collection = geojson.FeatureCollection(features)
        return feature_collection
    else:
        feature = geojson.Feature(geometry=wkt, properties={})
        return feature


def geojson2wkt(data: Union[GeoJsonType, dict]) -> str:
    data = validate_geojson(data)
    if data.get('type') == 'Feature':
        g = shape(data.get('geometry'))
        return g.wkt
    elif data.get('type') == 'FeatureCollection':
        g = [shape(f.get('geometry')) for f in data.get('features')]
        gc = GeometryCollection(g)
        return gc.wkt
    else:
        raise ValueError('expected a GeoJSON Feature or FeatureCollection')


def validate_geojson(data: Union[GeoJsonType, dict]) -> GeoJsonType:
    data = geojson.loads(geojson.dumps(data))
    if not data.is_valid:
        raise ValueError('value is not a valid GeoJson')
    return data


def validate_wkt(data: WktType) -> WktType:
    if not data.is_valid:
        raise ValueError('value is not a valid WKT')
    return data
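
For readers without shapely or geojson installed, the GeoJSON-to-WKT direction can be illustrated in plain Python for the single-Polygon case. This is a simplified sketch and not the code above: polygon_feature_to_wkt is a hypothetical name, it handles only a Feature that holds a Polygon, and it performs no validity checks.

```python
from typing import Any, Dict


def polygon_feature_to_wkt(feature: Dict[str, Any]) -> str:
    """Convert a GeoJSON Feature holding a Polygon into a WKT string."""
    geometry = feature["geometry"]
    if feature.get("type") != "Feature" or geometry.get("type") != "Polygon":
        raise ValueError("expected a GeoJSON Feature with a Polygon geometry")
    rings = []
    for ring in geometry["coordinates"]:
        # GeoJSON positions are [x, y] (longitude, latitude), the same
        # order WKT uses; any third (elevation) value is ignored here.
        points = ", ".join(f"{x} {y}" for x, y, *_ in ring)
        rings.append(f"({points})")
    return f"POLYGON ({', '.join(rings)})"


feature = {
    "type": "Feature",
    "properties": {},
    "geometry": {
        "type": "Polygon",
        "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 1], [0, 0]]],
    },
}
print(polygon_feature_to_wkt(feature))  # POLYGON ((0 0, 1 0, 1 1, 0 1, 0 0))
```
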
answered 2022-01-28T17:36:23.337