Queries Peewee
Create table
import peewee
from datetime import datetime
mydb = peewee.MySQLDatabase(
'mydatabasepython',
host='localhost',
port=3306,
user='root',
passwd=''
)
# users
class User(peewee.Model):
username = peewee.CharField(max_length=50, unique=True, index=True)
email = peewee.CharField(max_length=60, null=False)
active = peewee.BooleanField(default=False)
created_at = peewee.DateTimeField(default=datetime.now)
class Meta:
database = mydb
db_table = 'users'
if __name__ == '__main__':
if User.table_exists():
User.drop_table()
User.create_table()
Another alternative to create/delete tables:
import peewee
mydb = peewee.MySQLDatabase(
'mydatabasepython',
host='localhost',
port=3306,
user='root',
passwd=''
)
class Author(peewee.Model):
name = peewee.CharField(max_length=50)
class Meta:
database = mydb
db_table = 'authors'
def __str__(self):
return self.name
class Book(peewee.Model):
title = peewee.CharField(max_length=50)
author = peewee.ForeignKeyField(Author, backref='books')
class Meta:
database: mydb
db_table = 'books'
def __str__(self):
return self.title
if __name__ == '__main__':
mydb.drop_tables([Author, Book])
mydb.create_tables([Author, Book])
INSERT INTO
# Method 1
user1 = User(username='user1', email='user1@mail.com')
user1.save()
# Method 2
user2 = User()
user2.username = 'user2'
user2.email = 'user2@mail.com'
user2.save()
# Method 3
values = {
'username': 'user3',
'email': 'user3@mail.com'
}
user3 = User(**values)
user3.save()
# Method 4
user4 = User.create(username='user4', email='user4@mail.com')
# Method 5
query = User.insert(username='user5', email='user5@mail.com')
query.execute()
To insert multiple records
users = [
{'username': 'user1', 'email': 'user1@mail.com'},
{'username': 'user2', 'email': 'user2@mail.com'},
{'username': 'user3', 'email': 'user3@mail.com'},
{'username': 'user4', 'email': 'user4@mail.com'},
{'username': 'user5', 'email': 'user5@mail.com'},
]
query = User.insert_many(users)
query.execute()
SELECT FROM
# SELECT * FROM users
users = User.select()
# SELECT username, email FROM users
users = User.select(User.username, User.email)
print(users) # print the SQL query
for user in users:
print(user.username)
WHERE
# SELECT username, email FROM users WHERE id < 3 and (active=True or username=user1)
users = User.select(
User.username,
User.email
).where(
(User.id ≷ 3) & ((User.active == True) | (User.username == 'user1'))
)
print(users) # print the SQL query
for user in users:
print(user.username)
To obtain a unique by directly receiving the record or and not an object to iterate over, you have two alternatives.
The first alternative is the 'get' method, which should be used in a 'Try-except' block because if the record is not found, an error is generated.
try:
user = User.select().where(User.id == 4).get()
print(user.username)
except User.DoesNotExist as err:
print('The record was not found')
A continuación, la segunda alternativa, la cual es el método ‘first’, en este caso si no encuentra el registro, no se genera un error
user = User.select().where(User.id == 11).first()
if user:
print(user.username)
else:
print('The record was not found')
ORDER BY
# SELECT username, email FROM users ORDER BY username DESC
users = User.select(User.username, User.email).order_by(User.username.desc())
print(users) # print the SQL query
for user in users:
print(user.username)
In the case of requiring an ascending order, simply no keyword is used, since ascending order is the default criteria
LIMIT
# SELECT username, email FROM users ORDER BY username DESC LIMIT 2
users = User.select(User.username, User.email).order_by(User.username.desc()).limit(2)
print(users) # print the SQL query
for user in users:
print(user.username)
COUNT
count = User.select().where(User.active == False).count()
if count > 0:
print(f"There are {count} records that match the condition")
else:
print('No record found')
EXISTS
exists__ = User.select().where(User.id == 12).exists()
if exists__ == True:
print("There record exists")
else:
print("No record found")
UPDATE
We have 2 different procedures for updating records
Method save()
users = User.select().where(User.id == 1).get()
users.username = 'new_username'
users.email = 'my_email@mail.com'
users.save()
Method query()
query = User.update(username='new_username2', email='new@mail.com').where(User.id == 2)
print(query) # print the SQL query
query.execute()
DELETE
We have 2 different procedures for updating records
delete_instance() method: With this method the record must be obtained and then the “delete_instance” method is applied on this record
users = User.select().where(User.id == 1).get()
users.delete_instance()
query() method: The query is executed directly, it is important not to forget the “where” method
query = User.delete().where(User.id == 2)
print(query) # print the SQL query
query.execute()
1 to many relationship
import peewee
mydb = peewee.MySQLDatabase(
'mydatabasepython',
host='localhost',
port=3306,
user='root',
passwd=''
)
class Author(peewee.Model):
name = peewee.CharField(max_length=50)
class Meta:
database = mydb
db_table = 'authors'
def __str__(self):
return self.name
class Book(peewee.Model):
title = peewee.CharField(max_length=50)
author = peewee.ForeignKeyField(Author, backref='books')
class Meta:
database: mydb
db_table = 'books'
def __str__(self):
return self.title
if __name__ == '__main__':
mydb.drop_tables([Author, Book])
mydb.create_tables([Author, Book])
author1 = Author.create(name='Stephen King')
book1 = Book.create(title='It', author=author1)
book2 = Book.create(title='El resplandor', author=author1)
book3 = Book.create(title='Cujo', author=author1)
Thanks for reading :)
I invite you to continue reading other entries and visiting us again soon.