1

这是我昨天问的一个问题的后续。

我有哪个跟踪产品(列)table1的实施状态。feature不幸的是,该表不包含feature未实现 a 的行,我想添加它。还有table2其中包含每一个id-table2.id是与 . 的连接键table1.xml_id

简而言之,我想:

  • 对于table2.ids not in ,为他们在 中还没有状态的每个table1分配一个状态。no_contactfeaturetable1
  • 对于table1.xml_ids已经存在table1但没有特定状态的特定状态,请feature为其分配no_contact状态feature

这样,我将为每个table2.id和设置一行table1.feature,每个都有正确的status

完整列表features可在table3或中找到disitinct(table2.feature)。我最初试图用 raw 来解决这个问题,HQL如下所示:

select
    distinct(f.feature),
    p.id as xml_id,
    "no_contact" as status
from
    table3 f
cross join
    table2 p
where 
    id in(
        select
            p.id
        from
             table2 p
        left join(
            select
                xml_id
            from
                table1
            where
                yyyy_mm_dd = '2020-05-30'
        ) s on s.xml_id = p.id
        where
            s.xml_id is null
    )
group by
    1,2,3
union all
    select
        feature,
        xml_id,
        status
    from
        table1
     where
        yyyy_mm_dd = '2020-05-30'

以上几乎可以工作。问题是每个的no_contact计数是相同的,feature而不是每个的正确计数 feature。我想,使用 Pandas 在 Python 中解决问题可能更容易。所以我通过 PySpark 带来了数据。

以下是那些table1已经有状态的数据:

has_status = spark.sql("""
    select
        yyyy_mm_dd,
        xml_id,
        feature,
        status
    from
        table1
    where
        yyyy_mm_dd = '2020-05-30'
""")
has_status = has_status.toPandas()
has_status

这是所有table2.ids 的数据:

all_ids = spark.sql("""
    select
        p.id
    from
         table2
""")
all_ids = no_status.toPandas()
all_ids

是否有可能在 Python 中实现这一目标?因此,如果 atable2.id没有一行featurehas_status那么我想添加idhas_status并分配statusno_contact。此外,如果我可以no_contact为那些已经加入table1但缺少特定功能的状态(即 no_contact)的人添加一个,那就太好了。示例数据/架构:

DROP TABLE IF EXISTS table1;
CREATE TABLE table1 (
  `yyyy_mm_dd` DATE,
  `xml_id` INTEGER,
  `feature` VARCHAR(31),
  `status` VARCHAR(31)
);


INSERT INTO table1
  (yyyy_mm_dd, xml_id, feature, status)
VALUES
  ('2020-07-10', '2', 'basic', 'implemented'),
  ('2020-07-10', '2', 'geo', 'implemented'),
  ('2020-07-10', '2', 'mobile', 'first_contact'),
  ('2020-07-10', '1', 'geo', 'first_contact'),
  ('2020-07-10', '1', 'mobile', 'implemented'),
  ('2020-07-10', '3', 'basic', 'first_contact'),
  ('2020-07-10', '3', 'geo', 'implemented')
;

DROP TABLE IF EXISTS table2;
CREATE TABLE table2 (
  `id` INTEGER,
  `name` VARCHAR(3),
  `active` INTEGER
);

INSERT INTO table2
  (`id`, `name`, `active`)
VALUES
  ('1', 'xyz', '1'),
  ('2', 'dfg', '1'),
  ('3', 'lki', '1'),
  ('4', 'nbg', '0'),
  ('5', 'qyt', '0'),
  ('6', 'bfh', '1');
 
DROP TABLE IF EXISTS table3;
CREATE TABLE table3 (
  `feature` VARCHAR(20),
  `metric` VARCHAR(20),
  `app` VARCHAR(20)
);

INSERT INTO table3
  (`feature`, `metric`, `app`)
VALUES
  ('basic', 'basic_read', 'promotions'),
  ('basic', 'basic_update', 'promotions'),
  ('basic', 'basic_write', 'promotions'),
  ('geo', 'geo_update', 'admin'),
  ('geo', 'geo_write', 'admin'),
  ('mobile', 'mobile_executed', 'admin');

根据上述示例数据,预期输出将df类似于以下内容:

预期输出如下所示:

| yyyy_mm_dd | xml_id | feature | status        |
|------------|--------|---------|---------------|
| 2020-07-10 | 2      | basic   | implemented   |
| 2020-07-10 | 2      | geo     | implemented   |
| 2020-07-10 | 2      | mobile  | first_contact |
| 2020-07-10 | 1      | geo     | first_contact |
| 2020-07-10 | 1      | mobile  | implemented   |
| 2020-07-10 | 3      | basic   | first_contact |
| 2020-07-10 | 3      | geo     | implemented   |
| 2020-07-10 | 4      | mobile  | no_contact    |
| 2020-07-10 | 4      | geo     | no_contact    |
| 2020-07-10 | 4      | basic   | no_contact    |
| 2020-07-10 | 5      | mobile  | no_contact    |
| 2020-07-10 | 5      | geo     | no_contact    |
| 2020-07-10 | 5      | basic   | no_contact    |
| 2020-07-10 | 1      | basic   | no_contact    |
| 2020-07-10 | 3      | mobile  | no_contact    |
4

1 回答 1

1

这是使用pyspark的方式。

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy()

df2.selectExpr("id as xml_id") \
   .crossJoin(df3.select('feature').distinct()) \
   .join(df1, ['xml_id', 'feature'], 'left') \
   .withColumn('yyyy_mm_dd', f.max('yyyy_mm_dd').over(w)) \
   .withColumn('status', f.expr("coalesce(status, 'no_contract')")) \
   .orderBy('xml_id', 'feature') \
   .show(20, False)

+------+-------+----------+-------------+
|xml_id|feature|yyyy_mm_dd|status       |
+------+-------+----------+-------------+
|1     |basic  |2020-07-10|no_contract  |
|1     |geo    |2020-07-10|first_contact|
|1     |mobile |2020-07-10|implemented  |
|2     |basic  |2020-07-10|implemented  |
|2     |geo    |2020-07-10|implemented  |
|2     |mobile |2020-07-10|first_contact|
|3     |basic  |2020-07-10|first_contact|
|3     |geo    |2020-07-10|implemented  |
|3     |mobile |2020-07-10|no_contract  |
|4     |basic  |2020-07-10|no_contract  |
|4     |geo    |2020-07-10|no_contract  |
|4     |mobile |2020-07-10|no_contract  |
|5     |basic  |2020-07-10|no_contract  |
|5     |geo    |2020-07-10|no_contract  |
|5     |mobile |2020-07-10|no_contract  |
|6     |basic  |2020-07-10|no_contract  |
|6     |geo    |2020-07-10|no_contract  |
|6     |mobile |2020-07-10|no_contract  |
+------+-------+----------+-------------+

数据框是

df1 = spark.createDataFrame([  ('2020-07-10', '2', 'basic', 'implemented'),
  ('2020-07-10', '2', 'geo', 'implemented'),
  ('2020-07-10', '2', 'mobile', 'first_contact'),
  ('2020-07-10', '1', 'geo', 'first_contact'),
  ('2020-07-10', '1', 'mobile', 'implemented'),
  ('2020-07-10', '3', 'basic', 'first_contact'),
  ('2020-07-10', '3', 'geo', 'implemented')], ['yyyy_mm_dd', 'xml_id', 'feature', 'status'])
df1.show(10, False)

+----------+------+-------+-------------+
|yyyy_mm_dd|xml_id|feature|status       |
+----------+------+-------+-------------+
|2020-07-10|2     |basic  |implemented  |
|2020-07-10|2     |geo    |implemented  |
|2020-07-10|2     |mobile |first_contact|
|2020-07-10|1     |geo    |first_contact|
|2020-07-10|1     |mobile |implemented  |
|2020-07-10|3     |basic  |first_contact|
|2020-07-10|3     |geo    |implemented  |
+----------+------+-------+-------------+

df2 = spark.createDataFrame([  ('1', 'xyz', '1'),
  ('2', 'dfg', '1'),
  ('3', 'lki', '1'),
  ('4', 'nbg', '0'),
  ('5', 'qyt', '0'),
  ('6', 'bfh', '1')], ['id', 'name', 'active'])
df2.show(10, False)

+---+----+------+
|id |name|active|
+---+----+------+
|1  |xyz |1     |
|2  |dfg |1     |
|3  |lki |1     |
|4  |nbg |0     |
|5  |qyt |0     |
|6  |bfh |1     |
+---+----+------+

df3 = spark.createDataFrame([  ('basic', 'basic_read', 'promotions'),
  ('basic', 'basic_update', 'promotions'),
  ('basic', 'basic_write', 'promotions'),
  ('geo', 'geo_update', 'admin'),
  ('geo', 'geo_write', 'admin'),
  ('mobile', 'mobile_executed', 'admin')], ['feature', 'metric', 'app'])
df3.show(10, False)

+-------+---------------+----------+
|feature|metric         |app       |
+-------+---------------+----------+
|basic  |basic_read     |promotions|
|basic  |basic_update   |promotions|
|basic  |basic_write    |promotions|
|geo    |geo_update     |admin     |
|geo    |geo_write      |admin     |
|mobile |mobile_executed|admin     |
+-------+---------------+----------+
于 2020-08-19T11:25:38.103 回答