ORM Syntax
Create table
from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm.session import Session
# Connection URL format: 'database_engine://user:password@host/database_name'
# The credentials below are hard-coded for the example.
engine = create_engine('postgresql://postgres:admin@localhost/mydatabasepython')
# Declarative base class that the ORM model classes inherit from.
Base = declarative_base()
class User(Base):
    """ORM model mapped to the ``users`` table.

    Columns:
        id: integer primary key.
        username: required, unique, up to 50 characters.
        mail: required, unique, up to 50 characters.
        created_at: timestamp filled in automatically when a row is inserted
            without an explicit value.
    """

    __tablename__ = 'users'

    id = Column(Integer(), primary_key=True)
    username = Column(String(50), nullable=False, unique=True)
    mail = Column(String(50), nullable=False, unique=True)
    # Pass the callable itself, not its result: ``datetime.now()`` would be
    # evaluated once, when the class is defined, and every row would receive
    # that same timestamp. With the callable, it is invoked on each INSERT.
    created_at = Column(DateTime(), default=datetime.now)

    def __str__(self):
        """Return the username as the printable form of the user."""
        return self.username
# Session factory bound to the engine, and one working session built from it.
Session = sessionmaker(bind=engine)
session = Session()

# Create the tables declared on the declarative base (here: "users").
Base.metadata.create_all(engine)
Drop table
A "User" class must be declared before the table can be dropped (its name does not have to match the class that was used to create the table). It must define at least a primary-key column: the column does not have to be called "id", the important thing is that it is declared with the "primary_key" attribute. Once the class exists, the "drop_all" statement can be executed.
from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Column, Integer, String, DateTime
from sqlalchemy.orm.session import Session
# Connection URL format: 'database_engine://user:password@host/database_name'
# The credentials below are hard-coded for the example.
engine = create_engine('postgresql://postgres:admin@localhost/mydatabasepython')
# Declarative base class that the ORM model classes inherit from.
Base = declarative_base()
class User(Base):
    """Minimal mapping of the ``users`` table: only the primary key is declared."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
# Session factory bound to the engine, and one working session built from it.
Session = sessionmaker(bind=engine)
session = Session()

# Drop the tables declared on the declarative base (here: "users").
Base.metadata.drop_all(engine)
INSERT INTO
Continuing with the example of the "User" class, a new instance of this class is created and the "add" method is executed, followed by the "commit" method.
# Build a new User instance; only username and mail are supplied here.
user1 = User(username='user1', mail='user1@gmail.com')
# Register the new instance with the session.
session.add(user1)
# Commit the transaction so the new row is saved.
session.commit()
SELECT
# SELECT * FROM users
users = session.query(User)  # Query whose results are User objects
for user in users.all():
    print(user)
To retrieve only some of the fields in the query:
# SELECT username, mail FROM users
users = session.query(User.username, User.mail)  # each result is a (username, mail) tuple
for row in users.all():
    print(row)
To retrieve a single record:
# Fetch only the first result of the query.
# NOTE(review): presumably None when the table has no rows — confirm.
user = session.query(User).first()
print(user)
Another option is the "one" method. Unlike "first", it raises an exception when no record is found, so it must be used inside a "try-except" block.
from sqlalchemy.exc import NoResultFound
.
.
.
# Only the query sits inside the "try"; the success path lives in "else".
try:
    user = session.query(User).filter(User.id == 100).one()
except NoResultFound:
    print('No record found')
else:
    print(user)
WHERE
# SELECT * FROM users WHERE id > 2
users = session.query(User).filter(User.id > 2)  # Query restricted by the WHERE criterion
for user in users.all():
    print(user)
UPDATE
Ordinary method: Get the record, update the data and save changes:
# Load the user; first() gives None when no row has id 1.
user = session.query(User).filter(User.id == 1).first()
if user is None:
    # Without this guard the attribute assignments below would raise
    # AttributeError on None.
    print('No record found')
else:
    user.username = 'user111'  # Updating the data
    user.mail = 'user111@mail.com'
    # An object loaded through the session is already tracked by it, so no
    # session.add() is needed: the commit writes the pending changes.
    session.commit()  # Commit the transaction
1-statement method:
# UPDATE users SET username = 'user111', mail = 'user111@mail.com' WHERE id = 1
# Query.update() returns the number of rows matched by the UPDATE, not a
# User object, so the result is named accordingly.
updated_rows = session.query(User).filter(User.id == 1).update(
    {
        User.username: 'user111',
        User.mail: 'user111@mail.com',
    }
)
session.commit()  # Commit the transaction
DELETE
# DELETE FROM users WHERE id = 1
session.query(User).filter_by(id=1).delete()
session.commit()  # Commit the transaction
If the object is already instantiated, proceed directly:
# NOTE(review): user3 is not defined in this snippet; presumably a User
# instance obtained earlier through this session — confirm.
session.delete(user3)
# Commit the transaction so the deletion is saved.
session.commit()
CRUD
class User_operator():
    """CRUD helper for ``User`` rows, working through ``db.session``.

    ``User`` and ``db`` are not defined in this class; they must be provided
    by the surrounding module. ``User`` is used with ``id``, ``username`` and
    ``email`` attributes.
    """

    def create_users(self, *users):
        '''Create one user per argument.

        Each argument is a ``[username, email]`` pair (username at index 0,
        email at index 1).

        Use:
            1) create_users(['my username','my@email.com']): To create a user
            2) create_users(
                   ['username_1','user_1@mail.com'],
                   ...
                   ['username_n','user_n@mail.com']
               ): To create a group of users

        Every user is committed on its own, so users created before a
        failing one remain stored. Calling with no arguments does nothing.
        Returns None.
        '''
        # One loop covers the single-user and the many-users cases alike.
        for item in users:
            user = User(username=item[0], email=item[1])
            db.session.add(user)
            db.session.commit()

    def read_users(self, *id):
        '''Return the users found in the database.

        Use:
            1) read_users('all'): To return a list with all users
            2) read_users(5): To return the user with that id (or None when
               no such user exists)
            3) read_users(1,3,6): To return a list with the users whose id is
               1, 3 or 6

        Any other call (no arguments, or a single argument that is neither
        'all' nor numeric) returns None.
        '''
        if len(id) == 1:
            selector = str(id[0])
            if selector.lower() == 'all':
                return db.session.query(User).all()
            if selector.isnumeric():
                return db.session.query(User).filter(User.id == id[0]).first()
            return None
        if len(id) > 1:
            return db.session.query(User).filter(User.id.in_(id)).all()
        return None

    def update_users(self, *id):
        '''Update users and commit the change.

        Use:
            1) update_users('all',{'email':'updated@mail.com'}): To set the
               email of all users
            2) update_users(5,{'username':'user updated','email':'updated@mail.com'}):
               To update one user; 'username' and 'email' may be given
               together or individually

        Calls that match neither form do nothing. Returns None.

        Raises:
            KeyError: if the data dict has no 'email' key in the 'all' form,
                or neither 'username' nor 'email' in the single-user form.
        '''
        if len(id) != 2:
            return
        target, fields = id
        selector = str(target)
        if selector.lower() == 'all':
            # Only the email is applied to every row; other keys are ignored.
            db.session.query(User).update(
                {
                    User.email: fields['email'],
                }
            )
            db.session.commit()
        elif selector.isnumeric():
            # Apply only the fields that were actually supplied, so a
            # partial update no longer fails with KeyError.
            values = {}
            if 'username' in fields:
                values[User.username] = fields['username']
            if 'email' in fields:
                values[User.email] = fields['email']
            if not values:
                raise KeyError("expected 'username' and/or 'email' in the update data")
            db.session.query(User).filter(User.id == target).update(values)
            db.session.commit()

    def delete_users(self, *id):
        '''Delete users and commit the change.

        Use:
            1) delete_users('all'): To delete all users
            2) delete_users(5): To delete the user with that id
            3) delete_users(1,3,6): To delete the users whose id is 1, 3 or 6

        Calls that match none of these forms do nothing. Returns None.
        '''
        if len(id) == 1:
            selector = str(id[0])
            if selector.lower() == 'all':
                db.session.query(User).delete()
                db.session.commit()
            elif selector.isnumeric():
                db.session.query(User).filter(User.id == id[0]).delete()
                db.session.commit()
        elif len(id) > 1:
            # An IN criterion cannot be evaluated in Python by the default
            # 'evaluate' strategy on older SQLAlchemy 1.x releases, which
            # makes the bulk delete raise; 'fetch' selects the matching rows
            # first and keeps the session in sync.
            db.session.query(User).filter(User.id.in_(id)).delete(
                synchronize_session='fetch'
            )
            db.session.commit()
Thanks for reading :)
I invite you to continue reading other entries and visiting us again soon.