#! /usr/bin/env python
# -*- coding: utf-8 -*-
# vim:fenc=utf-8
#
# Copyright © 2014 uralbash <root@uralbash.ru>
#
# Distributed under terms of the MIT license.
import json
import deform
import colander
from sqlalchemy import Column, Boolean
from colanderalchemy import SQLAlchemySchemaNode
from sqlalchemy.orm.properties import ColumnProperty, RelationshipProperty
from sqlalchemy.orm.relationships import MANYTOONE, ONETOMANY, MANYTOMANY
from sqlalchemy.dialects.postgresql import JSON, JSONB, HSTORE
from saexttype import ChoiceType
from sacrud.common import columns_by_group, get_relationship
from .common import get_pk, get_column_param, _sa_row_to_choises
from .widgets import HiddenCheckboxWidget
class JSONType(colander.SchemaType):
    """Colander schema type that renders a Python value as JSON text."""

    def serialize(self, node, appstruct):
        """Return *appstruct* dumped as indented JSON text.

        ``colander.null`` is passed through untouched.  Non-ASCII characters
        are kept as-is (``ensure_ascii=False``) instead of being escaped.
        """
        if appstruct is colander.null:
            return colander.null
        return json.dumps(appstruct, indent=2, ensure_ascii=False)

    def deserialize(self, node, cstruct):
        """Validate that *cstruct* is JSON text and return it unchanged.

        A falsy *cstruct* (empty string, ``None``, ``colander.null``) yields
        ``colander.null``.

        :raises colander.Invalid: when *cstruct* cannot be parsed as JSON.
        """
        if not cstruct:
            return colander.null
        try:
            # The previous body only did ``return cstruct`` inside the
            # ``try``, which can never raise, so invalid input was silently
            # accepted.  Parsing here makes the error branch reachable.
            json.loads(cstruct)
        except (TypeError, ValueError):
            raise colander.Invalid(
                node, '"{}" is not a valid JSON'.format(cstruct))
        # NOTE(review): the original text (not the parsed object) is returned
        # to keep the historical return value -- confirm whether callers
        # would rather receive the decoded structure.
        return cstruct
def property_values(dbsession, column):
    """Build the list of choices for a relationship select widget.

    Every row of ``column.mapper`` is loaded through *dbsession* and handed
    to ``_sa_row_to_choises``; the result is prefixed with an empty-valued
    placeholder option.

    :param dbsession: object exposing ``query(...)``
        (presumably a SQLAlchemy session -- confirm against the caller).
    :param column: object exposing a ``mapper`` attribute.
    :returns: a list starting with ``('', '-- Choose your option --')``.
    """
    rows = dbsession.query(column.mapper).all()
    placeholder = ('', '-- Choose your option --')
    return [placeholder] + _sa_row_to_choises(rows)
def is_columntype(column, target):
    """Tell whether ``column.type`` is an instance of *target*.

    :param column: any object; it does not need to have a ``type`` attribute.
    :param target: a class or tuple of classes, passed to ``isinstance``.
    :returns: ``True`` when *column* has a ``type`` attribute that is an
        instance of *target*, ``False`` otherwise (including when the
        attribute is missing).
    """
    return hasattr(column, 'type') and isinstance(column.type, target)
def get_single_field_relatioships(table):
    """Map local columns to the single-column relationships of *table*.

    Only relationships returned by ``get_relationship(table)`` whose
    ``local_columns`` holds exactly one column are kept.

    :returns: a dict ``{local_column: relationship}``.  When several
        relationships share the same local column, the last one wins.
    """
    return {
        next(iter(rel.local_columns)): rel
        for rel in get_relationship(table)
        if len(rel.local_columns) == 1
    }
class SacrudForm(object):
    """Callable that assembles a ``deform.Form`` for one mapped table.

    The columns of *table* are split into groups by ``columns_by_group``.
    Each group becomes a ``SQLAlchemySchemaNode`` child of ``self.schema``
    and the values read from *obj* become the form's appstruct.
    """

    def __init__(self, dbsession, obj, table):
        """Store the inputs and precompute the column groups.

        :param dbsession: used to query the choices of relationship fields.
        :param obj: instance whose values pre-fill the form; relationship
            values are read with ``getattr(obj, key, None)``.
        :param table: mapped class handed to ``columns_by_group``,
            ``get_single_field_relatioships`` and ``SQLAlchemySchemaNode``.
        """
        self.dbsession = dbsession
        self.obj = obj
        self.table = table
        self.columns_by_group = columns_by_group(self.table)
        self.schema = colander.Schema()
        self.relationships = get_single_field_relatioships(self.table)
        # Defined up front so the schema-building methods can be called
        # directly, before ``__call__`` has been given a request.
        self.translate = None

    def __call__(self, request=None):
        """Build the schema and return a ``deform.Form`` filled from ``obj``.

        :param request: optional; when given, ``request.localizer.translate``
            is used to translate titles and descriptions.
        """
        self.translate = request.localizer.translate if request else None
        appstruct = self.make_appstruct()
        form = deform.Form(self.schema)
        form.set_appstruct(appstruct)
        return form

    def make_appstruct(self):
        """Register one schema node per column group and collect its values.

        :returns: a dict ``{group_name: group.dictify(self.obj)}``.
        """
        appstruct = {}
        for group_name, columns in self.columns_by_group:
            group = self.group_schema(group_name, columns)
            # Item assignment replaces a child that already has this name
            # and appends otherwise, so calling the form more than once no
            # longer piles up duplicate group nodes in ``self.schema``.
            self.schema[group_name] = group
            # Entries collected earlier take precedence over the new one,
            # exactly as the original list-concatenation merge did.
            merged = {group_name: group.dictify(self.obj)}
            merged.update(appstruct)
            appstruct = merged
        return appstruct

    def group_schema(self, group, columns):
        """Return the ``SQLAlchemySchemaNode`` for one group of columns.

        *group* is used both as the node name and as its title.
        """
        includes = self.preprocessing(columns)
        return SQLAlchemySchemaNode(self.table,
                                    name=group,
                                    title=group,
                                    includes=includes)

    def _label_kwargs(self, column):
        """Return the title/description keyword arguments for *column*."""
        return {
            'title': get_column_param(column, 'title', self.translate),
            'description': get_column_param(column, 'description',
                                            self.translate),
        }

    @staticmethod
    def _missing_value(column):
        """Return ``None`` when every local column is nullable, else required."""
        if all(col.nullable for col in column.local_columns):
            return None
        return colander.required

    def get_relationship_schemanode(self, column):
        """Return a select-widget schema node for a relationship property.

        Many-to-one relationships get a single select whose default is the
        primary key of the related object.  One-to-many and many-to-many
        relationships get a multiple select whose default is the list of
        related primary keys.

        :raises ValueError: when ``column.direction`` is none of the three
            known directions (this used to end in ``UnboundLocalError``).
        """
        relationship = getattr(self.obj, column.key, None)
        values = property_values(self.dbsession, column)
        name = column.key + '[]'

        if column.direction is MANYTOONE:
            default = get_pk(relationship) if relationship else None
            return colander.SchemaNode(
                colander.String(),
                name=name,
                default=default,
                missing=self._missing_value(column),
                widget=deform.widget.SelectWidget(values=values),
                **self._label_kwargs(column))

        if column.direction in (ONETOMANY, MANYTOMANY):
            selected = []
            if relationship:
                try:
                    selected = [get_pk(x) for x in relationship]
                except TypeError:
                    # Value is not iterable: treat it as "nothing selected".
                    selected = []
            return colander.SchemaNode(
                colander.Set(),
                name=name,
                default=selected,
                missing=None,
                widget=deform.widget.SelectWidget(
                    values=values,
                    multiple=True,
                    size="7",
                    css_class='browser-default'
                ),
                **self._label_kwargs(column))

        raise ValueError(
            'Unsupported relationship direction: {!r}'.format(
                column.direction))

    def preprocessing(self, columns):
        """Turn a mapping of columns into ``includes`` entries for a group.

        :param columns: mapping ``{class_member_name: column}``.
        :returns: a list mixing ready-made ``colander.SchemaNode`` objects
            (relationships, choice and boolean columns) with plain member
            names (everything else).
        """
        new_column_list = []
        for class_member_name, column in columns.items():
            # A foreign-key column backed by a single-column relationship is
            # rendered as that relationship.  ``getattr`` keeps objects
            # without ``foreign_keys`` from raising before the type filter
            # below has a chance to skip them.
            if getattr(column, 'foreign_keys', None):
                column = self.relationships.get(column, column)
            if hasattr(column, 'property'):
                column = column.property
            if isinstance(column, ColumnProperty):
                column = column.columns[0]

            # Check types
            if not isinstance(column, (Column, ColumnProperty,
                                       RelationshipProperty)):
                continue

            if isinstance(column, RelationshipProperty):
                new_column_list.append(
                    self.get_relationship_schemanode(column))
            elif is_columntype(column, ChoiceType):
                new_column_list.append(colander.SchemaNode(
                    colander.String(),
                    name=column.key,
                    missing=None,
                    widget=deform.widget.SelectWidget(
                        values=column.type.choices,
                    ),
                    **self._label_kwargs(column)))
            elif is_columntype(column, (JSON, JSONB, HSTORE)):
                # ``column.info`` may have no 'colanderalchemy' entry at all;
                # the old nested try/except raised ``KeyError`` again from
                # inside its own handler in that case.
                overrides = column.info.setdefault('colanderalchemy', {})
                if 'typ' not in overrides:
                    overrides['typ'] = JSONType()
                if 'widget' not in overrides:
                    overrides['widget'] = deform.widget.TextAreaWidget()
                new_column_list.append(class_member_name)
            elif is_columntype(column, Boolean):
                new_column_list.append(colander.SchemaNode(
                    colander.Boolean(),
                    name=column.key,
                    widget=HiddenCheckboxWidget(),
                    missing=None,
                    **self._label_kwargs(column)))
            else:
                # Plain column: included by name.
                new_column_list.append(class_member_name)
        return new_column_list