ORM Syntax

Notes Single

Create table

from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm.session import Session

#engine = create_engine('database_engine://user:password@host/database_name')
engine = create_engine('postgresql://postgres:admin@localhost/mydatabasepython')

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer(), primary_key=True)
    username = Column(String(50), nullable=False, unique=True)
    mail = Column(String(50), nullable=False, unique=True)
    created_at = Column(DateTime(), default=datetime.now())

    def __str__(self):
        return self.username

Session = sessionmaker(engine)
session = Session()

User.metadata.create_all(engine) # Create the table defined

Drop table

The "User" object must be created (it doesn't matter if it has a different name than the table creator object) and it must have at least the "id" attribute (it doesn't matter if it has another name, the important thing is that it has the "primary_key" attribute ) to be able to execute the delete statement “drop_all”

from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm.session import Session

#engine = create_engine('database_engine://user:password@host/database_name')
engine = create_engine('postgresql://postgres:admin@localhost/mydatabasepython')

Base = declarative_base()

class User(Base):
    __tablename__ = 'users'

    id = Column(Integer(), primary_key=True)

Session = sessionmaker(engine)
session = Session()

User.metadata.drop_all(engine)

INSERT INTO

Continuing with the example of the "User" class, a new instance of this class is created and the "add" method is executed, followed by the "commit" method.

user1 = User(username='user1', mail='user1@gmail.com')
session.add(user1)
session.commit()

SELECT

# SELECT * FROM users
users = session.query(User) # Getting objects type User
for user in users:
    print(user)

In case of requiring only some fields in the query

# SELECT username, mail FROM users
users = session.query(User.username,User.mail) # Getting tuples
for user in users:
    print(user)

In case of requiring a single registration

user = session.query(User).first()
print(user)

Another option is to use the "one" method, which differs from the "first" method in that if no record is obtained, an exception is generated, therefore it must be used within a "Try-except" block.

from sqlalchemy.exc import NoResultFound
.
.
.
try:
    user = session.query(User).filter(User.id == 100).one()
    print(user)
except NoResultFound:
    print('No record found')

WHERE

# SELECT * FROM users WHERE id > 2
users = session.query(User).filter(User.id > 2)
for user in users:
    print(user)

UPDATE

Ordinary method: Get the record, update the data and save changes:

user = session.query(User).filter(User.id == 1).first()  # Getting the user
user.username = 'user111'  # Updating the data
user.mail = 'user111@mail.com'
session.add(user)  # Save the data
session.commit()  # Commit the table

1-statement method:

user = session.query(User).filter(User.id == 1).update(
    {
        User.username: 'user111',
        User.mail: 'user111@mail.com'
    }
)
session.commit()  # Commit the table

DELETE

# DELETE FROM users WHERE id = 1
session.query(User).filter(User.id == 1).delete()
session.commit()  # Commit the table

If the object is already instantiated, proceed directly:

session.delete(user3)
session.commit()

CRUD

class User_operator():

    def create_users(self, *users):
        '''This function create users

        Use:
        1) create_users(['my username','my@email.com']): To create an user
        2) create_users(
                ['username_1','user_1@mail.com'],
                .
                .
                .
                ['username_n','user_n@mail.com']
            ): To create a group of users

        '''
        if len(users) == 1:
            user = User(username=users[0][0], email=users[0][1])
            db.session.add(user)
            db.session.commit()
        elif len(users) > 1:
            for item in users:
                user = User(username=item[0], email=item[1])
                db.session.add(user)
                db.session.commit()

    def read_users(self, *id):
        '''This function return the users found on the data base

        Use:
        1) read_users('all'): To return all users
        2) read_users(an specific id): To return a user
        3) read_users(1,3,6): To return the users which id is 1, 3, and 6

        '''
        if (len(id) == 1) and (str(id[0]).lower() == 'all'):
            users = db.session.query(User).all()
            return users
        elif (len(id) == 1) and ((str(id[0])).isnumeric()):
            user = db.session.query(User).filter(User.id == id[0]).first()
            return user
        elif len(id) > 1:
            users = db.session.query(User).filter(User.id.in_(id)).all()
            return users

    def update_users(self, *id):
        '''This function update the user which id has passed and return the user updated

        Use:
        1) update_users('all',{'email':'updated@mail.com'}): To update all users
        2) update_users(5,{'username':'user updated','email':'updated@mail.com'}): To update an user

        '''
        if len(id) == 2 and (str(id[0]).lower() == 'all'):
            db.session.query(User).update(
                {
                    User.email: id[1]['email']
                }
            )
            db.session.commit()
        elif len(id) == 2 and ((str(id[0])).isnumeric()):
            db.session.query(User).filter(User.id == id[0]).update(
                {
                    User.username: id[1]['username'],
                    User.email: id[1]['email']
                }
            )
            db.session.commit()

    def delete_users(self, *id):
        '''This function delete the users and return True/False

        Use:
        1) delete_users('all'): To delete all users
        2) delete_users(an specific id): To delete an user
        3) delete_users(1,3,6): To delete the users which id is 1, 3, and 6

        '''
        if len(id) == 1 and (str(id[0]).lower() == 'all'):
            db.session.query(User).delete()
            db.session.commit()
        if len(id) == 1 and ((str(id[0])).isnumeric()):
            db.session.query(User).filter(User.id == id[0]).delete()
            db.session.commit()
        elif len(id) > 1:
            db.session.query(User).filter(User.id.in_(id)).delete()
            db.session.commit()

Thanks for reading :)
I invite you to continue reading other entries and visiting us again soon.

Related Posts: