Ordinary Syntax
Tables
Create table
from datetime import datetime
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table, Column, Integer, String, DateTime
# Connection URL template: 'database_engine://user:password@host/database_name'
engine = create_engine('postgresql://postgres:admin@localhost/mydatabasepython')
metadata = MetaData()

# Columns of the "users" table.
# Each one follows the pattern Column('field_name', datatype, parameters).
user_columns = (
    Column('id', Integer(), primary_key=True),
    Column('username', String(), index=True, nullable=False),
    Column('email', String(), nullable=False),
    Column('created_at', DateTime, default=datetime.now),
)

# Register the table under the name 'users' on the shared metadata object
users = Table('users', metadata, *user_columns)

# Create every table that has been defined on this metadata
metadata.create_all(engine)
Drop table
from sqlalchemy import create_engine
from sqlalchemy import MetaData
engine = create_engine('postgresql://postgres:admin@localhost/mydatabasepython')
metadata = MetaData()
# A freshly created MetaData has no tables registered, so calling drop_all()
# on it would drop nothing. Load the existing table definitions from the
# database first so that drop_all() has something to act on.
metadata.reflect(bind=engine)
metadata.drop_all(engine)  # Delete all tables
Query the table's name
# Printing the Table object shows the table's name
print(users)
Query the table's column names
print(users.c) # The "c" attribute (short for "columns") holds the table's columns
INSERT INTO
# INSERT INTO users (username, email) VALUES ('user1', 'user1@mail.com')
connection = engine.connect()
try:
    query_insert = users.insert().values(
        username='user1',
        email='user1@mail.com',
    )
    print(query_insert)  # SQL sentence
    connection.execute(query_insert)
finally:
    # Release the connection even if building or executing the INSERT fails
    connection.close()
Using “with”
# The "with" block closes the connection automatically when it ends,
# so no explicit connection.close() is needed.
with engine.connect() as connection:
    query_insert = users.insert().values(
        username='user1',
        email='user1@mail.com',
    )
    print(query_insert)  # SQL sentence
    connection.execute(query_insert)
To insert multiple records using a JSON file as the data source
import json

query_insert = users.insert()

# Expects users.json to contain a list of objects whose keys are column names.
# The loaded rows get their own name so the "users" table object is not
# overwritten (later snippets still need it).
with open('users.json', encoding='utf-8') as file:
    user_records = json.load(file)

with engine.connect() as connection:
    # Executing an insert with a list of dicts inserts one row per dict
    connection.execute(query_insert, user_records)
SELECT FROM
# SELECT * FROM users
# The "with" block closes the connection once the rows have been read.
with engine.connect() as connection:
    query_select = users.select()
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    for user in result.fetchall():
        print(user.username)
To obtain only specific fields, import the “select” function:
from sqlalchemy import select
.
.
.
# SELECT id, username FROM users
# The "with" block closes the connection once the rows have been read.
with engine.connect() as connection:
    query_select = select([
        users.c.id,
        users.c.username,
    ])
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    for user in result.fetchall():
        print(user.username)
To obtain a single record, use “fetchone”:
# SELECT * FROM users (only the first row is fetched)
with engine.connect() as connection:
    query_select = users.select()
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    user = result.fetchone()  # None when the query returns no rows
    if user is not None:
        print(user.username)
WHERE
# SELECT * FROM users WHERE username = 'user1'
with engine.connect() as connection:
    # Add the condition with .where() rather than passing it to select(),
    # which is the deprecated way of supplying a WHERE clause.
    query_select = users.select().where(users.c.username == 'user1')
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    for user in result.fetchall():
        print(user.username)
Using the "select" function
# SELECT id, username FROM users WHERE username = 'user1'
# The "with" block closes the connection once the rows have been read.
with engine.connect() as connection:
    query_select = select([
        users.c.id,
        users.c.username,
    ]).where(users.c.username == 'user1')
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    for user in result.fetchall():
        print(user.username)
AND / OR / NOT
from sqlalchemy import and_, or_, not_
.
.
.
# SELECT id, username FROM users
# WHERE (username = 'user1' AND id = 1) OR (username = 'user2' AND id = 2)
# The "with" block closes the connection once the rows have been read.
with engine.connect() as connection:
    query_select = select([
        users.c.id,
        users.c.username,
    ]).where(
        or_(
            and_(
                users.c.username == 'user1',
                users.c.id == 1,
            ),
            and_(
                users.c.username == 'user2',
                users.c.id == 2,
            ),
        )
    )
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    for user in result.fetchall():
        print(user.username)
ORDER BY / LIMIT
# SELECT id, username FROM users ORDER BY id DESC LIMIT 2
# The "with" block closes the connection once the rows have been read.
with engine.connect() as connection:
    query_select = select([
        users.c.id,
        users.c.username,
    ]).order_by(
        users.c.id.desc()
    ).limit(2)
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    for user in result.fetchall():
        print(user.username)
For ascending order, apply the "asc" method or simply omit the direction, since ascending is the default ordering.
Another alternative is to import the functions “asc” and “desc”:
from sqlalchemy import asc, desc
.
.
.
# SELECT id, username FROM users ORDER BY id DESC LIMIT 2
# The "with" block closes the connection once the rows have been read.
with engine.connect() as connection:
    query_select = select([
        users.c.id,
        users.c.username,
    ]).order_by(
        desc(users.c.id)
    ).limit(2)
    print(query_select)  # SQL sentence
    result = connection.execute(query_select)
    for user in result.fetchall():
        print(user.username)
UPDATE
# UPDATE users SET username = 'Francisco' WHERE id = 1
with engine.connect() as connection:
    # Add the condition with .where() rather than passing it to update(),
    # which is the deprecated way of supplying a WHERE clause.
    query_update = users.update().where(
        users.c.id == 1
    ).values(
        username='Francisco'
    )
    print(query_update)  # SQL sentence
    result = connection.execute(query_update)
    print(result.rowcount)  # Affected rows
Another alternative is to use the “update” function
from sqlalchemy import update
.
.
.
# UPDATE users SET username = 'Francisco' WHERE id = 1
# The "with" block closes the connection once the statement has run.
with engine.connect() as connection:
    query_update = update(users).values(username='Francisco').where(users.c.id == 1)
    print(query_update)  # SQL sentence
    result = connection.execute(query_update)
    print(result.rowcount)  # Affected rows
DELETE
# DELETE FROM users WHERE id = 1
with engine.connect() as connection:
    # Add the condition with .where() rather than passing it to delete(),
    # which is the deprecated way of supplying a WHERE clause.
    query_delete = users.delete().where(users.c.id == 1)
    print(query_delete)  # SQL sentence
    result = connection.execute(query_delete)
    print(result.rowcount)  # Affected rows
Another alternative is to use the “delete” function
from sqlalchemy import delete
.
.
.
# DELETE FROM users WHERE id = 1
# The "with" block closes the connection once the statement has run.
with engine.connect() as connection:
    query_delete = delete(users).where(users.c.id == 1)
    print(query_delete)  # SQL sentence
    result = connection.execute(query_delete)
    print(result.rowcount)  # Affected rows
Thanks for reading :)
I invite you to continue reading other entries and visiting us again soon.